Dataflow ストリーミング ジョブのステータスを監視してみた
こんにちは。クラウドエース第三開発部の濵田です。
さて、皆さんは、Dataflow ストリーミング ジョブのステータスを監視する際に、Canceled と Drained も監視したいと思ったことはありませんか。
ジョブの指標では、Failed は提供されていますが、Canceled と Drained は提供されていません。
そこで本記事では、ログベースの指標を用いて Dataflow ストリーミング ジョブのステータス(Failed、Canceled、Drained)を監視する方法について解説します。
はじめに
なぜ、このような記事を書こうと思ったのか。
それは、弊社のサポート窓口に「Dataflow ストリーミング ジョブのステータスを監視する方法が知りたい」とお問い合わせがあり、公式ドキュメントを探しても記載がなかったからです。
同じようなことでお困りになっている方の参考に少しでもなれたら幸いです。
Dataflow とは
Dataflow は、Google Cloud で提供される大規模データ処理のためのサービスです。
特に、ストリーミング処理に優れ、リアルタイム分析やイベント処理など、さまざまなユースケースで活用されています。
Dataflow は、スケーラビリティが高く、自動的にリソースを調整するため、大量のデータを効率的に処理できます。
また、オープンソースの Apache Beam ライブラリを用いることで、さまざまなプログラミング言語でデータ処理パイプラインを構築できます。
ストリーミング ジョブとは
Dataflow ジョブには、バッチジョブとストリーミング ジョブの2種類が存在します。
バッチジョブではドレインがサポートされていないため、本記事ではストリーミング ジョブを中心に説明します。
ストリーミング ジョブとは、データの発生源から継続的にデータを読み込み、リアルタイムで処理を行うジョブです。
Dataflow ストリーミング ジョブには、主に以下の3つのステータスが存在します。
- Failed: ジョブが異常終了した場合のステータス
- Canceled: ユーザーがジョブを途中で停止し、データの取り込みと処理が中断された場合のステータス
- Drained: ユーザーがジョブを途中で停止し、バッファ内のデータの処理が完了した場合のステータス
これらのステータスを監視することは、ジョブの実行状況を把握し、問題発生時の迅速な対応を可能にする上で非常に重要です。
例えば、ジョブが Failed ステータスになった場合は、エラーの原因を調査し、速やかに復旧作業を行う必要があります。
ジョブの指標を用いた監視方法
実は 公式ドキュメント にジョブの指標を用いた監視方法の記載があります。
ジョブの指標では、Failed は提供されていますが、Canceled と Drained は提供されていません。
では、どうやって、Canceled と Drained を検知するのか。そこで用いるのが、ログベースの指標です。
ログベースの指標とは
ログベースの指標とは、システムやアプリケーションの実行ログから特定のイベントを抽出し、指標として利用できるようにしたものです。
ログには処理の開始や終了、エラー発生などのさまざまなイベントが記録されています。これらのログ情報を分析し、指標として活用することで、システムの状態やアプリケーションの動作状況を詳細に把握することができます。
例えば、ログから「ジョブがキャンセルされました」というメッセージを検出することで、Canceled ステータスを指標として把握することができます。
ログベースの指標を用いた監視方法
それでは、ログベースの指標を用いて Dataflow ストリーミング ジョブのステータスを監視する手順を以下のように分けて説明します。
- ログシンクの作成
a. Canceled のログシンク
b. Drained のログシンク
c. Failed のログシンク - アラートポリシーの作成
a. Canceled のアラートポリシー
b. Drained のアラートポリシー
c. Failed のアラートポリシー
ログシンクの作成
Canceled のログシンク
まず、Canceled のログシンクを作成します。
- Logging のページに移動します。
a. コンソールを開き、検索窓に「Logging」と入力します。
b. 検索結果から「Logging」を選択します。
- ナビゲーションパネルで 「ログルーター」を選択します。
- 「シンクを作成」をクリックします。
- シンク名に「Canceled_Dataflow_Streaming_Job」を入力し、「次へ」をクリックします。
- 「シンクサービスの選択」ドロップダウン メニューを開き、「Logging バケット」を選択します。
- 「ログバケットの選択」ドロップダウン メニューを開き、「新しいログバケットを作成」をクリックします。
- 任意のログバケットを作成し、作成したログバケットを選択します。
- 「次へ」をクリックします。
- 「シンクに含めるログの選択」で以下のように入力します。
resource.type="dataflow_step" textPayload:"Cancel"
- 「シンクを作成」をクリックします。
Drained のログシンク
Canceled と同様に、Drained のログシンクを作成します。
- 「シンクを作成」をクリックします。
- シンク名に「Drained_Dataflow_Streaming_Job」を入力し、「次へ」をクリックします。
- 「シンクサービスの選択」ドロップダウン メニューを開き、「Logging バケット」を選択します。
- 「ログバケットの選択」ドロップダウン メニューを開き、「新しいログバケットを作成」をクリックします。
- 任意のログバケットを作成し、作成したログバケットを選択します。
- 「次へ」をクリックします。
- 「シンクに含めるログの選択」で以下のように入力します。
resource.type="dataflow_step" textPayload:"Drain"
- 「シンクを作成」をクリックします。
Failed のログシンク
ジョブの指標では、Failed が提供されていますが、すべてのエラーを検知できるわけではありません。
そこで本記事では、Failed もログベースの指標で作成します。
- 「シンクを作成」をクリックします。
- シンク名に「Failed_Dataflow_Streaming_Job」を入力し、「次へ」をクリックします。
- 「シンクサービスの選択」ドロップダウン メニューを開き、「Logging バケット」を選択します。
- 「ログバケットの選択」ドロップダウン メニューを開き、「新しいログバケットを作成」をクリックします。
- 任意のログバケットを作成し、作成したログバケットを選択します。
- 「次へ」をクリックします。
- 「シンクに含めるログの選択」で以下のように入力します。
resource.type="dataflow_step" log_name:"job-message" severity=ERROR
- 「シンクを作成」をクリックします。
アラートポリシーの作成
Canceled のアラートポリシー
先程作成したログシンクを使って、Canceled のアラートポリシーを作成します。
- アラートのページに移動します。
a. コンソールを開き、検索窓に「アラート」と入力します。
b. 検索結果から「アラート」を選択します。
- 「Create policy」をクリックします。
- 「Select a metric」下の「指標を選択」ドロップダウン メニューを開き、検索欄に「Logging export sink」と入力します。
- 「Logging export sink」 > 「ログベースの指標」 > 「Exported log entries」 の順に選択します。
- 「適用」をクリックします。
- 「Add a filter」をクリックします。
- 「name」 > 「=」 > 「Canceled_Dataflow_Streaming_Job」の順に設定します。
- ローリング ウィンドウを「5分」に設定します。
- ローリング ウィンドウ関数を「sum」に設定します。
- 「Next」をクリックします。
- Condition Types を「Threshold」に設定します。
- しきい値に「0」と入力します。
- 「Next」をクリックします。
- メールなどに通知したい場合は、「通知チャンネルを使用」を有効にします。
- 「通知チャンネル」ドロップダウン メニューを開き、「Manage Notification Channels」をクリックします。
- 「Email」まで下にスクロールし、右端にある「Add New」をクリックします。
- Email Address と Display Name を入力し、「Save」をクリックします。
- 「通知チャンネル」ドロップダウン メニューをもう一度開きます。
- 追加した Email を選択して「OK」をクリックします。
- 件名に「Dataflow streaming job is canceled」と入力します。
- アラートポリシー名に「Canceled Dataflow Streaming Job」と入力します。
- 「Next」をクリックします。
- 「ポリシーを作成」をクリックします。
Drained のアラートポリシー
Canceled と同様に、Drained のアラートポリシーを作成します。
- 「Create policy」をクリックします。
- 「Select a metric」下の「指標を選択」ドロップダウン メニューを開き、検索欄に「Logging export sink」と入力します。
- 「Logging export sink」 > 「ログベースの指標」 > 「Exported log entries」 の順に選択します。
- 「適用」をクリックします。
- 「Add a filter」をクリックします。
- 「name」 > 「=」 > 「Drained_Dataflow_Streaming_Job」の順に設定します。
- ローリング ウィンドウを「5分」に設定します。
- ローリング ウィンドウ関数を「sum」に設定します。
- 「Next」をクリックします。
- Condition Types を「Threshold」に設定します。
- しきい値に「0」と入力します。
- 「Next」をクリックします。
- メールなどに通知したい場合は、「通知チャンネルを使用」を有効にします。
- 「通知チャンネル」ドロップダウン メニューを開きます。
- Email を選択して「OK」をクリックします。
- 件名に「Dataflow streaming job is drained」と入力します。
- アラートポリシー名に「Drained Dataflow Streaming Job」と入力します。
- 「Next」をクリックします。
- 「ポリシーを作成」をクリックします。
Failed のアラートポリシー
続いて、Failed のアラートポリシーを作成します。
- 「Create policy」をクリックします。
- 「Select a metric」下の「指標を選択」ドロップダウン メニューを開き、検索欄に「Logging export sink」と入力します。
- 「Logging export sink」 > 「ログベースの指標」 > 「Exported log entries」 の順に選択します。
- 「適用」をクリックします。
- 「Add a filter」をクリックします。
- 「name」 > 「=」 > 「Failed_Dataflow_Streaming_Job」の順に設定します。
- ローリング ウィンドウを「5分」に設定します。
- ローリング ウィンドウ関数を「sum」に設定します。
- 「Next」をクリックします。
- Condition Types を「Threshold」に設定します。
- しきい値に「0」と入力します。
- 「Next」をクリックします。
- メールなどに通知したい場合は、「通知チャンネルを使用」を有効にします。
- 「通知チャンネル」ドロップダウン メニューを開きます。
- Email を選択して「OK」をクリックします。
- 件名に「Dataflow streaming job is failed」と入力します。
- アラートポリシー名に「Failed Dataflow Streaming Job」と入力します。
- 「Next」をクリックします。
- 「ポリシーを作成」をクリックします。
動作確認
先ほど作成したアラートの動作確認を行います。
Canceled
- 「Dataflow テンプレートを使用してストリーミング パイプラインを作成する」を参考にストリーミング ジョブを作成します。
- ストリーミング ジョブを実行します。
- 実行から5分経過後、ストリーミング ジョブをキャンセルします。
a. 「停止」をクリックします。
b. 「キャンセル」を選択し、「STOP JOB」をクリックします。
- ストリーミング ジョブのステータスが「キャンセルされました」になります。
- アラートのページに移動し、アラートが発報されているか確認します。
※アラートが発報されるまで数分かかることがあります。
- 通知メールが来ているか確認します。
Drained
- 「Dataflow テンプレートを使用してストリーミング パイプラインを作成する」を参考にストリーミング ジョブを作成します。
- ストリーミング ジョブを実行します。
- 実行から5分経過後、ストリーミング ジョブをドレインします。
a. 「停止」をクリックします。
b. 「ドレイン」を選択し、「STOP JOB」をクリックします。
- ストリーミング ジョブのステータスが「ドレインされました」になります。
- アラートのページに移動し、アラートが発報されているか確認します。
※アラートが発報されるまで数分かかることがあります。
- 通知メールが来ているか確認します。
Failed
- 「Dataflow テンプレートを使用してストリーミング パイプラインを作成する」を参考にストリーミング ジョブを作成します。
※意図的に Failed にするために存在しないバケットを指定します。
- ストリーミング ジョブを実行します。
- 実行から約5分経過後、ストリーミング ジョブのステータスが「失敗しました」になります。
- アラートのページに移動し、アラートが発報されているか確認します。
※アラートが発報されるまで数分かかることがあります。
- 通知メールが来ているか確認します。
おわりに
本記事では、ログベースの指標を用いて Dataflow ストリーミング ジョブのステータスを監視する方法を紹介しました。
実はこの記事が私にとって初めての投稿です。
記事を書くにあたり読者に分かりやすく伝えることの難しさを痛感しましたが、一方で自分の経験を共有することの楽しさにも気づかされました。
弊社のサポート窓口には、この他にもさまざまなお問い合わせが寄せられています。
今後も、サポート業務での事例を元に、読者の皆様に役立つ情報を発信していきます。
Discussion