⚙️

技術調査 - Flink Kubernetes OperatorとArgoCDによるGitOps

に公開

■はじめに

現代のデータ処理環境では、リアルタイム性とスケーラビリティへの要求がますます高まっています。Apache Beam、Apache Flink、Kubernetes (k8s)、そしてArgoCDは、各技術領域で強力な機能を提供します。

  • データ処理パイプライン定義: Apache Beam
  • 分散ストリーム処理: Apache Flink
  • コンテナオーケストレーション: Kubernetes
  • GitOpsによる継続的デリバリー: ArgoCD

これらの技術、特にApache Flink Kubernetes OperatorとArgoCDを連携させると、データ処理基盤の構築と運用を大きく変革できます。

●コアテクノロジーの概要

  • Apache Beam: バッチ処理とストリーム処理の統一プログラミングモデル
  • Apache Flink: 高スループット、低レイテンシなステートフルコンピューティングを実現する分散処理エンジン
  • Kubernetes (k8s): コンテナ化アプリケーションのデプロイ、スケーリング、管理を自動化するシステム
  • ArgoCD: Kubernetesネイティブな宣言型のGitOps継続的デリバリーツール

Apache Flink Kubernetes Operatorは、Kubernetes上でFlinkアプリケーションのライフサイクル管理を自動化するツールです。一方、ArgoCDはGitリポジトリを信頼できる唯一の情報源 (Single Source of Truth) とし、Kubernetesクラスタの状態をGitリポジトリの記述と同期させるGitOpsを実現します。

これらの連携は、Flinkアプリケーションの定義(FlinkDeploymentカスタムリソース)、設定、バージョン管理をGitで行い、ArgoCDを通じてKubernetesクラスタへ自動的にデプロイ・更新することを可能にします。この組み合わせは、データパイプラインの運用に大きな利点をもたらします。

  • 宣言的な設定管理: Flinkアプリケーションの望ましい状態をGitで宣言的に管理
  • 自動化されたデプロイメント: Gitへの変更をトリガーに、ArgoCDが自動的にデプロイし同期
  • バージョン管理と監査証跡: Gitの履歴で全ての変更を追跡し監査
  • ロールバックの容易性: Gitのコミットを戻すことで迅速にロールバック
  • 開発者エクスペリエンスの向上: 開発者はインフラ詳細を意識せず、アプリケーションロジックとGitワークフローに集中可能

■コアテクノロジーの詳細

●Apache Beam

https://zenn.dev/suwash/articles/apache_beam_20250522

https://zenn.dev/suwash/articles/apache_flink_20250522

https://zenn.dev/suwash/articles/apache_beam_flink_20250523

●Kubernetes (k8s)

https://zenn.dev/suwash/articles/apache_beam_flink_k8s_20250526

●ArgoCD

ArgoCDは、Kubernetesネイティブな宣言型のGitOps継続的デリバリー(CD)ツールです。Gitリポジトリを信頼できる唯一の情報源(Single Source of Truth)として使用し、Kubernetesリソースの望ましい状態を定義します。ArgoCDは、アプリケーションの定義、設定、環境を宣言的かつバージョン管理された状態に保ちます。また、アプリケーションのデプロイとライフサイクル管理を自動化し、監査できる理解しやすいものにします。

主要な機能とコンポーネント

  • GitOpsワークフロー: Gitリポジトリ内のマニフェスト(プレーンなYAML/JSON、Helmチャート、Kustomizeなど)に基づきアプリケーションをデプロイ・同期
  • 自動同期: Gitリポジトリの変更を検知し、クラスタの状態を自動的に同期。手動同期も可能
  • 状態の可視化とドリフト検出: Web UIとCLIを通じて、アプリケーションのデプロイ状況、同期状態、設定のドリフト(Gitの状態とライブ状態の差異)を可視化
  • ロールバック: Gitのコミット履歴を利用して、以前の安定したバージョンへ簡単にロールバック
  • マルチクラスタ管理: 複数のKubernetesクラスタへのアプリケーションデプロイを管理
  • カスタムリソース定義 (CRD): ArgoCD自体がKubernetesコントローラとして動作し、ApplicationやAppProjectといったCRDを使用して設定を管理

コンポーネントの構成

要素名 説明
Web UI ArgoCDのWebインターフェース
CLI ArgoCDのコマンドラインインターフェース
CI/CDシステム 継続的インテグレーション/継続的デリバリーシステム
Git Webhook Gitリポジトリからのイベント通知 (例: pushイベント)
APIサーバー (argocd-server) Web UI、CLI、CI/CDシステムからのAPIリクエストを処理するgRPC/RESTサーバー。アプリケーション管理、状態報告、操作の呼び出し、リポジトリとクラスタの認証情報管理、認証・認可、Git Webhookイベントのリスナーなどを担当
リポジトリサーバー (argocd-repo-server) Gitリポジトリのローカルキャッシュを維持し、マニフェストを生成する内部サービス
アプリケーションコントローラ (argocd-application-controller) 実行中のアプリケーションを継続的に監視し、現在のライブ状態とGitリポジトリで指定された望ましいターゲット状態を比較。OutOfSync状態を検出し、必要に応じて修正措置を実行
ApplicationSetコントローラ ApplicationSet CRDを調整し、複数のArgoCD Applicationを自動生成・管理
Gitリポジトリ アプリケーションのマニフェストや設定が保存されるバージョン管理システム
Kubernetes API Kubernetesクラスタを操作するためのAPI

Apache Flink Kubernetes Operatorは、Kubernetes上でApache Flinkアプリケーションのデプロイとライフサイクル全体を管理するコントロールプレーンとして機能します。このOperatorはJavaで実装されており、kubectlのようなネイティブなKubernetesツールを通じてFlinkアプリケーションとそのライフサイクルを管理できます。

●役割と機能

Flink Kubernetes Operatorの主な責務は、Flinkアプリケーションの完全な本番ライフサイクルを管理することです。これには以下の機能が含まれます。

  • アプリケーションの操作: Flinkアプリケーションの実行、一時停止、削除
  • アプリケーションのアップグレード: ステートフルおよびステートレスなアプリケーションのアップグレード
  • セーブポイントの管理: セーブポイントのトリガーと管理。アップグレードやバックアップに不可欠
  • エラー処理とロールバック: 障害発生時のエラー処理や、問題のあるアップグレードからの自動ロールバック

Operatorは、人間のオペレーターがFlinkデプロイメントを管理する際の知識と責任をコード化したものです。これにより、Flinkクラスタの起動、ジョブのデプロイ、アップグレード、問題発生時の対応といった運用タスクを自動化します。

●アーキテクチャと主要コンポーネント

Flink Kubernetes Operatorは、KubernetesのOperatorパターンに従って構築されています。Operatorは、Kubernetes APIを拡張するカスタムリソース(CR)を定義し、これらのCRの状態を監視・調整するコントローラを実行します。

主要コンポーネントと概念

  • カスタムリソース (CR): Operatorは主に2種類のCRを定義します。
    • FlinkDeployment: Flinkアプリケーションクラスタ(ジョブごとに独立したクラスタ)またはFlinkセッションクラスタ(複数のジョブをホストする共有クラスタ)のデプロイメントを定義
    • FlinkSessionJob: 既存のFlinkセッションクラスタ上で実行される個々のFlinkジョブを定義
  • コントローラ: Operatorの中心的なコンポーネント。FlinkDeploymentFlinkSessionJob CRの変更を監視。ユーザーがCRを送信すると、Operatorは現在のクラスタ状態を観測し、CRで定義された望ましい状態と一致するように必要なKubernetesリソース(JobManager Pod、TaskManager Pod、ConfigMap、Serviceなど)を作成・更新・削除。このプロセスはリコンサイルループと呼ばれる
  • デプロイメントモード:
    • Nativeモード: FlinkがKubernetes APIと直接通信し、TaskManager Podなどのリソースを自身で調整。高度なユーザーや既存の管理システムとの統合に適している
    • Standaloneモード: Kubernetesを単なるオーケストレーションプラットフォームとして使用し、FlinkはKubernetesを意識しない。Operatorが全てのリソースを管理
  • Java Operator SDK: OperatorはJava Operator SDKで構築

ユーザーがFlinkDeployment CRをkubectl applyで送信すると、Operatorは以下のステップで動作します。

  1. OperatorがCRの変更を検知します。
  2. Operatorは(以前にデプロイされていれば)Flinkリソースの現在の状態を観測します。
  3. Operatorは送信されたリソース変更を検証します。
  4. Operatorは必要な変更を調整し、アップグレードなどを実行します。

●FlinkDeploymentとFlinkSessionJob

FlinkDeployment CRは、Flinkアプリケーションのデプロイメント全体を定義します。specセクションには、使用するFlinkイメージ、Flinkバージョン、Flink設定(flinkConfiguration)、JobManagerとTaskManagerのリソース要求、そしてアプリケーションモードの場合はジョブ固有の設定(JAR URI、並列度、アップグレードモードなど)が含まれます。

FlinkSessionJob CRは、事前にFlinkDeploymentで作成されたセッションクラスタ上で動作するジョブを定義します。これにはジョブのJAR URIや並列度などが含まれます。

Flink Kubernetes Operatorの特徴

特徴 説明
完全自動化されたジョブライフサイクル管理 アプリケーションの実行、一時停止、削除、ステートフル/ステートレスアップグレード、セーブポイント管理、エラー処理、ロールバックを自動化
多様なデプロイメントモード アプリケーションクラスタ、セッションクラスタ、セッションジョブをサポート
複数Flinkバージョンサポート 異なるFlinkバージョンに対応(例: v1.16~v1.20、プレビュー版のv2.0もサポート)
設定管理の柔軟性 デフォルト設定、ジョブごとの設定、環境変数、PodテンプレートによるPodのカスタマイズ
ジョブオートスケーラー ラグや使用率メトリクスに基づいてジョブの並列度を動的に調整
スナップショット管理 Kubernetes CRを介したセーブポイント/チェックポイントの管理
監視とロギングの統合 Flinkメトリックシステムとの連携、プラガブルなメトリクスレポーター、カスタマイズ可能なロギング
Helmによるインストール Helmチャートを使用した容易なOperatorのインストールと設定

Flink Kubernetes Operatorは、FlinkアプリケーションをKubernetes上で実行するための標準的かつ強力な方法を提供します。これにより、Flinkの運用を大幅に簡素化し、開発者がアプリケーションロジックの開発に集中できるようにします。このOperatorの存在は、ArgoCDのようなGitOpsツールとの連携を自然なものにしてくれます。

GitOpsは、インフラストラクチャとアプリケーションのデプロイメントをGitを通じて管理・自動化する手法です。バージョン管理されたGitリポジトリを信頼できる唯一の情報源(Single Source of Truth)として扱います。ArgoCDは、このGitOpsパターンをKubernetes環境で実現するための主要なツールの一つです。Flink Kubernetes Operatorによって管理されるFlinkアプリケーションも、ArgoCDを用いることでGitOpsワークフローに統合できます。

●FlinkDeploymentカスタムリソース定義(CRD)の管理

Flink Kubernetes Operatorは、FlinkDeploymentというカスタムリソース定義(CRD)を導入します。これを通じてFlinkアプリケーションクラスタやセッションクラスタを宣言的に定義します。GitOpsアプローチでは、このFlinkDeploymentのYAMLマニフェストをGitリポジトリで管理します。

主要なspecフィールドとそのGitOps管理

FlinkDeployment CRDのspecセクションには、Flinkクラスタとジョブの望ましい状態を定義するための多数のフィールドが含まれています。これらはGitでバージョン管理され、ArgoCDによってクラスタに適用されます。

フィールドパス 説明・目的 GitOps管理値の例
spec.image Flinkコンテナイメージ app.image.repository: my-repo/my-flink-app, app.image.tag: 1.2.3
spec.flinkVersion Flinkバージョン app.flinkVersion: v1_20
spec.flinkConfiguration Flink設定 (flink-conf.yaml) のオーバーライド app.flinkConfiguration: { "taskmanager.numberOfTaskSlots": "4", "state.backend": "rocksdb" }
spec.jobManager.resource JobManagerのリソース要求/制限 app.jobManager.resources: { "memory": "4096m", "cpu": 2 }
spec.taskManager.resource TaskManagerのリソース要求/制限 app.taskManager.resources: { "memory": "8192m", "cpu": 4 }
spec.job.jarURI ジョブJARのURI app.job.jarURI: local:///opt/flink/usrlib/my-job.jar
spec.job.parallelism ジョブの並列度 app.job.parallelism: 8
spec.job.entryClass ジョブのエントリークラス app.job.entryClass: com.example.MyFlinkJob
spec.job.args ジョブ引数 app.job.args: []
spec.job.state ジョブの望ましい状態 app.job.state: running
spec.job.upgradeMode アップグレードモード app.job.upgradeMode: savepoint
spec.podTemplate JobManager/TaskManagerのPodテンプレート app.podTemplate: (YAML構造でボリュームマウント等を定義)

これらのフィールドをGitで管理することにより、Flinkアプリケーションのデプロイメントを宣言的かつ再現可能にします。ArgoCDはGitリポジトリの変更を監視し、クラスタの状態を自動的に同期するため、運用負荷を大幅に軽減します。

●FlinkアプリケーションのためのHelmチャート

HelmはKubernetesアプリケーションのパッケージ管理ツールです。Flinkアプリケーション(FlinkDeployment CRDを含む)をHelmチャートとしてパッケージ化することは、GitOpsワークフローでの一般的なベストプラクティスです。

Helmチャートの構造

一般的なFlinkアプリケーションのHelmチャートは以下の構造を持ちます。

  • Chart.yaml: チャート名、バージョン、説明などのメタデータ
  • values.yaml: デフォルトの設定値。FlinkDeploymentspecフィールド(イメージ、リソース、Flink設定、ジョブ設定など)の多くをここでパラメータ化
  • templates/: Kubernetesマニフェストのテンプレートファイル群
    • flinkdeployment.yaml: FlinkDeployment CRDのテンプレート。values.yamlの値を使用して動的に生成
    • その他、必要に応じてConfigMap(例: カスタムlog4j.properties用)、ServiceAccountなどのテンプレート
  • crds/: (Operator自体のチャートの場合)FlinkDeploymentなどのCRD定義ファイル。アプリケーションチャートの場合は通常不要で、CRDはクラスタに別途インストールされている前提

FlinkDeploymentのテンプレート化

templates/flinkdeployment.yaml内では、Goテンプレート言語を使用してFlinkDeploymentリソースを定義します。values.yamlからの値は.Valuesオブジェクトを通じてアクセスされます。

例 (templates/flinkdeployment.yamlの抜粋):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: {{ .Release.Name }}
  namespace: {{ .Release.Namespace }}
spec:
  image: {{ .Values.app.image.repository }}:{{ .Values.app.image.tag }}
  flinkVersion: {{ .Values.app.flinkVersion }}
  flinkConfiguration:
    {{- toYaml .Values.app.flinkConfiguration | nindent 4 }}
  jobManager:
    resource:
      memory: {{ .Values.app.jobManager.resources.memory }}
      cpu: {{ .Values.app.jobManager.resources.cpu }}
  taskManager:
    resource:
      memory: {{ .Values.app.taskManager.resources.memory }}
      cpu: {{ .Values.app.taskManager.resources.cpu }}
  job:
    jarURI: {{ .Values.app.job.jarURI }}
    parallelism: {{ .Values.app.job.parallelism }}
    entryClass: {{ .Values.app.job.entryClass }}
    args:
      {{- toYaml .Values.app.job.args | nindent 6 }}
    upgradeMode: {{ .Values.app.job.upgradeMode }}
    state: {{ .Values.app.job.state }}
  {{- if .Values.app.job.pythonEntryPoint }}
    pythonEntryPoint: {{ .Values.app.job.pythonEntryPoint }}
  {{- end }}
  #...その他の設定...

Wikimediaが提供するflink-app Helmチャートは、flink-kubernetes-operatorFlinkDeployment CRDを使用してFlink Native Kubernetes Applicationクラスタをデプロイします。このチャートでは、values.yamlを通じてイメージ、ジョブ仕様(jarURIentryClassargs、Pythonジョブ用のpythonEntryPointなど)、リソース、Flink設定、さらにはアプリケーション設定ファイルをConfigMapとしてマウントする方法などを設定できます。例えば、app.config_filesセクションで定義された設定は、コンテナ内の指定されたパスにファイルとしてマウントされます。

Helmチャートを利用することで、Flinkアプリケーションの構成を標準化し、異なる環境(開発、ステージング、本番)へのデプロイを容易にし、設定の再利用性を高めることができます。これはGitOpsの原則とよくマッチします。

●ArgoCD Applicationマニフェスト

ArgoCDは、ApplicationというCRDを通じて、デプロイするアプリケーションを定義します。このマニフェストは、Gitリポジトリの場所、追跡するリビジョン(ブランチ、タグ、コミットSHA)、マニフェストのパス、デプロイ先のクラスタと名前空間、同期ポリシーなどを指定します。

HelmチャートをGitリポジトリからデプロイする場合のArgoCD Applicationマニフェスト例

Gitリポジトリに格納されたHelmチャート(Flinkアプリケーションを定義)をArgoCDでデプロイする場合

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: my-flink-app
  namespace: argocd # ArgoCDがインストールされている名前空間
spec:
  project: default # ArgoCDプロジェクト
  source:
    repoURL: 'https://github.com/my-org/my-flink-app-config.git' # FlinkアプリのHelmチャートを含むGitリポジトリ
    path: 'charts/my-flink-job' # Gitリポジトリ内のHelmチャートへのパス
    targetRevision: 'main' # 追跡するブランチ、タグ、またはコミットSHA
    helm:
      valueFiles: # Helmチャートのvalues.yamlファイルを指定
        - 'values.yaml' # チャートのデフォルト値
        - 'envs/prod-values.yaml' # 環境固有のオーバーライド値 (例: 本番環境用)
      parameters: # 個別のHelmパラメータを直接オーバーライド
        - name: 'image.tag'
          value: '1.2.3' # 特定のイメージタグを指定
        - name: 'job.parallelism'
          value: '4' # ジョブの並列度をオーバーライド
      # releaseName: my-flink-release # 必要に応じてHelmリリース名を指定 (デフォルトはArgoCDアプリ名)
  destination:
    server: 'https://kubernetes.default.svc' # デプロイ先のKubernetes APIサーバー (同一クラスタの場合)
    namespace: 'flink-production' # デプロイ先の名前空間
  syncPolicy:
    automated: # 自動同期を有効化
      prune: true # Gitに存在しないリソースをクラスタから削除
      selfHeal: true # クラスタでの手動変更をGitの状態に自動修正
    syncOptions:
      - CreateNamespace=true # ターゲット名前空間が存在しない場合に自動作成

ArgoCD Applicationマニフェスト

specフィールド 説明
project アプリケーションが属するArgoCDプロジェクト default
source.repoURL マニフェスト(Helmチャート)が格納されているGitリポジトリのURL https://github.com/my-org/my-flink-app-config.git
source.path repoURL内のHelmチャートディレクトリへのパス charts/my-flink-job
source.targetRevision 同期するGitリビジョン(ブランチ、タグ、コミットSHA) main または v1.0.0
source.helm.valueFiles Helmチャートに適用する値ファイルのリスト(pathからの相対パス) ['values.yaml', 'envs/prod-values.yaml']
source.helm.parameters Helm値を直接オーバーライドするためのパラメータリスト [{name: 'image.tag', value: '1.2.3'}]
source.helm.releaseName Helmリリース名(オプション、デフォルトはArgoCDアプリ名) my-flink-release
destination.server デプロイ先のKubernetesクラスタAPIサーバーURL https://kubernetes.default.svc (同一クラスタ)
destination.namespace デプロイ先の名前空間 flink-production
syncPolicy.automated 自動同期設定。pruneで不要リソース削除、selfHealで手動変更の自動修正 {prune: true, selfHeal: true}
syncPolicy.syncOptions 同期時の追加オプション ['CreateNamespace=true']

ArgoCDがプレーンなFlinkDeployment YAMLファイルを直接デプロイする場合(Helmを使用しない場合)、spec.source.pathFlinkDeployment CRDを含むYAMLマニフェストが置かれたディレクトリを指し、spec.source.directoryフィールドを使用して再帰的な検索などを設定できます。

●FlinkジョブJARと設定の管理

JARファイルの管理戦略

  1. カスタムDockerイメージへの焼き付け(推奨):
    • FlinkジョブのJARファイルをカスタムDockerイメージに含め、そのイメージをコンテナレジストリにプッシュします。
    • FlinkDeploymentspec.imageでこのカスタムイメージを指定し、spec.job.jarURIではlocal:///path/to/job.jarのようにコンテナ内のパスを指定します。
    • イメージのタグはGit(例: Helm values.yamlやArgoCD Applicationパラメータ)でバージョン管理されます。
    • このアプローチは、不変性とバージョン管理の観点から最も堅牢です。ジョブコードと依存関係をイメージ内にカプセル化するため、実行環境の一貫性を保証し、ロールバックもイメージタグの変更とArgoCDの同期によって容易に行えます。
  2. リモートURI(S3、GCSなど)の参照:
    • spec.job.jarURIs3://...gs://...のようなリモートストレージ上のJARファイルのURIを指定します。
    • Flinkイメージには適切なファイルシステムコネクタが含まれている必要があり、IAMロールなどでアクセス権限を付与する必要があります。
    • URI自体をGitでバージョン管理します。
    • リモートストレージの可用性や、URIが指すアーティファクトの変更可能性といった点で、イメージ焼き付け方式に比べて複雑さが増す可能性があります。
  3. InitContainerによるJARダウンロード:
    • Podテンプレートを使用してInitContainerを定義し、ジョブ実行前にJARファイルをダウンロードして共有ボリュームに配置する方法も考えられます(主にFlinkジョブクラスタ向け)。

flink-conf.yaml設定の管理

  • FlinkDeployment spec内での直接指定: spec.flinkConfigurationマップに必要な設定キーと値を記述します。
  • Helm values.yaml経由: values.yamlでパラメータ化し、Helmテンプレートを通じてspec.flinkConfigurationに注入します。これが最も一般的で柔軟な方法です。
  • ConfigMapマウント: Flink Kubernetes Operatorは、標準的なFlinkのConfigMapの場所(例: /opt/flink/conf)にマウントされたflink-conf.yamlを認識できます。PodテンプレートでConfigMapをボリュームとしてマウントし、flinkConfigurationでそのパスを指定するか、Operatorが自動的に検出するようにします。Wikimediaのflink-appチャートでは、app.config_filesを通じてアプリケーション設定ファイルをConfigMapとしてマウントし、コンテナ内で利用可能にする例があります。

Log4j / ロギング設定の管理

  • FlinkDeploymentspec.logConfigurationフィールドを使用して、log4j-operator.propertieslogback-operator.xmlなどのロギング設定ファイルの内容を直接指定できます。
  • これもHelm values.yamlを通じてテンプレート化できます。

バージョン管理とロールバック

イメージタグ、JAR URI、Flink設定(flinkConfigurationlogConfigurationの内容)は全てGitリポジトリ(通常はHelm values.yaml内、またはHelmを使用しない場合はFlinkDeployment YAML内)に保存されます。ArgoCDはこれらのバージョン管理された設定をクラスタに適用します。ロールバックが必要な場合は、Gitのコミットを元に戻し、ArgoCDに同期させることで実現されます。

ジョブJARをカスタムDockerイメージに焼き付け、そのイメージをバージョン管理し、Git(ArgoCDによって管理されるHelm values経由)でイメージタグを参照する方法は、Flinkジョブアーティファクトを管理するための最も堅牢でGitOpsに適したアプローチです。これにより、Flinkジョブコードを他のアプリケーションアーティファクトと同様に扱い、ビルドの再現性とデプロイメントの信頼性を向上させます。これは本番システムにとって不可欠です。テキストベースであるFlinkの設定は、自然にGitのバージョン管理に適合します。

Helmチャートを利用した場合の構成イメージ

■デプロイメントワークフローとケーススタディ

●FlinkアプリケーションのステップバイステップGitOpsワークフロー

  1. 開発者ワークフロー:
    1. Apache BeamまたはFlinkジョブのコードを開発・更新します。
    2. ジョブJARファイルをビルドします。
    3. (JARをイメージに焼き付ける場合)JARファイルを含む新しいDockerイメージをビルドし、コンテナレジストリにプッシュします。
    4. Gitリポジトリを更新します:
      • FlinkDeploymentのYAMLマニフェスト、またはそれを生成するHelmチャートのvalues.yamlを修正します(例: 新しいイメージタグ、更新されたjarURI、並列度、Flink設定など)。
      • 変更を指定されたGitリポジトリのブランチにコミットします。
      • レビューと承認のためにプルリクエスト(またはマージリクエスト)を作成します(GitOpsのベストプラクティス)。
  2. ArgoCDワークフロー:
    1. ArgoCDがGitリポジトリの変更を(Webhookまたはポーリングにより)検出します。
    2. ArgoCDはGitリポジトリの望ましい状態とKubernetesクラスタの現在のライブ状態を比較します。
    3. 状態が同期していない場合、ArgoCDはsyncPolicyに基づいて以下を実行します:
      • 変更を自動的に同期します。
      • 「OutOfSync」状態を表示し、手動同期を待ちます。
    4. ArgoCDは更新されたFlinkDeployment CRD(またはそれをレンダリングするHelmチャート)をKubernetesクラスタに適用します。
  3. Flink Kubernetes Operatorワークフロー:
    1. Flink Kubernetes OperatorがFlinkDeployment CRDの作成または変更を検出します。
    2. Operatorは新しい仕様に基づいてFlinkアプリケーションを調整(リコンサイル)します:
      • ステートフルアップグレードの場合、セーブポイントをトリガーすることがあります。
      • 新しいイメージや設定でJobManager/TaskManager Podをロールアウトします。
      • ジョブをサブミットします。
  4. 検証:
    • Flink UI、Kubernetesのログ(JobManager、TaskManager、OperatorのPodログ)、ArgoCD UIを通じてFlinkジョブのステータスを監視します。

このワークフローにより、Flinkアプリケーションの変更からデプロイまでを一貫して自動化し、人的ミスを削減し、デプロイ速度を向上させます。

●ケーススタディ: Segment社の移行とアーキテクチャ

Twilio Segment社は、リアルタイム計算プラットフォームを従来のAmazon EMRベースのFlinkセットアップから、Kubernetesベースのアーキテクチャに移行しました。この移行は、運用上の課題解決と開発者エクスペリエンス向上を目的としていました。

https://segment.com/blog/revamping-segments-flink-real-time-compute-platform/

EMRでの課題

  • FlinkバージョンがEMRバージョンに束縛され、カスタムFlinkバージョンのネイティブサポートがない
  • EMRノードのハードウェア障害時にアプリケーションの自動回復がない
  • アプリケーションのライフサイクル更新(停止/再起動)やJAR更新が手動で複雑(EMRノードへのSSH、YARNコマンド実行、手動ファイルコピーなど)
  • EMRサーチャージによる追加コスト(年間約25万ドル)

Flink on Kubernetes with ArgoCDでの構成

  • インフラストラクチャ: Terraformを使用してAWS EKSクラスタ、EC2ノードプール、Flinkステート用S3バケット、IAMロールなどをセットアップ
  • デプロイメント: ArgoCDとHelmを使用してFlink Kubernetes OperatorおよびFlinkアプリケーションをEKSにデプロイ。ユーザーがカスタマイズ可能なHelmテンプレートを提供
  • 分離: 各Flinkアプリケーションは専用のEKSノードプール、名前空間、サービスアカウントで実行され、分離性を向上
  • 管理: 単一のFlink Kubernetes Operatorが単一EKSクラスタ内の複数アプリケーションを管理
  • 可観測性: Flink TaskManager、JobManager、Operator PodをGrafana Loki(ログ)およびDataDog(メトリクス)と統合

達成されたメリット

  • 運用オーバーヘッドの削減: 高可用で信頼性の高いセットアップにより、Flinkアプリケーションの維持に必要な運用作業が大幅に削減
  • 開発者エクスペリエンスの向上: 開発者は数行のYAML記述と自動化により、数分で新しいFlinkアプリケーションをシームレスにカスタマイズ・デプロイ可能に
  • コスト削減: AWS EMRサーチャージの排除により、インフラコストが年間約25万ドル削減

移行中の課題と対応

  • Segment EKSインフラのGravitonインスタンス対応
  • Flink Kubernetes Operatorに関するArgoCDヘルスチェックの更新
  • Flinkチェックポイント性能の改善
    • S3ファイルシステムプラグインをflink-s3-fs-hadoopからflink-s3-fs-prestoへ切り替え
    • S3バケットのパーティショニング など
  • Flinkクエリアブルステートサービスを公開するためのKubernetes Ingressサポート追加

■運用のベストプラクティスと考慮事項

Flink Kubernetes OperatorとArgoCDを組み合わせたシステムを効果的に運用するためには、いくつかのベストプラクティスと考慮事項があります。

●ヘルスチェック: FlinkDeploymentのためのカスタムArgoCDヘルスチェックの設定

課題
ArgoCDのデフォルトのヘルスチェックは、FlinkDeployment CRの状態を正確に反映しない場合があります。Flinkジョブが正常に実行されていても、ArgoCD UIでは「Progressing(進行中)」と表示され続けることがあります。これは、ArgoCDがOperatorによって管理されるFlinkジョブの複雑なライフサイクル状態を本質的に理解していないためです。

解決策
argocd-cm ConfigMapにカスタムLuaヘルスチェックスクリプトを定義することで、ArgoCDがFlinkDeploymentのヘルスステータスをより正確に判断できるようにします。

Luaスクリプトのロジック
スクリプトは、FlinkDeploymentオブジェクトのstatus.lifecycleState(例: STABLE, SUSPENDED, FAILED, DEPLOYED)とstatus.jobStatus.state(例: RUNNING, FINISHED)を検査します。これらの状態に基づいて、ArgoCDのヘルスステータス(Healthy, Progressing, Degraded, Suspended)と適切なメッセージを返します。

カスタムヘルスチェックLuaスクリプトの例:

hs = {} -- ヘルスステータスを格納するテーブルを初期化
-- status または jobStatus フィールドが存在しない場合、Operatorがまだ処理中である可能性がある
if obj.status == nil or obj.status.jobStatus == nil then
  hs.status = "Progressing"
  hs.message = "Waiting for Flink operator"
  return hs
end

-- 安定しており、ジョブが実行中の場合、Healthy
if obj.status.lifecycleState == "STABLE" and obj.status.jobStatus.state == "RUNNING" then
  hs.status = "Healthy"
  hs.message = "Flink deployment is running"
  return hs
end

-- 一時停止しており、ジョブが終了した場合、Suspended
if obj.status.lifecycleState == "SUSPENDED" and obj.status.jobStatus.state == "FINISHED" then
  hs.status = "Suspended"
  hs.message = "Flink deployment has been suspended or all jobs have finished running"
  return hs
end

-- ライフサイクル状態が FAILED の場合、Degraded
if obj.status.lifecycleState == "FAILED" then
  hs.status = "Degraded"
  hs.message = obj.status.error -- エラーメッセージをステータスから取得
  return hs
end

-- Flink Operator 1.5.0 の場合
-- reconciliationStatus.state が存在し、DEPLOYED の場合に Healthy とする
if obj.status.reconciliationStatus ~= nil and obj.status.reconciliationStatus.state == "DEPLOYED" then
    hs.status = "Healthy"
    hs.message = "Flink deployment is deployed and reconciled"
    return hs
end

-- 上記のいずれにも該当しない場合は Progressing とし、現在の状態を表示
hs.status = "Progressing"
hs.message = "Current state: lifecycleState=".. obj.status.lifecycleState.. ", jobStatus.state=".. obj.status.jobStatus.state
if obj.status.reconciliationStatus ~= nil and obj.status.reconciliationStatus.state ~= nil then
    hs.message = hs.message.. ", reconciliationState=".. obj.status.reconciliationStatus.state
end
return hs

表5: ArgoCDカスタムヘルスチェック (FlinkDeployment用) - Luaスクリプトのロジック

FlinkDeploymentステータス条件 ArgoCDヘルスステータス ArgoCDメッセージ
obj.status == nil または obj.status.jobStatus == nil Progressing Waiting for Flink operator
obj.status.lifecycleState == "STABLE" かつ obj.status.jobStatus.state == "RUNNING" Healthy Flink deployment is running
obj.status.lifecycleState == "SUSPENDED" かつ obj.status.jobStatus.state == "FINISHED" Suspended Flink deployment has been suspended or all jobs have finished running
obj.status.lifecycleState == "FAILED" Degraded obj.status.error からのエラーメッセージ
obj.status.reconciliationStatus.state == "DEPLOYED" (Flink Operator 1.5.0向け) Healthy Flink deployment is deployed and reconciled
上記以外 Progressing 現在のlifecycleStatejobStatus.stateを示すメッセージ

影響
このカスタムヘルスチェックにより、ArgoCD UIでFlinkアプリケーションのヘルスステータスが正確に表示され、信頼性の高い自動化アクションとオペレーターによる状況把握が可能になります。これは、GitOpsループを効果的に機能させる上で非常に重要です。なぜなら、ArgoCDがFlinkアプリケーションが正常に目的の状態に達したかどうかを正確に判断できなければ、自動化されたプロセスやオペレーターの信頼性が損なわれるからです。

●監視とロギング

  • Flinkメトリクス: JobManager、TaskManager、ジョブ固有のメトリクスなど、Flink自身のメトリックシステムを活用
  • Operatorメトリクス: Flink Kubernetes Operatorも独自の運用メトリクスを公開
  • Kubernetesメトリクス: Podやコンテナのリソース使用状況、Kubeletメトリクスなどを監視
  • 統合: Prometheusでメトリクスを収集し、Grafanaでダッシュボードを構築するのが一般的。Segment社はDataDogとGrafana Lokiを統合
  • ロギング: FlinkのLog4j設定をカスタマイズし、FluentdやFluent Bitのようなサイドカーコンテナを使用してログを一元的なロギングシステム(例: Elasticsearch、Loki)に転送

●セキュリティ

  • Kubernetes RBAC: 以下のエンティティに対してRole、ClusterRole、RoleBinding、ClusterRoleBindingを適切に定義します。
    • Flink Kubernetes Operator: Kubernetesリソース(Deployment、Service、ConfigMap、CRDなど)を管理するための権限。通常、OperatorのHelmチャートがこれらをセットアップします。
    • Flink JobManager/TaskManager Pod: サービスアカウント(例: flinkサービスアカウント)。Native Kubernetesモードを使用する場合、TaskManager Podを作成・削除する権限などが必要です。
    • ArgoCD: ターゲット名前空間のリソースを管理するための権限。設定によってはクラスタ全体への権限が必要になる場合もあります。
  • ArgoCDセキュリティ:
    • リポジトリ認証情報(SSHキー、HTTPSトークンなど)をKubernetes Secretとして安全に管理します。
    • ArgoCD内のRBAC(policy.csv)を使用して、ユーザーやプロジェクトのアプリケーションおよびクラスタへのアクセスを制御します。
    • SSO(シングルサインオン)連携。
  • シークレット管理: Flinkジョブが外部システム(Kafka認証情報、データベースパスワードなど)にアクセスするために必要なシークレットは、Kubernetes Secretとして保存し、Podテンプレート経由でFlink Podにマウントします。ArgoCDはGitで定義されたこれらのSecretを管理できますが、Gitフレンドリーなシークレット管理のためにはSealed SecretsやExternalSecrets Operatorのようなツールの利用を検討します。

●スケーラビリティ

  • Flinkジョブ並列度: FlinkDeployment.spec.job.parallelismで設定
  • TaskManagerスケーリング: Flink Operatorは、ジョブの並列度とtaskmanager.numberOfTaskSlotsに基づいてTaskManagerのレプリカ数を調整
  • Flinkジョブオートスケーラー: Flink Kubernetes Operatorは、ラグや使用率などのメトリクスに基づいてジョブの並列度を動的に調整するオートスケーラーモジュールを装備。これはリソース使用量とパフォーマンスを最適化するための重要な機能であり、job.autoscaler.*プロパティを通じて設定
  • Kubernetes Horizontal Pod Autoscaler (HPA): 必要に応じてステートレスなコンポーネント(Operator自体など)に使用可能。ただし、FlinkジョブのTaskManagerはFlink自身のオートスケーラーで管理するのが望ましい。ArgoCDのベストプラクティスとして、HPAがレプリカ数を管理する場合はGitでreplicasを追跡しないようにする

●FlinkデプロイメントのためのGitOpsベストプラクティス

ベストプラクティス領域 推奨事項 根拠・利点
リポジトリ構造 設定リポジトリとソースコードリポジトリを分離 監査ログのクリーン化、関心事の分離、CIループ回避
マニフェストの不変性 ベースHelmチャート、Dockerイメージ、リモートKustomizeベースのバージョンを特定のタグやコミットSHAに固定。latestタグや可変ブランチの使用を回避 デプロイの再現性と安定性を保証。予期せぬ変更を防止
CRD管理 Flink Kubernetes OperatorのCRDは、オペレーターインストールの一環としてクラスタワイドにインストールし、各アプリケーションチャートに含めない CRDのバージョン管理を一元化し、競合を回避
ArgoCD ApplicationSet 多数の類似したFlinkDeploymentインスタンス(例: チーム別、環境別)を管理するためにApplicationSetを使用 テンプレートから複数のArgoCD Applicationを効率的に生成・管理できる
Flinkアプリケーション用Helmチャート 主要なFlinkDeploymentフィールドをvalues.yamlで公開。デフォルトFlink設定を含めつつ、オーバーライドを可能にする。必要なサービスアカウント定義を含める 設定の標準化、再利用性向上、環境ごとのカスタマイズ容易化
シークレット管理 Gitに平文のシークレットを保存しない。Bitnami Sealed Secrets、HashiCorp Vault連携、Kubernetes External Secrets Operatorなどのツールを使用 セキュリティリスクの低減。GitOpsと親和性の高い方法でシークレットを管理
JARファイル管理 ジョブJARをバージョン管理されたカスタムDockerイメージに焼き付ける 不変性、バージョン管理、ロールバックの容易性、依存関係のカプセル化

■課題への対応と解決策

Flink Kubernetes OperatorとArgoCDを組み合わせたシステムは強力ですが、運用上いくつかの課題が生じる可能性があります。ここでは、一般的な課題とその解決策について説明します。

●マルチクラスタ/マルチテナントArgoCDセットアップの複雑性

多数のKubernetesクラスタや複数の開発チームに対してFlinkデプロイメントを管理する場合、ArgoCDのセットアップが複雑になることがあります。

  • 課題: 単一のArgoCDインスタンスで多数のクラスタを管理すると、そのインスタンスが単一障害点になったり、パフォーマンスのボトルネックになったりする可能性があります。また、各クラスタにArgoCDインスタンスをデプロイすると、運用オーバーヘッドが増加します。
  • 解決策:
    • ArgoCD ApplicationSet: テンプレートに基づいて複数のクラスタやリポジトリパスに対してArgoCD Applicationリソースを自動生成できます。これにより、多数の類似したFlinkデプロイメントの管理を簡素化します。
    • ArgoCDインスタンスアーキテクチャの選択:
      • 単一インスタンス: 全てのクラスタを単一のArgoCDで管理。管理は集中化されますが、スケーラビリティと障害分離に課題があります。
      • クラスタごとのインスタンス: 各クラスタに専用のArgoCDを配置。分離性は高いですが、運用コストが増加します。
      • ハブアンドスポークモデル(管理クラスタ): 中央の管理クラスタでArgoCDを実行し、そこから複数のワークロードクラスタを管理。バランスの取れたアプローチです。

●プログレッシブデリバリー

ArgoCD自体は基本的な同期機能を提供しますが、Flinkジョブのようなステートフルアプリケーションに対する高度なプログレッシブデリバリー(カナリアリリース、ブルー/グリーンデプロイメント)は複雑です。

  • 課題: Flinkジョブの状態(ステート)を維持しながら安全に新しいバージョンに移行するには、単純なPodの置き換え以上の制御が必要です。
  • 解決策/アプローチ:
    • Flinkのアップグレードメカニズムの活用: FlinkDeployment.spec.job.upgradeModesavepointlast-state)をGitOpsで制御し、Flink Operatorにアップグレード処理を実行させます。
    • Argo Rolloutsとの連携: Argo Rolloutsは高度なデプロイメント戦略を提供しますが、Flinkのステートフルな特性との連携にはカスタムロジックやフックが必要になります。
    • 手動介入を伴う段階的ロールアウト: ArgoCDの同期ポリシーをManualに設定し、新しいバージョンをデプロイ後、トラフィックを徐々に移行し、問題がなければ古いバージョンを削除する、といった手順をGitOpsの枠組みの中で計画的に実行します。

●セキュリティとコンプライアンスのギャップ

ArgoCDはGitにあるものを同期しますが、デフォルトではイメージの脆弱性スキャンや、RBACを超える詳細なポリシー適用を行いません。

  • 課題: セキュリティ脆弱性のあるイメージや、コンプライアンスに違反する設定がデプロイされるリスクがあります。
  • 解決策:
    • CIパイプラインでのセキュリティスキャン: マニフェストやイメージタグがGitにコミットされる前に、CIパイプラインでイメージスキャン(例: Trivy、Clair)や静的解析を実施します。
    • ポリシーエンジンとの連携: Open Policy Agent (OPA) GatekeeperのようなアドミッションコントローラをArgoCDと連携させ、デプロイ前にポリシー違反をチェック・強制します。

●ArgoCDの同期とヘルスステータスの不一致のトラブルシューティング

FlinkDeploymentのヘルスステータスがArgoCDで正しく表示できない問題は、前述のカスタムヘルスチェックで対応できます。

  • 一般的なArgoCDの「OutOfSync」の原因:
    • マニフェストのバグ(Kubernetes仕様外のフィールドなど)
    • pruneが無効な状態での同期後に残存するリソース
    • ミューテーティングWebhookや他のコントローラによるライブ状態の変更
    • HelmチャートでのrandAlphaNumのような毎回異なる値を生成する関数の使用
  • デバッグ方法:
    • argocd app diff <APPNAME>で差分を確認
    • argocd app manifests <APPNAME>でArgoCDが生成するマニフェストを確認
  • 差分の無視: 正当な理由でコントローラによって変更されるフィールド(一部のステータスフィールドなど)は、ArgoCDのignoreDifferences設定で無視できます。

●Flinkジョブでのスキーマ進化の管理

データストリームのスキーマ変更は、実行中のFlinkジョブを破壊する可能性があります。

  • 課題: スキーマの変更とFlinkジョブの更新を協調させる必要があります。
  • GitOpsアプローチ:
    • スキーマ定義(例: スキーマレジストリ内のAvroスキーマ)もバージョン管理し、Gitで管理します。
    • スキーマの更新は、Flinkジョブコードの更新(新しいJAR/イメージのビルド)、場合によってはFlinkステートのマイグレーションやジョブの再起動を伴う可能性があります。これら全ての変更をGitコミットとArgoCDの同期を通じて調整します。
    • Flinkが提供するスキーマ進化の機能(利用可能な場合)の活用も検討します。
  • リソース管理とチューニング: JobManager/TaskManagerのCPU、メモリ割り当ての最適化
  • チェックポイントとセーブポイントの設定とパフォーマンス: 特に大規模なステートを持つ場合の性能問題。参考)Segment社のS3ファイルシステムプラグイン
  • エラー処理とデバッグ: Kubernetes、Flinkエンジン、コネクタなど、複数のレイヤーで発生しうるエラーの切り分け

Flink OperatorとArgoCDを組み合わせたシステムは高度な自動化を実現しますが 「設定して終わり」というわけではありません。特に、ステートフルで複雑なFlinkアプリケーションのヘルスチェック、ステート管理、スキーマ進化といった運用上の課題には、継続的な注意とカスタム対応が不可欠です。

■まとめ

Flink Kubernetes OperatorとArgoCDを連携させるメリット

  • 開発者のアジリティ向上(迅速なデプロイとイテレーション)
  • 運用効率の改善(自動化による手作業の削減)
  • スケーラビリティ(KubernetesとFlinkの能力活用)
  • レジリエンス(自己修復と容易なロールバック)
  • 一貫性(Gitによる宣言的管理)
  • 監査可能性(Git履歴による変更追跡)

導入と運用の成功に向けて気をつけるポイント

  1. Operatorの専門知識への投資: Flink Kubernetes Operatorの機能とCRD(特にFlinkDeployment)を深く理解することが不可欠です。公式ドキュメントやコミュニティリソースを活用します。
  2. アプリケーションパッケージングの標準化: Flinkアプリケーションのデプロイには、Helmチャート(例: Wikimediaのflink-appチャート)をベストプラクティスとして採用し、FlinkDeploymentの定義をパラメータ化します。
  3. GitOpsの完全な採用: 全てのFlinkアプリケーション設定(コード、設定ファイル、デプロイメントマニフェスト)について、Gitを信頼できる唯一の情報源とし、ArgoCDによる自動同期を徹底します。
  4. カスタムヘルスチェックの優先: FlinkDeploymentリソースに対する堅牢なArgoCDカスタムヘルスチェック(Luaスクリプト)を早期に実装します。これは運用安定性の鍵となります。
  5. 明確なJAR管理戦略の策定: ジョブJARはバージョン管理されたDockerイメージに焼き付けることを優先します。
  6. 堅牢な監視とロギング: Kubernetes、Flink Operator、Flinkアプリケーション、ArgoCDの各レイヤーにわたる包括的な可観測性を実装します。
  7. 反復的な導入: まずは単純なFlinkジョブから始め、徐々により複雑なワークロードを移行し、経験を蓄積します。
  8. セキュリティバイデザイン: RBAC設定、シークレット管理、ネットワークポリシーなど、セキュリティに関する考慮事項を設計初期から統合します。

この統合スタックの導入には初期投資と学習コストが伴いますが、ステートフルなデータ処理において、運用安定性の向上、開発速度の加速、管理の容易化といった長期的なメリットが非常に大きいです。

Kubernetesでデータワークロードを運用する組織にとって、Flink Kubernetes OperatorとArgoCDの組み合わせは、もはや単なる選択肢ではなく、Flinkを効率的かつ大規模に管理するための戦略的必須事項と言えます。OperatorがFlink特有の複雑な運用を抽象化し、ArgoCDによるGitOpsが自動化、監査可能性、一貫性のあるデプロイを実現することで、Flinkを現代的なクラウドネイティブの運用に適応させます。

このスタックは、Kubernetes上でのステートフルストリーム処理のベストプラクティスアーキテクチャです。これを採用することで、組織は特注の運用ツール開発の負担から解放され、Flinkの能力を最大限に活用してビジネス価値の創出に集中できるようになります。

●参考リンク

この記事が少しでも参考になった、あるいは改善点などがあれば、ぜひリアクションやコメント、SNSでのシェアをいただけると励みになります!

Discussion