🌊

Dataflow ストリーミング ジョブのステータスを監視してみた

2024/12/27に公開

こんにちは。クラウドエース第三開発部の濵田です。
さて、皆さんは、Dataflow ストリーミング ジョブのステータスを監視する際に、Canceled と Drained も監視したいと思ったことはありませんか。
ジョブの指標では、Failed は提供されていますが、Canceled と Drained は提供されていません。
そこで本記事では、ログベースの指標を用いて Dataflow ストリーミング ジョブのステータス(Failed、Canceled、Drained)を監視する方法について解説します。

はじめに

なぜ、このような記事を書こうと思ったのか。
それは、弊社のサポート窓口に「Dataflow ストリーミング ジョブのステータスを監視する方法が知りたい」とお問い合わせがあり、公式ドキュメントを探しても記載がなかったからです。
同じようなことでお困りになっている方の参考に少しでもなれたら幸いです。

Dataflow とは

Dataflow は、Google Cloud で提供される大規模データ処理のためのサービスです。
特に、ストリーミング処理に優れ、リアルタイム分析やイベント処理など、さまざまなユースケースで活用されています。
Dataflow は、スケーラビリティが高く、自動的にリソースを調整するため、大量のデータを効率的に処理できます。
また、オープンソースの Apache Beam ライブラリを用いることで、さまざまなプログラミング言語でデータ処理パイプラインを構築できます。

ストリーミング ジョブとは

Dataflow ジョブには、バッチジョブとストリーミング ジョブの2種類が存在します。
バッチジョブではドレインがサポートされていないため、本記事ではストリーミング ジョブを中心に説明します。
ストリーミング ジョブとは、データの発生源から継続的にデータを読み込み、リアルタイムで処理を行うジョブです。
Dataflow ストリーミング ジョブには、主に以下の3つのステータスが存在します。

  • Failed: ジョブが異常終了した場合のステータス
  • Canceled: ユーザーがジョブを途中で停止し、データの取り込みと処理が中断された場合のステータス
  • Drained: ユーザーがジョブを途中で停止し、バッファ内のデータの処理が完了した場合のステータス

これらのステータスを監視することは、ジョブの実行状況を把握し、問題発生時の迅速な対応を可能にする上で非常に重要です。
例えば、ジョブが Failed ステータスになった場合は、エラーの原因を調査し、速やかに復旧作業を行う必要があります。

ジョブの指標を用いた監視方法

実は 公式ドキュメント にジョブの指標を用いた監視方法の記載があります。
ジョブの指標では、Failed は提供されていますが、Canceled と Drained は提供されていません。
image00
では、どうやって、Canceled と Drained を検知するのか。そこで用いるのが、ログベースの指標です。

ログベースの指標とは

ログベースの指標とは、システムやアプリケーションの実行ログから特定のイベントを抽出し、指標として利用できるようにしたものです。
ログには処理の開始や終了、エラー発生などのさまざまなイベントが記録されています。これらのログ情報を分析し、指標として活用することで、システムの状態やアプリケーションの動作状況を詳細に把握することができます。
例えば、ログから「ジョブがキャンセルされました」というメッセージを検出することで、Canceled ステータスを指標として把握することができます。

ログベースの指標を用いた監視方法

それでは、ログベースの指標を用いて Dataflow ストリーミング ジョブのステータスを監視する手順を以下のように分けて説明します。

  1. ログシンクの作成
    a. Canceled のログシンク
    b. Drained のログシンク
    c. Failed のログシンク
  2. アラートポリシーの作成
    a. Canceled のアラートポリシー
    b. Drained のアラートポリシー
    c. Failed のアラートポリシー

ログシンクの作成

Canceled のログシンク

まず、Canceled のログシンクを作成します。

  1. Logging のページに移動します。
    a. コンソールを開き、検索窓に「Logging」と入力します。
    b. 検索結果から「Logging」を選択します。
    image01
  2. ナビゲーションパネルで 「ログルーター」を選択します。
    image02
  3. 「シンクを作成」をクリックします。
    image03
  4. シンク名に「Canceled_Dataflow_Streaming_Job」を入力し、「次へ」をクリックします。
    image04
  5. 「シンクサービスの選択」ドロップダウン メニューを開き、「Logging バケット」を選択します。
  6. 「ログバケットの選択」ドロップダウン メニューを開き、「新しいログバケットを作成」をクリックします。
  7. 任意のログバケットを作成し、作成したログバケットを選択します。
  8. 「次へ」をクリックします。
    image05
  9. 「シンクに含めるログの選択」で以下のように入力します。
    resource.type="dataflow_step" 
    textPayload:"Cancel"
    
  10. 「シンクを作成」をクリックします。
    image06

Drained のログシンク

Canceled と同様に、Drained のログシンクを作成します。

  1. 「シンクを作成」をクリックします。
    image03
  2. シンク名に「Drained_Dataflow_Streaming_Job」を入力し、「次へ」をクリックします。
    image07
  3. 「シンクサービスの選択」ドロップダウン メニューを開き、「Logging バケット」を選択します。
  4. 「ログバケットの選択」ドロップダウン メニューを開き、「新しいログバケットを作成」をクリックします。
  5. 任意のログバケットを作成し、作成したログバケットを選択します。
  6. 「次へ」をクリックします。
    image08
  7. 「シンクに含めるログの選択」で以下のように入力します。
    resource.type="dataflow_step" 
    textPayload:"Drain"
    
  8. 「シンクを作成」をクリックします。
    image09

Failed のログシンク

ジョブの指標では、Failed が提供されていますが、すべてのエラーを検知できるわけではありません。
そこで本記事では、Failed もログベースの指標で作成します。

  1. 「シンクを作成」をクリックします。
    image03
  2. シンク名に「Failed_Dataflow_Streaming_Job」を入力し、「次へ」をクリックします。
    image07
  3. 「シンクサービスの選択」ドロップダウン メニューを開き、「Logging バケット」を選択します。
  4. 「ログバケットの選択」ドロップダウン メニューを開き、「新しいログバケットを作成」をクリックします。
  5. 任意のログバケットを作成し、作成したログバケットを選択します。
  6. 「次へ」をクリックします。
    image08
  7. 「シンクに含めるログの選択」で以下のように入力します。
    resource.type="dataflow_step"
    log_name:"job-message"
    severity=ERROR
    
  8. 「シンクを作成」をクリックします。
    image09

アラートポリシーの作成

Canceled のアラートポリシー

先程作成したログシンクを使って、Canceled のアラートポリシーを作成します。

  1. アラートのページに移動します。
    a. コンソールを開き、検索窓に「アラート」と入力します。
    b. 検索結果から「アラート」を選択します。
    image10
  2. 「Create policy」をクリックします。
    image11
  3. 「Select a metric」下の「指標を選択」ドロップダウン メニューを開き、検索欄に「Logging export sink」と入力します。
  4. 「Logging export sink」 > 「ログベースの指標」 > 「Exported log entries」 の順に選択します。
  5. 「適用」をクリックします。
    image12
  6. 「Add a filter」をクリックします。
  7. 「name」 > 「=」 > 「Canceled_Dataflow_Streaming_Job」の順に設定します。
    image13
  8. ローリング ウィンドウを「5分」に設定します。
  9. ローリング ウィンドウ関数を「sum」に設定します。
  10. 「Next」をクリックします。
    image14
  11. Condition Types を「Threshold」に設定します。
  12. しきい値に「0」と入力します。
  13. 「Next」をクリックします。
    image15
  14. メールなどに通知したい場合は、「通知チャンネルを使用」を有効にします。
  15. 「通知チャンネル」ドロップダウン メニューを開き、「Manage Notification Channels」をクリックします。
    image16
  16. 「Email」まで下にスクロールし、右端にある「Add New」をクリックします。
  17. Email Address と Display Name を入力し、「Save」をクリックします。
    image17
  18. 「通知チャンネル」ドロップダウン メニューをもう一度開きます。
  19. 追加した Email を選択して「OK」をクリックします。
    image18
  20. 件名に「Dataflow streaming job is canceled」と入力します。
    image19
  21. アラートポリシー名に「Canceled Dataflow Streaming Job」と入力します。
  22. 「Next」をクリックします。
    image20
  23. 「ポリシーを作成」をクリックします。
    image21

Drained のアラートポリシー

Canceled と同様に、Drained のアラートポリシーを作成します。

  1. 「Create policy」をクリックします。
    image11
  2. 「Select a metric」下の「指標を選択」ドロップダウン メニューを開き、検索欄に「Logging export sink」と入力します。
  3. 「Logging export sink」 > 「ログベースの指標」 > 「Exported log entries」 の順に選択します。
  4. 「適用」をクリックします。
    image12
  5. 「Add a filter」をクリックします。
  6. 「name」 > 「=」 > 「Drained_Dataflow_Streaming_Job」の順に設定します。
    image22
  7. ローリング ウィンドウを「5分」に設定します。
  8. ローリング ウィンドウ関数を「sum」に設定します。
  9. 「Next」をクリックします。
    image14
  10. Condition Types を「Threshold」に設定します。
  11. しきい値に「0」と入力します。
  12. 「Next」をクリックします。
    image15
  13. メールなどに通知したい場合は、「通知チャンネルを使用」を有効にします。
  14. 「通知チャンネル」ドロップダウン メニューを開きます。
  15. Email を選択して「OK」をクリックします。
    image18
  16. 件名に「Dataflow streaming job is drained」と入力します。
    image23
  17. アラートポリシー名に「Drained Dataflow Streaming Job」と入力します。
  18. 「Next」をクリックします。
    image24
  19. 「ポリシーを作成」をクリックします。
    image25

Failed のアラートポリシー

続いて、Failed のアラートポリシーを作成します。

  1. 「Create policy」をクリックします。
    image11
  2. 「Select a metric」下の「指標を選択」ドロップダウン メニューを開き、検索欄に「Logging export sink」と入力します。
  3. 「Logging export sink」 > 「ログベースの指標」 > 「Exported log entries」 の順に選択します。
  4. 「適用」をクリックします。
    image12
  5. 「Add a filter」をクリックします。
  6. 「name」 > 「=」 > 「Failed_Dataflow_Streaming_Job」の順に設定します。
    image43
  7. ローリング ウィンドウを「5分」に設定します。
  8. ローリング ウィンドウ関数を「sum」に設定します。
  9. 「Next」をクリックします。
    image14
  10. Condition Types を「Threshold」に設定します。
  11. しきい値に「0」と入力します。
  12. 「Next」をクリックします。
    image15
  13. メールなどに通知したい場合は、「通知チャンネルを使用」を有効にします。
  14. 「通知チャンネル」ドロップダウン メニューを開きます。
  15. Email を選択して「OK」をクリックします。
    image18
  16. 件名に「Dataflow streaming job is failed」と入力します。
    image27
  17. アラートポリシー名に「Failed Dataflow Streaming Job」と入力します。
  18. 「Next」をクリックします。
    image28
  19. 「ポリシーを作成」をクリックします。
    image29

動作確認

先ほど作成したアラートの動作確認を行います。

Canceled

  1. Dataflow テンプレートを使用してストリーミング パイプラインを作成する」を参考にストリーミング ジョブを作成します。
  2. ストリーミング ジョブを実行します。
    image30
  3. 実行から5分経過後、ストリーミング ジョブをキャンセルします。
    a. 「停止」をクリックします。
    image31
    b. 「キャンセル」を選択し、「STOP JOB」をクリックします。
    image32
  4. ストリーミング ジョブのステータスが「キャンセルされました」になります。
    image33
  5. アラートのページに移動し、アラートが発報されているか確認します。
    ※アラートが発報されるまで数分かかることがあります。
    image34
    image35
  6. 通知メールが来ているか確認します。
    image36

Drained

  1. Dataflow テンプレートを使用してストリーミング パイプラインを作成する」を参考にストリーミング ジョブを作成します。
  2. ストリーミング ジョブを実行します。
    image37
  3. 実行から5分経過後、ストリーミング ジョブをドレインします。
    a. 「停止」をクリックします。
    image31
    b. 「ドレイン」を選択し、「STOP JOB」をクリックします。
    image38
  4. ストリーミング ジョブのステータスが「ドレインされました」になります。
    image39
  5. アラートのページに移動し、アラートが発報されているか確認します。
    ※アラートが発報されるまで数分かかることがあります。
    image40
    image41
  6. 通知メールが来ているか確認します。
    image42

Failed

  1. Dataflow テンプレートを使用してストリーミング パイプラインを作成する」を参考にストリーミング ジョブを作成します。
    ※意図的に Failed にするために存在しないバケットを指定します。
    image44
  2. ストリーミング ジョブを実行します。
  3. 実行から約5分経過後、ストリーミング ジョブのステータスが「失敗しました」になります。
    image45
  4. アラートのページに移動し、アラートが発報されているか確認します。
    ※アラートが発報されるまで数分かかることがあります。
    image46
    image47
  5. 通知メールが来ているか確認します。
    image48

おわりに

本記事では、ログベースの指標を用いて Dataflow ストリーミング ジョブのステータスを監視する方法を紹介しました。
実はこの記事が私にとって初めての投稿です。
記事を書くにあたり読者に分かりやすく伝えることの難しさを痛感しましたが、一方で自分の経験を共有することの楽しさにも気づかされました。
弊社のサポート窓口には、この他にもさまざまなお問い合わせが寄せられています。
今後も、サポート業務での事例を元に、読者の皆様に役立つ情報を発信していきます。

Discussion