共计 7991 个字符,预计需要花费 20 分钟才能阅读完成。
本篇内容主要讲解“DStream 与 RDD 关系是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“DStream 与 RDD 关系是什么”吧!
RDD 是怎么生成的?RDD 依靠什么生成?RDD 生成的依据是什么?Spark Streaming 中 RDD 的执行是否和 Spark Core 中的 RDD 执行有所不同?运行之后我们对 RDD 怎么处理?
RDD 本身也是基本的对象,例如说 BatchInterval 为 1 秒,那么每一秒都会产生 RDD,内存中不能完全容纳该对象。每个 BatchInterval 的作业执行完后,怎么对已有的 RDD 进行管理。
ForEachDStream 不一定会触发 Job 的执行,和 Job 的执行没有关系。
Job 的产生是由 Spark Streaming 框架造成的。
foreachRDD 是 Spark Streaming 的后门,可以直接对 RDD 进行操作。
DStream 就是 RDD 的模板,后面的 DStream 与前面的 DStream 有依赖。
val lines = jsc.socketTextStream(127.0.0.1 , 9999) 这里产生了 SocketInputDStream。
lines.flatMap(_.split()).map(word = (word, 1)).reduceByKey(_ + _).print() 这里由 SocketInputDStream 转换为 FlatMappedDStream,再转换为 MappedDStream,再转换为 ShuffledDStream,再转换为 ForEachDStream。
对于 DStream 类,源码中是这样解释的。
* DStreams internally is characterized by a few basic properties:
* – A list of other DStreams that the DStream depends on
* – A time interval at which the DStream generates an RDD
* – A function that is used to generate an RDD after each time interval
大致意思是:
1.DStream 依赖于其他 DStream。
2. 每隔 BatchDuration,DStream 生成一个 RDD
3. 每隔 BatchDuration,DStream 内部函数会生成 RDD
DStream 是从后往前依赖,因为 DStream 代表 Spark Streaming 业务逻辑,RDD 是从后往前依赖的,DStream 是 lazy 级别的。DStream 的依赖关系必须和 RDD 的依赖关系保持高度一致。
DStream 类中 generatedRDDs 存储着不同时间对应的 RDD 实例。每一个 DStream 实例都有自己的 generatedRDDs。实际运算的时候,由于是从后往前推,计算只作用于最后一个 DStream。
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
generatedRDDs 是如何获取的。DStream 的 getOrCompute 方法,先根据时间判断 HashMap 中是否已存在该时间对应的 RDD,如果没有则调用 compute 得到 RDD,并放入到 HashMap 中。
/**
* Get the RDD corresponding to the given time; either retrieve it from cache
* or compute-and-cache it.
*/
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
rddOption.foreach {case newRDD =
// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logDebug(s Persisting RDD ${newRDD.id} for time $time to $storageLevel )
}
if (checkpointDuration != null (time – zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo(s Marking RDD ${newRDD.id} for time $time for checkpointing )
}
generatedRDDs.put(time, newRDD)
}
rddOption
} else {
None
}
}
}
拿 DStream 的子类 ReceiverInputDStream 来说明 compute 方法,内部调用了 createBlockRDD 这个方法。
/**
* Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
val blockRDD = {
if (validTime graph.startTime) {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// driver failure without any write ahead log to recover pre-failure data.
new BlockRDD[T](ssc.sc, Array.empty)
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
// Register the input blocks information into InputInfoTracker
val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
// Create the BlockRDD
createBlockRDD(validTime, blockInfos)
}
}
Some(blockRDD)
}
createBlockRDD 会返回 BlockRDD,由于 ReceiverInputDStream 没有父依赖,所以自己生成 RDD。
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
if (blockInfos.nonEmpty) {
val blockIds = blockInfos.map {_.blockId.asInstanceOf[BlockId] }.toArray
// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall {_.walRecordHandleOption.nonEmpty}
if (areWALRecordHandlesPresent) {
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map {_.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map {_.walRecordHandleOption.get}.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not
// others then that is unexpected and log a warning accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError(Some blocks do not have Write Ahead Log information; +
this is unexpected and data may not be recoverable after driver failures )
} else {
logWarning(Some blocks have Write Ahead Log information; this is unexpected)
}
}
val validBlockIds = blockIds.filter {id =
ssc.sparkContext.env.blockManager.master.contains(id)
}
if (validBlockIds.size != blockIds.size) {
logWarning(Some blocks could not be recovered as they were not found in memory. +
To prevent such data loss, enabled Write Ahead Log (see programming guide +
for more details. )
}
new BlockRDD[T](ssc.sc, validBlockIds)
}
} else {
// If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
// according to the configuration
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, Array.empty, Array.empty, Array.empty)
} else {
new BlockRDD[T](ssc.sc, Array.empty)
}
}
}
再拿 DStream 的子类 MappedDStream 来说,这里的 compute 方法,是调用父 RDD 的 getOrCompute 方法获得 RDD,再使用 map 操作。
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T = U
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
}
从上面两个 DStream 的子类,可以说明第一个 DStream,即 InputDStream 的 comput 方法是自己获取数据并计算的,而其他的 DStream 是依赖父 DStream 的,调用父 DStream 的 getOrCompute 方法,然后进行计算。
以上说明了对 DStream 的操作最后作用于对 RDD 的操作。
接着看下 DStream 的另一个子类 ForEachDStream,发现其 compute 方法没有任何操作,但是重写了 generateJob 方法。
private[streaming]
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) = Unit,
displayInnerRDDOps: Boolean
) extends DStream[Unit](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Unit]] = None
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =
val jobFunc = () = createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None = None
}
}
}
从 Job 生成入手,JobGenerator 的 generateJobs 方法,内部调用的 DStreamGraph 的 generateJobs 方法。
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
// 根据特定的时间获取具体的数据
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
// 调用 DStreamGraph 的 generateJobs 生成 Job
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =
jobScheduler.reportError(Error generating jobs for time + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
DStreamGraph 的 generateJobs 方法调用了 OutputStream 的 generateJob 方法,OutputStream 就是 ForEachDStream。
def generateJobs(time: Time): Seq[Job] = {
logDebug(Generating jobs for time + time)
val jobs = this.synchronized {
outputStreams.flatMap {outputStream =
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug(Generated + jobs.length + jobs for time + time)
jobs
}
到此,相信大家对“DStream 与 RDD 关系是什么”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!