Kinesis Client Libraryはデフォルトで拡張ファンアウトを使う

4 min read読了の目安(約3800字

TL;DR

タイトル通り。これサンプルから読みとけって無理じゃね?みんなどうしてるの?
(多分ドキュメントのどこかに書いてある気がするけど)

Kinesisについて

まあ説明いらんでしょ。 簡単に使えていいですよね。MSKとガチンコ比較した記事をどなたか書いてください。

拡張ファンアウトについて

Kiensisはシャード単位で性能が決まっていて、1シャード辺り秒間最大2MBの読み取り性能がある。Kinesisは複数のコンシューマグループをつなげることができるが、コンシューマグループをつなげすぎると最大読み取り性能を超えてしまう可能性がある。
そんなときに使えるのが拡張ファンアウト (Enhanced fan-out)。これは、コンシューマごとにスループットを専有できるので、他のコンシューマに邪魔されることなくコンシューマを繋げられる=いっぱいコンシューマをつけても大丈夫、というのが私の理解。

一つのストリームに複数コンシューマ繋げたいケースっていっぱいあると思うので、これは嬉しい機能。シャード数増やすより手軽だし、真面目に計算していないけど多分安い。

Kinesis Consumer Library (KCL) について

拡張ファンアウトみたいな機能を使うには、KCLのv2を使うのが良いらしい。KCL v2はPythonとJavaが用意されているらしい。とりあえず手軽に使えるPython版を使ってみよう。

Python

意気揚々とPythonのライブラリを使ってみようとする。ソースコードを眺める感じ、底知れないPython2っぽさや、PythonのライブラリというよりJavaのラッパーじゃね、とか感じ取りながら、とりあえずチュートリアルを動かしてみる。

動いているっぽいが、これでいいのか・・・とか思いながら、AWSのマネコンを眺めてみる。すると、特に設定した覚えがないのに、先程動かしたPythonのKCLが拡張ファンアウトに登録されているではないか!

なんでやわからん・・・とか思いつつ、どうせJavaで動いているのならJavaの方見たほうが良いな、と思い、結局Javaのライブラリを眺めてみることに。

Java

拡張ファンアウトを利用するためのサンプルアプリがAWSのドキュメントには乗っている。見てみたが、明示的にfan-outの設定をしているようには見えない。なんだこれ。どこで設定しているのだろう。。。

こういうのは多分schedulerに登録するときなのかな、と思うのだが・・・

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

よくわからないので、スループット共有(従来の方式)の方を見てみる。こっちも明示的には設定していなくね・・・と思ったが、差分があった。

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
        );

もしかして: PollingConfig なるものがスループット共有の設定になる・・・?そして、retrivalConfigはデフォルトだとfan-outになるのではないだろうか・・・?

javadocを見てみる

javadocによると、どうやらpollingと対になる設定で、fanoutの設定があるっぽい。なので、おそらく予想はあっているっぽい。でも、どこにデフォルトの設定が書いてあるんだ・・・

ソースコードを見てみる

うおおん

    public RetrievalFactory retrievalFactory() {
        if (retrievalFactory == null) {
            if (retrievalSpecificConfig == null) {
                retrievalSpecificConfig = new FanOutConfig(kinesisClient())
                        .applicationName(applicationName());
                retrievalSpecificConfig = appStreamTracker.map(multiStreamTracker -> retrievalSpecificConfig,
                        streamConfig -> ((FanOutConfig) retrievalSpecificConfig).streamName(streamConfig.streamIdentifier().streamName()));
            }
            retrievalFactory = retrievalSpecificConfig.retrievalFactory();
        }
        return retrievalFactory;
    }

まとめ

いや、逆にそれくらい自分で調べられないと、(AWSを使うのは)難しい。