共计 4245 个字符,预计需要花费 11 分钟才能阅读完成。
这篇文章将为大家详细讲解有关分布式数据集 SparkRDD 的依赖与缓存是怎样的,文章内容质量较高,因此丸趣 TV 小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
RDD 简介
RDD(Resilient Distributed Dataset)叫做分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD 是一个类
RDD 的属性
1. 一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
2. 保存了计算每个分区的函数,这个计算方法会应用到每一个数据块上,Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
3.RDD 之间的依赖关系。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。
4.RDD 的分片函数(Partitioner),一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于 key-value 的 RDD,才会有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决定了 parent RDD Shuffle 输出时的分片数量。
5. 一组分片(Partition),即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。
如何创建 RDD
1. 通过序列化集合的方式创建 RDD(parallelize,makeRDD)
2. 通过读取外部的数据源(testFile)
3. 通过其他的 rdd 做 transformation 操作转换成行的 RDD
RDD 的两种算子:
1.Transformation
map(func) : 返回一个新的分布式数据集,由每个原元素经过 func 函数转换后组成
filter(func) : 返回一个新的数据集,由经过 func 函数后返回值为 true 的原元素组成
flatMap(func) : 类似于 map,但是每一个输入元素,会被映射为 0 到多个输出元素(因此,func 函数的返回值是一个 Seq,而不是单一元素)
flatMap(func) : 类似于 map,但是每一个输入元素,会被映射为 0 到多个输出元素(因此,func 函数的返回值是一个 Seq,而不是单一元素)
sample(withReplacement, frac, seed) :
根据 fraction 指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed 用于指定随机数生成器种子
union(otherDataset) : 返回一个新的数据集,由原数据集和参数联合而成
reduceByKey(func, [numTasks]) : 在一个 (K,V) 对的数据集上使用,返回一个 (K,V) 对的数据集,key 相同的值,都被使用指定的 reduce 函数聚合到一起。和 groupbykey 类似,任务的个数是可以通过第二个可选参数来配置的。
join(otherDataset, [numTasks]) :
在类型为 (K,V) 和(K,W)类型的数据集上调用,返回一个 (K,(V,W)) 对,每个 key 中的所有元素都在一起的数据集
groupWith(otherDataset, [numTasks]) : 在类型为 (K,V) 和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为 CoGroup
cartesian(otherDataset) : 笛卡尔积。但在数据集 T 和 U 上调用时,返回一个 (T,U) 对的数据集,所有元素交互进行笛卡尔积。
intersection(otherDataset): 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD
distinct([numTasks])) 对源 RDD 进行去重后返回一个新的 RDD
groupByKey([numTasks]) 在一个 (K,V) 的 RDD 上调用,返回一个 (K, Iterator[V]) 的 RDD
reduceByKey(func, [numTasks]) 在一个 (K,V) 的 RDD 上调用,返回一个 (K,V) 的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,与 groupByKey 类似,reduce 任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个 (K,V) 的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的 (K,V) 的 RDD
sortBy(func,[ascending], [numTasks]) 与 sortByKey 类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为 (K,V) 和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的 (K,(V,W)) 的 RDD
cogroup(otherDataset, [numTasks]) 在类型为 (K,V) 和(K,W)的 RDD 上调用,返回一个(K,(Iterable
2.Action
reduce(func) 通过 func 函数聚集 RDD 中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回 RDD 的元素个数
first() 返回 RDD 的 *** 个元素(类似于 take(1))
take(n) 返回一个由数据集的前 n 个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否用随机数替换不足的部分,seed 用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对 (K,V) 类型的 RDD,返回一个 (K,Int) 的 map,表示每一个 key 对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数 func 进行更新。
RDD 的依赖关系
1. 窄依赖
窄依赖指的是每一个父 RDD 的 Partition 最多被子 RDD 的一个 Partition 使用
总结:窄依赖我们形象的比喻为独生子女
2. 宽依赖
宽依赖指的是多个子 RDD 的 Partition 会依赖同一个父 RDD 的 Partition
总结:窄依赖我们形象的比喻为超生
3.Lineage(血统)
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(即血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
DAG 的生成
DAG(Directed Acyclic Graph)叫做有向无环图,原始的 RDD 通过一系列的转换就就形成了 DAG,根据 RDD 之间的依赖关系的不同将 DAG 划分成不同的 Stage,对于窄依赖,partition 的转换处理在 Stage 中完成计算。对于宽依赖,由于有 Shuffle 的存在,只能在 parent RDD 处理完成后,才能开始接下来的计算,因此宽依赖是划分 Stage 的依据。
RDD 的缓存
Spark 速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存个数据集。当持久化某个 RDD 后,每一个节点都将把计算的分片结果保存在内存中,并在对此 RDD 或衍生出的 RDD 进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD 相关的持久化和缓存,是 Spark 最重要的特征之一。可以说,缓存是 Spark 构建迭代式算法和快速交互式查询的关键。
找依赖关系划分 stage 的目的之一就是划分缓存,如何通过 stage 的划分设置缓存?
(1)在窄依赖想设置缓存时用 cache
(2)在宽依赖想设置缓存时用 checkpoint
如何设置 cache 和 checkpoint?
cache:someRDD.cache()就添加成功缓存,放入到内存中
someRDD.persist(StorageLevel.MEMORY_AND_DISK):根据自己的需要设置缓存的位置(内存和硬盘)
checkpoint:可以把 RDD 计算后的数据存储在本地磁盘上,也可以是 hdfs
sc.setCheckpointDIr(hdfs://hadoop1:9000/checkpoint)设置 checkpoint 的路径 在宽依赖前设置
someRDD.checkpoint()设置 checkpoint
cache 和 checkpoint 的区别
cache 只是缓存数据,不改变 RDD 的依赖关系,checkpoint 生成了一个新的 RDD,后面的 RDD 将依赖新的 RDD 依赖关系已经改变 。数据恢复的顺序:checkpoint —》cache–》重算
关于分布式数据集 SparkRDD 的依赖与缓存是怎样的就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。