共计 9668 个字符,预计需要花费 25 分钟才能阅读完成。
这篇文章主要讲解了“Spark 编程知识点有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Spark 编程知识点有哪些”吧!
#Spark 开发指南 #
## 简介 ## 总的来说,每一个 Spark 应用程序,都是由一个驱动程序组成,它运行用户的 main 函数,并且在一个集群上执行各种各样的并行操作。Spark 提供的主要的抽象(概念)是一个弹性分布式数据集,它是一个元素集合,划分到集群的不同节点上,可以被并行操作。RDDs 的创建可以从 Hadoop 文件系统(或者任何支持 Hadoop 的文件系统)上的一个文件开始,或者通过转换这个驱动程序中已存在的 Scala 集合而来。用户也可以使 Spark 持久化一个 RDD 到内存中,使其能在并行操作中被有效的重用。最后,RDDs 能自动从节点故障中恢复。
Spark 中的第二个抽象(概念)是共享变量,他可以在并行操作中使用。默认情况下,Spark 通过不同节点上的一系列任务来并行运行一个函数。他将每一个函数中用的到变量的拷贝传递到每一个任务中。有时候,一个变量需要在不同的任务之间,或者任务和驱动程序之间共享。Spark 支持两种类型的共享变量:广播变量,可以再所有节点的内存中缓存一个值,累加器,一个只能做加法的变量,例如计数器和求和。
本指南通过每一种 Spark 支持的语言来展示 Spark 的每个特性。It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.
## 接入 Spark##
###Java###
Spark1.0.2 工作在 Java6 或者 java6 以后之上。如果你在使用 Java8,Spark 支持 lamdba 表达式来简化函数编写,否则,你可以使用 org.apache.spark.api.java.function 包下的类。
用 Java 编写 Spark 应用,你需要添加 Spark 的依赖,Spark 可以通过 Maven Central 使用:
groupId=org.apache.spark artifactId=spark-core_2.10 version=1.0.2
另外,如果你想访问一个 HDFS 集群,你需要根据你的 HDFS 版本添加一个 hadoop-client 依赖。一些常用的 HDFS 版本标签显示在页面。
groupId=org.apache.hadoop artifactId=hadoop-client version= your-hdfs-version
最后,你需要在你的程序中导入一些 Spark 类,通过添加如下几行:
import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf
## 初始化 Spark##
###Java### Spark 程序需要做的第一件事就是创建一个 JavaSparkContext 对象,它将告诉 Spark 怎样访问一个集群。创建一个 SparkContext,你首先必须创建 SparkConf 对象,它包含关于你的应用程序的信息。
SparkConf conf=new SparkConf().setAppName(appName).setMaster(master); JavaSparkContext sc=new JavaSparkContext(conf);
appName 参数是你的应用程序的名字,将会在集群的UI上显示。master 是 Spark、Mesos、或者 YARN 集群 URL,或者一个专用的字符串”local“使其在本地模式下运行。在实践中,当运行在一个集群上,你将不会想要把 master 硬编码到程序中,而是通过使用 spark-submit 运行程序并且接受 master。但是,在本地测试或者单元测试中,你可以传递”local“在进程内运行 Spark。
## 弹性分布式数据集 ## Spark 反复围绕的一个概念是弹性分布式数据集。它是一个有容错机制的元素集合,并且可以被并行操作。有两种创建 RDDs 的方法。并行化你的驱动程序中已存在的集合,或者引用一个外部存储系统的数据集,例如一个共享文件系统,HDFS、HBase、或者任何可以提供一个 Hadoop InputFormat 的数据源。
### 并行集合 ### 并行集合通过调用 JavaSparkContext 的 parallelize 方法,在你的驱动程序中已存在的 Collection 上创建。集合的元素将会拷贝组成一个可以被并行操作的分布式数据集。例如,下面是如何创建一个包含数字 1 到 5 的并行集合:
List Integer data=Arrays.asList(1,2,3,4,5); JavaRDD Integer distData=sc.parallelize(data);
一旦创建,分布式数据集(distData)就可以并行操作。例如,我们可以调用 distData.reduce((a,b)- a+b) 来将列表中的元素相加。我们稍后将会在分布式数据集的操作中描述。
注意:在这个指南中,我们经常使用简洁的 Java8 lamdba 语法来定义 java functions,但是在老的 Java 版本中,你可以实现 org.apache.spark.api.java.function 包中的接口。我们将会在下面详细描述 passing functions to Spark。
并行集合的另一个重要的参数是数据集被切分成切片(slices)的数量。Spark 将会为集群中的每一个 slice 运行一个 task。通常情况下,你要为集群中的每个 CPU 2- 4 个 slice。通常,Spark 会尝试根据你的集群自动设置 slice 的数量。然而,你可以手动的设置它,把它作为第二个参数传递给 parallelize(例如:sc.parallelize(data,10)).
### 外部数据集 ### Spark 可以通过任何 Hadoop 支持的存储源创建分布式数据集。包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3 等等。Spark 支持 text files(文本文件),SequenceFiles(序列化文件),和任何其他的 Hadoop InputFormat(输入格式)。
Text file 可以通过使用 SparkContext 的 textFile 方式创建。这个方法接受一个文件的 URI(或者机器上的一个本地路径, 或者 hdfs://,s3n:// 等 URI)并且把这个文件读成一个行的集合。下面是一个调用的例子:
JavaRDD String distFile=sc.textFile(data.txt
一旦创建,distFile 可以被进行数据集操作。例如: 我们可以通过使用 map 和 reduce 将所有数据行的长度相加. 例如:distFile.map(s- s.length()).reduce((a,b)- (a+b)).
Spark 读文件时的一些注意事项:
如果使用本地文件系统上的路径,
Spark 的所有基于文件的输入方法,包括 textFile, 支持运行目录,压缩文件盒通配符。例如,你可以食用 textFile(/my/directory/ ),textFile(/my/directory/.txt), 和 textFile(/my/directory/.gz)
textFile 方法也可以接受一个可选的第二参数来控制这个文件的 slice 数目。默认情况下,Spark 为每一个文件创建一个 slice(HDFS 中 block 默认为 64MB)。但是你可以通过传递一个较大的值来指定一个跟高的 slice 值。注意你的 slice 数不能小于 block 数。
除了文本文件,Spark 的 Java API 也支持集中其他数据格式。
JavaSparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
对于序列化文件(SequenceFiles),使用 SparkContext 的 sequenceFile[K,V],K 和 V 是文件中 key 和 value 的类型。他们必须是 Hadoop 的 Writeable 接口的子类,像 IntWriteable 和 Text。
对于其他的 Hadoop 输入格式,你可以使用 JavaSparkContext.hadoopRDD 方法。它可以接受任意(类型)的 JobConf 和输入格式类,key 类和 value 类。按照像 Hadoop Job 一样,来设置输入源就可以了。你也可以为 InputFormats 使用 JavaSparkContext.newHadoopRDD,基于”new“MapReduce API(org.apache.hadoop.mapreduce).
JavaRDD.saveAsObjectFile 和 JavaContext.objectFile 支持以一种由 Java 对象序列化组成的简单的格式保存 RDD。虽然这不是有效地专门的格式向 Avro,但是它提供了一个简单的方式存储 RDD。
###RDD 操作 ### RDDs 支持两种类型的操作:转换(transformations),它从一个现有的数据集创建一个新的数据集。动作 (actions),它在数据集上运行计算后,返回一个值给驱动程序。例如:map 就是一个转换,它将数据集的每一个元素传递给一个函数,并且返回一个新的 RDD 表示结果。另一方面,reduce 是一个动作,他通过一些行数将一些 RDD 的所有元素聚合起来,并把最终的结果返回给驱动程序(不过还有一个并行的 reduceByKey,它返回一个分布式数据集)。
Spark 中的所有转换都是惰性的,也就是说,他们不会立即计算出结果。相反,他们只是记住应用到这些基础数据集(例如 file)上的转换。只有当发生一个需要返回一个结果给驱动程序的动作时,这些转换才真正执行。这样的设计使得 Spark 运行更加高效——例如,我们可以实现,通过 map 创建一个数据集,并在 reduce 中使用,最终只返回 reduce 的结果给驱动程序,而不是整个大的新数据集。
默认情况下,每一个转换过的 RDD 都会在你在它上面运行一个 action 时重新计算。然而,你也可以使用 persist 方法(或者 cache)持久化一个 RDD 到内存中。在这种情况下,Spark 将会在集群中,保存相关元素,下次你访问这个 RDD 时,它将能够更快速访问,。在磁盘上持久化数据集,或者在集群间复制数据集也是支持的。
#### 基本操作 #### 为了说明 RDD 基础,考虑下面的简单的程序:
JavaDDD String lines=sc.textFile(data.txtt JavaRDD Integer lineLengths=lines.map(s- s.length()); int totalLength=lineLengths.reduce((a,b)- a+b);
第一行通过一个外部文件定义了一个基本的 RDD。这个数据集未被加载到内存,也未在上面执行动作。lines 仅仅是这个文件的一个指针。第二行定义了 lineLengths 作为 map 转换的结果。此外,lineLengths 因为惰性没有立即计算。最后,我们运行 reduce,他是一个 action。这时候,Spark 将这个计算拆分成不同的 task,并使其运行在独立的机器上,并且每台机器运行它自己的 map 部分和本地的 reducation,仅仅返回他的结果给驱动程序。
如果我们想在以后重复使用 lineLengths,我们可以添加:
lineLengths.persist();
在 reduce 之前,这将导致 lineLengths 在第一次被计算之后被保存在内存中。
#### 传递 Functions 到 Spark#### Spark 的 API,在很大程度上依赖于传递函数使其驱动程序在集群上运行。在 Java 中,函数有实现了 org.apache.spark.api.java.function 包中接口的类表示。有两种创建这样的函数的方式:
在你自己的类中实现 Function 接口,可以是匿名内部类,后者命名类,并且你要传递他的一个实例到 Spark
在 Java8 中,使用 lamdba 表达式来简洁的定义一种实现
为了简洁起见,本指南中的大多数使用 lamdba 语法,它易于使用,所有的 APIs in long-form,例如,我们可以编写上面的代码如下:
JavaRDD String lines = sc.textFile( data.txt
JavaRDD Integer lineLengths = lines.map(new Function String, Integer () { public Integer call(String s) { return s.length(); }
int totalLength = lineLengths.reduce(new Function2 Integer, Integer, Integer () { public Integer call(Integer a, Integer b) { return a + b; }
});
或者,如果编写内联函数显得很笨拙:
class GetLength implements Function String, Integer { public Integer call(String s) { return s.length(); }
class Sum implements Function2 Integer, Integer, Integer { public Integer call(Integer a, Integer b) { return a + b; }
JavaRDD String lines = sc.textFile( data.txt
JavaRDD Integer lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
Note that anonymous inner classes in Java can also access variables in the enclosing scope as long as they are marked final. Spark will ship copies of these variables to each worker node as it does for other languages
####Wroking with Key-Value Pairs 使用键 / 值对工作 #### 虽然大多数 Spark 操作工作在包含各种类型的对象的 RDDs 之上,一些特殊的操作仅仅能够使用包含 key-value 对的 RDDs。最常见的操作之一是分布式”shuffle“操作,例如通过 key 分组或者聚合元素。
在 Java 中,key-value 对使用 scala 标准包下的 scala Tuple2 类表示。你可以简单的调用 new Tuple2(a,b) 去创建一个 tuuple,并且通过 tuple._1() 和 tuple._2() 访问它的字段。
key-value 对的 RDDs 通过 JavaPairRDD 表示。你可以通过 JavaRDDs 构建 JavaPairRDDs,使用指定的 map 操作版本,像 mapToPair 和 flatMapToPair。JavaPair 将不仅拥有标准 RDD 函数,并且有特殊的 key-value 函数。
例如,下面的代码在 key-value 对上使用 reduceByKey 操作来计算在一个文件中每行文本出现的次数和。
JavaRDD String lines=sc.textFile(data.txt JavaPairRDD String,Integer pairs=lines.mapToPair(s- new Tuple2(s,1)) JavaPairRDD String,Integer counts=pairs.reduceByKey((a,b)- a+b);
我们也可以使用 counts.sortByKey(), 例如,按照字母顺序排序这个键值对。并且最后调用 counts.collect() 作为一个对象数组返回给驱动程序。
注意:当使用自定义的对象作为 key-value 对操作的 key 时,你必须确保自定义 equals()方法伴随着一个匹配的 hashCode()方法。有关详情,参考 Object.hashCode() 文档大纲中列出的规定。
#### 转换 #### 下面的表格列出了 Spark 支持的常见的转换。更多信息可以参考 RDD API 文档和 pair RDD 函数文档。
#### 动作 #### 下面的表格列出了 Spark 支持的常见的动作。更多信息可以参考 RDD API 文档和 pair RDD 函数文档。
###RDD 持久化 ### Spark 最重要的一个功能是在不同的操作间,持久化(或者缓存)一个数据集到内存中。当你持久化一个 RDD 时,每一个节点都把它计算的分片结果保存在内存中,并且在对此数据集(或者衍生出的数据集)进行其他动作时重用。这将使后续的动作变得更快(通过快 109 倍以上)。缓存是(Spark)迭代算法和快速交互使用的关键工具。
你可以使用 persist() 和 cache()方法来标记一个将要持久化的 RDD。第一次他被一个动作进行计算,他将会保留在这个节点的内存中。Spark 的缓存有容错性 - 如果 RDD 的任何一个分区丢失了,他会通过使用最初创建的它转换操作,自动重新计算。
此外,每一个持久化 RDD 可以使用不同的存储级别存储。允许你,例如,持久化数据集到磁盘,持久化数据集到内存作为序列化的 Java 对象(节省空间),跨节点复制,或者 store it off-heap in Tachyon。这些级别通过传递一个 StorageLevel 对象(Scala,Java,Python)到 persist()来设置。cache()方法是使用默认存储级别的快捷方法,即 StorageLevel.MEMORY_ONLY(存储反序列化对象到内存), 完整的存储级别设置为:
Spark 也会在 shuffle 操作(例如,reduceByKey)中自动的持久化一些中间数据。甚至当用户未调用 persist 方法。这样做是为了阻止在进行 shuffle 操作时由于一个节点故障而重新计算整个输入。我们依然推荐用户在作为结果的 RDD 上调用 persist 如果想打算重用它。
#### 存储级别的选择 ####
#### 移除数据 #### Spark 自动监视每一个节点上的缓存使用,并且使用 LRU 方式删除老的数据分区。如果你想手工的删除 yige RDD 而不是等他自动从缓存中清除,使用 RDD.unpersist() 方法。
## 共享变量 ## 通常,当传递给 Spark 操作(例如 map 或者 reduce)的函数在远程集群节点上运行时,它实际上操作的是这个函数使用到的所有变量的独立拷贝。这些变量被拷贝到每一台机器,并且在远程机器上的对这些变量的所有更新都不会传回给驱动程序。通常看来,在不同的任务之间读写变量是低效的。然而,Spark 还是为两种常见的使用模式提供了两种有限的共享变量:广播变量和累加器。
### 广播变量 ### 广播变量允许程序员保存一个只读的变量缓存在每一台机器上,而不是每个任务都保存一份拷贝。它们可以这样被使用,例如:以一种高效的方式给每一个节点一个大的输入数据集。Spark 会尝试使用一种高效的广播算法来分配广播变量,以减小通信的代价。
广播变量通过调用 SparkContext.broadcast(v) 方法从变量 v 创建。广播变量是一个 v 的包装器。它的值可以通过调用 value 方法访问。下面的代码展示了这些:
Broadcast int[] broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
在广播变量被创建后,它应该在集群上任何函数中代替 v 被使用,使 v 不再传递到这些节点上。此外,对象 v 在被广播后不能被修改,这样可以保证所有节点获得的广播变量的值是相同的(例如,这个变量在之后被传递到一个新的节点)。
### 累加器 ### 累加器是一种只能通过关联操作进行”加“操作的变量。因此可以高效的支持并行计算。它们可以用于实现计数器(* 例如在 MapReduce 中)和求和。Spark 原生支持数字类型的累加器。开发者也可以自己添加新的支持类型。
一个累加器可以通过调用 SparkContext.accumulator(v) 方法从一个初始值 v 中创建。运行在集群上的任务,可以通过使用 add 方法或者 += 操作(在 Scala 和 Python 中)来给它加值。然而,他们不能读取这个值。只有驱动程序可以使用 value 的方法来读取累加器的值。
如下的代码,展示了如何利用累加器,将一个数组里面的所有元素相加:
Accumulator Integer accum = sc.accumulator(0);
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x - accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
虽然这段代码使用了内置支持 Integer 类型的累加器。但是开发者也可以通过实现 AccumulatorParam 创建自己的类型。AccumulatorParam 接口有两个方法:zero 为你的数据类型提供一个 zero value ,addInPlace 将两个值相加。例如,假设我们有一个向量类来表示数学向量,我们可以这样写:
class VectorAccumulatorParam implements AccumulatorParam Vector { public Vector zero(Vector initialValue) { return Vector.zeros(initialValue.size());
}
public Vector addInPlace(Vector v1, Vector v2) { v1.addInPlace(v2); return v1;
}
// Then, create an Accumulator of this type:
Accumulator Vector vecAccum = sc.accumulator(new Vector(...), new VectorAccumulatorParam());
在 Java 中,Spark 也支持更通用的 Accumulable 接口来累加数据,他们的计算结果类型和相加的元素的类型不一样(例如,收集同样的元素构建一个 list)。
感谢各位的阅读,以上就是“Spark 编程知识点有哪些”的内容了,经过本文的学习后,相信大家对 Spark 编程知识点有哪些这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!