Postmanを使用してKafkaトピックにメッセージを送信する方法
はじめに
ApacheKafkaは、高スループット、フォールトトレラントな分散ストリーミングプラットフォームとして広く採用されています。開発者やテスターにとって、KafkaトピックにメッセージをAPIを通じて簡単に送信できることは非常に重要です。Postmanは、APIのテストや操作を容易にする強力なツールであり、KafkaのRESTプロキシを利用することで、Kafkaトピックとの対話が可能になります。
この技術チュートリアルでは、Postmanを使用してKafkaトピックにメッセージを送信する方法を詳しく説明します。Confluent REST Proxyの設定から、JSON、Avro、バイナリ形式のメッセージの送信まで、ステップバイステップで解説していきます。
前提条件
このチュートリアルを進めるには、以下のものが必要です:
- Confluent Platform(または少なくともKafkaとConfluent REST Proxy)がインストールされていること
- Postmanがインストールされていること
- 基本的なKafkaの概念とRESTの知識
Confluent REST Proxyとは
Confluent REST ProxyはKafkaクラスターへのRESTfulインターフェースを提供するサービスで、HTTPリクエストを通じてKafkaとやり取りすることができます。これにより、Javaやその他のKafkaクライアントライブラリを使用せずに、様々な言語やプラットフォームからKafkaトピックにメッセージを生成したり消費したりすることが可能になります。
環境のセットアップ
Confluent REST Proxyの起動
Confluent Platformがすでにインストールされている場合、以下のコマンドでREST Proxyを起動できます:
confluent local services kafka-rest start
または、各サービスを個別に起動する場合:
bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
bin/kafka-server-start ./etc/kafka/server.properties
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
bin/kafka-rest-start ./etc/kafka-rest/kafka-rest.properties
デフォルトでは、REST ProxyはポートのHTTP 8082でリッスンします。
Postmanの設定
- Postmanを開きます
- 新しいリクエストを作成します
- HTTPメソッドとして「POST」を選択します
- ベースURLとして「http://localhost:8082」を使用します(環境に合わせて調整)
JSONメッセージの送信
1. トピックの作成(オプション)
まず、メッセージを送信するためのトピックが必要です。既存のトピックがない場合は、Kafka CLIツールを使用して作成するか、以下のPostmanリクエストで確認できます:
GET http://localhost:8082/topics
2. JSONメッセージの送信
-
新しいPOSTリクエストを作成します:
- URL:
http://localhost:8082/topics/jsontest
- メソッド: POST
- URL:
-
ヘッダー設定:
- Key:
Content-Type
- Value:
application/vnd.kafka.json.v2+json
- Key:
-
リクエストボディ(Raw/JSON形式):
{
"records": [
{
"value": {
"message": "これはテストメッセージです",
"timestamp": "2023-12-01T12:00:00"
}
}
]
}
-
「Send」ボタンをクリックしてリクエストを送信します。
-
成功した場合、以下のようなレスポンスが返ります:
{
"offsets": [
{
"partition": 0,
"offset": 0,
"error_code": null,
"error": null
}
],
"key_schema_id": null,
"value_schema_id": null
}
このレスポンスは、メッセージがパーティション0に送信され、オフセットが0であることを示しています。
3. キーを含むJSONメッセージの送信
特定のキーでメッセージを送信したい場合は、以下のようにリクエストボディを変更します:
{
"records": [
{
"key": "key1",
"value": {
"message": "これはキー付きのテストメッセージです",
"timestamp": "2023-12-01T12:30:00"
}
}
]
}
キーは、同じキーを持つメッセージが同じパーティションに送信されることを保証するために使用されます。
Avroフォーマットでのメッセージ送信
Avroはスキーマベースのデータ形式で、スキーマレジストリと組み合わせることで、データの整合性を確保できます。
1. Avroメッセージの送信
-
新しいPOSTリクエストを作成:
- URL:
http://localhost:8082/topics/avrotest
- メソッド: POST
- URL:
-
ヘッダー設定:
- Key:
Content-Type
- Value:
application/vnd.kafka.avro.v2+json
- Key:
Accept
- Value:
application/vnd.kafka.v2+json
- Key:
-
リクエストボディ:
{
"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}",
"records": [
{
"value": {
"name": "田中太郎",
"age": 30
}
}
]
}
-
リクエストを送信します。
-
成功レスポンス:
{
"offsets": [
{
"partition": 0,
"offset": 0,
"error_code": null,
"error": null
}
],
"key_schema_id": null,
"value_schema_id": 1
}
value_schema_id
はスキーマレジストリに登録されたスキーマのIDを示しています。
2. キーと値の両方にAvroスキーマを使用
キーと値の両方にAvroスキーマを使用する場合のリクエストは以下のようになります:
{
"key_schema": "{\"type\": \"string\"}",
"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}",
"records": [
{
"key": "user1",
"value": {
"name": "鈴木花子",
"age": 25
}
}
]
}
バイナリメッセージの送信
バイナリデータをKafkaトピックに送信することもできます。
-
新しいPOSTリクエストを作成:
- URL:
http://localhost:8082/topics/binarytest
- メソッド: POST
- URL:
-
ヘッダー設定:
- Key:
Content-Type
- Value:
application/vnd.kafka.binary.v2+json
- Key:
Accept
- Value:
application/vnd.kafka.v2+json
- Key:
-
リクエストボディ:
{
"records": [
{
"value": "S2Fma2HjgadnvZPlpJbjgII="
}
]
}
注意: バイナリ値はBase64エンコードされている必要があります。
JSONスキーマフォーマットでのメッセージ送信
JSONスキーマを使用したメッセージ送信も可能です。
-
新しいPOSTリクエストを作成:
- URL:
http://localhost:8082/topics/jsonschematest
- メソッド: POST
- URL:
-
ヘッダー設定:
- Key:
Content-Type
- Value:
application/vnd.kafka.jsonschema.v2+json
- Key:
Accept
- Value:
application/vnd.kafka.v2+json
- Key:
-
リクエストボディ:
{
"value_schema": "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"},\"email\":{\"type\":\"string\",\"format\":\"email\"}}}",
"records": [
{
"value": {
"name": "山田次郎",
"email": "yamada@example.com"
}
}
]
}
トピックメタデータの確認
Postmanを使用してKafkaトピックのメタデータを確認することもできます:
- トピック一覧の取得:
GET http://localhost:8082/topics
- 特定のトピックの詳細情報:
GET http://localhost:8082/topics/jsontest
- トピックのパーティション情報:
GET http://localhost:8082/topics/jsontest/partitions
コンシューマーグループの作成と管理
REST Proxyを使用してコンシューマーグループを作成し、メッセージを消費することも可能です。
- コンシューマーの作成:
POST http://localhost:8082/consumers/my_json_consumer_group
ヘッダー:
Content-Type: application/vnd.kafka.v2+json
リクエストボディ:
{
"name": "my_consumer_instance",
"format": "json",
"auto.offset.reset": "earliest"
}
- トピックのサブスクリプション:
POST http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/subscription
ヘッダー:
Content-Type: application/vnd.kafka.v2+json
リクエストボディ:
{
"topics": ["jsontest"]
}
- メッセージの取得:
GET http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance/records
ヘッダー:
Accept: application/vnd.kafka.json.v2+json
- コンシューマーの削除:
DELETE http://localhost:8082/consumers/my_json_consumer_group/instances/my_consumer_instance
Postmanのコレクションとして保存
繰り返し使用する場合は、これらのリクエストをPostmanのコレクションとして保存することをお勧めします:
- リクエストを右クリックして「Save to Collection」を選択
- 新しいコレクション(例:「Kafka REST API」)を作成
- 環境変数を設定して、異なる環境(開発、テスト、本番など)で再利用できるようにする
環境変数の活用
Postmanの環境変数を使用すると、異なる環境での再利用が容易になります:
-
「Environments」タブをクリックして新しい環境を作成
-
以下の変数を追加:
-
kafkaRestUrl
: http://localhost:8082 -
topic
: jsontest
-
-
リクエストURL内で変数を使用:
{{kafkaRestUrl}}/topics/{{topic}}
セキュリティ設定
本番環境では、REST Proxyへのアクセスにセキュリティ対策が必要です。サポートされる認証メカニズムには以下が含まれます:
-
Basic認証:
- ヘッダーに
Authorization: Basic base64(username:password)
を追加
- ヘッダーに
-
OAuth:
- トークンを取得してヘッダーに
Authorization: Bearer <token>
を追加
- トークンを取得してヘッダーに
-
SSL/TLS:
- Postmanの設定で適切な証明書を構成
トラブルシューティング
RESTプロキシを使用する際に発生する一般的な問題とその解決策:
-
接続エラー:REST Proxyサーバーが実行されていることを確認してください。
-
認証エラー:認証情報が正しいことを確認してください。
-
スキーマエラー:Avroスキーマが有効であることを確認し、値がスキーマに準拠していることを確認してください。
-
Content-Type不一致:リクエストとレスポンスのContent-Typeが正しいことを確認してください。
まとめ
このチュートリアルでは、PostmanとConfluent REST Proxyを使用してKafkaトピックにメッセージを送信する方法を詳しく説明しました。JSON、Avro、バイナリ、JSONスキーマなど、さまざまな形式でのメッセージ送信をカバーし、トピックメタデータの確認方法やコンシューマーグループの管理方法も紹介しました。
この知識を活用することで、Kafkaを使ったアプリケーション開発やテストが効率化され、クライアントライブラリを使わずともRESTインターフェースを通じてKafkaと対話することができます。さらに高度なユースケースや設定については、Confluent公式ドキュメントを参照してください。
参考リソース
このガイドが、KafkaトピックとPostmanを使った開発作業に役立つことを願っています。
Discussion