共计 3175 个字符,预计需要花费 8 分钟才能阅读完成。
这篇文章主要介绍“Storm 分布式 RPC 怎么配置”,在日常操作中,相信很多人在 Storm 分布式 RPC 怎么配置问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm 分布式 RPC 怎么配置”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
首先需要在 storm 集群上把 DRPC 的环境准备好,在 storm.yaml 当中增加如下内容
drpc.servers:
– 192.168.1.118
之后通过 storm drpc 启动分布式 RPC 服务。
之后,跟其他的 topology 并没有什么不同,我们需要写点代码,我这边直接从 storm 的例子当中找了个:
public class BasicDRPCTopology {
public static class ExclaimBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String input = tuple.getString(1);
collector.emit(new Values(tuple.getValue(0), input + ! ));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields( id , result));
}
}
public static void main(String[] args) throws Exception {
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(exclamation
builder.addBolt(new ExclaimBolt(), 3);
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(DRCP-TEST , conf, builder.createRemoteTopology());
}
}
从 main 函数开始,简单解释一下:
首先 new 一个 LinearDRPCTopologyBuilder 对象,其中的参数【exclamation】就是我们在执行 rpc 调用时候的方法名。
之后我们加入一个自己的 bolt,并行数量为 3
之后用 StormSubmitter 把这个 topology 提交上去就行了。
代码完成之后,打一个 jar 包,用 storm jar 把 topology 提交到集群上。
客户端调用,非常简单
DRPCClient client = new DRPCClient(192.168.1.118 , 3772);
String result = client.execute(exclamation , china
System.out.println(result);
到此为止,一个最简单的 DRPC 调用的工作已经完成了。
等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是 0.9.3)。
源码上有这么一行:
Trident subsumes the functionality provided by this class, so it s deprecated
大概意思就是 trident 这个东西已经包含了 LinearDRPCTopologyBuilder 当中的功能。
trident 是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。
那么上第二份代码:
public class TridentDRPCTopology {
public static void main(String[] args) throws Exception {
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar(word-count , conf, buildTopology());
}
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
topology.newDRPCStream(word-count).
each(new Fields( args), new Split(), new Fields( word)).
groupBy(new Fields( word)).
aggregate(new One(), new Fields(one)).
aggregate(new Fields( one), new Sum(), new Fields( word-count));
return topology.build();
}
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split()) {
collector.emit(new Values(word));
}
}
}
public static class One implements CombinerAggregator Integer {
@Override
public Integer init(TridentTuple tuple) {
return 1;
}
@Override
public Integer combine(Integer val1, Integer val2) {
return 1;
}
@Override
public Integer zero() {
return 1;
}
}
}
这个 topology 的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main 函数当中非常简单,提交一个 topology。而这个 topology 的构建过程是在 buildTopology 当中完成的。
topology.newDRPCStream(word-count).
each(new Fields( args), new Split(), new Fields( word)). // 用空格分词
groupBy(new Fields( word)). // 分组
aggregate(new One(), new Fields(one)). // 给每组的数量设定为 1
aggregate(new Fields( one), new Sum(), new Fields( word-count)); //sum 计算总和
这样的方式看起来跟 spark 当中对 RDD 的操作是有些像的。
好了,还是打包,提交。
然后是客户端测试:
DRPCClient client = new DRPCClient(192.168.1.118 , 3772);
String result = client.execute(word-count , mywife asdf asdf asdfasdfasfweqw saaa weweew
System.out.println(result);
到此,关于“Storm 分布式 RPC 怎么配置”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!