Spark RDD的创建方式及算子的使用方法是什么

52次阅读
没有评论

共计 6085 个字符,预计需要花费 16 分钟才能阅读完成。

这篇文章主要介绍“Spark RDD 的创建方式及算子的使用方法是什么”,在日常操作中,相信很多人在 Spark RDD 的创建方式及算子的使用方法是什么问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark RDD 的创建方式及算子的使用方法是什么”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

一:简单了解 RDD 和 RDD 处理数据

 RDD,全称为 Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。

 RDD:Spark 的核心概念是 RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。

 RDD 本质上是一个内存数据集,在访问 RDD 时,指针只会指向与操作相关的部分。例如存在一个面向列的数据结构,其中一个实现为 Int 的数组,另一个实现为 Float 的数组。如果只需要访问 Int 字段,RDD 的指针可以只访问 Int 数组,避免了对整个数据结构的扫描。

 RDD 将操作分为两类:transformation 与 action。无论执行了多少次 transformation 操作,RDD 都不会真正执行运算,只有当 action 操作被执行时,运算才会触发。而在 RDD 的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

  在实现时,RDD 针对 transformation 操作,都提供了对应的继承自 RDD 的类型,例如 map 操作会返回 MappedRDD,而 flatMap 则返回 FlatMappedRDD。当我们执行 map 或 flatMap 操作时,不过是将当前 RDD 对象传递给对应的 RDD 对象而已。

注意:创建的 Maven 工程,以下是 pom.xml 中的依赖:

dependencies 
 dependency 
 groupId junit /groupId 
 artifactId junit /artifactId 
 version 4.12 /version 
 /dependency 
 dependency 
 groupId org.apache.spark /groupId 
 artifactId spark-core_2.10 /artifactId 
 version 1.6.1 /version 
 /dependency 
 dependency 
 groupId org.apache.hadoop /groupId 
 artifactId hadoop-client /artifactId 
 version 2.6.4 /version 
 /dependency 
 dependency 
 groupId org.apache.spark /groupId 
 artifactId spark-sql_2.10 /artifactId 
 version 1.6.1 /version 
 /dependency 
 /dependencies

二:从 Hadoop 文件系统(或与 Hadoop 兼容的其他持久化存储系统,如 Hive,HBase)输出(HDFS)创建。

    eg:  求 HDFS 文件中内容所有行数据长度及总长度。

public class TestRDD1 {public static void main(String[] args) {createRDDFromHDFS();
 private static void createRDDFromHDFS(){SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(local ,  Spark Test , conf);
 System.out.println( sc );
 JavaRDD String  rdd = sc.textFile( hdfs://192.168.226.129:9000/txt/sparkshell/sparkshell.txt 
 JavaRDD Integer  newRDD = rdd.map( new Function String,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(String string) throws Exception {System.out.println( string +     + string.length() );
 return string.length();
 System.out.println( newRDD.count() );
 int length = newRDD.reduce( new Function2 Integer, Integer, Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1, Integer int2) throws Exception {
 return int1+int2;
 System.out.println(总和  + length);
}

三:通过 parallelize 或 makeRDD 将单击数据创建为分布式 RDD。

eg:求总和。

public class TestRDD2 {public static void main(String[] args) {createRDDFromSuperRDD();
  * JavaSparkContext(String master, String appName, SparkConf conf)
  * master - Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
  * appName - A name for your application, to display on the cluster web UI
  * conf - a SparkConf object specifying other Spark parameters
  * */
 private static void createRDDFromSuperRDD(){SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(local ,  Spark Test , conf);
 System.out.println( sc );
 List Integer  list = new ArrayList Integer 
 for( int i=1;i i++){list.add(i);
 JavaRDD Integer  rdd = sc.parallelize(list);
 JavaRDD Integer  newRDD = rdd.map( new Function Integer,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1) throws Exception {
 return int1;
 int count = newRDD.reduce( new Function2 Integer, Integer, Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer int1, Integer int2) throws Exception {
 return int1+int2;
 System.out.println(总和  + count);
}

注意:上述两段代码中,在获取 JavaSparkContext 的时候,是这样写的:

 SparkConf conf = new SparkConf();

     conf.set(spark.testing.memory , 269522560000  // 给 jvm 足够的资源。

     JavaSparkContext sc = new JavaSparkContext(local , Spark Test , conf);

而对于标记的加粗红色部分,参照 API 如下:

 JavaSparkContext(String master, String appName, SparkConf conf)

 -master – Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
 -appName – A name for your application, to display on the cluster web UI
 -conf – a SparkConf object specifying other Spark parameters

对于 master,官网有详细的介绍:

我这里写的是 local,表示的是:

    对于本地模式测试和单元测试,可以通过 local 在 spark 内运行程序。

******************************

另外写的一段,对算子中一些基本方法的使用

参考学习:

    RDD 算子分类: http://my.oschina.net/gently/blog/686800 (自己的。)

public class TestRDD3 {
 private static String appName =  Test Spark RDD 
 private static String master =  local 
 public static void main(String[] args) {SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
 System.out.println( sc );
 List String  list = new ArrayList String 
 list.add(  Berg  );
 list.add(  Hadoop  );
 list.add(  HBase  );
 list.add(  Hive  );
 list.add(  Spark  );
 JavaRDD String  rdd = sc.parallelize(list);
 JavaRDD Integer  newrdd = rdd.map( new Function String,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(String string) throws Exception {System.out.println( string +  \t  +string.length() );
 return string.length();
 Integer length = newrdd.reduce( new Function2 Integer, Integer, Integer () {
 private static final long serialVersionUID = 1L;
 public Integer call(Integer i1, Integer i2) throws Exception {
 return i1+i2;
 long count = newrdd.count();
 List Integer  listnewrdd = newrdd.collect();
 for (Integer integer : listnewrdd) {System.out.print(integer +   \t  );
 System.out.println(  \nlength --    + length +     + count );
 System.out.println(  \n\n**************************************\n\n 
 List Integer  list1 = new ArrayList Integer 
 for( int i=1; i i++){list1.add( i );
 JavaRDD Integer  rdd1 = sc.parallelize(list1);
 JavaRDD Integer  unionrdd = newrdd.union(rdd1);
 JavaRDD Integer  rdd2 = unionrdd.map( new Function Integer,Integer (){
 private static final long serialVersionUID = 1L;
 public Integer call(Integer i) throws Exception {
 return i;
 long count2 = rdd2.reduce( new Function2 Integer, Integer, Integer () {
 private static final long serialVersionUID = 1L;
 public Integer call(Integer arg0, Integer arg1) throws Exception {
 return arg0 + arg1;
 System.out.println(count2 --    +count2 );
 rdd2.foreach( new VoidFunction Integer (){
 private static final long serialVersionUID = 1L;
 public void call(Integer arg0) throws Exception {System.out.println(  foreach--    + arg0 );
}

到此,关于“Spark RDD 的创建方式及算子的使用方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6085字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)