共计 5466 个字符,预计需要花费 14 分钟才能阅读完成。
如何通过 AWS EMR 降低集群计算成本,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
AWS EMR 是一个计算集群。可以通过 ta 创建自定义配置的虚拟机,并自动安装所需计算框架(Spark,Hadoop,Hive 等),以便用来进行大数据计算。
1. 项目背景
公司目前有一个项目,通过爬虫收集数据,离线计算得到用户画像,并将最终结果写入 rds,通过 api 向外展示数据。
2. 架构演进 2.1 技术栈
计算框架 Spark
调度框架 Airflow
数据存储 Hadoop,Mysql
数仓工具 Hive,Presto
辅助工具 Zepplin
脚本语言 Java,Scala,Python
2.2 第一版
环境
我们在某云厂商开了 6 台虚拟器(4 核 8G),spark on yarn 模式运行,其中 1 台作为主节点,运行 hadoop 主节点和 airflow 调度程序,其余作为数据节点。
计算过程
通过 Spark Streaming 将数据落地到 Hadoop
Airflow 定时向主节点通过 Spark-submit 方式提交命令
Spark 计算后将最终结果写入 Mysql
平时开发人员可以在 Zepplin 进行查询
效果
计算流程可以正常进行
思考
通过一段时间的观察分析,我们发现
大部分计算任务都能在较短时间内完成
机器每天闲置时间很长
业务没有很高的实时性要求
高配置虚拟器成本很高
结论
基于现状,我们希望能有个即开即用的系统,就像电脑一样,要用就打开,用完就关闭。经过调研,最终选择了 AWS 的 EMR。
2.3 第二版
环境
在将系统迁移到 AWS EMR 之后,在 AWS 上开了一台虚拟器(1 核 2G)运行 Airflow 和 Kinesis
这台虚拟器需要一直运行,但 Airflow 本身不需要高配置
计算过程
通过 Kinesis 将数据落到 S3
Airflow 定时发起任务
发起创建 EMR 请求
可自定义机器配置,要安装的计算框架,也可覆盖框架配置。可通过 Python 脚本检测集群是否创建成功
提交计算任务
关闭集群
效果
计算流程可以正常进行,但不需要长开机器了,只需要一台低配来触发定时任务即可
思考
通过一段时间的观察
EMR 费用比起虚拟器,确实便宜很多
可以通过 console 台查看集群状态,控制集群开关
不方便的地方,平时要查看 Hadoop 的数据,需要自己写脚本拉取,不能使用辅助工具了
Talk is cheap, show me the code
准备工作
注册 AWS 账号,登录
开通 EMR,S3
开通 S3 的目的是为了持久化数据,因为 EMR 集群本身不带额外硬盘,需要外部介质储存
开通 AWS 内网可访问的 Mysql
如果不用 Hive,可跳过这一步,同理,需要外部介质储存 Hive 的数据结构
准备创建 EMR 集群的脚本
这里有个坑,开始我们使用的 AWS SDK 来做这件事,但无法自定义计算框架配置(应该是 BUG),最初我们通过修改 SDK 源码解决了这个问题,但后来发现基本没用到 SDK 其他功能时,我们将这部分代码提成了单独的文件,由于使用了 Airflow 进行调度,所以决定用了 Python
编写 Spark 任务,打包上传至 S3
EMR LIB
# coding: UTF-8
import boto3, json, requests, requests
from datetime import datetime
def get_region():
# 这个地址不用改
r = requests.get(http://169.254.169.254/latest/dynamic/instance-identity/document)
response_json = r.json()
return response_json.get(region)
def client(region_name):
global emr
emr = boto3.client(emr , region_name=region_name)
# 创建 EMR
def create_cluster(name):
param = {
# 修改需要的框架
Applications :[{
Name : Hadoop
},{
Name : Hive
},{
Name : Spark
}],
# 这里的名字会显示到控制台
Name :name,
ServiceRole : EMR_DefaultRole ,
Tags :[],
ReleaseLabel : emr-5.26.0 ,
Instances :{
TerminationProtected :False,
EmrManagedMasterSecurityGroup : sg-0085fba9c3a6818f5 ,
InstanceGroups :[{
InstanceCount :1,
Name : 主实例组 - 1 ,
InstanceRole : MASTER ,
EbsConfiguration :{
EbsBlockDeviceConfigs :[{
VolumeSpecification :{
SizeInGB :32,
VolumeType : gp2
},
VolumesPerInstance :1
}]
},
# 修改需要的硬件配置
InstanceType : m4.large ,
Market : ON_DEMAND ,
Configurations :[{
# 修改 Hive 的 meta 源
Classification : hive-site ,
Properties :{
javax.jdo.option.ConnectionURL : jdbc:mysql://host:port/db?useUnicode=true characterEncoding=UTF-8 ,
javax.jdo.option.ConnectionDriverName : org.mariadb.jdbc.Driver ,
javax.jdo.option.ConnectionUserName : user ,
javax.jdo.option.ConnectionPassword : pwd
}
},{
Classification : yarn-env ,
Properties :{},
Configurations :[{
Classification : export ,
Properties :{
AWS_REGION : cn-northwest-1 ,
S3_ENDPOINT : s3.cn-northwest-1.amazonaws.com.cn ,
S3_USE_HTTPS : 0 ,
S3_VERIFY_SSL : 0
}
}]
}]
},{
InstanceRole : CORE ,
InstanceCount :1,
Name : 核心实例组 - 2 ,
Market : ON_DEMAND ,
# 修改需要的硬件配置
InstanceType : r5d.2xlarge ,
Configurations :[{
Classification : hive-site ,
Properties :{
javax.jdo.option.ConnectionURL : jdbc:mysql://host:port/db?useUnicode=true characterEncoding=UTF-8 ,
javax.jdo.option.ConnectionDriverName : org.mariadb.jdbc.Driver ,
javax.jdo.option.ConnectionUserName : user ,
javax.jdo.option.ConnectionPassword : pwd
}
},{
Classification : yarn-env ,
Properties :{},
Configurations :[{
Classification : export ,
Properties :{
AWS_REGION : cn-northwest-1 ,
S3_ENDPOINT : s3.cn-northwest-1.amazonaws.com.cn ,
S3_USE_HTTPS : 0 ,
S3_VERIFY_SSL : 0
}
}]
}]
},{
# 修改需要的工作节点数
InstanceCount :4,
Name : 任务实例组 - 4 ,
InstanceRole : TASK ,
EbsConfiguration :{
EbsBlockDeviceConfigs :[{
VolumeSpecification :{
SizeInGB :32,
VolumeType : gp2
},
VolumesPerInstance :4
}]
},
# 修改需要的硬件配置
InstanceType : r5d.2xlarge ,
Market : ON_DEMAND ,
Configurations :[{
Classification : hive-site ,
Properties :{
javax.jdo.option.ConnectionURL : jdbc:mysql://host:port/db?useUnicode=true characterEncoding=UTF-8 ,
javax.jdo.option.ConnectionDriverName : org.mariadb.jdbc.Driver ,
javax.jdo.option.ConnectionUserName : user ,
javax.jdo.option.ConnectionPassword : pwd
}
},{
Classification : yarn-env ,
Properties :{},
Configurations :[{
Classification : export ,
Properties :{
AWS_REGION : cn-northwest-1 ,
S3_ENDPOINT : s3.cn-northwest-1.amazonaws.com.cn ,
S3_USE_HTTPS : 0 ,
S3_VERIFY_SSL : 0
}
}]
}]
}],
KeepJobFlowAliveWhenNoSteps :True,
Ec2SubnetId : subnet-027bff297ea95039b ,
Ec2KeyName : hifive.airflow ,
EmrManagedSlaveSecurityGroup : sg-05a0e076ee7babb9e
},
JobFlowRole : EMR_EC2_DefaultRole ,
Steps :[{
HadoopJarStep :{ Args :[ state-pusher-script],
Jar : command-runner.jar
},
Name : Setup Hadoop Debugging
}],
ScaleDownBehavior : TERMINATE_AT_TASK_COMPLETION ,
VisibleToAllUsers :True,
EbsRootVolumeSize :10,
LogUri : s3n://aws-logs-550775287661-cn-northwest-1/elasticmapreduce/ ,
AutoScalingRole : EMR_AutoScaling_DefaultRole
}
cluster_response = emr.run_job_flow(**param)
return cluster_response[JobFlowId]
# 获取 EMR 访问入口
def get_cluster_dns(cluster_id):
response = emr.describe_cluster(ClusterId=cluster_id)
return response[Cluster][MasterPublicDnsName]
# 等待集群创建完成
def wait_for_cluster_creation(cluster_id):
emr.get_waiter(cluster_running).wait(ClusterId=cluster_id)
# 关闭 EMR
def terminate_cluster(cluster_id):
emr.terminate_job_flows(JobFlowIds=[cluster_id])
调用测试
# 创建 6 台机器的集群(1 master,1 core,4 worker)cluster_id = create_cluster(biz_daily_2020_10_09)
# 阻塞直到创建成功
wait_for_cluster_creation(cluster_id)
# dns 相当于虚拟机的 ssh 地址,每次都不同
# ssh 登录这个地址可以提交 spark 命令了,这里使用 Airflow 的 SSHOperator 模拟登录并提交命令
cluster_dns = get_cluster_dns(cluster_id)
# 关闭集群
terminate_cluster(cluster_id)
3. 其他坑
Airflow 1.9.0 的时间模板 {{ds}} 生成的是格林尼治时间,要改为我国时间,需手动加 8 小时,不知道新版本是否支持本地时间。
ssh 登录 dns 用户名 hadoop,这个用户是 AWS 生成的,似乎无法修改。
看完上述内容,你们掌握如何通过 AWS EMR 降低集群计算成本的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注丸趣 TV 行业资讯频道,感谢各位的阅读!