Spark Data Sources怎么使用

63次阅读
没有评论

共计 4740 个字符,预计需要花费 12 分钟才能阅读完成。

本篇内容主要讲解“Spark Data Sources 怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让丸趣 TV 小编来带大家学习“Spark Data Sources 怎么使用”吧!

一:Data Sources(数据源):

1.1    了解数据源。

 Spark SQL 支持对各种数据源通过 DataFrame 接口操作。DataFrame 可以作为正常 的 RDDs 进行操作,也可以注册为一个临时表。
    注册 DataFrame 为一个表允许您在其数据运行 SQL 查询。本节介绍用于加载和保存使用 Spark 数据源的数据的一般方法,然后再进入到可用的内置数据源的特定选项。

1.2 Generic Load/Save Functions(通用加载 / 保存功能)。

  最简单的形式,默认的数据源(parquet 除非否则配置由 spark.sql.sources.default)将用于所有操作。

    eg:第一种读取方式:通过  parquetFile(xxx) 来读取。

  首先把 spark-1.6.1-bin-hadoop2.6\examples\src\main\resources 下的 users.parquet 上传到 HDFS 上。

public class SparkSqlDemo4 {
 private static String appName =  Test Spark RDD 
 private static String master =  local 
 public static void main(String[] args) {SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
 // 创建了  sqlContext 的上下文,注意,它是 DataFrame 的起点。SQLContext sqlContext = new SQLContext(sc);
 DataFrame df = sqlContext.read().load( hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet 
 df.select(name ,  favorite_color).write().save( namesAndFavColors.parquet 
 // 指定保存模式  
 //df.select(name ,  favorite_color).write().mode(SaveMode.Overwrite).save( namesAndFavColors.parquet  
 // 第一种读取方式  
 DataFrame parquetFile = sqlContext.parquetFile( namesAndFavColors.parquet 
 parquetFile.registerTempTable( users  
 DataFrame df1 = sqlContext.sql( SELECT name,favorite_color FROM users   
 df1.show();
 List String  listString = df1.javaRDD().map(new Function Row, String () { 
 private static final long serialVersionUID = 1L;
 public String call(Row row) { 
 return  Name:   + row.getString(0) +   ,FavoriteColor:   + row.getString(1); 
 } 
 }).collect(); 
 for (String string : listString) {System.out.println( string );
}

输出结果如下:

+------+--------------+
| name|favorite_color|
+------+--------------+
|Alyssa| null|
| Ben| red|
+------+--------------+
Name: Alyssa ,FavoriteColor: null
Name: Ben ,FavoriteColor: red

1.3 Manually Specifying Options(手动指定选项):

  你可以也手动指定的数据源,将与您想要将传递给数据源的任何额外选项一起使用。

  数据源由其完全限定名称 (即 org.apache.spark.sql.parquet),

  但对于内置来源您还可以使用 他们短名称 (json, parquet, jdbc)。

 DataFrames 任何类型可以转换为其他类型,使用此语法。

  eg:第二种读取方式:通过  parquet(xxx) 来读取。

public class SparkSqlDemo5 {
 private static String appName =  Test Spark RDD 
 private static String master =  local 
 public static void main(String[] args) {SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
 // 创建了  sqlContext 的上下文,注意,它是 DataFrame 的起点。SQLContext sqlContext = new SQLContext(sc);

DataFrame df = sqlContext.read().format( json).load( hdfs://192.168.226.129:9000/txt/sparkshell/people.json df.select(id ,  name , sex , age).write().format( parquet).save( people.parquet
DataFrame parquetFile = sqlContext.read().parquet( people.parquet   parquetFile.registerTempTable( people   DataFrame df1 = sqlContext.sql( SELECT id,name,sex,age FROM people WHERE age  = 21 AND age  = 23   df1.show(); df1.printSchema(); List String  listString = df1.javaRDD().map(new Function Row, String () {  private static final long serialVersionUID = 1L; public String call(Row row) {  return  Id:   + row.getString(0) +  , Name:  +row.getString(1) +  ,Sex  +row.getString(2)+  , Age:   + row.getLong(3); }).collect();  for (String string : listString) {System.out.println( string ); }

1.4 Run SQL on files directly(直接在文件上运行 SQL)

  您也可以查询该文件直接使用 SQL,并对其进行查询,而不是使用 API 读取文件加载到 DataFrame。

public class SparkSqlDemo6 {
 private static String appName =  Test Spark RDD 
 private static String master =  local 
 public static void main(String[] args) {SparkConf conf = new SparkConf();
 conf.set( spark.testing.memory ,  269522560000 
 JavaSparkContext sc = new JavaSparkContext(master, appName, conf);
 // 创建了  sqlContext 的上下文,注意,它是 DataFrame 的起点。SQLContext sqlContext = new SQLContext(sc);
 //  注意  sql  语句  parquet  后面目录的符号  。 
 DataFrame df = sqlContext.sql( SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet` 
 df.show();}

    注意:sql 语句中红色部分标记的符号。

SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`

二:Save Modes(保存模式):

 Save 操作可以可选择性地接收一个 SaveModel,如果数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。因此,如果有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个 Overwrite,在写入新的数据之前会将原来的数据进行删除。

eg:指定保存模式:
    df.select(name , favorite_color).write().mode(SaveMode.Overwrite).save(namesAndFavColors.parquet  

Scala/JavaAny LanguageMeaningSaveMode.ErrorIfExists(default) error (default) 当保存 DataFrame 到一个数据源,如果数据已经存在,将会引发异常。SaveMode.Append append

当保存 DataFrame 到一个数据源,如果数据 / 表已经存在,DataFrame 的内容预计将追加到现有数据后面。

SaveMode.Overwrite overwrite 覆盖模式意味着当保存 DataFrame 到一个数据源,如果数据 / 表已存在,现有数据预计将覆盖原有的 DataFrame 内容。SaveMode.Ignore ignore Ignore 模式意味着当向数据源中保存一个 DataFrame 时,如果数据已经存在,save 操作不会将 DataFrame 的内容进行保存,也不会修改已经存在的数据。这与 SQL 中的 `CREATE TABLE IF NOT EXISTS` 相似。

三:Parquet 文件

 Parquet 是一种列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持度对 Parquet 文件的读和写,自动保存原有数据的模式。

到此,相信大家对“Spark Data Sources 怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是丸趣 TV 网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计4740字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)