MaxCompute Spark开发的示例分析

57次阅读
没有评论

共计 13742 个字符,预计需要花费 35 分钟才能阅读完成。

这期内容当中丸趣 TV 小编将会给大家带来有关 MaxCompute Spark 开发的示例分析,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

MaxCompute Spark 开发 0. 概述

MaxCompute Spark 是 MaxCompute 提供的兼容开源的 Spark 计算服务,它在统一的计算资源和数据集权限体系之上,提供 Spark 计算框架,支持用户以熟悉的开发使用方式提交运行 Spark 作业,以满足更丰富的数据处理分析场景。

下面将重点介绍 MaxCompute Spark 能够支撑的应用场景,同时说明开发的依赖条件和环境准备,重点对 Spark 作业开发、提交到 MaxCompute 集群执行、诊断进行介绍。

1. 前提条件

MaxCompute Spark 是阿里云提供的 Spark on MaxCompute 的解决方案,能够让 Spark 应用运行在托管的 MaxCompute 计算环境中。为了能够在 MaxCompute 环境中安全地运行 Spark 作业,MaxCompute 提供了以下 SDK 和 MaxCompute Spark 定制发布包。

SDK 定位于开源应用接入 MaxCompute SDK:
提供了集成所需的 API 说明以及相关功能 Demo,用户可以基于项目提供的 Spark-1.x 以及 Spark-2.x 的 example 项目构建自己的应用,并且提交到 MaxCompute 集群上。
MaxCompute Spark 客户端发布包:
集成了 MaxCompute 认证功功能,作为客户端工具,用于通过 Spark-submit 方式提交作业到 MaxCompute 项目中运行,目前提供了面向 Spark1.x 和 Spark2.x 的 2 个发布包:spark-1.6.3 和 spark-2.3.0 SDK 在开发时,可以通过配置 Maven 依赖进行引用。Spark 客户端需要根据开发的 Spark 版本,提前下载。如,需要开发 Spark1.x 应用,应下载 spark-1.6.3 版本客户端;如需开发 Spark2.x 应用,应下载 spark-2.3.0 客户端。

2. 开发环境准备 2.1 Maxcompute Spark 客户端准备

MaxCompute Spark 发布包:集成了 MaxCompute 认证功功能,作为客户端工具,用于通过 Spark-submit 方式提交作业到 MaxCompute 项目中运行,目前提供了面向 Spark1.x 和 Spark2.x 的 2 个发布包:

spark-1.6.3

spark-2.3.0

请根据需要开发的 Spark 版本,选择合适的版本下载并解压 Maxcompute Spark 发布包。

2.2 设置环境变量

JAVA_HOME 设置

# 尽量使用 JDK 1.7+ 1.8+ 最佳
export JAVA_HOME=/path/to/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

SPARK_HOME 设置

export SPARK_HOME=/path/to/spark_extracted_package
export PATH=$SPARK_HOME/bin:$PATH

2.3 设置 Spark-defaults.conf

在 $SPARK_HOME/conf
路径下存在 spark-defaults.conf.template 文件,这个可以作为 spark-defaults.conf 的模版,需要在该文件中设置 MaxCompute 相关的账号信息后,才可以提交 Spark 任务到 MaxCompute。默认配置内容如下,将空白部分根据实际的账号信息填上即可,其余的配置可以保持不变。

# MaxCompute 账号信息
spark.hadoop.odps.project.name =
spark.hadoop.odps.access.id =
spark.hadoop.odps.access.key =
# 以下配置保持不变
spark.sql.catalogImplementation=odps
spark.hadoop.odps.task.major.version = cupid_v2
spark.hadoop.odps.cupid.container.image.enable = true
spark.hadoop.odps.cupid.container.vm.engine.type = hyper
spark.hadoop.odps.end.point = http://service.cn.maxcompute.aliyun.com/api
spark.hadoop.odps.runtime.end.point = http://service.cn.maxcompute.aliyun-inc.com/api

3. 访问 MaxCompute 表所需依赖

若作业需要访问 MaxCompute 表,需要依赖 odps-spark-datasource 模块,本节介绍如何把该依赖编译安装到本地 maven 仓库;若无需访问可直接跳过。

git clone 代码,github 地址: https://github.com/aliyun/aliyun-cupid-sdk/tree/3.3.2-public

#git clone git@github.com:aliyun/aliyun-cupid-sdk.git

编译模块

#cd ${path to aliyun-cupid-sdk}
#git checkout 3.3.2-public

// 编译并安装 cupid-sdk
#cd ${path to aliyun-cupid-sdk}/core/cupid-sdk/
#mvn clean install -DskipTests

// 编译并安装 datasource。依赖 cupid-sdk
// for spark-2.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-2.x/datasource
# mvn clean install -DskipTests
// for spark-1.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-1.x/datasource
#mvn clean install -DskipTests

添加依赖

!– Spark-1.x 请依赖此模块 —
dependency
groupId com.aliyun.odps /groupId
artifactId odps-spark-datasource_2.10 /artifactId
version 3.3.2-public /version
/dependency

!– Spark-2.x 请依赖此模块 —
dependency
  groupId com.aliyun.odps /groupId
  artifactId odps-spark-datasource_2.11 /artifactId
  version 3.3.2-public /version
/dependency

4. OSS 依赖

若作业需要访问 OSS,直接添加以下依赖即可

dependency
  groupId com.aliyun.odps /groupId
  artifactId hadoop-fs-oss /artifactId
  version 3.3.2-public /version
/dependency

5. 应用开发

MaxCompute 产品提供了两个应用构建的模版,用户可以基于此模版进行开发,最后统一构建整个项目后用生成的应用包即可直接提交到 MaxCompute 集群上运行 Spark 应用。

5.1 通过模版构建应用

MaxCompute Spark 提供两个应用构建模版,用户可以基于此模版进行开发,最后统一构建整个项目后用生成的应用包即可直接提交到 MaxCompute 集群上运行 Spark 应用。首先需要把代码 clone 下来

#git clone git@github.com:aliyun/aliyun-cupid-sdk.git
#cd aliyun-cupid-sdk
#checkout 3.3.2-public
#cd archetypes

// for Spark-1.x
sh Create-AliSpark-1.x-APP.sh spark-1.x-demo /tmp

// for Spark-2.x
Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp

以上命令会在 /tmp 目录下创建名为 spark-1.x-demo(spark-2.x-demo)的 maven project,执行以下命令进行编译和提交作业:

#cd /tmp/spark-2.x/demo
#mvn clean package

// 提交作业
$SPARK_HOME/bin/spark-submit \
–master yarn-cluster \
–class SparkPi \
/tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar

# Usage: sh Create-AliSpark-2.x-APP.sh app_name target_path
sh Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp/
cd /tmp/spark-2.x-demo
mvn clean package
# 冒烟测试
# 1 利用编译出来的 shaded jar 包
# 2 按照文档所示下载 MaxCompute Spark 客户端
# 3 参考文档”置环境变量”指引,填写 MaxCompute 项目相关配置项
# 执行 spark-submit 命令 如下
$SPARK_HOME/bin/spark-submit \
  –master yarn-cluster \
  –class SparkPi \
  /tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar

5.2 Java/Scala 开发样例 Spark-1.x

pom.xml 须知
请注意 用户构建 Spark 应用的时候,由于是用 MaxCompute 提供的 Spark 客户端去提交应用,故需要注意一些依赖 scope 的定义

spark-core spark-sql 等所有 spark 社区发布的包,用 provided scope

odps-spark-datasource 用默认的 compile scope

!– spark 相关依赖, provided —
dependency
  groupId org.apache.spark /groupId
  artifactId spark-mllib_${scala.binary.version} /artifactId
  version ${spark.version} /version
  scope provided /scope
/dependency
dependency
  groupId org.apache.spark /groupId
  artifactId spark-sql_${scala.binary.version} /artifactId
  version ${spark.version} /version
  scope provided /scope
/dependency
dependency
  groupId org.apache.spark /groupId
  artifactId spark-core_${scala.binary.version} /artifactId
  version ${spark.version} /version
  scope provided /scope
/dependency

!– datasource 依赖, 用于访问 MaxCompute 表 —
dependency
  groupId com.aliyun.odps /groupId
  artifactId odps-spark-datasource_${scala.binary.version} /artifactId
  version 3.3.2-public /version
/dependency

案例说明

WordCount

详细代码
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.WordCount \
  ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

Spark-SQL on MaxCompute Table

详细代码
提交方式

# 运行可能会报 Table Not Found 的异常,因为用户的 MaxCompute Project 中没有代码中指定的表
# 可以参考代码中的各种接口,实现对应 Table 的 SparkSQL 应用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.sparksql.SparkSQL \
  ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

GraphX PageRank

详细代码
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.graphx.PageRank \
  ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

Mllib Kmeans-ON-OSS

详细代码
提交方式

# 代码中的 OSS 账号信息相关需要填上,再编译提交
conf.set(spark.hadoop.fs.oss.accessKeyId , ***)
conf.set(spark.hadoop.fs.oss.accessKeySecret , ***)
conf.set(spark.hadoop.fs.oss.endpoint , oss-cn-hangzhou-zmf.aliyuncs.com)
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
  ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

OSS UnstructuredData

详细代码
提交方式

# 代码中的 OSS 账号信息相关需要填上,再编译提交
conf.set(spark.hadoop.fs.oss.accessKeyId , ***)
conf.set(spark.hadoop.fs.oss.accessKeySecret , ***)
conf.set(spark.hadoop.fs.oss.endpoint , oss-cn-hangzhou-zmf.aliyuncs.com)
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
  ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

Spark-2.x

pom.xml 须知
请注意 用户构建 Spark 应用的时候,由于是用 MaxCompute 提供的 Spark 客户端去提交应用,故需要注意一些依赖 scope 的定义

spark-core spark-sql 等所有 spark 社区发布的包,用 provided scope

odps-spark-datasource 用默认的 compile scope

!– spark 相关依赖, provided —
dependency
  groupId org.apache.spark /groupId
  artifactId spark-mllib_${scala.binary.version} /artifactId
  version ${spark.version} /version
  scope provided /scope
/dependency
dependency
  groupId org.apache.spark /groupId
  artifactId spark-sql_${scala.binary.version} /artifactId
  version ${spark.version} /version
  scope provided /scope
/dependency
dependency
  groupId org.apache.spark /groupId
  artifactId spark-core_${scala.binary.version} /artifactId
  version ${spark.version} /version
  scope provided /scope
/dependency
dependency
  groupId com.aliyun.odps /groupId
  artifactId cupid-sdk /artifactId
  scope provided /scope
/dependency

!– datasource 依赖, 用于访问 MaxCompute 表 —
dependency
  groupId com.aliyun.odps /groupId
  artifactId odps-spark-datasource_${scala.binary.version} /artifactId
  version 3.3.2-public /version
/dependency

案例说明

WordCount

详细代码
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.WordCount \
  ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

Spark-SQL 操作 MaxCompute 表

详细代码
提交方式

# 运行可能会报 Table Not Found 的异常,因为用户的 MaxCompute Project 中没有代码中指定的表
# 可以参考代码中的各种接口,实现对应 Table 的 SparkSQL 应用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.sparksql.SparkSQL \
  ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

GraphX PageRank

详细代码
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.graphx.PageRank \
  ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

Mllib Kmeans-ON-OSS

KmeansModelSaveToOss
详细代码
提交方式

# 代码中的 OSS 账号信息相关需要填上,再编译提交
val spark = SparkSession
  .builder()
  .config(spark.hadoop.fs.oss.accessKeyId , ***)
  .config(spark.hadoop.fs.oss.accessKeySecret , ***)
  .config(spark.hadoop.fs.oss.endpoint , oss-cn-hangzhou-zmf.aliyuncs.com)
  .appName(KmeansModelSaveToOss)
  .getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
  ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

OSS UnstructuredData

SparkUnstructuredDataCompute
详细代码
提交方式

# 代码中的 OSS 账号信息相关需要填上,再编译提交
val spark = SparkSession
  .builder()
  .config(spark.hadoop.fs.oss.accessKeyId , ***)
  .config(spark.hadoop.fs.oss.accessKeySecret , ***)
  .config(spark.hadoop.fs.oss.endpoint , oss-cn-hangzhou-zmf.aliyuncs.com)
  .appName(SparkUnstructuredDataCompute)
  .getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit –master yarn-cluster –class \
  com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
  ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

PySpark 开发样例

需要文件
若需要访问 MaxCompute 表,则需要参考第三节(访问 MaxCompute 表所需依赖) 编译 datasource 包

SparkSQL 应用示例(spark1.6)

from pyspark import SparkContext, SparkConf
from pyspark.sql import OdpsContext

if __name__ == __main__ :
  conf = SparkConf().setAppName( odps_pyspark)
  sc = SparkContext(conf=conf)
  sql_context = OdpsContext(sc)
  df = sql_context.sql(select id, value from cupid_wordcount)
  df.printSchema()
  df.show(200)
 
  df_2 = sql_context.sql(select id, value from cupid_partition_table1 where pt1 = part1)
  df_2.show(200)

  #Create Drop Table
  sql_context.sql(create table TestCtas as select * from cupid_wordcount).show()
  sql_context.sql(drop table TestCtas).show()

提交运行:

./bin/spark-submit \
–jars ${path to odps-spark-datasource_2.10-3.3.2-public.jar} \
example.py

SparkSQL 应用示例(spark2.3)

from pyspark.sql import SparkSession

if __name__ == __main__ :
  spark = SparkSession.builder.appName(spark sql).getOrCreate()

  df = spark.sql(select id, value from cupid_wordcount)
  df.printSchema()
  df.show(10, 200)

  df_2 = spark.sql(SELECT product,category,revenue FROM (SELECT product,category,revenue, dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank  FROM productRevenue) tmp WHERE  rank = 2
  df_2.printSchema()
  df_2.show(10, 200)

  df_3 = spark.sql(select id, value from cupid_partition_table1 where pt1 = part1)
  df_3.show(10, 200)

  #Create Drop Table
  spark.sql(create table TestCtas as select * from cupid_wordcount).show()
  spark.sql(drop table TestCtas).show()

提交运行:

spark-submit –master yarn-cluster \
–jars ${path to odps-spark-datasource_2.11-3.3.2-public.jar \
example.py

6. 通过 Spark 访问 VPC 环境内服务

对于用户使用 Spark on MaxCompute 对 VPC 环境内的 RDS、Redis、ECS 主机部署的服务等,受限于 VPC 的访问限制,暂时还无法访问,即将在近期支持。

7. 如何把开源 Spark 代码迁移到 Spark on MaxCompute

case1. 作业无需访问 MaxCompute 表和 OSS
用户 jar 包可直接运行,参照第二节准备开发环境和修改配置。注意,对于 spark 或 hadoop 的依赖必须设成 provided。
case2. 作业需要访问 MaxCompute 表
参考第三节编译 datasource 并安装到本地 maven 仓库,在 pom 中添加依赖后重新打包即可。
case3. 作业需要访问 OSS
参考第四节在 pom 中添加依赖后重新打包即可。

8. 任务提交执行

目前 MaxCompute Spark 支持以下几种运行方式:local 模式,cluster 模式,和在 DataWorks 中执行模式。

8.1 Local 模式

local 模式主要是让用户能够方便的调试应用代码,使用方式跟社区相同,我们添加了用 tunnel 读写 ODPS 表的功能。用户可以在 ide 和命令行中使用该模式,需要添加配置 spark.master=local[N],其中 N 表示执行该模式所需要的 cpu 资源。此外,local 模式下的读写表是通过读写 tunnel 完成的,需要在 Spark-defaults.conf 中增加 tunnel 配置项(请根据 MaxCompute 项目所在的 region 及网络环境填写对应的 Tunnel Endpoint 地址):tunnel_end_point=http://dt.cn-beijing.maxcompute.aliyun.com。命令行执行该模式的方式如下:

1.bin/spark-submit –master local[4] \
–class com.aliyun.odps.spark.examples.SparkPi \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

8.2 Cluster 模式

在 Cluster 模式中,用户需要指定自定义程序入口 Main,Main 结束(Success or Fail)spark job 就会结束。使用场景适合于离线作业,可以与阿里云 DataWorks 产品结合进行作业调度。命令行提交方式如下:

1.bin/spark-submit –master yarn-cluster \
–class SparkPi \
${ProjectRoot}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

8.3 DataWorks 执行模式

用户可以在 DataWorks 中运行 MaxCompute Spark 离线作业(cluster 模式),以方便与其他类型执行节点集成和调度。

用户需要在 DataWorks 的业务流程中上传并提交 (记得要单击 提交 按钮) 资源:

第二步:在创建的业务流程中,从数据开发组件中选择 ODPS Spark 节点。

双击拖拽到工作流的 Spark 节点,对 Spark 作业进行任务定义: 
          选择 Spark 的版本、任务使用的开发语言,并指定任务所使用的资源文件。这里的资源文件就是第一步在业务流程中预先上传并发布的资源文件。同时,您还可以指定提交作业时的配置项,如 executor 的数量、内存大小等配置项。同时设置配置项:spark.hadoop.odps.cupid.webproxy.endpoint(取值填写项目所在 region 的 endpoint,如 http://service.cn.maxcompute.aliyun-inc.com/api)、spark.hadoop.odps.moye.trackurl.host(取值填写:http://jobview.odps.aliyun.com)
以便能够查看日志中打印出的 jobview 信息。
手动执行 Spark 节点,可以查看该任务的执行日志,从打印出来的日志中可以获取该任务的 logview 和 jobview 的 url,编译进一步查看与诊断

Spark 作业定义完成后,即可以在业务流程中对不同类型服务进行编排、统一调度执行。9. 作业诊断

提交作业后,需要根据作业日志来检查作业是否正常提交并执行,MaxCompute 对于 Spark 作业提供了 Logview 工具以及 Spark Web-UI 来帮助开发者进行作业诊断。

例如,通过 Spark-submit 方式 (dataworks 执行 spark 任务时也会产生相应日志) 提交作业,在作业日志中会打印以下关键内容:

通过日志输出的 logview 在浏览器中可以查看 CUPID 类型的任务执行的基本信息。

 

单击 TempRoot 的 StdOut 按钮可以查看 SparkPi 的输出结果:
 

         
 

 

日志中打印出上述的 TrackingUrl,表示您的作业已经提交到 MaxCompute 集群,这个 TrackingUrl 非常关键,它既是 SparkWebUI,也是 HistoryServer 的 Url。在浏览器中打开这个 Url,可以追踪 Spark 作业的运行情况。

单击 driver 的 stdout 即可以查看 Spark 作业的输出内容。

上述就是丸趣 TV 小编为大家分享的 MaxCompute Spark 开发的示例分析了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计13742字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)