共计 3523 个字符,预计需要花费 9 分钟才能阅读完成。
这篇文章主要介绍“Storm 的 Grouping 有哪些”,在日常操作中,相信很多人在 Storm 的 Grouping 有哪些问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm 的 Grouping 有哪些”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
##Storm Grouping
shuffleGrouping
将流分组定义为混排。这种混排分组意味着来自 Spout 的输入将混排,或随机分发给此 Bolt 中的任务。shuffle grouping 对各个 task 的 tuple 分配的比较均匀。
fieldsGrouping
这种 grouping 机制保证相同 field 值的 tuple 会去同一个 task,这对于 WordCount 来说非常关键,如果同一个单词不去同一个 task,那么统计出来的单词次数就不对了。
All grouping
广播发送,对于每一个 tuple 将会复制到每一个 bolt 中处理。
Global grouping
Stream 中的所有的 tuple 都会发送给同一个 bolt 任务处理,所有的 tuple 将会发送给拥有最小 task_id 的 bolt 任务处理。
None grouping
不关注并行处理负载均衡策略时使用该方式,目前等同于 shuffle grouping, 另外 storm 将会把 bolt 任务和他的上游提供数据的任务安排在同一个线程下。
Direct grouping
由 tuple 的发射单元直接决定 tuple 将发射给那个 bolt,一般情况下是由接收 tuple 的 bolt 决定接收哪个 bolt 发射的 Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个 task 处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的 taskid (OutputCollector.emit 方法也会返回 taskid)
##fieldsGrouping
如果你了解 Storm,我想你能明白其中的大多数 Grouping。这里的 Grouping 策略我想着重介绍一下 fieldsGrouping,也最难理解的。
fieldsGrouping 是按照数据中字段 Field 的值分组的。下面是我的测试代码:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(words , new TestWordSpout(), 2);
builder.setBolt(exclaim2 , new DefaultStringBolt(), 5)
.fieldsGrouping(words , new Fields( word));
测试的例子 Spout 是 Storm 自带的例子,Blot 代码如下:
public void execute(Tuple tuple) {log.info( rev a message: + tuple.getString(0));
collector.emit(tuple, new Values(tuple.getString(0) + !!! ));
collector.ack(tuple);
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields( word));
}
Storm 自带的例子 Spout 能随机的返回 code new String[] { nathan , mike , jackson , golda , bertels /code 列表中的几个字符串。这也是测试 FieldGroup 的好例子。
按照我最早做 Storm 开始前的理解,既然是按照 Field 分组,那么是所有相同的 Field 值得数据都会到达一个 Blot 的。我测试很多次,其结果并不是这样,一个 Blot 会收到多个不同的值。我没有仔细探究 Storm 这样分组有什么特别的地方,以至于自己对 Storm 的学习停滞了很长时间。
Storm 能保证所有相同 Field 值的数据到达的是相同的 Blot,但是不保证一个 Blot 只处理一个值域。
也就是说,所有值是 nathan 能到达到一个 Blot,但是到达同一个 Blot 的值可能有多个,如 nathan , mike 的数据都到达。
理解到这点上,fieldsGrouping 就算是理解了。
下面是测试日志:
9144 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9234 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9245 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9335 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9346 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9437 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9447 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9537 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9548 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9639 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9649 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9740 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9749 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9841 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9850 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
由上面的日志可以看出,golda 这个值的数据,的确归并到一个 Blot 处理的。线程编号:Thread-26-exclaim2。其它值也都是相同值都是在一个线程内被处理的。
到此,关于“Storm 的 Grouping 有哪些”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!