共计 5617 个字符,预计需要花费 15 分钟才能阅读完成。
这篇“Argo Event 云原生的事件驱动架构怎么用”文章的知识点大部分人都不太理解,所以丸趣 TV 小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Argo Event 云原生的事件驱动架构怎么用”文章吧。
1、Argo Event 简介
Event 事件大家都很熟悉,可以说 Kubernetes 就是完全由事件驱动的,不同的 controller manager 本质就是实现了不同的事件处理函数,比如所有 ReplicaSet 对象是由 ReplicaSetController 控制器管理,该控制器通过 Informer 监听 ReplicaSet 以及其关联的 Pod 的事件变化,从而维持运行状态和我们声明 spec 保持一致。
当然 Kubernetes 无论是什么 Controller,其监听和处理的都是内部事件,而在应用层上我们也有很多外部事件,比如 CICD 事件、Webhook 事件、日志事件等等,如何处理这些事件呢,目前 Kubernetes 原生是无法实现的。
当然你可以自己实现一个 event handler 运行在 Kubernetes 平台,不过实现难度也不小。而 Argo Event 组件完美解决了这个问题。
如图是 Argo Event 官方提供的的流程图:
首先事件源 EventSource 可以是 Webhook、S3、Github、SQS 等等,中间会经过一个叫 Gateway(新版本叫 EventBus) 的组件,更准确地说老版本原来 gateway 的配置功能已经合并到 EventSource 了,EventBus 是新引入的组件,后端默认基于高性能分布式消息中间件 NATS[1] 实现,当然其他中间件比如 Kafka 也是可以的。
这个 EventBus 可以看做是事件的一个消息队列,消息生产者连接 EvenSource,EventSource 又连接到 Sensor。更详细地说 EvenSource 把事件发送给 EvenBus,Sensor 会订阅 EvenBus 的消息队列,EvenBus 负责把事件转发到已订阅该事件的 Sensor 组件,EventSorce 在上图中没有体现,具体设计文档可以参考 Argo-events Enhancement Proposals[2]。
有些人可能会说为什么 EventBus 不直接到 Trigger,中间引入一个 Sensor,这主要是两个原因,一是为了使事件转发和处理松耦合,二是为了实现 Trigger 事件的参数化,通过 Sensor 不仅可以实现事件的过滤,还可以实现事件的参数化,比如后面的 Trigger 是创建一个 Kubernetes Pod,那这个 Pod 的 metadata、env 等,都可以根据事件内容进行填充。
Sensor 组件注册关联了一个或者多个触发器,这些触发器可以触发 AWS Lambda 事件、Argo Workflow 事件、Kubernetes Objects 等,通俗简单地说,可以执行 Lambda 函数,可以动态地创建 Kubernetes 的对象或者创建前面的介绍的 Workflow。
还记得前面介绍的 Argo Rollout 吗,我们演示了手动 promote 实现应用发布或者回滚,通过 Argo Event 就可以很完美地和测试平台或者 CI/CD 事件结合起来,实现自动应用自动发布或者回滚。
2、一个简单的 Webhook 例子
关于 Argo Event 的部署非常简单,直接通过 kubecl apply 或者 helm 均可,可以参考文档 Installation[3],这里不再赘述。
Argo Event 部署完成后注意还需要部署 EventBus,官方推荐使用 NATS 中间件,文档中有部署 NATS stateful 的文档。
接下来我们以一个最简单的 Webhook 事件为例,从而了解 Argo Event 的几个组件功能以及用法。
首先按照前面的介绍,我们需要先定义 EventSource:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: webhook
spec:
service:
ports:
- port: 12000
targetPort: 12000
webhook:
webhook_example:
port: 12000
endpoint: /webhook
method: POST
这个 EventSource 定义了一个 webhook webhook_example,端口为 12000,路径为 /webhook,一般 Webhook 为 POST 方法,因此该 Webhhok 处理器我们配置只接收 POST 方法。
为了把这个 Webhook EventSource 暴露,我们还创建了一个 Service,端口也是 12000。
此时我们可以手动 curl 该 Service:
# kubectl get svc -l eventsource-name=webhook
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
webhook-eventsource-svc ClusterIP 10.96.93.24 none 12000/TCP 5m49s
# curl -X POST -d {} 10.96.93.24:12000/webhook
success
当然此时由于没有注册任何的 Sensor,因此什么都不会发生。
接下来我们定义 Sensor:
首先在 dependencies 中定义了订阅的 EventSource 以及具体的 Webhook,由于一个 EventSource 可以定义多个 Webhook,因此必须同时指定 EventSource 和 Webhook 两个参数。
在 Trigger 中我们定义了对应 Action 为 create 一个 workflow,这个 workflow 的 spec 定义在 resource 中配置。
最后的 parameters 部分定义了 workflow 的参数,这些参数值从 event 中获取,这里我们会把整个 event 都当作 workflow 的 input。当然你可以通过 dataKey 只汲取 body 部分:dataKey: body.message。
此时我们再次 curl 这个 webhook 事件:
curl -X POST -d {message : HelloWorld!} 10.96.93.24:12000/webhook
此时我们获取 argo workflow 列表发现新创建了一个实例:
# argo list
NAME STATUS AGE DURATION PRIORITY
webhook-8xt4s Succeeded 1m 18s 0
查看 workflow 输出如下:
由于我们是把整个 event 作为 workflow input 发过去的,因此 data 内容部分是 base64 编码,我们可以查看解码后的内容如下:
{
header : {
Accept : [
*/*
],
Content-Length : [
26
],
Content-Type : [
application/x-www-form-urlencoded
],
User-Agent : [
curl/7.58.0
]
},
body : {
message : HelloWorld!
}
}
从这里我们也可以看出 Event 包含两个部分,一个是 context,一个是 data,data 中又包含 header 部分以及 body 部分,在 parameters 中可以通过 Key 获取任意部分内容。
如上的 webhook 触发是通过手动 curl 的,你可以很容易地在 github 或者 bitbucket 上配置到 webhook 中,这样一旦代码有更新就能触发这个事件了。
3、Kubernetes 触发 AWS Lambda 函数
前面的例子中的 EventSource 使用了 Webhook,除了 Webhook,Argo Event 还支持很多的 EventSource,比如:
amqp
aws-sns
aws-sqs
github/gitlab
hdfs
kafka
redis
Kubernetes resource
…
Trigger 也同样支持很多,比如:
aws lambda
amqp
kafka
…
如上官方都提供了非常丰富的例子,可以参考 argo events examples[4]。
这里以 Kubernetes resource 事件源为例,这个事件监听 Kubernetes 的资源状态,比如 Pod 创建、删除等,这里以创建 Pod 为例:
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: k8s-resource-demo
spec:
template:
serviceAccountName: argo-events-sa
resource:
pod_demo:
namespace: argo-events
version: v1
resource: pods
eventTypes:
- ADD
filter:
afterStart: true
labels:
- key: app
operation: ==
value: my-pod
如上例子监听 Pods 的 ADD 事件,即创建 Pod,filter 中过滤只有包含 app=my-pod 标签的 Pod,特别需要注意的是使用的 serviceaccount argo-events-sa 必须具有 Pod 的 list、watch 权限。
接下来我们使用 AWS Lambda 触发器,Lambda 函数已经在 AWS 提前创建好:
这个 Lambda 函数很简单,直接返回 event 本身。
创建 Sensor 如下:
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: aws-lambda-trigger-demo
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: test-dep
eventSourceName: k8s-resource-demo
eventName: pod_demo
triggers:
- template:
name: lambda-trigger
awsLambda:
functionName: hello
accessKey:
name: aws-secret
key: accesskey
secretKey:
name: aws-secret
key: secretkey
namespace: argo-events
region: cn-northwest-1
payload:
- src:
dependencyName: test-dep
dataKey: body.name
dest: name
如上 AWS access key 和 access secret 需要提前放到 aws-secret 中。
此时我们创建一个新的 Pod my-pod:
apiVersion: v1
kind: Pod
metadata:
labels:
app: my-pod
name: my-pod
spec:
containers:
- image: nginx
name: my-pod
dnsPolicy: ClusterFirst
restartPolicy: Always
当 Pod 启动后,我们发现 AWS Lambda 函数被触发执行:
4、event filter
前面的例子中 webhook 中所有的事件都会被 sensor 触发,我们有时不需要处理所有的事件,Argo Event 支持基于 data 以及 context 过滤,比如我们只处理 message 为 hello 或者为 hey 的事件,其他消息忽略,只需要在原来的 dependencies 中 test-dep 增加 filter 即可:
dependencies:
- name: test-dep
eventSourceName: webhook
eventName: webhook_example
filters:
- name: data-filter
data:
- path: body.message
type: string
value:
- hello
- hey
filter 指定了基于 data 过滤,过滤的字段为 body.message,匹配的内容为 hello、hey。
5、trigger policy
trigger policy 主要用来判断最后触发器执行的结果是成功还是失败,如果是创建 Kubernetes 资源比如 Workflow,可以根据 Workflow 最终状态决定这个 Trigger 的执行结果,而如果是触发一个 HTTP 或者 AWS Lambda,则需要自定义 policy status。
awsLambda:
functionName: hello
accessKey:
name: aws-secret
key: accesskey
secretKey:
name: aws-secret
key: secretkey
namespace: argo-events
region: us-east-1
payload:
- src:
dependencyName: test-dep
dataKey: body.message
dest: message
policy:
status:
allow:
- 200
- 201
如上表示当 AWS Lambda 返回 200 或者 201 时表示 Trigger 成功。
以上就是关于“Argo Event 云原生的事件驱动架构怎么用”这篇文章的内容,相信大家都有了一定的了解,希望丸趣 TV 小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注丸趣 TV 行业资讯频道。