Closed2

[Kafka] Tombstoneが削除されずに永遠に残り続ける問題

harrythecodeharrythecode

結論

古いバージョンのKafkaには「Tombstoneが削除されずに永遠に残り続ける」バグが存在します。このバグは 3.1.0 のリリースで修正されてます。[1]

Kafka quirks: tombstones that refuse to disappear:

簡単に言うと、特定の状況下では、Kafkaのトゥームストーン(tombstones)は定期的に「タイムアウト」が更新されるため、delete.retention.ms を尊重せずに残り続けることがあります。

解決策としては、delete.retention.ms をゼロに設定し、トゥームストーンを即座に削除する方法があります。これにより、トピックをリプレイするコンシューマーのために残しておく代わりに、すぐに削除されます。

しかし、この方法は慎重に使用する必要があります。Kafka Streams アプリケーションやチェンジログトピックのシナリオでは、リストアフェーズ中に予期せぬ副作用が生じる可能性があります。リストア中にコンパクションが実行されると、既に消費したエントリーのトゥームストーンレコードを見逃し、削除されるべきキー/値ストア内のエントリーが残ってしまうことがあります。

では次に delete.retention.ms をゼロにして、 delete policyを compact にした場合、どう言う挙動を示すのかを検証していきます。

参考になったサイト達

脚注
  1. https://archive.apache.org/dist/kafka/3.1.0/RELEASE_NOTES.html - [KAFKA-8522] - Tombstones can survive forever ↩︎

harrythecodeharrythecode

環境設定

以下の通りローカル環境を用意しテストします。

https://zenn.dev/amezousan/articles/2022-09-06-kafka-playground-setup

今回使用するバージョンは、Scala 2.12 - Kafka 2.2.1 です。

https://kafka.apache.org/downloads

から Scala 2.12 - kafka_2.12-2.2.0.tgz をダウンロードします。

結論

色んなプロパティをいじって試しましたが、Compactionが発動するのは同一キー、Valueの時だけでした。またTombstone (値が null になってるキー) 達は delete.retention.ms が0でも永遠に残り続けてました。もしかしたら特定のバージョンにしか有効でないworkaroundかも?

素直にアップグレードするか、scriptを定期実行して tombstoneをcleanupする、が良さげです。

Key Value Compaction Triggered
Same Same Yes
Same Unique No
Same null No
Unique Same No
Unique Unique No
Unique null No
produce-data.py
from datetime import datetime
from kafka import KafkaProducer

# Kafka configuration
bootstrap_servers = 'localhost:9092'
topic_name = 'projection'
number_of_messages = 100

def start_producer():
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=lambda k: k.encode('utf-8'),
        value_serializer=lambda v: v.encode('utf-8') if v is not None else None
    )

    # Create a large message
    large_message = 'x' * (1024 * 10)  # String of 10KB size

    for i in range(number_of_messages):
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
        producer.send(topic_name, key="same", value=timestamp)
        print(f"Sent message {i+1} with data")

    producer.close()

start_producer()
  • 設定変更
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic projection --config cleanup.policy=compact --config segment.bytes=10485760 --config retention.ms=86400000 delete.retention.ms=0

./bin/kafka-configs.sh --zookeeper=localhost:2181 --entity-type topics --entity-name projection --alter --add-config segment.bytes=51200

./bin/kafka-configs.sh --zookeeper=localhost:2181 --entity-type topics --entity-name projection --alter --add-config segment.ms=60000

./bin/kafka-configs.sh --zookeeper=localhost:2181 --entity-type topics --entity-name projection --alter --add-config retention.ms=60000

./bin/kafka-configs.sh --zookeeper=localhost:2181 --entity-type topics --entity-name projection --alter --add-config min.cleanable.dirty.ratio=0.7

./bin/kafka-configs.sh --zookeeper=localhost:2181 --entity-type topics --entity-name projection --alter --add-config delete.retention.ms=0

./bin/kafka-configs.sh --zookeeper=localhost:2181 --entity-type topics --entity-name projection --alter --add-config delete.retention.ms=86400000
このスクラップは4ヶ月前にクローズされました