
Notes: How Kafka Processes a Produce Request

Published 2024/12/24

The following video and slides explain this very clearly:

Inside the Apache Kafka Broker


(Figure from Inside the Apache Kafka Broker)

With that understanding in mind, let's trace how the actual Apache Kafka source code implements it.

The target is version 3.7.0:
https://github.com/apache/kafka/tree/2ae524ed625438c5fee89e78648bd73e64a3ada0

Starting the KafkaServer

The broker starts via KafkaServer.startup.

What it does:

  • Start the SocketServer
    • Create the RequestChannel
      • This corresponds to the Request Queue
    • Create the (DataPlane)Acceptor and Processors
      • A Processor corresponds to a Network Thread
      • The (DataPlane)Acceptor runs accept and waits for connections from clients
    • Start the (DataPlane)Acceptor
    • Start the Processors
  • Create the KafkaRequestHandlerPool
    • A KafkaRequestHandler corresponds to an IO Thread

Class diagram

Sequence diagram

The resulting state:

  • The DataPlaneAcceptor creates a socket and waits in accept for client connections
  • Each Processor waits for sockets to arrive in newConnections
  • Each KafkaRequestHandler waits for requests to arrive in the RequestChannel
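The handoff between these waiting threads can be sketched with a plain `BlockingQueue` standing in for the RequestChannel's request queue (a toy model with illustrative names, not Kafka code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ThreadModelDemo {
    public static void main(String[] args) throws Exception {
        // requestQueue stands in for the RequestChannel's bounded request queue
        BlockingQueue<String> requestQueue = new ArrayBlockingQueue<>(500);

        // an "IO thread" (KafkaRequestHandler) blocks until a request arrives
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        Future<String> handled = ioThread.submit(() -> requestQueue.take());

        // a "network thread" (Processor) enqueues a parsed request
        requestQueue.put("ProduceRequest");

        System.out.println("handled=" + handled.get(2, TimeUnit.SECONDS));
        ioThread.shutdown();
    }
}
```

Each side only touches the queue, which is what lets the accept, network, and IO stages scale independently.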

Starting the SocketServer

KafkaServer.startup creates the SocketServer:

socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)

Creating the RequestChannel

The SocketServer constructor creates the RequestChannel:

val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)

Creating the (DataPlane)Acceptor and Processors

The SocketServer constructor runs createDataPlaneAcceptorAndProcessors, which creates a DataPlaneAcceptor and calls DataPlaneAcceptor.configure:

  if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
    config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
  } else {
    config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
    config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
  }
  def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
~~
    val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
    config.addReconfigurable(dataPlaneAcceptor)
    dataPlaneAcceptor.configure(parsedConfigs)
~~
  }
~~

  protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
    new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
  }

DataPlaneAcceptor.configure runs Acceptor.addProcessors:

  override def configure(configs: util.Map[String, _]): Unit = {
    addProcessors(configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int])
  }

Acceptor.addProcessors creates KafkaConfig.NumNetworkThreadsProp Processors and adds them both to the Acceptor itself (its processors) and to the RequestChannel:

  def addProcessors(toCreate: Int): Unit = synchronized {
    val listenerName = endPoint.listenerName
    val securityProtocol = endPoint.securityProtocol
    val listenerProcessors = new ArrayBuffer[Processor]()

    for (_ <- 0 until toCreate) {
      val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol)
      listenerProcessors += processor
      requestChannel.addProcessor(processor)
~~
    }
    processors ++= listenerProcessors
  }

Starting the (DataPlane)Acceptor

KafkaServer.startup runs SocketServer.enableRequestProcessing, which runs Acceptor.start:

  def enableRequestProcessing(
    authorizerFutures: Map[Endpoint, CompletableFuture[Void]]
  ): CompletableFuture[Void] = this.synchronized {
~
    def chainAcceptorFuture(acceptor: Acceptor): Unit = {
~~
          acceptor.start()
~~
      })
    }

    info("Enabling request processing.")
~~

Acceptor.start starts its own thread and the Processors it created. It also creates the server socket used for accept.

  def start(): Unit = synchronized {
    try {
~~
      if (serverChannel == null) {
        // create the socket used for accept
        serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
        debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")
      }
      debug(s"Starting processors for listener ${endPoint.listenerName}")
      // start the Processors
      processors.foreach(_.start())
~~
      // start the Acceptor's own thread
      thread.start()

The socket is created in non-blocking mode:

    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)

The socket created in Acceptor.start is then monitored in Acceptor.run for accept readiness. This is non-blocking accept with I/O multiplexing via a Selector.

Articles such as the following give a good mental model of how Selector works:

ref. Java によるネットワークプログラミングの基礎知識
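The same pattern can be reproduced in a few lines of plain Java NIO (a self-contained sketch, not Kafka code): open a non-blocking listening socket, register it for OP_ACCEPT, and select until a client connects.

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class AcceptDemo {
    public static void main(String[] args) throws Exception {
        // non-blocking listening socket, like Acceptor.openServerSocket
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port

        // register interest in OP_ACCEPT, like the top of Acceptor.run
        Selector selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);

        // a client connects, making the server channel acceptable
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));

        // like acceptNewConnections: block (with timeout) until a key is ready
        int ready = selector.select(500);
        SelectionKey key = selector.selectedKeys().iterator().next();
        SocketChannel accepted = null;
        if (key.isAcceptable()) {
            // like Acceptor.accept
            accepted = ((ServerSocketChannel) key.channel()).accept();
        }
        System.out.println("ready=" + ready + " accepted=" + (accepted != null));
        client.close(); accepted.close(); selector.close(); server.close();
    }
}
```

One selector thread can watch many channels this way, which is why a single Acceptor thread suffices per listener.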

  override def run(): Unit = {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    try {
      while (shouldRun.get()) {
        try {
          acceptNewConnections()
          closeThrottledConnections()
        }
~~

Acceptor.acceptNewConnections waits until the socket is ready for accept; once it is, it runs Acceptor.accept to accept the connection from the client:

  private def acceptNewConnections(): Unit = {
    val ready = nioSelector.select(500)
~~
          if (key.isAcceptable) {
            accept(key).foreach { socketChannel =>
~~
              do {
~~
              } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
~~
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
~~

Starting the Processors

The Processors are started inside Acceptor.start.

A Processor's main loop is Processor.run, which calls Processor.configureNewConnections:

override def run(): Unit = {
    try {
      while (shouldRun.get()) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          ~~
          poll()

~~

Processor.configureNewConnections polls newConnections (an ArrayBlockingQueue), waiting for sockets to be added. When a socket arrives, it calls Selector.register so that the socket is monitored for readability.

private def configureNewConnections(): Unit = {
    var connectionsProcessed = 0
    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
      val channel = newConnections.poll()
~~
        selector.register(connectionId(channel.socket), channel)
~~
    }

Processor.poll polls the selector, waiting until a socket becomes readable:

  private def poll(): Unit = {
    val pollTimeout = if (newConnections.isEmpty) 300 else 0
    try selector.poll(pollTimeout)
~~
  }

Selector.poll runs select:

    public void poll(long timeout) throws IOException {
~~
        /* check ready keys */
        long startSelect = time.nanoseconds();
        int numReadyKeys = select(timeout);
        long endSelect = time.nanoseconds();
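The register-then-poll step can also be shown with plain Java NIO (a standalone sketch with illustrative names, not Kafka code): register an accepted socket for OP_READ, and select returns nothing until the client actually writes.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class ReadPollDemo {
    public static void main(String[] args) throws Exception {
        // set up a connected pair (stands in for the socket handed over by the Acceptor)
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));
        SocketChannel accepted = server.accept();
        accepted.configureBlocking(false);

        // like Processor.configureNewConnections -> selector.register
        Selector selector = Selector.open();
        accepted.register(selector, SelectionKey.OP_READ);

        // nothing written yet: like Processor.poll, select times out with 0 ready keys
        int before = selector.select(300);

        // the client sends a request; the channel becomes readable
        client.write(ByteBuffer.wrap(new byte[] {42}));
        int after = selector.select(300);

        System.out.println("before=" + before + " after=" + after);
        client.close(); accepted.close(); selector.close(); server.close();
    }
}
```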

Creating the KafkaRequestHandlerPool

KafkaServer.startup creates numIoThreads KafkaRequestHandlers:

        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix)

KafkaRequestHandlerPool

class KafkaRequestHandlerPool(
~~
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

Each created KafkaRequestHandler waits for requests to arrive on the RequestChannel:

  def run(): Unit = {
    threadRequestChannel.set(requestChannel)
    while (!stopped) {
~~
      val req = requestChannel.receiveRequest(300)

From accepting a connection to handing work to the IO Thread

Sequence diagram

From accepting a connection to the Network Thread taking over

When a connection arrives, Acceptor.accept returns and Acceptor.assignNewConnection runs:

          if (key.isAcceptable) {
            accept(key).foreach { socketChannel =>
~~
              do {
~~
              } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
~~

Acceptor.accept returns the newly accepted socket:

  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
~~
      Some(socketChannel)
~~

Acceptor.assignNewConnection runs Processor#accept to assign the socket to a Processor:

  private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
    if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {

Processor.accept tries to offer the socket into newConnections (a BlockingQueue) and, on success, runs Processor.wakeup. Because wakeup is called, the select that is blocking (Processor.poll inside Processor.run) can proceed:

  def accept(socketChannel: SocketChannel,
             mayBlock: Boolean,
             acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
    val accepted = {
      if (newConnections.offer(socketChannel))
        true
~~
    }
    if (accepted)
      wakeup()

ref. https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/BlockingQueue.html
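The offer-then-wakeup handoff can be sketched in plain Java (a toy model, not Kafka code; "socket-1" etc. are placeholder values): the bounded queue rejects when full, and wakeup() makes a blocked select return immediately so the consumer can drain the queue.

```java
import java.nio.channels.Selector;
import java.util.concurrent.ArrayBlockingQueue;

public class HandoffDemo {
    public static void main(String[] args) throws Exception {
        // the Acceptor side: offer the new connection into a bounded queue
        ArrayBlockingQueue<String> newConnections = new ArrayBlockingQueue<>(1);
        boolean accepted = newConnections.offer("socket-1");  // succeeds: queue has room
        boolean rejected = !newConnections.offer("socket-2"); // fails: queue is full

        // on success the Acceptor calls wakeup() so a Processor blocked in
        // select() returns immediately and can drain newConnections
        Selector selector = Selector.open();
        if (accepted) selector.wakeup();
        long start = System.nanoTime();
        selector.select(5000);  // returns right away because wakeup() was called
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println(accepted + " " + rejected + " fast=" + (elapsedMs < 1000));
        System.out.println("polled=" + newConnections.poll());
        selector.close();
    }
}
```

Without the wakeup, the Processor might sit in select for the full timeout before noticing the new connection.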

From the Network Thread to the IO Thread

Once a socket is placed in newConnections, Processor.configureNewConnections proceeds. After Processor#poll waits for the socket to become readable, Processor.processCompletedReceives sends the request to the requestChannel. The requestChannel is the Request Queue; at this point the request is simply put on the queue:

  private def processCompletedReceives(): Unit = {
~~
    val req = new RequestChannel.Request(processor = id, context = context,
      startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
~~
    requestChannel.sendRequest(req)

The IO Thread does the work and a response is returned to the client

KafkaRequestHandler.run, which has been waiting for requests on the RequestChannel, proceeds:

  def run(): Unit = {
    threadRequestChannel.set(requestChannel)
    while (!stopped) {
~~
      val req = requestChannel.receiveRequest(300)

If it is a RequestChannel.Request, it runs ApiRequestHandler.handle:

        case request: RequestChannel.Request =>
          try {
            request.requestDequeueTimeNanos = endTime
            trace(s"Kafka request handler $id on broker $brokerId handling request $request")
            threadCurrentRequest.set(request)
            apis.handle(request, requestLocal)

KafkaApis.handle dispatches to a method based on the request type:

  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {

    try {
~~
      request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)

For a Produce Request, it runs KafkaApis.handleProduceRequest.

KafkaApis.handleProduceRequest calls ReplicaManager.handleProduceAppend, which writes to the local log:

def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
~~
     replicaManager.handleProduceAppend(
        timeout = produceRequest.timeout.toLong,
        requiredAcks = produceRequest.acks,
        internalTopicsAllowed = internalTopicsAllowed,
        transactionalId = produceRequest.transactionalId,
        entriesPerPartition = authorizedRequestInfo,
        responseCallback = sendResponseCallback,
        recordValidationStatsCallback = processingStatsCallback,
        requestLocal = requestLocal,
        transactionSupportedOperation = transactionSupportedOperation)

ReplicaManager.handleProduceAppend itself returns as soon as the write to the leader completes, but with acks=all the response must wait for replication to finish.

ref. https://kafka.apache.org/37/documentation.html#producerconfigs_acks

This means the leader will wait for the full set of in-sync replicas to acknowledge the record

  private def maybeAddDelayedProduce(
~~
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
      val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

DelayedProduce is used while waiting for this completion:

  /**
   * The delayed produce operation can be completed if every partition
   * it produces to is satisfied by one of the following:
   *
   * Case A: Replica not assigned to partition
   * Case B: Replica is no longer the leader of this partition
   * Case C: This broker is the leader:
   *   C.1 - If there was a local error thrown while checking if at least requiredAcks
   *         replicas have caught up to this operation: set an error in response
   *   C.2 - Otherwise, set the response with no error.
   */
  override def tryComplete(): Boolean = {
    // check for each partition if it still has pending acks
    produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
      trace(s"Checking produce satisfaction for $topicPartition, current status $status")
      // skip those partitions that have already been satisfied
      if (status.acksPending) {
        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match {
          case Left(err) =>
            // Case A
            (false, err)

          case Right(partition) =>
            partition.checkEnoughReplicasReachOffset(status.requiredOffset)
        }

        // Case B || C.1 || C.2
        if (error != Errors.NONE || hasEnough) {
          status.acksPending = false
          status.responseStatus.error = error
        }
      }
    }

    // check if every partition has satisfied at least one of case A, B or C
    if (!produceMetadata.produceStatus.values.exists(_.acksPending))
      forceComplete()
    else
      false
  }

This corresponds to the Purgatory in the figure above (from Inside the Apache Kafka Broker).

Why the Purgatory is needed is explained in Inside the Apache Kafka Broker as follows:

To avoid tying up the I/O threads while waiting for the replication step to complete, the request object will be stored in a map-like data structure called purgatory (it’s where things go to wait).

The purpose is to free the IO Thread early instead of having it wait for the replica requests to complete.
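The core of the purgatory pattern can be sketched as follows (a toy model; MiniPurgatory, checkAndComplete's shape, and the offset variables are illustrative stand-ins, not Kafka's actual API): try to complete immediately, otherwise park the operation under a key and re-try it whenever that key makes progress.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class PurgatoryDemo {
    static class MiniPurgatory {
        private final Map<String, List<Supplier<Boolean>>> watchers = new HashMap<>();

        // roughly tryCompleteElseWatch: complete now, or park under the key
        boolean tryCompleteElseWatch(Supplier<Boolean> tryComplete, String key) {
            if (tryComplete.get()) return true;  // completable right away
            watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(tryComplete);
            return false;                        // parked; the IO thread is now free
        }

        // called when replication advances for a key; completed ops are removed
        void checkAndComplete(String key) {
            List<Supplier<Boolean>> parked = watchers.get(key);
            if (parked != null) parked.removeIf(Supplier::get);
        }
    }

    public static void main(String[] args) {
        MiniPurgatory purgatory = new MiniPurgatory();
        long requiredOffset = 10;
        long[] followerOffset = {5};   // followers not caught up yet
        boolean[] responded = {false};

        Supplier<Boolean> tryComplete = () -> {
            if (followerOffset[0] >= requiredOffset) {
                responded[0] = true;   // the responseCallback would run here
                return true;
            }
            return false;
        };

        boolean immediate = purgatory.tryCompleteElseWatch(tryComplete, "topic-0");
        System.out.println("immediate=" + immediate + " responded=" + responded[0]);

        followerOffset[0] = 12;        // replication catches up
        purgatory.checkAndComplete("topic-0");
        System.out.println("responded=" + responded[0]);
    }
}
```

The thread that advances replication drives completion, so no IO thread ever blocks on the waiting request.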

When the request completes, the responseCallback registered in the Purgatory is executed. The responseCallback is created in KafkaApis#handleProduceRequest when it calls ReplicaManager.handleProduceAppend, and it returns the response via RequestChannel.sendResponse:

    def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
 ~~
        requestChannel.sendResponse(request, new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs, nodeEndpoints.values.toList.asJava), None)
      }
    }

RequestChannel.sendResponse runs Processor.enqueueResponse, which puts the response into the responseQueue:

private[network] def sendResponse(response: RequestChannel.Response): Unit = {
~~
 processor.enqueueResponse(response)

The responseQueue is drained by processNewResponses, which runs in Processor.run:

  override def run(): Unit = {
    try {
      while (shouldRun.get()) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing
          processNewResponses()

processNewResponses calls sendResponse, and the response is returned to the client:

  private def processNewResponses(): Unit = {
    var currentResponse: RequestChannel.Response = null
    while ({currentResponse = dequeueResponse(); currentResponse != null}) {
      val channelId = currentResponse.request.context.connectionId
      try {
        currentResponse match {
~~
          case response: SendResponse =>
            sendResponse(response, response.responseSend)

Request metrics

While we're at it, let's look at the following metrics:

https://kafka.apache.org/documentation/#monitoring

  • RequestQueueTimeMs
    • Time the request waits in the request queue

  • LocalTimeMs
    • Time the request is processed at the leader

  • RemoteTimeMs
    • Time the request waits for the follower

  • ResponseQueueTimeMs
    • Time the request waits in the response queue

  • ResponseSendTimeMs
    • Time to send the response

  • TotalTimeMs
    • Request total time

Kafka's request metrics are computed in RequestChannel:

      val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
~~
      val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos + callbackRequestTimeNanos)
      val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos - callbackRequestTimeNanos)
      val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
      val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
      val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
      val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
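Plugging hypothetical timestamps into these formulas shows how the total splits across the stages (the nanosecond values below are made-up examples, not measurements):

```java
public class RequestMetricsDemo {
    static double nanosToMs(long n) { return n / 1_000_000.0; }

    public static void main(String[] args) {
        // hypothetical timestamps (nanoseconds) for one request's lifecycle
        long startTimeNanos            = 0;
        long requestDequeueTimeNanos   = 2_000_000;   // 2 ms in the request queue
        long apiLocalCompleteTimeNanos = 7_000_000;   // 5 ms of leader-side work
        long responseCompleteTimeNanos = 27_000_000;  // 20 ms waiting for followers
        long responseDequeueTimeNanos  = 28_000_000;  // 1 ms in the response queue
        long endTimeNanos              = 30_000_000;  // 2 ms to send the response
        long callbackRequestTimeNanos  = 0;           // no callback time in this example

        // same formulas as RequestChannel.updateRequestMetrics
        System.out.println("RequestQueueTimeMs=" + nanosToMs(requestDequeueTimeNanos - startTimeNanos));
        System.out.println("LocalTimeMs=" + nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos + callbackRequestTimeNanos));
        System.out.println("RemoteTimeMs=" + nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos - callbackRequestTimeNanos));
        System.out.println("ResponseQueueTimeMs=" + nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos));
        System.out.println("ResponseSendTimeMs=" + nanosToMs(endTimeNanos - responseDequeueTimeNanos));
        System.out.println("TotalTimeMs=" + nanosToMs(endTimeNanos - startTimeNanos));
    }
}
```

Note that the per-stage values sum to TotalTimeMs, which makes these metrics useful for locating which stage a slow request spent its time in.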

Sequence diagram

RequestQueueTimeMs

Computed as requestDequeueTimeNanos - startTimeNanos.

LocalTimeMs

Computed as apiLocalCompleteTimeNanos - requestDequeueTimeNanos (the code also adds callbackRequestTimeNanos).

RemoteTimeMs

Computed as responseCompleteTimeNanos - apiLocalCompleteTimeNanos (the code also subtracts callbackRequestTimeNanos).

ResponseQueueTimeMs

Computed as responseDequeueTimeNanos - responseCompleteTimeNanos.

ResponseSendTimeMs

Computed as endTimeNanos - responseDequeueTimeNanos.

TotalTimeMs

Computed as endTimeNanos - startTimeNanos.

Where each timestamp is set

startTimeNanos

Set to the current time in Processor.processCompletedReceives:

  val req = new RequestChannel.Request(processor = id, context = context,
    startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)

requestDequeueTimeNanos

The time at which KafkaRequestHandler.run received the request from the RequestChannel:

def run(): Unit = {
  val req = requestChannel.receiveRequest(300)
  val endTime = time.nanoseconds
~~
    case request: RequestChannel.Request =>
      try {
        request.requestDequeueTimeNanos = endTime

apiLocalCompleteTimeNanos

Set to the current time in the finally block of KafkaApis.handle:

  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
~~
    } finally {
~~
      if (request.apiLocalCompleteTimeNanos < 0)
        request.apiLocalCompleteTimeNanos = time.nanoseconds

responseCompleteTimeNanos

Set to the current time when RequestChannel.sendResponse receives the response:

  private[network] def sendResponse(response: RequestChannel.Response): Unit = {
~~
    response match {
      // We should only send one of the following per request
      case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse =>
        val request = response.request
        val timeNanos = time.nanoseconds()
        request.responseCompleteTimeNanos = timeNanos

responseDequeueTimeNanos

Set to the current time in SocketServer.dequeueResponse:

  private def dequeueResponse(): RequestChannel.Response = {
    val response = responseQueue.poll()
    if (response != null)
      response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
    response
  }

endTimeNanos

Set to the current time in RequestChannel.updateRequestMetrics, called from SocketServer.processCompletedSends:

// SocketServer
  private def processCompletedSends(): Unit = {
    selector.completedSends.forEach { send =>
      try {
~~
        // Invoke send completion callback, and then update request metrics since there might be some
        // request metrics got updated during callback
        response.onComplete.foreach(onComplete => onComplete(send))
        updateRequestMetrics(response)
// RequestChannel
    def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
      val endTimeNanos = Time.SYSTEM.nanoseconds
