🎉

Dataflow の費用モニタリングが一般提供(GA)になりました

2023/09/25に公開

はじめに

こんにちは、クラウドエース データML ディビジョン所属の篠田です。
クラウドエースのITエンジニアリングを担うシステム開発部の中で、特にデータ基盤構築・分析基盤構築からデータ分析までを含む一貫したデータ課題の解決を専門とするのがデータML ディビジョンです。

データML ディビジョンでは活動の一環として、毎週Google Cloud の新規リリースを調査・発表し、データ領域のプロダクトのキャッチアップをしています。その中でも重要と考えるリリースを本ページ含め記事として公開しています。

1.概要

今回紹介するリリースは、Dataflow ジョブの推定費用が、Google Cloud コンソールの[費用]ページより確認できるようになったことについてです。(2023年8月15日一般提供)

本記事では、下記について触れます。

  • リリースの詳細
  • ジョブの推定費用が設定値を超えた際のアラート発報方法について
  • ジョブの推定費用が設定値を超えた際のジョブ停止方法について

まず初めに、Dataflow や Cloud Monitoring の概要についてです。

1.1 Dataflow概要

Dataflowは、さまざまなデータ処理パターンの実行に対応したGoogle Cloudのサーバーレス サービスです。
Dataflow で実行するデータ処理パイプラインは、Apache Beam で記述します。
Apache Beam とは、バッチ処理やストリーミング処理といったデータ処理パイプラインを定義・実行するための、オープンソース 統合プログラミングモデルです。
そして、Apache Beam で記述されたパイプラインは、さまざまな実行エンジンで実行することができます。
Dataflowは数ある実行エンジンの内の一つであり、ビッグデータを処理するための最適な実行環境を提供します。
ユーザーはインフラストラクチャの管理を心配することなく、データ処理パイプラインのプログラミングに集中できます。

1.2 Dataflow と Cloud Monitoring

DataflowはCloud Monitoringと統合されており、これにより以下のようなメリットが提供されます。

  • リアルタイムのパフォーマンス監視:Cloud Monitoringを使用すると、Dataflowジョブのパフォーマンスメトリクスをリアルタイムで視覚化し、監視することができます。これにより、ジョブの状態を随時確認し、必要に応じて調整することが可能です。

  • アラート機能:特定のメトリクスが閾値を超えた場合に通知を受け取ることができます。これにより、問題が発生した際に迅速に対応することが可能です。

  • ロギングとトラブルシューティング:Cloud MonitoringはDataflowジョブのログ情報にもアクセスできます。これにより、問題の診断やトラブルシューティングが容易です。

  • リソースの最適化:Cloud Monitoringを使用すると、リソースの使用状況を理解し、必要に応じてリソースを最適化することができます。これにより、コストを抑えつつパフォーマンスを維持することが可能です。

これらのメリットにより、DataflowとCloud Monitoringの統合は、データ処理ジョブのパフォーマンスを最大限に引き出すための強力なツールとなります。

2.リリース概要

今回のリリースで、Google Cloud コンソールの[費用]ページに、 Dataflow ジョブの推定費用が表示されるようになりました

ジョブで使用しているどのリソースに対して、どれだけの費用がかかっているかを、下図のようにコンソールより簡単にモニタリングできます。

1_image.png

リソースのタイプには下記があります。

  • vCPU
  • メモリ
  • 処理された データ
  • HDD
  • SSD

2.1.推定費用の算出方法

ジョブの推定費用は、Cloud MonitoringのDataflow指標(各種リソースの使用量)を元に算出されます。具体的には、ジョブが動作しているリージョンのリソース単価を、使用したリソース量に乗じることで計算します。

※注意点
推定費用は、実際のジョブ費用が反映されていない場合があります。(契約上の割引や、一時的な請求額の調整などが考慮されない為)
実際にかかった費用を確認するには、Google Cloud コンソールで Cloud 請求先アカウントの Cloud Billing レポートを確認する必要があります。

2.2.費用モニタリングの対象ジョブ

ジョブの費用モニタリングは、バッチジョブ(FlexRSを含む)とストリーミング ジョブで使用できます。
Dataflow Prime ジョブでは使用できません。
Prime_job_image.png

ジョブ 可否
バッチ ジョブ
ストリーミング ジョブ
Dataflow Prime ジョブ ×

また、GPUを使うジョブの場合、GPUリソース使用量に対する推定費用は費用モニタリングに含まれないので注意が必要です。

3.ストリーミングジョブをモニタリングしてみる

本章では、実行中のストリーミング ジョブ の費用モニタリングを見ていきます。

下図のように、ストリーミング ジョブが4時間29分(経過時間)動き続けています。
2_image.png

ジョブの[費用]タブを見ます。
3_image.png
このストリーミング ジョブでは、ワーカーVMが1台常時動いています。
また、データ処理はしていないです。
なので、ワーカーVMの起動分だけ費用が発生し、線形的にvCPUおよびメモリの推定費用が増加しています。

総費用を見てみると、現在までの推定総費用は、$1.13 であることがわかります。
また、各リソースごと(vCPU,メモリなど)にかかった推定費用もわかります。
例えは、vCPUに$0.94、メモリに$0.18 費用がかかっていると推定されます。

3.1 推定する時間枠の変更

モニタリング時間枠を変更することによって、特定の期間内に発生した推定費用を表示させることもできます。
例えば、期間を、2023/09/11 16:00:00 JST ~ 2023/09/11 17:00:00 JST と設定します。
すると、選択した範囲の費用は$0.22と推定することができます。
4_image.png

3.2 推定費用の調整

Google Cloud リソース使用料の割引率が事前にわかっている場合、割引後の推定費用を算出することができます。
例えば仮に、全てのリソースに対して3%割引の場合、調整額を3%に設定します。
すると、3%割引を考慮した推定費用を算出できます。

5_image.png

4.アラートを発報してみる

Cloud Monitoring の アラートポリシーを使うことによって、ジョブの推定費用が設定値を超えた際にアラートを発報することができます。(この方法は今回のリリースに関係なく、以前からできました)

まず、Google Cloud コンソール より「アラートの作成」をクリックします。
6_image.png

すると、Cloud Monitoring の アラートポリシーを作成する画面に遷移します。
7_image.png

アラートポリシーは、MQL(Monitoring Query Language)クエリを使って作成することも可能です。
具体的には、上記画面左側にある「Edit Query」でMQLクエリを記述します。
クエリを実行すると、画面右側にMQLチャートが生成されます。
今回のケースでは、「アラートの作成」をクリックしたことで、MQLクエリが自動的に生成されました。
今回のリリースのメリットとして、「アラートの作成」をクリックをするだけで、そのジョブに対するMQLが自動生成されることが挙げられます。

生成されたMQL クエリは下記です。

fetch dataflow_job
| {
{
{
{
metric 'dataflow.googleapis.com/job/total_vcpu_time'
|filter resource.project_id == <project_id>
&& metric.job_id == '2023-09-10_19_27_35-7028866276887309219'
|group_by 1m,
    [value_total_vcpu_time_max: max(value.total_vcpu_time)]
|div 3600  # convert from seconds to hours
|mul 0.0897  # cost per vCPU per hour in asia-northeast1
|mul 1  # discount factor
|cast_units '1'  # convert from time units to number units for $
|every 1m;

metric 'dataflow.googleapis.com/job/total_memory_usage_time'
|filter resource.project_id == <project_id>
&& metric.job_id == '2023-09-10_19_27_35-7028866276887309219'
|group_by 1m,
    [value_total_memory_usage_time_max: max(value.total_memory_usage_time)]
|div 3600  # convert from seconds to hours
|mul 0.0046241  # cost per GB per hour in asia-northeast1
|mul 1  # discount factor
|cast_units '1'  # convert from time units to number units for $
|every 1m
}
| join
| add;

metric 'dataflow.googleapis.com/job/total_streaming_data_processed'
|filter resource.project_id == <project_id>
&& metric.job_id == '2023-09-10_19_27_35-7028866276887309219'
|group_by 1m,
    [value_total_streaming_data_processed_max: max(value.total_streaming_data_processed)]
|div 1000000000  # convert from B to GB
|mul 0.0234  # cost per GB in asia-northeast1
|mul 1  # discount factor
|cast_units '1'
|every 1m
}
| join
| add;

metric 'dataflow.googleapis.com/job/total_pd_usage_time'
|filter resource.project_id == <project_id>
&& metric.job_id == '2023-09-10_19_27_35-7028866276887309219'
&& metric.storage_type == 'HDD'
|group_by []
|div 3600  # convert from seconds to hours
|mul 0.0000702  # cost per GB per hour in asia-northeast1
|mul 1  # discount factor
|cast_units '1'  # convert from time units to number units for $
|every 1m
}
| join
| add;

metric 'dataflow.googleapis.com/job/total_pd_usage_time'
|filter resource.project_id == <project_id>
&& metric.job_id == '2023-09-10_19_27_35-7028866276887309219'
&& metric.storage_type == 'SSD'
|group_by []
|div 3600  # convert from seconds to hours
|mul 0.0003874  # cost per GB per hour in asia-northeast1
|mul 1  # discount factor
|cast_units '1'  # convert from time units to number units for $
|every 1m
}
| join
| add
# Edit the below value to change the threshold
| condition val() > 1000

アラートの発報条件は、上記クエリの最後の行、condition val() > 1000 の箇所で設定しています。
閾値を1000(今回の場合、$1000)に設定しています。
つまり、ジョブ推定費用のTotalが$1000超えると、アラートが発報します。
MQLクエリは編集できるので、適宜条件を変更できます。

4.1通知チャンネルの作成

アラートが発報した際のアクションは、通知チャンネルを使用することによって設定できます。
Slack ,Webhooks ,Email ,SMS ,Pub/Sub など色々とオプションがあります。

今回は、自分のメールアドレス宛にアラートを送る設定をしていきます。

下図は通知チャンネルの作成画面です。
今回は、Emailの通知チャンネルを作成しました。

8_image.png

4.2アラートポリシーの作成とアラート発報

そして、作成した通知チャンネルを選択します。
9_image.png

最後に、「ポリシーを作成」を押して、アラートポリシーの作成は完了です。
ジョブ推定費用のTotalが$1000を超えると、アラートが発報し、自分のメールアドレス宛にアラート通知が送られます。(実際には閾値を少額に設定し、アラート発報テストしました)
下図は、アラートが発報された際に届いたメールです。

email_image.png

5.アラートの発報に伴いジョブを自動停止してみる

今回リリースのDataflowコンソールからの費用モニタリング機能を使用して、Dataflowジョブを停止する設定は存在しません。
同様に、Cloud Monitoringのアラートポリシーを使ってDataflowジョブを直接停止することもできません。

その為、Dataflow ジョブを自動停止させるアーキテクチャの例として、下図アーキテクチャが考えられます。
アラートポリシーのアラート発火に伴い、Pub/Subへメッセージをパブリッシュします。
そして、Pub/SubからのメッセージをトリガーとしてCloud Functions の関数からDataflow REST APIを呼び出すことにより、Dataflow ジョブを停止します。

architecture_image.png

それぞれの役割を簡単にまとめると下記です。

  1. Cloud Monitoring
    Google Cloudの監視ツールです。様々な指標(今回の場合は、Dataflow の指標)を監視します。

  2. アラートポリシー (Cloud Monitoring)
    アラートポリシーを設定することで、特定の条件を満たした場合に通知を送ることができます。
    今回は、MQL を使用してアラート ポリシーを作成します。
    MQL クエリが内部的に定期実行され、Dataflow の指標を監視します。
    そしてクエリ結果が一定の閾値を超えた場合、アラートを発報します。

  3. Pub/Sub 通知チャンネル (Cloud Monitoring)
    Pub/Subを通知チャンネルとして使用する場合、Cloud MonitoringからのアラートはPub/Subのトピックに送信されます。
    Pub/Subのトピックに送信されるメッセージには、Dataflow job のproject_id ,region ,job_id なども含まれます。

  4. Pub/Sub
    Pub/SubはGoogle Cloudのメッセージングサービスです。
    メッセージを発行(publish)し、それを購読(subscribe)することで、システム間で情報を即時に共有できます。

  5. Cloud Functions
    Cloud Functionsは、イベント駆動型のサーバーレスサービスです。
    Pub/Subからのメッセージをトリガーとして、定義した関数を実行します。
    関数内には、REST APIを呼び出すコードを書きます。
    Dataflow REST APIを呼び出すことにより、Dataflow ジョブを停止することができます。

  6. Dataflow
    Cloud FunctionsからのAPI呼び出しにより、Dataflow ジョブを自動的に停止します。

このように、Google Cloud の各サービスを組み合わせることで、リアルタイムの監視と自動化が可能です。

最後に、コンソール画面や、Cloud Functions で書いたコードを交えながら見ていきます。

下図は、アラートポリシーの設定後です。ジョブ推定費用のTotalが$0.1を超えると、アラートが発報します。
10_image.png

下図は、Dataflowのストリーミング ジョブ が動いている様子です。このジョブを停止させます。
11_image.png

下記コードは、Cloud Functions(第2世代)に記述したコードです。
アラートが発報されると、Pub/Subからのメッセージをトリガーとして実行されます。

main.py
import base64
from googleapiclient.discovery import build
import functions_framework
import json

# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def stop_dataflow_job(cloud_event):
    data = cloud_event.data['message']['data']
    # メッセージのデータフィールドをBase64デコード
    decoded_data = base64.b64decode(data).decode('utf-8')
    
    # decoded_dataをPythonの辞書として読み込み
    data_dict = json.loads(decoded_data)
    
    # 必要な情報を抽出
    project_id = data_dict['incident']['resource']['labels']['project_id']
    region = data_dict['incident']['resource']['labels']['region']
    job_id = data_dict['incident']['metric']['labels']['job_id']

    # Dataflow REST APIを呼び出し、JOBのステータスを変更(停止)する
    service = build('dataflow', 'v1b3')
    request = service.projects().locations().jobs().update(
        projectId = project_id,
        location = region,
        jobId = job_id,
        body = {'requestedState': 'JOB_STATE_CANCELLED'}
    )
    response = request.execute()
requirements.txt
functions-framework==3.*
google-api-python-client

時間が経つと、アラートが発報しました。
そして、Cloud Functionsの関数が実行されます。
APIリクエストが成功しました。
13_image.png

そして無事、設定した閾値を超えたDataflow ストリーミング ジョブ は停止しました。
12_image.png

まとめ

ジョブごとの推定費用をDataflow コンソールからモニタリングできることで、各ジョブ コストの監視・制御が容易になりました。
アラートポリシーで費用の閾値を設定することで、必要に応じてアラートを発報したり、ジョブを停止したりすることが可能です。
これにより、予期せぬリソース消費による請求費用の急増を防ぐことができそうです。

最後に、コストの見直し方法については、Google Cloud 公式ブログやBeam Summit 動画によくまとめられている為、ご紹介します。

Discussion