KafkaMusic code reading
残タスク
- topologyを可視化して概要を把握する
- streamDSLの部分を読み込む
- REST API部分の処理を読む
- test codeを読む
- TopFiveSerdeの実装を理解する
confluentinc/kafka-streams-examples: Demo applications and code examples for Apache Kafka's Streams API. にあるKafkaMusicというsampleのコードを読んで内容を理解したい
動作確認は下記のscrapで済ましている
今回はこのsampleを通じてStreamDSLの使い方やInteractive Queriesについて理解を深める
このscrapでは上記repositoryの 6.0.1-post
branchのコードを読む (scrap作成時点でdefaultになっているbranch)
KafkaMusicは再生されてる楽曲のtop5を集計し続けるアプリらしい
集計されたデータはREST API経由でアクセスできる
input topicsは下記の2つ
- song-feed
- 楽曲情報
- KTableで表現される
- play-events
- 再生ログ
- ただし30秒以上再生されたものだけ処理するよう実装されてるらしい
上記のtopicがjoinされ、 songPlayCounts
というKTable( song-play-count
という state store) になる
さらにこれをgroupingしてジャンルごとに集計し、 top-five-songs-by-genre
という state storeが作られる
まずは KafkaMusic のあたりのfileをみる
- KafkaMusicExample
- 今回のstream app本体
- stream appの設定やtopologyなどはここを見る
- KafkaMusicExampleDriver
- 動作確認用にdummy dataを流してくれるdriver
- MusicPlaysRestService
- Interactive Queries APIを使ってStateStoreを取得するREST proxy
- sampleとして用意されたendpointやInteractive Queriesの使い方はここを読む
- web serverにはjettyというのを使ってるらしい https://www.eclipse.org/jetty/
- SongBean
- MusicPlaysRestServiceで結果を返すのに使っている?
- jettyどころかjavaもよくわからないので別途調べる
- MusicPlaysRestServiceで結果を返すのに使っている?
- SongPlayCountBean
- 同上
Beanってこれのこと? https://e-words.jp/w/Bean.html
Song
や SongPlayCount
, PlayEvent
はこちらでschemaが定義されている。この定義をもとにclassが自動生成されるはず
次はいきなり KafkaMusicExample
を見る前に KafkaMusicExampleDriver.javaを読んでinput-topicに流れるrecordからイメージできるようにする
動作方法はcode commentにあるように以下で実行する
引数を渡すことでbootstrap-serverやschema-registry-urlを変更できるが、defaultなら下記のままでよい
$ java -cp target/kafka-streams-examples-6.0.1-standalone.jar io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver
処理は大きく2つ
- kafka-streams-examples/song_source.csv をloadする
- 一定間隔で
playEvent
をproduceする- songIdはrandomで設定、再生時間(duration)は60秒固定
- こちらはdriverを止めるまで生成し続ける
流すmessageを変えたい場合は、このDriverのコードを書き換えてbuildし直す
入力データはわかったのでKafkaMusicExample.java を読んでstream applicationのtopologyまわりを理解する
REST APIまわりのコードはあとで確認する
コードも詳細は後にして、まずは全体の流れを理解する
登場人物
- serde
- playEventSerde
- keySongSerde
- valueSongSerde
- songPlayCountSerde
- topFiveSerde
- これだけavro schemaではなくinner classで定義している
- 理由はコードを読んで後ほど理解する
- KStream
- playEvents
- DriverからのplayEventのstream
- playsBySongId
- playEventsを30秒以上の条件でfilteringしたのち、songIdでrepartitionしたstream
- songPlays
- playsBySongIdをsongTableでenrichmentしたもの
- playEvents
- KTable
- songTable
- Driverからのsongのtable
- songPlayCounts
- songPlaysをもとに曲ごとに再生回数を集計したtable
- songTable
- StateStore (Materialized.asで名前付けされているもの)
- SONG_PLAY_COUNT_STORE
- 曲ごとに再生回数を持ったstate store
- TOP_FIVE_SONGS_BY_GENRE_STORE
- ジャンルごとに再生回数が上位5曲の結果を持ったstate store
- TOP_FIVE_SONGS_STORE
- すべての曲から再生回数が上位5曲の結果を持ったstate store
- SONG_PLAY_COUNT_STORE
今回state storeに記録されたデータはInteractive Queriesによってアクセスされ、REST API経由で外部から読めるようになっている
topology
https://zz85.github.io/kafka-streams-viz/ でtopologyを可視化する
topologyの出力は System.out.println(topology.describe());
といったコードを埋め込んでもう一度packageを作る
testの実行にとても時間がかかるので、 mvn -Dskip.tests=true package
などとしてtestをskipする
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [play-events])
--> KSTREAM-FILTER-0000000003
Processor: KSTREAM-FILTER-0000000003 (stores: [])
--> KSTREAM-MAP-0000000004
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAP-0000000004 (stores: [])
--> KSTREAM-FILTER-0000000006
<-- KSTREAM-FILTER-0000000003
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-SINK-0000000005
<-- KSTREAM-MAP-0000000004
Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000004-repartition)
<-- KSTREAM-FILTER-0000000006
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000004-repartition])
--> KSTREAM-LEFTJOIN-0000000008
Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [all-songs])
--> KSTREAM-KEY-SELECT-0000000009
<-- KSTREAM-SOURCE-0000000007
Processor: KSTREAM-KEY-SELECT-0000000009 (stores: [])
--> song-play-count-repartition-filter
<-- KSTREAM-LEFTJOIN-0000000008
Source: KSTREAM-SOURCE-0000000001 (topics: [song-feed])
--> KTABLE-SOURCE-0000000002
Processor: song-play-count-repartition-filter (stores: [])
--> song-play-count-repartition-sink
<-- KSTREAM-KEY-SELECT-0000000009
Processor: KTABLE-SOURCE-0000000002 (stores: [all-songs])
--> none
<-- KSTREAM-SOURCE-0000000001
Sink: song-play-count-repartition-sink (topic: song-play-count-repartition)
<-- song-play-count-repartition-filter
Sub-topology: 2
Source: song-play-count-repartition-source (topics: [song-play-count-repartition])
--> KSTREAM-AGGREGATE-0000000010
Processor: KSTREAM-AGGREGATE-0000000010 (stores: [song-play-count])
--> KTABLE-SELECT-0000000014, KTABLE-SELECT-0000000018
<-- song-play-count-repartition-source
Processor: KTABLE-SELECT-0000000014 (stores: [])
--> KSTREAM-SINK-0000000015
<-- KSTREAM-AGGREGATE-0000000010
Processor: KTABLE-SELECT-0000000018 (stores: [])
--> KSTREAM-SINK-0000000019
<-- KSTREAM-AGGREGATE-0000000010
Sink: KSTREAM-SINK-0000000015 (topic: top-five-songs-by-genre-repartition)
<-- KTABLE-SELECT-0000000014
Sink: KSTREAM-SINK-0000000019 (topic: top-five-songs-repartition)
<-- KTABLE-SELECT-0000000018
Sub-topology: 3
Source: KSTREAM-SOURCE-0000000016 (topics: [top-five-songs-by-genre-repartition])
--> KTABLE-AGGREGATE-0000000017
Processor: KTABLE-AGGREGATE-0000000017 (stores: [top-five-songs-by-genre])
--> none
<-- KSTREAM-SOURCE-0000000016
Sub-topology: 4
Source: KSTREAM-SOURCE-0000000020 (topics: [top-five-songs-repartition])
--> KTABLE-AGGREGATE-0000000021
Processor: KTABLE-AGGREGATE-0000000021 (stores: [top-five-songs])
--> none
<-- KSTREAM-SOURCE-0000000020
time limitなので今日はここまで
続きはtopologyの図を見ながらコードを読む
sub-topologyが5つ
sub-toplogy 0 : playsBySongId
再生時間が指定以上のもののfilteringとrepartitionのためのmap
repartitionは次のsub-topologyでjoinするため
疑問:mapのあとにfilterのprocessorがあるのはなぜ
該当コードはKafkaMusicExample.java#L314あたり
// get a stream of play events
final KStream<String, PlayEvent> playEvents = builder.stream(
PLAY_EVENTS,
Consumed.with(Serdes.String(), playEventSerde));
...
// Accept play events that have a duration >= the minimum
final KStream<Long, PlayEvent> playsBySongId =
playEvents.filter((region, event) -> event.getDuration() >= MIN_CHARTABLE_DURATION)
// repartition based on song id
.map((key, value) -> KeyValue.pair(value.getSongId(), value));
sub-topology 1 : songPlayCountsのgroupBy
以下を行うtopology
- song-feedを読み込んで前述のplaysBySongIdとleftJoinする
- まだここではaggregateがないので、countの処理前までのgroupByよるselectKeyやrepartitionまででsub-topologyになっているぽい
- ここは理解があやしい
該当コードはKafkaMusicExample.java#L326あたり
sub-topology 2 : songPlayCountsのcount
sub-topology1をsourceとしてcountで集計し、StateStoreと後のsub-topologyのためのtopicにrecordを流す
aggregateの後ろのselectとrepartitionはtop-five-songs-by-genreとtop-five-songsのgroupByによるものだろう
sub-topology 3 : top-five-songs-by-genre
songPlayCountsからジャンル毎のtop5を集計し、state storeに結果を記録する
このsampleではinteractive queriesによってデータを使うため、出力topicはない
該当コードはKafkaMusicExample.java#L338あたり
// Compute the top five charts for each genre. The results of this computation will continuously update the state
// store "top-five-songs-by-genre", and this state store can then be queried interactively via a REST API (cf.
// MusicPlaysRestService) for the latest charts per genre.
songPlayCounts.groupBy((song, plays) ->
KeyValue.pair(song.getGenre().toLowerCase(),
new SongPlayCount(song.getId(), plays)),
Grouped.with(Serdes.String(), songPlayCountSerde))
// aggregate into a TopFiveSongs instance that will keep track
// of the current top five for each genre. The data will be available in the
// top-five-songs-genre store
.aggregate(TopFiveSongs::new,
(aggKey, value, aggregate) -> {
aggregate.add(value);
return aggregate;
},
(aggKey, value, aggregate) -> {
aggregate.remove(value);
return aggregate;
},
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_BY_GENRE_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
);
sub-topology 4 : top-five-songs
sub-topology 3とほぼ同じで、こちらはジャンル別ではなく全楽曲での集計
該当コードはKafkaMusicExample.java#L362あたり
// Compute the top five chart. The results of this computation will continuously update the state
// store "top-five-songs", and this state store can then be queried interactively via a REST API (cf.
// MusicPlaysRestService) for the latest charts per genre.
songPlayCounts.groupBy((song, plays) ->
KeyValue.pair(TOP_FIVE_KEY,
new SongPlayCount(song.getId(), plays)),
Grouped.with(Serdes.String(), songPlayCountSerde))
.aggregate(TopFiveSongs::new,
(aggKey, value, aggregate) -> {
aggregate.add(value);
return aggregate;
},
(aggKey, value, aggregate) -> {
aggregate.remove(value);
return aggregate;
},
Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
.withKeySerde(Serdes.String())
.withValueSerde(topFiveSerde)
);
TopFiveSongs はMapやTreeSetといったfieldを持っているので、そのためにTopFiveSerdeを用意している
TopFiveSerdeでは serialize
と deserialize
を定義してbyte列を相互変換できるようにしているみたい
TopFiveSongs class
topologyの最後でtop5の集計のときにaggregate methodで使われる集計用のclass
メインの処理は topFive
というTreeSetと add
methodあたり
tree set
JavaのTreeSetのdoc : TreeSet (Java Platform SE 8 )
SortedSet interfaceを実装しているSetとしてTreeSetを使ってる?
処理自体は再生回数が多い順にsortし、もし同数ならsongIdが若い方を優先するようになっている
private final TreeSet<SongPlayCount> topFive = new TreeSet<>((o1, o2) -> {
final Long o1Plays = o1.getPlays();
final Long o2Plays = o2.getPlays();
final int result = o2Plays.compareTo(o1Plays);
if (result != 0) {
return result;
}
final Long o1SongId = o1.getSongId();
final Long o2SongId = o2.getSongId();
return o1SongId.compareTo(o2SongId);
});
add method
新しいtop5候補を追加する
すでにtop5に同じものが含まれてる場合、削除からの追加で値を更新する
候補が5件を超えたら topFive.last()
で一番後ろの候補を取り除くことで5件を維持している
public void add(final SongPlayCount songPlayCount) {
if(currentSongs.containsKey(songPlayCount.getSongId())) {
topFive.remove(currentSongs.remove(songPlayCount.getSongId()));
}
topFive.add(songPlayCount);
currentSongs.put(songPlayCount.getSongId(), songPlayCount);
if (topFive.size() > 5) {
final SongPlayCount last = topFive.last();
currentSongs.remove(last.getSongId());
topFive.remove(last);
}
}
topologyの全体の流れはわかった気がするので、次にRestServiceの方の概要を把握してtopologyの最後にstate storeに記録した値がどのように参照されてるのかを理解する
読むのは主に二箇所:
- KafkaMusicExample.java のmainでの設定とRestServiceをstartしてるところ
- MusicPlaysRestService.java 全体
mainのほうはこのぐらい
- settings
StreamsConfig.APPLICATION_SERVER_CONFIG
StreamsConfig.STATE_DIR_CONFIG
- RestServiceの初期化
- streamsのinstanceとhostInfoを渡してあとはMusicPlayRestServiceで処理
MusicPlaysRessetService
constructorの方は引数をそのまま保持してるのとMetatdataServiceの初期化ぐらいなので、 start()
の方から読んでみる
start()
MusicPlaysRestService.java#212
Eclipse Jetty | The Eclipse Foundationというのをつかってweb serverを起動しているみたいで、その初期設定とserverの起動処理が書いてある
endpointsを調べる
このsampleの使い方の例に以下のendpointが挙がっているので、これらの処理を一つずつ追ってみる
* # List all running instances of this application
* http://localhost:7070/kafka-music/instances
*
* # List app instances that currently manage (parts of) state store "song-play-count"
* http://localhost:7070/kafka-music/instances/song-play-count
*
* # Get the latest top five for the genre "punk"
* http://localhost:7070/kafka-music/charts/genre/punk
*
* # Get the latest top five across all genres
* http://localhost:7070/kafka-music/charts/top-five
http://localhost:7070/kafka-music/instances
endpoint :KafkaMusic applicationで動いているinstanceの一覧を返す
MusicPlaysRestService.javaL188
MetadataService.java のほうが実体で、やっていることは streams.allMetadata()
で値を引っ張ってきてhostInfoにmapして値を返している
http://localhost:7070/kafka-music/instances/song-play-count
endpoint :song-play-count
state storeを一部でも持っているinstanceを列挙する
MusicPlaysRestService.javaL201
前述と処理はほぼ同じで、 streams.allMetadata()
の代わりに streams.allMetadataForStore(store)
を使っている
http://localhost:7070/kafka-music/charts/genre/punk
endpoint :ジャンル別のtop5を返す。このendpointだと punk のtop5を返す
requestを受け付けたapplication instanceが該当ジャンルを含むstate storeを保持しているとは限らないので、 streams.metadataForKey(store, key, serializer)
を使って取得先を見つけ、そちらに再度requestする ( fetchSongPlayCount
あたり )
もし自身が該当するstate storeを保持している場合は取得したtop5のデータに加え、 ALL_SONGS
から楽曲情報を取得し、enrichmentしたデータを返す。このとき、この ALL_SONGS
はGlobalKTableではなくKTableなので、取得したい値が同一hostに存在するとは限らない。そのため、こちらも同様にmetadataで保持しているhostを探し、必要であればrequestを投げる
http://localhost:7070/kafka-music/charts/top-five
endpoint :前述の内容とほぼ同じで、top5の取得先が全楽曲の方のstate storeを見ている