共计 5998 个字符,预计需要花费 15 分钟才能阅读完成。
本篇内容介绍了“storm 消息的可靠处理方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
4.1 简介
storm 可以确保 spout 发送出来的每个消息都会被完整的处理。本章将会描述 storm 体系是如何达到这个目标的,并将会详述开发者应该如何使用 storm 的这些机制来实现数据的可靠处理。
4.2 理解消息被完整处理
一个消息 (tuple) 从 spout 发送出来,可能会导致成百上千的消息基于此消息被创建。
我们来思考一下流式的“单词统计”的例子:
storm 任务从数据源(Kestrel queue)每次读取一个完整的英文句子;将这个句子分解为独立的单词,最后,实时的输出每个单词以及它出现过的次数。
本例中,每个从 spout 发送出来的消息(每个英文句子)都会触发很多的消息被创建,那些从句子中分隔出来的单词就是被创建出来的新消息。
这些消息构成一个树状结构,我们称之为“tuple tree”,看起来如图 1 所示:
图 1 示例 tuple tree
在什么条件下,Storm 才会认为一个从 spout 发送出来的消息被完整处理呢?答案就是下面的条件同时被满足:
tuple tree 不再生长
树中的任何消息被标识为“已处理”
如果在指定的时间内,一个消息衍生出来的 tuple tree 未被完全处理成功,则认为此消息未被完整处理。这个超时值可以通过任务级参数 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 进行配置,默认超时值为 30 秒。
4.3 消息的生命周期
如果消息被完整处理或者未被完整处理,Storm 会如何进行接下来的操作呢?为了弄清这个问题,我们来研究一下从 spout 发出来的消息的生命周期。这里列出了 spout 应该实现的接口:
首先,Storm 使用 spout 实例的 nextTuple()方法从 spout 请求一个消息(tuple)。收到请求以后,spout 使用 open 方法中提供的 SpoutOutputCollector 向它的输出流发送一个或多个消息。每发送一个消息,Spout 会给这个消息提供一个 message ID,它将会被用来标识这个消息。
假设我们从 kestrel 队列中读取消息,Spout 会将 kestrel 队列为这个消息设置的 ID 作为此消息的 message ID。向 SpoutOutputCollector 中发送消息格式如下:
接来下,这些消息会被发送到后续业务处理的 bolts,并且 Storm 会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的 tuple tree 被完整处理后,Storm 会调用 Spout 中的 ack 方法,并将此消息的 messageID 作为参数传入。同理,如果某消息处理超时,则此消息对应的 Spout 的 fail 方法会被调用,调用时此消息的 messageID 会被作为参数传入。
注意:一个消息只会由发送它的那个 spout 任务来调用 ack 或 fail。如果系统中某个 spout 由多个任务运行,消息也只会由创建它的 spout 任务来应答(ack 或 fail),决不会由其他的 spout 任务来应答。
我们继续使用从 kestrel 队列中读取消息的例子来阐述高可靠性下 spout 需要做些什么(假设这个 spout 的名字是 KestrelSpout)。
我们先简述一下 kestrel 消息队列:
当 KestrelSpout 从 kestrel 队列中读取一个消息,表示它“打开”了队列中某个消息。这意味着,此消息并未从队列中真正的删除,而是将此消息设置为“pending”状态,它等待来自客户端的应答,被应答以后,此消息才会被真正的从队列中删除。处于“pending”状态的消息不会被其他的客户端看到。另外,如果一个客户端意外的断开连接,则由此客户端“打开”的所有消息都会被重新加入到队列中。当消息被“打开”的时候,kestrel 队列同时会为这个消息提供一个唯一的标识。
KestrelSpout 就是使用这个唯一的标识作为这个 tuple 的 messageID 的。稍后当 ack 或 fail 被调用的时候,KestrelSpout 会把 ack 或者 fail 连同 messageID 一起发送给 kestrel 队列,kestrel 会将消息从队列中真正删除或者将它重新放回队列中。
4.4 可靠相关的 API
为了使用 Storm 提供的可靠处理特性,我们需要做两件事情:
无论何时在 tuple tree 中创建了一个新的节点,我们需要明确的通知 Storm;
当处理完一个单独的消息时,我们需要告诉 Storm 这棵 tuple tree 的变化状态。
通过上面的两步,storm 就可以检测到一个 tuple tree 何时被完全处理了,并且会调用相关的 ack 或 fail 方法。Storm 提供了简单明了的方法来完成上述两步。
为 tuple tree 中指定的节点增加一个新的节点,我们称之为锚定(anchoring)。锚定是在我们发送消息的同时进行的。为了更容易说明问题,我们使用下面代码作为例子。本示例的 bolt 将包含整句话的消息分解为一系列的子消息,每个子消息包含一个单词。
每个消息都通过这种方式被锚定:把输入消息作为 emit 方法的第一个参数。因为 word 消息被锚定在了输入消息上,这个输入消息是 spout 发送过来的 tuple tree 的根节点,如果任意一个 word 消息处理失败,派生这个 tuple tree 那个 spout 消息将会被重新发送。
与此相反,我们来看看使用下面的方式 emit 消息时,Storm 会如何处理:
如果以这种方式发送消息,将会导致这个消息不会被锚定。如果此 tuple tree 中的消息处理失败,派生此 tuple tree 的根消息不会被重新发送。根据任务的容错级别,有时候很适合发送一个非锚定的消息。
一个输出消息可以被锚定在一个或者多个输入消息上,这在做 join 或聚合的时候是很有用的。一个被多重锚定的消息处理失败,会导致与之关联的多个 spout 消息被重新发送。多重锚定通过在 emit 方法中指定多个输入消息来实现:
多重锚定会将被锚定的消息加到多棵 tuple tree 上。
注意:多重绑定可能会破坏传统的树形结构,从而构成一个 DAGs(有向无环图),如图 2 所示:
图 2 多重锚定构成的钻石型结构
Storm 的实现可以像处理树那样来处理 DAGs。
锚定表明了如何将一个消息加入到指定的 tuple tree 中,高可靠处理 API 的接下来部分将向您描述当处理完 tuple tree 中一个单独的消息时我们该做些什么。这些是通过 OutputCollector 的 ack 和 fail 方法来实现的。回头看一下例子 SplitSentence,可以发现当所有的 word 消息被发送完成后,输入的表示句子的消息会被应答(acked)。
每个被处理的消息必须表明成功或失败(acked 或者 failed)。Storm 是使用内存来跟踪每个消息的处理情况的,如果被处理的消息没有应答的话,迟早内存会被耗尽!
很多 bolt 遵循特定的处理流程:读取一个消息、发送它派生出来的子消息、在 execute 结尾处应答此消息。一般的过滤器(filter)或者是简单的处理功能都是这类的应用。Storm 有一个 BasicBolt 接口封装了上述的流程。示例 SplitSentence 可以使用 BasicBolt 来重写:
使用这种方式,代码比之前稍微简单了一些,但是实现的功能是一样的。发送到 BasicOutputCollector 的消息会被自动的锚定到输入消息,并且,当 execute 执行完毕的时候,会自动的应答输入消息。
很多情况下,一个消息需要延迟应答,例如聚合或者是 join。只有根据一组输入消息得到一个结果之后,才会应答之前所有的输入消息。并且聚合和 join 大部分时候对输出消息都是多重锚定。然而,这些特性不是 IBasicBolt 所能处理的。
4.5 高效的实现 tuple tree
Storm 系统中有一组叫做“acker”的特殊的任务,它们负责跟踪 DAG(有向无环图)中的每个消息。每当发现一个 DAG 被完全处理,它就向创建这个根消息的 spout 任务发送一个信号。拓扑中 acker 任务的并行度可以通过配置参数 Config.TOPOLOGY_ACKERS 来设置。默认的 acker 任务并行度为 1,当系统中有大量的消息时,应该适当提高 acker 任务的并发度。
为了理解 Storm 可靠性处理机制,我们从研究一个消息的生命周期和 tuple tree 的管理入手。当一个消息被创建的时候(无论是在 spout 还是 bolt 中),系统都为该消息分配一个 64bit 的随机值作为 id。这些随机的 id 是 acker 用来跟踪由 spout 消息派生出来的 tuple tree 的。
每个消息都知道它所在的 tuple tree 对应的根消息的 id。每当 bolt 新生成一个消息,对应 tuple tree 中的根消息的 messageId 就拷贝到这个消息中。当这个消息被应答的时候,它就把关于 tuple tree 变化的信息发送给跟踪这棵树的 acker。例如,他会告诉 acker:本消息已经处理完毕,但是我派生出了一些新的消息,帮忙跟踪一下吧。
举个例子,假设消息 D 和 E 是由消息 C 派生出来的,这里演示了消息 C 被应答时,tuple tree 是如何变化的。
因为在 C 被从树中移除的同时 D 和 E 会被加入到 tuple tree 中,因此 tuple tree 不会被过早的认为已完全处理。
关于 Storm 如何跟踪 tuple tree,我们再深入的探讨一下。前面说过系统中可以有任意个数的 acker,那么,每当一个消息被创建或应答的时候,它怎么知道应该通知哪个 acker 呢?
系统使用一种哈希算法来根据 spout 消息的 messageId 确定由哪个 acker 跟踪此消息派生出来的 tuple tree。因为每个消息都知道与之对应的根消息的 messageId,因此它知道应该与哪个 acker 通信。
当 spout 发送一个消息的时候,它就通知对应的 acker 一个新的根消息产生了,这时 acker 就会创建一个新的 tuple tree。当 acker 发现这棵树被完全处理之后,他就会通知对应的 spout 任务。
tuple 是如何被跟踪的呢?系统中有成千上万的消息,如果为每个 spout 发送的消息都构建一棵树的话,很快内存就会耗尽。所以,必须采用不同的策略来跟踪每个消息。由于使用了新的跟踪算法,Storm 只需要固定的内存(大约 20 字节)就可以跟踪一棵树。这个算法是 storm 正确运行的核心,也是 storm 最大的突破。
acker 任务保存了 spout 消息 id 到一对值的映射。第一个值就是 spout 的任务 id,通过这个 id,acker 就知道消息处理完成时该通知哪个 spout 任务。第二个值是一个 64bit 的数字,我们称之为“ack val”,它是树中所有消息的随机 id 的异或结果。ack val 表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的消息 id 发送过来做异或。
每当 acker 发现一棵树的 ack val 值为 0 的时候,它就知道这棵树已经被完全处理了。因为消息的随机 ID 是一个 64bit 的值,因此 ack val 在树处理完之前被置为 0 的概率非常小。假设你每秒钟发送一万个消息,从概率上说,至少需要 50,000,000 年才会有机会发生一次错误。即使如此,也只有在这个消息确实处理失败的情况下才会有数据的丢失!
4.6 选择合适的可靠性级别
Acker 任务是轻量级的,所以在拓扑中并不需要太多的 acker 存在。可以通过 Storm UI 来观察 acker 任务的吞吐量,如果看上去吞吐量不够的话,说明需要添加额外的 acker。
如果你并不要求每个消息必须被处理(你允许在处理过程中丢失一些信息),那么可以关闭消息的可靠处理机制,从而可以获取较好的性能。关闭消息的可靠处理机制意味着系统中的消息数会减半(每个消息不需要应答了)。另外,关闭消息的可靠处理可以减少消息的大小(不需要每个 tuple 记录它的根 id 了),从而节省带宽。
有三种方法可以关系消息的可靠处理机制:
将参数 Config.TOPOLOGY_ACKERS 设置为 0,通过此方法,当 Spout 发送一个消息的时候,它的 ack 方法将立刻被调用;
第二个方法是 Spout 发送一个消息时,不指定此消息的 messageID。当需要关闭特定消息可靠性的时候,可以使用此方法;
最后,如果你不在意某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要做锚定,即在 emit 方法中不指定输入消息。因为这些子孙消息没有被锚定在任何 tuple tree 中,因此他们的失败不会引起任何 spout 重新发送消息。
4.7 集群的各级容错
到现在为止,大家已经理解了 Storm 的可靠性机制,并且知道了如何选择不同的可靠性级别来满足需求。接下来我们研究一下 Storm 如何保证在各种情况下确保数据不丢失。
3.7.1 任务级失败
因为 bolt 任务 crash 引起的消息未被应答。此时,acker 中所有与此 bolt 任务关联的消息都会因为超时而失败,对应 spout 的 fail 方法将被调用。
acker 任务失败。如果 acker 任务本身失败了,它在失败之前持有的所有消息都将会因为超时而失败。Spout 的 fail 方法将被调用。
Spout 任务失败。这种情况下,Spout 任务对接的外部设备(如 MQ)负责消息的完整性。例如当客户端异常的情况下,kestrel 队列会将处于 pending 状态的所有的消息重新放回到队列中。
4.7.2 任务槽(slot) 故障
worker 失败。每个 worker 中包含数个 bolt(或 spout)任务。supervisor 负责监控这些任务,当 worker 失败后,supervisor 会尝试在本机重启它。
supervisor 失败。supervisor 是无状态的,因此 supervisor 的失败不会影响当前正在运行的任务,只要及时的将它重新启动即可。supervisor 不是自举的,需要外部监控来及时重启。
nimbus 失败。nimbus 是无状态的,因此 nimbus 的失败不会影响当前正在运行的任务(nimbus 失败时,无法提交新的任务),只要及时的将它重新启动即可。nimbus 不是自举的,需要外部监控来及时重启。
4.7.3. 集群节点(机器)故障
storm 集群中的节点故障。此时 nimbus 会将此机器上所有正在运行的任务转移到其他可用的机器上运行。
zookeeper 集群中的节点故障。zookeeper 保证少于半数的机器宕机仍可正常运行,及时修复故障机器即可。
“storm 消息的可靠处理方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!