Spark

85次阅读
没有评论

共计 6476 个字符,预计需要花费 17 分钟才能阅读完成。

这篇文章主要为大家展示了“Spark-sql 如何创建外部分区表”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让丸趣 TV 小编带领大家一起研究并学习一下“Spark-sql 如何创建外部分区表”这篇文章吧。

一、Spark-sql 创建外部分区表

1. 使用 spark-sql

spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G

2.spark-sql 中创建 parquet 分区表:

create external table pgls.convert_parq(
bill_num string,
logis_id string,
store_id string,
store_code string,
creater_id string,
order_status INT,
pay_status INT,
order_require_varieties INT,
order_require_amount decimal(19,4),
order_rec_amount decimal(19,4),
order_rec_gpf decimal(19,4),
deli_fee FLOAT,
order_type INT,
last_modify_time timestamp,
order_submit_time timestamp
) 
partitioned by(order_submit_date date)
row format serde  org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe 
stored as parquetfile
location  /test/spark/convert/parquet/bill_parq/

二、CSV 转 Parquet

代码:org.apache.spark.ConvertToParquet.scala

package org.apache.spark
import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
* CSV  转换为  parquet
*  参数:输入路径,  输出路径,  分区数
object ConvertToParquet{def main(args: Array[String]) {if(args.length != 3){println( jar args: inputFiles outPath numpartitions)
System.exit(0)
val inputPath = args(0)
val outPath = args(1)
val numPartitions = args(2).toInt
println(==========================================)
println(=========input:  + inputPath )
println(=========output:  + outPath )
println(==numPartitions:  + numPartitions )
println(==========================================)
// 判断输出目录是否存在,存在则删除
val fo = HDFSOperation(new Configuration())
val existDir = fo.existDir(outPath)
if(existDir) {println( HDFS exists outpath:   + outPath)
println(start to delete ...)
val isDelete = fo.deleteDir(outPath)
if(isDelete){println(outPath +  delete done. )
val conf = new SparkConf()
val sc = new SparkContext(conf) // 参数 SparkConf 创建 SparkContext,
val sqlContext = new SQLContext(sc) // 参数 SparkContext 创建 SQLContext
val schema = StructType(Array(StructField( bill_num ,DataTypes.StringType,false),
StructField(logis_id ,DataTypes.StringType,false),
StructField(store_id ,DataTypes.StringType,false),
StructField(store_code ,DataTypes.StringType,false),
StructField(creater_id ,DataTypes.StringType,false),
StructField(order_status ,DataTypes.IntegerType,false),
StructField(pay_status ,DataTypes.IntegerType,false),
StructField(order_require_varieties ,DataTypes.IntegerType,false),
StructField(order_require_amount ,DataTypes.createDecimalType(19,4),false),
StructField(order_rec_amount ,DataTypes.createDecimalType(19,4),false),
StructField(order_rec_gpf ,DataTypes.createDecimalType(19,4),false),
StructField(deli_fee ,DataTypes.FloatType,false),
StructField(order_type ,DataTypes.IntegerType,false),
StructField(last_modify_time ,DataTypes.TimestampType,false),
StructField(order_submit_time ,DataTypes.TimestampType,false),
StructField(order_submit_date ,DataTypes.DateType,false)))
convert(sqlContext, inputPath, schema, outPath, numPartitions)
//CSV 转换为 parquet
def convert(sqlContext: SQLContext, inputpath: String, schema: StructType, outpath: String, numPartitions: Int) {
//  将 text 导入到 DataFrame
val df = sqlContext.read.format(com.databricks.spark.csv).
schema(schema).option(delimiter ,  ,).load(inputpath)
//  转换为 parquet
// df.write.parquet(outpath) //  转换时以 block 数为分区数
df.coalesce(numPartitions).write.parquet(outpath) // 自定义分区数
}
 打包后 jar 上传至本地目录:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar
事先在 HDFS 上生成 CSV 文件,HDFS 目录:/test/spark/convert/data/order/2016-05-01/
执行命令:
spark-submit --queue spark --master yarn --num-executors 10 --executor-cores 2 --executor-memory 3G --class org.apache.spark.ConvertToParquet --name ConvertToParquet file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar /test/spark/convert/data/order/2016-05-01/ /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01

pom.xml 相关内容:

1. 依赖包:

dependencies 
 !--  操作 HDFS -- 
 dependency 
  groupId com.ecfront /groupId 
  artifactId ez-fs /artifactId 
  version 0.9 /version 
  /dependency 
 !--spark -- 
 dependency 
  groupId org.apache.spark /groupId 
  artifactId spark-core_2.10 /artifactId 
  version 1.6.1 /version 
  /dependency 
  dependency 
  groupId org.apache.spark /groupId 
  artifactId spark-sql_2.10 /artifactId 
  version 1.6.1 /version 
  /dependency 
 !--spark csv-- 
 dependency 
  groupId com.databricks /groupId 
  artifactId spark-csv_2.11 /artifactId 
  version 1.4.0 /version 
  /dependency 
 !--hadoop -- 
 dependency 
  groupId org.apache.hadoop /groupId 
  artifactId hadoop-client /artifactId 
  version 2.6.0 /version 
  /dependency 
 /dependencies

    2.plugins(含打入依赖包)

build 
  pluginManagement 
  plugins 
  plugin 
  groupId net.alchim31.maven /groupId 
  artifactId scala-maven-plugin /artifactId 
  version 3.2.1 /version 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-compiler-plugin /artifactId 
  version 2.0.2 /version 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-shade-plugin /artifactId 
  version 1.4 /version 
  configuration 
  filters 
  filter 
  artifact *:* /artifact 
  excludes 
  exclude META-INF/*.SF /exclude 
  exclude META-INF/*.DSA /exclude 
  exclude META-INF/*.RSA /exclude 
  /excludes 
  /filter 
  /filters 
  /configuration 
  /plugin 
  /plugins 
  /pluginManagement 
  plugins 
  plugin 
  groupId net.alchim31.maven /groupId 
  artifactId scala-maven-plugin /artifactId 
  executions 
  execution 
  id scala-compile-first /id 
  phase process-resources /phase 
  goals 
  goal add-source /goal 
  goal compile /goal 
  /goals 
  /execution 
  execution 
  id scala-test-compile /id 
  phase process-test-resources /phase 
  goals 
  goal testCompile /goal 
  /goals 
  /execution 
  /executions 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-compiler-plugin /artifactId 
  executions 
  execution 
  phase compile /phase 
  goals 
  goal compile /goal 
  /goals 
  /execution 
  /executions 
  /plugin 
  plugin 
  groupId org.apache.maven.plugins /groupId 
  artifactId maven-shade-plugin /artifactId 
  version 1.4 /version 
  configuration 
  createDependencyReducedPom true /createDependencyReducedPom 
  /configuration 
  executions 
  execution 
  phase package /phase 
  goals 
  goal shade /goal 
  /goals 
  configuration 
  transformers 
  transformer
implementation= org.apache.maven.plugins.shade.resource.ServicesResourceTransformer / 
  transformer
implementation= org.apache.maven.plugins.shade.resource.ManifestResourceTransformer 
  mainClass org.apache.spark.ConvertToParquet /mainClass 
  /transformer 
  /transformers 
  /configuration 
  /execution 
  /executions 
  /plugin 
  /plugins 
 /build

三、表添加分区

spark-sql 下执行

alter table pgls.convert_parq add partition(order_submit_date= 2016-05-01

可通过 sql 查询到相应数据:

select * from pgls.convert_parq where order_submit_date= 2016-05-01  limit 5;

Spark-sql 如何创建外部分区表

以上是“Spark-sql 如何创建外部分区表”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-25发表,共计6476字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)