
Notes: Kafka Replication


Confluent's article on Kafka replication was easy to follow, so these notes dig deeper by tracing the corresponding code (up to "Advancing the Follower High Watermark").

The code referenced is Kafka 3.7.0.

The flow of writes and reads in Kafka

  1. The Leader that received a Produce Request writes the LeaderEpoch
  2. A Follower sends a Fetch Request
  3. The Leader advances its High Watermark
  4. The Follower advances its High Watermark
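
As a rough mental model of these four steps, here is a toy sketch (my own simplification, not Kafka code): a single leader and a single follower, with log end offsets and high watermarks tracked as plain Longs.

// Toy model of the four steps above (not Kafka code)
object ReplicationFlowSketch extends App {
  final class Replica { var logEndOffset = 0L; var highWatermark = 0L }

  val leader = new Replica
  val follower = new Replica
  val leaderEpoch = 1 // bumped on each leader election; stamped on appended batches

  // 1. The leader appends a produced record (tagged with leaderEpoch in real Kafka)
  leader.logEndOffset += 1

  // 2. The follower fetches from its own log end offset and appends what it receives
  follower.logEndOffset = leader.logEndOffset

  // 3. The leader advances its high watermark to the minimum log end offset of the ISR
  leader.highWatermark = math.min(leader.logEndOffset, follower.logEndOffset)

  // 4. The fetch response carries the leader's high watermark, so the follower adopts it
  //    on its next fetch; applied immediately here for brevity
  follower.highWatermark = leader.highWatermark

  println(s"epoch=$leaderEpoch leaderHW=${leader.highWatermark} followerHW=${follower.highWatermark}")
}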

What is the LeaderEpoch?

The LeaderEpoch is used to track what work was done under the current Leader, and it is incremented whenever the Leader changes.

The epoch is used to keep track of what work was done while this replica was the leader and it will be increased whenever a new leader is elected.

ref. Kafka Data Replication Protocol: A Complete Guide

When the Leader writes the produced records, it records this LeaderEpoch.

How the Leader writes the LeaderEpoch

The Leader that received a Produce Request builds a TopicPartition -> MemoryRecords map from the request data in handleProduceRequest.

    produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
      val topicPartition = new TopicPartition(topic.name, partition.index)
      // This caller assumes the type is MemoryRecords and that is true on current serialization
      // We cast the type to avoid causing big change to code base.
      // https://issues.apache.org/jira/browse/KAFKA-10698
      val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
~~
        try {
          ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
          authorizedRequestInfo += (topicPartition -> memoryRecords)
        }
    })

The TopicPartition is created from the topic name and partition index in the Produce Request, and the MemoryRecords from the records in the Produce Request.

Produce Request (Version: 10) => transactional_id acks timeout_ms [topic_data] TAG_BUFFER
  transactional_id => COMPACT_NULLABLE_STRING
  acks => INT16
  timeout_ms => INT32
  topic_data => name [partition_data] TAG_BUFFER
    name => COMPACT_STRING
    partition_data => index records TAG_BUFFER
      index => INT32
      records => COMPACT_RECORDS
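
As a minimal sketch of the data structure being built here, using only the public client classes (the topic name, partition index, and record contents below are made up; on the broker they come from the request data as shown above):

import scala.collection.mutable
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

object ProduceMapSketch extends App {
  // The same shape handleProduceRequest builds: TopicPartition -> MemoryRecords
  val authorizedRequestInfo = mutable.Map.empty[TopicPartition, MemoryRecords]

  // "demo-topic" / 0 stand in for topic.name / partition.index from the request
  val topicPartition = new TopicPartition("demo-topic", 0)
  val memoryRecords = MemoryRecords.withRecords(
    CompressionType.NONE,
    new SimpleRecord("key".getBytes, "value".getBytes))

  authorizedRequestInfo += (topicPartition -> memoryRecords)
  println(s"$topicPartition -> ${memoryRecords.sizeInBytes} bytes")
}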

The TopicPartition -> MemoryRecords map is passed to ReplicaManager.appendRecords, and processing proceeds (through Partition.appendRecordsToLeader and UnifiedLog.appendAsLeader, which appear below) until LogValidator.validateMessagesAndAssignOffsets produces validated MemoryRecords.

MemoryRecords holds a collection of MutableRecordBatch,

Iterable<MutableRecordBatch> batches

MutableRecordBatch extends RecordBatch, and DefaultRecordBatch has the following schema, in which PartitionLeaderEpoch corresponds to the LeaderEpoch.

 RecordBatch =>
  BaseOffset => Int64
  Length => Int32
  PartitionLeaderEpoch => Int32
  Magic => Int8
  CRC => Uint32
  Attributes => Int16
  LastOffsetDelta => Int32 // also serves as LastSequenceDelta
  BaseTimestamp => Int64
  MaxTimestamp => Int64
  ProducerId => Int64
  ProducerEpoch => Int16
  BaseSequence => Int32
  Records => [Record]

ref. https://github.com/apache/kafka/blob/3.7.0/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
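
Here is a small sketch of the PartitionLeaderEpoch field round-tripping through MemoryRecords. It assumes the MemoryRecords.withRecords overload that takes an initial offset and a partitionLeaderEpoch; the epoch value itself is made up.

import scala.jdk.CollectionConverters._
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

object PartitionLeaderEpochSketch extends App {
  val leaderEpoch = 7 // hypothetical current leader epoch of the partition

  // Build a batch stamped with the leader epoch (base offset 0, no compression)
  val records = MemoryRecords.withRecords(
    0L, CompressionType.NONE, Integer.valueOf(leaderEpoch),
    new SimpleRecord("value".getBytes))

  // Each MutableRecordBatch exposes PartitionLeaderEpoch from the batch header
  records.batches.asScala.foreach { batch =>
    println(s"baseOffset=${batch.baseOffset} partitionLeaderEpoch=${batch.partitionLeaderEpoch}")
  }
}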

Note that the LeaderEpoch is held by the Partition and is passed in when the Partition calls UnifiedLog.appendAsLeader.

  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
                            requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
      leaderLogIfLocal match {
        case Some(leaderLog) =>
~~
          val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
            interBrokerProtocolVersion, requestLocal, verificationGuard)
~~

The validated MemoryRecords are passed to LocalLog.append, which calls append on the active LogSegment.

  private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
    segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
    updateLogEndOffset(lastOffset + 1)
  }

A LocalLog is a sequence of LogSegment objects (a LogSegments),

/**
 * An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
~~
*/
class LocalLog(@volatile private var _dir: File,
               @volatile private[log] var config: LogConfig,
               private[log] val segments: LogSegments,

and the last LogSegment is the active segment.[1]

    /**
     * The active segment that is currently taking appends
     */
    public LogSegment activeSegment() {
        return lastSegment().get();
    }

In LogSegment.append, FileRecords.append is called to write the MemoryRecords to a file.

FileRecords.append executes MemoryRecords.writeFullyTo,

    public int append(MemoryRecords records) throws IOException {
~~
        int written = records.writeFullyTo(channel);
        size.getAndAdd(written);
        return written;
    }

and judging from FileRecords.openChannel, the channel writes to a file.

    /**
     * Open a channel for the given file
~~
     */
    private static FileChannel openChannel(File file,
                                           boolean mutable,
                                           boolean fileAlreadyExists,
                                           int initFileSize,
                                           boolean preallocate) throws IOException {
        if (mutable) {
            if (fileAlreadyExists || !preallocate) {
                return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
                        StandardOpenOption.WRITE);
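
To see that last hop in isolation, here is a minimal sketch that writes MemoryRecords through a FileChannel the same way FileRecords.append does; it uses a throwaway temp file rather than a segment file managed by LogSegment/FileRecords.

import java.nio.channels.FileChannel
import java.nio.file.{Files, StandardOpenOption}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

object WriteFullyToSketch extends App {
  val path = Files.createTempFile("segment-sketch-", ".log")
  val channel = FileChannel.open(path, StandardOpenOption.WRITE)

  val records = MemoryRecords.withRecords(
    CompressionType.NONE, new SimpleRecord("hello".getBytes))

  // FileRecords.append ultimately does this: write the whole backing buffer to the channel
  val written = records.writeFullyTo(channel)
  channel.close()
  println(s"wrote $written bytes to $path")
}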

The Follower sends a Fetch Request

Fetch Requests are sent from the Follower's ReplicaFetcherManager.

The ReplicaFetcherManager is created when the ReplicaManager is constructed,

class ReplicaManager(val config: KafkaConfig,
~~
                     ) extends Logging {
~~
  val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)

and the ReplicaFetcherManager creates one FetcherThread per Leader.

class ReplicaFetcherManager(brokerConfig: KafkaConfig,
~~
  override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint):
~~
    new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
      quotaManager, logContext.logPrefix, metadataVersionSupplier)

The FetcherThread executes AbstractFetcherThread.doWork, and the Fetch Request is sent to the Leader via maybeFetch and processFetchRequest.

  private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                  fetchRequest: FetchRequest.Builder): Unit = {
    val partitionsWithError = mutable.Set[TopicPartition]()
    val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
    var responseData: Map[TopicPartition, FetchData] = Map.empty

    try {
      trace(s"Sending fetch request $fetchRequest")
      responseData = leader.fetch(fetchRequest)

The Fetch Request is built by LeaderEndPoint.buildFetch inside AbstractFetcherThread.maybeFetch,

  private def maybeFetch(): Unit = {
    val fetchRequestOpt = inLock(partitionMapLock) {
      val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)

and FetchState.fetchOffset is used as the fetch_offset of each target partition included in the Fetch Request. This offset is updated when the response to the Fetch Request is received.

  override def buildFetch(partitions: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
~~
    val builder = fetchSessionHandler.newBuilder(partitions.size, false)
    partitions.foreachEntry { (topicPartition, fetchState) =>
~~
          builder.add(topicPartition, new FetchRequest.PartitionData(
            fetchState.topicId.getOrElse(Uuid.ZERO_UUID),
            fetchState.fetchOffset,
            logStartOffset,
            fetchSize,
            Optional.of(fetchState.currentLeaderEpoch),
            lastFetchedEpoch))
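
For reference, the same PartitionData can be constructed directly from the client classes. The topic, offsets, epoch, and fetch size below are made-up values standing in for the follower's FetchState; the constructor arguments mirror the builder.add call quoted above.

import java.util.Optional
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.requests.FetchRequest

object FetchPartitionDataSketch extends App {
  val topicPartition = new TopicPartition("demo-topic", 0)

  // Mirrors the builder.add(...) call above, with hypothetical values
  val partitionData = new FetchRequest.PartitionData(
    Uuid.ZERO_UUID,             // topicId (ZERO_UUID when the topic ID is not known)
    42L,                        // fetchOffset: the follower's FetchState.fetchOffset
    0L,                         // logStartOffset reported by the follower
    1048576,                    // fetchSize: max bytes to fetch for this partition
    Optional.of(Int.box(5)),    // currentLeaderEpoch known to the follower
    Optional.empty[Integer]())  // lastFetchedEpoch

  println(s"$topicPartition fetchOffset=${partitionData.fetchOffset}")
}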

Registering partitions with the FetcherThread

When a LeaderAndIsrRequest is received, handleLeaderAndIsrRequest is executed.

ReplicaManager.becomeLeaderOrFollower then runs, and the partition_states from the LeaderAndIsrRequest are added to partitionsToBeFollower.

  def becomeLeaderOrFollower(correlationId: Int,
                             leaderAndIsrRequest: LeaderAndIsrRequest,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    val startMs = time.milliseconds()
    replicaStateChangeLock synchronized {
      val controllerId = leaderAndIsrRequest.controllerId
      val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
~~
// First create the partition if it doesn't exist already
          requestPartitionStates.foreach { partitionState =>
~~
            // Next check the topic ID and the partition's leader epoch
            partitionOpt.foreach { partition =>
~~
                if (partitionState.replicas.contains(localBrokerId)) {
                  partitions += partition
                  if (partitionState.leader == localBrokerId) {
                    partitionsToBeLeader.put(partition, partitionState)
                  } else {
                    partitionsToBeFollower.put(partition, partitionState)
                  }

becomeLeaderOrFollower then calls makeFollowers,

          val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
            makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
              highWatermarkCheckpoints, topicIdFromRequest)
          else

which executes ReplicaFetcherManager.addFetcherForPartitions,

  protected def addPartitionsToFetcherThread(fetcherThread: T,
                                             initialOffsetAndEpochs: collection.Map[TopicPartition, InitialFetchState]): Unit = {
    fetcherThread.addPartitions(initialOffsetAndEpochs)
    info(s"Added fetcher to broker ${fetcherThread.leader.brokerEndPoint().id} for partitions $initialOffsetAndEpochs")
  }

and the target partitions are added to the FetcherThread.

  def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
~~
      failedPartitions.removeAll(initialFetchStates.keySet)

      initialFetchStates.forKeyValue { (tp, initialFetchState) =>
        val currentState = partitionStates.stateValue(tp)
        val updatedState = partitionFetchState(tp, initialFetchState, currentState)
        partitionStates.updateAndMoveToEnd(tp, updatedState)
      }
~~
  }

Note that the initial offset at this point is the Follower's logEndOffset.

  protected def initialFetchOffset(log: UnifiedLog): Long = {
    if (metadataCache.metadataVersion().isTruncationOnFetchSupported && log.latestEpoch.nonEmpty)
      log.logEndOffset
    else
      log.highWatermark
  }

The Leader advances the High Watermark

Once the Followers have fetched up to a given offset, the Leader commits those records. A Kafka consumer can read records up to that offset, which is called the High Watermark.

The Leader that received the Fetch Request proceeds from handleFetch through

  1. ReplicaManager.fetchMessages
  2. ReplicaManager.readFromLog
  3. Partition.fetchRecords
  4. Partition.updateFollowerFetchState
  5. Partition.maybeIncrementLeaderHW

and updates the High Watermark.

What Partition.maybeIncrementLeaderHW does

It checks whether min.insync.replicas is satisfied and skips the update if it is not.

  private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
    if (isUnderMinIsr) {
      trace(s"Not increasing HWM because partition is under min ISR(ISR=${partitionState.isr}")
      return false
    }
~~
  }

Next, it determines the offset that becomes the new High Watermark: the smallest replicaState.logEndOffsetMetadata among the replicas that are in the (maximal) ISR, or that are caught up and ISR-eligible, is adopted as the High Watermark.

  private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
~~
    // maybeIncrementLeaderHW is in the hot path, the following code is written to
    // avoid unnecessary collection generation
    val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
    var newHighWatermark = leaderLogEndOffset
    remoteReplicasMap.values.foreach { replica =>
      val replicaState = replica.stateSnapshot

      def shouldWaitForReplicaToJoinIsr: Boolean = {
        replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) &&
        isReplicaIsrEligible(replica.brokerId)
      }

      // Note here we are using the "maximal", see explanation above
      if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
          (partitionState.maximalIsr.contains(replica.brokerId) || shouldWaitForReplicaToJoinIsr)
      ) {
        newHighWatermark = replicaState.logEndOffsetMetadata
      }
    }
~~
  }

leaderLogEndOffset is the offset of the Leader's LocalLog, i.e. the offset pointing at the next write position in the Leader's LocalLog,

  /**
   * The offset metadata of the next message that will be appended to the log
   */
  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata

while replicaState.logEndOffsetMetadata is the latest offset that the Follower has successfully fetched from the Leader.

  // The log end offset value, kept in all replicas; for local replica it is the
  // log's end offset, for remote replicas its value is only updated by follower fetch.
  logEndOffsetMetadata: LogOffsetMetadata,

replicaState.logEndOffsetMetadata is updated by Replica.updateFetchStateOrThrow, called from Partition.updateFollowerFetchState which ran earlier.

  def updateFollowerFetchState(
    replica: Replica,
    followerFetchOffsetMetadata: LogOffsetMetadata,
    followerStartOffset: Long,
    followerFetchTimeMs: Long,
    leaderEndOffset: Long,
    brokerEpoch: Long
  ): Unit = {
~~
      replica.updateFetchStateOrThrow(
        followerFetchOffsetMetadata,
        followerStartOffset,
        followerFetchTimeMs,
        leaderEndOffset,
        brokerEpoch
      )
    }

Whether a replica is caught up is decided by whether either of the following two conditions holds:

  • the leader's endOffset and the replica's endOffset are the same
  • the time since the replica last caught up is within replicaMaxLagMs

https://github.com/apache/kafka/blob/3.7.0/core/src/main/scala/kafka/cluster/Replica.scala#L67..L73

  /**
   * Returns true when the replica is considered as "caught-up". A replica is
   * considered "caught-up" when its log end offset is equals to the log end
   * offset of the leader OR when its last caught up time minus the current
   * time is smaller than the max replica lag.
   */
  def isCaughtUp(
    leaderEndOffset: Long,
    currentTimeMs: Long,
    replicaMaxLagMs: Long
  ): Boolean = {
    leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs
  }

lastCaughtUpTimeMs is updated in Replica.updateFetchStateOrThrow, which we saw earlier.

      val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
        math.max(currentReplicaState.lastCaughtUpTimeMs, followerFetchTimeMs)
      } else if (followerFetchOffsetMetadata.messageOffset >= currentReplicaState.lastFetchLeaderLogEndOffset) {
        math.max(currentReplicaState.lastCaughtUpTimeMs, currentReplicaState.lastFetchTimeMs)
      } else {
        currentReplicaState.lastCaughtUpTimeMs
      }
  /**
~~
   * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
   * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
   *
   * Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
   * set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
~~
   */

As for eligibility, in ZooKeeper mode it appears to be determined by whether the corresponding broker is alive.

      // In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
      // the controller will block them from being added to ISR.
      case zkMetadataCache: ZkMetadataCache =>
        zkMetadataCache.hasAliveBroker(followerReplicaId)

The Follower advances the High Watermark

When the Follower receives the response to its Fetch Request, the ReplicaFetcherThread executes processPartitionData, and UnifiedLog.maybeUpdateHighWatermark updates the replica's High Watermark to the one included in the Fetch response.

  // process fetched data
  override def processPartitionData(topicPartition: TopicPartition,
                                    fetchOffset: Long,
                                    partitionData: FetchData): Option[LogAppendInfo] = {
~~
    var maybeUpdateHighWatermarkMessage = s"but did not update replica high watermark"
    log.maybeUpdateHighWatermark(partitionData.highWatermark).foreach { newHighWatermark =>
      maybeUpdateHighWatermarkMessage = s"and updated replica high watermark to $newHighWatermark"
      partitionsWithNewHighWatermark += topicPartition
    }

In processPartitionData the Follower also appends the records fetched from the Leader,

  // process fetched data
  override def processPartitionData(topicPartition: TopicPartition,
                                    fetchOffset: Long,
                                    partitionData: FetchData): Option[LogAppendInfo] = {
~~
    // Append the leader's messages to the log
    val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

and the LastOffset of the appended records becomes the next fetch_offset.

private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                  fetchRequest: FetchRequest.Builder): Unit = {
~~
                 // Update partitionStates only if there is no exception during processPartitionData
                          val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
                            currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch)
                          partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
~~
Footnotes
  1. For an explanation of segments, Kafka Topic Internals: Segments and Indexes | Learn Apache Kafka was easy to follow. ↩︎
