🫧

Kafka schema-registry の取り扱い

2025/03/18に公開

大前提として kafka 自体は schema-registry には依存していない。Kafka では、メッセージが基本単位になっている。メッセージはレコードの配列をもつ。レコードは key, value, header を持つ。key, value は単なる binary の箱である。header は dict[str,str] である。ユーザがデータを格納する先は、ほとんどの場合 value である。text, json の形式で格納するように使うけれど、結局 binary として保存されている。読み書きの時点で text や json であると合意しているに過ぎない。汎用的な kafka クライアントは、このインターフェースでいつでも利用できる。

さてここで schema-registry である。schema-registry はこの binary の箱に対する合意形成のための仕組みである。Confluentが使っている方式がデファクトスタンダードで、界隈では Confluent wire format と呼ばれる(出典)。kafka プロトコル上は、次のようになっている。サードパーティー製のソフトウェアで AVRO, protobuf などをサポートしている際は、この方式をサポートしているだろう。

  • value binary は MAGIC uint8(0) と schema ID uint32 で始まる。schema ID は、どこかにある schema registry で採番された数値で、reader, writer 間で schema registry を合意しておく。
  • schema ID が採番されたスキーマは、AVRO, JSON schema, protobuf のいずれかで記述されており、schema registry に登録されている。
  • schema ID uint32 の後、AVRO でエンコードされたバイナリが続く。あるいは JSON schema の場合は、JSON が UTF-8 バイナリとして続く。protobuf は少し変則的で、index array list[int] がエンコードされた後に、ペイロードが続く。
  • record key についても record value と同様である。

Avro ドキュメントにある Single-object encoding とは異なる、という点に注意。

さて kafka レコードに schema ID が埋め込まれるわけだが、書き込んだ時と読み出した時とでスキーマが変化していることもある。ここから先は多くは実装依存だが、読み出す際に次の要素が加味される。

  • 書き込んだ時点のスキーマID
  • Subject の最新のスキーマ定義(schema-registry上)
  • 読み出し側で指定されたスキーマ定義
  • スキーマ間の migaration の適用

これらの調停ロジックは、読み書きするライブラリの中のシリアライザに組み込まれて隠蔽されている。意識せずに使えるようにと、よかれと思ってロジックが隠蔽されているのだろう。しかし結局 kafka 汎用ツールは併用することになるだろうから、そのときに大混乱しないように中身を押さえておきたい。

rest proxy

confluent の rest proxy は schema-registry をサポートしており、rest proxy に投げ込まれたペイロードについて、rest proxy が schema-registry の照会を行ったりする。

Avro を使う時は、Content-type で指定をする(Reference)。

schema の指定には value_schema_id, value_schema, key_schema_id, key_schema を使って、records と同じ階層で指定する(Code)。当然のことながら、Content-type での指定と整合させること。

value_schema を指定する際は、rest proxy が schema registry を照会して、受け入れ可能かどうかをチェックする。

import requests
import json
schema=json.loads(requests.get("https://raw.githubusercontent.com/confluentinc/confluent-kafka-python/refs/heads/master/examples/avro/user_specific.avsc).text)
res = requests.post(
    url="http://localhost:18083/topics/test",
    data=json.dumps(dict(
        records=[
            {"value": {
                "favorite_color": "red",
                "favorite_number": 1,
                "name": "foo"
            }},
        ],
        value_schema=json.dumps(schema)
    )),
    headers={"Content-Type": "application/vnd.kafka.avro.v2+json"})

value_schema_id を指定する際は、次のようになる。実際には schema registry に登録されている数値で読み替えてください。

import requests
import json
schema=json.loads(requests.get("https://raw.githubusercontent.com/confluentinc/confluent-kafka-python/refs/heads/master/examples/avro/user_specific.avsc).text)
res = requests.post(
    url="http://localhost:18083/topics/test",
    data=json.dumps(dict(
        records=[
            {"value": {
                "favorite_color": "red",
                "favorite_number": 1,
                "name": "foo"
            }},
        ],
        value_schema_id=2
    )),
    headers={"Content-Type": "application/vnd.kafka.avro.v2+json"})

protobuf や json-schema も同様である。

残念なことに、現時点では redpanda の pandaproxy は書き込みに対応していない。素朴に json で保存することのみができる。

kcat (kafkacat)

avro サポートを有効にしてビルドした上で必要なオプションが満たされると、value_schema_id といったフィールド名で schema-registry の値を読み出してくれる。

$ kcat -C -b localhost:19092 -t test -r http://localhost:18081 -s value=avro -J -e | jq
{
  "topic": "test",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1742182834827,
  "broker": 0,
  "key": "fc502368-d29a-4208-98f7-cfad2364f257",
  "payload": {
    "name": "foo",
    "favorite_number": 1,
    "favorite_color": "red"
  },
  "value_schema_id": 2
}

おまけ

schema-registry の設定は、この後 Iceberg に繋がっていくのだけれども、それの話は次回以降に。

Discussion