🍇

【KEDA】Azure Container Appsでイベントドリブン処理

2023/10/17に公開

準備
Azure Storage Queueによる連携
Azure Event Hubによる連携
Azure Service Busによる連携

Azure Container Apps(ACA)で様々なサービスをトリガーとしてイベントドリブン処理を行うことが可能です。

Azure Container Appsをジョブ処理に活用することで

  • 処理が無い時には0個までスケールインするサーバレス処理を定義できる
  • Linuxコンテナを用いてジョブ定義ができることにより選択幅が柔軟
  • 数十分以上かかるような長時間の処理や複数コンテナで並列処理したいといった関数サービスでは実現しづらい条件の処理も定義可能

等の利点が得られます。

準備

https://learn.microsoft.com/ja-jp/azure/container-apps/tutorial-event-driven-jobs
に則って準備を行います。(一部省略、修正しています)

ログインとツールの整備

az login
az extension add --name containerapp --upgrade
RESOURCE_GROUP="rg-aca-event-test"
LOCATION="japaneast"
ENVIRONMENT="env-aca-event-test"
CONTAINER_IMAGE_NAME="queue-reader-job:1.0"
CONTAINER_REGISTRY_NAME="cracaeventtest"

リソースグループの作成

az group create --name "$RESOURCE_GROUP" --location "$LOCATION"

Container Apps環境の作成

az containerapp env create --name "$ENVIRONMENT" --resource-group "$RESOURCE_GROUP" --location "$LOCATION"

コンテナレジストリの作成

az acr create \
    --name "$CONTAINER_REGISTRY_NAME" \
    --resource-group "$RESOURCE_GROUP" \
    --location "$LOCATION" \
    --sku Basic \
    --admin-enabled true

テスト用イメージの登録

az acr build --registry "$CONTAINER_REGISTRY_NAME" --image "$CONTAINER_IMAGE_NAME" "https://github.com/Azure-Samples/container-apps-event-driven-jobs-tutorial.git" --resource-group "$RESOURCE_GROUP"

ここまでで準備は完了です。

Azureストレージキューによる連携

ストレージキューの作成

STORAGE_ACCOUNT_NAME="acaeventteststorage"
QUEUE_NAME="myqueue"
az storage account create \
    --name "$STORAGE_ACCOUNT_NAME" \
    --resource-group "$RESOURCE_GROUP" \
    --location "$LOCATION" \
    --sku Standard_LRS \
    --kind StorageV2
az storage queue create \
    --name "$QUEUE_NAME" \
    --account-name "$STORAGE_ACCOUNT_NAME" \
    --connection-string "$QUEUE_CONNECTION_STRING"

キューへの接続文字列の取得

QUEUE_CONNECTION_STRING=`az storage account show-connection-string -g $RESOURCE_GROUP --name $STORAGE_ACCOUNT_NAME --query connectionString --output tsv`

コンテナジョブの登録

JOB_NAME="aca-event-test-job-storage-queue"
az containerapp job create \
    --name "$JOB_NAME" \
    --resource-group "$RESOURCE_GROUP" \
    --environment "$ENVIRONMENT" \
    --trigger-type "Event" \
    --replica-timeout "1800" \
    --replica-retry-limit "1" \
    --replica-completion-count "1" \
    --parallelism "1" \
    --min-executions "0" \
    --max-executions "10" \
    --polling-interval "60" \
    --scale-rule-name "queue" \
    --scale-rule-type "azure-queue" \
    --scale-rule-metadata "accountName=$STORAGE_ACCOUNT_NAME" "queueName=$QUEUE_NAME" "queueLength=1" \
    --scale-rule-auth "connection=connection-string-secret" \
    --image "$CONTAINER_REGISTRY_NAME.azurecr.io/$CONTAINER_IMAGE_NAME" \
    --cpu "0.5" \
    --memory "1Gi" \
    --secrets "connection-string-secret=$QUEUE_CONNECTION_STRING" \
    --registry-server "$CONTAINER_REGISTRY_NAME.azurecr.io" \
    --env-vars "AZURE_STORAGE_QUEUE_NAME=$QUEUE_NAME" "AZURE_STORAGE_CONNECTION_STRING=secretref:connection-string-secret"

上記のジョブでトリガーに関係する変数は以下のような意味を持ちます。

変数名 内容
trigger-type Event このジョブがイベントにより起動することを示す。他の指定にはManual、Scheduleがある
replica-timeout 1800 レプリカが実行できる最長時間。
replica-retry-limit 1 レプリカが失敗するまでの再試行の最大数。
replica-completion-count 1 ジョブを完遂するためにレプリカを再試行する回数。今回は1を指定しているが、3を指定すればトリガーに対して3回成功するまで実行される
parallelism 1 ジョブの実行ごとに開始するレプリカ数。
min-executions 0 ポーリング間隔ごとに実行するジョブの実行の最小数。今回は0だが、例えば1を指定するとポーリング間隔ごとにジョブを1回実行する
max-executions 10 ポーリング間隔ごとに実行するジョブ実行の最大数。
polling-interval 60 各イベント ソースを秒単位でチェックする間隔(秒)。 既定値は 30 秒。
scale-rule-name queue スケール ルールの名前。
scale-rule-type azure-queue スケール ルールの種類。ここではazure-queueが指定されていますが他のトリガーの指定方法については後述
scale-rule-metadata "accountName=$STORAGE_ACCOUNT_NAME" "queueName=$QUEUE_NAME" "queueLength=1" スケール ルールのメタデータ。メタデータの形式は "key=value"となる。本項目の指定方法については後述
scale-rule-auth "connection=connection-string-secret" スケーリングルール用の認証パラメーター。 認証パラメーターの形式は "key=value"となる。この場合ではazure-queueにアクセスするための認証方法

scale-rule-typescale-rule-type の指定方法についてはKEDAのスケーラー指定方法に則ったパラメータを与える必要があります。

https://keda.sh/docs/2.10/scalers/
Azure Storage Queueの場合には
https://keda.sh/docs/2.10/scalers/azure-storage-queue/
が該当します。

イベント発生と動作確認

キューにメッセージを投入します。

az storage message put \
    --content "Hello Queue Reader Job" \
    --queue-name "$QUEUE_NAME" \
    --connection-string "$QUEUE_CONNECTION_STRING"

ジョブ履歴を確認するコマンドでイベントに連動してジョブができていることを確認します。
(1分ごとにポーリングを行っているため、最大1分程度待ってから確認したほうが良いかもしれません)

az containerapp job execution list \
    --name "$JOB_NAME" \
    --resource-group "$RESOURCE_GROUP" \
    --output json
[
  {
    "id": "/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourceGroups/rg-aca-event-test/providers/Microsoft.App/jobs/aca-event-test-job1/executions/aca-event-test-job1-vgvvb",
    "name": "aca-event-test-job1-vgvvb",
    "properties": {
      "endTime": "2023-10-17T10:26:07+00:00",
      "startTime": "2023-10-17T10:25:52+00:00",
      "status": "Succeeded",
      "template": {
        "containers": [
          {
            "env": [
              {
                "name": "AZURE_STORAGE_QUEUE_NAME",
                "value": "myqueue"
              },
              {
                "name": "AZURE_STORAGE_CONNECTION_STRING",
                "secretRef": "cappjob-aca-event-test-job1"
              },
              {
                "name": "CONTAINER_APP_JOB_NAME",
                "value": "aca-event-test-job1"
              }
            ],
            "image": "cracaeventtest.azurecr.io/queue-reader-job:1.0",
            "name": "aca-event-test-job1",
            "resources": {
              "cpu": 0.5,
              "memory": "1Gi"
            }
          }
        ],
        "initContainers": []
      }
    },
    "resourceGroup": "rg-aca-event-test",
    "type": "Microsoft.App/jobs/executions"
  }
]

startTimeとendTimeがトリガーされた時間を示していることを確認してください。

Event Hubsによる連携

Event Hubs名前空間の作成

EVENT_HUB_NAMESPACE="aca-event-test-space$RANDOM"

上記の名前はユニークである必要があるためランダムを付与しています

az eventhubs namespace create --name $EVENT_HUBS_NAMESPACE --resource-group $RESOURCE_GROUP -l $LOCATION

Event Hubsの作成

EVENT_HUB_NAME="aca-evnt-test-HUB$RANDOM"
az eventhubs eventhub create --name $EVENT_HUB_NAME --resource-group $RESOURCE_GROUP --namespace-name $EVENT_HUB_NAMESPACE

Event Hubs接続文字列の生成

イベントハブの設定 > 共有アクセスポリシー > 追加からSASポリシーを追加します。

権限はリッスンのみで可。
登録されたら、開いて以下のような接続文字列を取得し、EVENTHUB_CONNECTION_STRINGという変数に保持します。

EVENTHUB_CONNECTION_STRING="Endpoint=sb://aca-event-test-space.servicebus.windows.net/;SharedAccessKeyName=test1;SharedAccessKey=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX=;EntityPath=aca-evnt-test-hub11193"

ストレージアカウントへのBLOBコンテナの追加

  • ストレージアカウントにBLOBコンテナを追加します

  • ストレージ接続用文字列を取得します

STORAGE_CONNECTION_STRING=`az storage account show-connection-string -g $RESOURCE_GROUP --name $STORAGE_ACCOUNT_NAME --query connectionString --output tsv`

EventHubからストレージアカウントへのキャプチャの設定

以下のようにEventHubの機能>キャプチャからストレージアカウントへのキャプチャ設定を追加します

コンテナジョブの登録

JOB_NAME="aca-event-test-job-event-hub"

az containerapp job create \
    --name "$JOB_NAME" \
    --resource-group "$RESOURCE_GROUP" \
    --environment "$ENVIRONMENT" \
    --trigger-type "Event" \
    --replica-timeout "1800" \
    --replica-retry-limit "1" \
    --replica-completion-count "1" \
    --parallelism "1" \
    --min-executions "0" \
    --max-executions "2" \
    --polling-interval "60" \
    --image "$CONTAINER_REGISTRY_NAME.azurecr.io/$CONTAINER_IMAGE_NAME" \
    --cpu "0.5" \
    --memory "1Gi" \
    --registry-server "$CONTAINER_REGISTRY_NAME.azurecr.io" \
    --secrets "connection-string-secret=$EVENTHUB_CONNECTION_STRING" "connecting-storage-string-secret=$STORAGE_CONNECTION_STRING" \
    --scale-rule-name "eventhub" \
    --scale-rule-type "azure-eventhub" \
    --scale-rule-metadata "eventHubNamespace=aca-event-test-space" "eventHubName=aca-evnt-test-hub11193" "blobContainer=eventhubcapture" "consumerGroup=\$Default" "checkpointStrategy=blobMetadata"\
    --scale-rule-auth "connection=connection-string-secret" "storageConnection=connecting-storage-string-secret"

Event Hubsへのイベント送信

Event HubsにはJava, Pytohn, .NET, Go, Javascriptなど様々な言語に接続用のライブラリが用意されています。
ここではPythonによるイベント送信サンプルスクリプトを使用してみます。

  • 必要パッケージのインストール
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
  • ロールの付与
    Event Hubs名前空間のIAMから「Azure Event Hubs データ送信者」ロールを自身のユーザーに付与してください。

  • スクリプトの用意

send_event_hub.py
import os
from datetime import datetime
import asyncio
import json

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from azure.identity.aio import DefaultAzureCredential

EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = os.environ["EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"]
EVENT_HUB_NAME = os.environ["EVENT_HUB_NAME"]

credential = DefaultAzureCredential()

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a credential that has correct role assigned to access
    # event hubs namespace and the event hub name.
    producer = EventHubProducerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        credential=credential,
    )
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        json_object =[
            {
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "message":"hello, event hub!",
                "key1": "value1",
                "key2": "value2",
                "key3": "value3",
                "nestedKey": {
                    "nestedKey1": "nestedValue1"
                },
                "arrayKey": [
                    "arrayValue1",
                    "arrayValue2"
                ]
            }
        ]
        json_string = json.dumps(json_object, indent=4)
        # Add events to the batch.
        event_data_batch.add(EventData(json_string))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

        # Close credential when no longer needed.
        await credential.close()

asyncio.run(run())

上記のスクリプトはチュートリアルのスクリプトを検証しやすく修正したものです。

  • スクリプトの実行
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE="aca-event-test-space.servicebus.windows.net"
python send_event_hub.py

上記によってEventHubsにイベント登録が発行->設定によりBlobストレージにキャプチャされコンテナジョブがそれを検知して動作します。

Storage Queueではコマンドで動作確認を行いましたが、コンテナジョブのMonitoring > Execution historyから確認することも可能です。

Service Busによる連携

Service Busの作成

export SERVICE_BUS_NAMESPACE=ksservicebus20231023
export SERVICE_BUS_QUEUE_NAME=kssbqueue

Service Busの名前空間名はドメインに用いられるためユニークである必要があります。
適宜変更してください。

・Service Bus名前空間の作成

az servicebus namespace create --resource-group $RESOURCE_GROUP --name $SERVICE_BUS_NAMESPACE --location $LOCATION

・Service Busキューの作成

az servicebus queue create --resource-group $RESOURCE_GROUP --namespace-name $SERVICE_BUS_NAMESPACE --name $SERVICE_BUS_QUEUE_NAME

・SB接続文字列の取得

export SERVICE_BUS_CONNECTION_STR=`az servicebus namespace authorization-rule keys list --resource-group $RESOURCE_GROUP --namespace-name $SERVICE_BUS_NAMESPACE --name RootManageSharedAccessKey --query primaryConnectionString --output tsv`

コンテナジョブの登録

JOB_NAME=aca-event-test-job-service-bus

az containerapp job create \
    --name "$JOB_NAME" \
    --resource-group "$RESOURCE_GROUP" \
    --environment "$ENVIRONMENT" \
    --trigger-type "Event" \
    --replica-timeout "1800" \
    --replica-retry-limit "1" \
    --replica-completion-count "1" \
    --parallelism "1" \
    --min-executions "0" \
    --max-executions "2" \
    --polling-interval "60" \
    --image "$CONTAINER_REGISTRY_NAME.azurecr.io/$CONTAINER_IMAGE_NAME" \
    --cpu "0.5" \
    --memory "1Gi" \
    --registry-server "$CONTAINER_REGISTRY_NAME.azurecr.io" \
    --secrets "connection-string-secret=$SERVICE_BUS_CONNECTION_STR" \
    --scale-rule-name "servicebus" \
    --scale-rule-type "azure-servicebus" \
    --scale-rule-metadata "queueName=$SERVICE_BUS_QUEUE_NAME" "namespace=$SERVICE_BUS_NAMESPACE"\
    --scale-rule-auth "connection=connection-string-secret"

scale-ruleパラメータの指定方法については以下を参照してください
https://keda.sh/docs/2.12/scalers/azure-service-bus/

イベントの発行

pip install azure-servicebus
pip install azure-identity
pip install aiohttp

送信用Pythonスクリプト

import asyncio
import os
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential

NAMESPACE_CONNECTION_STR = os.environ["SERVICE_BUS_CONNECTION_STR"]
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]


credential = DefaultAzureCredential()

async def send_single_message(sender):
    # Create a Service Bus message and send it to the queue
    message = ServiceBusMessage("Single Message")
    await sender.send_messages(message)
    print("Sent a single message")

async def send_a_list_of_messages(sender):
    # Create a list of messages and send it to the queue
    messages = [ServiceBusMessage("Message in list") for _ in range(5)]
    await sender.send_messages(messages)
    print("Sent a list of 5 messages")


async def send_batch_message(sender):
    # Create a batch of messages
    async with sender:
        batch_message = await sender.create_message_batch()
        for _ in range(10):
            try:
                # Add a message to the batch
                batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
            except ValueError:
                # ServiceBusMessageBatch object reaches max_size.
                # New ServiceBusMessageBatch object can be created here to send more data.
                break
        # Send the batch of messages to the queue
        await sender.send_messages(batch_message)
    print("Sent a batch of 10 messages")

async def run():
    # create a Service Bus client using the connection string
    async with ServiceBusClient.from_connection_string(
        conn_str=NAMESPACE_CONNECTION_STR,
        logging_enable=True) as servicebus_client:
        # Get a Queue Sender object to send messages to the queue
        sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
        async with sender:
            # Send one message
            await send_single_message(sender)
            # Send a list of messages
            await send_a_list_of_messages(sender)
            # Send a batch of messages
            await send_batch_message(sender)

asyncio.run(run())
print("Done sending messages")
print("-----------------------")

上記を実行すると、ポータルでジョブが実行されることを確認できます

(コンテナ中でキューからのメッセージ受け取り処理をしていない限りキューにはメッセージが残り続けるため、ポーリングのためにジョブが起動してしまう点にはご注意ください)


以下をトリガーとしたイベント連携方法を追記予定です

  • Azure Log Analytics
  • Azure Monitor

参考:

https://techblog.ap-com.co.jp/entry/2022/12/14/203000

Discussion