メモ: KafkaでProduce Requestがどう処理されるのか
以下の動画と資料がとても分かりやすい
Inside the Apache Kafka Broker
Inside the Apache Kafka Broker より
上記の理解を踏まえて、実際にApache Kafkaのソースコードはどうなっているのか追ってみる
対象は3.7.0
KafkaServerの起動
KafkaServer.startup
で起動する。
やっていること
-
SocketServer
の起動-
RequestChannel
の作成- Request Queueに相当する
-
(DataPlane)Acceptor
とProcessor
の作成-
Processor
はNetwork Threadに相当する -
(DataPlane)Acceptor
はaccept
を実行してクライアントからの接続を待つ
-
-
(DataPlane)Acceptor
の起動 -
Processor
の起動
-
-
KafkaRequestHandlerPool
の作成-
KafkaRequestHandler
はIO Threadに相当する
-
クラス図
シーケンス図
最終的にどういう状態になるか
-
DataPlaneAcceptor
はソケットを作りaccept
してクライアントから接続が来るのを待つ -
Processor
はnewConnections
にソケットが入るのを待つ -
KafkaRequestHandler
はRequestChannel
にリクエストが入るのを待つ
SocketServer
の起動
KafkaServer.startup
でSocketServer
を作成する
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
RequestChannel
の作成
SocketServer
のコンストラクタでRequestChannel
を作成する
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
(DataPlane)Acceptor
とProcessor
の作成
SocketServer
のコンストラクタでcreateDataPlaneAcceptorAndProcessors
を実行してDataPlaneAcceptor
を作成しDataPlaneAcceptor.configure
を実行する
if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
} else {
config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
}
def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
~~
val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
config.addReconfigurable(dataPlaneAcceptor)
dataPlaneAcceptor.configure(parsedConfigs)
~~
}
~~
protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
}
DataPlaneAcceptor.configure
でAcceptor.addProcessors
を実行する
override def configure(configs: util.Map[String, _]): Unit = {
addProcessors(configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int])
}
Acceptor.addProcessors
でKafkaConfig.NumNetworkThreadsProp
だけProcessor
を作成し、そのProcessor
を自身(Acceptor
のprocessors
)とRequestChannel
に追加する
def addProcessors(toCreate: Int): Unit = synchronized {
val listenerName = endPoint.listenerName
val securityProtocol = endPoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
for (_ <- 0 until toCreate) {
val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol)
listenerProcessors += processor
requestChannel.addProcessor(processor)
~~
}
processors ++= listenerProcessors
}
(DataPlane)Acceptor
の起動
KafkaServer.startup
からSocketServer.enableRequestProcessing
を実行しAcceptor.start
を実行する
def enableRequestProcessing(
authorizerFutures: Map[Endpoint, CompletableFuture[Void]]
): CompletableFuture[Void] = this.synchronized {
~
def chainAcceptorFuture(acceptor: Acceptor): Unit = {
~~
acceptor.start()
~~
})
}
info("Enabling request processing.")
~~
Acceptor.start
の中で自身のスレッドと作成したProcessor
の起動を行う。またaccept
するためのソケットも作成する。
def start(): Unit = synchronized {
try {
~~
if (serverChannel == null) {
# acceptするソケットを作成する
serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")
}
debug(s"Starting processors for listener ${endPoint.listenerName}")
# Processorの起動
processors.foreach(_.start())
~~
# 自身のスレッドをstart
thread.start()
ソケットはノンブロッキングモードで作成する
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
Acceptor.start
で作成したソケットは、Acceptor.run
でacceptが可能になったかどうかの監視がされる。ここでは、Selector
を使ったノンブロッキングのaccept
とIO多重化が行われている
Selector
の動作イメージは、以下の記事などを読むと掴める
ref. Java によるネットワークプログラミングの基礎知識
override def run(): Unit = {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
try {
while (shouldRun.get()) {
try {
acceptNewConnections()
closeThrottledConnections()
}
~~
Acceptor.acceptNewConnections
でソケットがaccept可能になるまで待ち、acceptが可能になったらAcceptor.accept
を実行しクライアントからの接続を待つ
private def acceptNewConnections(): Unit = {
val ready = nioSelector.select(500)
~~
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
~~
do {
~~
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
~~
private def accept(key: SelectionKey): Option[SocketChannel] = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
~~
Processor
の起動
Acceptor.start
の中でProcessor
の起動を行う
Processor
の起動処理は
Processor.run
でされていて、Processor.configureNewConnections
を実行する
override def run(): Unit = {
try {
while (shouldRun.get()) {
try {
// setup any new connections that have been queued up
configureNewConnections()
~~
poll()
~~
Processor.configureNewConnections
ではArrayBlockingQueue
であるnewConnections
をpoll
してnewConnections
にソケットが追加されるのを待つ。ソケットが追加された場合は、Selector.register
を実行し、ソケットが読み込み可能になるのを監視する。
private def configureNewConnections(): Unit = {
var connectionsProcessed = 0
while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
val channel = newConnections.poll()
~~
selector.register(connectionId(channel.socket), channel)
~~
}
Processor.poll
ではselector
をpoll
して、そのソケットが読み込み可能になるまで待つ
private def poll(): Unit = {
val pollTimeout = if (newConnections.isEmpty) 300 else 0
try selector.poll(pollTimeout)
~~
}
Selector.poll
では、selectを実行する
public void poll(long timeout) throws IOException {
~~
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
KafkaRequestHandlerPool
の作成
KafkaServer.startup
で
numIoThreads
だけKafkaRequestHandler
を作る
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix)
class KafkaRequestHandlerPool(
~~
for (i <- 0 until numThreads) {
createHandler(i)
}
作成されたKafkaRequestHandler
はRequestChannel
にリクエストが来るのを待つ
def run(): Unit = {
threadRequestChannel.set(requestChannel)
while (!stopped) {
~~
val req = requestChannel.receiveRequest(300)
接続を受け付けてからIO Threadに処理が移るまで
シーケンス図
接続を受け付けてからNetwork Threadに処理が移るまで
接続があった場合、 Acceptor.accept
が返りAcceptor.assignNewConnection
を実行する
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
~~
do {
~~
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
~~
Acceptor.accept
はaccept
した新しいソケットを返す
private def accept(key: SelectionKey): Option[SocketChannel] = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
~~
Some(socketChannel)
~~
Acceptor.assignNewConnection
では、Processor#accept
を実行しprocessorにソケットをassignする。
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
Processor.accept
では、BlockingQueue
であるnewConnections
にソケットの挿入を試み、成功したらProcessor.wakeup
を実行する。また、wakeup
を呼ぶので、select
(Processor.run
内のProcessor.poll
)でブロックしている箇所の処理が進む。
def accept(socketChannel: SocketChannel,
mayBlock: Boolean,
acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
val accepted = {
if (newConnections.offer(socketChannel))
true
~~
}
if (accepted)
wakeup()
ref. https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/BlockingQueue.html
Network ThreadがIO Threadに処理を移すまで
newConnections
にソケットが入るとProcessor.configureNewConnections
から先に進み、Processor#poll
でソケットが読み込み可能になるまで待ったあと、Processor.processCompletedReceives
を実行しrequestChannel
にリクエストを送る。requestChannel
はRequest Queueでこの段階ではqueueにリクエストがputされるだけ
private def processCompletedReceives(): Unit = {
~~
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
~~
requestChannel.sendRequest(req)
IO Threadが処理を実行し、クライアントにレスポンスが返るまで
RequestChannel
にリクエストが来るのを待っているKafkaRequestHandler.run
が先に進む
def run(): Unit = {
threadRequestChannel.set(requestChannel)
while (!stopped) {
~~
val req = requestChannel.receiveRequest(300)
RequestChannel.Request
の場合、ApiHandler.handle
を実行する
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
threadCurrentRequest.set(request)
apis.handle(request, requestLocal)
KafkaApis.handle
でそれぞれのリクエストの種類に応じてメソッドを呼び出す。
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
~~
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
Produce Requestの場合は、KafkaApis.handleProduceRequest
を実行する
KafkaApis.handleProduceRequest
からReplicaManager.handleProduceAppend
を呼びローカルに書きこむ。
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
~~
replicaManager.handleProduceAppend(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
transactionalId = produceRequest.transactionalId,
entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback,
recordValidationStatsCallback = processingStatsCallback,
requestLocal = requestLocal,
transactionSupportedOperation = transactionSupportedOperation)
ReplicaManager.handleProduceAppend
自体はリーダーへの書き込みが完了したらすぐに返るが、acks=all
の場合、レプリケーションの完了を待つ。
ref. https://kafka.apache.org/37/documentation.html#producerconfigs_acks
This means the leader will wait for the full set of in-sync replicas to acknowledge the record
private def maybeAddDelayedProduce(
~~
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
// try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed produce operation is being created, new
// requests may arrive and hence make this operation completable.
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
この完了を待つときにDelayedProduceが使われる。
/**
* The delayed produce operation can be completed if every partition
* it produces to is satisfied by one of the following:
*
* Case A: Replica not assigned to partition
* Case B: Replica is no longer the leader of this partition
* Case C: This broker is the leader:
* C.1 - If there was a local error thrown while checking if at least requiredAcks
* replicas have caught up to this operation: set an error in response
* C.2 - Otherwise, set the response with no error.
*/
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
trace(s"Checking produce satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match {
case Left(err) =>
// Case A
(false, err)
case Right(partition) =>
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
}
// Case B || C.1 || C.2
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.error = error
}
}
}
// check if every partition has satisfied at least one of case A, B or C
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
forceComplete()
else
false
}
これは上の図(Inside the Apache Kafka Broker
)のPurgatoryに相当する処理を行っている。
Purgatoryが必要な理由は、Inside the Apache Kafka Brokerで以下のように説明されている
To avoid tying up the I/O threads while waiting for the replication step to complete, the request object will be stored in a map-like data structure called purgatory (it’s where things go to wait).
レプリカに対するリクエストの完了を待たずにIO Threadを早く解放するのが目的。
リクエストが完了したらPurgatoryに登録してあるresponseCallbackが実行される。responseCallback
は
KafkaApis#handleProduceRequest
でReplicaManager.handleProduceAppend
を実行するときに作成されていて、RequestChannel.sendResponse
でレスポンスを返す
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
~~
requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs, nodeEndpoints.values.toList.asJava), None)
}
}
RequestChannel.sendResponse
でProcessor.enqueueResponse
を実行しresponseQueue
にレスポンスを入れる
private[network] def sendResponse(response: RequestChannel.Response): Unit = {
~~
processor.enqueueResponse(response)
responseQueue
はProcessor.run
で実行されるprocessNewResponses
で処理される
override def run(): Unit = {
try {
while (shouldRun.get()) {
try {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()
processNewResponses
からsendResponse
が呼ばれてクライアントにレスポンスが返る
private def processNewResponses(): Unit = {
var currentResponse: RequestChannel.Response = null
while ({currentResponse = dequeueResponse(); currentResponse != null}) {
val channelId = currentResponse.request.context.connectionId
try {
currentResponse match {
~~
case response: SendResponse =>
sendResponse(response, response.responseSend)
Requestメトリクス
ついでに、以下のメトリクスについて調べる
-
RequestQueueTimeMs
-
Time the request waits in the request queue
-
-
LocalTimeMs
-
Time the request is processed at the leader
-
-
RemoteTimeMs
-
Time the request waits for the follower
-
-
ResponseQueueTimeMs
-
Time the request waits in the response queue
-
-
ResponseSendTimeMs
-
Time to send the response
-
-
TotalTimeMs
-
Request total time
-
KafkaのRequestに関するメトリクスはRequestChannel
で計算されている
val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
~~
val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos + callbackRequestTimeNanos)
val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos - callbackRequestTimeNanos)
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
シーケンス図
RequestQueueTimeMs
requestDequeueTimeNanos
- startTimeNanos
で計算される
LocalTimeMs
apiLocalCompleteTimeNanos
- requestDequeueTimeNanos
で計算される
RemoteTimeMs
responseCompleteTimeNanos
- apiLocalCompleteTimeNanos
で計算される
ResponseQueueTimeMs
responseDequeueTimeNanos
- responseCompleteTimeNanos
で計算される
ResponseSendTimeMs
endTimeNanos
- responseDequeueTimeNanos
で計算される
TotalTimeMs
endTimeNanos
- startTimeNanos
で計算される
各時刻がセットされる場所
startTimeNanos
Processor.processCompletedReceives
で現在時刻がセットされる
val req = new RequestChannel.Request(processor = id, context = context,
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
requestDequeueTimeNanos
KafkaRequestHandler.runでRequestChannel
からリクエストを受信した時刻
def run(): Unit = {
val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
~~
case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
apiLocalCompleteTimeNanos
KafkaApis.handleのfinally
ブロックで現在時刻がセットされる
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
~~
} finally {
~~
if (request.apiLocalCompleteTimeNanos < 0)
request.apiLocalCompleteTimeNanos = time.nanoseconds
responseCompleteTimeNanos
RequestChannel.sendResponse
でレスポンスを受信したときに現在時刻がセットされる
private[network] def sendResponse(response: RequestChannel.Response): Unit = {
~~
response match {
// We should only send one of the following per request
case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse =>
val request = response.request
val timeNanos = time.nanoseconds()
request.responseCompleteTimeNanos = timeNanos
responseDequeueTimeNanos
SocketServer.dequeueResponseで現在時刻がセットされる
private def dequeueResponse(): RequestChannel.Response = {
val response = responseQueue.poll()
if (response != null)
response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
response
}
endTimeNanos
SocketServer.processCompletedSendsからRequestChannel.updateRequestMetricsで現在時刻がセットされる
# SocketServer
private def processCompletedSends(): Unit = {
selector.completedSends.forEach { send =>
try {
~~
// Invoke send completion callback, and then update request metrics since there might be some
// request metrics got updated during callback
response.onComplete.foreach(onComplete => onComplete(send))
updateRequestMetrics(response)
# RequestChannel
def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
val endTimeNanos = Time.SYSTEM.nanoseconds
Discussion