Argo Workflow・Argo EventとAmazon SQSを利用したイベント駆動処理の実装
概要
KubernetesのArgoWorkflow/ArgoEventとAmazon SQSを利用して非同期のイベント駆動処理を実装する機会があったのでその実装内容について備忘録としてまとめていきます。
まず今回利用するリソースについて簡単に概要だけ確認していた後実際の実装について見ていきます。
ArgoWorkflowとは
Argo WorkflowとはオープンソースのコンテナネイティブエンジンでKubernetes上で並行ジョブをコントロールします。Argo WorkflowはKubernetes CRD(Custom Resource Definition)として実装されています。
特徴
- それぞれのステップがコンテナとしてWorkflowが定義される
- DAG(directed acyclic graph)を利用したタスクのシーケンスまたはタスク間の依存処理のcaptureとしてのマルチステップモデルのworkflow
- Argo Workflowを利用することでマシーンラーニングやデータ処理のような計算量の大き処理を一瞬で容易に実行することが可能
ArgoEventとは
Argo EventはイベントドリブンのworkflowでKurbernetesのための自動フレームワークです。
K8sのリソースやArgo Workflow,サーバーレスワークロードなどをトリガーすることができます。またイベントのソースとしてwebhook,S3,schedule,メッセージングキュー、gcp pubsub, sns, sqsなどを指定できます。
Amazon SQSとは
Amazon SQS(Simple Queue Service)は分散されたソフトウェアシステムとコンポーネントを統合と分離ができる安全性、耐久性があるホストキューです。
デッドレターキューやコスト配分タグ、FIFOキューなどの一般的な構造を提供しています。
AWS SDKをサポートする任意のプログラミング言語で利用できるようになっておりプログラムからも簡単に利用できるようになっています。
実装内容の概要
今回は ArgoEvent関連のリソースである EventSource, Sensor, EventBus、並びにEvent SourceとしてAWS SQSを利用してSQSのキューにメッセージが送信されたことをトリガーとしてKubernetesのPodを起動してキューに送られたメッセージのbodyの内容を出力するということをやっていきます。
※なお、今回はArgoEventなどのargo project関連の実行に必要な権限の付与のためのrolebindやserviceNameなどの設定の説明については割愛しています。
ArgoEvent関連のリソースの実装
Event Source
Event Sourceはその名の通り、イベント駆動処理のイベント発生ソースを記述するリソースになります。
AWSのSNSやSQS, GCPのPubsubや Webhookなど20以上のソースが指定可能になっています。
実装例は下記のようになります。
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: sample-event-source
spec:
sqs:
default:
# jsonBody specifies that all event body payload coming from this
# source will be JSON
jsonBody: true
# accessKey contains information about K8s secret that stores the access key
accessKey:
# Key within the K8s secret whose corresponding value (must be base64 encoded) is access key
key: sample-access-key
# Name of the K8s secret that contains the access key
name: sample-service
# secretKey contains information about K8s secret that stores the secret key
secretKey:
# Key within the K8s secret whose corresponding value (must be base64 encoded) is secret key
key: sample-secret-key
# Name of the K8s secret that contains the secret key
name: sample-service
# namespace: argo-events
# aws region
region: ap-northeast-1
# name of the queue. The eventsource resolves the url of the queue from the queue name.
queue: sample-service-sqs
# AWS Account Id that created the queue
queueAccountId: "111111111111"
# The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.
# MUST BE > 0 AND <= 20
waitTimeSeconds: 20
-
まずmetadataでlabelなどを指定します。
-
specでeventsourceのソースとなるリソースを指定します。今回はSQSをソースに想定しているのでsqsと指定しています。
https://github.com/argoproj/argo-events/blob/master/api/event-source.md#eventsource -
accessKey,secretKeyでK8sの secretリソースに保存しているawsのアクセスキーとシークレットアクセスキー保存ようのプロパティを指定します。SQSに接続するために必要になります。
-
regitonやqueue, queueAccountIdでそれぞれSQSがあるAWSのリージョン、SQSのキュー名、acount idを指定します。
-
waitTimeSecondsは呼び出しが戻る前にキューにメッセージが到着するまでの待ち時間を指定します
参考:
examples
Sensor
SensorはEventの依存関係(input, source)とトリガー(outputs)の一連のセットを定義するリソースです。Eventbus上のeventを監視してイベントが発生した場合にトリガーを実行します。
Sensorの実装例は次のようになります。
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: sample-sensor
labels:
service: sample-service
spec:
template:
serviceAccountName: sample-service
dependencies:
- name: sample-service-sqs
eventSourceName: sample-event-source
eventName: default
triggers:
- template:
name: sqs-workflow
k8s:
operation: submit
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sample-service-workflow
name: sample-service-workflow
spec:
entrypoint: whalesay
arguments:
parameters:
- name: message
# デフォルト値。parametersに指定しているで上書きされます。
value: hello world
templates:
- name: whalesay
inputs:
parameters:
- name: message
container:
image: docker/whalesay:latest
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
parameters:
- src:
dependencyName: sample-service-sqs
dataKey: body
dest: spec.arguments.parameters.0.value
トップレベルでmetadataやSensorSpecを指定するためのspecなどを指定します。
SensorSpecにはEventの依存関係(input)をdependecies
フィールドに指定します。
Eventのトリガーをtriggers
フィールドに指定します。
template
フィールドにSensorで起動するpodのspecificationを指定します。今回はServiceAccountNameを指定してます。
dependencies
dependencies
フィールドのnameにはEventの依存関係のリソース名を指定します。今回は
sample-service-sqsという名前を指定してます。名前はユニークである必要があります。
eventSourceName
はSensorが依存しているEventSourceの名前を指定します。前節で見たEventSourceのsample-event-sourceを指定しています。
eventName
はその名の通りenvet名を指定するもので今回はdefaultとしています。
triggers
triggerフィールドにはアクションを実行したり、結果を出力したりなどのEventの作成、メッセージの送信のための記述を行います。
主なフィールドはtemplate
フィールドで実行するtriggerの仕様を記述します。
そのほかはtrigger template の定義に適用されるparameterやpolicyなどを指定できます。
template
フィールドにはname
で名前を指定し、さらに実行するアクションの種類に応じたtriggerを指定します。 標準のk8sリソースをトリガーする場合は k8s
を、 ArgoWorkflowをトリガーしたい場合は argoWorkflow
を http reqeustを実行したい場合は http
を指定します。そのほかにも slack
やemail
なども指定できます。
今回は k8s
を指定して通常のKubernetesのリソースを定義するときと同じようにして実行したいtriggerを記述しています。今回は KindにWorkflowを指定して受けとたメッセージを標準出力するようなtemplateを記載しています。メッセージは parameters
でSQSのキューに送られたメッセージの値で上書きするように指定しています。
また、k8sではなくArgoWorfklowTemplateを使ってtriggerを指定する場合は次のような記述になります。
triggers:
- template:
name: sqs-workflow
argoWorkflow:
operation: submit
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sample-service
name: argo-workflow-trigger
spec:
workflowTemplateRef:
name: argo-workflow-trigger # 内部 `WorkflowTemplate` の名前
arguments:
parameters:
- name: message
value: "" # デフォルト値。parametersで上書き
parameters:
- src:
dependencyName: sample-service-sqs
dataKey: body
dest: spec.arguments.parameters.0.value
参考
examples: https://github.com/argoproj/argo-events/tree/master/examples/sensors
EventBus
EventBusはEventSourtとSensorを接続することでArgo Eventのトランスポート層として機能します。
EventBusにはNATS(deprecated)、Jetstream, Kafkの三つの実装があります。
実装例は下記になります。
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
spec:
nats:
native:
# Optional, defaults to 3. If it is < 3, set it to 3, that is the minimal requirement.
replicas: 3
EventBusの実装に今回は nats
を指定してます。そのほかに replicas
フィールドでreplica数に3を指定しています。
NATSBus
examples
検証
aws cliなどで対象のSQSにメッセージを送信するとSensorでそのEventをキャッチしてtriggerに記載の内容でアクションを実行してくれます。
今回のk8sのアクションの場合はEventのトリガー自体はほぼ遅延なくすぐに実行され、Podを立ち上げてからstepを実行するまでに数秒要するというような形でこれ自体は ArgoWorkflowでジョブを実行するのとほぼ同じ時間でスムーズにEventがトリガーされることを確認できました。
aws sqs send-message --queue-url https://sqs.ap-northeast-1.amazonaws.com/111111111111/sample-service-sqs --message-body '{"message":"sample message"}'
まとめ
ArgoEventを利用することで簡単にKubernetes環境でEvent駆動の処理を実行できることが分かりました。
SensorやEventSource, EventBusなどの実態としてはこれらのリソースから実際のアクションを実行するためのPodを起動する形になります。権限が足りない等などの何らかのバグの原因を調査する場合はそのPodのログを見ていく形になり通常のk8sのリソースの調査と同様に行うことが確認できました。
Discussion