Notes: Kafka replication
Confluent's article on Kafka replication was easy to follow, so these notes trace the corresponding code in more depth (up to "Advancing the Follower High Watermark").
The code referenced is Kafka 3.7.0.
The write and read flow in Kafka
- The leader that receives a Produce Request writes the LeaderEpoch
- The follower sends a Fetch Request
- The leader advances the High Watermark
- The follower advances the High Watermark
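Before diving into the code, here is a rough, purely illustrative Scala model of these four steps (ReplicaModel and everything in it is made up for this memo, not Kafka's API): the leader appends and advances its log end offset, the follower fetches and catches up, the leader moves the high watermark to the slowest in-sync log end offset, and the follower adopts the high watermark returned in a fetch response.
// Purely illustrative model of the replication flow; not Kafka code.
final case class ReplicaModel(logEndOffset: Long, highWatermark: Long)

object ReplicationFlowSketch {
  def main(args: Array[String]): Unit = {
    var leader   = ReplicaModel(logEndOffset = 0, highWatermark = 0)
    var follower = ReplicaModel(logEndOffset = 0, highWatermark = 0)

    // 1. The leader appends the produced records (stamped with the current leader epoch).
    leader = leader.copy(logEndOffset = leader.logEndOffset + 3)

    // 2. The follower fetches from its fetch offset and appends the records locally.
    follower = follower.copy(logEndOffset = leader.logEndOffset)

    // 3. The leader advances its high watermark to the smallest in-sync log end offset.
    leader = leader.copy(highWatermark = math.min(leader.logEndOffset, follower.logEndOffset))

    // 4. The follower adopts the high watermark carried in the next fetch response.
    follower = follower.copy(highWatermark = leader.highWatermark)

    println(s"leader=$leader follower=$follower") // both end at LEO=3, HW=3
  }
}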
What is the LeaderEpoch?
The LeaderEpoch is used to track what work was done under the current leader, and it is incremented whenever the leader changes.
The epoch is used to keep track of what work was done while this replica was the leader and it will be increased whenever a new leader is elected.
ref. Kafka Data Replication Protocol: A Complete Guide
When the leader writes the produced records, it records this LeaderEpoch.
How the leader writes the LeaderEpoch
When the leader receives a Produce Request, handleProduceRequest builds a TopicPartition -> MemoryRecords map from the request data.
produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
  val topicPartition = new TopicPartition(topic.name, partition.index)
  // This caller assumes the type is MemoryRecords and that is true on current serialization
  // We cast the type to avoid causing big change to code base.
  // https://issues.apache.org/jira/browse/KAFKA-10698
  val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
  ~~
  try {
    ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
    authorizedRequestInfo += (topicPartition -> memoryRecords)
  }
})
The TopicPartition is built from the topic name and partition index in the Produce Request, and the MemoryRecords comes from the records field of the Produce Request.
Produce Request (Version: 10) => transactional_id acks timeout_ms [topic_data] TAG_BUFFER
transactional_id => COMPACT_NULLABLE_STRING
acks => INT16
timeout_ms => INT32
topic_data => name [partition_data] TAG_BUFFER
name => COMPACT_STRING
partition_data => index records TAG_BUFFER
index => INT32
records => COMPACT_RECORDS
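For context, a request of this shape is what a plain client send turns into. The following minimal producer sketch (the broker address localhost:9092 and the topic demo-topic are placeholders) shows where fields such as acks come from.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceRequestExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all") // becomes the acks field of the Produce Request

    val producer = new KafkaProducer[String, String](props)
    try {
      // The records of this send end up in partition_data.records shown above.
      producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value")).get()
    } finally {
      producer.close()
    }
  }
}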
The TopicPartition -> MemoryRecords map is passed to ReplicaManager.appendRecords, and processing proceeds through
- ReplicaManager.appendRecordsToLeader
- ReplicaManager.appendToLocalLog
- Partition.appendRecordsToLeader
- UnifiedLog.appendAsLeader
- UnifiedLog.append
after which LogValidator.validateMessagesAndAssignOffsets produces validated MemoryRecords.
A MemoryRecords holds MutableRecordBatch instances:
Iterable<MutableRecordBatch> batches
MutableRecordBatch extends RecordBatch, and DefaultRecordBatch has the following schema, in which PartitionLeaderEpoch corresponds to the LeaderEpoch.
RecordBatch =>
BaseOffset => Int64
Length => Int32
PartitionLeaderEpoch => Int32
Magic => Int8
CRC => Uint32
Attributes => Int16
LastOffsetDelta => Int32 // also serves as LastSequenceDelta
BaseTimestamp => Int64
MaxTimestamp => Int64
ProducerId => Int64
ProducerEpoch => Int16
BaseSequence => Int32
Records => [Record]
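The PartitionLeaderEpoch field can be observed from the record classes in the clients jar. A small hedged sketch (assuming kafka-clients 3.7 on the classpath and the withRecords overload that takes a partitionLeaderEpoch; the epoch value 5 is arbitrary):
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

object LeaderEpochInBatch {
  def main(args: Array[String]): Unit = {
    // Build an in-memory batch, stamping an arbitrary partition leader epoch of 5.
    val records = MemoryRecords.withRecords(
      0L,                      // base offset
      CompressionType.NONE,
      Integer.valueOf(5),      // partitionLeaderEpoch
      new SimpleRecord("key".getBytes, "value".getBytes))

    // Each MutableRecordBatch exposes the PartitionLeaderEpoch field of the schema above.
    records.batches().forEach { batch =>
      println(s"baseOffset=${batch.baseOffset} partitionLeaderEpoch=${batch.partitionLeaderEpoch}")
    }
  }
}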
Note that the LeaderEpoch is held by the Partition and is passed in when the Partition calls UnifiedLog.appendAsLeader.
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
~~
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal, verificationGuard)
~~
The validated MemoryRecords is passed to LocalLog.append, which appends to the active LogSegment.
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)
updateLogEndOffset(lastOffset + 1)
}
A LocalLog is a sequence of LogSegment objects (a LogSegments),
/**
* An append-only log for storing messages locally. The log is a sequence of LogSegments, each with a base offset.
~~
*/
class LocalLog(@volatile private var _dir: File,
@volatile private[log] var config: LogConfig,
private[log] val segments: LogSegments,
and the last LogSegment is the active segment.[1]
/**
* The active segment that is currently taking appends
*/
public LogSegment activeSegment() {
return lastSegment().get();
}
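Internally, LogSegments keeps the segments in a concurrent map keyed by base offset, so the last segment is the one with the highest base offset. A tiny illustrative model of that lookup (SegmentStub is a made-up stand-in, not Kafka's class):
import java.util.concurrent.ConcurrentSkipListMap

object ActiveSegmentSketch {
  // Made-up stand-in for LogSegment, identified only by its base offset.
  final case class SegmentStub(baseOffset: Long)

  def main(args: Array[String]): Unit = {
    val segments = new ConcurrentSkipListMap[java.lang.Long, SegmentStub]()
    Seq(0L, 1000L, 2000L).foreach(base => segments.put(base, SegmentStub(base)))

    // The "active" segment is simply the entry with the highest base offset.
    val active = segments.lastEntry().getValue
    println(active) // SegmentStub(2000)
  }
}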
LogSegment.append calls FileRecords.append, which writes the MemoryRecords to a file.
FileRecords.append invokes MemoryRecords.writeFullyTo,
public int append(MemoryRecords records) throws IOException {
~~
int written = records.writeFullyTo(channel);
size.getAndAdd(written);
return written;
}
and judging from FileRecords.openChannel, the channel appears to write to a File.
/**
* Open a channel for the given file
~~
*/
private static FileChannel openChannel(File file,
                                       boolean mutable,
                                       boolean fileAlreadyExists,
                                       int initFileSize,
                                       boolean preallocate) throws IOException {
    if (mutable) {
        if (fileAlreadyExists || !preallocate) {
            return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
                StandardOpenOption.WRITE);
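The writeFullyTo part essentially keeps calling write on the channel until the buffer is drained. A standalone sketch of that pattern with plain java.nio (the file name demo.log and the payload are arbitrary):
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

object WriteFullyDemo {
  // Illustrative only: keep writing until the buffer has no remaining bytes,
  // mirroring the "write fully" idea behind MemoryRecords.writeFullyTo.
  def writeFully(channel: FileChannel, buffer: ByteBuffer): Int = {
    var written = 0
    while (buffer.hasRemaining)
      written += channel.write(buffer)
    written
  }

  def main(args: Array[String]): Unit = {
    val channel = FileChannel.open(Paths.get("demo.log"),
      StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)
    try {
      val bytes = writeFully(channel, ByteBuffer.wrap("hello kafka".getBytes("UTF-8")))
      println(s"wrote $bytes bytes")
    } finally {
      channel.close()
    }
  }
}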
The follower sends a Fetch Request
Fetch Requests are sent from the follower's ReplicaFetcherManager.
The ReplicaFetcherManager is created when the ReplicaManager is constructed,
class ReplicaManager(val config: KafkaConfig,
~~
) extends Logging {
~~
val replicaFetcherManager = createReplicaFetcherManager(metrics, time, threadNamePrefix, quotaManagers.follower)
and the ReplicaFetcherManager creates a fetcher thread per leader:
class ReplicaFetcherManager(brokerConfig: KafkaConfig,
~~
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint):
~~
new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
quotaManager, logContext.logPrefix, metadataVersionSupplier)
The fetcher thread runs AbstractFetcherThread.doWork, which leads to the Fetch Request being sent to the leader (in processFetchRequest):
private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Unit = {
val partitionsWithError = mutable.Set[TopicPartition]()
val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
var responseData: Map[TopicPartition, FetchData] = Map.empty
try {
trace(s"Sending fetch request $fetchRequest")
responseData = leader.fetch(fetchRequest)
The Fetch Request itself is built by LeaderEndPoint.buildFetch inside AbstractFetcherThread.maybeFetch,
private def maybeFetch(): Unit = {
val fetchRequestOpt = inLock(partitionMapLock) {
val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = leader.buildFetch(partitionStates.partitionStateMap.asScala)
The fetch_offset for each target partition included in the Fetch Request comes from the partition's fetch state (fetchState.fetchOffset). This offset is updated when the response to the Fetch Request is received.
override def buildFetch(partitions: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
~~
val builder = fetchSessionHandler.newBuilder(partitions.size, false)
partitions.foreachEntry { (topicPartition, fetchState) =>
~~
builder.add(topicPartition, new FetchRequest.PartitionData(
fetchState.topicId.getOrElse(Uuid.ZERO_UUID),
fetchState.fetchOffset,
logStartOffset,
fetchSize,
Optional.of(fetchState.currentLeaderEpoch),
lastFetchedEpoch))
Registering partitions with the fetcher thread
When a LeaderAndIsrRequest is received, handleLeaderAndIsrRequest runs.
It then calls ReplicaManager.becomeLeaderOrFollower, and the partition_state entries of the LeaderAndIsrRequest are added to partitionsToBeFollower.
def becomeLeaderOrFollower(correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
val startMs = time.milliseconds()
replicaStateChangeLock synchronized {
val controllerId = leaderAndIsrRequest.controllerId
val requestPartitionStates = leaderAndIsrRequest.partitionStates.asScala
~~
// First create the partition if it doesn't exist already
requestPartitionStates.foreach { partitionState =>
~~
// Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
~~
if (partitionState.replicas.contains(localBrokerId)) {
partitions += partition
if (partitionState.leader == localBrokerId) {
partitionsToBeLeader.put(partition, partitionState)
} else {
partitionsToBeFollower.put(partition, partitionState)
}
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
highWatermarkCheckpoints, topicIdFromRequest)
else
ReplicaFetcherManager.addFetcherForPartitions is then executed,
protected def addPartitionsToFetcherThread(fetcherThread: T,
initialOffsetAndEpochs: collection.Map[TopicPartition, InitialFetchState]): Unit = {
fetcherThread.addPartitions(initialOffsetAndEpochs)
info(s"Added fetcher to broker ${fetcherThread.leader.brokerEndPoint().id} for partitions $initialOffsetAndEpochs")
}
and the target partitions are added to the fetcher thread:
def addPartitions(initialFetchStates: Map[TopicPartition, InitialFetchState]): Set[TopicPartition] = {
~~
failedPartitions.removeAll(initialFetchStates.keySet)
initialFetchStates.forKeyValue { (tp, initialFetchState) =>
val currentState = partitionStates.stateValue(tp)
val updatedState = partitionFetchState(tp, initialFetchState, currentState)
partitionStates.updateAndMoveToEnd(tp, updatedState)
}
~~
}
Note that the initial offset at this point is the follower's logEndOffset:
protected def initialFetchOffset(log: UnifiedLog): Long = {
if (metadataCache.metadataVersion().isTruncationOnFetchSupported && log.latestEpoch.nonEmpty)
log.logEndOffset
else
log.highWatermark
}
The leader advances the High Watermark
Once the followers have fetched up to a given offset, the leader commits those records. Kafka consumers can then read records up to that offset, which is called the High Watermark.
When the leader receives a Fetch Request, processing proceeds from handleFetch through
- ReplicaManager.fetchMessages
- ReplicaManager.readFromLog
- Partition.fetchRecords
- Partition.updateFollowerFetchState
- Partition.maybeIncrementLeaderHW
and the High Watermark is updated.
What Partition.maybeIncrementLeaderHW does
It first checks whether min.insync.replicas is satisfied and skips the update if it is not:
private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
if (isUnderMinIsr) {
trace(s"Not increasing HWM because partition is under min ISR(ISR=${partitionState.isr}")
return false
}
~~
}
Next, it determines the offset that becomes the new High Watermark: it adopts the smallest replicaState.logEndOffsetMetadata among the leader and the replicas that are either in the (maximal) ISR or caught up and ISR-eligible.
private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, currentTimeMs: Long = time.milliseconds): Boolean = {
  ~~
  // maybeIncrementLeaderHW is in the hot path, the following code is written to
  // avoid unnecessary collection generation
  val leaderLogEndOffset = leaderLog.logEndOffsetMetadata
  var newHighWatermark = leaderLogEndOffset
  remoteReplicasMap.values.foreach { replica =>
    val replicaState = replica.stateSnapshot
    def shouldWaitForReplicaToJoinIsr: Boolean = {
      replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) &&
      isReplicaIsrEligible(replica.brokerId)
    }
    // Note here we are using the "maximal", see explanation above
    if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
        (partitionState.maximalIsr.contains(replica.brokerId) || shouldWaitForReplicaToJoinIsr)
    ) {
      newHighWatermark = replicaState.logEndOffsetMetadata
    }
  }
  ~~
}
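Stripping away the hot-path details, the loop above starts from the leader's log end offset and lowers the candidate to any smaller log end offset of a replica that is either in the maximal ISR or caught up and ISR-eligible. A condensed illustrative sketch (ReplicaView and its fields are made up for this memo):
// Condensed model of the loop in maybeIncrementLeaderHW; names are made up.
final case class ReplicaView(
  brokerId: Int,
  logEndOffset: Long,
  inMaximalIsr: Boolean,
  caughtUpAndEligible: Boolean)

object HighWatermarkSketch {
  def candidateHighWatermark(leaderLogEndOffset: Long, remoteReplicas: Seq[ReplicaView]): Long =
    remoteReplicas.foldLeft(leaderLogEndOffset) { (hw, replica) =>
      if (replica.logEndOffset < hw && (replica.inMaximalIsr || replica.caughtUpAndEligible))
        replica.logEndOffset
      else
        hw
    }

  def main(args: Array[String]): Unit = {
    val followers = Seq(
      ReplicaView(brokerId = 2, logEndOffset = 12, inMaximalIsr = true, caughtUpAndEligible = true),
      ReplicaView(brokerId = 3, logEndOffset = 9, inMaximalIsr = true, caughtUpAndEligible = true),
      // An out-of-sync, not-yet-eligible replica does not hold the HW back.
      ReplicaView(brokerId = 4, logEndOffset = 3, inMaximalIsr = false, caughtUpAndEligible = false))

    println(candidateHighWatermark(leaderLogEndOffset = 12L, remoteReplicas = followers)) // 9
  }
}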
leaderLogEndOffset is the leader's local log end offset, i.e. the offset at which the next message will be appended to the leader's LocalLog,
/**
* The offset metadata of the next message that will be appended to the log
*/
def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
while replicaState.logEndOffsetMetadata is the latest offset the follower has successfully fetched from the leader.
// The log end offset value, kept in all replicas; for local replica it is the
// log's end offset, for remote replicas its value is only updated by follower fetch.
logEndOffsetMetadata: LogOffsetMetadata,
replicaState.logEndOffsetMetadata is updated by Replica.updateFetchStateOrThrow, which is called from Partition.updateFollowerFetchState earlier in the chain:
def updateFollowerFetchState(
replica: Replica,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long,
brokerEpoch: Long
): Unit = {
~~
replica.updateFetchStateOrThrow(
followerFetchOffsetMetadata,
followerStartOffset,
followerFetchTimeMs,
leaderEndOffset,
brokerEpoch
)
}
Whether a replica is caught up is determined by whether either of the following holds:
- the replica's log end offset equals the leader's log end offset
- the time since the replica last caught up is within replicaMaxLagMs
/**
* Returns true when the replica is considered as "caught-up". A replica is
* considered "caught-up" when its log end offset is equals to the log end
* offset of the leader OR when its last caught up time minus the current
* time is smaller than the max replica lag.
*/
def isCaughtUp(
leaderEndOffset: Long,
currentTimeMs: Long,
replicaMaxLagMs: Long
): Boolean = {
leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs
}
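To make the two conditions concrete, here is the same predicate restated as a standalone function and exercised with hypothetical numbers (replicaMaxLagMs corresponds to replica.lag.time.max.ms):
object CaughtUpSketch {
  // Restatement of the predicate quoted above, for experimenting with numbers.
  def isCaughtUp(leaderEndOffset: Long, logEndOffset: Long,
                 currentTimeMs: Long, lastCaughtUpTimeMs: Long,
                 replicaMaxLagMs: Long): Boolean =
    leaderEndOffset == logEndOffset || currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs

  def main(args: Array[String]): Unit = {
    // At the leader's log end offset: caught up regardless of timing.
    println(isCaughtUp(leaderEndOffset = 100, logEndOffset = 100,
      currentTimeMs = 60000, lastCaughtUpTimeMs = 0, replicaMaxLagMs = 30000))     // true
    // Behind, but last caught up 5s ago with a 30s max lag: still caught up.
    println(isCaughtUp(leaderEndOffset = 100, logEndOffset = 90,
      currentTimeMs = 60000, lastCaughtUpTimeMs = 55000, replicaMaxLagMs = 30000)) // true
    // Behind and last caught up 40s ago: no longer caught up.
    println(isCaughtUp(leaderEndOffset = 100, logEndOffset = 90,
      currentTimeMs = 60000, lastCaughtUpTimeMs = 20000, replicaMaxLagMs = 30000)) // false
  }
}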
lastCaughtUpTimeMs is updated in Replica.updateFetchStateOrThrow, which appeared above.
val lastCaughtUpTime = if (followerFetchOffsetMetadata.messageOffset >= leaderEndOffset) {
  math.max(currentReplicaState.lastCaughtUpTimeMs, followerFetchTimeMs)
} else if (followerFetchOffsetMetadata.messageOffset >= currentReplicaState.lastFetchLeaderLogEndOffset) {
  math.max(currentReplicaState.lastCaughtUpTimeMs, currentReplicaState.lastFetchTimeMs)
} else {
  currentReplicaState.lastCaughtUpTimeMs
}
/**
~~
* If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
* set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
*
* Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
* set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
~~
*/
As for eligibility, in ZooKeeper mode it appears to be determined by whether the broker in question is alive:
// In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,
// the controller will block them from being added to ISR.
case zkMetadataCache: ZkMetadataCache =>
zkMetadataCache.hasAliveBroker(followerReplicaId)
The follower advances the High Watermark
When the follower receives the response to its Fetch Request, ReplicaFetcherThread runs processPartitionData, and UnifiedLog.maybeUpdateHighWatermark updates the replica's High Watermark to the value included in the fetch response:
// process fetched data
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: FetchData): Option[LogAppendInfo] = {
~~
var maybeUpdateHighWatermarkMessage = s"but did not update replica high watermark"
log.maybeUpdateHighWatermark(partitionData.highWatermark).foreach { newHighWatermark =>
maybeUpdateHighWatermarkMessage = s"and updated replica high watermark to $newHighWatermark"
partitionsWithNewHighWatermark += topicPartition
}
In processPartitionData the follower also appends the records fetched from the leader,
// process fetched data
override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long,
partitionData: FetchData): Option[LogAppendInfo] = {
~~
// Append the leader's messages to the log
val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
and the offset following the last appended record (lastOffset + 1) becomes the next fetch_offset; for example, if records with offsets 10..14 were appended, the next fetch starts at offset 15.
private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
fetchRequest: FetchRequest.Builder): Unit = {
~~
// Update partitionStates only if there is no exception during processPartitionData
val newFetchState = PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
currentFetchState.currentLeaderEpoch, state = Fetching, lastFetchedEpoch)
partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
~~
[1] For an explanation of segments, Kafka Topic Internals: Segments and Indexes | Learn Apache Kafka was easy to follow.