🦔

Next.js から Azure Event Hubs にイベントを送ってみる

4 min read

Next.js から Azure Event Hubs にイベントを送る簡単なサンプルです。

npx create-next-app@latest --typescript で Next.js の雛形を作ります。

Next.js の環境変数 に Azure Event Hubs への接続文字列を設定します。 .env.local をルートディレクトリに作成します。

.env.local
NEXT_PUBLIC_EVENTHUB_CONNECTION_STRING="Endpoint=sb://xxxx"
NEXT_PUBLIC_EVENTHUB_NAME="event-hub-name"

hooks ディレクトリを作り、hooks/useEventHub.ts を作ります。

hooks/useEventHub.ts
import { EventHubProducerClient } from "@azure/event-hubs";

export const useEventHub =  () => {
  const connectEventHub = async() => {
    const connectionString = process.env.NEXT_PUBLIC_EVENTHUB_CONNECTION_STRING
    const eventHubName =  process.env.NEXT_PUBLIC_EVENTHUB_NAME

    if (connectionString != null && eventHubName != null) {
      const producer = new EventHubProducerClient(connectionString, eventHubName)
      const batch = await producer.createBatch();
      batch.tryAdd({ body: { "message": "Hello World" }})
      await producer.sendBatch(batch)
      await producer.close()
      console.log('イベントを送信しました。')
    }
  }
  return { connectEventHub }
}

この Hooks を使って、ボタンクリックなどのイベントを契機に Azure Event Hubs にイベントを送ることができます。

Azure Event Hubs に送信するイベントでは、

{
  "body": {
    "message": "hello world"
  }
}

のように、body のプロパティとして任意のオブジェクトを設定できます。

送信する側と受信する側でイベントの型を共有しておくのが良いと思います。

Azure Event Hubs client library for JavaScript - Version 5.6.0

イベントを受信する

サンプルそのままですが、どこの文字列を何に設定するのかがわかりづらかったかと思いますので、その辺をわかるように書いてみました。
コードは雑なので、もしこの記事を見た方がいたら、適宜書き直して使ってください。

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const {
  BlobCheckpointStore,
} = require("@azure/eventhubs-checkpointstore-blob");

const connectionString =
  "Event Hubs 名前空間 - 設定 - 共有アクセスポリシー - 選択したポリシー の「接続文字列 - 主キー」";
const eventHubName =
  "Event Hubs 名前空間 - エンティティ - EventHubs の「名前」列にあるイベントハブ";

const consumerGroup = "$Default"; // name of the default consumer group(ここの値はよくわからなかった)
const storageConnectionString =
  "ストレージアカウント - 左メニューの「セキュリティとネットワーク」 - アクセスキー の key1 の接続文字列 DefaultEndpointsProtocol=...をコピーしたもの";

const containerName =
  "ストレージアカウント - 左メニューの「データストレージ」 - コンテナー の「名前」の列にある名前";

async function main() {
  console.log("Receiving...");
  const containerClient = new ContainerClient(
    storageConnectionString,
    containerName
  );
  const checkpointStore = new BlobCheckpointStore(containerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    connectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      if (events.length === 0) {
        console.log(
          `No events received within wait time. Waiting for next interval`
        );
        return;
      }

      for (const event of events) {
        console.log("event:", event);
        console.log(
          `Received event: '${event.body.message}' from partition: '${context.partitionId}' and consumer group: '${context.consumerGroup}'`
        );
      }

      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      console.log(`Error: ${err}`);
    },
  });

  await new Promise((resolve) => {
    setTimeout(async () => {
      await subscription.close();
      await consumerClient.close();
      resolve();
    }, 60000);
  });
}

main().catch((err) => {
  console.log("Error occurred: ", err);
});

イベントを送ると、以下のようなメッセージがコンソールに表示されます。

event: {
  body: { message: 'Hello World' },
  properties: {},
  offset: '2144',
  sequenceNumber: 23,
  enqueuedTimeUtc: 2022-01-10T12:52:56.888Z,
  partitionKey: undefined,
  systemProperties: {},
  getRawAmqpMessage: [Function: getRawAmqpMessage]
}
Received event: 'Hello World' from partition: '1' and consumer group: '$Default'

GitHubで編集を提案

Discussion

ログインするとコメントできます