RDD怎么向spark传递函数

45次阅读
没有评论

共计 3598 个字符,预计需要花费 9 分钟才能阅读完成。

本篇内容介绍了“RDD 怎么向 spark 传递函数”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

惰性求值

RDD 的转换操作都是惰性求值的。
惰性求值意味着我们对 RDD 调用转化操做(例如 map 操作)并不会立即执行,相反 spark 会在内部记录下所要求执行的操作的相关信息。
把数据读取到 RDD 的操作同样也是惰性的,因此我们调用 sc.textFile() 时数据没有立即读取进来,而是有必要时才会读取。和转化操作一样读取数据操作也有可能被多次执行。这在写代码时要特别注意。

关于惰性求值,对新手来说可能有与直觉相违背之处。有接触过函数式语言类如 haskell 的应该不会陌生。
在最初接触 spark 时,我们也会有这样的疑问。
也参与过这样的讨论:

 val sc = new SparkContext(local[2] ,  test )
 val f:Int ⇒ Int = (x:Int) ⇒ x + 1
 val g:Int ⇒ Int = (x:Int) ⇒ x + 1
 val rdd = sc.parallelize(Seq(1,2,3,4),1)
 //1
 val res1 = rdd.map(x ⇒ g(f(x))).collect
 //2
 val res2 = rdd.map(g).map(f).collect

第 1 和第 2 两种操作均能得到我们想要的结果,但那种操作更好呢?
直观上我们会觉得第 1 种操作更好,因为第一种操作可以仅仅需要一次迭代就能得到我们想要的结果。第二种操作需要两次迭代操作才能完成。
是我们想象的这样吗?让我们对函数 f 和 g 的调用加上打印。按照上面的假设。1 和 2 的输出分别是这样的:

1: f g f g f g f g 
2: g g g g f f f f

代码:

val sc = new SparkContext(local[2] ,  test )
val f:Int ⇒ Int = (x:Int) ⇒ { print( f\t)
 x + 1
 }
val g:Int ⇒ Int = (x:Int) ⇒ { print( g\t)
 x + 1
val rdd = sc.parallelize(Seq(1,2,3,4), 1
val res1 = rdd.map(x ⇒ g(f(x))).collect()
val res2 = rdd.map(f).map(g).collect()

将上面的代码 copy 试着运行一下吧,我们在控制台得到的结果是这样的。

f g f g f g f g
f g f g f g f g

是不是大大出乎我们的意料?这说明什么?说明 spark 是懒性求值的! 我们在调用 map(f) 时并不会真正去计算, map(f)只是告诉 spark 数据是怎么计算出来的。map(f).map(g) 其实就是在告诉 spark 数据先通过 f 在通过 g 计算出来的。然后在 collect() 时,spark 在一次迭代中先后对数据调用 f、g。

继续回到我们最初的问题,既然两种调用方式,在性能上毫无差异,那种调用方式更好呢?我们更推荐第二种调用方式,除了 api 更加清晰之外。在调用链很长的情况下,我们可以利用 spark 的检查点机制,在中间添加检查点,这样数据恢复的代价更小。而第一种方式调用链一旦出错,数据只能从头计算。

那么 spark 到底施加了何种魔法,如此神奇?让我们来拨开 spark 的层层面纱。最好的方式当然是看源码了。以 map 为例:

RDD 的 map 方法

 /**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
 def map[U: ClassTag](f: T =  U): RDD[U] = withScope { val cleanF = sc.clean(f)
 new MapPartitionsRDD[U, T](this, (context, pid, iter) =  iter.map(cleanF))
 }

和 MapPartitionsRDD 的 compute 方法

override def compute(split: Partition, context:
 TaskContext): Iterator[U] =
 f(context, split.index,
 firstParent[T].iterator(split, 
 context))

关键是这个 iter.map(cleanF)),我们调用一个 map 方法其实是在 iter 对象上调用一个 map 方法。iter 对象是 scala.collection.Iterator 的一个实例。
在看一下 Iterator 的 map 方法

def map[B](f: A =  B): Iterator[B]= 
 new AbstractIterator[B] {
 def hasNext = self.hasNext
 def next() = f(self.next())
}

联想到我们刚才说的我们在 RDD 上调用一个 map 方法只是告诉 spark 数据是怎么计算出来的,并不会真正计算。是不是恍然大悟了。

向 spark 传递函数

我们可以把定义好的内联函数、方法的引用或静态方法传递给 spark。就像 scala 的其它函数式 API 一样。我们还要考虑一些细节,比如传递的函数及其引用的变量是可序列话的(实现了 java 的 Serializable 接口 )。除此之外传递一个对象的方法或字段时,会包含对整个对象的引用。我们可以把该字段放到一个局部变量中,来避免传递包含该字段的整个对象。

scala 中的函数传递

class SearchFunctions(val query:String){ def isMatch(s:String) = s.contains(query)
 def getMatchFuncRef(rdd:RDD[String])
 :RDD[String]= {
 //isMatch  代表 this.isMatch 因此我们要传递整个 this
 rdd.map(isMatch)
 }
 def getMatchFieldRef(rdd:RDD[String])={
 //query 表示 this.query 因此我们要传递整个 this
 rdd.map(x= x.split(query))
 }
 def getMatchsNoRef(rdd:RDD[String])={
 // 安全只要把我们需要的字段放到局部变量中
 val q = this.query
 rdd.map(x= x.split(query))
 }
}

如果在 scala 中出现了 NotSerializableException,通常问题就在我们传递了一个不可序列化类中的函数或字段。传递局部可序列变量或顶级对象中的函数始终是安全的。

持久化

如前所述,spark 的 RDD 是惰性求值的,有时我们希望能过多次使用同一个 RDD。如果只是简单的对 RDD 调用行动操作,spark 每次都会重算 RDD 和它的依赖。这在迭代算法中消耗巨大。可以使用 RDD.persist() 让 spark 把 RDD 缓存下来。

避免 GroupByKey

让我们来看看两种 workCount 的方式,一种使用 reduceByKey,另一种使用 groupByKey。

val words = Array(one ,  two ,  two ,  three ,  three ,  three)
val wordPairsRDD = sc.parallelize(words).map(word =  (word, 1))
val wordCountsWithReduce = wordPairsRDD
 .reduceByKey(_ + _)
 .collect()
val wordCountsWithGroup = wordPairsRDD
 .groupByKey()
 .map(t =  (t._1, t._2.sum))
 .collect()

虽然两种方式都能产生正确的结果,但 reduceByKey 在大数据集时工作的更好。这时因为 spark 会在 shuffling 数据之前,为每一个分区添加一个 combine 操作。这将大大减少 shuffling 前的数据。

看下图来理解 reduceBykey 的过程

而 groupBykey 会 shuff 所有的数据,这大大加重了网络传输的数据量。另外如果一个 key 对应很多 value,这样也可能引起 out of memory。

如图,groupby 的过程

“RDD 怎么向 spark 传递函数”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计3598字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)