Open12

Confluent Kafka(とApache Druid)で気になったこと

Naoki FujitaNaoki Fujita

https://kafka.apache.org/documentation
基本的にはこのドキュメントが一時情報源なので、こちらを読む。
実際にはApache KafkaにはSchema RegistryがないのでConfluent版のKafkaを使う予定だが、技術的なドキュメントとしてはこちらの方がよりダイレクトにまとまっているのでこっちを読んでいる。
筆者の調査動機としては、ビッグデータをスマートに捌く方法を知る上でKafkaは避けて通れないし、学習価値があると考えているからである。
個人的な関心しか書き連ねないのでスクラップで

Naoki FujitaNaoki Fujita

The deficiency of a naive pull-based system is that if the broker has no data the consumer may end up polling in a tight loop, effectively busy-waiting for data to arrive. To avoid this we have parameters in our pull request that allow the consumer request to block in a "long poll" waiting until data arrives (and optionally waiting until a given number of bytes is available to ensure large transfer sizes).

Kafka ConsumerがKafka BrokerからデータをPullするときに、通常はポーリングが必要で低レイテンシーなプルを実現することを考えるとビジーウェイトに近い状態になる。これを避けるためのロングポーリングオプションがある。また転送サイズを指定しそれが満たされるまで待つというオプションもある。

Naoki FujitaNaoki Fujita

https://kafka.apache.org/documentation/#design_consumerposition
Kafka ConsumerとKafka Brokerにおいて、どのメッセージを消費したのかについて合意を取ることは難しいようだ。(少し考えればわかることだが)

パターン1

メッセージキューはBrokerが保持している。
BrokerがConsumerへメッセージ送信し、ACKを受け取った段階で未処理→消費済にステート変化させる。
この場合Consumerがその後の実行時エラーにより停止した場合、(消費済とみなした)メッセージはロストする。

パターン2

パターン1において、ACKを受け取った段階で、未処理→消費済ではなく、未処理→送信済にステート変化させる。さらにConsumerがメッセージ処理に成功した段階でシグナルを発行し送信済→消費済にステート変化させる。
この場合処理には成功したが、何らかの理由でシグナルの送受信に失敗した場合、同一メッセージは2回処理される。またステート管理が煩雑になりすぎて大変である。

パターン3(Kafkaのアプローチ)

BrokerはあるConsumer(Consumer Group)が次に処理すべきメッセージの位置(オフセット)を保持する。この設計により、メッセージごとの状態は必要なくなり、topic(正確にはpartition)に対して単一の整数値で状態管理できる。
この設計の副次的効果としてConsumerの処理にバグがあった場合オフセットを巻き戻すことができる。
(しかしパターン1, パターン2のような送信完了や、処理完了についてBroker、Consumerがどういう動きをするかはここで明瞭に示されていない。)

Naoki FujitaNaoki Fujita

https://kafka.apache.org/documentation/#semantics
メッセージセマンティクスの保証について(at most once, at least once, exactly onceなど)書かれている。保証とは、

  1. メッセージをパブリッシュする時の耐久性の保証
  2. メッセージを消費する時の保証

という2つの保証を分けて考えることができると示唆している。
多くのシステムではexactly onceがあるとうたわれるものも多いが、それだと粗すぎるようだ。
例えば具体的には、

  1. ConsumerまたはProducerが処理に失敗した場合
  2. 複数のConsumerが存在する場合
  3. データが記録されているディスクが利用不能になった場合

などについてシステムの挙動を示す必要がある(が、このことが示されていないことが多い。)
v0.11.0以前では、ProducerからBrokerへの送信時にACKを受理できなければ再送信する他なかった。(この場合、exactly onceは保証し得ない。)
v0.11.0以降では、べき等配信がサポートされた。メッセージとともにProducer Idとシーケンス番号を送信することで重複排除できる。(TCPっぽい)
v0.11.0以降では、さらにトランザクション(セマンティクス)もサポートされた。Kafkaがいうトランザクションとは複数のTopic(Partition)に同時に書き込む(Commit)か、失敗させる(Rollback)かをアトミックに処理する機能のこと。
Producerには送達保証にはいくつかのレベルがある。(パフォーマンスと送達保証のトレードオフ)

  1. メッセージがコミットされるのを待つ(10msオーダーの時間がかかる)
  2. leader Brokerにコミットされるのを待つ(followerにコミットされたかは気にしない)
  3. 完全に非同期

Consumer側にもいろいろな選択肢がある。

  • メッセージ読み込み→オフセットの移動、保存→メッセージ処理
    • この場合メッセージ処理に失敗すると処理が欠落する(at most once)
  • メッセージ読み込み→メッセージ処理→オフセットの移動、保存
    • この場合処理後にオフセット保存に失敗するとメッセージが重複処理されるおそれ(at least once)
      • PrimaryKeyに対するupdateなどベキ等処理については問題にはならない

Kafka Streamにおけるat exact once

  • 2つのトピックを同時に操作する(そのような好例こそKafka Stream)
    • メッセージにConsumerのオフセットを保存する
    • 出力トピックと入力トピックにKafkaトランザクションを使って同時に書き込む
    • メッセージ処理とオフセット保存がトランザクションでアトミックに処理できるのでexactly onceを保証できる
      • Kafkaにも分離レベルがある、デフォルトはread_uncommited

外部システムと連携する場合のat exact once

  • 古典的な方法
    • Consumerのメッセージ保存とオフセット保存の間に2 phase commitを導入する
  • 簡便な方法
    • 出力とオフセットを同じ場所に保存する
    • 2 phase commitをサポートしてなくても導入できる
    • Kafka Connectがこのような処理をうまく扱ってくれる

それ以外のケースではKafkaはデフォルトではat least onceをサポートする。Producerのリトライ処理をなくしConsumerでのメッセージ処理前にオフセットを保存することでat most onceもサポートする。

Naoki FujitaNaoki Fujita

ksqlDBでうまくストリームをキャプチャできないので色々調査する必要ありそう。
topic pageviewsを流れるストリームフローを確認するコマンド(kcat)

docker run -it --network=host edenhill/kcat:1.7.1 -b localhost:9092 -t pageviews -o end

で確認したがトピックにはデータは流れていそう。ksqlは今すぐ使う予定がないのでスルーしてもいいのだが、quickstartの通りにいかないのも気になる。

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

ksqldb-cliで直繋ぎするとストリームが動作していることを確認。LAN越しでcontrol centerにアクセスしたのがまずいのかもしれない、とりあえず先に進める。

P.S.
control centerをhostからlocalhost:9021にアクセスする分には問題なくストリームをキャプチャできた。

Naoki FujitaNaoki Fujita

schema registryを扱うにあたり、いくつかスキーマ定義の形式が選択できる。
Avro vs protobuf vs JSON schema
見たところAvroの事例が多い。これはKafkaのデフォルトであるからとのこと。
一方でprotobufで構成した方がアプリケーションサーバの定義と合わせて一元管理できるのでは、と思い軽く温度感を調査した。
https://dev.to/saubury/kafka-with-avro-vs-kafka-with-protobuf-vs-kafka-with-json-schema-1nd8
https://blog.devops.dev/kafka-throughput-and-performance-with-avro-vs-protobuf-serialization-formats-456d399959ca
Avroの方が柔軟性は高いがProtobufの方が実行性能は高そうな感じである。
とりあえず動く方で

Naoki FujitaNaoki Fujita

https://www.confluent.io/blog/put-several-event-types-kafka-topic/
topicとschemaの互換性に関する重要な記事
トピックは特にイベントを処理するという利用法を想定すると、さまざまなイベントタイプ(スキーマ)のデータを処理できなければならない。それはKafkaにおいてはトピック、より正確にはパーティション内でイベントの順序性が保持されるからである。複数のパーティションへの分離は処理並行性を高める文脈において有効だが、イベントが強く順序に依存する処理(≒トランザクション)においてはデータ不整合(イベント集約の失敗)を招く。
デフォルトではスキーマレジストリはあるトピックについて後方互換性を破壊するスキーマ変更を容認しなかったが(TopicNameStrategy)、この制限を緩和するストラテジをいくつか追加したというのが記事の趣旨である。
当初トピックはRDBのテーブルに似たものであると考えられ、従ってアップストリームとダウンストリーム間でのスキーマの共通化に強い意味があると考えられた。(RDBを使う時、更新側と参照側で全く同じスキーマであると仮定して使うよね?)
トピックにはそこに順序性の保持というセマンティクスが重要になってくることがある。トピックはスキーマの共通化と順序性の保持という2つの役割を持っている以上、スキーマの共通化よりも順序性の保持が優先されるべきユースケースが確かに存在するということである。
実際プロデューサとコンシューマを非同期的に変更を加えることを考えると、スキーマの後方互換性(コントラクト)が保たれなければ、コンシューマが突如理解不能なデータプロトコルに晒される危険性はある。それと順序性の保持の天秤にかけたときに、順序性の方が重要視されるケースはザラにある。
ところでAvroにおける後方互換性とはどこまであるのかは考えてみる必要がある。例えば適切にイベントタイプを(例えば代数的データ型、protobufの言うところのoneofに基づき)表現した場合は、データの後方互換性を保ちつつイベントタイプの追加に対する適応を表現できるかもしれない。
あと順序性の絶対的な保持は水平スケーリングができないことを意味する。これは古典的なDB論で言うところのSerializableと同じことだ。

Naoki FujitaNaoki Fujita

https://martin.kleppmann.com/2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html
Martin Kleppmann先生は天才である

https://franklinlindemberg.medium.com/using-protobuf-with-apache-kafka-and-without-schema-registry-8535f43a2569
理論上はschema registryはOFFにできると言う記事、ByteArrayとしてシリアライズし、クライアントサイドでデコードする。
ただこれをやるとksqlなどの便利ツールは利用できない可能性がある。
と思いきや↓のような記事もある
https://www.confluent.io/blog/announcing-ksqldb-0-27-1/#protobufs-without-schema-registry

schema registryは後方互換性を検証すると言う役割をまだ持っているが、これは↓でカットできる
https://stackoverflow.com/a/69361114/5225993
突き詰めるとschema-registryのホスティングが不要になる可能性があるので、この路線で攻める

https://blog.hellmar-becker.de/2022/05/26/ingesting-protobuf-messages-into-apache-druid/
https://druid.apache.org/docs/0.22.1/development/extensions-core/protobuf.html
最終的なゴールはここらへんにある

Naoki FujitaNaoki Fujita

Rust->Kafka(Confluent版) -> Apache Druidの流れが大体掴めた
色々と山ほど試して書きたいことは山ほどあるが、とりあえず色々有益っぽいことのみ記す。

  • Apache Druidで.descファイルベースでのprotobufのデコーディングを試すなら、例えば以下のように.descファイルを生成することを薦める

    • protoc --descriptor_set_out /tmp/metrics.desc --include_imports ./protos/protobuf/v1/metrics.proto
      • importしたメッセージなども合わせてエンコードされる、ただそれだけ
      • protoc情報少ない
    • 関連example
  • Schema Registryを試したかったがちょっと無理っぽいので諦めた

    • 色々頑張ってRustでSchema Registry連携のコードを書いてみたが
      • https://github.com/fn-reflection/testcases/commit/ec229ce947357570184c5c080fcf63182f7dcafd
      • Confluent Control Centerではうまくデコードされたが、Druidが理解できない(file形式だと)
        • 多分schema_registry_converterがくっつけてるバイト列が解釈されたりされなかったり
      • Druidのschema_registry連携もなぜかうまくいかない
        • inputFormatが空ぶってる感覚、旧形式のparserを使うspecしか書かれていないので、それで試すもタスクが実行時エラー

サーバサイドのエラーハンドリングがゆるふわな感はあるがJavaだし、かなりニッチなユースケースをついているので仕方ないかも、とりあえず変なことはしないが正解
一旦は最低限使い倒せるようになってきたかも