😁

Dataflow の Day 2 Operation ベタープラクティス

2023/08/01に公開

Google Cloud Japan の RyuSA です。👋

最近「 Pub/Sub からイベントとデータを引っ張って BigQuery に投げ込みたい」「 Cloud Spanner の変更ストリームを BigQuery に配置し、データを分析に使いたい」など、様々な用途で大量なデータの処理のために Dataflow を利用いただいているのを観測しています。👀

さて、巨大な分散処理をマネージドで実行してくれる便利な Dataflow ですが、運用する上でいくつか気を付けないといけないことがあります。そしてそれらの多くは「問題が発生してから」発覚することが多いです。この記事では「 Dataflowジョブの運用に関しての FAQ やよくある問題」に対する回答をまとめておきました。

監視 / 可観測性

Q: ジョブのログベースの監視のベストプラクティスをおしえて!

A. ログベース監視をやめる 🙅‍♀️
Dataflow ジョブをログベースで監視するのは非常に難しく、定期的に行われるワーカーノードのアップグレードでもログの内容が変わる可能性があります。
ログはあくまでも「説明のため」の可観測性であり、ワークロードが適切に動いているかはなるべくメトリクスを利用して監視することを推奨いたします。

Q: Cloud Logging にエラーログが出てるんだけど大丈夫?

A. (メトリクス上問題なければ)(多くの場合は) 大丈夫!💪

Dataflow はジョブ起動後や起動中に時折エラーメッセージを出力します。Dataflow がエラーログを出力することはよくあることであり、恐れることはありません。Dataflow のジョブはなるべくログではなく、メトリクスをベースに監視をすることを推奨しております。

もし不安であれば、ソフトウェアそのもののログであるワーカー(dataflow.googleapis.com/worker)やジョブメッセージ(dataflow.googleapis.com/job-message)のエラーログについてだけは確認しておくと良いかもしれません。

Q: 内部の状況が見えなくて不安……やっぱりログ出力?

A. カスタムメトリクスを実装しましょう!👍

Apache Beam にはメトリクスを自作する機構が用意されています。ジョブの可観測性を高めたい場合、ログ出力も大事ですが、カスタムメトリクスを実装して Dataflow の収集をしてもらうこともひとつの手段です。
https://beam.apache.org/releases/javadoc/2.49.0/index.html?org/apache/beam/sdk/metrics/Metrics.html

Dataflow は Apache Beam で実装されたカスタムメトリクスを取得して、Metrics Expoloer や Dataflow ジョブのコンソール上で見られるようになっています。

サンプルとして、たとえば BigQueryIO で利用できるカスタムメトリクス recordsAppended はこのように実装されています。
https://github.com/apache/beam/blob/v2.49.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L290-L291

Q: (Streamingジョブ) Dataflow のメトリクスが多すぎて、どれをどう見ればいいのかわからない

A. Data Freshness / Backlog(bytes) / System Lag は見ておきましょう 👀

Dataflow はジョブのメトリクスとして様々なデータを公開しています。しかしその中でも特に重要なメトリクスが Data Freshness / Backlog / System Lag の3種類です。(他が不要という意味ではありません)

  • Data Freshness : 未処理のデータの中で一番古いタイムスタンプと処理済みのデータの中で最新のタイムスタンプの差
  • Backlog : 各ステージが処理できず放置しているデータ量
  • System Lag : 各ステージごとのシステム側の実行時間(パイプラインそのものの処理やシャッフルの時間など)

これを組み合わせると、データの遅延の見え方が鮮明になってきます。

ケース1 Data Freshness と System Lag が共に高い

「古い未処理のデータが残っており、かつパイプラインの処理も遅い」ということを表しています。単純にパイプラインそのものの処理に時間がかかっているか、もしくはパイプライン内でエラーが発生しスタックしている可能性が高いです。
常にこのメトリクスが高い場合、ソースコード側に問題があることが多いです。ボトルネックを解消する必要がある場合はCloud Profilerを使って詳細な調査をしてみましょう。

ケース2 Data Freshness が高いが System Lag は低い

「処理はスムーズだけれども古いデータがまだ残っている」ということを表しています。たとえば「ソースの取得ににFixedWindowを使って一定時間データを集約している」という時などは一定周期で高いメトリクスになるはずです。

ケース3 Backlog が高い

処理は高速に進んでいるもののやることが積み上がっている状態です。おそらくインスタンスのパフォーマンス不足やワーカーの数が足りていない可能性が高いです。Dataflow ジョブに割り当てるリソース量を増やしてみることで解消することが多いでしょう。

実装 / ライフサイクル

Q : Dataflow ジョブ、いい感じに動いてるからソースコードを1年くらい放置してるんだけど大丈夫?

A. ダメ🙅‍♀️

Dataflow のジョブは Apache Beam を利用して実装します。 Dataflow はこの Apache Beam のサポートバージョンを定めています。
https://cloud.google.com/dataflow/docs/support/sdk-version-support-status

今までの傾向としては新しいバージョンがリリースしてから1年ほどは Dataflow はランタイムとしてその Apache Beam のバージョンをサポートします。これはつまり、1年以上 Apache Beam のバージョンをアップデートしない場合「気がついたら本番環境のジョブがサポート外になってた」ということが起こりがちです。

また、Apache Beam には様々なサービスを呼び出す I/O Connector が含まれています。たとえば BigQueryIO は Apache Beam からシームレスに BigQuery のジョブを呼び出したり Streaming するためのコンポーネントです。
Apache Beam のアップデートにはこういった Google Cloud サービスを呼び出すコンポーネントのバグ修正も含まれています。定期的に Apache Beam のアップデートをして、なるべく最新のバグ修正を受けるようにすることを推奨します。

Q: (Streamingジョブ) ジョブが無限リトライを繰り返しててスタックしてる!どうすればいい!?

A. コードを修正してジョブアップデートしましょう👍

Streaming ジョブ内でエラーが発生した場合、Dataflow は無限に処理をリトライしてしまいます。
https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline?hl=ja#detect_an_exception_in_worker_code

Dataflow にデプロイしたコードのエラーハンドリングなどにバグがあり、結果 Dataflow 無限リトライ編に入ってしまった場合などはパイプラインをドレインできません。(リトライ部分を待機する必要があるため)

基本的にこういった場合はパイプラインのコードを修正し、ジョブをアップデートして問題の解消ができます。ジョブのアップデートではデータをロストすることなく、シームレスに新しいコードをデプロイすることができます。

Q: エラーハンドリング、どうすればいい?

A. デッドレター パターンで実装するとすっきりしやすいです ✉️

Dataflow のジョブは処理中にエラーが発生した際、バッチジョブの場合は4回、ストリーミングジョブの場合は無制限にリトライします。
https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline?hl=ja#detect_an_exception_in_worker_code

よくある設計としては「リトライで解決できないデータをデッドレターとしてキューに入れて、後ろの処理に任せる」方式、通称デッドレター (キュー)パターンです。

たとえば BigQueryIO.Write が返す PCollection には「挿入に成功した行」と「挿入できなかった行」の両方が含まれるようになっています。
https://github.com/apache/beam/blob/v2.49.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java#L258-L269

このように処理に失敗した PCollection で Dataflow に無限リトライしてほしくないエラー( INVALID_ARGUMENT などリトライで解決しないようなもの)に関しては後ろのステップに引き渡してし個別対応することが可能です。

Q: Dataflow ジョブ、どの言語で実装するのがいい?

A. チームのスキルセットに依りますが、(個人的には) Java がおすすめです

Apache Beam は様々な抽象的なデータ処理レイヤーを提供しており、それを様々な言語で記述できることが強みです。Dataflow では2023年8月1日現在 Python / Java / Go 言語の3つの言語をサポートしています。

最近の Apache Beamでは Cross Language Transforms という別の言語の SDK を利用する機能が使われ始めており、実際 Python のBigQueryIO の実装の一部が Java に置き換わったり Go 言語の BigQueryIO をすべて Java に置き換える話が出てきていたりします。
Java の SDK は比較的活発に開発されることが多いため、特にこだわりがない場合は Java での実装を(個人的に)おすすめしています。

パフォーマンス / オートスケール

Q: Dataflow ジョブの画面にパフォーマンスワーニング出てるんだけど、大丈夫?

A. (メトリクス上、ワークロードが遅延を許容できていれば) 大丈夫!💪

Dataflow のコンソール画面上、時折ジョブの遅延や推奨事項がワーニングとして記載されることがあります。たとえば上記のワーニングは「何かしらのイベント等を受け、Data Freshness の指標が大きく跳ね上がった」ことを示しています。

しかしメトリクスを確認して Backlog や Data Freshness がその後落ち着いていればジョブはデータを適切に処理できていることを表しています。メトリクスの特性に合わせ、無視できない遅延が起きている場合は問題になっているステップのコードの修正やインスタンスサイズの変更など検討ください。

Q: ジョブのインスタンスとかワーカー数、どうやって決めればいい?

A. 詳細はこの記事を参考にして推測してみましょう 👀
https://cloud.google.com/blog/products/data-analytics/benchmarking-dataflow-jobs?hl=en

その Dataflow ジョブに求める「定量的に流入するデータ量」「各要素の平均データ量」「ピーク時の流入データ量」「ピークの頻度」などを仮定して計算することで、必要になる vCPU の数などの簡単な推測できます。これをベースにインスタンスタイプや最大ワーカー数を設定してみてください。

しかしこれはあくまでも「簡単な推測」であり、最後は「推測するな、計測せよ」に従って実際のパイプラインのパフォーマンスを検証することをおすすめします。「巨大なリソースを持つワーカーでまとめて処理する方が早い DoFn 」や「データを小さく分散し、大量のワーカーで分散処理する方が早い DoFn 」などパイプライン、データによって特性があります。最後は実際に計測して検証することが大切です。

Q: Dataflow 初心者なんだけど、何か落とし穴的な、気を付けておくことある?

A. 最大ワーカー数と Dataflow Prime に注意するといいかも……? 🫠

Dataflow は水平 / 垂直にオートスケールする機構を持っております。そのため、特に開発中などはスケールに関するフラグ値は注意して設定してください。

最大ワーカー数のデフォルト値は 100 です。開発中のパイプラインにバグがあり、結果無限にスケールして上限の 100 台までスケールしちゃった……というお話を何度か聞いたことがあります。必要に応じたワーカー数の上限を設定することを推奨します。

Dataflow Prime は縦方向へ自動スケールする機能で、 ジョブ起動時にdataflowServiceOptions=enable_prime のフラグを追加することで有効にできます。 この機能が有効になっている場合、OOM の発生回数などのメトリクスをベースに自動的にインスタンスサイズを上げていきます。
デフォルトではオフですが、Dataflow Prime を有効にして開発している間にインスタンスサイズが想定していないほど大きくなってしまっていた……というお話をたまに聞きます。縦方向へのスケールが必要にならない限り無効のままにしておくと良いかもしれません。

おわりに

分散処理というのものは、データの同期にリトライやエラーの設計、インフラストラクチャーのスケールなど考えなくてはならないことがたくさんあります。
Dataflow はそういった課題を解決できる環境を提供しているプロダクトであり、使いこなしていただけると強力な武器になると思います 💪

もし Dataflow をワークロードとして利用する場合、まずはこの記事で紹介したことを思い出してみてください 👋

Google Cloud Japan

Discussion