This article walks through how to create an external partitioned table with Spark-sql: launching spark-sql, defining a Parquet partitioned table, converting CSV data to Parquet, and registering the partition.
I. Creating an external partitioned table with Spark-sql
1. Launch spark-sql
spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G
2. Create the Parquet partitioned table in spark-sql:
create external table pgls.convert_parq(
bill_num string,
logis_id string,
store_id string,
store_code string,
creater_id string,
order_status INT,
pay_status INT,
order_require_varieties INT,
order_require_amount decimal(19,4),
order_rec_amount decimal(19,4),
order_rec_gpf decimal(19,4),
deli_fee FLOAT,
order_type INT,
last_modify_time timestamp,
order_submit_time timestamp
)
partitioned by(order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';
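The DDL above is run from the spark-sql shell, but it can equally be issued from code. Below is a minimal sketch using a HiveContext (Spark 1.6; assumes spark-hive is on the classpath and the Hive metastore is reachable). The object name is illustrative and the shorthand "stored as parquet" is used instead of the explicit SerDe:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Minimal sketch: issue the same table DDL programmatically.
object CreateBillParqTable {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CreateBillParqTable"))
    val hiveContext = new HiveContext(sc)
    hiveContext.sql(
      """create external table if not exists pgls.convert_parq(
        |  bill_num string, logis_id string, store_id string, store_code string,
        |  creater_id string, order_status int, pay_status int,
        |  order_require_varieties int, order_require_amount decimal(19,4),
        |  order_rec_amount decimal(19,4), order_rec_gpf decimal(19,4),
        |  deli_fee float, order_type int,
        |  last_modify_time timestamp, order_submit_time timestamp)
        |partitioned by (order_submit_date date)
        |stored as parquet
        |location '/test/spark/convert/parquet/bill_parq/'""".stripMargin)
    sc.stop()
  }
}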
II. Converting CSV to Parquet
Code: org.apache.spark.ConvertToParquet.scala
package org.apache.spark

import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

/**
 * Convert CSV files to Parquet.
 * Arguments: input path, output path, number of partitions
 */
object ConvertToParquet {

  def main(args: Array[String]) {
    if (args.length != 3) {
      println("jar args: inputFiles outPath numpartitions")
      System.exit(0)
    }
    val inputPath = args(0)
    val outPath = args(1)
    val numPartitions = args(2).toInt

    println("==========================================")
    println("=========input:  " + inputPath)
    println("=========output: " + outPath)
    println("==numPartitions: " + numPartitions)
    println("==========================================")

    // If the output directory already exists, delete it first
    val fo = HDFSOperation(new Configuration())
    val existDir = fo.existDir(outPath)
    if (existDir) {
      println("HDFS exists outpath: " + outPath)
      println("start to delete ...")
      val isDelete = fo.deleteDir(outPath)
      if (isDelete) {
        println(outPath + " delete done.")
      }
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)     // create the SparkContext from the SparkConf
    val sqlContext = new SQLContext(sc) // create the SQLContext from the SparkContext

    val schema = StructType(Array(
      StructField("bill_num", DataTypes.StringType, false),
      StructField("logis_id", DataTypes.StringType, false),
      StructField("store_id", DataTypes.StringType, false),
      StructField("store_code", DataTypes.StringType, false),
      StructField("creater_id", DataTypes.StringType, false),
      StructField("order_status", DataTypes.IntegerType, false),
      StructField("pay_status", DataTypes.IntegerType, false),
      StructField("order_require_varieties", DataTypes.IntegerType, false),
      StructField("order_require_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_gpf", DataTypes.createDecimalType(19, 4), false),
      StructField("deli_fee", DataTypes.FloatType, false),
      StructField("order_type", DataTypes.IntegerType, false),
      StructField("last_modify_time", DataTypes.TimestampType, false),
      StructField("order_submit_time", DataTypes.TimestampType, false),
      StructField("order_submit_date", DataTypes.DateType, false)))

    convert(sqlContext, inputPath, schema, outPath, numPartitions)
  }

  // Convert CSV to Parquet
  def convert(sqlContext: SQLContext, inputpath: String, schema: StructType,
              outpath: String, numPartitions: Int) {
    // Load the CSV files into a DataFrame using the supplied schema
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .schema(schema).option("delimiter", ",").load(inputpath)
    // Write out as Parquet
    // df.write.parquet(outpath)                        // uses the number of input blocks as the partition count
    df.coalesce(numPartitions).write.parquet(outpath)   // custom partition count
  }
}
After packaging, upload the jar to a local directory: /soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar
Generate the CSV files on HDFS beforehand, under the HDFS directory /test/spark/convert/data/order/2016-05-01/
Run the command:
spark-submit --queue spark --master yarn --num-executors 10 --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01
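Note that the output path above points directly at the partition directory (order_submit_date=2016-05-01), so each run converts one day of data into one partition. As an alternative (not part of the original job), since order_submit_date is already a column in the schema, Spark could lay out the partition directories itself via partitionBy. A minimal sketch, with the helper name being illustrative:

import org.apache.spark.sql.DataFrame

// Alternative sketch: write all dates in one pass and let Spark create the
// order_submit_date=YYYY-MM-DD subdirectories under the table location.
// Assumes the input rows actually carry the order_submit_date column.
def convertPartitioned(df: DataFrame, outpath: String, numPartitions: Int): Unit = {
  df.coalesce(numPartitions)
    .write
    .partitionBy("order_submit_date") // one subdirectory per distinct date
    .parquet(outpath)                 // here outpath would be the table location itself
}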
Relevant parts of pom.xml:
1. Dependencies:
<dependencies>
  <!-- HDFS operations -->
  <dependency>
    <groupId>com.ecfront</groupId>
    <artifactId>ez-fs</artifactId>
    <version>0.9</version>
  </dependency>
  <!-- Spark -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
  </dependency>
  <!-- spark-csv: the Scala suffix must match the Spark artifacts above (_2.10) -->
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
  </dependency>
  <!-- Hadoop -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
  </dependency>
</dependencies>
2. Plugins (including shading the dependencies into the jar):
<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.1</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.4</version>
        <configuration>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>1.4</version>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.apache.spark.ConvertToParquet</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
III. Adding the partition to the table
Run in spark-sql:
alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');
The corresponding data can then be queried with SQL:
select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;
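The partition registration and the check query can also be driven from code, following the same HiveContext pattern as the sketch in section I (the object name is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Minimal sketch: register the partition and sample a few rows programmatically.
object CheckBillParqPartition {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CheckBillParqPartition"))
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("alter table pgls.convert_parq add if not exists " +
      "partition(order_submit_date='2016-05-01')")
    hiveContext.sql("select * from pgls.convert_parq " +
      "where order_submit_date='2016-05-01' limit 5").show()
    sc.stop()
  }
}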
That is all for "How to create an external partitioned table with Spark-sql". Thanks for reading, and hopefully the walkthrough helps clear up the topic.