Storm DRPC怎么使用

47次阅读
没有评论

共计 3258 个字符,预计需要花费 9 分钟才能阅读完成。

本篇内容主要讲解“Storm DRPC 怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Storm DRPC 怎么使用”吧!

Storm 里面引入 DRPC 主要是利用 storm 的实时计算能力来并行化 CPU 密集型(CPU intensive)的计算任务。DRPC 的 stormtopology 以函数的参数流作为输入,而把这些函数调用的返回值作为 topology 的输出流。

DRPC 其实不能算是 storm 本身的一个特性,它是通过组合 storm 的原语 stream、spout、bolt、topology 而成的一种模式(pattern)。本来应该把 DRPC 单独打成一个包的,但是 DRPC 实在是太有用了,所以我们我们把它和 storm 捆绑在一起。

概览

Distributed RPC 是由一个”DPRC 服务器”协调(storm 自带了一个实现)。DRPC 服务器协调:①  接收一个 RPC 请求  ②  发送请求到 storm topology ③  从 storm topology 接收结果  ④  把结果发回给等待的客户端。从客户端的角度来看一个 DRPC 调用跟一个普通的 RPC 调用没有任何区别。比如下面是客户端如何调用 RPC 计算“reach”功能(function)的结果

 

DRPCClient client = new DRPCClient(drpc-host , 3772);

 

String result = client.execute(reach , http://twitter.com

 

DRPC 的工作流大致是这样的(重要☆):

客户端给 DRPC 服务器发送要执行的函数(function)的名字,以及这个函数的参数。实现了这个函数的 topology 使用 DRPCSpout 从 DRPC 服务器接收函数调用流,每个函数调用被 DRPC 服务器标记了一个唯一的 id。这个 topology 然后计算结果,在 topology 的最后,一个叫做 ReturnResults 的 bolt 会连接到 DRPC 服务器,并且把这个调用的结果发送给 DRPC 服务器(通过那个唯一的 id 标识)。DRPC 服务器用那个唯一 id 来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。

LinearDRPCTopologyBuilder

Storm 自带了一个称作 LinearDRPCTopologyBuilder 的 topology builder,它把实现 DRPC 的几乎所有步骤都自动化了。这些步骤包括:

1、设置 spout

2、把结果返回给 DRPC 服务器

3、给 bolt 提供有限聚合几组 tuples 的能力

来看一个简单的例子,下面是一个把输入参数后面添加一个”!”的 DRPC topology 的实现:

 

public static class ExclaimBolt extends BaseBasicBolt {

 

    public void execute(Tuple tuple, BasicOutputCollector collector) {

 

     String input = tuple.getString(1);

 

     collector.emit(new Values(tuple.getValue(0), input + ! ));

 

    }

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

 

     declarer.declare(new Fields( id , result));

 

    }

 

}

 

 

public static void main(String[] args) throws Exception {

 

    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(exclamation

 

    builder.addBolt(new ExclaimBolt(), 3);

 

    // …

 

}

 

可以看出来,我们需要做的事情非常的少。创建 LinearDRPCTopologyBuilder 的时候,你需要告诉它你要实现的 DRPC 函数 (DRPC function) 的名字。一个 DRPC 服务器可以协调很多函数,函数与函数之间靠函数名字来区分。你声明的第一个 bolt 会接收一个两维 tuple,tuple 的第一个字段是 request-id,第二个字段是这个请求的参数。LinearDRPCTopologyBuilder 同时要求我们 topology 的最后一个 bolt 发送一个形如 [id, result] 的二维 tuple:第一个 field 是 request-id,第二个 field 是这个函数的结果。最后所有中间 tuple 的第一个 field 必须是 request-id。

在这里例子里面 ExclaimBolt 简单地在输入 tuple 的第二个 field 后面再添加一个”!”,其余的事情都由 LinearDRPCTopologyBuilder 帮我们搞定:连接到 DRPC 服务器,并且把结果发回。

本地模式 DRPC

DRPC 可以以本地模式运行,下面就是以本地模式运行上面例子的代码:

 

LocalDRPC drpc = new LocalDRPC();

 

LocalCluster cluster = new LocalCluster();

 

 

cluster.submitTopology(drpc-demo , conf, builder.createLocalTopology(drpc));

 

 

System.out.println(Results for  hello : + drpc.execute( exclamation , hello));

 

 

cluster.shutdown();

 

drpc.shutdown();

 

首先你创建一个 LocalDRPC 对象,这个对象在进程内模拟一个 DRPC 服务器(这很类似于 LocalCluster 在进程内模拟一个 Storm 集群),然后创建 LocalCluster 对象在本地模式运行 topology。LinearTopologyBuilder 有单独的方法来创建本地的 topology 和远程的 topology。在本地模式里面 LocalDRPC 对象不和任何端口绑定,所以我们的 topology 对象需要知道和谁交互,这就是为什么 createLocalTopology 方法接受一个 LocalDRPC 对象作为输入的原因。

把 topology 启动了之后,你就可以通过调用 LocalDRPC 对象的 execute 来调用 RPC 方法了。

远程模式 DRPC

在一个真实集群上面 DRPC 也是非常简单的,有三个步骤:

1、启动 DRPC 服务器

2、配置 DRPC 服务器的地址

3、提交 DRPCtopology 到 storm 集群里面去。

我们可以通过“bin/storm drpc”命令来启动 DRPC 服务器。

接着,你需要让你的 storm 集群知道你的 DRPC 服务器的地址。DRPCSpout 需要这个地址从而可以从 DRPC 服务器来接收函数调用。这个可以配置在 storm.yaml 或者通过代码的方式配置在 topology 里面。通过 storm.yaml 配置是这样的:

 

drpc.servers:

 

 – drpc1.foo.com

 

 – drpc2.foo.com

 

最后,你通过 StormSubmitter 对象来提交 DRPC topology(这个跟你提交其它 topology 没有区别)。如果要以远程的方式运行上面的例子,用下面的代码:

 

StormSubmitter.submitTopology(exclamation-drpc , conf, builder.createRemoteTopology());

 

我们用 createRemoteTopology 方法来创建运行在真实集群上的 DRPC topology。

到此,相信大家对“Storm DRPC 怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3258字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)