共计 4346 个字符,预计需要花费 11 分钟才能阅读完成。
如何理解 Storm 的并行度、Grouping 策略以及消息可靠处理机制,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
概念:
Workers (JVMs): 在一个节点上可以运行一个或多个独立的 JVM 进程。一个 Topology 可以包含一个或多个 worker(并行的跑在不同的 machine 上), 所以 worker process 就是执行一个 topology 的子集, 并且 worker 只能对应于一个 topology
Executors (threads): 在一个 worker JVM 进程中运行着多个 Java 线程。一个 executor 线程可以执行一个或多个 tasks。但一般默认每个 executor 只执行一个 task。一个 worker 可以包含一个或多个 executor, 每个 component (spout 或 bolt) 至少对应于一个 executor, 所以可以说 executor 执行一个 compenent 的子集, 同时一个 executor 只能对应于一个 component。
Tasks(bolt/spout instances):Task 就是具体的处理逻辑对象,每一个 Spout 和 Bolt 会被当作很多 task 在整个集群里面执行。每一个 task 对应到一个线程,而 stream grouping 则是定义怎么从一堆 task 发射 tuple 到另外一堆 task。你可以调用 TopologyBuilder.setSpout 和 TopBuilder.setBolt 来设置并行度 — 也就是有多少个 task。
配置并行度
对于并发度的配置, 在 storm 里面可以在多个地方进行配置, 优先级为:defaults.yaml storm.yaml topology-specific configuration internal component-specific configuration external component-specific configuration
worker processes 的数目, 可以通过配置文件和代码中配置, worker 就是执行进程, 所以考虑并发的效果, 数目至少应该大亍 machines 的数目
executor 的数目, component 的并发线程数,只能在代码中配置 (通过 setBolt 和 setSpout 的参数), 例如, setBolt(green-bolt , new GreenBolt(), 2)
tasks 的数目, 可以不配置, 默认和 executor1:1, 也可以通过 setNumTasks() 配置
Topology 的 worker 数通过 config 设置,即执行该 topology 的 worker(java)进程数。它可以通过 storm rebalance 命令任意调整。
Config conf = new Config();
conf.setNumWorkers(2); // use two worker processes
topologyBuilder.setSpout(blue-spout , new BlueSpout(), 2); // set parallelism hint to 2
topologyBuilder.setBolt(green-bolt , new GreenBolt(), 2).setNumTasks(4).shuffleGrouping( blue-spout //set tasks number to 4
topologyBuilder.setBolt(yellow-bolt , new YellowBolt(), 6).shuffleGrouping( green-bolt
StormSubmitter.submitTopology(mytopology , conf, topologyBuilder.createTopology());
动态的改变并行度
Storm 支持在不 restart topology 的情况下, 动态的改变 (增减) worker processes 的数目和 executors 的数目, 称为 rebalancing. 通过 Storm web UI,或者通过 storm rebalance 命令实现:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
流分组策略 —-Stream Grouping
Stream Grouping,告诉 topology 如何在两个组件之间发送 tuple
定义一个 topology 的其中一步是定义每个 bolt 接收什么样的流作为输入。stream grouping 就是用来定义一个 stream 应该如果分配数据给 bolts 上面的多个 tasks
Storm 里面有 7 种类型的 stream grouping,你也可以通过实现 CustomStreamGrouping 接口来实现自定义流分组
1. Shuffle Grouping
随机分组,随机派发 stream 里面的 tuple,保证每个 bolt task 接收到的 tuple 数目大致相同。
2. Fields Grouping
按字段分组,比如,按 user-id 这个字段来分组,那么具有同样 user-id 的 tuple 会被分到相同的 Bolt 里的一个 task,而不同的 user-id 则可能会被分配到不同的 task。
3. All Grouping
广播发送,对亍每一个 tuple,所有的 bolts 都会收到
4. Global Grouping
全局分组,整个 stream 被分配到 storm 中的一个 bolt 的其中一个 task。再具体一点就是分配给 id 值最低的那个 task。
5. None Grouping
不分组,这个分组的意思是说 stream 不关心到底怎样分组。目前这种分组和 Shuffle grouping 是一样的效果,有一点不同的是 storm 会把使用 none grouping 的这个 bolt 放到这个 bolt 的订阅者同一个线程里面去执行(如果可能的话)。
6. Direct Grouping
指向型分组,这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个 task 处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的 task 的 id (OutputCollector.emit 方法也会返回 task 的 id)
7. Local or shuffle grouping
本地或随机分组。如果目标 bolt 有一个或者多个 task 与源 bolt 的 task 在同一个工作进程中,tuple 将会被随机发送给这些同进程中的 tasks。否则,和普通的 Shuffle Grouping 行为一致。
消息的可靠处理机制
在 storm 中,可靠的信息处理机制是从 spout 开始的。一个提供了可靠的处理机制的 spout 需要记录他发射出去的 tuple,当下游 bolt 处理 tuple 或者子 tuple 失败时 spout 能够重新发射。
Storm 通过调用 Spout 的 nextTuple() 发送一个 tuple。为实现可靠的消息处理,首先要给每个发出的 tuple 带上唯一的 ID,并且将 ID 作为参数传递给 SoputOutputCollector 的 emit() 方法:collector.emit(new Values( value1 , value2), msgId); 给 tuple 指定 ID 告诉 Storm 系统,无论处理成功还是失败,spout 都要接收 tuple 树上所有节点返回的通知。如果处理成功,spout 的 ack() 方法将会对编号是 msgId 的消息应答确认;如果处理失败或者超时,会调用 fail() 方法。
bolt 要实现可靠的信息处理机制包含两个步骤:1. 当发射衍生的 tuple 时,需要锚定读入的 tuple;2. 当处理消息成功或失败时分别确认应答或者报错。
锚定一个 tuple 的意思是,建立读入 tuple 和衍生出的 tuple 之间的对应关系,这样下游的 bolt 就可以通过应答确认、报错或超时来加入到 tuple 树结构中。可以通过调用 OutputCollector 的 emit() 的一个重载函数锚定一个或一组 tuple:collector.emit(tuple, new Values(word))
非锚定(collector.emit(new Values(word));)的 tuple 不会对数据流的可靠性起作用。如果一个非锚定的 tuple 在下游处理失败,原始的根 tuple 不会重新发送。
超时时间可以通过任务级参数 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行配置,默认超时值为 30 秒。
Storm 系统中有一组叫做 acker 的特殊的任务,它们负责跟踪 DAG(有向无环图)中的每个消息。acker 任务保存了 spout 消息 id 到一对值的映射。第一个值就是 spout 的任务 id,通过这个 id,acker 就知道消息处理完成时该通知哪个 spout 任务。第二个值是一个 64bit 的数字,我们称之为 ack val,它是树中所有消息的随机 id 的异或计算结果。ack val 表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息 id 发送过来做异或。
每当 acker 发现一棵树的 ack val 值为 0 的时候,它就知道这棵树已经被完全处理了。因为消息的随机 ID 是一个 64bit 的值,因此 ack val 在树处理完之前被置为 0 的概率非常小。假设你每秒钟发送一万个消息,从概率上说,至少需要 50,000,000 年才会有机会发生一次错误。即使如此,也只有在这个消息确实处理失败的情况下才会有数据的丢失!
有三种方法可以去掉消息的可靠性:
1、将参数 Config.TOPOLOGY_ACKERS 设置为 0,通过此方法,当 Spout 发送一个消息的时候,它的 ack 方法将立刻被调用;
2、Spout 发送一个消息时,不指定此消息的 messageID。当需要关闭特定消息可靠性的时候,可以使用此方法;
3、最后,如果你不在意某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要做锚定,即在 emit 方法中不指定输入消息。因为这些子孙消息没有被锚定在任何 tuple tree 中,因此他们的失败不会引起任何 spout 重新发送消息。
看完上述内容,你们掌握如何理解 Storm 的并行度、Grouping 策略以及消息可靠处理机制的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注丸趣 TV 行业资讯频道,感谢各位的阅读!