🚀

Apache Beam on Flink デプロイのベストプラクティスガイド

に公開

はじめに

Apache Beamは、バッチとストリーミングの両方に対応する統一プログラミングモデルです。Beamで記述したパイプラインは、Apache Flink、Apache Spark、Google Cloud Dataflowなど、複数の分散処理エンジン(ランナー)で実行できます。特にApache Flinkは、低遅延、高スループット、堅牢なステートフル処理能力を持つため、ストリーミング処理の主要な選択肢です。

しかし、Beam Java SDKで開発したアプリケーションをFlinkクラスタへ本番デプロイする際には、以下のような多くの課題が存在します。

  • 依存関係の競合
  • 不適切なパッケージング
  • 非効率なリソース設定
  • 不十分な耐障害性構成

これらの課題は、パイプラインの不安定化やパフォーマンス低下を引き起こします。

この記事では、Gradleをビルドツールとして利用し、Beam Java SDKアプリケーションをFlinkクラスタへデプロイするためのベストプラクティスを解説します。データエンジニアが堅牢でスケーラブルなデータパイプラインを構築するための、実践的で詳細なガイドを提供します。

Apache Beam on Flinkのプロダクションコードで考慮するポイントは、以下の記事を参照してください。

https://zenn.dev/suwash/articles/apache_beam_flink_20250523

https://zenn.dev/suwash/articles/beam_java_best_practice_20250917

Flinkジョブ実行のライフサイクル:JARからメモリまで

flink runコマンド実行後、データが処理されるまでの内部的な流れを解説します。

要約

  • ジョブ投入時、FlinkクライアントはBeamパイプラインをFlinkのJobGraphに変換し、Fat JARと共にJobManagerへ送信します。
  • JobManagerJobGraphを物理実行計画であるExecutionGraphに変換し、JARファイルを各TaskManagerに配布してタスクの実行を指示します。
  • TaskManagerでは、ジョブごとに独立したUser Code ClassLoaderが作成され、Fat JARからクラスをロードします。これにより、ジョブ間の依存関係が分離されます。
  • タスク実行時、ユーザーコードが生成するオブジェクトはJVMヒープに、RocksDBのキャッシュなどはオフヒープの管理メモリに格納され、ワークロードに応じてメモリ領域が使い分けられます。

ジョブ実行フロー

実行ステップの詳細

ステップ 実行主体 アクション
1. ジョブ投入 Flinkクライアント 1. mainメソッドを実行し、BeamパイプラインをJobGraphに変換。
2. JobGraphとFat JARをJobManagerに送信。
2. 調整 JobManager 1. Fat JARを全TaskManagerに配布。
2. JobGraphを物理実行計画であるExecutionGraphに変換。
3. ResourceManagerにスロットを要求し、タスクをデプロイ。
3. 準備 TaskManager 1. ジョブごとに新しいUser Code ClassLoaderを作成。
2. 配布されたFat JARから、親クラスローダー(Flinkフレームワーク)に存在しないクラスをロード。
4. 実行 TaskManager 1. スロット内でタスクを実行。
2. ワークロードに応じてJVMヒープや管理メモリなどのメモリ領域を利用。

TaskManagerのメモリ利用

スロット内で実行されるタスクは、TaskManagerのJVMが持つメモリ領域を共有して利用します。

メモリ領域 格納されるもの 主な特徴
JVMヒープ (Task Heap) ・ユーザーコード内で生成されるJavaオブジェクト
DoFnなどの関数インスタンス
・Memory/FsStateBackendの実行中ステート
・GCの対象
・巨大なステートを保持するとGC停止が頻発するリスク
管理メモリ (Managed Memory) ・RocksDBのブロックキャッシュ
・ソート、ハッシュ処理用の一時バッファ
・通常はオフヒープ
・GCの対象外で、パフォーマンスが予測可能
・巨大なステートを持つRocksDBの性能を左右する
ネットワークメモリ ・タスク間データ転送用のバッファ ・オフヒープ
・シャッフル性能に影響
JVMメタスペース ・ロードされたクラスの定義情報 ・クラスのメタデータが格納される領域

セクション1:プロジェクト基盤の設計

堅牢なプロジェクト基盤は、開発後半で発生する問題を未然に防ぎます。特にバージョン互換性の確保と依存関係の管理が重要です。

要約

  • build.gradle.ktsjava, application, shadowプラグインを適用し、ビルドの基盤を確立します。
  • Apache Beam公式の互換性マトリクスに基づき、Beam SDK、Flink Runner、Flinkクラスタのバージョンを厳密に揃えます。
  • Apache Beamが提供するBOM (Bill of Materials) を利用して、数十に及ぶ推移的依存関係のバージョンを競合なく一元管理します。

プラクティス1.1:堅牢なbuild.gradle.kts構成の確立

保守性の高いプロジェクトの礎として、適切に構造化されたbuild.gradle.ktsファイルを作成します。

  1. プラグインの適用:

    • javaプラグイン: Javaのコンパイル機能を提供します。
    • applicationプラグイン: アプリケーションのエントリポイント(メインクラス)を指定します。flink runコマンドでの実行に必要です。
    • com.gradleup.shadowプラグイン: 全ての依存関係を含む単一の実行可能JARファイル(Fat JAR)を作成します。
  2. build.gradle.ktsの初期設定例:

    // build.gradle.kts
    
    // 必要なプラグインを適用
    plugins {
        java
        application
        // Fat JARを作成するためのShadow Plugin
        id("com.gradleup.shadow") version "8.3.0"
    }
    
    // プロジェクトの基本情報
    group = "org.example.beam"
    version = "0.1-SNAPSHOT"
    
    // 依存関係を解決するためのリポジトリ
    repositories {
        mavenCentral()
    }
    
    // Javaのバージョンを指定
    java {
        sourceCompatibility = JavaVersion.VERSION_11
        targetCompatibility = JavaVersion.VERSION_11
    }
    
    // アプリケーションのメインクラスを指定
    // 'flink run -c' コマンドで必要
    application {
        mainClass.set("org.example.beam.WordCount")
    }
    

プラクティス1.2:スタック全体でのバージョン互換性の強制

Beam SDK、Flink Runner、Flinkクラスタ間のバージョン非互換性は、最も頻繁に発生する失敗原因です。

  • 互換性の重要性: Flink Runnerは特定のFlinkバージョンの内部APIに密結合しています。そのため、ターゲットのFlinkクラスタバージョンに適合するRunnerを正確に選択する必要があります。
  • 互換性マトリクスの参照: プロジェクト開始時に、Apache Beamの公式ドキュメントが提供する互換性マトリクスを確認し、バージョンの組み合わせを決定します。
Flink Version Artifact Id Supported Beam Versions
1.19.x beam-runners-flink-1.19 ≥ 2.61.0
1.18.x beam-runners-flink-1.18 ≥ 2.57.0
1.17.x beam-runners-flink-1.17 ≥ 2.56.0
1.16.x beam-runners-flink-1.16 2.47.0 - 2.60.0
1.15.x beam-runners-flink-1.15 2.40.0 - 2.60.0
1.14.x beam-runners-flink-1.14 2.38.0 - 2.56.0
1.13.x beam-runners-flink-1.13 2.31.0 - 2.55.0

プラクティス1.3:Apache Beam BOMによる依存関係管理の簡素化

Bill of Materials (BOM) を使用して、数十の推移的依存関係のバージョンを安全に管理します。

  • BOMの利点: 依存関係のバージョンを個別に管理する手間を省き、「ダイヤモンド依存問題」のような競合を防ぎます。
  • 推奨されるBOM: Beamは2つのBOMを提供しますが、gRPCやProtobufなどの競合を最小限に抑えるため、より包括的なbeam-sdks-java-google-cloud-platform-bomの使用を推奨します。
  • 設定方法: Gradleのdependenciesブロック内でplatform()キーワードを使いBOMをインポートします。その後、個々のBeam依存関係からバージョン指定を削除します。

build.gradle.ktsの依存関係ブロック設定例

// build.gradle.kts

// バージョンを一元管理するための変数
val beamVersion = "2.57.0"
// ターゲットのFlinkバージョンに対応するランナーのバージョンを指定
val flinkRunnerVersion = "1.18"

dependencies {
    // 包括的なBOMをインポートして、すべての依存関係のバージョンを管理
    implementation(platform("org.apache.beam:beam-sdks-java-google-cloud-platform-bom:$beamVersion"))

    // Beamの依存関係をバージョン指定なしで宣言
    implementation("org.apache.beam:beam-sdks-java-core")
    implementation("org.apache.beam:beam-runners-flink-$flinkRunnerVersion")

    // 必要に応じて他のI/Oや依存関係を追加(例:KafkaIO)
    implementation("org.apache.beam:beam-sdks-java-io-kafka")

    // ロギングのためのSLF4J API
    implementation("org.slf4j:slf4j-api:2.0.13")
}

セクション2:Flink向けアプリケーションパッケージングの習得

アプリケーションを、依存関係を含んだ単一のデプロイ可能なアーティファクト(Fat JAR)にパッケージングします。

要約

  • Gradle Shadow Pluginを使い、アプリケーションコードと依存関係をすべて含む自己完結型のFat JARをビルドします。
  • Flinkクラスタが実行時に提供するライブラリ(Flink APIなど)は、compileOnlyスコープで指定し、Fat JARへのパッケージングから除外します。
  • GuavaやProtobufなど、避けられない依存関係の競合は、パッケージリロケーション機能でクラスを別名前に変更して分離・解決します。
  • Java ServiceLoaderの仕組みが正しく機能するように、META-INF/services配下のファイルをマージする設定を有効化します。

プラクティス2.1:Gradle Shadow Pluginによる自己完結型実行可能ファイルのビルド

flink runコマンドは、全ての依存関係を含む単一のJARファイルを要求します。Gradle Shadow Pluginは、この「Fat JAR」を作成するための標準ツールです。

  • shadowJarタスク: このタスクは、プロジェクトの依存関係をすべて内包したJARファイルを生成します。
  • 設定: shadowJarタスクを設定して、出力ファイル名のカスタマイズや、後述するSPIファイルのマージを有効化します。

build.gradle.ktsのShadowJar設定例

// build.gradle.kts
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

// ... (plugins, repositories, dependencies ブロック) ...

tasks.withType<ShadowJar> {
    // CI/CDスクリプトで扱いやすいようにベース名を指定
    archiveBaseName.set("my-beam-app")
    // どのランナー向けかを示す分類子を追加
    archiveClassifier.set("flink-runner")
    // デプロイスクリプトのためにバージョンなしの名前にすることが多い
    archiveVersion.set("")
    // ServiceLoaderファイルのマージを有効化
    mergeServiceFiles()
}

// 'gradle build' を実行した際に shadowJar も実行されるように設定
tasks.build {
    dependsOn(tasks.shadowJar)
}

以下のコマンドでFat JARがbuild/libs/ディレクトリに生成されます。

./gradlew shadowJar

プラクティス2.2:compileOnlyスコープによるクラスタ依存関係の分離

アプリケーションのJARファイルには、Flinkクラスタのランタイムに既に存在するライブラリ(Flink自体やHadoopライブラリ)を含めるべきではありません。

  • compileOnlyの役割: このスコープで宣言された依存関係は、コンパイル時には利用できますが、最終的なFat JARにはパッケージングされません。これにより、実行環境が提供するライブラリとのクラスパス競合を防ぎます。
  • 環境との契約: compileOnlyの使用は、アプリケーションが実行時にFlinkクラスタによって提供されるクラスに依存することを意味します。これにより、クラスタのバージョンとアプリケーションの依存関係を一致させることが極めて重要になります。

build.gradle.ktscompileOnly使用例

// build.gradle.kts
val flinkVersion = "1.18.1" // Flinkクラスタのバージョンと一致させる

dependencies {
    // ... (implementation 依存関係) ...

    // Flinkの依存関係はコンパイルに必要だが、クラスタによって提供される。
    // Fat JARにバンドルしてはならない。
    compileOnly("org.apache.flink:flink-java:$flinkVersion")
    compileOnly("org.apache.flink:flink-streaming-java:$flinkVersion")

    // FlinkのAPIを直接使用する場合は、他のFlinkモジュールも追加する
}

プラクティス2.3:パッケージリロケーションによる競合の事前解決

BOMを使用しても避けられない依存関係の競合は、パッケージリロケーションという高度なテクニックで解決します。

  • リロケーションの仕組み: Shadow Pluginが、Fat JARにバンドルされた依存関係のバイトコードを書き換え、パッケージ名を変更します。これにより、同じライブラリの異なるバージョンが互いに干渉することなく共存できます。
  • 最終手段: 依存関係の競合を「解決」するのではなく「分離」するための強力な手法です。特に、Flinkが内部で使用するGuavaやProtobufとの競合を防ぐのに有効です。

build.gradle.ktsのリロケーション設定例

// build.gradle.kts
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

tasks.withType<ShadowJar> {
    // ... (他のshadow設定) ...

    // Flink/Hadoopが使用するバージョンとの競合を避けるためにGuavaをリロケートする
    relocate("com.google.common", "my.beam.app.shadow.com.google.common")
    relocate("com.google.protobuf", "my.beam.app.shadow.com.google.protobuf")
}

プラクティス2.4:Service Provider Interface (SPI) のマージ処理

多くのJavaライブラリは、META-INF/servicesディレクトリにある設定ファイルを使って、実行時に実装を発見します(Java ServiceLoader)。

  • マージの必要性: Fat JAR作成時、複数の依存関係が同名のサービスファイルを持つ場合、これらは上書きされず、内容を結合(マージ)する必要があります。この処理を怠ると、Beam I/Oコネクタなどのサービスが読み込まれず、ランタイムエラーが発生します。
  • mergeServiceFiles(): Shadow Pluginが提供するこの設定を有効にすることで、サービスファイルが正しくマージされ、SPIメカニズムが期待通りに機能します。

build.gradle.ktsのマージ設定例

// build.gradle.kts
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

tasks.withType<ShadowJar> {
    // ... (他のshadow設定) ...
    mergeServiceFiles()
}

セクション3:デプロイと開発のワークフロー

パイプラインのテストと実行のワークフローを確立します。ローカルでの迅速な開発と、クラスタへの本番デプロイの両方を扱います。

要約

  • 開発段階では、ローカルの単一JVMでパイプラインを実行するDirectRunnerを使い、ロジックの正当性を迅速にテスト・デバッグします。
  • 本番環境へは、flink runコマンドを使い、ビルドしたFat JARをFlinkクラスタにサブミットして実行します。
  • flink runコマンドのオプションを理解し、実行環境(デタッチモードなど)とパイプラインの振る舞い(Runner指定、並列度など)を正しく設定します。

プラクティス3.1:DirectRunnerによる開発の加速

パイプラインのロジックを、完全なFlinkクラスタにデプロイする前にローカルでテストします。

  • DirectRunnerの目的: ローカルマシンの単一JVM上でパイプラインを実行し、ロジックの正当性を検証するために使用します。パフォーマンスよりも正確性に最適化されており、分散環境で発生しうるバグを早期に発見できます。
  • 使用方法: beam-runners-direct-javaへの依存関係をtestImplementationスコープで追加し、パイプラインオプションとして--runner=DirectRunnerを渡します。

build.gradle.ktsのテスト依存関係設定例

// build.gradle.kts
dependencies {
    // ... (他の依存関係) ...

    // DirectRunnerはテストとローカル実行のために使用
    testImplementation("org.apache.beam:beam-runners-direct-java")
}

Gradle runタスクによるローカル実行コマンド

# プロジェクトのルートディレクトリから実行
./gradlew run --args="--runner=DirectRunner --inputFile=./input.txt --output=./output"

パッケージ化したBeamアプリケーションをFlinkクラスタにサブミットするための標準的な方法です。

  • コマンドの構造: flink runコマンドは、Flink固有のオプションとBeamパイプラインのオプションの2つに分かれます。JARファイルパス以降の引数が、パイプラインのmainメソッドに渡されます。
  • 主要なオプション:
オプション 説明
-c, --class エントリポイントとなるメインクラス
-d, --detached クライアント終了後もジョブを実行し続けるデタッチモード(本番用)
--runner=FlinkRunner BeamにFlink実行エンジンを使用させる必須オプション
-p, --parallelism ジョブ全体の並列度
# Flinkがインストールされ、そのbinディレクトリがPATHに含まれていることを想定
# Fat JARがbuild/libs/にあることを想定

flink run \
  -d \
  # 本番用のデタッチモード
  -c org.example.beam.WordCount \
  # メインクラス
  /path/to/project/build/libs/my-beam-app-flink-runner.jar \
  # パイプラインオプションの開始
  --runner=FlinkRunner \
  --jobName=MyWordCountJob \
  --inputFile=hdfs:///data/input.txt \
  --output=hdfs:///data/output \
  --parallelism=16
  # Flink固有のオプション

セクション4:ステート管理と信頼性の確保

ステートフルなストリーミングアプリケーションにとって、耐障害性は最重要です。ここではFlinkのチェックポイント機構とステート管理について解説します。

要約

  • チェックポイントを有効化し、アプリケーションの状態を定期的にスナップショットすることで、障害からの回復(Exactly-Onceセマンティクス)を可能にします。
  • 本番環境のステートバックエンドには、大規模な状態を効率的に扱えるRocksDBStateBackendを選択します。
  • パイプラインの設計において、Windowの集計方法を工夫したり、不要なステートをTTLで削除したりすることで、ステートサイズを最適化し、長期的な安定稼働を実現します。

プラクティス4.1:チェックポイントによる耐障害性の設定

チェックポイントは、アプリケーションの状態の整合性あるスナップショットを作成するFlinkの仕組みです。障害発生時、最新のチェックポイントから処理を再開し、データ損失を防ぎます。

  • 重要性: 本番環境でステートフルなストリーミングパイプラインを実行する場合、チェックポイントは必須です。Exactly-Onceセマンティクスと耐障害性を実現する中心的なメカニズムです。
  • 有効化: BeamのFlink Runnerではデフォルトで無効になっており、パイプラインオプションで明示的に有効にする必要があります。
  • 主要なオプション:
オプション 説明
checkpointingInterval チェックポイントを取得する間隔(ミリ秒)
checkpointTimeoutMillis チェックポイントが失敗と見なされるまでの最大時間(ミリ秒)
checkpointingMode EXACTLY_ONCE または AT_LEAST_ONCE
externalizedCheckpointsEnabled trueに設定すると、ジョブキャンセル後もチェックポイントを保持し、手動での再起動やアップグレードが可能
flink run ... \
  --runner=FlinkRunner \
  # 60秒ごとにチェックポイントを実行
  --checkpointingInterval=60000 \
  # 3分後にタイムアウト
  --checkpointTimeoutMillis=180000 \
  # Exactly-Onceセマンティクスを保証
  --checkpointingMode=EXACTLY_ONCE \
  # ジョブキャンセル後もチェックポイントを保持
  --externalizedCheckpointsEnabled=true

プラクティス4.2:本番用ステートバックエンドの選択:RocksDB

ステートバックエンドは、Flinkが状態をどこに、どのように保存するかを決定します。

ステートバックエンドの比較

バックエンド 状態の保存場所(実行中) 特徴 用途
MemoryStateBackend JVMヒープ 高速だが、ヒープサイズに制限され、GCの影響を受ける ローカルテスト、非常に小さな状態
FsStateBackend JVMヒープ チェックポイントは分散ファイルシステム(HDFS, S3)に永続化 中間的な選択肢
RocksDBStateBackend オフヒープメモリ、ローカルディスク 状態サイズはディスク容量にのみ依存。インクリメンタルチェックポイントをサポート すべての大規模本番ワークロード(推奨)

RocksDBStateBackendを選択すると、パフォーマンスのボトルネックがJVMヒープからローカルディスクのI/Oとオフヒープメモリ管理へ移行します。そのため、TaskManagerノードには高速なローカルディスク(NVMe SSDなど)を搭載することが重要です。

# デフォルトのステートバックエンドとしてRocksDBを使用
state.backend: rocksdb

# 大規模なステートを持つ場合にスナップショットを高速化するため、インクリメンタルチェックポイントを有効化
state.backend.incremental: true

# TaskManagerのローカルディスク上のRocksDBファイル用ディレクトリ
# 重要:このためには高速な専用ディスク(SSDなど)を使用すること
state.backend.rocksdb.localdir: /data/flink/rocksdb

# チェックポイントを保存する分散ファイルシステム(HDFS, S3)上のディレクトリ
state.checkpoints.dir: hdfs:///flink/checkpoints

プラクティス4.3: ステートサイズの決定要因と最適化戦略

ステートのサイズは、パフォーマンス、リソース要件、チェックポイント時間に直接影響します。

ステートサイズを決定する主要因

  • Windowの構成:
    • 期間: Windowの期間が長いほど、保持するデータが増え、ステートは増大します。
    • 種類: ウィンドウが重複するスライディングウィンドウは、タンブリングウィンドウよりも多くのステートを必要とします。
    • 集計方法: ウィンドウ内の全要素をListとして保持するとステートが肥大化します。CombineやReduceを使い、逐次的に集計することで、ステートサイズをほぼ一定に保てます。
  • Windowに依存しないキー付きステート:
    • DoFn内で明示的に管理されるステートは自動的にクリアされません。不要になったステートを明示的に削除するか、State TTL (Time-To-Live) を設定して自動的に期限切れにする戦略が不可欠です。
  • キーのカーディナリティ:
    • Flinkのステートはキーごとに分割されます。ユニークキーの数が、ステート全体のサイズを決定する主要な乗数となります。キーが増え続けるユースケースでは、古いキーをTTLで削除することが長期的な安定運用のために必須です。

セクション5:リソース設定とサイジングによるパフォーマンスの最適化

アプリケーションのニーズに合わせてFlinkのリソースを設定し、スループットを最大化します。

要約

  • 並列度はジョブのスケーリングの主要な手段であり、flink runコマンドで設定します。
  • TaskManagerあたりのタスクスロット数は、通常はCPUコア数に合わせ、リソースを効率的に利用します。
  • Flinkの詳細なメモリモデルを理解し、ワークロードの特性(特にRocksDBの使用有無)に応じて、JVMヒープと管理メモリ(オフヒープ)の割合を調整します。
  • ワークロード分析に基づき、ホストスペックを選定し、並列度とクラスタ台数を体系的に決定します。

プラクティス5.1:並列度とタスクスロットの設定

  • 並列度 (Parallelism): あるオペレーションが同時に実行されるタスクにどれだけ分割されるかを示す度合いです。
  • タスクスロット (Task Slots): 各TaskManagerが提供するリソースの単位で、1つの並列タスクが実行される場所です。クラスタ内の総タスクスロット数が、ジョブの最大可能並列度を決定します。

TaskManager数とスロット数のトレードオフ

構成 特徴
多数のTM、少数のスロット (例: 1スロット/TM) ・最良のリソース分離 (1つのタスク障害が影響する範囲が限定的)
・ネットワークオーバーヘッドが増加する可能性
少数のTM、多数のスロット (例: 8スロット/TM) ・同じTM内のタスクがTCP接続などを共有でき、ネットワークオーバーヘッドを削減
・1つのタスク障害がTM全体をクラッシュさせ、他のタスクも停止する可能性

ジョブの並列度だけでなく、その並列度を物理リソースにどう分散させるかを考慮することが重要です。

並列度の設定コマンド

# ジョブのデフォルト並列度を128に設定
flink run -p 128 ...

プラクティス5.2:FlinkのTaskManagerメモリモデル

Flinkのメモリを正しく設定することは、安定性とパフォーマンスにとって不可欠です。特にRocksDBを使用する場合は、管理メモリの割り当てが重要になります。

TaskManagerメモリモデルの図解

メモリ領域の役割

要素名 説明
JVM Heap Flinkフレームワークとユーザーコード(DoFnなど)が利用するメモリ。GCの対象。
Managed Memory Flinkが制御するオフヒープメモリ。RocksDBのブロックキャッシュやソート処理に使われる。GCの対象外。
Network Memory タスク間のデータ転送(シャッフル)に使われるバッファ。
JVM Metaspace & Overhead ロードされたクラス定義やJVM自体のためのメモリ。

RocksDBを使用する場合、ディスクI/Oを削減するために、メモリの大部分を管理メモリに割り当てることが極めて重要です。

サンプル flink-conf.yaml(RocksDBワークロード向けメモリ設定):

flink-conf.yaml
# TaskManagerのコンテナ/プロセスに割り当てられる総メモリ
taskmanager.memory.process.size: 8192m

# Flinkが管理するメモリ(RocksDBキャッシュ用)にメモリの大部分を割り当てる
# これによりJVMヒープは少なくなるが、状態がオフヒープにあるため問題ない
taskmanager.memory.managed.fraction: 0.6

# ネットワークバッファのサイズ
taskmanager.memory.network.fraction: 0.1

プラクティス5.3: 実践的なクラスタサイジングと設定例

ホストスペックの選定ガイド

スペック 小規模 / 開発 中規模 / 一般的なステートフル処理 大規模 / 高負荷・巨大ステート
CPUコア 2 - 4 コア 8 - 16 コア 16 - 64 コア以上
メモリ (RAM) 8 - 16 GB 32 - 128 GB 128 GB 以上
ディスク 一般的なSSD 高速なSSD NVMe SSD (強く推奨)
ネットワーク 1 Gbps 10 Gbps 10 - 25 Gbps
  • シナリオA: ステートが小さい / CPU負荷が高いワークロード

    • JVMヒープ上で多くのオブジェクトを処理するため、タスクヒープの割合を高く設定します。

    • Hostスペック例: 16コア, 64GB RAM

      flink-conf.yaml 設定例
      # OS用に8GB残し、56GBをTMプロセスに割り当てる
      taskmanager.memory.process.size: 56320m
      
      # JVMヒープの割合を比較的高く設定 (例: 60%)
      taskmanager.memory.task.heap.size: 33792m
      
      # RocksDBを使わないため、管理メモリはソート等で使われる分だけで良い
      taskmanager.memory.managed.fraction: 0.2
      
  • シナリオB: ステートが大きい / I/O負荷が高いワークロード (RocksDB)

    • RocksDBのキャッシュ性能が重要になるため、管理メモリの割合を非常に高く設定します。

    • Hostスペック例: 32コア, 128GB RAM, NVMe SSD

      flink-conf.yaml 設定例
      # OS用に16GB残し、112GBをTMプロセスに割り当てる
      taskmanager.memory.process.size: 114688m
      
      # RocksDBのキャッシュのために管理メモリの割合を非常に高く設定
      taskmanager.memory.managed.fraction: 0.7
      

クラスタ台数の決定プロセス

  1. ワークロード分析: スループット要件、ステートサイズ、ソースのパーティション数を分析します。
  2. 単一TMの設定: ホストスペックとワークロードに基づき、1台のTMあたりのスロット数(通常はCPUコア数)とメモリ設定を決定します。
  3. 並列度の決定: ソースのパーティション数(例: Kafkaトピックのパーティション数)を基準に、ベースとなる並列度を決定します。
  4. クラスタ台数の計算: 必要なTM台数 = 並列度 / (1TMあたりのスロット数) で算出します。
  5. テストと調整: 算出した構成でジョブを実行し、Flink Web UIでバックプレッシャーやチェックポイント時間などを監視し、ボトルネックがあれば設定を調整します。

セクション6:運用準備:ロギングとモニタリング

デプロイしたジョブは、その状態が観測可能でなければなりません。ロギングとモニタリングのベストプラクティスを解説します。

要約

  • アプリケーションコードではSLF4J APIを使い、Flinkのデフォルト実装(Log4j 2)と一貫したロギングを行います。
  • 本番環境では、ディスクを圧迫しないようにログローテーションを設定します。
  • Flink Web UIでリアルタイムのジョブ状態を監視し、本番モニタリングではPrometheusなどの外部システムにメトリクスをエクスポートします。
  • 健全性、スループット、レイテンシ、バックプレッシャー、JVMメトリクスを主要な監視対象とします。

プラクティス6.1:一貫したロギング戦略の実装

FlinkはSLF4Jロギングファサードを使用しています。デバッグには一貫性のあるロギングが不可欠です。

  • SLF4J APIの使用: アプリケーションコードでもSLF4J APIを使用し、Flinkのロギング実装との一貫性を保ちます。
  • 設定ファイル: Flinkのconfディレクトリにあるlog4j.properties(またはlogback.xml)でログレベルや出力先を管理します。
  • ログローテーション: 本番環境では、ログファイルがディスクを使い切ることを防ぐため、サイズや時間に基づいたログローテーションを設定します。

DoFnでのロギング実装例

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDoFn extends DoFn<String, String> {
    // 各クラスでロガーを静的フィールドとして初期化
    private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        String element = c.element();
        // ログレベルを適切に使い分ける(例:デバッグレベル)
        LOG.debug("Processing element: {}", element);
        try {
            // ... business logic ...
            c.output(element.toUpperCase());
        } catch (Exception e) {
            // エラー発生時はエラーレベルでログを記録
            LOG.error("Failed to process element: {}", element, e);
        }
    }
}

プラクティス6.2:アプリケーションの健全性とパフォーマンスのモニタリング

Flinkは、ジョブの健全性を理解するために豊富な内部メトリクスを提供します。

  • Flink Web UI: リアルタイムでジョブグラフ、データフロー、各オペレータのメトリクスを確認できる主要ツールです。デフォルトではJobManagerのポート8081で実行されます。
  • メトリクスレポーター: 本番モニタリングでは、メトリクスをPrometheusやJMXなどの外部システムにエクスポートします。

監視すべき主要メトリクス

  • 健全性:
    • restartingTime
    • numberOfCompletedCheckpoints
    • numberOfFailedCheckpoints
  • スループット:
    • numRecordsInPerSecond
    • numRecordsOutPerSecond
  • レイテンシ:
    • Flinkのレイテンシ追跡マーカーを使用して測定
  • バックプレッシャー:
    • Flink UIで可視化。高い値はパイプラインのボトルネックを示す
  • JVMメトリクス:
    • ヒープ/非ヒープメモリ使用量
    • GC時間
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9250-9260

セクション7:マルチジョブデプロイメントと共通コードの管理

複数の独立したFlinkジョブ間で共通のロジック(データモデル、ユーティリティ関数など)を管理する方法について解説します。

要約

  • アンチパターン: 共通コードをJARにまとめ、Flinkクラスタのlibフォルダに配置する方法は、バージョニングの競合やデプロイの密結合を引き起こすため、強く非推奨です。
  • ベストプラクティス: 各ジョブが、共通コードを含む全ての依存関係を内包した、自己完結型のFat JARとしてパッケージングされるべきです。これにより、各ジョブの独立したデプロイとライフサイクルが保証されます。
  • Gradleのマルチプロジェクトビルド機能を使えば、このベストプラクティスを効率的に実現できます。

アンチパターン:共通JARをFlinkクラスタのクラスパスに配置する

このアプローチは、以下の深刻な問題を引き起こします。

  • バージョニングの競合("Dependency Hell"): あるジョブが共通ライブラリのv1.0に、別のジョブがv2.0に依存する場合、両方を同時に稼働させることができなくなります。共通ライブラリの更新が、それを利用する全てのジョブの同時更新を強制します。
  • デプロイの密結合: 共通ライブラリの更新が、アプリケーションのデプロイだけでなく、Flinkクラスタ自体の構成変更や再起動を必要とし、各ジョブの独立性が失われます。
  • クラスローディングの予測不能性: Flinkはlibフォルダ内のクラスをアプリケーションJAR内のクラスより優先してロードするため、意図しないバージョンのライブラリが使われ、予期せぬエラーを引き起こします。

ベストプラクティス:各ジョブが自己完結したFat JARを持つ

各ジョブは、自身のコードと、共通処理モジュールを含む全ての依存ライブラリをパッケージングした単一のFat JARを生成します。

  • 独立したデプロイとライフサイクル: 異なるバージョンの共通ライブラリを内包したジョブが、同じクラスタ上で互いに影響なく同時に稼働できます。
  • 依存関係の分離と予測可能性: ローカルでのテスト環境とクラスタ上の実行環境の依存関係が完全に一致するため、環境差異による問題を劇的に減らせます。
  • CI/CDとロールバックの簡素化: 各ジョブのビルドは単一のアーティファクトを生成するだけで完結します。問題発生時も、以前のバージョンのFat JARを再デプロイするだけで安全にロールバックできます。

Gradleマルチプロジェクトビルドによる実装

このベストプラクティスは、Gradleのマルチプロジェクト構成で効率的に実現できます。

プロジェクト構造

my-flink-system/
├── settings.gradle.kts
├── common-library/
│   ├── build.gradle.kts
│   └── src/main/java/...
├── job-a/
│   ├── build.gradle.kts
│   └── src/main/java/...
└── job-b/
    ├── build.gradle.kts
    └── src/main/java/...

settings.gradle.kts

rootProject.name = "my-flink-system"
include("common-library", "job-a", "job-b")

job-a/build.gradle.kts

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
    java
    application
    id("com.gradleup.shadow") version "8.3.0"
}

// ... group, version, repositories...

dependencies {
    // 共通ライブラリモジュールを通常の依存関係として追加
    implementation(project(":common-library"))

    // BeamやFlink Runnerなどの他の依存関係
    implementation(platform("org.apache.beam:beam-sdks-java-google-cloud-platform-bom:2.57.0"))
    implementation("org.apache.beam:beam-sdks-java-core")
    // ...
}

tasks.withType<ShadowJar> {
    // ... shadowJarの設定...
}

この構成により、./gradlew :job-a:shadowJar を実行すると、job-acommon-libraryのコードが単一のFat JARにパッケージングされます。

まとめ

Apache Beam on FlinkアプリケーションをGradleで構築し、本番デプロイを成功させるには、体系的なアプローチが必要です。本レポートで解説したベストプラクティスは、以下の核心的な原則に集約されます。

  • 厳格なバージョン管理:
    プロジェクト初期に、Beam SDK、Flink Runner、Flinkクラスタ間の互換性を公式マトリクスで検証し、BOMで依存関係を一元管理します。

  • 意図的なパッケージングと分離:
    各ジョブを、共通ライブラリを含む自己完結型のFat JARとしてパッケージングします。これにより、ジョブの独立したデプロイとライフサイクルを保証します。

  • 目的に応じた実行環境の選択:
    DirectRunnerによるローカルテストと、flink runによるクラスタデプロイを明確に使い分け、開発サイクルを効率化します。

  • 耐障害性とステート管理の設計:
    チェックポイントを必須機能とし、本番ではRocksDBStateBackendを採用します。ステートサイズを意識した設計で、長期的な安定稼働を目指します。

  • データ駆動型のリソース設計:
    Flinkのクラスタ構成(メモリ、並列度)を、画一的ではなく、ワークロードの特性に応じて体系的に設計・調整します。

  • 観測可能性の確保:
    一貫したロギング戦略と、健全性やスループットを網羅するメトリクス監視を実装し、安定運用と迅速な問題解決を可能にします。

これらのプラクティスを遵守することで、Beamの抽象化とFlinkの強力な実行能力を最大限に引き出し、真に本番環境グレードのデータ処理基盤を構築できるようになります。この記事が、あなたの開発の一助になれば幸いです。

この記事が少しでも参考になった、あるいは改善点などがあれば、ぜひリアクションやコメント、SNSでのシェアをいただけると励みになります!


関連リンク

https://zenn.dev/suwash/articles/apache_beam_20250522

https://zenn.dev/suwash/articles/apache_flink_20250522

https://zenn.dev/suwash/articles/apache_beam_flink_20250523

https://zenn.dev/suwash/articles/beam_java_best_practice_20250917

これらの記事を読み込ませたNotebookLMを公開しています

https://notebooklm.google.com/notebook/bf316077-e909-40c5-89c7-2f5a841439eb


参考リンク

Discussion