😺

kafka-reassign-partitionsの使い方

に公開

公式ドキュメントを読めば基本的な使い方は分かるのですが、
日本語のまとまった記事が欲しかったので自分が欲しいと思った情報をまとめました。

kafka-reassign-partitionsについて

Kafkaではノードを新しく追加しても自動的にデータの再配置をやってくれないので、新しく追加されたノードにも既存のデータを配置したい場合は、自分でそのためのオペレーションを実行する必要があります。そのようなトピックのデータを移動するためのツールがpartition reassignment toolです。

上記の例以外にもpartition reassignment toolは以下のような場面で使われます。

  • すでにあるトピックのReplication Factorを増やしたい、または減らしたい
  • あるKafka Brokerにあるデータを別のKafka Brokerに移動したい(rebalance)

partition reassignment toolの使い方

reassignは以下のような流れで実施します。

  • パーティションの割り当て情報が書かれたファイルを作成する
  • partition reassignment toolを実行する
  • reassignの進行状況を確認する

パーティションの割り当て情報が書かれたファイルを作成する

まずは、パーティションの割り当て情報が書かれたJSONファイルを作成します。

assignment.json
{
  "version": 1,
  "partitions": [
    {
      "topic": "topic1",
      "partition": 0,
      "replicas": [
        1,
        2,
        3
      ],
      "log_dirs": [
        "/var/lib/kafka/data1",
        "/var/lib/kafka/data2",
        "/var/lib/kafka/data3"
      ]
    }
  ]
}

この例ではtopic1のパーティション 0をbroker.idが1、2、3であるKafka Brokerの/var/lib/kafka/data1,/var/lib/kafka/data2,/var/lib/kafka/data3に配置するといった内容になっています。このJSONファイルをpartition reassignment toolに渡すとこの状態になるように、データの移動が行われます。

このファイルはpartition reassignment toolの--generateオプションで自動生成できるのですが、割とナイーブなラウンドロビンで割り当てられるBrokerが決まり、Topicのサイズが考慮されていないなどの制約があるため、基本的にはスクリプトなどを使って自分達が望むような割り当てファイルを生成することになると思います。

なお、log_dirsanyにした場合は、割り当れられているパーティション数が少なくかつディレクトリサイズが小さい順番で選ばれるようです。

reassignを実行する

パーティションの割り当て情報が書かれたファイル(assignment.json)をpartition reassignment toolに渡し--executeオプションを付けて実行します。

kafka-reassign-partitions --bootstrap-server localhost:9092 \
--reassignment-json-file assignment.json \
--execute --throttle 31457280

{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Warning: You must run --verify periodically, until the reassignment completes, to ensure the throttle is removed.
The inter-broker throttle limit was set to 31457280 B/s
Successfully started partition reassignment for topic1-0
Successfully started log directory moves for: topic1-0-

このコマンドが成功するとassignment.jsonの割り当て情報に書かれた配置になるようにデータの移動が始まります。データの移動はKafkaのレプリケーションの仕組みを使って実現されています。

なお、--cancelオプションでreassignそのものをcancelすることもできます。

kafka-reassign-partitions --bootstrap-server localhost:9092 \
--reassignment-json-file assignment.json \
--cancel

reassignが中断されるだけで、データ配置の状態がreassignを実行する前のものに戻るわけではないことに注意が必要です。

データ移動に使う帯域制御の方法

データ移動(レプリケーション)に使える帯域(単位はByte/sec)を--throttleオプションで制御することができます。例えば、--throttle31457280とすると30MiB/secのthrottleがかかります。オプションを指定しないと無制限に帯域を使用してしまい、Kafkaを使っているアプリケーションに影響が出るため--throttleオプションを付けて実行することをお勧めします。

また、throttleの値は--addtionalオプションでreassign実行中に変更することができます。Brokerの負荷が高くなったときにthrottleをもっと絞りたいときなど、後からthrottleを変えたいときに使います。

kafka-reassign-partitions --bootstrap-server localhost:9092 \
--reassignment-json-file assignment.json \
--execute --throttle 15756302 --additional

あまりに低い値を設定するとreassignが全然進まないことがあるので、reassignの進行状況を監視しておくのが良いです。

reassignの進行状況を確認する

主に2つの方法で確認します

  • --verifyオプション
  • replication lagメトリクス

kafka-reassign-partitions--verifyオプションでどのパーティションのreaasignが進んでいるか、または完了しているか確認できます。

# 出力例
kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file assignment.json --verify
~~
Reassignment of replica my-topic-75-1022 completed successfully.
Clearing broker-level throttles on brokers 1013,1,6,1021,1012,1006,2,1007,7,1022,1003,1008,1010,4,1005,5,1015,3,1009,1014
Clearing topic-level throttles on topic my-topic

また、具体的にreassignがどのくらい進んでいるかはreplication lagを見ると分かります。具体的には以下のメトリクスです。

kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica

ref. https://kafka.apache.org/documentation/#uses_metrics
reassignの進行中、replication lagは減っていきreassignが完了したときにこのreplication lagは0になります。

reassignが完了した後に--verifyオプションを付けてkafka-reassign-partitionsを実行すると、throttleが解除されます。逆に言うとreassignが完了した後に--verifyを実行しないとずっとthrottleがかかったままになってしまいます。

同じノード内でディレクトリのみを変更するreassignの場合

同じノード内でディレクトリのみを変更するreassignの場合、replicationは発生しない(replication lagは0)ので、directoryのoffset lagを見て進捗を確認します。

kafka-log-dirsコマンドでisFutureフラグがtrueになっているdirectoryが、reassign先になっているのでそのdirectoryのoffsetLagで進捗状況を確認できます。

 kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list <topic name> \
 | tail -n 1 | jq '.brokers[].logDirs[].partitions[] | select(.isFuture == true)'

c.f. https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories

throttleについての補足

どのようにthrottleがかかるのかやや分かりにくいと感じたので少し補足します。

reassignにおけるthrottleはLeaderとFollowerにかかるものがありそれぞれ2つの設定で機能します。

  • Leader throttle
    • leader.replication.throttled.rate
    • leader.replication.throttled.replicas
  • Follower throttle
    • follower.replication.throttled.rate
    • follower.replication.throttled.replicas

*.replication.throttled.rateでthrottleの値(Byte/sec)を設定し、*.replication.throttled.replicasでthrottleの対象になるトピックとパーティションを指定します。

Leader throttle、Follower throttleがぞれぞれどのように働くかはKIP-73 Replication Quotasに分かりやすい図があるので、これを読むとイメージできると思います。

Leader throttleはLeaderからFollowerのレプリケーションにかかるthrottleで、設定したQuotaを越えるとFollowerからのFetchリクエストのレスポンスに対象のパーティションデータを含めないようになります。Follower throttleは、FollowerからLeaderにFetchリクエストを送るときに、前回のリクエストまでに受け取ったレスポンスが設定したQuotaを越えると、throttle対象になっているパーティションのデータを今回のFetchリクエストに含めないようになります。

公式ドキュメントの例でいえば、

By default kafka-reassign-partitions.sh will apply the leader throttle to all replicas that exist before the rebalance, any one of which might be leader. It will apply the follower throttle to all move destinations. So if there is a partition with replicas on brokers 101,102, being reassigned to 102,103, a leader throttle, for that partition, would be applied to 101,102 and a follower throttle would be applied to 103 only.

Kafka Broker101, 102が持っているパーティションを101と103が持つようにreassignしたとき、元々のレプリカだった101と102にはLeader throttleが、新しくレプリカになる103にはFollower throttleがかかります。

ブローカーに設定されているLeader throttleやFollower throttleは、kafka-configsコマンドで確認できます。

bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102

reassignのときにかかるthrottleですが2つほど注意点があります。

まず1つめは、このthrottleがreassignに伴うレプリケーションだけではなく、通常のレプリケーションに対してもかかってしまうことです。例えば、reassignによって新しくレプリカになるBrokerにFollower throttleがかかりますが、そのBrokerに対するreassignが完了した後、何らかの理由でそのBrokerがIn Sync Replica(ISR)から外れてしまった場合、そのブローカー以外のreassignも含めて全部のreassignが終わらない間はFollower throttleがかかっている状態で通常のレプリケーションが始まるので、ISRに戻るまで時間がかかる可能性があります。

2つめは、*.replication.throttled.replicasで設定していないレプリカとパーティションはthrottleの対象にならないことです。throttleが通常のレプリケーションを含めてかかってしまう影響を緩和するためだと思うのですが、*.replication.throttled.replicasで設定されていないレプリカとパーティションは対象ではないため、これらの*.replication.throttled.rateで設定した値以上のレプリケーションが発生します。そのため。Brokerが耐えられる負荷ぎりぎりの値を設定しまうとBrokerが負荷に耐えられずにアプリケーションに影響が出る可能性があります。

それぞれKIPはあるので今後の改善によってこの問題は解消されるかもしれないです。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-542%3A+Partition+Reassignment+Throttling#KIP542:PartitionReassignmentThrottling-BehaviorofExistingConfigs
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1009%3A+Add+Broker-level+Throttle+Configurations

reassignを実行するとReplicaがどのように変化するのか

Kafkaのソースコードにあるコメントをもとに、reassignが実行されるときにReplicaがどのように変化するのかを見てみます。

{
  "version": 1,
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 138,
      "replicas": [1009, 1012, 1008]
    }
 ]
}

{
  "version": 1,
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 138,
      "replicas": [1009, 6, 1010]
    }
 ]
}

にreassignすると

  • Adding Replicas
    • 新しく追加するReplica
    • この例では6と1010
  • Removing Replicas
    • Replicasから削除されるReplica
    • この例では1012と1008

が追加されます。reassignを実行している間、ReplicaやISR(In sync replicas)は以下のように変化します。

Replicas Adding Replicas Removing Replicas ISR(In sync replicas)
初期 {1009, 1012, 1008} {} {} {1009, 1012, 1008}
reassign実行 {1009, 1012, 1008, 6, 1010} {6, 1010} {1012,1008} {1009, 1012, 1008}
全てのレプリカが追い付いた {1009, 1012, 1008, 6, 1010} {6, 1010} {1012,1008} {1009, 1012, 1008, 6, 1010}
ISRをshrink {1009, 1012, 1008, 6, 1010} {6, 1010} {1012,1008} {1009, 6, 1010}
Replicasを更新 {1009, 6, 1010} {} {} {1009, 6, 1010}

Adding Replicasなどはreassign実行中にkafka-topicsコマンドを実行すると実際に確認することができます。

kafka-topics --bootstrap-server localhost:9092 --describe --topic my-topic
Topic: my-topic
Partition: 138
Leader: 1009
Replicas: 1009,6,1010,1012,1008
Isr: 1008,1009,1012
Adding Replicas: 6,1010
Removing Replicas: 1012,1008

遭遇したトラブル

Error: Failed to alter dir for xxxとなり、directoryがofflineになる

kafka-reassign-partitionsを実行したすぐ後に発生し、directoryがofflineになってしまうことがありました。その結果、そのdirectoryに配置されているpartitionのレプリケーションが止まり、Under replicated partitionが発生しました。

エラーメッセージ

ERROR Error while flushing log for my-topic-1 in dir /var/lib/kafka/data with offset 3030296 (exclusive) and recovery point 3030296 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.file.NoSuchFileException: /var/lib/kafka/data/my-topic-1.121e03ef8465440b952a8e16d57a58f4-future
        at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
        at java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
        at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
        at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:973)
        at kafka.log.LocalLog.flush(LocalLog.scala:177)
        at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1537)
        at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1724)
        at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1518)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1499)
        at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

directoryがofflineである状態はKafkaの再起動をすれば解消はするのですが、複数のBrokerでこのエラーが出た場合、1台ブローカーを再起動すると起動が完了するまでの間offline partitionが出てしまう可能性があるので、再起動する前にISRを満たさなくなるpartitionが存在するか確認した方が良いと思います。

このエラーに関連するチケットがあり、3.6.0にアップデートしたところこのエラーは発生しなくなったのですが、別のエラーが発生しdirectoryがofflineになることがあったので、3.6.0ではまだ完全に解消されたわけではなさそうです。

エラーメッセージ

ERROR Error while flushing log for my-topic-1 in dir /var/lib/kafka/data
with offset 634 (exclusive) and recovery point 634 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
        at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
        at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
        at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
        at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:470)
        at kafka.log.LogSegment.$anonfun$flush$1$adapted(LogSegment.scala:469)
        at com.yammer.metrics.core.Timer.time(Timer.java:91)
        at kafka.log.LogSegment.flush(LogSegment.scala:469)
        at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:174)
        at kafka.log.LocalLog.$anonfun$flush$1$adapted(LocalLog.scala:174)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
        at kafka.log.LocalLog.flush(LocalLog.scala:174)
        at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1656)
        at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1845)
        at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1637)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1618)
        at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
ERROR Uncaught exception in scheduled task 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log for my-topic-1 in di
r /var/lib/kafka/data with offset 634 (exclusive) and recovery point 634
Caused by: java.nio.channels.ClosedChannelException
        at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
        at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
        at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
        at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:470)
        at kafka.log.LogSegment.$anonfun$flush$1$adapted(LogSegment.scala:469)
        at com.yammer.metrics.core.Timer.time(Timer.java:91)
        at kafka.log.LogSegment.flush(LogSegment.scala:469)
        at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:174)
        at kafka.log.LocalLog.$anonfun$flush$1$adapted(LocalLog.scala:174)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
        at kafka.log.LocalLog.flush(LocalLog.scala:174)
        at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1656)
        at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1845)
        at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1637)
        at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1618)
        at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Discussion