Notes: How Kafka Processes a Produce Request
The following video and slides explain this very clearly:
Inside the Apache Kafka Broker

From "Inside the Apache Kafka Broker"
Building on that understanding, let's trace what the actual Apache Kafka source code does.
The target version is 3.7.0.
Starting KafkaServer
The server starts via KafkaServer.startup.
What it does:
- Start the SocketServer
  - Create the RequestChannel (corresponds to the Request Queue)
  - Create the (DataPlane)Acceptor and Processors
    - A Processor corresponds to a Network Thread
    - The (DataPlane)Acceptor calls accept and waits for connections from clients
- Start the (DataPlane)Acceptor
- Start the Processors
- Create the KafkaRequestHandlerPool
  - A KafkaRequestHandler corresponds to an IO Thread
Class diagram
Sequence diagram
The resulting steady state:
- The DataPlaneAcceptor creates a socket, calls accept, and waits for connections from clients
- Each Processor waits for sockets to be put into newConnections
- Each KafkaRequestHandler waits for requests to arrive on the RequestChannel
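The steady state above can be sketched as a tiny queue pipeline. This is an illustrative Java sketch with made-up names (newConnections and requestChannel mirror the Kafka fields), not Kafka code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the steady state: an acceptor hands a connection to a
// network thread via newConnections; the network thread turns it into a
// request on the request channel; an IO thread consumes the request.
public class PipelineSketch {
    public static String runOnce() {
        BlockingQueue<String> newConnections = new ArrayBlockingQueue<>(20);
        BlockingQueue<String> requestChannel = new ArrayBlockingQueue<>(500);

        // Network thread: wait for a socket, then enqueue a request for it.
        Thread networkThread = new Thread(() -> {
            try {
                String conn = newConnections.take();
                requestChannel.put("request-from-" + conn);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        networkThread.start();

        try {
            // Acceptor side: a new connection is assigned to the processor.
            newConnections.put("conn-1");
            // IO thread side: block until a request shows up.
            String req = requestChannel.take();
            networkThread.join();
            return req;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints "request-from-conn-1"
    }
}
```

In the real broker each stage is a pool of threads and the handoff objects are sockets and RequestChannel.Request instances, but the queue-based decoupling is the same.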
Starting the SocketServer
KafkaServer.startup creates the SocketServer:
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
Creating the RequestChannel
The SocketServer constructor creates the RequestChannel:
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
Creating the (DataPlane)Acceptor and Processors
The SocketServer constructor calls createDataPlaneAcceptorAndProcessors, which creates a DataPlaneAcceptor and then calls DataPlaneAcceptor.configure:
if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
} else {
config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
}
def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
~~
val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
config.addReconfigurable(dataPlaneAcceptor)
dataPlaneAcceptor.configure(parsedConfigs)
~~
}
~~
protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
}
DataPlaneAcceptor.configure calls Acceptor.addProcessors:
override def configure(configs: util.Map[String, _]): Unit = {
addProcessors(configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int])
}
Acceptor.addProcessors creates KafkaConfig.NumNetworkThreadsProp Processors and adds each of them both to the Acceptor itself (its processors field) and to the RequestChannel:
def addProcessors(toCreate: Int): Unit = synchronized {
val listenerName = endPoint.listenerName
val securityProtocol = endPoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
for (_ <- 0 until toCreate) {
val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol)
listenerProcessors += processor
requestChannel.addProcessor(processor)
~~
}
processors ++= listenerProcessors
}
Starting the (DataPlane)Acceptor
KafkaServer.startup calls SocketServer.enableRequestProcessing, which calls Acceptor.start:
def enableRequestProcessing(
authorizerFutures: Map[Endpoint, CompletableFuture[Void]]
): CompletableFuture[Void] = this.synchronized {
~
def chainAcceptorFuture(acceptor: Acceptor): Unit = {
~~
acceptor.start()
~~
})
}
info("Enabling request processing.")
~~
Acceptor.start starts the Acceptor's own thread and the Processors it created. It also creates the socket used to accept connections:
def start(): Unit = synchronized {
try {
~~
if (serverChannel == null) {
// create the socket used to accept connections
serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")
}
debug(s"Starting processors for listener ${endPoint.listenerName}")
// start the Processors
processors.foreach(_.start())
~~
// start the Acceptor's own thread
thread.start()
The socket is created in non-blocking mode:
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
Acceptor.run monitors the socket created in Acceptor.start for readiness to accept. This is non-blocking accept with IO multiplexing via a Selector.
The following article gives a good intuition for how a Selector behaves:
ref. Java によるネットワークプログラミングの基礎知識
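As a self-contained illustration of the same pattern (plain Java NIO, not Kafka code), the following opens a non-blocking server socket, registers it for OP_ACCEPT, and calls select with a 500 ms timeout just like acceptNewConnections does:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Opens a non-blocking listening socket, registers it for OP_ACCEPT,
// and select()s until one client connection is accepted.
public class AcceptLoopSketch {
    public static boolean demo() {
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverChannel.configureBlocking(false); // non-blocking, as in openServerSocket
            serverChannel.bind(new InetSocketAddress("localhost", 0)); // ephemeral port
            int port = ((InetSocketAddress) serverChannel.getLocalAddress()).getPort();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            // A client connects once the server is listening.
            Thread client = new Thread(() -> {
                try { SocketChannel.open(new InetSocketAddress("localhost", port)).close(); }
                catch (IOException ignored) { }
            });
            client.start();

            SocketChannel accepted = null;
            while (accepted == null) {
                selector.select(500); // like nioSelector.select(500) in acceptNewConnections
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable())
                        accepted = ((ServerSocketChannel) key.channel()).accept();
                }
                selector.selectedKeys().clear();
            }
            client.join();
            accepted.close();
            return true;
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true"
    }
}
```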
override def run(): Unit = {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
try {
while (shouldRun.get()) {
try {
acceptNewConnections()
closeThrottledConnections()
}
~~
Acceptor.acceptNewConnections waits until the socket is ready to accept; once it is, it calls Acceptor.accept to accept the connection from the client:
private def acceptNewConnections(): Unit = {
val ready = nioSelector.select(500)
~~
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
~~
do {
~~
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
~~
private def accept(key: SelectionKey): Option[SocketChannel] = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
~~
Starting the Processors
Acceptor.start starts the Processors.
A Processor's main loop is Processor.run, which calls Processor.configureNewConnections:
override def run(): Unit = {
try {
while (shouldRun.get()) {
try {
// setup any new connections that have been queued up
configureNewConnections()
~~
poll()
~~
Processor.configureNewConnections polls newConnections (an ArrayBlockingQueue), waiting for sockets to be added. When a socket arrives, it calls Selector.register so the socket is monitored for readability:
private def configureNewConnections(): Unit = {
var connectionsProcessed = 0
while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
val channel = newConnections.poll()
~~
selector.register(connectionId(channel.socket), channel)
~~
}
Processor.poll polls the selector, waiting until a socket becomes readable:
private def poll(): Unit = {
val pollTimeout = if (newConnections.isEmpty) 300 else 0
try selector.poll(pollTimeout)
~~
}
Selector.poll calls select:
public void poll(long timeout) throws IOException {
~~
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
Creating the KafkaRequestHandlerPool
KafkaServer.startup creates numIoThreads KafkaRequestHandlers:
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix)
class KafkaRequestHandlerPool(
~~
for (i <- 0 until numThreads) {
createHandler(i)
}
Each KafkaRequestHandler created this way waits for requests to arrive on the RequestChannel:
def run(): Unit = {
threadRequestChannel.set(requestChannel)
while (!stopped) {
~~
val req = requestChannel.receiveRequest(300)
From accepting a connection to handing off to the IO Thread
Sequence diagram
From accepting a connection to handing off to the Network Thread
When a connection arrives, Acceptor.accept returns and Acceptor.assignNewConnection is called:
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
~~
do {
~~
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
~~
Acceptor.accept returns the newly accepted socket:
private def accept(key: SelectionKey): Option[SocketChannel] = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
~~
Some(socketChannel)
~~
Acceptor.assignNewConnection calls Processor.accept to assign the socket to a Processor:
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
Processor.accept tries to insert the socket into newConnections (a BlockingQueue) and, on success, calls Processor.wakeup. The wakeup call lets the select that is blocking in Processor.poll (inside Processor.run) proceed:
def accept(socketChannel: SocketChannel,
mayBlock: Boolean,
acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
val accepted = {
if (newConnections.offer(socketChannel))
true
~~
}
if (accepted)
wakeup()
ref. https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/BlockingQueue.html
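The offer-then-wakeup handoff can be sketched like this (illustrative Java with hypothetical names; wakeup stands in for Selector.wakeup):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Sketch of Processor.accept: try a non-blocking offer into the bounded
// newConnections queue; on success, wake the thread blocked in select().
public class HandoffSketch {
    private final ArrayBlockingQueue<String> newConnections = new ArrayBlockingQueue<>(2);
    public int wakeups = 0;

    // Stand-in for Selector.wakeup(), which unblocks a pending select() call.
    private void wakeup() { wakeups++; }

    public boolean accept(String socketChannel) {
        boolean accepted = newConnections.offer(socketChannel); // false when full
        if (accepted)
            wakeup();
        return accepted;
    }

    public static void main(String[] args) {
        HandoffSketch p = new HandoffSketch();
        System.out.println(p.accept("conn-1")); // prints "true"
        System.out.println(p.accept("conn-2")); // prints "true"
        System.out.println(p.accept("conn-3")); // queue is full: prints "false"
        System.out.println(p.wakeups);          // prints "2"
    }
}
```

offer fails immediately instead of blocking when the queue is full, which is why the real code retries (and may fall back to a blocking put when mayBlock is true).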
From the Network Thread to the IO Thread
Once a socket is put into newConnections, Processor.configureNewConnections proceeds. After Processor.poll has waited for the socket to become readable, Processor.processCompletedReceives runs and sends the request to requestChannel. requestChannel is the Request Queue, so at this stage the request is merely put on the queue:
private def processCompletedReceives(): Unit = {
~~
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
~~
requestChannel.sendRequest(req)
From the IO Thread processing the request until the response reaches the client
KafkaRequestHandler.run, which has been waiting for requests on the RequestChannel, proceeds:
def run(): Unit = {
threadRequestChannel.set(requestChannel)
while (!stopped) {
~~
val req = requestChannel.receiveRequest(300)
For a RequestChannel.Request, it calls apis.handle:
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
threadCurrentRequest.set(request)
apis.handle(request, requestLocal)
KafkaApis.handle dispatches to a method according to the request type:
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
~~
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
For a Produce Request, that method is KafkaApis.handleProduceRequest.
KafkaApis.handleProduceRequest calls ReplicaManager.handleProduceAppend to write to the local log:
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
~~
replicaManager.handleProduceAppend(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
transactionalId = produceRequest.transactionalId,
entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback,
recordValidationStatsCallback = processingStatsCallback,
requestLocal = requestLocal,
transactionSupportedOperation = transactionSupportedOperation)
ReplicaManager.handleProduceAppend itself returns as soon as the write to the leader completes, but with acks=all the request has to wait until replication completes.
ref. https://kafka.apache.org/37/documentation.html#producerconfigs_acks
This means the leader will wait for the full set of in-sync replicas to acknowledge the record
private def maybeAddDelayedProduce(
~~
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
DelayedProduce is used to wait for that completion:
/**
* The delayed produce operation can be completed if every partition
* it produces to is satisfied by one of the following:
*
* Case A: Replica not assigned to partition
* Case B: Replica is no longer the leader of this partition
* Case C: This broker is the leader:
* C.1 - If there was a local error thrown while checking if at least requiredAcks
* replicas have caught up to this operation: set an error in response
* C.2 - Otherwise, set the response with no error.
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
trace(s"Checking produce satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match {
case Left(err) =>
// Case A
(false, err)
case Right(partition) =>
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
}
// Case B || C.1 || C.2
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.error = error
}
}
}
// check if every partition has satisfied at least one of case A, B or C
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
forceComplete()
else
false
}
This implements what the figure above (from "Inside the Apache Kafka Broker") calls the Purgatory.
Why Purgatory is needed is explained in "Inside the Apache Kafka Broker" as follows:
To avoid tying up the I/O threads while waiting for the replication step to complete, the request object will be stored in a map-like data structure called purgatory (it’s where things go to wait).
The goal is to free the IO Thread quickly rather than tie it up waiting for the requests to the replicas to complete.
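A drastically simplified purgatory can be sketched like this (illustrative Java; Kafka's real DelayedOperationPurgatory also handles timeouts, locking, and expiration, all omitted here):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy purgatory: a delayed operation is watched under per-partition keys
// and completed once its condition is satisfied by later progress.
public class PurgatorySketch {
    public interface DelayedOp {
        boolean tryComplete(); // true once the operation could complete
    }

    private final Map<String, List<DelayedOp>> watchers = new HashMap<>();

    // Like tryCompleteElseWatch: complete now if possible, otherwise park
    // the operation under each key so later progress can complete it.
    public void tryCompleteElseWatch(DelayedOp op, List<String> keys) {
        if (op.tryComplete()) return;
        for (String key : keys)
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
    }

    // Called when replication makes progress on a partition.
    public void checkAndComplete(String key) {
        List<DelayedOp> ops = watchers.get(key);
        if (ops != null)
            ops.removeIf(DelayedOp::tryComplete);
    }

    public static void main(String[] args) {
        long requiredOffset = 10;
        long[] highWatermark = {5};     // followers not caught up yet
        boolean[] responded = {false};  // did the response callback fire?

        DelayedOp produce = () -> {
            if (highWatermark[0] >= requiredOffset) {
                responded[0] = true;    // fire the response callback
                return true;
            }
            return false;
        };

        PurgatorySketch purgatory = new PurgatorySketch();
        purgatory.tryCompleteElseWatch(produce, List.of("topic-0"));
        System.out.println(responded[0]); // prints "false": still waiting

        highWatermark[0] = 10;            // a follower caught up
        purgatory.checkAndComplete("topic-0");
        System.out.println(responded[0]); // prints "true": response sent
    }
}
```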
When the request completes, the responseCallback registered in Purgatory is executed. The responseCallback is created in KafkaApis.handleProduceRequest when it calls ReplicaManager.handleProduceAppend, and it returns the response via RequestChannel.sendResponse:
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
~~
requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs, nodeEndpoints.values.toList.asJava), None)
}
}
RequestChannel.sendResponse calls Processor.enqueueResponse, which puts the response on the responseQueue:
private[network] def sendResponse(response: RequestChannel.Response): Unit = {
~~
processor.enqueueResponse(response)
The responseQueue is drained by processNewResponses, which runs in Processor.run:
override def run(): Unit = {
try {
while (shouldRun.get()) {
try {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
processNewResponses calls sendResponse, and the response is returned to the client:
private def processNewResponses(): Unit = {
var currentResponse: RequestChannel.Response = null
while ({currentResponse = dequeueResponse(); currentResponse != null}) {
val channelId = currentResponse.request.context.connectionId
try {
currentResponse match {
~~
case response: SendResponse =>
sendResponse(response, response.responseSend)
Request metrics
While we're at it, let's look into the following metrics:
- RequestQueueTimeMs: Time the request waits in the request queue
- LocalTimeMs: Time the request is processed at the leader
- RemoteTimeMs: Time the request waits for the follower
- ResponseQueueTimeMs: Time the request waits in the response queue
- ResponseSendTimeMs: Time to send the response
- TotalTimeMs: Request total time
Kafka's request metrics are computed in RequestChannel:
val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
~~
val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos + callbackRequestTimeNanos)
val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos - callbackRequestTimeNanos)
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
Sequence diagram
RequestQueueTimeMs
Computed as requestDequeueTimeNanos - startTimeNanos.
LocalTimeMs
Computed as apiLocalCompleteTimeNanos - requestDequeueTimeNanos.
RemoteTimeMs
Computed as responseCompleteTimeNanos - apiLocalCompleteTimeNanos.
ResponseQueueTimeMs
Computed as responseDequeueTimeNanos - responseCompleteTimeNanos.
ResponseSendTimeMs
Computed as endTimeNanos - responseDequeueTimeNanos.
TotalTimeMs
Computed as endTimeNanos - startTimeNanos.
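With made-up timestamps, the formulas above decompose a request's total time like this (illustrative Java mirroring the nanosToMs conversion; the five phases sum to the total):

```java
// Worked example with made-up timestamps (nanoseconds): the five phase
// metrics sum to TotalTimeMs.
public class RequestTimeSketch {
    public static double nanosToMs(long nanos) { return nanos / 1_000_000.0; }

    public static void main(String[] args) {
        long startTimeNanos            = 0;           // request fully read by the network thread
        long requestDequeueTimeNanos   = 2_000_000;   // 2 ms in the request queue
        long apiLocalCompleteTimeNanos = 7_000_000;   // 5 ms of leader-side processing
        long responseCompleteTimeNanos = 27_000_000;  // 20 ms waiting on followers
        long responseDequeueTimeNanos  = 28_000_000;  // 1 ms in the response queue
        long endTimeNanos              = 30_000_000;  // 2 ms to send the response

        System.out.println(nanosToMs(requestDequeueTimeNanos - startTimeNanos));              // RequestQueueTimeMs: 2.0
        System.out.println(nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos));   // LocalTimeMs: 5.0
        System.out.println(nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos)); // RemoteTimeMs: 20.0
        System.out.println(nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos));  // ResponseQueueTimeMs: 1.0
        System.out.println(nanosToMs(endTimeNanos - responseDequeueTimeNanos));               // ResponseSendTimeMs: 2.0
        System.out.println(nanosToMs(endTimeNanos - startTimeNanos));                         // TotalTimeMs: 30.0
    }
}
```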
Where each timestamp is set
startTimeNanos
Set to the current time in Processor.processCompletedReceives:
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
requestDequeueTimeNanos
Set to the time when KafkaRequestHandler.run received the request from the RequestChannel:
def run(): Unit = {
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
~~
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
apiLocalCompleteTimeNanos
Set to the current time in the finally block of KafkaApis.handle:
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
~~
} finally {
~~
if (request.apiLocalCompleteTimeNanos < 0)
request.apiLocalCompleteTimeNanos = time.nanoseconds
responseCompleteTimeNanos
Set to the current time when RequestChannel.sendResponse receives the response:
private[network] def sendResponse(response: RequestChannel.Response): Unit = {
~~
response match {
// We should only send one of the following per request
case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse =>
val request = response.request
val timeNanos = time.nanoseconds()
request.responseCompleteTimeNanos = timeNanos
responseDequeueTimeNanos
Set to the current time in SocketServer.dequeueResponse:
private def dequeueResponse(): RequestChannel.Response = {
val response = responseQueue.poll()
if (response != null)
response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
response
}
endTimeNanos
Set to the current time in RequestChannel.updateRequestMetrics, which is called from SocketServer.processCompletedSends:
// SocketServer
private def processCompletedSends(): Unit = {
selector.completedSends.forEach { send =>
try {
~~
// Invoke send completion callback, and then update request metrics since there might be some
// request metrics got updated during callback
response.onComplete.foreach(onComplete => onComplete(send))
updateRequestMetrics(response)
// RequestChannel
def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
val endTimeNanos = Time.SYSTEM.nanoseconds