🦁

Argo Workflow・Argo EventとAmazon SQSを利用したイベント駆動処理の実装

2024/08/07に公開

概要

KubernetesのArgoWorkflow/ArgoEventとAmazon SQSを利用して非同期のイベント駆動処理を実装する機会があったのでその実装内容について備忘録としてまとめていきます。

まず今回利用するリソースについて簡単に概要だけ確認していた後実際の実装について見ていきます。

ArgoWorkflowとは

Argo WorkflowとはオープンソースのコンテナネイティブエンジンでKubernetes上で並行ジョブをコントロールします。Argo WorkflowはKubernetes CRD(Custom Resource Definition)として実装されています。

特徴

  • それぞれのステップがコンテナとしてWorkflowが定義される
  • DAG(directed acyclic graph)を利用したタスクのシーケンスまたはタスク間の依存処理のcaptureとしてのマルチステップモデルのworkflow
  • Argo Workflowを利用することでマシーンラーニングやデータ処理のような計算量の大き処理を一瞬で容易に実行することが可能

https://argo-workflows.readthedocs.io/en/latest/

ArgoEventとは

Argo EventはイベントドリブンのworkflowでKurbernetesのための自動フレームワークです。
K8sのリソースやArgo Workflow,サーバーレスワークロードなどをトリガーすることができます。またイベントのソースとしてwebhook,S3,schedule,メッセージングキュー、gcp pubsub, sns, sqsなどを指定できます。

https://argoproj.github.io/argo-events/

Amazon SQSとは

Amazon SQS(Simple Queue Service)は分散されたソフトウェアシステムとコンポーネントを統合と分離ができる安全性、耐久性があるホストキューです。
デッドレターキューやコスト配分タグ、FIFOキューなどの一般的な構造を提供しています。
AWS SDKをサポートする任意のプログラミング言語で利用できるようになっておりプログラムからも簡単に利用できるようになっています。

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html

実装内容の概要

今回は ArgoEvent関連のリソースである EventSource, Sensor, EventBus、並びにEvent SourceとしてAWS SQSを利用してSQSのキューにメッセージが送信されたことをトリガーとしてKubernetesのPodを起動してキューに送られたメッセージのbodyの内容を出力するということをやっていきます。

※なお、今回はArgoEventなどのargo project関連の実行に必要な権限の付与のためのrolebindやserviceNameなどの設定の説明については割愛しています。

ArgoEvent関連のリソースの実装

Event Source

Event Sourceはその名の通り、イベント駆動処理のイベント発生ソースを記述するリソースになります。
AWSのSNSやSQS, GCPのPubsubや Webhookなど20以上のソースが指定可能になっています。

https://argoproj.github.io/argo-events/concepts/event_source/

実装例は下記のようになります。

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: sample-event-source
spec:
  sqs:
    default:
      # jsonBody specifies that all event body payload coming from this
      # source will be JSON
      jsonBody: true
      # accessKey contains information about K8s secret that stores the access key
      accessKey:
        # Key within the K8s secret whose corresponding value (must be base64 encoded) is access key
        key: sample-access-key
        # Name of the K8s secret that contains the access key
        name: sample-service
      # secretKey contains information about K8s secret that stores the secret key
      secretKey:
        # Key within the K8s secret whose corresponding value (must be base64 encoded) is secret key
        key: sample-secret-key
        # Name of the K8s secret that contains the secret key
        name: sample-service
      # namespace: argo-events
      # aws region
      region: ap-northeast-1
      # name of the queue. The eventsource resolves the url of the queue from the queue name.
      queue: sample-service-sqs
      # AWS Account Id that created the queue
      queueAccountId: "111111111111"      
      # The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.
      # MUST BE > 0 AND <= 20
      waitTimeSeconds: 20
  
  • まずmetadataでlabelなどを指定します。

  • specでeventsourceのソースとなるリソースを指定します。今回はSQSをソースに想定しているのでsqsと指定しています。
    https://github.com/argoproj/argo-events/blob/master/api/event-source.md#eventsource

  • accessKey,secretKeyでK8sの secretリソースに保存しているawsのアクセスキーとシークレットアクセスキー保存ようのプロパティを指定します。SQSに接続するために必要になります。

  • regitonやqueue, queueAccountIdでそれぞれSQSがあるAWSのリージョン、SQSのキュー名、acount idを指定します。

  • waitTimeSecondsは呼び出しが戻る前にキューにメッセージが到着するまでの待ち時間を指定します

参考:
https://github.com/argoproj/argo-events/blob/master/api/event-source.md#argoproj.io/v1alpha1.SQSEventSource

examples
https://github.com/argoproj/argo-events/tree/master/examples/event-sources

Sensor

SensorはEventの依存関係(input, source)とトリガー(outputs)の一連のセットを定義するリソースです。Eventbus上のeventを監視してイベントが発生した場合にトリガーを実行します。

https://argoproj.github.io/argo-events/concepts/sensor/

Sensorの実装例は次のようになります。

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: sample-sensor
  labels:
    service: sample-service
spec:
  template:
    serviceAccountName: sample-service
  dependencies:
    - name: sample-service-sqs
      eventSourceName: sample-event-source
      eventName: default
  triggers:
    - template:
      name: sqs-workflow
      k8s:
        operation: submit
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: sample-service-workflow
              name: sample-service-workflow
            spec:
              entrypoint: whalesay
              arguments:
                parameters:
                - name: message
                  # デフォルト値。parametersに指定しているで上書きされます。
                  value: hello world
              templates:
              - name: whalesay
                inputs:
                  parameters:
                  - name: message
                container:
                  image: docker/whalesay:latest
                  command: [cowsay]
                  args: ["{{inputs.parameters.message}}"]
        parameters:
          - src:
              dependencyName: sample-service-sqs
              dataKey: body
            dest: spec.arguments.parameters.0.value

トップレベルでmetadataやSensorSpecを指定するためのspecなどを指定します。
https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Sensor

SensorSpecにはEventの依存関係(input)をdependeciesフィールドに指定します。
Eventのトリガーをtriggersフィールドに指定します。
templateフィールドにSensorで起動するpodのspecificationを指定します。今回はServiceAccountNameを指定してます。

https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.SensorSpec

dependencies

dependenciesフィールドのnameにはEventの依存関係のリソース名を指定します。今回は
sample-service-sqsという名前を指定してます。名前はユニークである必要があります。
eventSourceNameはSensorが依存しているEventSourceの名前を指定します。前節で見たEventSourceのsample-event-sourceを指定しています。
eventNameはその名の通りenvet名を指定するもので今回はdefaultとしています。

https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.EventDependency

triggers

triggerフィールドにはアクションを実行したり、結果を出力したりなどのEventの作成、メッセージの送信のための記述を行います。

主なフィールドはtemplateフィールドで実行するtriggerの仕様を記述します。
そのほかはtrigger template の定義に適用されるparameterやpolicyなどを指定できます。

https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.Trigger

templateフィールドにはnameで名前を指定し、さらに実行するアクションの種類に応じたtriggerを指定します。 標準のk8sリソースをトリガーする場合は k8sを、 ArgoWorkflowをトリガーしたい場合は argoWorkflowを http reqeustを実行したい場合は httpを指定します。そのほかにも slackemailなども指定できます。

https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.TriggerTemplate

今回は k8sを指定して通常のKubernetesのリソースを定義するときと同じようにして実行したいtriggerを記述しています。今回は KindにWorkflowを指定して受けとたメッセージを標準出力するようなtemplateを記載しています。メッセージは parametersでSQSのキューに送られたメッセージの値で上書きするように指定しています。

https://github.com/argoproj/argo-events/blob/master/api/sensor.md#argoproj.io/v1alpha1.StandardK8STrigger

また、k8sではなくArgoWorfklowTemplateを使ってtriggerを指定する場合は次のような記述になります。

  triggers:
    - template:
        name: sqs-workflow
        argoWorkflow:
          operation: submit
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: sample-service
                name: argo-workflow-trigger
              spec:
                workflowTemplateRef:
                  name: argo-workflow-trigger  # 内部 `WorkflowTemplate` の名前
                arguments:
                  parameters:
                    - name: message
                      value: ""  # デフォルト値。parametersで上書き
          parameters:
            - src:
                dependencyName: sample-service-sqs
                dataKey: body
              dest: spec.arguments.parameters.0.value

参考
examples: https://github.com/argoproj/argo-events/tree/master/examples/sensors

EventBus

EventBusはEventSourtとSensorを接続することでArgo Eventのトランスポート層として機能します。
EventBusにはNATS(deprecated)、Jetstream, Kafkの三つの実装があります。

https://argoproj.github.io/argo-events/concepts/eventbus/

実装例は下記になります。

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
spec:
  nats:
    native:
      # Optional, defaults to 3. If it is < 3, set it to 3, that is the minimal requirement.
      replicas: 3

EventBusの実装に今回は natsを指定してます。そのほかに replicasフィールドでreplica数に3を指定しています。

https://github.com/argoproj/argo-events/blob/master/api/event-bus.md#eventbus

NATSBus
https://github.com/argoproj/argo-events/blob/master/api/event-bus.md#argoproj.io/v1alpha1.NATSBus

examples
https://github.com/argoproj/argo-events/tree/master/examples/eventbus

検証

aws cliなどで対象のSQSにメッセージを送信するとSensorでそのEventをキャッチしてtriggerに記載の内容でアクションを実行してくれます。
今回のk8sのアクションの場合はEventのトリガー自体はほぼ遅延なくすぐに実行され、Podを立ち上げてからstepを実行するまでに数秒要するというような形でこれ自体は ArgoWorkflowでジョブを実行するのとほぼ同じ時間でスムーズにEventがトリガーされることを確認できました。

aws sqs send-message --queue-url https://sqs.ap-northeast-1.amazonaws.com/111111111111/sample-service-sqs --message-body '{"message":"sample message"}'

まとめ

ArgoEventを利用することで簡単にKubernetes環境でEvent駆動の処理を実行できることが分かりました。
SensorやEventSource, EventBusなどの実態としてはこれらのリソースから実際のアクションを実行するためのPodを起動する形になります。権限が足りない等などの何らかのバグの原因を調査する場合はそのPodのログを見ていく形になり通常のk8sのリソースの調査と同様に行うことが確認できました。

Discussion