Notes: Deduplication with Kafka's Idempotent Producer
What is the Idempotent Producer?
When enable.idempotence=true is set, the Kafka producer includes a producer ID and a sequence number with the records it sends, and the Kafka broker then verifies that sequence number.
ref. Idempotent Writer
The Kafka producer tags each batch of Events that it sends to the Kafka cluster with a sequence number.
Brokers in the cluster use this sequence number to enforce deduplication of Events sent from this specific producer
A batch is accepted only if its sequence number is exactly one greater than that of the last committed batch; otherwise, it results in an out-of-sequence error.
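On the client side, enabling this is a single setting. A minimal sketch of an idempotent producer (the bootstrap address and topic name are placeholders I chose):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Turn on the idempotent producer: the client attaches a producer ID and
        // per-partition sequence numbers so that producer retries cannot create duplicates on the broker.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // required by idempotence

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}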
Producer state management on the Kafka broker (ProducerStateManager)
To deduplicate on the Kafka broker side, the broker has to track the sequence numbers of the messages each producer has sent, and ProducerStateManager plays that role.
ProducerStateManager holds one ProducerStateEntry per producer ID:
private final Map<Long, ProducerStateEntry> producers = new HashMap<>();
ProducerStateEntry contains the producer ID and the sequence numbers:
public class ProducerStateEntry {
public static final int NUM_BATCHES_TO_RETAIN = 5;
private final long producerId;
private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>();
A ProducerStateEntry tracks metadata for several record batches, kept in ascending sequence order:
"It contains batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the queue while the batch with the highest sequence is at the tail of the queue"
public int firstSeq() {
return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getFirst().firstSeq();
}
public int lastSeq() {
return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getLast().lastSeq;
}
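As a mental model, the batchMetadata deque behaves like a small sliding window over the most recent batches. A standalone sketch (my own class, not Kafka's) under that assumption:
import java.util.ArrayDeque;
import java.util.Deque;

// Standalone sketch: keep only the sequence ranges of the most recent batches
// per producer, oldest at the head and newest at the tail of the deque.
class BatchSeqWindow {
    private static final int NUM_BATCHES_TO_RETAIN = 5;
    private final Deque<int[]> batches = new ArrayDeque<>(); // each element is {firstSeq, lastSeq}

    void addBatch(int firstSeq, int lastSeq) {
        if (batches.size() == NUM_BATCHES_TO_RETAIN)
            batches.removeFirst();                  // evict the oldest batch's metadata
        batches.addLast(new int[]{firstSeq, lastSeq});
    }

    int lastSeq() {
        return batches.isEmpty() ? -1 : batches.getLast()[1]; // -1 stands in for NO_SEQUENCE
    }
}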
Record Batch
The record batch format is as follows:
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: uint32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
recordsCount: int32
records: [Record]
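As an aside, the attributes field is a plain bit field, so it can be decoded with the masks implied by the table above. A standalone sketch (class and variable names are mine; mask values follow the listed bit positions):
// Standalone sketch decoding the attributes bit layout listed above.
public class BatchAttributesSketch {
    public static void main(String[] args) {
        short attributes = 0x12;                                // example value: 0b1_0010
        int compressionCodec = attributes & 0x07;               // bits 0-2 -> 2 = snappy
        boolean logAppendTime = (attributes & 0x08) != 0;       // bit 3: timestampType
        boolean isTransactional = (attributes & 0x10) != 0;     // bit 4
        boolean isControlBatch = (attributes & 0x20) != 0;      // bit 5
        boolean hasDeleteHorizonMs = (attributes & 0x40) != 0;  // bit 6
        System.out.printf("codec=%d transactional=%b control=%b%n",
                compressionCodec, isTransactional, isControlBatch);
    }
}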
For example, to read baseSequence, DefaultRecordBatch simply reads an int at a fixed byte offset within the batch:
public int baseSequence() {
return buffer.getInt(BASE_SEQUENCE_OFFSET);
}
lastSequence is baseSequence plus lastOffsetDelta:
public int lastSequence() {
int baseSequence = baseSequence();
if (baseSequence == RecordBatch.NO_SEQUENCE)
return RecordBatch.NO_SEQUENCE;
return incrementSequence(baseSequence, lastOffsetDelta());
}
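incrementSequence itself is not quoted here; below is a standalone sketch of the wraparound arithmetic it is expected to perform (treat the body as my assumption rather than the exact Kafka code):
// Standalone sketch: sequence numbers wrap around to 0 after Integer.MAX_VALUE.
public class SequenceArithmeticSketch {
    static int incrementSequence(int sequence, int increment) {
        if (sequence > Integer.MAX_VALUE - increment)
            return increment - (Integer.MAX_VALUE - sequence) - 1;
        return sequence + increment;
    }

    public static void main(String[] args) {
        // A batch with baseSequence=5 holding 3 records has lastOffsetDelta=2, so lastSequence=7.
        System.out.println(incrementSequence(5, 2));                 // 7
        System.out.println(incrementSequence(Integer.MAX_VALUE, 1)); // 0 (wraparound)
    }
}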
How the sequence number check happens on the Kafka broker
When the Kafka broker receives a Produce request, it executes ReplicaManager.appendRecords, and processing proceeds through
- ReplicaManager.appendEntries
- ReplicaManager.appendToLocalLog
- Partition.appendRecordsToLeader
- UnifiedLog.appendAsLeader
- UnifiedLog.append
until UnifiedLog.analyzeAndValidateProducerState, where the idempotent producer's sequence-number check begins.
It first checks whether each record batch carries a producer ID:
private AnalyzeAndValidateProducerStateResult analyzeAndValidateProducerState(LogOffsetMetadata appendOffsetMetadata,
MemoryRecords records,
AppendOrigin origin,
VerificationGuard requestVerificationGuard) {
~~
for (MutableRecordBatch batch : records.batches()) {
if (batch.hasProducerId()) {
and then executes UnifiedLog.updateProducers to obtain the state entry for that producer ID:
private def updateProducers(producerStateManager: ProducerStateManager,
batch: RecordBatch,
producers: mutable.Map[Long, ProducerAppendInfo],
firstOffsetMetadata: Option[LogOffsetMetadata],
origin: AppendOrigin): Option[CompletedTxn] = {
val producerId = batch.producerId
val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))
What is actually obtained is a ProducerAppendInfo, and the sequence number is checked in ProducerAppendInfo.append:
private def updateProducers(producerStateManager: ProducerStateManager,
batch: RecordBatch,
producers: mutable.Map[Long, ProducerAppendInfo],
firstOffsetMetadata: Option[LogOffsetMetadata],
origin: AppendOrigin): Option[CompletedTxn] = {
~~
val completedTxn = appendInfo.append(batch, firstOffsetMetadata.asJava).asScala
- ProducerAppendInfo.appendDataBatch
- ProducerAppendInfo.maybeValidateDataBatch
- ProducerAppendInfo.checkSequence
- ProducerAppendInfo.inSequence
Processing then proceeds through the calls above, and the sequence-number check starts. In inSequence, if the baseSequence of the incoming record batch does not equal the last sequence number held in the ProducerStateEntry plus one, an OutOfOrderSequenceException is thrown:
if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
"offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
" (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
}
~~
private boolean inSequence(int lastSeq, int nextSeq) {
return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
}
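To spell the rule out, here is a tiny standalone harness (class name is mine) exercising the same predicate:
// Standalone harness for the acceptance rule quoted above: the incoming batch's
// first sequence must be exactly lastSeq + 1, with wraparound at Integer.MAX_VALUE.
public class InSequenceSketch {
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(inSequence(41, 42));               // true: the next expected batch
        System.out.println(inSequence(41, 41));               // false: not lastSeq + 1
        System.out.println(inSequence(41, 45));               // false: a gap in sequence numbers
        System.out.println(inSequence(Integer.MAX_VALUE, 0)); // true: wraparound
    }
}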
Once this check passes, control returns from UnifiedLog.analyzeAndValidateProducerState, LocalLog.append is executed, and then the producer state is updated:
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
updateHighWatermarkWithLogEndOffset()
// update the producer state
updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))
The producer state's mapEndOffset is also advanced to the record batch's lastOffset + 1:
// always update the last producer id map offset so that the snapshot reflects the current offset
// even if there isn't any idempotent data being written
producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
This offset is later used when taking a snapshot, to decide up to which offset the snapshot covers.
About snapshots
The producer state held by the Kafka broker is periodically written out to a snapshot file:
/**
* Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken.
*/
public Optional<File> takeSnapshot(boolean sync) throws IOException {
// If not a new offset, then it is not worth taking another snapshot
if (lastMapOffset > lastSnapOffset) {
SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
long start = time.hiResClockMs();
writeSnapshot(snapshotFile.file(), producers, sync);
The snapshot file records, for each producer, the last written sequence number among other fields:
private static final Schema PRODUCER_SNAPSHOT_ENTRY_SCHEMA =
new Schema(new Field(PRODUCER_ID_FIELD, Type.INT64, "The producer ID"),
new Field(PRODUCER_EPOCH_FIELD, Type.INT16, "Current epoch of the producer"),
new Field(LAST_SEQUENCE_FIELD, Type.INT32, "Last written sequence of the producer"),
new Field(LAST_OFFSET_FIELD, Type.INT64, "Last written offset of the producer"),
new Field(OFFSET_DELTA_FIELD, Type.INT32, "The difference of the last sequence and first sequence in the last written batch"),
new Field(TIMESTAMP_FIELD, Type.INT64, "Max timestamp from the last written entry"),
new Field(COORDINATOR_EPOCH_FIELD, Type.INT32, "The epoch of the last transaction coordinator to send an end transaction marker"),
new Field(CURRENT_TXN_FIRST_OFFSET_FIELD, Type.INT64, "The first offset of the on-going transaction (-1 if there is none)"));
private static final Schema PID_SNAPSHOT_MAP_SCHEMA =
new Schema(new Field(VERSION_FIELD, Type.INT16, "Version of the snapshot file"),
new Field(CRC_FIELD, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
new Field(PRODUCER_ENTRIES_FIELD, new ArrayOf(PRODUCER_SNAPSHOT_ENTRY_SCHEMA), "The entries in the producer table"));
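On disk, these snapshots live in each partition directory as files named after the snapshot offset with a .snapshot suffix; a partition directory looks roughly like this (names and offsets are illustrative):
my-topic-0/
  00000000000000000000.log
  00000000000000000000.index
  00000000000000000000.timeindex
  00000000000000001024.snapshot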
This snapshot file is written out, for example, when the broker shuts down; when the broker restarts, it reads the snapshot back in to restore the producer state.
Restoring producer state when the Kafka broker restarts
Behavior during shutdown
The rough flow when the Kafka broker shuts down is:
- Flush the logs to disk
- Update the recovery point and write the recovery-point-offset-checkpoint file
- Take producer state snapshots
- If the graceful shutdown succeeds, create a .kafka_cleanshutdown file
When the Kafka broker shuts down, UnifiedLog.flush and UnifiedLog.close are executed via the following path:
def shutdown(brokerEpoch: Long = -1): Unit = {
~~
val jobsForDir = logs.map { log =>
val runnable: Runnable = () => {
// flush the log to ensure latest possible recovery point
log.flush(true)
log.close()
}
runnable
}
UnifiedLog.flush flushes everything up to logEndOffset,
def flush(forceFlushActiveSegment: Boolean): Unit = flush(logEndOffset, forceFlushActiveSegment)
and recoveryPoint is updated to logEndOffset:
private def flush(offset: Long, includingOffset: Boolean): Unit = {
val flushOffset = if (includingOffset) offset + 1 else offset
val newRecoveryPoint = offset
~~
localLog.flush(flushOffset)
lock synchronized {
localLog.markFlushed(newRecoveryPoint)
}
}
}
recoveryPoint itself is updated in LocalLog.markFlushed:
private[log] def markFlushed(offset: Long): Unit = {
~~
if (offset > recoveryPoint) {
updateRecoveryPoint(offset)
lastFlushedTime.set(time.milliseconds)
}
}
This recovery point is written to the recovery-point-offset-checkpoint file by LogManager.checkpointLogRecoveryOffsets:
def checkpointLogRecoveryOffsets(): Unit = {
val logsByDirCached = logsByDir
liveLogDirs.foreach { logDir =>
val logsToCheckpoint = logsInDir(logsByDirCached, logDir)
checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
}
}
private def checkpointRecoveryOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, UnifiedLog]): Unit = {
try {
recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
// checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
// directory and guarantees crash consistency.
checkpoint.write(recoveryOffsets)
@volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
(dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
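For reference, this checkpoint is a small plain-text file: a version line, an entry count, then one "topic partition offset" triple per line. The values below are only illustrative:
0
2
my-topic 0 1024
my-topic 1 2048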
The next time the Kafka broker starts, if it had not shut down cleanly, it recovers the segments from this recovery point onward.
Note that LogManager.checkpointLogRecoveryOffsets is scheduled at startup to run every flushRecoveryOffsetCheckpointMs (the broker config log.flush.offset.checkpoint.interval.ms):
scheduler.schedule("kafka-recovery-point-checkpoint",
() => checkpointLogRecoveryOffsets(),
InitialTaskDelayMs,
flushRecoveryOffsetCheckpointMs)
After that, UnifiedLog.close takes a producer state snapshot:
override def close(): Unit = {
debug("Closing log")
lock synchronized {
~~
maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
// We take a snapshot at the last written offset to hopefully avoid the need to scan the log
// after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
// (the clean shutdown file is written after the logs are all closed).
producerStateManager.takeSnapshot()
At this point, a snapshot covering up to the ProducerStateManager's lastMapOffset is written:
public Optional<File> takeSnapshot(boolean sync) throws IOException {
// If not a new offset, then it is not worth taking another snapshot
if (lastMapOffset > lastSnapOffset) {
SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
long start = time.hiResClockMs();
writeSnapshot(snapshotFile.file(), producers, sync);
log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset,
producers.size(), time.hiResClockMs() - start);
After UnifiedLog.close completes, CleanShutdownFileHandler.write is executed,
val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
debug(s"Writing clean shutdown marker at $dir with broker epoch=$brokerEpoch")
CoreUtils.swallow(cleanShutdownFileHandler.write(brokerEpoch), this)
which creates a .kafka_cleanshutdown file in each data directory. When the Kafka broker later restarts, the presence of this file is what marks the previous shutdown as clean.
Behavior during startup
The rough flow when the Kafka broker starts is:
- Check whether the previous shutdown was clean
- Read the recovery point from the recovery-point-offset-checkpoint file
- If the shutdown was not clean, recover the segments from the recovery point onward
- Load the snapshots and rebuild the producer state
When the Kafka broker restarts, rebuilding of the producer state begins. Log loading reaches LogManager.loadLogs, and if the .kafka_cleanshutdown file exists, hadCleanShutdown is set to true:
private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = {
~~
if (cleanShutdownFileHandler.exists()) {
// Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
// so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
cleanShutdownFileHandler.delete()
hadCleanShutdown = true
}
After that, the recovery point is read from the recovery-point-offset-checkpoint file (presumably this equals logEndOffset after a clean shutdown?),
recoveryPoints = this.recoveryPointCheckpoints(dir).read()
and then LogManager.loadLog is executed.
LogManager.loadLog creates the UnifiedLog, and LogLoader.load → LogLoader.recoverLog handles recovery for the case where the broker did not shut down cleanly:
// If we have the clean shutdown marker, skip recovery.
if (!hadCleanShutdown) {
val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue)
val unflushedIter = unflushed.iterator
~~
while (unflushedIter.hasNext && !truncated) {
val segment = unflushedIter.next()
~~
try {
recoverSegment(segment)
First, LogLoader.recoverSegment calls UnifiedLog.rebuildProducerState to rebuild the producer state:
private def recoverSegment(segment: LogSegment): Int = {
val producerStateManager = new ProducerStateManager(
topicPartition,
dir,
this.producerStateManager.maxTransactionTimeoutMs(),
this.producerStateManager.producerStateManagerConfig(),
time)
UnifiedLog.rebuildProducerState(
producerStateManager,
segments,
logStartOffsetCheckpoint,
segment.baseOffset,
config.recordVersion,
time,
reloadFromCleanShutdown = false,
logIdent)
Next, ProducerStateManager.truncateAndReload is executed:
private[log] def rebuildProducerState(producerStateManager: ProducerStateManager,
segments: LogSegments,
logStartOffset: Long,
lastOffset: Long,
recordVersion: RecordVersion,
time: Time,
reloadFromCleanShutdown: Boolean,
logPrefix: String): Unit = {
~~
info(s"${logPrefix}Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
~~
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
which in turn calls ProducerStateManager.loadFromSnapshot:
public void truncateAndReload(long logStartOffset, long logEndOffset, long currentTimeMs) throws IOException {
~~
if (logEndOffset != mapEndOffset()) {
clearProducerIds();
ongoingTxns.clear();
updateOldestTxnTimestamp();
// since we assume that the offset is less than or equal to the high watermark, it is
// safe to clear the unreplicated transactions
unreplicatedTxns.clear();
loadFromSnapshot(logStartOffset, currentTimeMs);
~~
loadFromSnapshot loads the latest snapshot file and updates lastSnapOffset and lastMapOffset:
private void loadFromSnapshot(long logStartOffset, long currentTime) throws IOException {
~~
Optional<SnapshotFile> latestSnapshotFileOptional = latestSnapshotFile();
~~
log.info("Loading producer state from snapshot file '{}'", snapshot);
Stream<ProducerStateEntry> loadedProducers = readSnapshot(snapshot.file()).stream().filter(producerEntry -> !isProducerExpired(currentTime, producerEntry));
loadedProducers.forEach(this::loadProducerEntry);
lastSnapOffset = snapshot.offset;
lastMapOffset = lastSnapOffset;
After the latest snapshot has been loaded, if the offset up to which the producer state could be restored (mapEndOffset) is still below lastOffset (segment.baseOffset is passed in as lastOffset above), the segments in between are read:
// Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
// offset (which would be the case on first startup) and there were active producers prior to truncation
~~
if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
val segmentOfLastOffset = segments.floorSegment(lastOffset)
segments.values(producerStateManager.mapEndOffset, lastOffset).forEach { segment =>
~~
private[log] def rebuildProducerState(producerStateManager: ProducerStateManager,
segments: LogSegments,
logStartOffset: Long,
lastOffset: Long,
recordVersion: RecordVersion,
time: Time,
reloadFromCleanShutdown: Boolean,
logPrefix: String): Unit = {
~~
~~
val fetchDataInfo = segment.read(startOffset, Int.MaxValue, maxPosition)
if (fetchDataInfo != null)
loadProducersFromRecords(producerStateManager, fetchDataInfo.records)
This executes UnifiedLog.loadProducersFromRecords. UnifiedLog.loadProducersFromRecords in turn runs UnifiedLog.updateProducers, which already appeared above, to rebuild the producer state.
After recovery completes, control returns to LogLoader.load, which runs UnifiedLog.rebuildProducerState once more, and the producer state ends up rebuilt inside the ProducerStateManager:
// Reload all snapshots into the ProducerStateManager cache, the intermediate ProducerStateManager used
// during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the
// deletion.
producerStateManager.removeStraySnapshots(segments.baseOffsets)
UnifiedLog.rebuildProducerState(