怎么给Spark传递函数

71次阅读
没有评论

共计 2403 个字符,预计需要花费 7 分钟才能阅读完成。

本篇文章给大家分享的是有关怎么给 Spark 传递函数,丸趣 TV 小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着丸趣 TV 小编一起来看看吧。

  相信很多人在开始用 Spark 的时候一定会遇到 Task not serializable 的问题,这种问题大多数都是在 RDD 的算子中调用了不能序列化的对象引起的。为什么传入算子中的对象一定要能够序列化呢?这就要从 Spark 本身说起,Spark 是一个分布式的计算框架,RDD(Resilient Distributed Datasets, 弹性分布式数据集)是对分布式数据集的抽象,数据实际上是分布在集群的各个节点的,通过 RDD 进行抽象,让用户感觉好像是在本地交互一样。但是实际的运算中,算子中的操作都要发送到计算节点(Executor)端来执行,这就要求传入算子中的对象可以进行序列化。

      Spark 的算子很大程度上是上通过向集群上的驱动程序传递函数来实现的,编写 Spark 应用的关键就是使用算子(或者称为转换),给 Spark 传递函数来实现。常用的向 Spark 传递函数的方式有两种(来自于 Spark 官方文档,Spark 编程指南):

        第一种:匿名函数,处理的代码比较少的时候,可以采用匿名函数,直接写在算子里面:

myrdd.map(x =  x+ 1)

        第二种:全局单例对象中的静态方法:先定义 object 对象 MyFunctions,以及静态方法:funcOne, 然后传递 MyFunctions.funcOne 给 RDD 算子。

object MyFunctions { def funcOne(s: String): String = { ... }
 }

 myRdd.map(MyFunctions.funcOne)

      在业务员开发中,需要把 RDD 的引用传递给某一个类的实例的某个方法,传递给 RDD 的函数,为类实例的实例方法:

class MyClass { def funcOne(s: String): String = { ... }
 def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(funcOne }
 }

        在这个例子中,我们定义了一个类 MyClass,类的实例方法 doStuff 中传入了一个 RDD,RDD 算子中调用了类的另外一个实例方法 funcOne, 在我么 New 一个 MyClass 的实例并调用 doStuff 的方法的时候,需要讲整个实例对象发给集群,所以类 MyClass 必须可以序列化,需要 extends Serializable。

      相似的,访问方法外部的对象变量也会引用整个对象,需要把整个对象发送到集群:

class MyClass {
 val field =  Hello 
 def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x =  field 
 + x) }
}

      为了避免整个对象都发送给集群,可以定义一个局部变量来保存外部对象 field 的引用,这种情况尤其在一些大对象里,可以避免整个对象发送到集群,提高效率。

def doStuff(rdd: RDD[String]): RDD[String] = {
 val field_ = this.field
 rdd.map(x =  field_ + x)
}

    Spark 应用最终是要在集群中运行的,许多问题在单一的本地环境中无法暴露出来,有时候经常会遇到本地运行结果和集群运行结果不一致的问题,这就要求开发的时候多使用函数式编程风格,尽量使的写的函数都为纯函数。纯函数的好处是:无状态, 线程安全, 不需要线程同步,应用程序或者运行环境 (Runtime) 可以对纯函数的运算结果进行缓存, 运算加快速度。

    那么什么是纯函数了?

    纯函数(Pure Function)是这样一种函数——输入输出数据流全是显式(Explicit)的。显式(Explicit)的意思是,函数与外界交换数据只有一个唯一渠道——参数和返回值;函数从函数外部接受的所有输入信息都通过参数传递到该函数内部;函数输出到函数外部的所有信息都通过返回值传递到该函数外部。如果一个函数通过隐式(Implicit)方式,从外界获取数据,或者向外部输出数据,那么,该函数就不是纯函数,叫作非纯函数(Impure Function)。隐式(Implicit)的意思是,函数通过参数和返回值以外的渠道,和外界进行数据交换。比如,读取全局变量,修改全局变量,都叫作以隐式的方式和外界进行数据交换;比如,利用 I /O API(输入输出系统函数库)读取配置文件,或者输出到文件,打印到屏幕,都叫做隐式的方式和外界进行数据交换。

      在计算过程中涉及到对象的交互时,尽量选用无状态的对象,比如对于一个 bean,成员变量都为 val 的,在需要数据交互的地方 new 一个新的。

      关于(commutative and associative)交换律和结合律。在传递给 reudce,reduceByKey, 以及其他的一些 merge,聚合的操作中的函数必须要满足交换律和结合律,交换律和结合律就是我们数学上学过的:

      a + b = b + a,a + b + c =  a + (b + c)

定义的函数 func(a,b)和 f(b,a)应该得到相同的结果,f(f(a,b),c)和 f(a,f(b,c))应该得到相同的结果。

    最后说一下广播变量和累加器的使用。在程序中不要定义一个全局的变量,如果需要在多个节点共享一个数据,可以采用广播变量的方法。如果需要一些全局的聚合计算,可以使用累加器。

以上就是怎么给 Spark 传递函数,丸趣 TV 小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计2403字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)