Exactly once事务的处理方法是什么

87次阅读
没有评论

共计 2310 个字符,预计需要花费 6 分钟才能阅读完成。

本篇内容介绍了“Exactly once 事务的处理方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1,Exactly once 事务

什么事 Exactly once 事务?

数据仅处理一次并且仅输出一次,这样才是完整的事务处理。

Spark 在运行出错时不能保证输出也是事务级别的。在 Task 执行一半的时候出错了,虽然在语义上做了事务处理,数据仅被处理一次,但是如果是输出到数据库中,那有空能将结果多次保存到数据库中。Spark 在任务失败时会进行重试,这样会导致结果多次保存到数据库中。

如下图,当运行在 Executor 上的 Receiver 接收到数据通过 BlockManager 写入内存和磁盘,或者通过 WAL 机制写记录日志,然后把 metedata 信息汇报给 Driver。在 Driver 端定期进行 checkpoint 操作。Job 的执行还是基于 Spark Core 的调度模式在 Executor 上执行 Task。

Exactly once 事务的处理:

1,数据零丢失:必须有可靠的数据来源和可靠的 Receiver,且整个应用程序的 metadata 必须进行 checkpoint,且通过 WAL 来保证数据安全。

我们以数据来自 Kafka 为例,运行在 Executor 上的 Receiver 在接收到来自 Kafka 的数据时会向 Kafka 发送 ACK 确认收到信息并读取下一条信息,kafka 会 updateOffset 来记录 Receiver 接收到的偏移,这种方式保证了在 Executor 数据零丢失。

在 Driver 端,定期进行 checkpoint 操作,出错时从 Checkpoint 的文件系统中把数据读取进来进行恢复,内部会重新构建 StreamingContext(也就是构建 SparkContext)并启动,恢复出元数据 metedata,再次产生 RDD,恢复的是上次的 Job,然后再次提交到集群执行。

那么数据可能丢失的地方有哪些呢和相应的解决方式?

  在 Receiver 收到数据且通过 Driver 的调度 Executor 开始计算数据的时候,如果 Driver 突然奔溃,则此时 Executor 会被杀死,那么 Executor 中的数据就会丢失(如果没有进行 WAL 的操作)。

解决方式:此时就必须通过例如 WAL 的方式,让所有的数据都通过例如 HDFS 的方式首先进行安全性容错处理。此时如果 Executor 中的数据丢失的话,就可以通过 WAL 恢复回来。

这种方式的弊端是通过 WAL 的方式会极大额损伤 SparkStreaming 中 Receivers 接收数据的性能。

数据重复读取的情况:

  在 Receiver 收到数据保存到 HDFS 等持久化引擎但是没有来得及进行 updateOffsets(以 Kafka 为例),此时 Receiver 崩溃后重新启动就会通过管理 Kafka 的 Zookeeper 中元数据再次重复读取数据,但是此时 SparkStreaming 认为是成功的,但是 kafka 认为是失败的(因为没有更新 offset 到 ZooKeeper 中),此时就会导致数据重新消费的情况。

  解决方式:以 Receiver 基于 ZooKeeper 的方式,当读取数据时去访问 Kafka 的元数据信息,在处理代码中例如 foreachRDD 或 transform 时,将信息写入到内存数据库中(memorySet),在计算时读取内存数据库信息,判断是否已处理过,如果以处理过则跳过计算。这些元数据信息可以保存到内存数据结构或者 memsql,sqllite 中。

如果通过 Kafka 作为数据来源的话,Kafka 中有数据,然后 Receiver 接收的时候又会有数据副本,这个时候其实是存储资源的浪费。

Spark 在 1.3 的时候为了避免 WAL 的性能损失和实现 Exactly Once 而提供了 Kafka Direct API,把 Kafka 作为文件存储系统。此时兼具有流的优势和文件系统的优势,至此 Spark Streaming+Kafka 就构建了完美的流处理世界(1,数据不需要拷贝副本;2,不需要 WAL 对性能的损耗;3,Kafka 使用 ZeroCopy 比 HDFS 更高效)。所有的 Executors 通过 Kafka API 直接消息数据,直接管理 Offset,所以也不会重复消费数据。

2,输出不重复

关于 Spark Streaming 数据输出多次重写及其解决方案:

1,为什么会有这个问题,因为 Spark Streaming 在计算的时候基于 Spark Core 天生会做以下事情导致 Spark Streaming 的结果 (部分) 重复输出。Task 重试,慢任务推测,Stage 重试,Job 重试。

2,具体解决方案:

设置 spark.task.maxFailures 次数为 1,这样就不会有 Task 重试了。设置 spark.speculation 为关闭状态,就不会有慢任务推测了,因为慢任务推测非常消耗性能,所以关闭后可以显著提高 Spark Streaming 处理性能。

Spark Streaming On Kafka 的话,Job 失败后可以设置 Kafka 的参数 auto.offset.reset 为 largest 方式。

  最后再次强调可以通过 transform 和 foreachRDD 基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复。这两个方法类似于 Spark Streaming 的后门,可以做任意想象的控制操作。

“Exactly once 事务的处理方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计2310字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)