
Kong Event Gateway

Shuhei Kawamura

Trying out Kong's Event Gateway.

https://developer.konghq.com/event-gateway/


Event Gateway has two main capabilities:

  • Protocol translation using Kong Gateway (HTTP to Kafka / Kafka to HTTP)
  • Integration with Kafka using the Native Event Proxy

Protocol translation with Kong Gateway appears to be implemented through plugins:

  • Kafka Upstream
  • Kafka Consume
  • Confluent
  • Confluent Consume
  • Solace Upstream

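The Native Event Proxy appears to work by creating virtual clusters and topics on Konnect, which then interact with the physical Kafka (the real Kafka cluster and topics).

Judging from the diagram, policies can be enforced at the midpoint of the Client -> KNEP -> Kafka path.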


Protocol translation with Kong Gateway


First, create a Kafka cluster. The documentation notes:

In Kong Gateway 3.9 or earlier, the message format is not customizable.

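Given this, it seems safest to use a version later than 3.9. In addition, if you also set up and configure a Schema Registry, you can enforce a schema on the messages sent to Kafka topics (that is, the Gateway layer returns an error for messages that violate the schema). It works without one, though, so this first round of testing proceeds without it. For reference, Schema Registry is a component developed by Confluent that centrally manages the schemas (Avro, JSON Schema, Protobuf, and so on) of data flowing through Kafka.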

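The following compose.yaml starts all the required resources at once. Some values are sensitive and have been omitted. The values you need are shown when you create a Gateway in Konnect, so copy them from there.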

compose.yaml
x-default: &default
  networks:
    - knep-network

networks:
  knep-network:

services:
  kafka:
    <<: *default
    image: apache/kafka:4.1.0
    container_name: kafka
    ports:
      - 9092:19092
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka:9092,CONTROLLER://kafka:9093,EXTERNAL://0.0.0.0:19092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CLUSTER_ID: "abcdefghijklmnopqrstuv"
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  konnect-gateway:
    <<: *default
    image: kong/kong-gateway:3.11
    container_name: konnect-gateway
    environment:
      KONG_ROLE: data_plane
      KONG_DATABASE: off
      KONG_VITALS: off
      KONG_CLUSTER_MTLS: pki
      KONG_CLUSTER_CONTROL_PLANE: ${KONNECT_CONTROL_PLANE_ID:-}.us.cp.konghq.com:443
      KONG_CLUSTER_SERVER_NAME: ${KONNECT_CONTROL_PLANE_ID:-}.us.cp.konghq.com
      KONG_CLUSTER_TELEMETRY_ENDPOINT: ${KONNECT_CONTROL_PLANE_ID:-}.us.tp.konghq.com:443
      KONG_CLUSTER_TELEMETRY_SERVER_NAME: ${KONNECT_CONTROL_PLANE_ID:-}.us.tp.konghq.com
      KONG_CLUSTER_CERT: |
        -----BEGIN CERTIFICATE-----
        MI...
        -----END CERTIFICATE-----
      KONG_CLUSTER_CERT_KEY: |
        -----BEGIN PRIVATE KEY-----
        MIG...
        -----END PRIVATE KEY-----
      KONG_LUA_SSL_TRUSTED_CERTIFICATE: system
      KONG_KONNECT_MODE: on
      KONG_CLUSTER_DP_LABELS: "type:docker-macOsArmOS"
      KONG_ROUTER_FLAVOR: expressions
      KONG_HEADERS_UPSTREAM: off
    ports:
      - 8000:8000
      - 8443:8443
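
The listener settings are the part of this file that is easiest to get wrong. As a rough illustration (not part of the original setup), the following Python sketch parses the `KAFKA_ADVERTISED_LISTENERS` value from the compose.yaml above and shows which address each kind of client is expected to use: containers on `knep-network`, such as the gateway, go through the INTERNAL listener, while tools on the host, such as kafkactl, go through the EXTERNAL listener via the published port. The function name `parse_listeners` is hypothetical.

```python
def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Parse 'NAME://host:port,NAME://host:port' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from KAFKA_ADVERTISED_LISTENERS in the compose.yaml above.
ADVERTISED = "INTERNAL://kafka:9092,EXTERNAL://localhost:9092"

advertised = parse_listeners(ADVERTISED)
print("from containers on knep-network:", advertised["INTERNAL"])
print("from the host machine:", advertised["EXTERNAL"])
```

Both listeners happen to advertise port 9092, so the host name is what differs. `kafka` resolves only inside the Compose network, which is why the Kong plugin configuration in this scrap points at `kafka:9092`.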

Try Pub/Sub with a native Kafka client (kafkactl here).

First, create a topic.

$ kafkactl create topic test
topic created: test

Check that it was created.

$ kafkactl get topics
TOPIC     PARTITIONS     REPLICATION FACTOR
test      1              1

Subscribe to the test topic.

$ kafkactl consume test

From another terminal, produce a message to the test topic.

$ kafkactl produce test -v hello-world
message produced (partition=0	offset=0)

Confirm that the message arrives in the first terminal.

$ kafkactl consume test
hello-world

Configure the Kafka Upstream plugin and produce a message with an HTTP client (curl here). First, configure Kong Gateway.

kong.yaml
_format_version: "3.0"
_konnect:
  control_plane_name: local-gateway

services:
  - name: kafka-service
    url: https://httpbin.org
    routes:
      - name: kafka-route
        paths:
          - /mock
        strip_path: true

plugins:
  - name: kafka-upstream
    service: kafka-service
    config:
      topic: test
      bootstrap_servers:
        - host: kafka
          port: 9092

Apply the configuration to Kong with decK. (Set the control plane name, PAT, and so on as appropriate.)

deck gateway sync kong.yaml

Produce a message from the HTTP client.

curl -X POST http://localhost:8000/mock -d hello-world

If the terminal that is still subscribed shows the following, it worked.

{"body_args":{"hello-world":true},"body":"hello-world"}
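
The shape of this message follows from how curl sends the request: `-d` without an explicit content type uses `application/x-www-form-urlencoded`, so `hello-world` is treated as a form key with no value. The Python sketch below approximates that parsing to reproduce the message observed above. It is only an illustration of the observed output under that assumption, not the plugin's actual implementation, and `approximate_message` is a hypothetical helper.

```python
import json
from urllib.parse import unquote_plus


def approximate_message(raw_body: str) -> str:
    """Approximate the JSON message seen on the topic for a form-encoded body."""
    body_args = {}
    for pair in raw_body.split("&"):
        if not pair:
            continue
        key, sep, value = pair.partition("=")
        # Assumption based on the observed output: a key without "=" becomes true.
        body_args[unquote_plus(key)] = unquote_plus(value) if sep else True
    message = {"body_args": body_args, "body": raw_body}
    return json.dumps(message, separators=(",", ":"))


print(approximate_message("hello-world"))
# prints {"body_args":{"hello-world":true},"body":"hello-world"}
```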

Replace the Kong configuration file with the following.

kong.yaml
_format_version: "3.0"
_konnect:
  control_plane_name: local-gateway

services:
  - name: kafka-produce-service
    url: https://httpbin.org
    routes:
      - name: produce-route
        paths:
          - /produce
        strip_path: true
  - name: kafka-consume-service
    url: https://httpbin.org
    routes:
      - name: consume-route
        paths:
          - /consume
        strip_path: true

plugins:
  - name: kafka-upstream
    service: kafka-produce-service
    config:
      topic: test
      bootstrap_servers:
        - host: kafka
          port: 9092
  - name: kafka-consume
    service: kafka-consume-service
    config:
      bootstrap_servers:
        - host: kafka
          port: 9092
      topics:
        - name: test
      mode: http-get
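
With this configuration, `/produce` is handled by the kafka-upstream plugin and `/consume` by the kafka-consume plugin. As a sketch of how the two routes could be exercised from a script instead of curl, the following uses only the Python standard library. The helper names are hypothetical, and `GATEWAY` assumes the proxy port published in the compose.yaml above.

```python
import urllib.error
import urllib.request

GATEWAY = "http://localhost:8000"  # assumption: proxy port published by compose.yaml


def build_produce_request(payload: bytes) -> urllib.request.Request:
    """POST to /produce, the route attached to the kafka-upstream plugin."""
    return urllib.request.Request(f"{GATEWAY}/produce", data=payload, method="POST")


def build_consume_request() -> urllib.request.Request:
    """GET /consume, the route attached to the kafka-consume plugin (mode: http-get)."""
    return urllib.request.Request(f"{GATEWAY}/consume", method="GET")


def send(request: urllib.request.Request, timeout: float = 30.0) -> tuple[int, str]:
    """Send the request and return (status, body); HTTP error statuses are returned, not raised."""
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status, response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as err:
        return err.code, err.read().decode("utf-8", errors="replace")


# Example usage against a running gateway:
#   send(build_produce_request(b"hello-world"))
#   send(build_consume_request())
```

Returning HTTP error statuses instead of raising keeps a failed consume (for example a 502 from the gateway) visible as a normal result.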

Try reading messages from the topic.

curl -X GET http://localhost:8000/consume

This resulted in an error:

konnect-gateway  | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [kong] consumers.lua:168 [kafka-consume] no cluster_name provided in plugin configuration, using default cluster name. If more than one Kafka plugin is configured without a cluster_name, these plugins will use the same cluster, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:265: negotiate_all_api_versions(): Provided min_version subceeds min_version for ApiKey: 0 ProduceRequest(KAFKA-18659) use suggest_min:3, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:260: negotiate_all_api_versions(): Provided min_version subceeds min_version for ApiKey: 19 use suggest_min:2, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:260: negotiate_all_api_versions(): Provided min_version subceeds min_version for ApiKey: 8 use suggest_min:2, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:17 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 1 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:18 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 2 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:19 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 3 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:20 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 4 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:21 [warn] 2641#0: *23001 [lua] consumer.lua:487: subscribe(): failed to get group coordinator, retrying : failed to find coordinator for group: com.konghq.kafka.bab1885ddafdd3916909414673e240b4 after 5 attempts, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:22 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 1 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:23 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 2 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:24 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 3 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:25 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 4 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:26 [warn] 2641#0: *23001 [lua] consumer.lua:487: subscribe(): failed to get group coordinator, retrying : failed to find coordinator for group: com.konghq.kafka.bab1885ddafdd3916909414673e240b4 after 5 attempts, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:27 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 1 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:28 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 2 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:29 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 3 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:30 [warn] 2641#0: *23001 [lua] client.lua:416: get_group_coordinator(): failed to get group coordinator (attempt 4 of 5). Retrying in 1 seconds, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:31 [warn] 2641#0: *23001 [lua] consumer.lua:487: subscribe(): failed to get group coordinator, retrying : failed to find coordinator for group: com.konghq.kafka.bab1885ddafdd3916909414673e240b4 after 5 attempts, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 2025/09/30 08:20:32 [error] 2641#0: *23001 [kong] consumers.lua:257 [kafka-consume] Error while processing: failed to subscribe to Kafka topic, client: 192.168.65.1, server: kong, request: "GET /consume HTTP/1.1", host: "localhost:8000", request_id: "892f1439775cfa2184e5d140ff7cde67"
konnect-gateway  | 192.168.65.1 - - [30/Sep/2025:08:20:32 +0000] "GET /consume HTTP/1.1" 502 126 "-" "curl/8.7.1" kong_request_id: "892f1439775cfa2184e5d140ff7cde67"