🚧

[戒め]MSKのトピック名は重複させてはいけない

に公開

こんにちは、TOKIUMでembedded SREをしております對馬です。

弊社ではCDC連携にAWS MSKを利用していますが、先日MSKに新規コネクタを作成しようとしたところ、設定不備によって一定期間データ連携が止まってしまう事態となってしまいました。

この記事では、自戒の意味も込めて、トラブルの経緯と解決の道筋を記載します。

タイトルがもろネタバレしてますが、最後まで読んでいただけると嬉しいです。

想定読者

  • AWS MSK を利用している、もしくは利用予定の方
  • Kafka / Debezium を運用されている方

AWS MSK とは?

概要

AWS MSKは、Apache Kafka をマネージドサービスとして提供するものです。
生のKafkaを運用したことがないんですが、おそらく生で扱うより断然楽に運用できるんだと思います。


AWS Black Belt

Kafka では、Producer が送信したイベントを Topic に格納し、それを Consumer に配信するという構造になっています。Producer と Consumer の間には、コネクタ が入る場合もあります。

kafkaを使うことで、マイクロサービス間でのデータ連携が、疎結合かつリアルタイムにできるというわけです。

TOKIUMでの使い方

TOKIUMには、古くより事業を支えるモノリシックなアプリケーション(with PostgreSQL)が存在します。このアプリの DB には、大量の顧客データやマスターデータがぎっしり詰まっており、他のマイクロサービスでも利用したい場面が多々あります。
こういった時に、MSK + Debezium を活用し、CDC(Change Data Capture)を通じてデータをリアルタイムで他サービスに連携しています。

前段はこのくらいにして、連携不備の説明をします。

本題

トラブルの経緯

新規サービス開発にあたり、モノリスアプリのデータを利用する必要がありました。そこでMSKの出番なわけですが、すでに 既存サービス用にMSKクラスターが稼働していたため、同じクラスターを使いつつ、データ連携自体は、新しいコネクタを作ることにしました。

既存サービスと新規サービスのSourceコネクタ設定は以下になります。
(Sinkコネクタは今回の問題とあまり関係ないので割愛します。)

◽️既存サービス

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  ...(中略)...
  "topic.prefix": "prod",
  "table.include.list": "public.users",
  "column.include.list": "public.users.id,public.users.name"
}

◽️新規サービス

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  ...(中略)...
  "topic.prefix": "prod",
  "table.include.list": "public.users",
  "column.include.list": "public.users.id,public.users.name,public.users.is_deleted"
}

ポイントは下記になります。

  • 同じテーブル(users)の連携設定であること
  • topic.prefixが同一の値であること
  • 新規サービスのみ、is_deletedカラムを連携対象に選んでいること

上記の設定でコネクタをつくってみると、既存サービスのsinkコネクタに以下のエラーが発生しはじめました

[Worker-0056be97440760ff6] [2025-07-10 02:53:22,484] ERROR [existing-service-sink-users|task-0] Failed to alter the table 'users'. (io.debezium.connector.jdbc.JdbcChangeEventSink:268)

なんやこれは。一切触っていない既存サービス側で連携不備が生じるのは想定外でした。
復旧させようと、コネクタを全て削除&再作成しましたが、エラーは解消されず。
その日は解決まで至れず、日を改めて問題に臨むこととなりました。

何が悪かったのか

異なるconsumerに対して、同じトピックが使われるようになっていました。
トピック名は、<topic.prefix>.<schema>.<table>という形式で命名されます。
例えば、

  "topic.prefix": "prod",
  "table.include.list": "public.users",

上記のように設定されていれば、prod.public.usersが作成されることになります。
新規サービスと既存サービスのコネクタ設定を見ていただければわかる通り、prefixもinclude.listも完全に一致してしまっています
その上で、新規サービスでは、既存サービスで連携していないis_deletedカラムを連携対象にしていました。
その結果、既存サービスにis_deletedを含むイベントが流れてしまい、エラーの原因となってしまっていたようです。(詳細はAWSサポート確認中)

復旧作業

以下の流れで復旧をしました。

  1. 既存のコネクタとトピックは削除
  2. トピック名がprd_<サービス名>という形式となるようにして、コネクタを再作成
  3. エラーが発生しないことを確認

急ぎ復旧をしたかったため既存のトピックを削除しましたが、復旧後にconsumerに余計なデータが残ってしまってることに気づきました。
本当はもっとよい方法があったのかも。
知見がある方がいらっしゃったらコメントいただけると幸いです。

まとめ

現在AWSサポートに問い合わせ中なので、さらに詳しいことがわかったら追記します。

mskの変更は慎重にやりましょう

TOKIUMプロダクトチーム テックブログ

Discussion