Open14

KafkaMusic code reading

ピン留めされたアイテム
kei_qkei_q

残タスク

  • topologyを可視化して概要を把握する
  • streamDSLの部分を読み込む
  • REST API部分の処理を読む
  • test codeを読む
  • TopFiveSerdeの実装を理解する
kei_qkei_q

confluentinc/kafka-streams-examples: Demo applications and code examples for Apache Kafka's Streams API. にあるKafkaMusicというsampleのコードを読んで内容を理解したい

動作確認は下記のscrapで済ましている
https://zenn.dev/kei_q/scraps/be8e4b5d093b6f

今回はこのsampleを通じてStreamDSLの使い方やInteractive Queriesについて理解を深める

このscrapでは上記repositoryの 6.0.1-post branchのコードを読む (scrap作成時点でdefaultになっているbranch)

kei_qkei_q

KafkaMusicは再生されてる楽曲のtop5を集計し続けるアプリらしい
集計されたデータはREST API経由でアクセスできる

input topicsは下記の2つ

  • song-feed
    • 楽曲情報
    • KTableで表現される
  • play-events
    • 再生ログ
    • ただし30秒以上再生されたものだけ処理するよう実装されてるらしい

上記のtopicがjoinされ、 songPlayCounts というKTable( song-play-count という state store) になる

さらにこれをgroupingしてジャンルごとに集計し、 top-five-songs-by-genre という state storeが作られる

kei_qkei_q

まずは KafkaMusic のあたりのfileをみる

  • KafkaMusicExample
    • 今回のstream app本体
    • stream appの設定やtopologyなどはここを見る
  • KafkaMusicExampleDriver
    • 動作確認用にdummy dataを流してくれるdriver
  • MusicPlaysRestService
    • Interactive Queries APIを使ってStateStoreを取得するREST proxy
    • sampleとして用意されたendpointやInteractive Queriesの使い方はここを読む
    • web serverにはjettyというのを使ってるらしい https://www.eclipse.org/jetty/
  • SongBean
    • MusicPlaysRestServiceで結果を返すのに使っている?
      • jettyどころかjavaもよくわからないので別途調べる
  • SongPlayCountBean
    • 同上

Beanってこれのこと? https://e-words.jp/w/Bean.html

SongSongPlayCount, PlayEvent はこちらでschemaが定義されている。この定義をもとにclassが自動生成されるはず
https://github.com/confluentinc/kafka-streams-examples/tree/59061b606fa0104e3b7ce4971b3a4cfde274f887/src/main/resources/avro/io/confluent/examples/streams

kei_qkei_q

次はいきなり KafkaMusicExample を見る前に KafkaMusicExampleDriver.javaを読んでinput-topicに流れるrecordからイメージできるようにする

動作方法はcode commentにあるように以下で実行する
引数を渡すことでbootstrap-serverやschema-registry-urlを変更できるが、defaultなら下記のままでよい

$ java -cp target/kafka-streams-examples-6.0.1-standalone.jar io.confluent.examples.streams.interactivequeries.kafkamusic.KafkaMusicExampleDriver

処理は大きく2つ

  • kafka-streams-examples/song_source.csv をloadする
  • 一定間隔で playEvent をproduceする
    • songIdはrandomで設定、再生時間(duration)は60秒固定
    • こちらはdriverを止めるまで生成し続ける

流すmessageを変えたい場合は、このDriverのコードを書き換えてbuildし直す

kei_qkei_q

入力データはわかったのでKafkaMusicExample.java を読んでstream applicationのtopologyまわりを理解する
REST APIまわりのコードはあとで確認する
コードも詳細は後にして、まずは全体の流れを理解する

登場人物

  • serde
    • playEventSerde
    • keySongSerde
    • valueSongSerde
    • songPlayCountSerde
    • topFiveSerde
      • これだけavro schemaではなくinner classで定義している
      • 理由はコードを読んで後ほど理解する
  • KStream
    • playEvents
      • DriverからのplayEventのstream
    • playsBySongId
      • playEventsを30秒以上の条件でfilteringしたのち、songIdでrepartitionしたstream
    • songPlays
      • playsBySongIdをsongTableでenrichmentしたもの
  • KTable
    • songTable
      • Driverからのsongのtable
    • songPlayCounts
      • songPlaysをもとに曲ごとに再生回数を集計したtable
  • StateStore (Materialized.asで名前付けされているもの)
    • SONG_PLAY_COUNT_STORE
      • 曲ごとに再生回数を持ったstate store
    • TOP_FIVE_SONGS_BY_GENRE_STORE
      • ジャンルごとに再生回数が上位5曲の結果を持ったstate store
    • TOP_FIVE_SONGS_STORE
      • すべての曲から再生回数が上位5曲の結果を持ったstate store

今回state storeに記録されたデータはInteractive Queriesによってアクセスされ、REST API経由で外部から読めるようになっている

kei_qkei_q

topology

https://zz85.github.io/kafka-streams-viz/ でtopologyを可視化する
topologyの出力は System.out.println(topology.describe()); といったコードを埋め込んでもう一度packageを作る
testの実行にとても時間がかかるので、 mvn -Dskip.tests=true package などとしてtestをskipする

Topologies:                                                                                                                                    
   Sub-topology: 0                                                                                                                             
    Source: KSTREAM-SOURCE-0000000000 (topics: [play-events])                                                                                  
      --> KSTREAM-FILTER-0000000003                                                                                                            
    Processor: KSTREAM-FILTER-0000000003 (stores: [])                                                                                          
      --> KSTREAM-MAP-0000000004                                                                                                               
      <-- KSTREAM-SOURCE-0000000000                                                                                                            
    Processor: KSTREAM-MAP-0000000004 (stores: [])                                                                                             
      --> KSTREAM-FILTER-0000000006                                                                                                            
      <-- KSTREAM-FILTER-0000000003                                                                                                            
    Processor: KSTREAM-FILTER-0000000006 (stores: [])                                                                                          
      --> KSTREAM-SINK-0000000005                                                                                                              
      <-- KSTREAM-MAP-0000000004                                                                                                               
    Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000004-repartition)                                                                  
      <-- KSTREAM-FILTER-0000000006                                                                                                            
                                                                                                                                               
  Sub-topology: 1                                                                                                                                
    Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000004-repartition])                                                           
      --> KSTREAM-LEFTJOIN-0000000008                                                                                                          
    Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [all-songs])                                                                               
      --> KSTREAM-KEY-SELECT-0000000009                                                                                                             
      <-- KSTREAM-SOURCE-0000000007                                                                                                              
    Processor: KSTREAM-KEY-SELECT-0000000009 (stores: [])                                                                                            
      --> song-play-count-repartition-filter                                                                                                   
      <-- KSTREAM-LEFTJOIN-0000000008                                                                                                                  
    Source: KSTREAM-SOURCE-0000000001 (topics: [song-feed])                                                                                    
      --> KTABLE-SOURCE-0000000002                                                                                                                    
    Processor: song-play-count-repartition-filter (stores: [])                                                                                      
      --> song-play-count-repartition-sink                             
      <-- KSTREAM-KEY-SELECT-0000000009                                                                                                              
    Processor: KTABLE-SOURCE-0000000002 (stores: [all-songs])                                                                                         
      --> none                                                                                                                                         
      <-- KSTREAM-SOURCE-0000000001                                    
    Sink: song-play-count-repartition-sink (topic: song-play-count-repartition)                                                                        
      <-- song-play-count-repartition-filter                                                                                                   
                                                                                                                                               
  Sub-topology: 2                                                                                                                              
    Source: song-play-count-repartition-source (topics: [song-play-count-repartition])                                                                 
      --> KSTREAM-AGGREGATE-0000000010                                                                                                         
    Processor: KSTREAM-AGGREGATE-0000000010 (stores: [song-play-count])                                                                        
      --> KTABLE-SELECT-0000000014, KTABLE-SELECT-0000000018                                                                                   
      <-- song-play-count-repartition-source                           
    Processor: KTABLE-SELECT-0000000014 (stores: [])                                                                                           
      --> KSTREAM-SINK-0000000015                                      
      <-- KSTREAM-AGGREGATE-0000000010                                                                                                           
    Processor: KTABLE-SELECT-0000000018 (stores: [])                   
      --> KSTREAM-SINK-0000000019                                                                                                                 
      <-- KSTREAM-AGGREGATE-0000000010                                                                                                         
    Sink: KSTREAM-SINK-0000000015 (topic: top-five-songs-by-genre-repartition)                                                                      
      <-- KTABLE-SELECT-0000000014                                     
    Sink: KSTREAM-SINK-0000000019 (topic: top-five-songs-repartition)                                                                                
      <-- KTABLE-SELECT-0000000018                                                                                                                    
                                                                                                                                                       
  Sub-topology: 3                                                      
    Source: KSTREAM-SOURCE-0000000016 (topics: [top-five-songs-by-genre-repartition])                                                          
      --> KTABLE-AGGREGATE-0000000017                                                                                                          
    Processor: KTABLE-AGGREGATE-0000000017 (stores: [top-five-songs-by-genre])                                                                 
      --> none                                                         
      <-- KSTREAM-SOURCE-0000000016                                                                                                            
                                                                                                                                               
  Sub-topology: 4                                                                                                                              
    Source: KSTREAM-SOURCE-0000000020 (topics: [top-five-songs-repartition])                                                                   
      --> KTABLE-AGGREGATE-0000000021                                                                                                          
    Processor: KTABLE-AGGREGATE-0000000021 (stores: [top-five-songs])  
      --> none                                                                                                                                   
      <-- KSTREAM-SOURCE-0000000020    

kei_qkei_q

time limitなので今日はここまで
続きはtopologyの図を見ながらコードを読む

kei_qkei_q

sub-topologyが5つ

sub-toplogy 0 : playsBySongId

再生時間が指定以上のもののfilteringとrepartitionのためのmap
repartitionは次のsub-topologyでjoinするため
疑問:mapのあとにfilterのprocessorがあるのはなぜ

該当コードはKafkaMusicExample.java#L314あたり

    // get a stream of play events
    final KStream<String, PlayEvent> playEvents = builder.stream(
        PLAY_EVENTS,
        Consumed.with(Serdes.String(), playEventSerde));
...
    // Accept play events that have a duration >= the minimum
    final KStream<Long, PlayEvent> playsBySongId =
        playEvents.filter((region, event) -> event.getDuration() >= MIN_CHARTABLE_DURATION)
            // repartition based on song id
            .map((key, value) -> KeyValue.pair(value.getSongId(), value));

sub-topology 1 : songPlayCountsのgroupBy

以下を行うtopology

  • song-feedを読み込んで前述のplaysBySongIdとleftJoinする
  • まだここではaggregateがないので、countの処理前までのgroupByよるselectKeyやrepartitionまででsub-topologyになっているぽい
    • ここは理解があやしい

該当コードはKafkaMusicExample.java#L326あたり

sub-topology 2 : songPlayCountsのcount

sub-topology1をsourceとしてcountで集計し、StateStoreと後のsub-topologyのためのtopicにrecordを流す
aggregateの後ろのselectとrepartitionはtop-five-songs-by-genreとtop-five-songsのgroupByによるものだろう

sub-topology 3 : top-five-songs-by-genre

songPlayCountsからジャンル毎のtop5を集計し、state storeに結果を記録する
このsampleではinteractive queriesによってデータを使うため、出力topicはない

該当コードはKafkaMusicExample.java#L338あたり

    // Compute the top five charts for each genre. The results of this computation will continuously update the state
    // store "top-five-songs-by-genre", and this state store can then be queried interactively via a REST API (cf.
    // MusicPlaysRestService) for the latest charts per genre.
    songPlayCounts.groupBy((song, plays) ->
            KeyValue.pair(song.getGenre().toLowerCase(),
                new SongPlayCount(song.getId(), plays)),
        Grouped.with(Serdes.String(), songPlayCountSerde))
        // aggregate into a TopFiveSongs instance that will keep track
        // of the current top five for each genre. The data will be available in the
        // top-five-songs-genre store
        .aggregate(TopFiveSongs::new,
            (aggKey, value, aggregate) -> {
              aggregate.add(value);
              return aggregate;
            },
            (aggKey, value, aggregate) -> {
              aggregate.remove(value);
              return aggregate;
            },
            Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_BY_GENRE_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(topFiveSerde)
        );

sub-topology 4 : top-five-songs

sub-topology 3とほぼ同じで、こちらはジャンル別ではなく全楽曲での集計

該当コードはKafkaMusicExample.java#L362あたり


    // Compute the top five chart. The results of this computation will continuously update the state
    // store "top-five-songs", and this state store can then be queried interactively via a REST API (cf.
    // MusicPlaysRestService) for the latest charts per genre.
    songPlayCounts.groupBy((song, plays) ->
            KeyValue.pair(TOP_FIVE_KEY,
                new SongPlayCount(song.getId(), plays)),
        Grouped.with(Serdes.String(), songPlayCountSerde))
        .aggregate(TopFiveSongs::new,
            (aggKey, value, aggregate) -> {
              aggregate.add(value);
              return aggregate;
            },
            (aggKey, value, aggregate) -> {
              aggregate.remove(value);
              return aggregate;
            },
            Materialized.<String, TopFiveSongs, KeyValueStore<Bytes, byte[]>>as(TOP_FIVE_SONGS_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(topFiveSerde)
        );
kei_qkei_q

TopFiveSongs はMapやTreeSetといったfieldを持っているので、そのためにTopFiveSerdeを用意している

TopFiveSerdeでは serializedeserialize を定義してbyte列を相互変換できるようにしているみたい

kei_qkei_q

TopFiveSongs class

topologyの最後でtop5の集計のときにaggregate methodで使われる集計用のclass
メインの処理は topFive というTreeSetと add methodあたり

tree set

JavaのTreeSetのdoc : TreeSet (Java Platform SE 8 )
SortedSet interfaceを実装しているSetとしてTreeSetを使ってる?
処理自体は再生回数が多い順にsortし、もし同数ならsongIdが若い方を優先するようになっている

    private final TreeSet<SongPlayCount> topFive = new TreeSet<>((o1, o2) -> {
      final Long o1Plays = o1.getPlays();
      final Long o2Plays = o2.getPlays();

      final int result = o2Plays.compareTo(o1Plays);
      if (result != 0) {
        return result;
      }
      final Long o1SongId = o1.getSongId();
      final Long o2SongId = o2.getSongId();
      return o1SongId.compareTo(o2SongId);
    });

add method

新しいtop5候補を追加する
すでにtop5に同じものが含まれてる場合、削除からの追加で値を更新する
候補が5件を超えたら topFive.last() で一番後ろの候補を取り除くことで5件を維持している

    public void add(final SongPlayCount songPlayCount) {
      if(currentSongs.containsKey(songPlayCount.getSongId())) {
        topFive.remove(currentSongs.remove(songPlayCount.getSongId()));
      }
      topFive.add(songPlayCount);
      currentSongs.put(songPlayCount.getSongId(), songPlayCount);
      if (topFive.size() > 5) {
        final SongPlayCount last = topFive.last();
        currentSongs.remove(last.getSongId());
        topFive.remove(last);
      }
    }
kei_qkei_q

topologyの全体の流れはわかった気がするので、次にRestServiceの方の概要を把握してtopologyの最後にstate storeに記録した値がどのように参照されてるのかを理解する
読むのは主に二箇所:

kei_qkei_q

mainのほうはこのぐらい

  • settings
    • StreamsConfig.APPLICATION_SERVER_CONFIG
    • StreamsConfig.STATE_DIR_CONFIG
  • RestServiceの初期化
    • streamsのinstanceとhostInfoを渡してあとはMusicPlayRestServiceで処理
kei_qkei_q

MusicPlaysRessetService

constructorの方は引数をそのまま保持してるのとMetatdataServiceの初期化ぐらいなので、 start() の方から読んでみる

start()

MusicPlaysRestService.java#212

Eclipse Jetty | The Eclipse Foundationというのをつかってweb serverを起動しているみたいで、その初期設定とserverの起動処理が書いてある

endpointsを調べる

このsampleの使い方の例に以下のendpointが挙がっているので、これらの処理を一つずつ追ってみる

 * # List all running instances of this application
 * http://localhost:7070/kafka-music/instances
 *
 * # List app instances that currently manage (parts of) state store "song-play-count"
 * http://localhost:7070/kafka-music/instances/song-play-count
 *
 * # Get the latest top five for the genre "punk"
 * http://localhost:7070/kafka-music/charts/genre/punk
 *
 * # Get the latest top five across all genres
 * http://localhost:7070/kafka-music/charts/top-five

endpoint : http://localhost:7070/kafka-music/instances

KafkaMusic applicationで動いているinstanceの一覧を返す

MusicPlaysRestService.javaL188

MetadataService.java のほうが実体で、やっていることは streams.allMetadata() で値を引っ張ってきてhostInfoにmapして値を返している

endpoint : http://localhost:7070/kafka-music/instances/song-play-count

song-play-count state storeを一部でも持っているinstanceを列挙する

MusicPlaysRestService.javaL201

前述と処理はほぼ同じで、 streams.allMetadata() の代わりに streams.allMetadataForStore(store) を使っている

endpoint : http://localhost:7070/kafka-music/charts/genre/punk

ジャンル別のtop5を返す。このendpointだと punk のtop5を返す

MusicPlaysRestService.javaL75

requestを受け付けたapplication instanceが該当ジャンルを含むstate storeを保持しているとは限らないので、 streams.metadataForKey(store, key, serializer) を使って取得先を見つけ、そちらに再度requestする ( fetchSongPlayCount あたり )
もし自身が該当するstate storeを保持している場合は取得したtop5のデータに加え、 ALL_SONGS から楽曲情報を取得し、enrichmentしたデータを返す。このとき、この ALL_SONGS はGlobalKTableではなくKTableなので、取得したい値が同一hostに存在するとは限らない。そのため、こちらも同様にmetadataで保持しているhostを探し、必要であればrequestを投げる

endpoint : http://localhost:7070/kafka-music/charts/top-five

前述の内容とほぼ同じで、top5の取得先が全楽曲の方のstate storeを見ている