快速开始

简要概述

通过监控事件然后触发工作流,以实现基于事件驱动的自动化系统。

消息总线

它作为事件源和传感器之间的数据传输通道,资源定义为:

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  # 支持以下 5 种配置,同一时间仅可配置一项
  nats:
    native: {}
    exotic: {}
  jetstream: {}
  kafka: {}
  jetstreamExotic: {}

事件源产生发布事件,而传感器订阅事件以执行触发器,消息总线有三类实现方式:

  1. Nats(已弃用)
  2. Jetstream
  3. Kafka

数据结构定义见 EventBus

事件源

它主要把第三方公有服务产生的事件进一步将转换为基于 cloudevents 标准化的格式,并将它们发送到消息总线,资源定义为:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: source-test
spec:
  # 关联哪个消息总线,默认为 "default"
  eventBusName: default
  # 副本数量
  replicas: 1

  # 支持公知服务的列表
  # 双主模式:开启多副本,则所有 pod 均可用用于服务
  sns: {}
  sqs: {}
  bitbucketserver: {}
  bitbucket: {}
  github: {}
  gitlab: {}
  storageGrid: {}
  slack: {}
  stripe: {}
  webhook: {}

  # 主从模式:多副本需进行选主
  # 消息总线使用 kafka 则利用 k8s lease 资源
  # 消息总线使用 nats 方案则使用该资源
  amqp: {}
  azureEventsHub: {}
  calendar: {}
  emitter: {}
  pubSub: {}
  generic: {}
  file: {}
  hdfs: {}
  kafka: {}
  minio: {}
  mqtt: {}
  nats: {}
  nsq: {}
  pulsar: {}
  redis: {}
  resource: {}

  # 未确认模式
  redisStream: {}
  azureServiceBus: {}
  azureQueueStorage: {}
  sftp: {}
  gerrit: {}

以上如果需混合多个事件源,则它们必须是相同类型的,相同类型是指:主从模式、双主模式。

数据结构定义见 EventSource

传感器

它监听消息总线上的事件,如果来自匹配到的事件时触发执行相应触发器,资源定义为:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-test
spec:
  ...

数据结构定义见 Sensor

触发器

根据不同事件执行具体动作,资源定义为:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sensor-test
spec:
  ...
  # 可同时执行多个触发器,每个 "template" 下仅可配置一项
  triggers:
  - template:
      # 管理 k8s 相关资源,如 pod deployment service 等
      k8s: {}
  - template:
      # 管理 argo workflows 流水线
      argoWorkflow: {}
  - template:
      # 使用 http 调用指定地址
      http: {}
  - template:
      # 使用 AWS Lambda 服务
      awsLambda: {}
  - template:
      # 自定义的服务,通过实现 grpc service 与方法
      custom: {}
  - template:
      # 作为生产者发送消息至 kafka 集群
      kafka: {}
  - template:
      # 作为生产者发送消息至 nats 集群
      nats: {}
  - template:
      # 作为生产者发送消息至 slack 服务
      slack: {}
  - template:
      # 作为生产者发送消息至 openWhisk 服务框架
      openWhisk: {}
  - template:
      # 把接收到的消息直接终端输入用于调试目的
      log: {}
  - template:
      # 作为生产者发送消息至 azure event hub 服务
      azureEventHubs: {}
  - template:
      # 作为生产者发送消息至 pulsar 集群
      pulsar: {}
  - template:
      # 作为生产者发送消息至 azure service hub 服务
      azureServiceBus: {}
  - template:
      # 作为生产者发送消息至 email 服务
      email: {}

数据结构定义见 Trigger

安装部署

见:https://argoproj.github.io/argo-events/quick_start/

消息总线

自动创建 NATS 集群

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
  namespace: argo-events
spec:
  nats:
    native:
      replicas: 3
      auth: none

复用已有 NATS 集群

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: test
  namespace: argo-events
spec:
  nats:
    exotic:
      url: nats://10.5.39.39:4222
      clusterID: test

使用 Kafka 集群

通过 SCRAM-SHA-256 认证连接 kafka 集群:

  • 创建认证信息
kubectl create secret generic kafka-cred -n argo-events \
	--from-literal=username=uptime \
	--from-literal=password=testkey

或者

apiVersion: v1
kind: Secret
metadata:
  name: kafka-cred
  namespace: argo-events
stringData:
  password: testkey
  username: uptime
type: Opaque
  • 连接集群
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: test
  namespace: argo-events
spec:
  kafka:
    url: 10.5.119.40:9092,10.5.119.96:9092,10.5.119.235:9092
    topic: uptime-alertmanager
    sasl:
      mechanism: SCRAM-SHA-256
      userSecret:
        key: username
        name: kafka-cred
      passwordSecret:
        key: password
        name: kafka-cred
    consumerGroup:
      groupName: uptime

以上可配置参数说明可参考数据结构KafkaEventSource 或者 KafkaBus

  • 集群至少满足 acl 权限

需开启 “auto.create.topics.enable=true” 自动创建 topic 或者满足以下 acl 规则:

事件源

TODO;

传感器

TODO;

触发器

TODO;




最后修改 2024.06.07: docs: update argo (81bc931)