Storm分布式RPC怎么配置

68次阅读
没有评论

共计 3175 个字符,预计需要花费 8 分钟才能阅读完成。

这篇文章主要介绍“Storm 分布式 RPC 怎么配置”,在日常操作中,相信很多人在 Storm 分布式 RPC 怎么配置问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm 分布式 RPC 怎么配置”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

首先需要在 storm 集群上把 DRPC 的环境准备好,在 storm.yaml 当中增加如下内容

 drpc.servers:
  – 192.168.1.118

之后通过 storm drpc 启动分布式 RPC 服务。

之后,跟其他的 topology 并没有什么不同,我们需要写点代码,我这边直接从 storm 的例子当中找了个:

public class BasicDRPCTopology {
  public static class ExclaimBolt extends BaseBasicBolt {
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
  String input = tuple.getString(1);
  collector.emit(new Values(tuple.getValue(0), input + ! ));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields( id , result));
  }

  }

  public static void main(String[] args) throws Exception {
  LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder(exclamation
  builder.addBolt(new ExclaimBolt(), 3);

  Config conf = new Config();
  conf.setNumWorkers(3);
  StormSubmitter.submitTopologyWithProgressBar(DRCP-TEST , conf, builder.createRemoteTopology());
  }
}

从 main 函数开始,简单解释一下:

首先 new 一个 LinearDRPCTopologyBuilder 对象,其中的参数【exclamation】就是我们在执行 rpc 调用时候的方法名。

之后我们加入一个自己的 bolt,并行数量为 3

之后用 StormSubmitter 把这个 topology 提交上去就行了。

代码完成之后,打一个 jar 包,用 storm jar 把 topology 提交到集群上。

客户端调用,非常简单

  DRPCClient client = new DRPCClient(192.168.1.118 , 3772);
  String result = client.execute(exclamation , china
  System.out.println(result);

到此为止,一个最简单的 DRPC 调用的工作已经完成了。

等等,还有点问题,LinearDRPCTopologyBuilder 这个东西是不建议使用的(我这里的版本是 0.9.3)。

源码上有这么一行:

Trident subsumes the functionality provided by this class, so it s deprecated

大概意思就是 trident 这个东西已经包含了 LinearDRPCTopologyBuilder 当中的功能。

trident 是什么意思?翻译了一下,【三叉戟】,靠,看起来很牛逼的样子。必须试试。

那么上第二份代码:

public class TridentDRPCTopology {
  public static void main(String[] args) throws Exception {
  Config conf = new Config();
  StormSubmitter.submitTopologyWithProgressBar(word-count , conf, buildTopology());
  }

  public static StormTopology buildTopology() {
  TridentTopology topology = new TridentTopology();

  topology.newDRPCStream(word-count).
  each(new Fields( args), new Split(), new Fields( word)).
  groupBy(new Fields( word)).
  aggregate(new One(), new Fields(one)).
  aggregate(new Fields( one), new Sum(), new Fields( word-count));
  return topology.build();
  }

  public static class Split extends BaseFunction {
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
  String sentence = tuple.getString(0);
  for (String word : sentence.split()) {
  collector.emit(new Values(word));
  }
  }
  }

  public static class One implements CombinerAggregator Integer {
  @Override
  public Integer init(TridentTuple tuple) {
  return 1;
  }

  @Override
  public Integer combine(Integer val1, Integer val2) {
  return 1;
  }

  @Override
  public Integer zero() {
  return 1;
  }
  }
}

这个 topology 的功能要稍稍复杂一些,给出一句话,查一下一共有多少个词,当然了,不能重复计数。main 函数当中非常简单,提交一个 topology。而这个 topology 的构建过程是在 buildTopology 当中完成的。

  topology.newDRPCStream(word-count).
  each(new Fields( args), new Split(), new Fields( word)).  // 用空格分词
  groupBy(new Fields( word)).  // 分组
  aggregate(new One(), new Fields(one)).  // 给每组的数量设定为 1
  aggregate(new Fields( one), new Sum(), new Fields( word-count));  //sum 计算总和

这样的方式看起来跟 spark 当中对 RDD 的操作是有些像的。

好了,还是打包,提交。

然后是客户端测试:

  DRPCClient client = new DRPCClient(192.168.1.118 , 3772);
  String result = client.execute(word-count , mywife asdf asdf asdfasdfasfweqw saaa weweew
  System.out.println(result);

到此,关于“Storm 分布式 RPC 怎么配置”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3175字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)