Spark的RDD如何创建

89次阅读
没有评论

共计 2894 个字符,预计需要花费 8 分钟才能阅读完成。

本篇内容介绍了“Spark 的 RDD 如何创建”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一:Scala

 Scala 是一门现代的多范式编程语言,志在以简练、优雅及类型安全的方式来表达常用编程模式。它平滑地集成了面向对象和函数语言的特性。Scala 运行于 Java 平台(JVM,Java 虚拟机),并兼容现有的 Java 程序。

执行以下命令,启动 spark-shell:

hadoop@master:/mysoftware/spark-1.6.1$ spark-shell

二:弹性分布式数据集(RDD)

1.RDD(Resilient Distributed Dataset,弹性分布式数据集)。   

 Spark 是一个分布式计算框架,而 RDD 是其对分布式内存数据的抽象,可以认为 RDD 就是 Spark 分布式算法的数据结构。而 RDD 之上的操作是 Spark 分布式算法的核心原语,由数据结构和原语设计上层算法。Spark 最终会将算法翻译为 DAG 形式的工作流进行调度,并进行分布式任务的发布。

    RDD,它在集群中的多台机器上进行了数据分区,逻辑上可以认为是一个分布式的数组,而数组中每个记录可以是用户自定义的任意数据结构。RDD 是 Spark 的核心数据结构,通过 RDD 的依赖关系形成 Spark 的调度顺序,通过对 RDD 的操作形成了整个 Spark 程序。

2.RDD 的创建方式

    2.1 从 Hadoop 文件系统(或与 Hadoop 兼容的其他持久化存储系统,如 Hive,HBase)输出(HDFS)创建。

    2.2 从父 RDD 转换得到新的 RDD

    2.3 通过 parallelize 或 makeRDD 将单击数据创建为分布式 RDD。

scala  var textFile = sc.textFile( hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt  
textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at  console :27
scala  val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at  console :27
scala

3.RDD 的两种操作算子: 转换(Transformation),行动(Action)。

    3.1   转换(Transformation):延迟计算的,也就是说从一个 RDD 转换生成另外一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

    3.2   行动(Action):Action 算子会触发 Spark 提交作业 (Job), 并将数据输出 Spark 系统。

4.RDD 的重要内部属性。

    4.1   分区列表:通过分区列表可以找到一个 RDD 中包含的所有分区及其所在地址。

    4.2   计算每个分片的函数:通过函数可以对每个数据块进行 RDD 需要进行的用户自定义函数                                                 运算。

    4.3   对父 RDD 的依赖列表:为了能够回溯带父 RDD,为容错等提供支持。

    4.4   对 key-value pair 数据类型 RDD 的分区器,控制分区策略和分区数。通过分区函数可以确定数据记录在各个分区和节点上的分配,减少分布不平衡。

    4.5   每个数据分区的地址列表(如 HDFS 上的数据块的地址)。
  如果数据有副本,则通过地址列表可以获知单个数据块的所有副本地址,为负载均衡和容错提供支持。

4.  Spark 计算工作流

    途中描述了 Spark 的输入,运行转换,输出。在运行转换中通过算子对 RDD 进行转换。算子 RDD 中定义的函数,可以对 RDD 中的数据进行转换和操作。

    输入:在 Spark 程序运行中,数据从外部数据空间(eg:HDFS)输入到 Spark,数据就进入了 Spark 运行时数据空间,会转化为 Spark 中的数据块,通过 BlockManager 进行管理。

    运行:在 Spark 数据输入形成 RDD 后,便可以通过变换算子 fliter 等,对数据操作并将 RDD 转换为新的 RDD,通过行动 Action 算子,触发 Spark 提交作业。如果数据需要服用,可以通过 Cache 算子,将数据缓存到内存。

    输出:程序运行结束数据会输出 Spark 运行时空间,存储到分布式存储中(如 saveAsTextFile 输出到 HDFS)或 Scala 数据或集合中(collect 输出到 Scala 集合,count 返回 Scala Int 型数据)。
 

 

    Spark 的核心数据模型是 RDD,但 RDD 是个抽象类,具体由各子类实现,如 MappedRDD,ShuffledRDD 等子类。Spark 将常用的大数据操作都转换成为 RDD 的子类。

对其一些基本操作的使用:

scala  3*7
res0: Int = 21
scala  var textFile = sc.textFile( hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt  
textFile: org.apache.spark.rdd.RDD[String] = hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt MapPartitionsRDD[1] at textFile at  console :27
scala  textFile.count()
res1: Long = 3 
scala  textFile.first()
res2: String = 1 spark
scala  textFile.filter(line =  line.contains( berg)).count() 
res3: Long = 1
scala  textFile.filter(line =  line.contains( bergs)).count() 
res4: Long = 0
scala  textFile.map(line =  line.split(  ).size).reduce((a, b) =  if (a   b) a else b)
res5: Int = 1
scala  textFile.map(line =  line.split( \t).size).reduce((a, b) =  if (a   b) a else b)
res6: Int = 2

“Spark 的 RDD 如何创建”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计2894字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)