🗿

Google Cloud Workflowsを使ってみよう

2024/07/19に公開

概要

データのETL(データ抽出・加工・ロード)やバックエンドの処理等で一連の流れで数珠繋ぎで実行した場合があります。
そんな時に使うのがワークフローで、今回はGoogle CloudのWorkflowsを紹介します。

Workflowsとは

WorkflowsはGoogle Cloudのフルマネージドのサービスで、Cloud RunやCloud Functions等の各サービスを定義した順序で実行できます。また、BigQueryやCloud Storage等のコネクタが用意されているため、Cloud StorageからBigQueryに取り込み、BigQueryでデータ加工するといったETLの処理を簡単に実装できます。

ワークフローを実現できるサービスはGoogle Cloudに他にもあります。

項目 Workflows CloudComposer Vertex AI Pipelines
仕組み Google独自サービス Airflow(OSS) Kubeflow(OSS)
スケジュール実行 Cloud Shedulerから実行 AirflowのDAG定義で実行 Vertex AI Pipelinesの scheduler APIで実行
使用可能リソース 呼び出し先のサービスによる
CloudRunの場合
・CPU:1〜8
・メモリ:512Mib〜32Gib
ワーカーの構成による
・CPU:0.5〜28
・メモリ:0.5GB〜80GB
・CPU:1〜96
・メモリ:最大624GB
・GPU/TPU利用可能
コスト 5,000ステップまで無料。
1,000 ステップあたり $0.01
CloudRun等のサービス実行費用は別途
最小構成でも月$350程度 パイプライン実行ごとに$0.03
使用リソースの料金は別途
リトライ 失敗したタスクからのリトライは不可
最初から再実行となる
失敗したタスクからリトライ可能 失敗したタスクからのリトライは不可
最初から再実行となる
学習コスト Google独自サービスのため独自の構文であるがYAML/JSON形式で学習コストは高くない pythonベース
Operatorと呼ばれる処理を作成し、呼び出し順を定義する。一定の学習が必要
pythonベース
Componentと呼ばれる処理を作成し、呼び出し順を定義する。一定の学習が必要

各ワークフローのサービスにメリデメはありますが、どれを使用したらよいかはざっくり以下のようなイメージとなります。

  • 簡単にワークフローを組みたい → Workflows
  • 複雑なワークフローを組む必要がある → Cloud Composer
  • GPU/TPU等を利用した機械学習等の重い処理も実行したい → Vertex AI Pipelines

Workflowsの使用方法

ここからは、Workflowsの使用方法を見ていきます。
Google Cloudのコンソールで、ワークフローの作成ボタンを押し進めていくと、サンプルのワークフロー定義が入力され、右側に各STEPが可視化されています。

このサンプルは、文字列をINPUTに実行しその文字列のWikipedia記事一覧を返すというものになります。httpのコネクタが用意されているので、簡単にhttp通信を行うことができます。

その他にもコードのサンプルが用意されているので、参考に実装することができます。

以降に基本的な構文をいくつか記載します。

main

main:
    params: [input]
    steps:
        - STEP_NAME:  

引数のパラメータの定義をします。steps以降に各STEPの処理を定義します。
※mainは、引数の定義やサブワークフローを使用しない場合は省略可能ですが、ベストプラクティスとしてmainの記載が推奨されています。

STEP

基本形

基本は以下のようになります。callにはhttpのリクエストや、コネクタを利用してCloudRun等を呼び出すことができます。
resultに処理結果を入れることができ、以降のSTEPで参照することができます。

    - STEP_NAME: ← STEPの名前を記載
        call: FUNCTION_NAME ← 呼び出す処理名を記載
        args:
            ARG_1: VALUE_1 ← 処理に引数があれば指定
        result: OUTPUT_VARIABLE ← 処理結果を変数に格納

switch

switchはcase文のように条件分岐することができ、nextで次に呼び出すSTEPを定義します。

    - STEP_NAME_A: ← STEPの名前を記載
        switch:
            - condition: ${EXPRESSION_A} ← 条件1
              next: STEP_NAME_B ← 条件1のときに呼び出すSTEP名
            - condition: ${EXPRESSION_B} ← 条件2
              next: STEP_NAME_C ← 条件2のときに呼び出すSTEP名
        next: STEP_NAME_D ← 条件1,2以外のときに呼び出すSTEP名

parallel

処理を並列に実行したい場合は、parallelを使用します。
sharedで共有変数を定義することができ、並列実行した各結果を代入することができます。

    - STEP_NAME_A:
        parallel:
          shared: [A, B]
          branches:
            - parallel_name_A:
                steps:
                  - call_name_A:
                      call: FUNCTION_NAME
                      args:
                        ARG_1: VALUE_1
                      result: A
            - parallel_name_B:
                steps:
                  - call_name_B:
                      call: FUNCTION_NAME
                      args:
                        ARG_1: VALUE_1
                      result: B

ここまでよく使いそうな構文を紹介しましたが、他にもよくプログラムで使う構文が用意されていますので、公式のドキュメントを参照してみてください。

Workflowsを実行してみよう

さて、ここまでWorkflowsの作り方を紹介してきましたが、まだどうやって作ったらよいのかわからないという方は、Geminiに聞いてみましょう。
Geminiのコンソール上で、作りたいWorkflowを聞いてみます。
すると、Workflowsの定義と説明まで答えてくれます。

サンプルのワークフローの記載を置き換え、projectId、bucket、dataset、tableを設定してデプロイボタンを押します。

ワークフローが登録できたら、実行します。
無事に成功しました。

用意したcsvのデータがbigqueryのテーブルに入っていることも確認できました。

この後に、dataformやCloudRunで実装したdbtを実行することで、ETL(ELT)といった処理を実装することができます。

まとめ

ワークフローを実現するサービスやOSSはいくつかありますが、Workflowsは最も手軽に実装できるサービスだとと思います。一般的なプログラムの構文は実現できるので、やりたいことはできるのではないでしょうか。また、わからないことはGeminiに聞いてみると答えてくれるかもしれません。

合同会社カメレオンミーム Tech Blog

Discussion