🐅

Apache Flinkを使いKinesisストリームデータを処理する

2020/12/08に公開

これはただの集団 Advent Calendar 2020の7日目,Scala Advent Calendar 2020の記事です。

はじめに

最近ストリーム処理について調べ、Apache Flinkについて調査したのでFlinkの使い方を紹介します。

サンプルコードはこちらです。

Flinkについて

Flinkは、分散ストリームとバッチデータ処理のフレームワークです。

Retrieved from Apache Flink® — Stateful Computations over Data Streams

Scala、Javaで作られており、FlinkのAPIとしてはJava,Scala、Pythonで提供されています。
ファイルやKinesisなどのストリームデータからデータを受け取り、ETL処理して、別のデータストアへデータを流すことができます。

またFlinkはAWSのEMRで動かすことができ、Apache Hadoop YARNを使い複数のノードで並列に処理を実行することが可能です。

参考:Amazon EMR 内で Flink ジョブを操作する

ストリーム処理の技術選定

今回ストリーム処理を行う上で、評価にしたことが性能(スループット)と、スケールアウト、実現のしやすさ、社内実績です。

検討対象

  • 実データで比較した訳ではないのですが、Flinkは高いスループットでレイテンシーが低いという説明が多く見受けられ、2015年にYahoo社の行われた比較から、性能面でSparkより良さそうと判断しました。(また、Flinkはthroughputに対するレイテンシが定数であるが、Sparkは線形増加)
    https://www.slideshare.net/sbaltagi/flink-vs-spark

https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at

AkkaよりもFlinkは明示的に書くことが少ないため、より実現性が高いものにした。

社内実績

Flinkを採用したプロダクトで、実データですでにストリームデータを処理できる実績があり、性能的にも大丈夫であると判断した。

技術選定のまとめ

ストリーム処理できること 処理性能 実装、実現のしやすさ 当社実績
Flink
Spark
Akka

※上記はあくまで私の視点での評価です。Akkaの熟練者であれば評価が変わると思います。

Akkaを使いたかってみたったのですが、学習コストが他と比較してかかってしまうため今回はFlinkを使うことにしました。


またAkkaに詳しい方より、「Akkaはフレームワークではなく、ライブラリなので、フレームワークはAkkaをベースに作ってくれ」 という立ち位置であり、フレームワークと比較するとトレーサビリティなどで見劣りするかもしれませんと教えて頂きました。

後で後述しますが、Flinkにも内部でAkkaが使われていて、Akkaベースで作られておりました。
AkkaはSpark、またはFlinkよりも低レベルのライブラリで、より多くのパワーがあるが、考慮すべき考慮事項も多くなるということだということかなと思います。

Flinkの強み

  • データの保証 Exactly once

    • メッセージは欠損も重複もなく届けられる。
  • 高いスループットとローテイテンシー

    • 公式ドキュメントでFlinkは高いスループットとローテイテンシーであると説明されてますが、以下の記事からも、Flinkはthroughputに対するレイテンシが定数であるとのこと(Sparkは線形増加)

参考:Apache Flink とは

処理するデータ量が増えても処理がつまりにくいのではないかと想定しています。

Flinkの基本的なAPI

公式ドキュメントに基本的な使い方などが書かれています。

ここでFlinkの基本的なAPIについて紹介します。

  • DataStream

    • DataStreamはFlinkでストリームデータを処理するためのコレクションを表現するクラスです。
      • コレクションを表すが重複を含むことができ、有限、無限のデータを扱えます。
      • 不変であり、そのまま要素追加、要素の削除はできません。
      • DataStream API の操作を使いデータの変換します。
      • What is a DataStream?
  • Data Sinks
    Data SinksはDataStreamを消費し、ファイル、ソケット、外部システムに転送したり、標準出力してDataStreamの値を返します。

  • Windows

    • Windowsはストリームを有限サイズの「バケット」に分割し、その上で計算を行います。
    • ストリームデータを一定の時間でまとめて、処理することができるます。
  • Event Time

    • データストリームソースがどのように振る舞うか(例えば、タイムスタンプを割り当てるかどうか)、KeyedStream.timeWindow(Time.seconds(30))のようなウィンドウ操作でどのような時間の概念を使用すべきかを定義します。
  • Async I/O API

    • 非同期にI/O処理を行う場合(DBからデータを参照する時や外部APIと通信する場合など)に利用する。

使い方

Flinkのプログラムの基本的な構成は以下のような流れになります。

  • 実行環境を用意する。
  • データを読み込む。 or 作る。
  • ストリームデータを変換する。
  • 処理した結果をどこかに配置する。
  • プログラムの実行を走らせる。

今回はKinesinのストリームデータからデータを取得し、標準出力を処理まで試してみます。

必要なライブラリを追加

sbtプロジェクトを用意し、build.sbtに追加します。

val flinkVersion = "1.11.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"   % flinkVersion,
  "org.apache.flink" %% "flink-connector-kinesis" % flinkVersion,
  "org.apache.flink" %% "flink-clients"           % flinkVersion,
  "org.apache.flink" %% "flink-runtime-web"       % flinkVersion,
  "ch.qos.logback"   % "logback-classic"          % "1.2.3"

いくつかライブラリについて解説します。

  • flink-streaming-scala

  • flink-connector-kinesis

  • flink-clients

    • ローカル環境でflinkを実行する場合に追加します。
  • flink-runtime-web

    • Apache Flink Web Dashboardを使う場合に追加します。リアルタイムでFlinkの処理をモニタリングすることが可能です。
  • logback-classic

    • ログを出力するために追加。

Kinesisからストリームデータを取得する

Flinkには基本的なデータソース、シンクが組み込まれております。

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)

定義済みのデータソースでは、コレクションやイテレータからのデータの取り込みなどがあり、定義済みのデータシンクは、ファイル、標準出力、標準エラー、ソケットへの書き込みをサポートしています。

ここではAmazon AWS Kinesis Streams Connectorを使います。

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

object MyFlinkKinesisConsumer {

  def createKinesisSourceFrom(env: StreamExecutionEnvironment): DataStream[String] = {
    val consumerConfig = new Properties()
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

    env.enableCheckpointing(Config.kinesis.checkPointInterval)
    env.addSource(new FlinkKinesisConsumer[String]("sample-stream", new SimpleStringSchema, consumerConfig))
  }
}

ここでは、AWSのCredential情報からセットアップし、
Kinesisのsample-streamという名前のストリームからデータを取得する設定を行っています。

実際にAWS上で動かす際は、対象のインスタンスにIAM ロールを付与し 以下の設定を行うことでKinesisへアクセスすることができます。

    val consumerConfig = new Properties()
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")

エントリーポイントの用意

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Main {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // デバッグ目的で実行するため並列数を1にして実行します。デフォルトでは実行する環境で最大の並列数で実行されます。
    env.setParallelism(1)
    val input = MyFlinkKinesisConsumer.createKinesisSourceFrom(env)

    val counts = input.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)
    counts.print()

// 以下で処理が実行される。
    env.execute("run flink sample application")
  }
}

上記は、Kinesisのストリームデータから文字列を読み込み、単語毎に分割して、文字でグルーピングする処理を行っております。
Windowsを使い5秒間のストリーム処理の中で、何度その単語が渡されたのか集計を行い、コンソールに出力しています。

実際に動かします。

% sbt run

別ターミナルから Kinesisのストリームを作成し、データをputします。

# streamを作成
% aws kinesis create-stream --stream-name sample-stream --shard-count 1

# データレコードをストリームに配置
% aws kinesis put-record --stream-name sample-stream --partition-key 001 --data 'this is a Flink sample application.'

出力結果

(this,1)
(application,1)
(sample,1)
(flink,1)
(a,1)
(is,1)

// 5秒間で3回同じデータをKinesisにputした場合
(this,3)
(application,3)
(sample,3)
(flink,3)
(a,3)
(is,3)

単語が分割されて、集計されていることがわかります。

Webコンソール画面

送られたデータ数やデータの容量がモニタリングできます。

ということでざっとKinesisのストリームデータを処理しました。

感想

FlinkのAPIを覚える必要がありますが、手軽な印象で、デプロイする際にノード数を指定できるので、スケールアウトも行いやすいなと感じました。

Kinesisを使ったストリーム処理を行う場合は、通常はKCL(Kinesis Client Library)を使う必要がありますが、Kinesisのコネクターを使うことで、明示的にKCLの設定を行う必要はありません。

またクラスターの管理画面やモニタリングが備わっており、手軽に並列処理、分散したノードでストリーム処理ができる手軽さが良いです。

その他

ストリーム処理ということで、Akka(Actor, Stream, Cluster, Alpakka)を活用すれば実現できるのではないかと考えたのですが、私がAkkaを実務で経験したことがなく、学習コストがかかりそうだったので今回は使いませんでした。

一方で、Akka Stream と Apache Flinkの違いについてなんだろうを悩んでいたところ以下のリンクを見つけました。

Akka streams vs Apache Flink

I would say the main reasons to go for Apache Flink is the exactly once guarantees and automated distribution that comes with it. Otherwise Akka Streams is a very powerful API with simpler execution model.
Apache Flink を選ぶ主な理由は、正確な一度だけの保証と自動化されたディストリビューションにあると言えるでしょう。

Apache Flinkは、exactly onceと自動化されたスケールアウトなどが利点とのことです。

一方でFlinkの内部では分散通信としてAkkaが使われており、
Flinkシステムは、通信する必要がある3つの分散コンポーネント(JobClient、JobManager、TaskManager)で構成されています。

Retrieved from Akka and Actors

ジョブクライアントは、ユーザーからFlinkジョブを受け取り、それをジョブマネージャに送信します。
ジョブマネージャは、ジョブの実行のオーケストレーションを行います。
まず、必要な量のリソースを割り当てます。これには主にタスクマネージャの実行スロットが含まれます。
リソースの割り当て後、ジョブマネージャはジョブの個々のタスクをそれぞれのタスクマネージャにデプロイします。 タスクを受け取ると、タスクマネージャはタスクを実行するスレッドをスポーンします。計算の開始や終了などの状態の変化がジョブマネージャに送信されます。これらの状態の更新に基づいて、ジョブマネージャは終了するまでジョブの実行を制御します。ジョブが終了すると、その結果がジョブクライアントに送信され、ユーザに通知されます。

Flinkに分散コンポーネントが用意されているので、Akkaの熟練者でなくとも、手軽にアクターシステムを使った処理が可能であるとが良い利点だと思います。

参考リンク

Discussion