📊

Dataflow のストリーミングジョブにおいてストラグラーを検知できるようになりました

2023/10/05に公開

はじめに

こんにちは、クラウドエース データML ディビジョン所属の森です。

データML ディビジョンでは、Google Cloud が提供しているデータ領域のプロダクトについて、
新規リリースをキャッチアップするための調査報告会を毎週実施しています。
新規リリースの中でも、特に重要と考えるリリースを記事としてまとめ、本ページのように公開しています。

今回紹介するリリースは、「Dataflow ストリーミングジョブでのストラグラー検知」についてです。
このリリースによって、今まで Dataflow バッチジョブでのみ提供されていたストラグラー検知機能が、Dataflow ストリーミングジョブでも使用できるようになりました。

Dataflow とは

Dataflow は、Google Cloud が提供するフルマネージド型のデータ処理サービスです。
大量のデータをリアルタイムまたはバッチ処理で効率的に変換、分析、統合することが可能です。

Dataflow の主な特徴は以下の通りです。

  • サーバーレス
    • ユーザーはインフラストラクチャの管理を気にすることなく、データパイプラインの開発に集中が可能です。
  • 自動スケーリング
    • データ量や計算の複雑さに応じて、自動的にリソースのスケーリングを行うことが可能です。
  • リアルタイムとバッチ処理
    • Dataflow はストリーミングデータとバッチデータの両方を効率的に処理することが可能です。
  • 他プロダクトとの連携
    • Dataflow は Google Cloud の他のサービス(BigQuery、Cloud Storage、Pub/Sub 等)とシームレスに連携が可能です。
  • オープンソース
    • Dataflow は Apache Beam というオープンソースのモデルに基づいています。
      このため、異なる実行環境間でデータパイプラインを移植することが可能です。

Dataflow は、大規模なデータセットを取り扱うデータエンジニアリング、データ分析、機械学習のタスクに特に適しています。

Dataflow 用語

本稿にて頻出する Dataflow の用語について説明します。

  • ジョブ
    • データのバッチ処理またはストリーミング処理を行うために送信された、特定のデータ処理タスク。
  • ジョブグラフ
    • ジョブの一連の処理がグラフィカルに表現されるグラフ。
  • ステージ
    • Dataflow パイプラインで融合されるステップの単位。
  • ワーカー
    • Dataflow ジョブを実行する Compute Engine VM インスタンス。

今回のリリースについて

ストラグラーとは

Dataflow のストラグラー(Straggler)とは、同一ステージ内のデータ処理タスクの中で、他のタスクに比べて遅れて完了するタスクのことを指します。
これらのストラグラーは、全体のパイプラインのパフォーマンスに影響を与え、処理時間を増加させる可能性があります。

補足:ストラグラー(Straggler)には、「落伍者」「はぐれ者」等の意味があるようです。

Dataflow のストラグラー検知機能について

今回のリリースによって、Dataflow ストリーミングジョブの同一ステージ内で時間がかかっている特定の処理を検知できるようになりました。

Dataflow ストラグラー検知の例として、Dataflow ステージ内にて 1 つの処理に時間がかかっている場合を想定してみます。
10 分弱で処理が終了されると想定されている「処理 A 〜 処理 E」にて、「処理 E」の完了までに時間がかかっている場合、これを Dataflow はストラグラーとして検知してくれます。

検知できる内容

ステージの進捗状況から確認ができるストラグラーの詳細項目には、以下の情報が記載されます。

  • 検出開始時刻
    • ストラグラーが検出された開始時刻が記載される。
  • Worker
    • ストラグラーが発生しているワーカーが記載される。
      ワーカーをクリックすることで、Cloud Logging のログエクスプローラが開く。

検知機能を利用するための事前設定は必要ありません。
ストリーミングジョブの実行中にストラグラーが発生した際は、自動的に検知してくれます。

実際にストラグラー検知機能を使ってみる

ここからは、Dataflow のストリーミングジョブにてストラグラーが発生した場合に、どのように確認できるかを見ていきましょう。

想定したシチュエーション

今回は、以下の状況を想定し意図的にストラグラーを発生させてみました。

  • 処理内容
    • Dataflow にて、Cloud Pub/Sub から Cloud Storage へデータを書き込むストリーミングジョブを実行する。
  • ストラグラー発生箇所
    • Cloud Storage へデータを書き込む際。
  • ストラグラーを発生させる方法
    • Cloud Storage へデータを書き込む箇所において、Sleep 関数で 1 時間処理を待機させる。

ストラグラー発生までの手順

具体的な手順としては、以下の通りです。
ここでは 公式ドキュメントのコードサンプル(Python)をベースに使用しています。

  1. クイックスタート: Dataflow を使用して Pub/Sub からメッセージをストリーミングするを参考に、始める前にの手順 1 から、Pub/Sub プロジェクトを設定するの手順 5 までを実行する。
  2. 上記手順 5 にてクローンしたディレクトリのpython-docs-samples/pubsub/streaming-analytics配下にあるサンプルコード「PubSubToGCS.py」を開く。
  3. import 処理記載箇所に time 関数を追加する。
PubSubToGCS.py
 # [START pubsub_to_gcs]
 import argparse
 from datetime import datetime
 import logging
 import random

+# コード追加:timeモジュールをインポート
+import time

 from apache_beam import (
 
  1. Cloud Storage へデータを書き込む処理であるclass WriteToGCS(DoFn):def processに、スリープ処理のコードを追加する。
PubSubToGCS.py
 class WriteToGCS(DoFn):
     def __init__(self, output_path):
      def process(self, key_value, window=DoFn.WindowParam):
         """Write messages in a batch to Google Cloud Storage."""

         ts_format = "%H:%M"
         window_start = window.start.to_utc_datetime().strftime(ts_format)
         window_end = window.end.to_utc_datetime().strftime(ts_format)
         shard_id, batch = key_value
         filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

+        # コード追加:1時間停止するスリープ処理を追加
+        time.sleep(3600)

         with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
 
  1. 変更を保存して閉じる。
  2. パイプラインを開始するに記載のコマンドを実行し、ジョブを起動する。

また、ストラグラーとして検知されるまでにはタイムラグがあったため、上記手順にてストラグラーが発生しない場合には、時間を置いた後に確認してみましょう。
ここからは、ストラグラーの確認手順を見ていきます。

ストラグラーの確認手順

ストラグラーは、Dataflow ジョブを開始した後に Google Cloud コンソールから確認できます。
ストラグラーは以下のジョブグラフから確認できるため、それぞれについて具体的な確認手順を見ていきます。

  • ステージの進捗状況
  • ステージのワークフロー

ステージの進捗状況

  1. Google Cloud コンソールで、Dataflow の 「ジョブ」 ページに移動します。
  2. ジョブの名前をクリックして「ジョブの詳細」ページに移動します。
  3. 「ジョブの詳細」 ページで、「実行の詳細」 タブをクリックします。
  4. 「グラフ表示」 リストで、「ステージの進捗状況」 を選択します。
  5. 進捗状況グラフにて、検出されたストラグラーが表示されます。

ステージのワークフロー

  1. Google Cloud コンソールで、Dataflow の 「ジョブ」 ページに移動します。
  2. ジョブの名前をクリックして「ジョブの詳細」ページに移動します。
  3. 「ジョブの詳細」 ページで、「実行の詳細」 タブをクリックします。
  4. 「グラフ表示」 リストで、「ステージのワークフロー」 を選択します。
  5. 進捗状況グラフにて、検出されたストラグラーが表示されます。

詳細な確認手順は Google Cloud 公式ドキュメントをご参照下さい。

ストラグラー検知を活用できるケース

ストラグラー検知を活用できる場面として、以下の2つが挙げられます。

  • ステージ内のデータ処理タスクが想定より遅く終了した場合。
  • Dataflow 分析情報 にて問題が発生していないが、処理が遅い場合。

このような場合に、コンソール画面からストラグラーが発生していないかを確認することが有効です。

ストラグラーが発生していることが確認できた場合、トラブルシュートとして以下の観点から問題がないかを確認することが大切になります。

  • DoFn [1] コード内にバグが発生、または DoFns の停止がされていないか。
  • 完了までに長い時間がかかる外部サービスの呼び出しに時間がかかっていないか。
  • Sink [2] の割り当て上限に引っかかっていないか。
  • 永続的な状態で大規模な読み取り / 書き込みオペレーションを行う DoFn がないか。

このように、ストラグラー検知機能を用いることで、処理遅延の改善への対応が広く行えるようになると考えられます。

トラブルシューティングの詳細については、Google Cloud 公式ドキュメントをご確認ください。

まとめ

今回の記事では、Dataflow ストリーミングジョブでのストラグラー検知について紹介しました。
Dataflow ストリーミングジョブをお使いの際に処理が想定より遅い場合には、ストラグラーの有無の確認を行っていただき、処理遅延改善の一助としていただければ幸いです。

脚注
  1. DoFnとは、パイプラインの中でデータ要素に対して特定の操作を適用するためのカスタム関数のことです。 ↩︎

  2. Sinkとは、ファイルやデータベースなどの外部データ ストレージ システムに書き込む変換のことです。 ↩︎

Discussion