Kong Event Gateway

Trying out Kong's Event Gateway.

Event Gateway provides two main capabilities:
- Protocol mediation with Kong Gateway (HTTP to Kafka / Kafka to HTTP)
- Kafka integration through the Native Event Proxy

Protocol mediation with Kong Gateway appears to be implemented through plugins:
- Kafka Upstream
- Kafka Consume
- Confluent
- Confluent Consume
- Solace Upstream

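The Native Event Proxy appears to work by creating virtual clusters and topics on Konnect, which then talk to the physical Kafka (the real Kafka clusters and topics).
Judging from the diagram, policies can be applied in the middle of the Client -> KNEP (Kong Native Event Proxy) -> Kafka path.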

Protocol mediation with Kong Gateway

Trying the Kafka Upstream plugin

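First, create a Kafka cluster. The documentation notes:
In Kong Gateway 3.9 or earlier, the message format is not customizable.
so it seems safest to use a version later than 3.9. In addition, if you also set up and configure a Schema Registry, you can enforce a schema on the messages sent to Kafka topics (that is, messages that violate the schema are rejected with an error at the Gateway layer). It works without one, though, so this first round of testing skips it. For reference, Schema Registry is a component developed by Confluent that centrally manages the schemas (Avro, JSON Schema, Protobuf, and so on) of the data flowing through Kafka.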
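To make the idea of schema enforcement concrete, here is a minimal Python sketch of what "reject messages that violate the schema before they reach the topic" means. It is not the Schema Registry API and not how Kong implements validation; the ORDER_SCHEMA dict, the validate helper, and the field names are all hypothetical and exist only for illustration.

```python
import json

# Hypothetical schema: required field names mapped to their expected Python types.
ORDER_SCHEMA = {"order_id": int, "item": str}


def validate(raw: str, schema: dict) -> list:
    """Return a list of violations; an empty list means the message conforms."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return ["payload is not valid JSON"]
    if not isinstance(message, dict):
        return ["payload is not a JSON object"]
    errors = []
    for field, expected in schema.items():
        if field not in message:
            errors.append(f"missing field: {field}")
        elif not isinstance(message[field], expected):
            errors.append(f"wrong type for field: {field}")
    return errors


if __name__ == "__main__":
    # A conforming message, then one that a schema-aware gateway would reject.
    print(validate('{"order_id": 1, "item": "book"}', ORDER_SCHEMA))
    print(validate('{"order_id": "1"}', ORDER_SCHEMA))
```

A gateway that enforces a schema would return an error to the HTTP client whenever the list of violations is non-empty, instead of forwarding the message to Kafka.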
Use the following compose.yaml to start all required resources at once. Some sensitive values are omitted; the values you need are shown when you create the Gateway in Konnect, so copy them from there.
x-default: &default
  networks:
    - knep-network
networks:
  knep-network:
services:
  kafka:
    <<: *default
    image: apache/kafka:4.1.0
    container_name: kafka
    ports:
      - 9092:19092
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka:9092,CONTROLLER://kafka:9093,EXTERNAL://0.0.0.0:19092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CLUSTER_ID: "abcdefghijklmnopqrstuv"
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  konnect-gateway:
    <<: *default
    image: kong/kong-gateway:3.11
    container_name: konnect-gateway
    environment:
      KONG_ROLE: data_plane
      KONG_DATABASE: off
      KONG_VITALS: off
      KONG_CLUSTER_MTLS: pki
      KONG_CLUSTER_CONTROL_PLANE: ${KONNECT_CONTROL_PLANE_ID:-}.us.cp.konghq.com:443
      KONG_CLUSTER_SERVER_NAME: ${KONNECT_CONTROL_PLANE_ID:-}.us.cp.konghq.com
      KONG_CLUSTER_TELEMETRY_ENDPOINT: ${KONNECT_CONTROL_PLANE_ID:-}.us.tp.konghq.com:443
      KONG_CLUSTER_TELEMETRY_SERVER_NAME: ${KONNECT_CONTROL_PLANE_ID:-}.us.tp.konghq.com
      KONG_CLUSTER_CERT: |
        -----BEGIN CERTIFICATE-----
        MI...
        -----END CERTIFICATE-----
      KONG_CLUSTER_CERT_KEY: |
        -----BEGIN PRIVATE KEY-----
        MIG...
        -----END PRIVATE KEY-----
      KONG_LUA_SSL_TRUSTED_CERTIFICATE: system
      KONG_KONNECT_MODE: on
      KONG_CLUSTER_DP_LABELS: "type:docker-macOsArmOS"
      KONG_ROUTER_FLAVOR: expressions
      KONG_HEADERS_UPSTREAM: off
    ports:
      - 8000:8000
      - 8443:8443
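The listener settings are the part of this file that is easiest to get wrong, so the following Python sketch spells out which address each kind of client ends up using. It only parses the strings copied from the compose.yaml above; parse_listeners and bootstrap_for are hypothetical helper names, not part of Kafka or Kong.

```python
def parse_listeners(value: str) -> dict:
    """Turn 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Values copied from the compose.yaml in this article.
LISTENERS = parse_listeners(
    "INTERNAL://kafka:9092,CONTROLLER://kafka:9093,EXTERNAL://0.0.0.0:19092"
)
ADVERTISED = parse_listeners("INTERNAL://kafka:9092,EXTERNAL://localhost:9092")
HOST_PORT, CONTAINER_PORT = (int(p) for p in "9092:19092".split(":"))


def bootstrap_for(listener_name: str) -> str:
    """Address the broker hands back to clients that connect on this listener."""
    host, port = ADVERTISED[listener_name]
    return f"{host}:{port}"


if __name__ == "__main__":
    # kafkactl on the host enters through the published port, i.e. the EXTERNAL listener.
    print("host client  ->", bootstrap_for("EXTERNAL"))
    # Kong runs inside knep-network, so it uses the INTERNAL listener.
    print("kong gateway ->", bootstrap_for("INTERNAL"))
```

The point of the split is that the published host port (9092) maps to the container port the EXTERNAL listener binds (19092), while containers on knep-network reach the broker directly at kafka:9092.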

Next, try Pub/Sub with a native Kafka client (kafkactl here).
First, create a topic.
$ kafkactl create topic test
topic created: test
Check that it was created.
$ kafkactl get topics
TOPIC PARTITIONS REPLICATION FACTOR
test 1 1
Subscribe to the test topic.
$ kafkactl consume test
From another terminal, produce a message to the test topic.
$ kafkactl produce test -v hello-world
message produced (partition=0 offset=0)
Confirm in the first terminal that the message was received.
$ kafkactl consume test
hello-world

Next, configure the Kafka Upstream plugin and produce a message with an HTTP client (curl here). First, configure Kong Gateway.
_format_version: "3.0"
_konnect:
  control_plane_name: local-gateway
services:
  - name: kafka-service
    url: https://httpbin.org
    routes:
      - name: kafka-route
        paths:
          - /mock
        strip_path: true
plugins:
  - name: kafka-upstream
    service: kafka-service
    config:
      topic: test
      bootstrap_servers:
        - host: kafka
          port: 9092
Apply the configuration to Kong with decK. (Set the Control Plane name, PAT, and so on as appropriate.)
$ deck gateway sync kong.yaml
Produce a message from the HTTP client.
$ curl -X POST http://localhost:8000/mock -d hello-world
If the terminal that is still subscribed shows the following, it worked.
{"body_args":{"hello-world":true},"body":"hello-world"}
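The record is a JSON envelope rather than the raw string. curl -d sends the body as application/x-www-form-urlencoded by default, which is presumably why the body also appears parsed under body_args, with the valueless key hello-world mapped to true. The Python sketch below only mimics that observed shape; form_args and build_message are hypothetical helpers, not Kong's implementation.

```python
import json
from urllib.parse import unquote_plus


def form_args(body: str) -> dict:
    """Parse a form-encoded body; a key without '=' is mapped to True."""
    args = {}
    for pair in filter(None, body.split("&")):
        key, sep, value = pair.partition("=")
        args[unquote_plus(key)] = unquote_plus(value) if sep else True
    return args


def build_message(body: str) -> str:
    """Assemble a record with the same two fields seen in the consumer output."""
    record = {"body_args": form_args(body), "body": body}
    return json.dumps(record, separators=(",", ":"))


if __name__ == "__main__":
    print(build_message("hello-world"))
    # A body with a real key=value pair next to a bare key.
    print(json.loads(build_message("a=1&b"))["body_args"])
```

A consumer that only wants the original payload can read the body field and ignore body_args.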

Next, try the Kafka Consume plugin.

Replace the Kong configuration file with the following.
_format_version: "3.0"
_konnect:
  control_plane_name: local-gateway
services:
  - name: kafka-produce-service
    url: https://httpbin.org
    routes:
      - name: produce-route
        paths:
          - /produce
        strip_path: true
  - name: kafka-consume-service
    url: https://httpbin.org
    routes:
      - name: consume-route
        paths:
          - /consume
        strip_path: true
plugins:
  - name: kafka-upstream
    service: kafka-produce-service
    config:
      topic: test
      bootstrap_servers:
        - host: kafka
          port: 9092
  - name: kafka-consume
    service: kafka-consume-service
    config:
      bootstrap_servers:
        - host: kafka
          port: 9092
      topics:
        - name: test
      mode: http-get

Try reading messages from the topic.
$ curl -X GET http://localhost:8000/consume
The request failed with a 502, and the gateway logged the following.
konnect-gateway | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [kong] consumers.lua:168 [kafka-consume] no cluster_name provided in plugin configuration, using default cluster name. If more than one Kafka plugin is configured without a cluster_name, these plugins will use the same cluster, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:265: negotiate_all_api_versions(): Provided min_version subceeds min_version for ApiKey: 0 ProduceRequest(KAFKA-18659) use suggest_min:3, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:260: negotiate_all_api_versions(): Provided min_version subceeds min_version for ApiKey: 19 use suggest_min:2, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:260: negotiate_all_api_versions(): Provided min_version subceeds min_version for ApiKey: 8 use suggest_min:2, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 1 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:18 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 2 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:19 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 3 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:20 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 4 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:21 [warn] 2641#0: *23001 [lua] consumer.lua:487: subscribe(): failed to get group coordinator, retrying : failed to find coordinator for group: com.konghq.kafka.bab1885ddafdd3916909414673e240b4 after 5 attempts, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:22 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 1 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:23 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 2 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:24 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 3 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:25 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 4 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:26 [warn] 2641#0: *23001 [lua] consumer.lua:487: subscribe(): failed to get group coordinator, retrying : failed to find coordinator for group: com.konghq.kafka.bab1885ddafdd3916909414673e240b4 after 5 attempts, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:27 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 1 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:28 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 2 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:29 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 3 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:30 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 4 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:31 [warn] 2641#0: *23001 [lua] consumer.lua:487: subscribe(): failed to get group coordinator, retrying : failed to find coordinator for group: com.konghq.kafka.bab1885ddafdd3916909414673e240b4 after 5 attempts, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 2025/09/30 08:20:32 [error] 2641#0: *23001 [kong] consumers.lua:257 [kafka-consume] Error while processing: failed to subscribe to Kafka topic, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway | 192.168.65.1 - - [30/Sep/2025:08:20:32 +0000] "GET /consume HTTP/1.1" 502 126 "-" "curl/8.7.1" kong_request_id: "892f1439775cfa2184e5d140ff7cde67"
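The log is long and repetitive, so a small script helps to see its structure: repeated coordinator lookups at warn level, then a single error. The Python sketch below is only a reading aid under my own assumptions about the line layout; the regular expressions and the summarize helper are hypothetical, and the sample lines are abridged excerpts from the log above.

```python
import re
from collections import Counter

# Abridged excerpts from the log above (the trailing client/request fields are cut).
SAMPLE = """\
konnect-gateway | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 1 of 5). Retrying in 1 seconds
konnect-gateway | 2025/09/30 08:20:18 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 2 of 5). Retrying in 1 seconds
konnect-gateway | 2025/09/30 08:20:21 [warn] 2641#0: *23001 [lua] consumer.lua:487: subscribe(): failed to get group coordinator, retrying : failed to find coordinator for group: com.konghq.kafka.bab1885ddafdd3916909414673e240b4 after 5 attempts
konnect-gateway | 2025/09/30 08:20:32 [error] 2641#0: *23001 [kong] consumers.lua:257 [kafka-consume] Error while processing: failed to subscribe to Kafka topic
"""

# "<service> | <date> <time> [<level>] <rest>"; access-log lines do not match and are skipped.
LINE = re.compile(r"^(?P<svc>\S+)\s+\|\s+(?P<ts>\S+ \S+) \[(?P<level>\w+)\] (?P<rest>.*)$")
ATTEMPT = re.compile(r"\(attempt (\d+) of (\d+)\)")


def summarize(text: str) -> dict:
    """Count lines per log level, coordinator lookup attempts, and collect errors."""
    levels = Counter()
    attempts = 0
    errors = []
    for line in text.splitlines():
        m = LINE.match(line)
        if not m:
            continue
        levels[m["level"]] += 1
        if ATTEMPT.search(m["rest"]):
            attempts += 1
        if m["level"] == "error":
            errors.append(m["rest"])
    return {"levels": dict(levels), "coordinator_attempts": attempts, "errors": errors}


if __name__ == "__main__":
    print(summarize(SAMPLE))
```

Run against the full log, the same function would show that every warning after the API version negotiation concerns the group coordinator lookup, and that the only error is the final failure to subscribe.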
