
Memo: Deduplication in the Kafka Idempotent Producer


What is the Idempotent Producer?

When enable.idempotence=true is set, the Kafka Producer attaches a Producer ID and a sequence number to the records it sends, and the Kafka Broker verifies that sequence number on receipt.
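
For reference, a minimal sketch of enabling this from the Java client side (the broker address and topic name below are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // With idempotence enabled, the client attaches a Producer ID and per-partition
        // sequence numbers to each batch, which the broker uses for deduplication.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value")); // placeholder topic
        }
    }
}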

ref. Idempotent Writer

The Kafka producer tags each batch of Events that it sends to the Kafka cluster with a sequence number.

Brokers in the cluster use this sequence number to enforce deduplication of Events sent from this specific producer

If the sequence number is not exactly one greater than that of the last committed batch, an out-of-sequence error occurs.

if its sequence number is exactly one greater than that of the last committed batch; otherwise, it results in an out-of-sequence error.

Producer state management in the Kafka Broker (ProducerStateManager)

To implement deduplication on the Kafka Broker, the broker needs to track the sequence numbers of the messages each Producer has sent, and ProducerStateManager plays that role.

ProducerStateManager holds a ProducerStateEntry per Producer ID:

private final Map<Long, ProducerStateEntry> producers = new HashMap<>();

ProducerStateEntry contains the Producer ID, sequence numbers, and other per-producer state:

public class ProducerStateEntry {
    public static final int NUM_BATCHES_TO_RETAIN = 5;
    private final long producerId;
    private final Deque<BatchMetadata> batchMetadata = new ArrayDeque<>();

A ProducerStateEntry contains multiple Record Batches, ordered by ascending sequence number.

It contains batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the queue while the batch with the highest sequence is at the tail of the queue

    public int firstSeq() {
        return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getFirst().firstSeq();
    }

    public int lastSeq() {
        return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.getLast().lastSeq;
    }

ref. https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java#L58..L64
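
The NUM_BATCHES_TO_RETAIN = 5 constant above means only the most recent batches are kept per producer. As a purely illustrative sketch (not the actual ProducerStateEntry logic), such a bounded, sequence-ordered window could look like this:

import java.util.ArrayDeque;
import java.util.Deque;

class BatchWindow {
    static final int NUM_BATCHES_TO_RETAIN = 5;
    // Each entry is {firstSeq, lastSeq}; lowest sequence at the head, highest at the tail.
    private final Deque<int[]> batches = new ArrayDeque<>();

    void add(int firstSeq, int lastSeq) {
        if (batches.size() == NUM_BATCHES_TO_RETAIN)
            batches.removeFirst();                     // evict the oldest (lowest-sequence) batch
        batches.addLast(new int[]{firstSeq, lastSeq}); // newest batch goes to the tail
    }

    int lastSeq() {
        return batches.isEmpty() ? -1 : batches.getLast()[1];
    }
}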

Record Batch

The Record Batch format is as follows:

baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: uint32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
        4: zstd
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
    bit 7~15: unused
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
recordsCount: int32
records: [Record]

For example, when fetching baseSequence, DefaultRecordBatch simply reads the field at a fixed byte offset in the buffer:

    public int baseSequence() {
        return buffer.getInt(BASE_SEQUENCE_OFFSET);
    }
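
The fixed byte offsets used here follow directly from the field sizes listed above; a small sketch of that arithmetic (the constant names mirror the field names, values are for magic v2):

public final class RecordBatchOffsets {
    // Derived by summing the sizes of the preceding fields in the batch header.
    public static final int BASE_OFFSET_OFFSET = 0;                                // int64
    public static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + 8;                // int32
    public static final int PARTITION_LEADER_EPOCH_OFFSET = LENGTH_OFFSET + 4;     // int32
    public static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + 4;      // int8
    public static final int CRC_OFFSET = MAGIC_OFFSET + 1;                         // uint32
    public static final int ATTRIBUTES_OFFSET = CRC_OFFSET + 4;                    // int16
    public static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + 2;      // int32
    public static final int BASE_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + 4;  // int64
    public static final int MAX_TIMESTAMP_OFFSET = BASE_TIMESTAMP_OFFSET + 8;      // int64
    public static final int PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + 8;         // int64
    public static final int PRODUCER_EPOCH_OFFSET = PRODUCER_ID_OFFSET + 8;        // int16
    public static final int BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + 2;      // = 53
}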

lastSequence, in turn, is baseSequence plus lastOffsetDelta:

    public int lastSequence() {
        int baseSequence = baseSequence();
        if (baseSequence == RecordBatch.NO_SEQUENCE)
            return RecordBatch.NO_SEQUENCE;
        return incrementSequence(baseSequence, lastOffsetDelta());
    }
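
incrementSequence accounts for the fact that sequence numbers wrap around after Integer.MAX_VALUE (consistent with the inSequence check shown later); a minimal sketch of that wrap-around arithmetic:

    // Sketch: sequence numbers restart from 0 once they pass Integer.MAX_VALUE.
    static int incrementSequence(int sequence, int increment) {
        if (sequence > Integer.MAX_VALUE - increment)
            return increment - (Integer.MAX_VALUE - sequence) - 1; // wrapped portion continues from 0
        return sequence + increment;
    }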

How the sequence number check happens on the Kafka Broker

When the Kafka Broker receives a Produce Request, it executes ReplicaManager.appendRecords; processing then proceeds until UnifiedLog.analyzeAndValidateProducerState starts the Idempotent Producer sequence number check.

It checks whether each Record Batch contains a Producer ID,

    private AnalyzeAndValidateProducerStateResult analyzeAndValidateProducerState(LogOffsetMetadata appendOffsetMetadata,
                 MemoryRecords records,
                 AppendOrigin origin,
                 VerificationGuard requestVerificationGuard) {
~~
        for (MutableRecordBatch batch : records.batches()) {
            if (batch.hasProducerId()) {

and UnifiedLog.updateProducers is executed to look up the state kept for that Producer ID.

  private def updateProducers(producerStateManager: ProducerStateManager,
                              batch: RecordBatch,
                              producers: mutable.Map[Long, ProducerAppendInfo],
                              firstOffsetMetadata: Option[LogOffsetMetadata],
                              origin: AppendOrigin): Option[CompletedTxn] = {
    val producerId = batch.producerId
    val appendInfo = producers.getOrElseUpdate(producerId, producerStateManager.prepareUpdate(producerId, origin))

What is actually obtained is a ProducerAppendInfo, and ProducerAppendInfo.append performs the sequence number check:

  private def updateProducers(producerStateManager: ProducerStateManager,
                              batch: RecordBatch,
                              producers: mutable.Map[Long, ProducerAppendInfo],
                              firstOffsetMetadata: Option[LogOffsetMetadata],
                              origin: AppendOrigin): Option[CompletedTxn] = {
~~
    val completedTxn = appendInfo.append(batch, firstOffsetMetadata.asJava).asScala

Processing then reaches the sequence number check. In inSequence, if the baseSequence of the Record Batch being appended is not exactly the ProducerStateEntry's last sequence number + 1, an OutOfOrderSequenceException is thrown:

            if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
                throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " +
                        "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq +
                        " (incoming seq. number), " + currentLastSeq + " (current end sequence number)");
            }
~~
    private boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }
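
As a small illustration of the rule (not broker code), the check accepts only the immediately following sequence number, plus the wrap from Integer.MAX_VALUE back to 0:

public class InSequenceDemo {
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(inSequence(41, 42));               // true: exactly one greater
        System.out.println(inSequence(41, 44));               // false: gap -> OutOfOrderSequenceException
        System.out.println(inSequence(41, 40));               // false: old/duplicate sequence
        System.out.println(inSequence(Integer.MAX_VALUE, 0)); // true: wrap-around case
    }
}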

Once this check passes, control returns from UnifiedLog.analyzeAndValidateProducerState, LocalLog.append is executed, and the Producer State is updated:

              localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
              updateHighWatermarkWithLogEndOffset()

              // update the producer state
              updatedProducers.values.foreach(producerAppendInfo => producerStateManager.update(producerAppendInfo))

The Producer State's mapEndOffset is also updated, to the appended Record Batch's lastOffset + 1:

              // always update the last producer id map offset so that the snapshot reflects the current offset
              // even if there isn't any idempotent data being written
              producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

This offset is later used when taking a snapshot, to determine up to which offset the snapshot should cover.

Snapshots

The producer state held by the Kafka Broker is periodically written out to a file as a snapshot.

    /**
     * Take a snapshot at the current end offset if one does not already exist, then return the snapshot file if taken.
     */
    public Optional<File> takeSnapshot(boolean sync) throws IOException {
        // If not a new offset, then it is not worth taking another snapshot
        if (lastMapOffset > lastSnapOffset) {
            SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
            long start = time.hiResClockMs();
            writeSnapshot(snapshotFile.file(), producers, sync);

ref. https://github.com/apache/kafka/blob/3.7.0/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L477..L479

The snapshot file stores, among other things, the last sequence number written by each producer:

private static final Schema PRODUCER_SNAPSHOT_ENTRY_SCHEMA =
            new Schema(new Field(PRODUCER_ID_FIELD, Type.INT64, "The producer ID"),
                    new Field(PRODUCER_EPOCH_FIELD, Type.INT16, "Current epoch of the producer"),
                    new Field(LAST_SEQUENCE_FIELD, Type.INT32, "Last written sequence of the producer"),
                    new Field(LAST_OFFSET_FIELD, Type.INT64, "Last written offset of the producer"),
                    new Field(OFFSET_DELTA_FIELD, Type.INT32, "The difference of the last sequence and first sequence in the last written batch"),
                    new Field(TIMESTAMP_FIELD, Type.INT64, "Max timestamp from the last written entry"),
                    new Field(COORDINATOR_EPOCH_FIELD, Type.INT32, "The epoch of the last transaction coordinator to send an end transaction marker"),
                    new Field(CURRENT_TXN_FIRST_OFFSET_FIELD, Type.INT64, "The first offset of the on-going transaction (-1 if there is none)"));
    private static final Schema PID_SNAPSHOT_MAP_SCHEMA =
            new Schema(new Field(VERSION_FIELD, Type.INT16, "Version of the snapshot file"),
                    new Field(CRC_FIELD, Type.UNSIGNED_INT32, "CRC of the snapshot data"),
                    new Field(PRODUCER_ENTRIES_FIELD, new ArrayOf(PRODUCER_SNAPSHOT_ENTRY_SCHEMA), "The entries in the producer table"));

ref. https://github.com/apache/kafka/blob/3.7.0/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L93..L105

This snapshot file is written, for example, when the Broker shuts down, and by reading the snapshot at restart the Broker restores the Producer State.

Restoring the Producer State when the Kafka Broker restarts

Behavior at shutdown

The rough flow when the Kafka Broker shuts down is as follows:

  1. Flush the logs to disk
  2. Update the recovery point and write the recovery-point-offset-checkpoint file
  3. Take producer state snapshots
  4. If the graceful shutdown succeeds, create the .kafka_cleanshutdown file

When the Kafka Broker shuts down, UnifiedLog.flush and UnifiedLog.close are executed via the following path:

  def shutdown(brokerEpoch: Long = -1): Unit = {
~~
      val jobsForDir = logs.map { log =>
        val runnable: Runnable = () => {
          // flush the log to ensure latest possible recovery point
          log.flush(true)
          log.close()
        }
        runnable
      }

UnifiedLog.flush flushes everything up to logEndOffset,

  def flush(forceFlushActiveSegment: Boolean): Unit = flush(logEndOffset, forceFlushActiveSegment)

and recoveryPoint is updated to logEndOffset:

  private def flush(offset: Long, includingOffset: Boolean): Unit = {
    val flushOffset = if (includingOffset) offset + 1  else offset
    val newRecoveryPoint = offset
~~
        localLog.flush(flushOffset)
        lock synchronized {
          localLog.markFlushed(newRecoveryPoint)
        }
      }
    }

LocalLog.markFlushed then updates recoveryPoint:

  private[log] def markFlushed(offset: Long): Unit = {
~~
    if (offset > recoveryPoint) {
      updateRecoveryPoint(offset)
      lastFlushedTime.set(time.milliseconds)
    }
  }

This recovery point is written to the recovery-point-offset-checkpoint file by LogManager.checkpointLogRecoveryOffsets:

  def checkpointLogRecoveryOffsets(): Unit = {
    val logsByDirCached = logsByDir
    liveLogDirs.foreach { logDir =>
      val logsToCheckpoint = logsInDir(logsByDirCached, logDir)
      checkpointRecoveryOffsetsInDir(logDir, logsToCheckpoint)
    }
  }

https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/log/LogManager.scala#L843..L847

  private def checkpointRecoveryOffsetsInDir(logDir: File, logsToCheckpoint: Map[TopicPartition, UnifiedLog]): Unit = {
    try {
      recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
        val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
        // checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
        // directory and guarantees crash consistency.
        checkpoint.write(recoveryOffsets)

https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/log/LogManager.scala#L133..L134

  @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap

https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/log/LogManager.scala#L1502

 val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
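
For reference, the checkpoint file itself is a small plain-text file; a hypothetical example (a version line, an entry count, then one "topic partition offset" line per partition):

0
2
my-topic 0 4200
my-topic 1 3100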

The next time the Kafka Broker starts, if it did not shut down cleanly, segments after this recovery point are recovered.

Note that LogManager.checkpointLogRecoveryOffsets is also scheduled at startup to run every flushRecoveryOffsetCheckpointMs:

      scheduler.schedule("kafka-recovery-point-checkpoint",
                         () => checkpointLogRecoveryOffsets(),
                         InitialTaskDelayMs,
                         flushRecoveryOffsetCheckpointMs)

After that, UnifiedLog.close takes a snapshot:

  override def close(): Unit = {
    debug("Closing log")
    lock synchronized {
~~
      maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
        // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
        // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
        // (the clean shutdown file is written after the logs are all closed).
        producerStateManager.takeSnapshot()

At this point, a snapshot covering up to the ProducerStateManager's lastMapOffset is created:

    public Optional<File> takeSnapshot(boolean sync) throws IOException {
        // If not a new offset, then it is not worth taking another snapshot
        if (lastMapOffset > lastSnapOffset) {
            SnapshotFile snapshotFile = new SnapshotFile(LogFileUtils.producerSnapshotFile(logDir, lastMapOffset));
            long start = time.hiResClockMs();
            writeSnapshot(snapshotFile.file(), producers, sync);
            log.info("Wrote producer snapshot at offset {} with {} producer ids in {} ms.", lastMapOffset,
                    producers.size(), time.hiResClockMs() - start);

After UnifiedLog.close completes, CleanShutdownFileHandler.write is executed,

            val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
            debug(s"Writing clean shutdown marker at $dir with broker epoch=$brokerEpoch")
            CoreUtils.swallow(cleanShutdownFileHandler.write(brokerEpoch), this)

and a .kafka_cleanshutdown file is created in each data directory. When the Kafka Broker later restarts, the presence of this file indicates that the shutdown was clean.

Behavior at startup

The rough flow when the Kafka Broker starts is as follows:

  1. Check whether the previous shutdown was clean
  2. Read the recovery point from the recovery-point-offset-checkpoint file
  3. If the shutdown was not clean, recover the segments from the recovery point onward
  4. Load the snapshots and rebuild the Producer State

When the Kafka Broker restarts, the Producer State rebuild begins. Processing proceeds into LogManager.loadLogs, and if the .kafka_cleanshutdown file exists, hadCleanShutdown is set to true:

  private[log] def loadLogs(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = {
~~
        if (cleanShutdownFileHandler.exists()) {
          // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
          // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
          cleanShutdownFileHandler.delete()
          hadCleanShutdown = true
        }

After that, the recovery point is read from the recovery-point-offset-checkpoint file (this should be logEndOffset after a clean shutdown?),

recoveryPoints = this.recoveryPointCheckpoints(dir).read()

and LogManager.loadLog is executed.

LogManager.loadLog creates a UnifiedLog, and LogLoader.load and LogLoader.recoverLog perform recovery for the case where the broker did not shut down cleanly:

    // If we have the clean shutdown marker, skip recovery.
    if (!hadCleanShutdown) {
      val unflushed = segments.values(recoveryPointCheckpoint, Long.MaxValue)
      val unflushedIter = unflushed.iterator
~~
      while (unflushedIter.hasNext && !truncated) {
        val segment = unflushedIter.next()
~~
          try {
            recoverSegment(segment)

First, LogLoader.recoverSegment executes UnifiedLog.rebuildProducerState to rebuild the Producer State:

  private def recoverSegment(segment: LogSegment): Int = {
    val producerStateManager = new ProducerStateManager(
      topicPartition,
      dir,
      this.producerStateManager.maxTransactionTimeoutMs(),
      this.producerStateManager.producerStateManagerConfig(),
      time)
    UnifiedLog.rebuildProducerState(
      producerStateManager,
      segments,
      logStartOffsetCheckpoint,
      segment.baseOffset,
      config.recordVersion,
      time,
      reloadFromCleanShutdown = false,
      logIdent)

Next, ProducerStateManager.truncateAndReload is executed,

 private[log] def rebuildProducerState(producerStateManager: ProducerStateManager,
                                        segments: LogSegments,
                                        logStartOffset: Long,
                                        lastOffset: Long,
                                        recordVersion: RecordVersion,
                                        time: Time,
                                        reloadFromCleanShutdown: Boolean,
                                        logPrefix: String): Unit = {
~~
      info(s"${logPrefix}Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
~~
      producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())

which calls ProducerStateManager.loadFromSnapshot:

    public void truncateAndReload(long logStartOffset, long logEndOffset, long currentTimeMs) throws IOException {
~~
        if (logEndOffset != mapEndOffset()) {
            clearProducerIds();
            ongoingTxns.clear();
            updateOldestTxnTimestamp();

            // since we assume that the offset is less than or equal to the high watermark, it is
            // safe to clear the unreplicated transactions
            unreplicatedTxns.clear();
            loadFromSnapshot(logStartOffset, currentTimeMs);
~~

loadFromSnapshot finds the latest snapshot file, loads the producer entries from it, and updates lastSnapOffset and lastMapOffset:


    private void loadFromSnapshot(long logStartOffset, long currentTime) throws IOException {
~~
            Optional<SnapshotFile> latestSnapshotFileOptional = latestSnapshotFile();
~~
                    log.info("Loading producer state from snapshot file '{}'", snapshot);
                    Stream<ProducerStateEntry> loadedProducers = readSnapshot(snapshot.file()).stream().filter(producerEntry -> !isProducerExpired(currentTime, producerEntry));
                    loadedProducers.forEach(this::loadProducerEntry);
                    lastSnapOffset = snapshot.offset;
                    lastMapOffset = lastSnapOffset;

After the latest snapshot is loaded, if the offset up to which the Producer State could be restored from the snapshot is below lastOffset (which was passed as segment.baseOffset above), the segments in between are read as well:

      // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
      // offset (which would be the case on first startup) and there were active producers prior to truncation
~~
      if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
        val segmentOfLastOffset = segments.floorSegment(lastOffset)

        segments.values(producerStateManager.mapEndOffset, lastOffset).forEach { segment =>
~~

LogSegment.read reads those segments,

  private[log] def rebuildProducerState(producerStateManager: ProducerStateManager,
                                        segments: LogSegments,
                                        logStartOffset: Long,
                                        lastOffset: Long,
                                        recordVersion: RecordVersion,
                                        time: Time,
                                        reloadFromCleanShutdown: Boolean,
                                        logPrefix: String): Unit = {
~~
~~
          val fetchDataInfo = segment.read(startOffset, Int.MaxValue, maxPosition)
          if (fetchDataInfo != null)
            loadProducersFromRecords(producerStateManager, fetchDataInfo.records)

and UnifiedLog.loadProducersFromRecords is executed.

UnifiedLog.loadProducersFromRecords runs UnifiedLog.updateProducers, which appeared above, to rebuild the Producer State.

After recovery completes, processing returns to LogLoader.load, UnifiedLog.rebuildProducerState is executed once more, and the Producer State is rebuilt in the ProducerStateManager:

    // Reload all snapshots into the ProducerStateManager cache, the intermediate ProducerStateManager used
    // during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the
    // deletion.
    producerStateManager.removeStraySnapshots(segments.baseOffsets)
    UnifiedLog.rebuildProducerState(
