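How is an RDD generated? What drives its generation, and what determines it? Does RDD execution in Spark Streaming differ from RDD execution in Spark Core? And once a batch has run, what happens to its RDDs?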

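An RDD is itself just an ordinary object. With a BatchInterval of 1 second, for example, a new RDD is produced every second, and memory cannot hold all of them. So once the job for a BatchInterval has finished, how are the existing RDDs managed?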

ForEachDStream does not necessarily trigger the execution of a Job; it is not what executes the Job.

Jobs are produced by the Spark Streaming framework itself.

foreachRDD is Spark Streaming's back door: it lets you operate on the underlying RDDs directly.

A DStream is a template for RDDs, and each later DStream depends on the DStream before it.

val lines = jsc.socketTextStream( , 9999) creates a SocketInputDStream.

lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print() then transforms the SocketInputDStream into a FlatMappedDStream, then into a MappedDStream, then into a ShuffledDStream, and finally into a ForEachDStream.
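
A minimal, Spark-free sketch of that chain, assuming we only care about the parent links: each Node below stands in for a DStream and records just its type name and its parent, and walking the parents back from the output node recovers the chain in program order. Node, WordCountLineage and lineage are hypothetical names for illustration, not Spark classes.

```scala
// Hypothetical toy model (not Spark API): a Node stands in for a DStream and
// records only its type name and the single parent it depends on.
final case class Node(kind: String, parent: Option[Node])

object WordCountLineage {
  // Built in program order: socketTextStream -> flatMap -> map -> reduceByKey -> print.
  val socket     = Node("SocketInputDStream", None)
  val flatMapped = Node("FlatMappedDStream", Some(socket))
  val mapped     = Node("MappedDStream", Some(flatMapped))
  val shuffled   = Node("ShuffledDStream", Some(mapped))
  val output     = Node("ForEachDStream", Some(shuffled))

  // Walk the dependencies backward from `node`, prepending each kind, so the
  // resulting list reads from the input DStream up to `node`.
  def lineage(node: Node): List[String] = {
    @annotation.tailrec
    def loop(current: Option[Node], acc: List[String]): List[String] = current match {
      case Some(n) => loop(n.parent, n.kind :: acc)
      case None    => acc
    }
    loop(Some(node), Nil)
  }
}
```

WordCountLineage.lineage(WordCountLineage.output) returns List("SocketInputDStream", "FlatMappedDStream", "MappedDStream", "ShuffledDStream", "ForEachDStream"): the chain is only reachable from its end, which is why later DStreams are said to depend on earlier ones.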

The source code describes the DStream class as follows:

```scala
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
```


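1. A DStream depends on other DStreams.
2. A DStream generates one RDD per time interval (the BatchDuration, or the slide duration for windowed streams).
3. A DStream has an internal function (compute) that generates the RDD for each interval.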
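
The three properties can be written down as a small Scala trait. This is a simplified sketch under stated assumptions, not Spark's DStream: an "RDD" is just a Seq, time is a Long in milliseconds, and ToyDStream and CounterStream are hypothetical names.

```scala
// Hypothetical, simplified model of the three DStream properties; not Spark's API.
// An "RDD" is modelled as a plain Seq, and time as milliseconds.
trait ToyDStream[T] {
  // 1. The DStreams this one depends on.
  def dependencies: List[ToyDStream[_]]
  // 2. The interval (ms) at which this DStream produces an RDD.
  def slideDurationMs: Long
  // 3. The function that produces the RDD for a given batch time.
  def compute(timeMs: Long): Option[Seq[T]]
}

// An input stream: no dependencies, and one record (the batch index) per batch.
final class CounterStream(val slideDurationMs: Long) extends ToyDStream[Long] {
  def dependencies: List[ToyDStream[_]] = Nil
  def compute(timeMs: Long): Option[Seq[Long]] = Some(Seq(timeMs / slideDurationMs))
}
```

new CounterStream(1000L).compute(3000L) returns Some(Seq(3)), and its dependencies list is empty because it is an input stream.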

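DStream dependencies point backward, from each DStream to the one before it. This is because DStreams express the Spark Streaming business logic, RDD dependencies also point backward, and DStreams, like RDDs, are lazy. The DStream dependency chain must therefore stay closely aligned with the RDD dependency chain.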
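
Laziness can be shown with a hypothetical, Spark-free sketch (LazyStream, LazySource, LazyMapped and computeCalls are illustrative names): calling map only records a function and a parent link, and no data is produced until a batch time is requested from the end of the chain, at which point each stage pulls from its parent.

```scala
// Hypothetical sketch of laziness (not Spark API): defining transformations records
// a function but computes nothing; data is pulled backward only when asked for.
sealed trait LazyStream[T] {
  def compute(time: Long): Seq[T]
  def map[U](f: T => U): LazyStream[U] = LazyMapped(this, f)
}

// The input: counts how often it is actually asked for data.
final case class LazySource(data: Long => Seq[Int]) extends LazyStream[Int] {
  var computeCalls = 0
  def compute(time: Long): Seq[Int] = { computeCalls += 1; data(time) }
}

// A derived stage: pulls the parent's data for the same time, then applies f.
final case class LazyMapped[T, U](parent: LazyStream[T], f: T => U) extends LazyStream[U] {
  def compute(time: Long): Seq[U] = parent.compute(time).map(f)
}
```

Building src.map(_ * 10).map(_ + 1) leaves computeCalls at 0; only compute on the last stage reaches the source.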

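In the DStream class, generatedRDDs stores the RDD instance generated for each batch time, and every DStream instance has its own generatedRDDs. During actual computation, because everything is derived backward from the end of the chain, computation is triggered only on the last DStream, which then pulls the RDDs it needs from its parents.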

```scala
// RDDs generated, marked as private[streaming] so that testsuites can access it
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
```
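
To see why asking only the last DStream is enough, here is a hypothetical, heavily simplified sketch (CachedStream and ChainDemo are illustrative names, and the data is fabricated): each instance keeps its own generatedRDDs map, and requesting one batch time from the last stream fills the map of every stream in the chain, because each one pulls its parent's RDD for that time.

```scala
import scala.collection.mutable

// Hypothetical toy (not Spark API): each CachedStream owns its own generatedRDDs
// map keyed by batch time, and an "RDD" is modelled as a Seq[Int].
final class CachedStream(parent: Option[CachedStream], step: Seq[Int] => Seq[Int]) {
  val generatedRDDs = mutable.HashMap.empty[Long, Seq[Int]]

  def getOrCompute(time: Long): Seq[Int] =
    generatedRDDs.getOrElseUpdate(time, {
      // An input stream (no parent) fabricates one record per batch; a derived
      // stream first pulls its parent's RDD for the same time, then transforms it.
      val input = parent.map(_.getOrCompute(time)).getOrElse(Seq(time.toInt))
      step(input)
    })
}

object ChainDemo {
  val input   = new CachedStream(None, (s: Seq[Int]) => s)
  val doubled = new CachedStream(Some(input), _.map(_ * 2))
  val last    = new CachedStream(Some(doubled), _.map(_ + 1))
}
```

After ChainDemo.last.getOrCompute(5L), which yields Seq(11), input and doubled also hold entries for time 5 in their own maps.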

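How does generatedRDDs get filled? Through DStream's getOrCompute method: it first checks whether the HashMap already contains an RDD for the given time; if not, it calls compute to produce the RDD and puts it into the HashMap.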

```scala
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
```
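
The control flow of getOrCompute can be isolated in a small, Spark-free sketch. It assumes a simplified isTimeValid (time after zeroTime and aligned to the slide duration, which roughly mirrors Spark's check) and leaves out persistence and checkpointing; SketchDStream, computeCount and GetOrComputeDemo are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical sketch of the getOrCompute pattern (not Spark API): return the cached
// "RDD" (a Seq) for `time` if present; otherwise compute it only when the time is
// valid for this stream, and cache whatever was produced.
abstract class SketchDStream[T](val zeroTime: Long, val slideDuration: Long) {
  private val generatedRDDs = mutable.HashMap.empty[Long, Seq[T]]
  var computeCount = 0

  protected def compute(time: Long): Option[Seq[T]]

  // Assumed validity rule: strictly after zeroTime and aligned to the slide duration.
  def isTimeValid(time: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slideDuration == 0

  final def getOrCompute(time: Long): Option[Seq[T]] =
    generatedRDDs.get(time).orElse {
      if (isTimeValid(time)) {
        computeCount += 1
        val rddOption = compute(time)
        rddOption.foreach(rdd => generatedRDDs.put(time, rdd)) // cache only real results
        rddOption
      } else {
        None
      }
    }
}

object GetOrComputeDemo {
  val stream = new SketchDStream[Long](zeroTime = 0L, slideDuration = 1000L) {
    protected def compute(time: Long): Option[Seq[Long]] = Some(Seq(time))
  }
}
```

Calling GetOrComputeDemo.stream.getOrCompute(2000L) twice computes once and then serves the cached Seq; a misaligned time such as 2500L yields None without calling compute.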

Take ReceiverInputDStream, a subclass of DStream, as an example of compute: internally it calls the createBlockRDD method.

```scala
/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
```
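
A stripped-down sketch of what this compute does, assuming blocks are identified by plain strings and the tracker is an in-memory map. BlockTracker, ToyBlockRDD and ToyReceiverInputStream are hypothetical stand-ins for ReceiverTracker, BlockRDD and ReceiverInputDStream, and input-info reporting is left out.

```scala
// Hypothetical toy (not Spark API): an input stream has no parent, so it builds
// its "RDD" from the blocks the tracker allocated to this stream for the batch.
final case class ToyBlockRDD(blockIds: Seq[String])

final class BlockTracker(allocated: Map[Long, Map[Int, Seq[String]]]) {
  // Plays the role of getBlocksOfBatch(time).getOrElse(streamId, Seq.empty).
  def blocksOf(time: Long, streamId: Int): Seq[String] =
    allocated.getOrElse(time, Map.empty[Int, Seq[String]]).getOrElse(streamId, Seq.empty)
}

final class ToyReceiverInputStream(id: Int, startTime: Long, tracker: BlockTracker) {
  def compute(validTime: Long): Option[ToyBlockRDD] = {
    val blockRDD =
      if (validTime < startTime) {
        // Before the context's start time (e.g. recovery without a WAL): empty RDD.
        ToyBlockRDD(Seq.empty)
      } else {
        // Otherwise use exactly the blocks allocated to this stream for this batch.
        ToyBlockRDD(tracker.blocksOf(validTime, id))
      }
    Some(blockRDD)
  }
}
```

Note that compute always returns Some, even for a batch with no data; an empty batch still gets an (empty) RDD.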

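createBlockRDD returns a BlockRDD (a WriteAheadLogBackedBlockRDD when the write ahead log is in use). Because ReceiverInputDStream has no parent dependency, it generates the RDD itself.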

```scala
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
```
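
The branching in createBlockRDD reduces to a small decision function. The sketch below is hypothetical (BlockInfo, ChosenRDD, WalBacked, Plain and choose are illustrative names, logging is omitted, and the inMemory predicate stands in for the BlockManager lookup): it only reports which kind of RDD would be created and with which block IDs.

```scala
// Hypothetical sketch (not Spark API) of createBlockRDD's branching.
final case class BlockInfo(blockId: String, walHandle: Option[String])

sealed trait ChosenRDD
final case class WalBacked(blockIds: Seq[String]) extends ChosenRDD
final case class Plain(blockIds: Seq[String]) extends ChosenRDD

object CreateBlockRDDSketch {
  def choose(infos: Seq[BlockInfo], walEnabled: Boolean, inMemory: String => Boolean): ChosenRDD =
    if (infos.nonEmpty) {
      val ids = infos.map(_.blockId)
      if (infos.forall(_.walHandle.nonEmpty)) {
        // Every block can be re-read from the write ahead log.
        WalBacked(ids)
      } else {
        // Otherwise keep only blocks still available in memory; the rest are lost.
        Plain(ids.filter(inMemory))
      }
    } else if (walEnabled) {
      // No blocks in this batch: the RDD kind follows the WAL configuration.
      WalBacked(Seq.empty)
    } else {
      Plain(Seq.empty)
    }
}
```

Returning a small ADT instead of constructing RDDs keeps the decision testable without a SparkContext.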

Now take another subclass, MappedDStream. Its compute method calls the parent DStream's getOrCompute method to obtain an RDD and then applies map to it.

```scala
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
```
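
The derived-stream pattern in a hypothetical, Spark-free sketch (PullStream, InputPull and MappedPull are illustrative names; an "RDD" is a Seq and caching is omitted): a derived stream never reads data itself, it asks its parent for the RDD at the same time and transforms it, while the input stream is the only one that produces data on its own.

```scala
// Hypothetical sketch (not Spark API) of input vs. derived streams.
trait PullStream[T] {
  def getOrCompute(time: Long): Option[Seq[T]]
}

final class InputPull(data: Map[Long, Seq[String]]) extends PullStream[String] {
  // Input stream: produces its own data, or None if nothing exists for `time`.
  def getOrCompute(time: Long): Option[Seq[String]] = data.get(time)
}

final class MappedPull[T, U](parent: PullStream[T], mapFunc: T => U) extends PullStream[U] {
  // Option.map propagates "no RDD" (None); Seq.map applies mapFunc per element.
  def getOrCompute(time: Long): Option[Seq[U]] =
    parent.getOrCompute(time).map(_.map(mapFunc))
}
```

With an input holding Seq("a b", "c") at time 1000, a MappedPull[String, Int] using _.length returns Some(Seq(3, 1)) for that time and None for any time the input has no data for.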

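These two subclasses show that the first DStream, i.e. the InputDStream, obtains its data and computes its RDD on its own in compute, while every other DStream depends on its parent: it calls the parent DStream's getOrCompute method and then applies its own computation.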

This shows that operations on a DStream ultimately turn into operations on RDDs.

Next, look at another DStream subclass, ForEachDStream. Its compute method does nothing (it simply returns None), but it overrides the generateJob method.

```scala
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
```
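
A hypothetical sketch of the generateJob idea (ToyJob and ForEachSketch are illustrative names, not Spark API): the output stream produces no data of its own, it wraps "apply foreachFunc to the parent's RDD" in a closure, and nothing runs until someone calls the job. This matches the earlier point that ForEachDStream builds the Job but does not itself execute it.

```scala
// Hypothetical sketch (not Spark API) of an output stream that emits jobs.
final class ToyJob(val time: Long, func: () => Unit) {
  def run(): Unit = func()
}

final class ForEachSketch[T](
    parentGetOrCompute: Long => Option[Seq[T]],
    foreachFunc: (Seq[T], Long) => Unit) {

  // An output stream produces no RDD of its own.
  def compute(time: Long): Option[Seq[Unit]] = None

  // Wrap the user function in a closure; it is not called here.
  def generateJob(time: Long): Option[ToyJob] =
    parentGetOrCompute(time) match {
      case Some(rdd) => Some(new ToyJob(time, () => foreachFunc(rdd, time)))
      case None      => None
    }
}
```

The returned ToyJob carries the batch time; the side effect of foreachFunc happens only inside run(), which in Spark is the scheduler's job.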

Now start from job generation. JobGenerator's generateJobs method internally calls DStreamGraph's generateJobs method.

```scala
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Get the concrete data (received blocks) for this batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call DStreamGraph.generateJobs to generate the jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
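
The control flow of generateJobs can be mirrored with plain callbacks. This is a hypothetical sketch: JobGenSketch and its constructor parameters are illustrative stand-ins for receiverTracker, graph, jobScheduler and eventLoop, and jobs are modelled as strings. It shows the ordering (allocate blocks, then build jobs), the success and failure branches, and that the checkpoint event is posted on both paths.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch (not Spark API) of JobGenerator.generateJobs' control flow.
final class JobGenSketch(
    allocateBlocksToBatch: Long => Unit,
    graphGenerateJobs: Long => Seq[String],
    submitJobSet: (Long, Seq[String]) => Unit,
    reportError: (String, Throwable) => Unit,
    postCheckpoint: Long => Unit) {

  def generateJobs(time: Long): Unit = {
    Try {
      allocateBlocksToBatch(time) // bind received blocks to this batch first
      graphGenerateJobs(time)     // then let the graph build one job per output
    } match {
      case Success(jobs) => submitJobSet(time, jobs)
      case Failure(e)    => reportError("Error generating jobs for time " + time, e)
    }
    postCheckpoint(time)          // requested whether job generation succeeded or not
  }
}
```

Wrapping both steps in one Try means a failure in block allocation skips job generation entirely but still reaches the checkpoint request.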

DStreamGraph's generateJobs method calls generateJob on each of its outputStreams, and these output streams are the ForEachDStreams.

```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
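
A hypothetical sketch of the same idea (OutputSketch and GraphSketch are illustrative names, and jobs are modelled as strings): the graph asks every registered output stream for its job at the given time, and outputs that have nothing to do return None, which flatMap simply drops.

```scala
// Hypothetical sketch (not Spark API) of DStreamGraph.generateJobs.
final case class OutputSketch(name: String, hasDataAt: Long => Boolean) {
  def generateJob(time: Long): Option[String] =
    if (hasDataAt(time)) Some(s"$name@$time") else None
}

final class GraphSketch(outputStreams: Seq[OutputSketch]) {
  // One job per output stream that produced one; None results are dropped.
  def generateJobs(time: Long): Seq[String] = this.synchronized {
    outputStreams.flatMap(_.generateJob(time))
  }
}
```

With a "print" output that always fires and a "save" output that fires only on even seconds, generateJobs(1000L) yields Seq("print@1000") and generateJobs(2000L) yields Seq("print@2000", "save@2000"): the number of jobs per batch equals the number of output streams that had an RDD for that time.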

