Executor容错安全性实例分析

71次阅读
没有评论

共计 6042 个字符,预计需要花费 16 分钟才能阅读完成。

这篇文章主要介绍“Executor 容错安全性实例分析”,在日常操作中,相信很多人在 Executor 容错安全性实例分析问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Executor 容错安全性实例分析”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

sparkstreaming 会不断的接收数据、不断的产生 job、不断的提交 job。所以有一个至关重要的问题就是数据安全性。由于 sparkstreaming 是基于 sparkcore 的,如果我们可以确保数据安全可靠的话(sparkstreaming 生产 job 的时候里面是基于 RDD),即使运行的时候出现错误或者故障,也可以基于 RDD 的容错的能力自动进行恢复。所以要确保数据的安全性。

对于 executor 的安全容错主要是数据的安全容错。Executor 计算时候的安全容错是借助 spark core 的 RDD 的,所以天然是安全的。

数据安全性的一种方式是存储一份副本,另一种方式是不做副本,但是数据源支持重放(也就是可以反复的读取数据源的数据),如果之前读取的数据出现问题,可以重新读取数据。

做副本的方式可以借助 blockmanager 做备份。Blockmanager 存储数据的时候有很多 storagelevel,Receiver 接收数据后,存储的时候指定 storagelevel 为 MEMORY_AND_DISK_SER_2 的方式。Blockmanager 早存储的时候会先考虑 memory,只有 memory 不够的时候才会考虑 disk,一般 memory 都是够的。所以至少两个 executor 上都会有数据,假设一个 executor 挂掉,就会马上切换到另一个 executor。

ReceiverSupervisorImpl 在存储数据的时候会有两种方式,一种是 WAL 的方式,究竟是不是 WAL 得方式是通过配置修改的。默认是 false。如果用 WAL 的方式必须有 checkpoint 的目录,因为 WAL 的数据是放在 checkpoint 的目录之下的。

def enableReceiverLog(conf: SparkConf): Boolean = {
 conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}

Storagelevel 是在构建 inputDstream 的时候传入的,默认就是 MEMORY_AND_DISK_SER_2。

* @param storageLevel  Storage level to use for storing the received objects
 *  (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
 hostname: String,
 port: Int,
 storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
 ): ReceiverInputDStream[String] = withNamedScope(socket text stream) {
 socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

现在来看 ReceiverSupervisorImpl 在存储数据的另一种方式(副本方式)。注释中说的很清楚,根据指定的 storagelevel 把接收的 blocks 交给 blockmanager。也就是通过 blockmanager 来存储。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */
private[streaming] class BlockManagerBasedBlockHandler(
 blockManager: BlockManager, storageLevel: StorageLevel)

Blockmanager 存储的时候会分为多种不同的数据类型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。

Blockmanager 存储数据前面已经讲过了。Receiver 在接收到数据后除了在自己这个 executor 上面存储,还会在另外一个 executor 上存储。如果一个 executor 出现问题会瞬间切换到另一个 executor。

WAL 的方式原理:在具体的目录下会做一份日志,假设后续处理的过程中出了问题,可以基于日志恢复,日志是写在 checkpoint 下。在生产环境下 checkpoint 是在 HDFS 上,这样日志就会有三份副本。

下面就是用 WAL 存储数据的类,先写日志再交给 blockmanager 存储。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */
private[streaming] class WriteAheadLogBasedBlockHandler(

如果采用 WAL 的方式,存储数据的时候就不需要有两份副本,这样太浪费内存,如果 storagelevel.replication 大于 1 就会打印警告日志。

private val effectiveStorageLevel = {
 if (storageLevel.deserialized) {
 logWarning(s Storage level serialization ${storageLevel.deserialized} is not supported when  +
 s write ahead log is enabled, change to serialization false )
 }
 if (storageLevel.replication  1) {
 logWarning(s Storage level replication ${storageLevel.replication} is unnecessary when  +
 s write ahead log is enabled, change to replication 1 )
 }

 StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

这里采用两条线程的线程池,使得 blockmanager 存储数据和 write ahead log 可以并发的执行。

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
 ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

这个是把日志写入 WAL 中

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
 writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

负责读写 WAL 的是 WriteAheadLog,这是一个抽象类,负责写入、读取、清除数据的功能。在写入数据后会返回一个句柄,以供读取数据使用。

看一下具体写入数据的实现。如果失败并且失败次数小于最大的失败次数就会重试。确实是返回了一个句柄。

/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
 var fileSegment: FileBasedWriteAheadLogSegment = null
 var failures = 0
 var lastException: Exception = null
 var succeeded = false
 while (!succeeded failures maxFailures) {
 try {
 fileSegment = getLogWriter(time).write(byteBuffer)
 if (closeFileAfterWrite) {
 resetWriter()
 }
 succeeded = true
 } catch {
 case ex: Exception =
 lastException = ex
 logWarning(Failed to write to write ahead log)
 resetWriter()
 failures += 1
 }
 }
 if (fileSegment == null) {
 logError(s Failed to write to write ahead log after $failures failures)
 throw lastException
 }
 fileSegment
}

下面就是把数据写入 HDFS 的代码

/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
 assertOpen()
 data.rewind() // Rewind to ensure all data in the buffer is retrieved
 val lengthToWrite = data.remaining()
 val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
 stream.writeInt(lengthToWrite)
 if (data.hasArray) {
 stream.write(data.array())
 } else {
 // If the buffer is not backed by an array, we transfer using temp array
 // Note that despite the extra array copy, this should be faster than byte-by-byte copy
 while (data.hasRemaining) {
 val array = new Array[Byte](data.remaining)
 data.get(array)
 stream.write(array)
 }
 }
 flush()
 nextOffset = stream.getPos()
 segment
}

不管是 WAL 还是直接交给 blockmanager 都是采用副本的方式。还有一种是数据源支持数据存放,典型的就是 kafka。Kafka 已经成为了数据存储系统,它天然具有容错和数据副本。

Kafka 有 receiver 和 direct 的方式。Receiver 的方式其实是交给 zookeper 来管理 matadata 的(偏移量 offset),如果数据处理失败后,kafka 会基于 offset 重新读取数据。为什么可以重新读取?如果程序崩溃或者数据没处理完是不会给 zookeper 发 ack。Zookeper 就认为这个数据没有被消费。实际生产环境下越来越多的使用 directAPI 的方式,直接去操作 kafka 并且是自己管理 offset。这就可以保证有且只有一次的容错处理。DirectKafkaInputDstream,它会去看最新的 offset,并把这个内容放入 batch 中。

获取最新的 offset,通过最新的 offset 减去上一个 offset 就可以确定读哪些数据,也就是一个 batch 中的数据。

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
 val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
 // Either.fold would confuse @tailrec, do it manually
 if (o.isLeft) {
 val err = o.left.get.toString
 if (retries = 0) {
 throw new SparkException(err)
 } else {
 log.error(err)
 Thread.sleep(kc.config.refreshLeaderBackoffMs)
 latestLeaderOffsets(retries – 1)
 }
 } else {
 o.right.get
 }
}

容错的弊端就是消耗性能,占用时间。也不是所有情况都不能容忍数据丢失。有些情况下可以不进行容错来提高性能。

假如一次处理 1000 个 block,但是有 1 个 block 出错,就需要把 1000 个 block 进行重新读取或者恢复,这也有性能问题。

到此,关于“Executor 容错安全性实例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6042字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)