kafka-reassign-partitionsの使い方
公式ドキュメントを読めば基本的な使い方は分かるのですが、
日本語のまとまった記事が欲しかったので自分が欲しいと思った情報をまとめました。
kafka-reassign-partitionsについて
Kafkaではノードを新しく追加しても自動的にデータの再配置をやってくれないので、新しく追加されたノードにも既存のデータを配置したい場合は、自分でそのためのオペレーションを実行する必要があります。そのようなトピックのデータを移動するためのツールがpartition reassignment tool
です。
上記の例以外にもpartition reassignment tool
は以下のような場面で使われます。
- すでにあるトピックのReplication Factorを増やしたい、または減らしたい
- あるKafka Brokerにあるデータを別のKafka Brokerに移動したい(rebalance)
partition reassignment toolの使い方
reassignは以下のような流れで実施します。
- パーティションの割り当て情報が書かれたファイルを作成する
- partition reassignment toolを実行する
- reassignの進行状況を確認する
パーティションの割り当て情報が書かれたファイルを作成する
まずは、パーティションの割り当て情報が書かれたJSONファイルを作成します。
{
"version": 1,
"partitions": [
{
"topic": "topic1",
"partition": 0,
"replicas": [
1,
2,
3
],
"log_dirs": [
"/var/lib/kafka/data1",
"/var/lib/kafka/data2",
"/var/lib/kafka/data3"
]
}
]
}
この例ではtopic1
のパーティション 0をbroker.id
が1、2、3であるKafka Brokerの/var/lib/kafka/data1
,/var/lib/kafka/data2
,/var/lib/kafka/data3
に配置するといった内容になっています。このJSONファイルをpartition reassignment toolに渡すとこの状態になるように、データの移動が行われます。
このファイルはpartition reassignment toolの--generate
オプションで自動生成できるのですが、割とナイーブなラウンドロビンで割り当てられるBrokerが決まり、Topicのサイズが考慮されていないなどの制約があるため、基本的にはスクリプトなどを使って自分達が望むような割り当てファイルを生成することになると思います。
なお、log_dirs
をany
にした場合は、割り当れられているパーティション数が少なくかつディレクトリサイズが小さい順番で選ばれるようです。
reassignを実行する
パーティションの割り当て情報が書かれたファイル(assignment.json
)をpartition reassignment toolに渡し--execute
オプションを付けて実行します。
kafka-reassign-partitions --bootstrap-server localhost:9092 \
--reassignment-json-file assignment.json \
--execute --throttle 31457280
{"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Warning: You must run --verify periodically, until the reassignment completes, to ensure the throttle is removed.
The inter-broker throttle limit was set to 31457280 B/s
Successfully started partition reassignment for topic1-0
Successfully started log directory moves for: topic1-0-
このコマンドが成功するとassignment.json
の割り当て情報に書かれた配置になるようにデータの移動が始まります。データの移動はKafkaのレプリケーションの仕組みを使って実現されています。
なお、--cancel
オプションでreassignそのものをcancelすることもできます。
kafka-reassign-partitions --bootstrap-server localhost:9092 \
--reassignment-json-file assignment.json \
--cancel
reassignが中断されるだけで、データ配置の状態がreassignを実行する前のものに戻るわけではないことに注意が必要です。
データ移動に使う帯域制御の方法
データ移動(レプリケーション)に使える帯域(単位はByte/sec
)を--throttle
オプションで制御することができます。例えば、--throttle
を31457280
とすると30MiB/secのthrottleがかかります。オプションを指定しないと無制限に帯域を使用してしまい、Kafkaを使っているアプリケーションに影響が出るため--throttle
オプションを付けて実行することをお勧めします。
また、throttleの値は--addtional
オプションでreassign実行中に変更することができます。Brokerの負荷が高くなったときにthrottleをもっと絞りたいときなど、後からthrottleを変えたいときに使います。
kafka-reassign-partitions --bootstrap-server localhost:9092 \
--reassignment-json-file assignment.json \
--execute --throttle 15756302 --additional
あまりに低い値を設定するとreassignが全然進まないことがあるので、reassignの進行状況を監視しておくのが良いです。
reassignの進行状況を確認する
主に2つの方法で確認します
-
--verify
オプション - replication lagメトリクス
kafka-reassign-partitions
の--verify
オプションでどのパーティションのreaasignが進んでいるか、または完了しているか確認できます。
# 出力例
kafka-reassign-partitions --bootstrap-server localhost:9092 --reassignment-json-file assignment.json --verify
~~
Reassignment of replica my-topic-75-1022 completed successfully.
Clearing broker-level throttles on brokers 1013,1,6,1021,1012,1006,2,1007,7,1022,1003,1008,1010,4,1005,5,1015,3,1009,1014
Clearing topic-level throttles on topic my-topic
また、具体的にreassignがどのくらい進んでいるかはreplication lagを見ると分かります。具体的には以下のメトリクスです。
kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
ref. https://kafka.apache.org/documentation/#uses_metrics
reassignの進行中、replication lagは減っていきreassignが完了したときにこのreplication lagは0になります。
reassignが完了した後に--verify
オプションを付けてkafka-reassign-partitions
を実行すると、throttleが解除されます。逆に言うとreassignが完了した後に--verify
を実行しないとずっとthrottleがかかったままになってしまいます。
同じノード内でディレクトリのみを変更するreassignの場合
同じノード内でディレクトリのみを変更するreassignの場合、replicationは発生しない(replication lagは0)ので、directoryのoffset lagを見て進捗を確認します。
kafka-log-dirs
コマンドでisFuture
フラグがtrueになっているdirectoryが、reassign先になっているのでそのdirectoryのoffsetLag
で進捗状況を確認できます。
kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list <topic name> \
| tail -n 1 | jq '.brokers[].logDirs[].partitions[] | select(.isFuture == true)'
throttleについての補足
どのようにthrottleがかかるのかやや分かりにくいと感じたので少し補足します。
reassignにおけるthrottleはLeaderとFollowerにかかるものがありそれぞれ2つの設定で機能します。
- Leader throttle
leader.replication.throttled.rate
leader.replication.throttled.replicas
- Follower throttle
follower.replication.throttled.rate
follower.replication.throttled.replicas
*.replication.throttled.rate
でthrottleの値(Byte/sec)を設定し、*.replication.throttled.replicas
でthrottleの対象になるトピックとパーティションを指定します。
Leader throttle、Follower throttleがぞれぞれどのように働くかはKIP-73 Replication Quotasに分かりやすい図があるので、これを読むとイメージできると思います。
Leader throttleはLeaderからFollowerのレプリケーションにかかるthrottleで、設定したQuotaを越えるとFollowerからのFetchリクエストのレスポンスに対象のパーティションデータを含めないようになります。Follower throttleは、FollowerからLeaderにFetchリクエストを送るときに、前回のリクエストまでに受け取ったレスポンスが設定したQuotaを越えると、throttle対象になっているパーティションのデータを今回のFetchリクエストに含めないようになります。
公式ドキュメントの例でいえば、
By default kafka-reassign-partitions.sh will apply the leader throttle to all replicas that exist before the rebalance, any one of which might be leader. It will apply the follower throttle to all move destinations. So if there is a partition with replicas on brokers 101,102, being reassigned to 102,103, a leader throttle, for that partition, would be applied to 101,102 and a follower throttle would be applied to 103 only.
Kafka Broker101, 102が持っているパーティションを101と103が持つようにreassignしたとき、元々のレプリカだった101と102にはLeader throttleが、新しくレプリカになる103にはFollower throttleがかかります。
ブローカーに設定されているLeader throttleやFollower throttleは、kafka-configs
コマンドで確認できます。
bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
reassignのときにかかるthrottleですが2つほど注意点があります。
まず1つめは、このthrottleがreassignに伴うレプリケーションだけではなく、通常のレプリケーションに対してもかかってしまうことです。例えば、reassignによって新しくレプリカになるBrokerにFollower throttleがかかりますが、そのBrokerに対するreassignが完了した後、何らかの理由でそのBrokerがIn Sync Replica(ISR)から外れてしまった場合、そのブローカー以外のreassignも含めて全部のreassignが終わらない間はFollower throttleがかかっている状態で通常のレプリケーションが始まるので、ISRに戻るまで時間がかかる可能性があります。
2つめは、*.replication.throttled.replicas
で設定していないレプリカとパーティションはthrottleの対象にならないことです。throttleが通常のレプリケーションを含めてかかってしまう影響を緩和するためだと思うのですが、*.replication.throttled.replicas
で設定されていないレプリカとパーティションは対象ではないため、これらの*.replication.throttled.rate
で設定した値以上のレプリケーションが発生します。そのため。Brokerが耐えられる負荷ぎりぎりの値を設定しまうとBrokerが負荷に耐えられずにアプリケーションに影響が出る可能性があります。
それぞれKIPはあるので今後の改善によってこの問題は解消されるかもしれないです。
reassignを実行するとReplicaがどのように変化するのか
Kafkaのソースコードにあるコメントをもとに、reassignが実行されるときにReplicaがどのように変化するのかを見てみます。
{
"version": 1,
"partitions": [
{
"topic": "my-topic",
"partition": 138,
"replicas": [1009, 1012, 1008]
}
]
}
を
{
"version": 1,
"partitions": [
{
"topic": "my-topic",
"partition": 138,
"replicas": [1009, 6, 1010]
}
]
}
にreassignすると
- Adding Replicas
- 新しく追加するReplica
- この例では6と1010
- Removing Replicas
- Replicasから削除されるReplica
- この例では1012と1008
が追加されます。reassignを実行している間、ReplicaやISR(In sync replicas)は以下のように変化します。
Replicas | Adding Replicas | Removing Replicas | ISR(In sync replicas) | |
---|---|---|---|---|
初期 | {1009, 1012, 1008} | {} | {} | {1009, 1012, 1008} |
reassign実行 | {1009, 1012, 1008, 6, 1010} | {6, 1010} | {1012,1008} | {1009, 1012, 1008} |
全てのレプリカが追い付いた | {1009, 1012, 1008, 6, 1010} | {6, 1010} | {1012,1008} | {1009, 1012, 1008, 6, 1010} |
ISRをshrink | {1009, 1012, 1008, 6, 1010} | {6, 1010} | {1012,1008} | {1009, 6, 1010} |
Replicasを更新 | {1009, 6, 1010} | {} | {} | {1009, 6, 1010} |
Adding Replicasなどはreassign実行中にkafka-topics
コマンドを実行すると実際に確認することができます。
kafka-topics --bootstrap-server localhost:9092 --describe --topic my-topic
Topic: my-topic
Partition: 138
Leader: 1009
Replicas: 1009,6,1010,1012,1008
Isr: 1008,1009,1012
Adding Replicas: 6,1010
Removing Replicas: 1012,1008
遭遇したトラブル
Error: Failed to alter dir for xxx
となり、directoryがofflineになる
kafka-reassign-partitions
を実行したすぐ後に発生し、directoryがofflineになってしまうことがありました。その結果、そのdirectoryに配置されているpartitionのレプリケーションが止まり、Under replicated partitionが発生しました。
エラーメッセージ
ERROR Error while flushing log for my-topic-1 in dir /var/lib/kafka/data with offset 3030296 (exclusive) and recovery point 3030296 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.file.NoSuchFileException: /var/lib/kafka/data/my-topic-1.121e03ef8465440b952a8e16d57a58f4-future
at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
at org.apache.kafka.common.utils.Utils.flushDir(Utils.java:973)
at kafka.log.LocalLog.flush(LocalLog.scala:177)
at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1537)
at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1724)
at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1518)
at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1499)
at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
directoryがofflineである状態はKafkaの再起動をすれば解消はするのですが、複数のBrokerでこのエラーが出た場合、1台ブローカーを再起動すると起動が完了するまでの間offline partitionが出てしまう可能性があるので、再起動する前にISRを満たさなくなるpartitionが存在するか確認した方が良いと思います。
このエラーに関連するチケットがあり、3.6.0にアップデートしたところこのエラーは発生しなくなったのですが、別のエラーが発生しdirectoryがofflineになることがあったので、3.6.0ではまだ完全に解消されたわけではなさそうです。
エラーメッセージ
ERROR Error while flushing log for my-topic-1 in dir /var/lib/kafka/data
with offset 634 (exclusive) and recovery point 634 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:470)
at kafka.log.LogSegment.$anonfun$flush$1$adapted(LogSegment.scala:469)
at com.yammer.metrics.core.Timer.time(Timer.java:91)
at kafka.log.LogSegment.flush(LogSegment.scala:469)
at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:174)
at kafka.log.LocalLog.$anonfun$flush$1$adapted(LocalLog.scala:174)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
at kafka.log.LocalLog.flush(LocalLog.scala:174)
at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1656)
at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1637)
at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1618)
at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
ERROR Uncaught exception in scheduled task 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log for my-topic-1 in di
r /var/lib/kafka/data with offset 634 (exclusive) and recovery point 634
Caused by: java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:470)
at kafka.log.LogSegment.$anonfun$flush$1$adapted(LogSegment.scala:469)
at com.yammer.metrics.core.Timer.time(Timer.java:91)
at kafka.log.LogSegment.flush(LogSegment.scala:469)
at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:174)
at kafka.log.LocalLog.$anonfun$flush$1$adapted(LocalLog.scala:174)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
at kafka.log.LocalLog.flush(LocalLog.scala:174)
at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1656)
at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1637)
at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1618)
at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Discussion