共计 3258 个字符,预计需要花费 9 分钟才能阅读完成。
本篇内容主要讲解“Storm DRPC 怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Storm DRPC 怎么使用”吧!
Storm 里面引入 DRPC 主要是利用 storm 的实时计算能力来并行化 CPU 密集型(CPU intensive)的计算任务。DRPC 的 stormtopology 以函数的参数流作为输入,而把这些函数调用的返回值作为 topology 的输出流。
DRPC 其实不能算是 storm 本身的一个特性,它是通过组合 storm 的原语 stream、spout、bolt、topology 而成的一种模式(pattern)。本来应该把 DRPC 单独打成一个包的,但是 DRPC 实在是太有用了,所以我们我们把它和 storm 捆绑在一起。
概览
Distributed RPC 是由一个”DPRC 服务器”协调(storm 自带了一个实现)。DRPC 服务器协调:① 接收一个 RPC 请求 ② 发送请求到 storm topology ③ 从 storm topology 接收结果 ④ 把结果发回给等待的客户端。从客户端的角度来看一个 DRPC 调用跟一个普通的 RPC 调用没有任何区别。比如下面是客户端如何调用 RPC 计算“reach”功能(function)的结果
DRPCClient client = new DRPCClient(drpc-host , 3772);
String result = client.execute(reach , http://twitter.com
DRPC 的工作流大致是这样的(重要☆):
客户端给 DRPC 服务器发送要执行的函数(function)的名字,以及这个函数的参数。实现了这个函数的 topology 使用 DRPCSpout 从 DRPC 服务器接收函数调用流,每个函数调用被 DRPC 服务器标记了一个唯一的 id。这个 topology 然后计算结果,在 topology 的最后,一个叫做 ReturnResults 的 bolt 会连接到 DRPC 服务器,并且把这个调用的结果发送给 DRPC 服务器(通过那个唯一的 id 标识)。DRPC 服务器用那个唯一 id 来跟等待的客户端匹配上,唤醒这个客户端并且把结果发送给它。
LinearDRPCTopologyBuilder
Storm 自带了一个称作 LinearDRPCTopologyBuilder 的 topology builder,它把实现 DRPC 的几乎所有步骤都自动化了。这些步骤包括:
1、设置 spout
2、把结果返回给 DRPC 服务器
3、给 bolt 提供有限聚合几组 tuples 的能力
来看一个简单的例子,下面是一个把输入参数后面添加一个”!”的 DRPC topology 的实现:
public static class ExclaimBolt extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + ! ));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( id , result));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(exclamation
builder.addBolt(new ExclaimBolt(), 3);
// …
}
可以看出来,我们需要做的事情非常的少。创建 LinearDRPCTopologyBuilder 的时候,你需要告诉它你要实现的 DRPC 函数 (DRPC function) 的名字。一个 DRPC 服务器可以协调很多函数,函数与函数之间靠函数名字来区分。你声明的第一个 bolt 会接收一个两维 tuple,tuple 的第一个字段是 request-id,第二个字段是这个请求的参数。LinearDRPCTopologyBuilder 同时要求我们 topology 的最后一个 bolt 发送一个形如 [id, result] 的二维 tuple:第一个 field 是 request-id,第二个 field 是这个函数的结果。最后所有中间 tuple 的第一个 field 必须是 request-id。
在这里例子里面 ExclaimBolt 简单地在输入 tuple 的第二个 field 后面再添加一个”!”,其余的事情都由 LinearDRPCTopologyBuilder 帮我们搞定:连接到 DRPC 服务器,并且把结果发回。
本地模式 DRPC
DRPC 可以以本地模式运行,下面就是以本地模式运行上面例子的代码:
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(drpc-demo , conf, builder.createLocalTopology(drpc));
System.out.println(Results for hello : + drpc.execute( exclamation , hello));
cluster.shutdown();
drpc.shutdown();
首先你创建一个 LocalDRPC 对象,这个对象在进程内模拟一个 DRPC 服务器(这很类似于 LocalCluster 在进程内模拟一个 Storm 集群),然后创建 LocalCluster 对象在本地模式运行 topology。LinearTopologyBuilder 有单独的方法来创建本地的 topology 和远程的 topology。在本地模式里面 LocalDRPC 对象不和任何端口绑定,所以我们的 topology 对象需要知道和谁交互,这就是为什么 createLocalTopology 方法接受一个 LocalDRPC 对象作为输入的原因。
把 topology 启动了之后,你就可以通过调用 LocalDRPC 对象的 execute 来调用 RPC 方法了。
远程模式 DRPC
在一个真实集群上面 DRPC 也是非常简单的,有三个步骤:
1、启动 DRPC 服务器
2、配置 DRPC 服务器的地址
3、提交 DRPCtopology 到 storm 集群里面去。
我们可以通过“bin/storm drpc”命令来启动 DRPC 服务器。
接着,你需要让你的 storm 集群知道你的 DRPC 服务器的地址。DRPCSpout 需要这个地址从而可以从 DRPC 服务器来接收函数调用。这个可以配置在 storm.yaml 或者通过代码的方式配置在 topology 里面。通过 storm.yaml 配置是这样的:
drpc.servers:
– drpc1.foo.com
– drpc2.foo.com
最后,你通过 StormSubmitter 对象来提交 DRPC topology(这个跟你提交其它 topology 没有区别)。如果要以远程的方式运行上面的例子,用下面的代码:
StormSubmitter.submitTopology(exclamation-drpc , conf, builder.createRemoteTopology());
我们用 createRemoteTopology 方法来创建运行在真实集群上的 DRPC topology。
到此,相信大家对“Storm DRPC 怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!