共计 9812 个字符,预计需要花费 25 分钟才能阅读完成。
这篇文章主要介绍了 flink 中如何实现基于 k8s 的环境搭建,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让丸趣 TV 小编带着大家一起了解一下。
前面写了一些 flink 的基础组件,但是还没有说过 flink 的环境搭建,现在我们来说下基本的环境搭建
1. 使用 StatefulSet 的原因
对于 Flink 来说,使用 sts 的最大的原因是 pod 的 hostname 是有序的;这样潜在的好处有
hostname 为 - 0 和 - 1 的 pod 可以直接指定为 jobmanager;可以使用一个 statefulset 启动一个 cluster,而 deployment 必须 2 个;Jobmanager 和 TaskManager 分别独立的 deployment
pod 由于各种原因 fail 后,由于 StatefulSet 重新拉起的 pod 的 hostname 不变,集群 recover 的速度理论上可以比 deployment 更快(deployment 每次主机名随机)
2. 使用 StatefulSet 部署 Flink
2.1 docker 的 entrypoint
由于要由主机名来判断是启动 jobmanager 还是 taskmanager,因此需要在 entrypoint 中去匹配设置的 jobmanager 的主机名是否有一致
传入参数为:cluster ha;则自动根据主机名判断启动那个角色;也可以直接指定角色名称
docker-entrypoint.sh 的脚本内容如下:
#!/bin/sh
# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD= $1
# if use cluster model, pod ${JOB_CLUSTER_NAME}-0,${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD} == cluster ]; then
jobmanagers=(${JOB_MANGER_HOSTS
//,/ })
ACTION_CMD= taskmanager
for i in ${!jobmanagers[@]}
do
if [ $(hostname -s) == ${jobmanagers[i]} ]; then
ACTION_CMD= jobmanager
echo pod hostname match jobmanager config host, change action to jobmanager.
fi
done
# if ha model, replace ha configuration
if [ $2 == ha ]; then
sed -i -e s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT
}|g $FLINK_CONF_DIR/flink-conf.yaml
sed -i -e s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM
}|g $FLINK_CONF_DIR/flink-conf.yaml
sed -i -e s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT
}|g $FLINK_CONF_DIR/flink-conf.yaml
sed -i -e s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT
}|g $FLINK_CONF_DIR/flink-conf.yaml
if [ ${ACTION_CMD} == help ]; then
echo Usage: $(basename $0) (cluster ha|jobmanager|taskmanager|local|help)
exit 0
elif [ ${ACTION_CMD} == jobmanager ]; then
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
echo Starting Job Manager
sed -i -e s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g $FLINK_CONF_DIR/flink-conf.yaml
sed -i -e s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB
}/g $FLINK_CONF_DIR/flink-conf.yaml
echo config file: grep ^[^\n#] $FLINK_CONF_DIR/flink-conf.yaml
exec $FLINK_HOME/bin/jobmanager.sh start-foreground cluster
elif [ ${ACTION_CMD} == taskmanager ]; then
TASK_MANAGER_NUMBER_OF_TASK_SLOTS
=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS
:-$(grep -c ^processor /proc/cpuinfo)}
echo Starting Task Manager
sed -i -e s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB
}/g $FLINK_CONF_DIR/flink-conf.yaml
sed -i -e s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS
/g $FLINK_CONF_DIR/flink-conf.yaml
echo config file: grep ^[^\n#] $FLINK_CONF_DIR/flink-conf.yaml
exec $FLINK_HOME/bin/taskmanager.sh start-foreground
elif [ ${ACTION_CMD} == local ]; then
echo Starting local cluster
exec $FLINK_HOME/bin/jobmanager.sh start-foreground local
exec $@
2.2. 使用 ConfigMap 分发 hdfs 和 flink 配置文件
ConfigMap 介绍参考:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files
Q:为什么使用 ConfigMap
A:由于 hadoop 配置文件在不同的环境不一样,不方便打包到镜像里面;因此合适的方式就只有 2 种,使用 ConfigMap 和 Pod 的 InitContainer。使用 InitContainer 的话,可以 wget 获取远程的一个配置文件,但是这样还需要依赖一个配置服务。相比而已,ConfigMap 更简单。
创建 ConfigMap
[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf –from-file=hdfs-site.xml –from-file=core-site.xml
[root@rc-mzgjg ~]# kubectl create configmap flink-conf –from-file=flink-conf/log4j-console.properties –from-file=flink-conf/flink-conf.yaml
使用 describe 命令查看创建的名词为 hdfs-conf 的 ConfigMap,会显示文件的内容到控制台
[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name: hdfs-conf
Namespace: default
Labels: none
Annotations: none
Data
====
core-site.xml:
通过 volumeMounts 使用 ConfigMap
Pod 的 Container 要使用配置文件,则可以通过 volumeMounts 方式挂载到 Container 中。如下 demo 所示,将配置文件挂载到 /home/xxxx/conf/hadoop 目录下
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: flink-jm
spec:
selector:
matchLabels:
app: flink-jm
serviceName: flink-jm
replicas: 2
podManagementPolicy: Parallel
template:
metadata:
labels:
app: flink-jm
spec:
terminationGracePeriodSeconds: 2
containers:
- name: test
imagePullPolicy: Always
image: ip:5000/test:latest
args: [sleep , 1d]
volumeMounts:
- name: hdfs-conf
mountPath: /home/xxxx/conf/hadoop
volumes:
- name: hdfs-conf
configMap:
# Provide the name of the ConfigMap containing the files you want to add to the container
name: hdfs-conf
创建好 Pod 后,查看配置文件的挂载
[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr 9 06:54 core-site.xml – ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr 9 06:54 hdfs-site.xml – ..data/hdfs-site.xml
配置文件是链接到了..data 目录
1.10 才能支持 Pod 多 Container 的 namespace 共享
最初的想法是一个 Pod 里面多个 Container,然后配置文件是其中一个 Container;测试验证起数据目录并不能互相访问;如预想的配置,其中一个 Container 里面的 image 是 hdfs-conf 的配置文件
containers:
- name: hdfs-conf
imagePullPolicy: Always
image: ip:5000/hdfs-dev:2.6
args: [sleep , 1d]
- name: flink-jm
imagePullPolicy: Always
image: ip:5000/flink:1.4.2
实际验证,两个 Container 的只能共享网络,文件目录彼此看不见
“Share Process Namespace between Containers in a Pod”这个是 Kubernates 1.10 才开始支持,参考
https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/
2.3 StatefulSet 的配置
Flink 的配置文件和 hadoop 的配置文件,依赖 ConfigMap 来分发
环境变量名称
参数
内容
说明
FLINK_CLUSTER_IDENT
namespace/StatefulSet.name
default/flink-cluster
用来做 zk ha 设置和 hdfs checkpiont 的根目录
FLINK_ZK_QUORUM
env:FLINK_ZK_QUORUM
ip:2181
HA ZK 的地址
JOB_MANAGER_HEAP_MB
env:JOB_MANAGER_HEAP_MB
value:containers.resources.memory.limit -1024
512JM 的 Heap 大小,由于存在 Netty 的堆外内存,需要小于 container.resources.memory.limits;否则容易 OOM kill
JOB_MANGER_HOSTS
StatefulSet.name-0,StatefulSet.name-1
flink-cluster-0,flink-cluster-1
JM 的主机名,短主机名;可以不用 FQDN
TASK_MANAGER_HEAP_MB
env:TASK_MANAGER_HEAP_MB
value: containers.resources.memory.limit -1024
512TM 的 Heap 大小,由于存在 Netty 的堆外内存,需要小于 container.resources.memory.limits;否则容易 OOM kill
TASK_MANAGER_NUMBER_OF_TASK_SLOTS
containers.resources.cpu.limits
2TM 的 slot 数量,根据 resources.cpu.limits 来设置
Pod 的 imagePullPolicy 策略,测试环境 Always,每次都 pull,方便验证;线上则是 IfNotPresent;线上如果对 images 做了变更,必须更改 images 的 tag
完整的内容可以参考如下:
# headless service for statefulset apiVersion: v1 kind: Service metadata: name: flink-cluster labels: app: flink-cluster spec: clusterIP: None ports: - port: 8080 name: ui selector: app: flink-cluster # create flink statefulset apiVersion: apps/v1 kind: StatefulSet metadata: name: flink-cluster spec: selector: matchLabels: app: flink-cluster serviceName: flink-cluster replicas: 4 podManagementPolicy: Parallel template: metadata: labels: app: flink-cluster spec: terminationGracePeriodSeconds: 2 containers: - name: flink-cluster imagePullPolicy: Always image: ip:5000/flink:1.4.2 args: [cluster , ha] volumeMounts: - name: hdfs-conf mountPath: /home/xxxx/conf/hadoop - name: flink-conf mountPath: /home/xxxx/conf/flink - name: flink-log mountPath: /home/xxxx/logs resources: requests: memory: 1536Mi cpu: 1 limits: memory: 1536Mi cpu: 2 env: - name:JOB_MANGER_HOSTS
value:flink-cluster-0,flink-cluster-1
- name:FLINK_CLUSTER_IDENT
value:default/flink-cluster
- name:TASK_MANAGER_NUMBER_OF_TASK_SLOTS
value: 2 - name:FLINK_ZK_QUORUM
value:ip:2181
- name:JOB_MANAGER_HEAP_MB
value: 512 - name:TASK_MANAGER_HEAP_MB
value: 512 ports: - containerPort: 6124 name: blob - containerPort: 6125 name: query - containerPort: 8080 name: flink-ui volumes: - name: hdfs-conf configMap: # Provide the name of the ConfigMap containing the files you want to add to the container name: hdfs-conf - name: flink-conf configMap: name: flink-conf - name: flink-log hostPath: # directory location on host path: /tmp # this field is optional type: Directory
3. 测试环境对外暴露 Flink UI
由于测试环境使用 Flannel 进行网络通信,在 K8S 集群外部无法访问到 Flink UI 的 IP 和端口,因此需要通过 NodePort 方式将内部 IP 映射出来。配置如下:
# only for test k8s cluster
# use service to expose flink jobmanager 0 s web port
apiVersion: v1
kind: Service
metadata:
labels:
app: flink-cluster
statefulset.kubernetes.io/pod-name: flink-cluster-0
name: flink-web-0
namespace: default
spec:
ports:
- port: 8080
protocol: TCP
targetPort: 8080
selector:
app: flink-cluster
statefulset.kubernetes.io/pod-name: flink-cluster-0
type: NodePort
# use service to expose flink jobmanager 1 s web port
apiVersion: v1
kind: Service
metadata:
labels:
app: flink-cluster
statefulset.kubernetes.io/pod-name: flink-cluster-1
name: flink-web-1
namespace: default
spec:
ports:
- port: 8080
protocol: TCP
targetPort: 8080
selector:
app: flink-cluster
statefulset.kubernetes.io/pod-name: flink-cluster-1
type: NodePort
4. 服务部署状态
执行完前面操作后,可以查看到当前的 StatefulSet 状态
[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME DESIRED CURRENT AGE CONTAINERS IMAGES
flink-cluster 4 4 1h flink-cluster ip:5000/flink:1.4.2
容器的 Pod 状态
[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster -o wide
NAME READY STATUS RESTARTS AGE IP NODE
flink-cluster-0 1/1 Running 0 1h ip1 ip5
flink-cluster-1 1/1 Running 0 1h ip2 ip6
flink-cluster-2 1/1 Running 0 1h ip3 ip7
flink-cluster-3 1/1 Running 0 1h ip4 ip8
相关的 Service 信息
[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster -o wide
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
flink-cluster ClusterIP None none 8080/TCP 2h app=flink-cluster
flink-web-0 NodePort 10.254.8.103 none 8080:30495/TCP 1h app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1 NodePort 10.254.172.158 none 8080:30158/TCP 1h app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1
根据 Service 的信息;可以通过任何一个 k8s node 的 ip 地址加 PORT 来访问 Flink UI
这里主要说一下,在搭建的过程中遇到了一个和权限相关的问题
错误日志如下
ERROR setFile(null,true) call failed
FileNotFoundException:no such file or directory
原因:是因为 flink 服务缺少日志目录的权限
修改方式:
1.adduser flink 添加相应的用户
2.chown -R flink:flink /home/xxxx/logs
感谢你能够认真阅读完这篇文章,希望丸趣 TV 小编分享的“flink 中如何实现基于 k8s 的环境搭建”这篇文章对大家有帮助,同时也希望大家多多支持丸趣 TV,关注丸趣 TV 行业资讯频道,更多相关知识等着你来学习!