🐙

Azure Event Hubs のイベントを契機にして Azure Functions の関数を起動する(イベント駆動で関数を動かす)

8 min read

Event トリガーにして Azure Functions の関数を動かす手順です。

「接続文字列 - 主キー」の設定などがわかりづらかったので、手順をまとめます。

Event Hubs を作成する

リソースの作成 → Event Hubs → 作成

で、Event Hubs の「名前空間の作成」を行います。

サブスクリプションとリソースグループは任意のものを選びます。

  • 名前空間の名前「sample-azfunc-basic-eventhub」とします
  • 場所「東アジア」
  • 価格レベル「Basic」

で作成します。

sample-azfunc-basic-eventhub のデプロイが完了しました、という表示が出たら、「リソースに移動」をクリックします。

リソースに移動したら、真ん中のメニューの「+イベントハブ」をクリックしてイベントハブを作成します。

  • 名前「samples-workitems」

とします。

作成後、画面を見ると、「sample-azfunc-basic-eventhub」という「Event Hubs 名前空間」ですが、下の方に「名前 samples-workitems」ができています。

「Event Hubs 名前空間」の左メニューに「共有アクセスポリシー」があります。

クリックすると「RootManageSharedAccessKey」ポリシーが表示されます。

「RootManageSharedAccessKey」をクリックすると、主キーというものが表示されます。この主キーを使って、Event Hubs 名前空間にアクセスできます。

ただし、本来であれば管理用のキーを使うべきではなく、別のキーを作成するのが良いでしょう。

JavaScript (azure/event-hubs) を使用して Event Hubs との間でイベントを送受信する を参考に、 Event Hubs にイベントを送信するコードを書いていきます。

npm install @azure/event-hubs

JavaScript のコードは公式ガイドにちょっと補足します。

src/send.js
const { EventHubProducerClient } = require("@azure/event-hubs");

const connectionString =
  "Endpoint=sb://[Event Hubs の名前空間].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[ここに共有アクセスポリシーで取得した主キーが入る]";
const eventHubName = "Event Hubs 名前空間に属する Event Hub の名前を設定する";

async function main() {
  const producer = new EventHubProducerClient(connectionString, eventHubName);

  const batch = await producer.createBatch();
  batch.tryAdd({ body: "1つめのイベント" });
  batch.tryAdd({ body: "2つめのイベント" });
  batch.tryAdd({ body: "3つめのイベント" });

  await producer.sendBatch(batch);

  await producer.close();

  console.log("Event Hubs に3つのイベントを送信しました。");
}

main().catch((err) => {
  console.log("エラーが発生しました:", err);
});

Azure の公式ガイドでは connectionStringeventHubName に何が入るのかわかりづらかったのですが、

const connectionString = "EVENT HUBS NAMESPACE CONNECTION STRING";
const eventHubName = "EVENT HUB NAME";

まず connectionString には「接続文字列 - 主キー」が入ります。

Event Hubs 名前空間 の左メニュー「共有アクセスポリシー」をクリックしたら、RootManagesSharedAccessKey なるキーが出てくると思いますので、クリックすると、右側に SAS ポリシー なるものが表示されます。

ここに「接続文字列 - 主キー」があります(本番運用では新たにキーを作っておくのが普通なようです)

eventHubNameには「Event Hubs 名前空間」の左メニューの「エンティティ - Event Hubs」 をクリックして表示される「名前」を入れます。

存在しない場合は「エンティティ - Event Hubs」をクリックして「+イベントハブ」をクリックして、新たに Event Hub を作ります。

この記事の流れだと const eventHubName = "samples-workitems" となりますね。

node コマンドでスクリプトを実行すると、イベントを送信できます。

関数を作成する

「リソースの作成」→「関数アプリ」をクリックします。

「関数アプリ」→「+作成」をクリックすると、「関数アプリの作成」が表示されます。

  • 関数アプリ名:eventhub-app
  • ランタイム スタック: Node.js
  • バージョン:14 LTS
  • 地域:Japan East

で、「確認および作成」

次のホスティングで適当なストレージアカウントを作成し、オペレーティングシステムは Linux、プランはサーバレスにします。

確認および作成をクリックして、関数アプリを作成します。

関数アプリを開くと、左側に「関数」というメニューが表示されるので、クリックします。

すると、右側に「関数の作成」というページが出てきます。

コマンドラインで関数プロジェクトを作ってみなさい、と言ってます。

# 依存関係のインストール
npm install -g azure-functions-core-tools@4 --unsafe-perm true

# Azure Functions のプロジェクトを作成する
func init

# 関数を作成する
func new

# 関数プロジェクトをローカルで実行する
func start

func init でローカルに Azure Functions のプロジェクトを作成し、その中に func new で関数を作っていくイメージです。

Azure Functions と Event Hub を連携させるにあたり、迷うのは

「関数の local.settings.jsonfunction.json に何をどう書くか」

だと思います。

以下の StackOverFlow が参考になりました。

Azure Function app : Microsoft.Azure.WebJobs.EventHubs: Value cannot be null. (Parameter 'receiverConnectionString')

func new して作成した「Event Hub トリガーの関数プロジェクト」のルートディレクトリにはlocal.settings.jsonなるファイルがあります。

local.settings.json:関数をローカルで実行するときに使用される設定を保持します。 Azure で実行している場合、これらの設定は使用されません
local.settings.json ファイルにはシークレットを含めることができるため、それをプロジェクト ソース管理から除外する必要があります。
Visual Studio を使用する Azure Functions の開発

この local.settings.json に EventHub の接続文字列を設定します。

local.settings.json
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "AzureEventHubConnectionString": "Endpoint=sb://[Event Hub の名前空間].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[主キー]",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=[ストレージ名];AccountKey=[アクセスキー];EndpointSuffix=core.windows.net"
  }
}

ここの AzureEventHubConnectionString には「Event Hub 名前空間」の「共有アクセスポリシー」で取得できる、「接続文字列 - 主キー」を設定します。

AzureWebJobsStorage には、「ストレージアカウント」に対するアクセスキーを設定します。

最初のサービス選択画面から「ストレージアカウント」をクリックし、関数作成のときに作ったストレージ名を選びます。

左側メニューの セキュリティとネットワーク - アクセスキー をクリックします。

すると「アクセスキー」が表示されますので、「キーの表示」をクリックして、「接続文字列」をコピーします。

次に、func new で作成した関数の中に function.json があるので、開きます。

function.json
{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "関数の名前",
      "direction": "in",
      "eventHubName": "Event Hubs の名前",
      "connection": "AzureEventHubConnectionString",
      "cardinality": "many",
      "consumerGroup": "$Default"
    }
  ]
}

typeeventHubTrigger で、name は自動で作られます。

eventHubName ですが、これは Event Hubs の名前空間ではなく、 Event Hubs の名前空間の左メニューの エンティティ - Event Hubs をクリックして作成した Event Hub の名前を指定します。

connection には、 local.settings.json で設定したキーの名前 AzureEventHubConnectionString を指定します。

関数の JavaScript はこんな感じです。

EventHubTrigger/index.js
module.exports = async function (context, eventHubMessages) {
    context.log(`JavaScript eventhub trigger function called for message array ${eventHubMessages}`);
    context.log('Function triggered to process a message: ', eventHubMessages);
    context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
    context.log('SequenceNumber =', context.bindingData.sequenceNumber);
    context.log('Offset =', context.bindingData.offset);

    context.done();

    eventHubMessages.forEach((message, index) => {
        context.log(`Processed message ${message}`);
    });
};

func start しておきましょう。

するとローカルで関数が起動し、イベント待ちの状態になります。

この状態で、別のターミナルを起動し、イベントを送信するスクリプトを実行します。

繰り返しになりますが、以下のコードですね。

src/send.js
const { EventHubProducerClient } = require('@azure/event-hubs')

const connectionString = "Endpoint=sb://sample-azfunc-basic-eventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=eunsC8UuZlJxJLaX9ewYFoa0GwwuoHQr8XDc4fNGt90="
const eventHubName = "samples-workitems";

async function main() {
  const producer = new EventHubProducerClient(connectionString, eventHubName);

  const batch = await producer.createBatch()
  batch.tryAdd({ body: "1つめのイベント"})
  batch.tryAdd({ body: "2つめのイベント" })
  batch.tryAdd({ body: "3つめのイベント" })

  await producer.sendBatch(batch)

  await producer.close()

  console.log("A batch of three events have been sent to the event hub");
  console.log('Event Hubs に3つのイベントを送信しました。')
}

main().catch((err) => {
  console.log('エラーが発生しました:', err)
})

これを実行すると、関数側の方で以下のようなコメントが表示されます。

[2022-01-08T07:10:52.712Z] Worker process started and initialized.
[2022-01-08T07:10:57.941Z] Host lock lease acquired by instance ID '000000000000000000000000383179D0F'.
[2022-01-08T07:11:05.602Z] Executing 'Functions.EventHubTrigger' (Reason='(null)', Id=597175a3-d8bd-4fdb-9aeb-e6d5d0182u3c0sf)
[2022-01-08T07:11:05.603Z] Trigger Details: PartionId: 0, Offset: 0-192, EnqueueTimeUtc: 2022-01-08T07:11:05.4150000Z-2022-01-08T07:11:05.4150000Z, SequenceNumber: 0-2, Count: 3
[2022-01-08T07:11:05.683Z] JavaScript eventhub trigger function called for message array 1つめのイベント,2つめのイベント,3つめのイベント
[2022-01-08T07:11:05.683Z] Function triggered to process a message:  [ '1つめのイベント', '2つめのイベント', '3つめのイベント' ]
[2022-01-08T07:11:05.683Z] EnqueuedTimeUtc = undefined
[2022-01-08T07:11:05.684Z] SequenceNumber = undefined
[2022-01-08T07:11:05.684Z] Offset = undefined
[2022-01-08T07:11:05.716Z] Executed 'Functions.EventHubTrigger' (Succeeded, Id=597175a3-d8bd-4fdb-9aeb-e6d5d0182u3c0sf, Duration=141ms)

こんな風に、イベントを受け取って関数を起動させることができました。

GitHubで編集を提案

Discussion

ログインするとコメントできます