如何进行Twitter Storm Stream Grouping编写自定义分组实现

65次阅读
没有评论

共计 3652 个字符,预计需要花费 10 分钟才能阅读完成。

本篇文章为大家展示了如何进行 Twitter Storm Stream Grouping 编写自定义分组实现,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

## 自定义 Grouping 测试

Storm 是支持自定义分组的,本篇文章就是探究 Storm 如何编写一个自定义分组器,以及对 Storm 分组器如何分组数据的理解。

这是我写的一个自定义分组,总是把数据分到第一个 Task:

public class MyFirstStreamGrouping implements CustomStreamGrouping { private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);
 private List Integer  tasks;
 @Override
 public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
 List Integer  targetTasks) {
  this.tasks = targetTasks;
  log.info(tasks.toString());
 } 
 @Override
 public List Integer  chooseTasks(int taskId, List Object  values) { log.info(values.toString());
  return Arrays.asList(tasks.get(0));
 }
}

从上面的代码可以看出,该自定义分组会把数据归并到第一个 Task code Arrays.asList(tasks.get(0)); /code,也就是数据到达后总是被派发到第一组。

测试代码:

TopologyBuilder builder = new TopologyBuilder();builder.setSpout(words , new TestWordSpout(), 2); 
// 自定义分组,builder.setBolt(exclaim1 , new DefaultStringBolt(), 3)
  .customGrouping(words , new MyFirstStreamGrouping());

和之前的测试用例一样,Spout 总是发送 code new String[] {“nathan”,“mike”,“jackson”,“golda”,“bertels”} /code 列表的字符串。我们运行验证一下:

11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

从这个运行日志我们可以看出,数据总是派发到一个 Blot:Thread-25-exclaim1。因为我时本地测试,Thread-25-exclaim1 是线程名。而派发的线程是数据多个线程的。因此该测试符合预期,即总是发送到一个 Task,并且这个 Task 也是第一个。

## 理解自定义分组实现

自己实现一个自定义分组难吗?其实如果你理解了 Hadoop 的 Partitioner,Storm 的 CustomStreamGrouping 和它也是一样的道理。

Hadoop MapReduce 的 Map 完成后会把 Map 的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的 Reducer 把数据划分成相应的分区,然后不同的分区进入不同的 Reduce。我们先来看看 Hadoop 是怎样把数据怎样分组的,这是 Partitioner 唯一一个方法:

public class Partitioner K, V  {
 @Override
 public int getPartition(K key, V value, int numReduceTasks) {
 return 0;
 }
}

上面的代码中:Map 输出的数据都会经过 getPartition()方法,用来确定下一步的分组。numReduceTasks 是一个 Job 的 Reduce 数量,而返回值就是确定该条数据进入哪个 Reduce。返回值必须大于等于 0,小于 numReduceTasks,否则就会报错。返回 0 就意味着这条数据进入第一个 Reduce。对于随机分组来说,这个方法可以这么实现:

public int getPartition(K key, V value, int numReduceTasks) { return hash(key) % numReduceTasks;
}

其实 Hadoop 默认的 Hash 分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。

搞通了 Hadoop 的 Partitioner,我们来看看 Storm 的 CustomStreamGrouping。

这是 CustomStreamGrouping 类的源码:

public interface CustomStreamGrouping extends Serializable { void prepare(WorkerTopologyContext context, GlobalStreamId stream, List Integer  targetTasks);
 List Integer  chooseTasks(int taskId, List Object  values); 
}

一模一样的道理,targetTasks 就是 Storm 运行时告诉你,当前有几个目标 Task 可以选择,每一个都给编上了数字编号。而 code chooseTasks(int taskId, List Object values); /code 就是让你选择,你的这条数据 values,是要哪几个目标 Task 处理?

如上文文章开头的自定义分组器实现的代码,我选择的总是让第一个 Task 来处理数据,code return Arrays.asList(tasks.get(0)); /code。和 Hadoop 不同的是,Storm 允许一条数据被多个 Task 处理,因此返回值是 List Integer . 就是让你来在提供的 List Integer targetTasks Task 中选择任意的几个(必须至少是一个)Task 来处理数据。

上述内容就是如何进行 Twitter Storm Stream Grouping 编写自定义分组实现,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-17发表,共计3652字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)