Spark SQL编程的示例分析

67次阅读
没有评论

共计 5995 个字符,预计需要花费 15 分钟才能阅读完成。

这篇文章将为大家详细讲解有关 Spark SQL 编程的示例分析,丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

#Spark SQL 编程指南 #

## 简介 ## Spark SQL 支持在 Spark 中执行 SQL,或者 HiveQL 的关系查询表达式。它的核心组件是一个新增的 RDD 类型 JavaSchemaRDD。JavaSchemaRDD 由 Row 对象和表述这个行的每一列的数据类型的 schema 组成。一个 JavaSchemaRDD 类似于传统关系数据库的一个表。JavaSchemaRDD 可以通过一个已存在的 RDD,Parquet 文件,JSON 数据集,或者通过运行 HiveSQL 获得存储在 Apache Hive 上的数据创建。

Spark SQL 目前是一个 alpha 组件。尽管我们会尽量减少 API 变化,但是一些 API 任然后再以后的发布中改变。

## 入门 ## 在 Spark 中,所有关系函数功能的入口点是 JavaSQLContext 类。或者他的子类。要创建一个基本的 JavaSQLContext,所有你需要的只是一个 JavaSparkContext。

JavaSparkContext sc = ...; // An existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

## 数据源 ## Spark SQL 支持通过 JavaSchemaRDD 接口操作各种各样的数据源。一单一个数据集被加载,它可以被注册成一个表,甚至和来自其他源的数据连接。

###RDDs### Spark SQL 支持的表的其中一个类型是由 JavaBeans 的 RDD。BeanInfo 定义了这个表的 schema。现在,Spark SQL 不支持包括嵌套或者复杂类型例如 Lists 或者 Arrays 的 JavaBeans。你可以通过创建一个实现了 Serializable 并且它的所有字段都有 getters 和 setters 方法的类类创建一个 JavaBeans。

public static class Person implements Serializable {
 private String name;
 private int age;
 public String getName() {
 return name;
 }
 public void setName(String name) {
 this.name = name;
 }
 public int getAge() {
 return age;
 }
 public void setAge(int age) {
 this.age = age;
 }
}

一个 schema 可以被应用在一个已存在的 RDD 上,通过调用 applySchema 并且提供这个 JavaBean 的类对象。

// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)
// Load a text file and convert each line to a JavaBean.
JavaRDD Person  people = sc.textFile(examples/src/main/resources/people.txt).map( new Function String, Person () { public Person call(String line) throws Exception { String[] parts = line.split( , 
 Person person = new Person();
 person.setName(parts[0]);
 person.setAge(Integer.parseInt(parts[1].trim()));
 return person;
 }
 });
// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerAsTable( people 
// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql(SELECT name FROM people WHERE age  = 13 AND age  = 19)
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List String  teenagerNames = teenagers.map(new Function Row, String () { public String call(Row row) { return  Name:   + row.getString(0);
 }
}).collect();

注意,Spark SQL 目前使用一个非常简单的 SQL 解析器。用户如果想获得一个更加完整的 SQL 方言,应该看看 HiveContext 提供的 HiveQL 支持。

###Parquet Files### Parquet 是一个 columnar 格式,并且被许多其他数据处理系统支持。Spark SQL 对读写 Parquet 文件提供支持,并且自动保存原始数据的 Schema。通过下面的例子使用数据:

// sqlContext from the previous example is used in this example.
JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
// JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile( people.parquet 
// Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlContext.parquetFile( people.parquet 
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable( parquetFile 
JavaSchemaRDD teenagers = sqlContext.sql( SELECT name FROM parquetFile WHERE age  = 13 AND age  = 19 
List String  teenagerNames = teenagers.map(new Function Row, String () { public String call(Row row) { return  Name:   + row.getString(0);
 }
}).collect();

###JSON Datasets### Spark SQL 可以自动推断一个 JSON 数据集的 schema,并加载成一个 JavaSchemaRDD。这个转换可以通过 JavaSQLContext 中的两个方法中的一个完成:

jsonFile - 从一个目录下的文件中加载数据,这个文件中的每一行都是一个 JSON 对象。

jsonRdd - 从一个已存在的 RDD 加载数据,这个 RDD 中的每一个元素是一个包含一个 JSON 对象的 String。

 // sc is an existing JavaSparkContext.
 JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
 // A JSON dataset is pointed to by path.
 // The path can be either a single text file or a directory storing text files.
 String path =  examples/src/main/resources/people.json 
 // Create a JavaSchemaRDD from the file(s) pointed to by path
 JavaSchemaRDD people = sqlContext.jsonFile(path);
 // The inferred schema can be visualized using the printSchema() method.
 people.printSchema();
 // root
 // |-- age: IntegerType
 // |-- name: StringType
 // Register this JavaSchemaRDD as a table.
 people.registerAsTable( people 
 // SQL statements can be run by using the sql methods provided by sqlContext.
 JavaSchemaRDD teenagers = sqlContext.sql( SELECT name FROM people WHERE age  = 13 AND age  = 19 
 // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
 // an RDD[String] storing one JSON object per string.
 List String  jsonData = Arrays.asList(  {\ name\ :\ Yin\ ,\ address\ :{\ city\ :\ Columbus\ ,\ state\ :\ Ohio\}} 
 JavaRDD String  anotherPeopleRDD = sc.parallelize(jsonData);
 JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);

###Hive Tables### Spark SQL 也支持读和写存储在 apache Hive 中的数据。然而,由于 Hive 有一个非常大的依赖,他没有在 Spark 默认宝中包括。为了使用 Hive,你必须运行‘SPARK_HIVE=true sbt/sbt assembly/assembly (或者对 Maven 使用 -Phive)。这个命令构建一个包含 Hive 的 assembly。注意,这个 Hive assembly 必须放在所有的工作节点上,因为它们需要访问 Hive 的序列化和方序列化包(SerDes),以此访问存储在 Hive 中的数据。

可以通过 conf 目录下的 hive-site.xml 文件完成 Hive 配置。

要和 Hive 配合工作,你需要构造一个 JavaHiveContext,它继承了 JavaSQLContext,并且添加了发现 MetaStore 中的表和使用 HiveQL 编写查询的功能。此外,除了 sql 方法,JavaHiveContext 方法还提供了一个 hql 方法,它允许查询使用 HiveQL 表达。

##Writing Language-Integrated Relational Queries## Language-Integrated 查询目前只在 Scala 中被支持。

Spark SQL 同样支持使用领域特定的语言来编写查询。再次,使用上面例子中的数据:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
// The following is the same as  SELECT name FROM people WHERE age  = 10 AND age  = 19 
val teenagers = people.where(age  = 10).where(age  = 19).select(name)
teenagers.map(t =   Name:   + t(0)).collect().foreach(println)

DSL 使用 Scala 中得到标记来表示基础表中的表,他们使用一个前缀’标识。隐式转换这些标记为被 SQL 执行引擎评估的表达式。支持这些功能的完成列表可以再 ScalaDoc 找到。

关于“Spark SQL 编程的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计5995字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)