Kinesis Client Libraryはデフォルトで拡張ファンアウトを使う
TL;DR
タイトル通り。これサンプルから読みとけって無理じゃね?みんなどうしてるの?
(多分ドキュメントのどこかに書いてある気がするけど)
Kinesisについて
まあ説明いらんでしょ。 簡単に使えていいですよね。MSKとガチンコ比較した記事をどなたか書いてください。
拡張ファンアウトについて
Kiensisはシャード単位で性能が決まっていて、1シャード辺り秒間最大2MBの読み取り性能がある。Kinesisは複数のコンシューマグループをつなげることができるが、コンシューマグループをつなげすぎると最大読み取り性能を超えてしまう可能性がある。
そんなときに使えるのが拡張ファンアウト (Enhanced fan-out)。これは、コンシューマごとにスループットを専有できるので、他のコンシューマに邪魔されることなくコンシューマを繋げられる=いっぱいコンシューマをつけても大丈夫、というのが私の理解。
一つのストリームに複数コンシューマ繋げたいケースっていっぱいあると思うので、これは嬉しい機能。シャード数増やすより手軽だし、真面目に計算していないけど多分安い。
Kinesis Consumer Library (KCL) について
拡張ファンアウトみたいな機能を使うには、KCLのv2を使うのが良いらしい。KCL v2はPythonとJavaが用意されているらしい。とりあえず手軽に使えるPython版を使ってみよう。
Python
意気揚々とPythonのライブラリを使ってみようとする。ソースコードを眺める感じ、底知れないPython2っぽさや、PythonのライブラリというよりJavaのラッパーじゃね、とか感じ取りながら、とりあえずチュートリアルを動かしてみる。
動いているっぽいが、これでいいのか・・・とか思いながら、AWSのマネコンを眺めてみる。すると、特に設定した覚えがないのに、先程動かしたPythonのKCLが拡張ファンアウトに登録されているではないか!
なんでやわからん・・・とか思いつつ、どうせJavaで動いているのならJavaの方見たほうが良いな、と思い、結局Javaのライブラリを眺めてみることに。
Java
拡張ファンアウトを利用するためのサンプルアプリがAWSのドキュメントには乗っている。見てみたが、明示的にfan-outの設定をしているようには見えない。なんだこれ。どこで設定しているのだろう。。。
こういうのは多分schedulerに登録するときなのかな、と思うのだが・・・
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
よくわからないので、スループット共有(従来の方式)の方を見てみる。こっちも明示的には設定していなくね・・・と思ったが、差分があった。
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
);
もしかして: PollingConfig
なるものがスループット共有の設定になる・・・?そして、retrivalConfigはデフォルトだとfan-outになるのではないだろうか・・・?
javadocを見てみる
javadocによると、どうやらpollingと対になる設定で、fanoutの設定があるっぽい。なので、おそらく予想はあっているっぽい。でも、どこにデフォルトの設定が書いてあるんだ・・・
- https://javadoc.io/static/software.amazon.kinesis/amazon-kinesis-client/2.3.4/software/amazon/kinesis/retrieval/polling/package-summary.html
- https://javadoc.io/static/software.amazon.kinesis/amazon-kinesis-client/2.3.4/software/amazon/kinesis/retrieval/fanout/package-summary.html
ソースコードを見てみる
うおおん
public RetrievalFactory retrievalFactory() {
if (retrievalFactory == null) {
if (retrievalSpecificConfig == null) {
retrievalSpecificConfig = new FanOutConfig(kinesisClient())
.applicationName(applicationName());
retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig,
streamConfig -> ((FanOutConfig) retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
}
retrievalFactory = retrievalSpecificConfig.retrievalFactory();
}
return retrievalFactory;
}
まとめ
いや、逆にそれくらい自分で調べられないと、(AWSを使うのは)難しい。
Discussion