【KEDA】Azure Container Appsでイベントドリブン処理
準備
Azure Storage Queueによる連携
Azure Event Hubによる連携
Azure Service Busによる連携
Azure Container Apps(ACA)で様々なサービスをトリガーとしてイベントドリブン処理を行うことが可能です。
Azure Container Appsをジョブ処理に活用することで
- 処理が無い時には0個までスケールインするサーバレス処理を定義できる
- Linuxコンテナを用いてジョブ定義ができることにより選択幅が柔軟
- 数十分以上かかるような長時間の処理や複数コンテナで並列処理したいといった関数サービスでは実現しづらい条件の処理も定義可能
等の利点が得られます。
準備
に則って準備を行います。(一部省略、修正しています)
ログインとツールの整備
az login
az extension add --name containerapp --upgrade
RESOURCE_GROUP="rg-aca-event-test"
LOCATION="japaneast"
ENVIRONMENT="env-aca-event-test"
CONTAINER_IMAGE_NAME="queue-reader-job:1.0"
CONTAINER_REGISTRY_NAME="cracaeventtest"
リソースグループの作成
az group create --name "$RESOURCE_GROUP" --location "$LOCATION"
Container Apps環境の作成
az containerapp env create --name "$ENVIRONMENT" --resource-group "$RESOURCE_GROUP" --location "$LOCATION"
コンテナレジストリの作成
az acr create \
--name "$CONTAINER_REGISTRY_NAME" \
--resource-group "$RESOURCE_GROUP" \
--location "$LOCATION" \
--sku Basic \
--admin-enabled true
テスト用イメージの登録
az acr build --registry "$CONTAINER_REGISTRY_NAME" --image "$CONTAINER_IMAGE_NAME" "https://github.com/Azure-Samples/container-apps-event-driven-jobs-tutorial.git" --resource-group "$RESOURCE_GROUP"
ここまでで準備は完了です。
Azureストレージキューによる連携
ストレージキューの作成
STORAGE_ACCOUNT_NAME="acaeventteststorage"
QUEUE_NAME="myqueue"
az storage account create \
--name "$STORAGE_ACCOUNT_NAME" \
--resource-group "$RESOURCE_GROUP" \
--location "$LOCATION" \
--sku Standard_LRS \
--kind StorageV2
az storage queue create \
--name "$QUEUE_NAME" \
--account-name "$STORAGE_ACCOUNT_NAME" \
--connection-string "$QUEUE_CONNECTION_STRING"
キューへの接続文字列の取得
QUEUE_CONNECTION_STRING=`az storage account show-connection-string -g $RESOURCE_GROUP --name $STORAGE_ACCOUNT_NAME --query connectionString --output tsv`
コンテナジョブの登録
JOB_NAME="aca-event-test-job-storage-queue"
az containerapp job create \
--name "$JOB_NAME" \
--resource-group "$RESOURCE_GROUP" \
--environment "$ENVIRONMENT" \
--trigger-type "Event" \
--replica-timeout "1800" \
--replica-retry-limit "1" \
--replica-completion-count "1" \
--parallelism "1" \
--min-executions "0" \
--max-executions "10" \
--polling-interval "60" \
--scale-rule-name "queue" \
--scale-rule-type "azure-queue" \
--scale-rule-metadata "accountName=$STORAGE_ACCOUNT_NAME" "queueName=$QUEUE_NAME" "queueLength=1" \
--scale-rule-auth "connection=connection-string-secret" \
--image "$CONTAINER_REGISTRY_NAME.azurecr.io/$CONTAINER_IMAGE_NAME" \
--cpu "0.5" \
--memory "1Gi" \
--secrets "connection-string-secret=$QUEUE_CONNECTION_STRING" \
--registry-server "$CONTAINER_REGISTRY_NAME.azurecr.io" \
--env-vars "AZURE_STORAGE_QUEUE_NAME=$QUEUE_NAME" "AZURE_STORAGE_CONNECTION_STRING=secretref:connection-string-secret"
上記のジョブでトリガーに関係する変数は以下のような意味を持ちます。
変数名 | 値 | 内容 |
---|---|---|
trigger-type | Event |
このジョブがイベントにより起動することを示す。他の指定にはManual、Scheduleがある |
replica-timeout | 1800 |
レプリカが実行できる最長時間。 |
replica-retry-limit | 1 |
レプリカが失敗するまでの再試行の最大数。 |
replica-completion-count | 1 |
ジョブを完遂するためにレプリカを再試行する回数。今回は1を指定しているが、3を指定すればトリガーに対して3回成功するまで実行される |
parallelism | 1 |
ジョブの実行ごとに開始するレプリカ数。 |
min-executions | 0 |
ポーリング間隔ごとに実行するジョブの実行の最小数。今回は0だが、例えば1を指定するとポーリング間隔ごとにジョブを1回実行する |
max-executions | 10 |
ポーリング間隔ごとに実行するジョブ実行の最大数。 |
polling-interval | 60 |
各イベント ソースを秒単位でチェックする間隔(秒)。 既定値は 30 秒。 |
scale-rule-name | queue |
スケール ルールの名前。 |
scale-rule-type | azure-queue |
スケール ルールの種類。ここではazure-queue が指定されていますが他のトリガーの指定方法については後述 |
scale-rule-metadata | "accountName=$STORAGE_ACCOUNT_NAME" "queueName=$QUEUE_NAME" "queueLength=1" |
スケール ルールのメタデータ。メタデータの形式は "key=value"となる。本項目の指定方法については後述 |
scale-rule-auth | "connection=connection-string-secret" |
スケーリングルール用の認証パラメーター。 認証パラメーターの形式は "key=value"となる。この場合ではazure-queueにアクセスするための認証方法 |
scale-rule-type
と scale-rule-type
の指定方法についてはKEDAのスケーラー指定方法に則ったパラメータを与える必要があります。
Azure Storage Queueの場合には が該当します。
イベント発生と動作確認
キューにメッセージを投入します。
az storage message put \
--content "Hello Queue Reader Job" \
--queue-name "$QUEUE_NAME" \
--connection-string "$QUEUE_CONNECTION_STRING"
ジョブ履歴を確認するコマンドでイベントに連動してジョブができていることを確認します。
(1分ごとにポーリングを行っているため、最大1分程度待ってから確認したほうが良いかもしれません)
az containerapp job execution list \
--name "$JOB_NAME" \
--resource-group "$RESOURCE_GROUP" \
--output json
[
{
"id": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourceGroups/rg-aca-event-test/providers/Microsoft.App/jobs/aca-event-test-job1/executions/aca-event-test-job1-vgvvb",
"name": "aca-event-test-job1-vgvvb",
"properties": {
"endTime": "2023-10-17T10:26:07+00:00",
"startTime": "2023-10-17T10:25:52+00:00",
"status": "Succeeded",
"template": {
"containers": [
{
"env": [
{
"name": "AZURE_STORAGE_QUEUE_NAME",
"value": "myqueue"
},
{
"name": "AZURE_STORAGE_CONNECTION_STRING",
"secretRef": "cappjob-aca-event-test-job1"
},
{
"name": "CONTAINER_APP_JOB_NAME",
"value": "aca-event-test-job1"
}
],
"image": "cracaeventtest.azurecr.io/queue-reader-job:1.0",
"name": "aca-event-test-job1",
"resources": {
"cpu": 0.5,
"memory": "1Gi"
}
}
],
"initContainers": []
}
},
"resourceGroup": "rg-aca-event-test",
"type": "Microsoft.App/jobs/executions"
}
]
startTimeとendTimeがトリガーされた時間を示していることを確認してください。
Event Hubsによる連携
Event Hubs名前空間の作成
EVENT_HUB_NAMESPACE="aca-event-test-space$RANDOM"
上記の名前はユニークである必要があるためランダムを付与しています
az eventhubs namespace create --name $EVENT_HUBS_NAMESPACE --resource-group $RESOURCE_GROUP -l $LOCATION
Event Hubsの作成
EVENT_HUB_NAME="aca-evnt-test-HUB$RANDOM"
az eventhubs eventhub create --name $EVENT_HUB_NAME --resource-group $RESOURCE_GROUP --namespace-name $EVENT_HUB_NAMESPACE
Event Hubs接続文字列の生成
イベントハブの設定 > 共有アクセスポリシー > 追加からSASポリシーを追加します。
権限はリッスンのみで可。
登録されたら、開いて以下のような接続文字列を取得し、EVENTHUB_CONNECTION_STRING
という変数に保持します。
EVENTHUB_CONNECTION_STRING="Endpoint=sb://aca-event-test-space.servicebus.windows.net/;SharedAccessKeyName=test1;SharedAccessKey=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX=;EntityPath=aca-evnt-test-hub11193"
ストレージアカウントへのBLOBコンテナの追加
-
ストレージアカウントにBLOBコンテナを追加します
-
ストレージ接続用文字列を取得します
STORAGE_CONNECTION_STRING=`az storage account show-connection-string -g $RESOURCE_GROUP --name $STORAGE_ACCOUNT_NAME --query connectionString --output tsv`
EventHubからストレージアカウントへのキャプチャの設定
以下のようにEventHubの機能>キャプチャからストレージアカウントへのキャプチャ設定を追加します
コンテナジョブの登録
JOB_NAME="aca-event-test-job-event-hub"
az containerapp job create \
--name "$JOB_NAME" \
--resource-group "$RESOURCE_GROUP" \
--environment "$ENVIRONMENT" \
--trigger-type "Event" \
--replica-timeout "1800" \
--replica-retry-limit "1" \
--replica-completion-count "1" \
--parallelism "1" \
--min-executions "0" \
--max-executions "2" \
--polling-interval "60" \
--image "$CONTAINER_REGISTRY_NAME.azurecr.io/$CONTAINER_IMAGE_NAME" \
--cpu "0.5" \
--memory "1Gi" \
--registry-server "$CONTAINER_REGISTRY_NAME.azurecr.io" \
--secrets "connection-string-secret=$EVENTHUB_CONNECTION_STRING" "connecting-storage-string-secret=$STORAGE_CONNECTION_STRING" \
--scale-rule-name "eventhub" \
--scale-rule-type "azure-eventhub" \
--scale-rule-metadata "eventHubNamespace=aca-event-test-space" "eventHubName=aca-evnt-test-hub11193" "blobContainer=eventhubcapture" "consumerGroup=\$Default" "checkpointStrategy=blobMetadata"\
--scale-rule-auth "connection=connection-string-secret" "storageConnection=connecting-storage-string-secret"
Event Hubsへのイベント送信
Event HubsにはJava, Pytohn, .NET, Go, Javascriptなど様々な言語に接続用のライブラリが用意されています。
ここではPythonによるイベント送信サンプルスクリプトを使用してみます。
- 必要パッケージのインストール
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
-
ロールの付与
Event Hubs名前空間のIAMから「Azure Event Hubs データ送信者」ロールを自身のユーザーに付与してください。 -
スクリプトの用意
import os
from datetime import datetime
import asyncio
import json
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from azure.identity.aio import DefaultAzureCredential
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = os.environ["EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"]
EVENT_HUB_NAME = os.environ["EVENT_HUB_NAME"]
credential = DefaultAzureCredential()
async def run():
# Create a producer client to send messages to the event hub.
# Specify a credential that has correct role assigned to access
# event hubs namespace and the event hub name.
producer = EventHubProducerClient(
fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
eventhub_name=EVENT_HUB_NAME,
credential=credential,
)
async with producer:
# Create a batch.
event_data_batch = await producer.create_batch()
json_object =[
{
"timestamp": datetime.utcnow().isoformat() + "Z",
"message":"hello, event hub!",
"key1": "value1",
"key2": "value2",
"key3": "value3",
"nestedKey": {
"nestedKey1": "nestedValue1"
},
"arrayKey": [
"arrayValue1",
"arrayValue2"
]
}
]
json_string = json.dumps(json_object, indent=4)
# Add events to the batch.
event_data_batch.add(EventData(json_string))
# Send the batch of events to the event hub.
await producer.send_batch(event_data_batch)
# Close credential when no longer needed.
await credential.close()
asyncio.run(run())
上記のスクリプトはチュートリアルのスクリプトを検証しやすく修正したものです。
- スクリプトの実行
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE="aca-event-test-space.servicebus.windows.net"
python send_event_hub.py
上記によってEventHubsにイベント登録が発行->設定によりBlobストレージにキャプチャされコンテナジョブがそれを検知して動作します。
Storage Queueではコマンドで動作確認を行いましたが、コンテナジョブのMonitoring > Execution historyから確認することも可能です。
Service Busによる連携
Service Busの作成
export SERVICE_BUS_NAMESPACE=ksservicebus20231023
export SERVICE_BUS_QUEUE_NAME=kssbqueue
Service Busの名前空間名はドメインに用いられるためユニークである必要があります。
適宜変更してください。
・Service Bus名前空間の作成
az servicebus namespace create --resource-group $RESOURCE_GROUP --name $SERVICE_BUS_NAMESPACE --location $LOCATION
・Service Busキューの作成
az servicebus queue create --resource-group $RESOURCE_GROUP --namespace-name $SERVICE_BUS_NAMESPACE --name $SERVICE_BUS_QUEUE_NAME
・SB接続文字列の取得
export SERVICE_BUS_CONNECTION_STR=`az servicebus namespace authorization-rule keys list --resource-group $RESOURCE_GROUP --namespace-name $SERVICE_BUS_NAMESPACE --name RootManageSharedAccessKey --query primaryConnectionString --output tsv`
コンテナジョブの登録
JOB_NAME=aca-event-test-job-service-bus
az containerapp job create \
--name "$JOB_NAME" \
--resource-group "$RESOURCE_GROUP" \
--environment "$ENVIRONMENT" \
--trigger-type "Event" \
--replica-timeout "1800" \
--replica-retry-limit "1" \
--replica-completion-count "1" \
--parallelism "1" \
--min-executions "0" \
--max-executions "2" \
--polling-interval "60" \
--image "$CONTAINER_REGISTRY_NAME.azurecr.io/$CONTAINER_IMAGE_NAME" \
--cpu "0.5" \
--memory "1Gi" \
--registry-server "$CONTAINER_REGISTRY_NAME.azurecr.io" \
--secrets "connection-string-secret=$SERVICE_BUS_CONNECTION_STR" \
--scale-rule-name "servicebus" \
--scale-rule-type "azure-servicebus" \
--scale-rule-metadata "queueName=$SERVICE_BUS_QUEUE_NAME" "namespace=$SERVICE_BUS_NAMESPACE"\
--scale-rule-auth "connection=connection-string-secret"
scale-ruleパラメータの指定方法については以下を参照してください
イベントの発行
pip install azure-servicebus
pip install azure-identity
pip install aiohttp
送信用Pythonスクリプト
import asyncio
import os
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
NAMESPACE_CONNECTION_STR = os.environ["SERVICE_BUS_CONNECTION_STR"]
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]
credential = DefaultAzureCredential()
async def send_single_message(sender):
# Create a Service Bus message and send it to the queue
message = ServiceBusMessage("Single Message")
await sender.send_messages(message)
print("Sent a single message")
async def send_a_list_of_messages(sender):
# Create a list of messages and send it to the queue
messages = [ServiceBusMessage("Message in list") for _ in range(5)]
await sender.send_messages(messages)
print("Sent a list of 5 messages")
async def send_batch_message(sender):
# Create a batch of messages
async with sender:
batch_message = await sender.create_message_batch()
for _ in range(10):
try:
# Add a message to the batch
batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
except ValueError:
# ServiceBusMessageBatch object reaches max_size.
# New ServiceBusMessageBatch object can be created here to send more data.
break
# Send the batch of messages to the queue
await sender.send_messages(batch_message)
print("Sent a batch of 10 messages")
async def run():
# create a Service Bus client using the connection string
async with ServiceBusClient.from_connection_string(
conn_str=NAMESPACE_CONNECTION_STR,
logging_enable=True) as servicebus_client:
# Get a Queue Sender object to send messages to the queue
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
async with sender:
# Send one message
await send_single_message(sender)
# Send a list of messages
await send_a_list_of_messages(sender)
# Send a batch of messages
await send_batch_message(sender)
asyncio.run(run())
print("Done sending messages")
print("-----------------------")
上記を実行すると、ポータルでジョブが実行されることを確認できます
(コンテナ中でキューからのメッセージ受け取り処理をしていない限りキューにはメッセージが残り続けるため、ポーリングのためにジョブが起動してしまう点にはご注意ください)
以下をトリガーとしたイベント連携方法を追記予定です
- Azure Log Analytics
- Azure Monitor
参考:
Discussion