共计 3025 个字符,预计需要花费 8 分钟才能阅读完成。
本篇内容介绍了“RDS 与 POLARDB 归档到 X -Pack Spark 计算的方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
X-Pack Spark 服务通过外部计算资源的方式,为 Redis、Cassandra、MongoDB、HBase、RDS 存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。
RDS POLARDB 分表归档到 X -Pack Spark 步骤
一键关联 POLARDB 到 Spark 集群
POLARDB 表存储
在 database‘test1’中每 5 分钟生成一张表,这里假设为表 test1、test2、test2、…
具体的建表语句如下:
* 请左右滑动阅览
CREATE TABLE `test1` ( `a` int(11) NOT NULL,
`b` time DEFAULT NULL,
`c` double DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
归档到 Spark 的调试
x-pack spark 提供交互式查询模式支持直接在控制台提交 sql、python 脚本、scala code 来调试。
1、首先创建一个交互式查询的 session,在其中添加 mysql-connector 的 jar 包。
2、创建交互式查询
以 pyspark 为例,下面是具体归档 demo 的代码:
* 请左右滑动阅览
spark.sql(drop table sparktest).show()
# 创建一张 spark 表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的 5 分钟的一张 polardb 表达的数据。字段和 polardb 里面的类型一致
spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)
USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
#本例子在 polardb 里面创建了 databse test1,具有三张表 test1 ,test2,test3, 这里遍历这三张表,每个表存储 spark 的一个 5min 的分区
# CREATE TABLE `test1` (# `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
# 构造 polardb 的表名
dbtable = test1. + test + str(num)
#spark 外表关联 polardb 对应的表
externalPolarDBTableNow = spark.read \
.format(jdbc) \
.option(driver , com.mysql.jdbc.Driver) \
.option(url , jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306) \
.option(dbtable , dbtable) \
.option(user , name) \
.option(password , xxx*) \
.load().registerTempTable( polardbTableTemp)
# 生成本次 polardb 表数据要写入的 spark 表的分区信息
(dtValue, hhValue, mmValue) = (20191015 , 13 , str(05 * num))
# 执行导数据 sql
spark.sql(insert into sparktest partition(dt= %s ,hh= %s , mm=%s )
select * from polardbTableTemp % (dtValue, hhValue, mmValue)).show()
# 删除临时的 spark 映射 polardb 表的 catalog
spark.catalog.dropTempView(polardbTableTemp)
# 查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
spark.sql(show partitions sparktest).show(1000, False)
spark.sql(select count(*) from sparktest ).show()
归档作业上生产
交互式查询定位为临时查询及调试,生产的作业还是建议使用 spark 作业的方式运行,使用文档参考。这里以 pyspark 作业为例:
/polardb/polardbArchiving.py 内容如下:
* 请左右滑动阅览
# -*- coding: UTF-8 -*-
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ == __main__ :
spark = SparkSession \
.builder \
.appName(PolardbArchiving) \
.enableHiveSupport() \
.getOrCreate()
spark.sql(drop table sparktest).show()
# 创建一张 spark 表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的 5 分钟的一张 polardb 表达的数据。字段和 polardb 里面的类型一致
spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)
USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
# 本例子在 polardb 里面创建了 databse test1,具有三张表 test1 ,test2,test3, 这里遍历这三张表,每个表存储 spark 的一个 5min 的分区
# CREATE TABLE `test1` ( # `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4):
# 构造 polardb 的表名
dbtable = test1.
“RDS 与 POLARDB 归档到 X -Pack Spark 计算的方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!