RDS与POLARDB归档到X

79次阅读
没有评论

共计 3025 个字符,预计需要花费 8 分钟才能阅读完成。

本篇内容介绍了“RDS 与 POLARDB 归档到 X -Pack Spark 计算的方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让丸趣 TV 小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

X-Pack Spark 服务通过外部计算资源的方式,为 Redis、Cassandra、MongoDB、HBase、RDS 存储服务提供复杂分析、流式处理及入库、机器学习的能力,从而更好的解决用户数据处理相关场景问题。

RDS POLARDB 分表归档到 X -Pack Spark 步骤

一键关联 POLARDB 到 Spark 集群

POLARDB 表存储

在 database‘test1’中每 5 分钟生成一张表,这里假设为表 test1、test2、test2、…

具体的建表语句如下:

* 请左右滑动阅览

 CREATE TABLE `test1` ( `a` int(11) NOT NULL,
 `b` time DEFAULT NULL, 
 `c` double DEFAULT NULL,
 PRIMARY KEY (`a`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8

归档到 Spark 的调试

x-pack spark 提供交互式查询模式支持直接在控制台提交 sql、python 脚本、scala code 来调试。

1、首先创建一个交互式查询的 session,在其中添加 mysql-connector 的 jar 包。

2、创建交互式查询

以 pyspark 为例,下面是具体归档 demo 的代码:

* 请左右滑动阅览

spark.sql(drop table sparktest).show()
#  创建一张 spark 表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的 5 分钟的一张 polardb 表达的数据。字段和 polardb 里面的类型一致
spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)  
  USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
#本例子在 polardb 里面创建了 databse test1,具有三张表 test1 ,test2,test3, 这里遍历这三张表,每个表存储 spark 的一个 5min 的分区
# CREATE TABLE `test1` (# `a` int(11) NOT NULL,
# `b` time DEFAULT NULL,
# `c` double DEFAULT NULL,
# PRIMARY KEY (`a`)
# ) ENGINE=InnoDB DEFAULT CHARSET=utf8
for num in range(1, 4): 
 # 构造 polardb 的表名
 dbtable =  test1.  +  test  + str(num)
 #spark 外表关联 polardb 对应的表
 externalPolarDBTableNow = spark.read \
 .format(jdbc) \
 .option(driver ,  com.mysql.jdbc.Driver) \
 .option(url ,  jdbc:mysql://pc-xxx.mysql.polardb.rds.aliyuncs.com:3306) \
 .option(dbtable , dbtable) \
 .option(user ,  name) \
 .option(password ,  xxx*) \
 .load().registerTempTable( polardbTableTemp)
 # 生成本次 polardb 表数据要写入的 spark 表的分区信息
 (dtValue, hhValue, mmValue) = (20191015 ,  13 , str(05 * num))
 # 执行导数据 sql 
 spark.sql(insert into sparktest partition(dt= %s ,hh= %s , mm=%s )  
  select * from polardbTableTemp   % (dtValue, hhValue, mmValue)).show()
 # 删除临时的 spark 映射 polardb 表的 catalog
 spark.catalog.dropTempView(polardbTableTemp)
 # 查看下分区以及统计下数据,主要用来做测试验证,实际运行过程可以删除
 spark.sql(show partitions sparktest).show(1000, False)
 spark.sql(select count(*) from sparktest ).show()

归档作业上生产

交互式查询定位为临时查询及调试,生产的作业还是建议使用 spark 作业的方式运行,使用文档参考。这里以 pyspark 作业为例:

/polardb/polardbArchiving.py 内容如下:

* 请左右滑动阅览

# -*- coding: UTF-8 -*-
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ ==  __main__ :
 spark = SparkSession \
 .builder \
 .appName(PolardbArchiving) \
 .enableHiveSupport() \
 .getOrCreate()
 spark.sql(drop table sparktest).show()
 #  创建一张 spark 表,三级分区,分别是天、小时、分钟,最后一级分钟用来存储具体的 5 分钟的一张 polardb 表达的数据。字段和 polardb 里面的类型一致
 spark.sql(CREATE table sparktest(a int , b timestamp , c double ,dt string,hh string,mm string)  
  USING parquet PARTITIONED BY (dt ,hh ,mm ) ).show()
 # 本例子在 polardb 里面创建了 databse test1,具有三张表 test1 ,test2,test3, 这里遍历这三张表,每个表存储 spark 的一个 5min 的分区
 # CREATE TABLE `test1` ( # `a` int(11) NOT NULL,
 # `b` time DEFAULT NULL,
 # `c` double DEFAULT NULL,
 # PRIMARY KEY (`a`)
 # ) ENGINE=InnoDB DEFAULT CHARSET=utf8
 for num in range(1, 4):
 # 构造 polardb 的表名
 dbtable =  test1.

“RDS 与 POLARDB 归档到 X -Pack Spark 计算的方法”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注丸趣 TV 网站,丸趣 TV 小编将为大家输出更多高质量的实用文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-04发表,共计3025字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)