💬

Postmanを使用してKafkaトピックにメッセージを送信する方法

に公開

はじめに

ApacheKafkaは、高スループット、フォールトトレラントな分散ストリーミングプラットフォームとして広く採用されています。開発者やテスターにとって、KafkaトピックにメッセージをAPIを通じて簡単に送信できることは非常に重要です。Postmanは、APIのテストや操作を容易にする強力なツールであり、KafkaのRESTプロキシを利用することで、Kafkaトピックとの対話が可能になります。

この技術チュートリアルでは、Postmanを使用してKafkaトピックにメッセージを送信する方法を詳しく説明します。Confluent REST Proxyの設定から、JSON、Avro、バイナリ形式のメッセージの送信まで、ステップバイステップで解説していきます。

前提条件

このチュートリアルを進めるには、以下のものが必要です:

  1. Confluent Platform(または少なくともKafkaとConfluent REST Proxy)がインストールされていること
  2. Postmanがインストールされていること
  3. 基本的なKafkaの概念とRESTの知識

Confluent REST Proxyとは

Confluent REST ProxyはKafkaクラスターへのRESTfulインターフェースを提供するサービスで、HTTPリクエストを通じてKafkaとやり取りすることができます。これにより、Javaやその他のKafkaクライアントライブラリを使用せずに、様々な言語やプラットフォームからKafkaトピックにメッセージを生成したり消費したりすることが可能になります。

環境のセットアップ

Confluent REST Proxyの起動

Confluent Platformがすでにインストールされている場合、以下のコマンドでREST Proxyを起動できます:

confluent local services kafka-rest start

または、各サービスを個別に起動する場合:

bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties

デフォルトでは、REST ProxyはポートのHTTP 8082でリッスンします。

Postmanの設定

  1. Postmanを開きます
  2. 新しいリクエストを作成します
  3. HTTPメソッドとして「POST」を選択します
  4. ベースURLとして「http://localhost:8082」を使用します(環境に合わせて調整)

JSONメッセージの送信

1. トピックの作成(オプション)

まず、メッセージを送信するためのトピックが必要です。既存のトピックがない場合は、Kafka CLIツールを使用して作成するか、以下のPostmanリクエストで確認できます:

GET http://localhost:8082/topics

2. JSONメッセージの送信

  1. 新しいPOSTリクエストを作成します:

    • URL: http://localhost:8082/topics/jsontest
    • メソッド: POST
  2. ヘッダー設定:

    • Key: Content-Type
    • Value: application/vnd.kafka.json.v2+json
  3. リクエストボディ(Raw/JSON形式):

{
  "records": [
    {
      "value": {
        "message": "これはテストメッセージです",
        "timestamp": "2023-12-01T12:00:00"
      }
    }
  ]
}
  1. 「Send」ボタンをクリックしてリクエストを送信します。

  2. 成功した場合、以下のようなレスポンスが返ります:

{
  "offsets": [
    {
      "partition": 0,
      "offset": 0,
      "error_code": null,
      "error": null
    }
  ],
  "key_schema_id": null,
  "value_schema_id": null
}

このレスポンスは、メッセージがパーティション0に送信され、オフセットが0であることを示しています。

3. キーを含むJSONメッセージの送信

特定のキーでメッセージを送信したい場合は、以下のようにリクエストボディを変更します:

{
  "records": [
    {
      "key": "key1",
      "value": {
        "message": "これはキー付きのテストメッセージです",
        "timestamp": "2023-12-01T12:30:00"
      }
    }
  ]
}

キーは、同じキーを持つメッセージが同じパーティションに送信されることを保証するために使用されます。

Avroフォーマットでのメッセージ送信

Avroはスキーマベースのデータ形式で、スキーマレジストリと組み合わせることで、データの整合性を確保できます。

1. Avroメッセージの送信

  1. 新しいPOSTリクエストを作成:

    • URL: http://localhost:8082/topics/avrotest
    • メソッド: POST
  2. ヘッダー設定:

    • Key: Content-Type
    • Value: application/vnd.kafka.avro.v2+json
    • Key: Accept
    • Value: application/vnd.kafka.v2+json
  3. リクエストボディ:

{
  "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}",
  "records": [
    {
      "value": {
        "name": "田中太郎",
        "age": 30
      }
    }
  ]
}
  1. リクエストを送信します。

  2. 成功レスポンス:

{
  "offsets": [
    {
      "partition": 0,
      "offset": 0,
      "error_code": null,
      "error": null
    }
  ],
  "key_schema_id": null,
  "value_schema_id": 1
}

value_schema_id はスキーマレジストリに登録されたスキーマのIDを示しています。

2. キーと値の両方にAvroスキーマを使用

キーと値の両方にAvroスキーマを使用する場合のリクエストは以下のようになります:

{
  "key_schema": "{\"type\": \"string\"}",
  "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}",
  "records": [
    {
      "key": "user1",
      "value": {
        "name": "鈴木花子",
        "age": 25
      }
    }
  ]
}

バイナリメッセージの送信

バイナリデータをKafkaトピックに送信することもできます。

  1. 新しいPOSTリクエストを作成:

    • URL: http://localhost:8082/topics/binarytest
    • メソッド: POST
  2. ヘッダー設定:

    • Key: Content-Type
    • Value: application/vnd.kafka.binary.v2+json
    • Key: Accept
    • Value: application/vnd.kafka.v2+json
  3. リクエストボディ:

{
  "records": [
    {
      "value": "S2Fma2HjgadnvZPlpJbjgII="
    }
  ]
}

注意: バイナリ値はBase64エンコードされている必要があります。

JSONスキーマフォーマットでのメッセージ送信

JSONスキーマを使用したメッセージ送信も可能です。

  1. 新しいPOSTリクエストを作成:

    • URL: http://localhost:8082/topics/jsonschematest
    • メソッド: POST
  2. ヘッダー設定:

    • Key: Content-Type
    • Value: application/vnd.kafka.jsonschema.v2+json
    • Key: Accept
    • Value: application/vnd.kafka.v2+json
  3. リクエストボディ:

{
  "value_schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"},\"email\":{\"type\":\"string\",\"format\":\"email\"}}}",
  "records": [
    {
      "value": {
        "name": "山田次郎",
        "email": "yamada@example.com"
      }
    }
  ]
}

トピックメタデータの確認

Postmanを使用してKafkaトピックのメタデータを確認することもできます:

  1. トピック一覧の取得:
GET http://localhost:8082/topics
  1. 特定のトピックの詳細情報:
GET http://localhost:8082/topics/jsontest
  1. トピックのパーティション情報:
GET http://localhost:8082/topics/jsontest/partitions

コンシューマーグループの作成と管理

REST Proxyを使用してコンシューマーグループを作成し、メッセージを消費することも可能です。

  1. コンシューマーの作成:
POST http://localhost:8082/consumers/my_json_consumer_group

ヘッダー:

  • Content-Type: application/vnd.kafka.v2+json

リクエストボディ:

{
  "name": "my_consumer_instance",
  "format": "json",
  "auto.offset.reset": "earliest"
}
  1. トピックのサブスクリプション:
POST http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/subscription

ヘッダー:

  • Content-Type: application/vnd.kafka.v2+json

リクエストボディ:

{
  "topics": ["jsontest"]
}
  1. メッセージの取得:
GET http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/records

ヘッダー:

  • Accept: application/vnd.kafka.json.v2+json
  1. コンシューマーの削除:
DELETE http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance

Postmanのコレクションとして保存

繰り返し使用する場合は、これらのリクエストをPostmanのコレクションとして保存することをお勧めします:

  1. リクエストを右クリックして「Save to Collection」を選択
  2. 新しいコレクション(例:「Kafka REST API」)を作成
  3. 環境変数を設定して、異なる環境(開発、テスト、本番など)で再利用できるようにする

環境変数の活用

Postmanの環境変数を使用すると、異なる環境での再利用が容易になります:

  1. 「Environments」タブをクリックして新しい環境を作成

  2. 以下の変数を追加:

  3. リクエストURL内で変数を使用: {{kafkaRestUrl}}/topics/{{topic}}

セキュリティ設定

本番環境では、REST Proxyへのアクセスにセキュリティ対策が必要です。サポートされる認証メカニズムには以下が含まれます:

  1. Basic認証:

    • ヘッダーに Authorization: Basic base64(username:password) を追加
  2. OAuth:

    • トークンを取得してヘッダーに Authorization: Bearer <token> を追加
  3. SSL/TLS:

    • Postmanの設定で適切な証明書を構成

トラブルシューティング

RESTプロキシを使用する際に発生する一般的な問題とその解決策:

  1. 接続エラー:REST Proxyサーバーが実行されていることを確認してください。

  2. 認証エラー:認証情報が正しいことを確認してください。

  3. スキーマエラー:Avroスキーマが有効であることを確認し、値がスキーマに準拠していることを確認してください。

  4. Content-Type不一致:リクエストとレスポンスのContent-Typeが正しいことを確認してください。

まとめ

このチュートリアルでは、PostmanとConfluent REST Proxyを使用してKafkaトピックにメッセージを送信する方法を詳しく説明しました。JSON、Avro、バイナリ、JSONスキーマなど、さまざまな形式でのメッセージ送信をカバーし、トピックメタデータの確認方法やコンシューマーグループの管理方法も紹介しました。

この知識を活用することで、Kafkaを使ったアプリケーション開発やテストが効率化され、クライアントライブラリを使わずともRESTインターフェースを通じてKafkaと対話することができます。さらに高度なユースケースや設定については、Confluent公式ドキュメントを参照してください。

参考リソース

このガイドが、KafkaトピックとPostmanを使った開発作業に役立つことを願っています。

Discussion