Google Cloud Workflowsを使ってみよう
概要
データのETL(データ抽出・加工・ロード)やバックエンドの処理等で一連の流れで数珠繋ぎで実行した場合があります。
そんな時に使うのがワークフローで、今回はGoogle CloudのWorkflowsを紹介します。
Workflowsとは
WorkflowsはGoogle Cloudのフルマネージドのサービスで、Cloud RunやCloud Functions等の各サービスを定義した順序で実行できます。また、BigQueryやCloud Storage等のコネクタが用意されているため、Cloud StorageからBigQueryに取り込み、BigQueryでデータ加工するといったETLの処理を簡単に実装できます。
ワークフローを実現できるサービスはGoogle Cloudに他にもあります。
項目 | Workflows | CloudComposer | Vertex AI Pipelines |
---|---|---|---|
仕組み | Google独自サービス | Airflow(OSS) | Kubeflow(OSS) |
スケジュール実行 | Cloud Shedulerから実行 | AirflowのDAG定義で実行 | Vertex AI Pipelinesの scheduler APIで実行 |
使用可能リソース | 呼び出し先のサービスによる CloudRunの場合 ・CPU:1〜8 ・メモリ:512Mib〜32Gib |
ワーカーの構成による ・CPU:0.5〜28 ・メモリ:0.5GB〜80GB |
・CPU:1〜96 ・メモリ:最大624GB ・GPU/TPU利用可能 |
コスト | 5,000ステップまで無料。 1,000 ステップあたり $0.01 CloudRun等のサービス実行費用は別途 |
最小構成でも月$350程度 | パイプライン実行ごとに$0.03 使用リソースの料金は別途 |
リトライ | 失敗したタスクからのリトライは不可 最初から再実行となる |
失敗したタスクからリトライ可能 | 失敗したタスクからのリトライは不可 最初から再実行となる |
学習コスト | Google独自サービスのため独自の構文であるがYAML/JSON形式で学習コストは高くない | pythonベース Operatorと呼ばれる処理を作成し、呼び出し順を定義する。一定の学習が必要 |
pythonベース Componentと呼ばれる処理を作成し、呼び出し順を定義する。一定の学習が必要 |
各ワークフローのサービスにメリデメはありますが、どれを使用したらよいかはざっくり以下のようなイメージとなります。
- 簡単にワークフローを組みたい → Workflows
- 複雑なワークフローを組む必要がある → Cloud Composer
- GPU/TPU等を利用した機械学習等の重い処理も実行したい → Vertex AI Pipelines
Workflowsの使用方法
ここからは、Workflowsの使用方法を見ていきます。
Google Cloudのコンソールで、ワークフローの作成ボタンを押し進めていくと、サンプルのワークフロー定義が入力され、右側に各STEPが可視化されています。
このサンプルは、文字列をINPUTに実行しその文字列のWikipedia記事一覧を返すというものになります。httpのコネクタが用意されているので、簡単にhttp通信を行うことができます。
その他にもコードのサンプルが用意されているので、参考に実装することができます。
以降に基本的な構文をいくつか記載します。
main
main:
params: [input]
steps:
- STEP_NAME:
引数のパラメータの定義をします。steps以降に各STEPの処理を定義します。
※mainは、引数の定義やサブワークフローを使用しない場合は省略可能ですが、ベストプラクティスとしてmainの記載が推奨されています。
STEP
基本形
基本は以下のようになります。callにはhttpのリクエストや、コネクタを利用してCloudRun等を呼び出すことができます。
resultに処理結果を入れることができ、以降のSTEPで参照することができます。
- STEP_NAME: ← STEPの名前を記載
call: FUNCTION_NAME ← 呼び出す処理名を記載
args:
ARG_1: VALUE_1 ← 処理に引数があれば指定
result: OUTPUT_VARIABLE ← 処理結果を変数に格納
switch
switchはcase文のように条件分岐することができ、nextで次に呼び出すSTEPを定義します。
- STEP_NAME_A: ← STEPの名前を記載
switch:
- condition: ${EXPRESSION_A} ← 条件1
next: STEP_NAME_B ← 条件1のときに呼び出すSTEP名
- condition: ${EXPRESSION_B} ← 条件2
next: STEP_NAME_C ← 条件2のときに呼び出すSTEP名
next: STEP_NAME_D ← 条件1,2以外のときに呼び出すSTEP名
parallel
処理を並列に実行したい場合は、parallelを使用します。
sharedで共有変数を定義することができ、並列実行した各結果を代入することができます。
- STEP_NAME_A:
parallel:
shared: [A, B]
branches:
- parallel_name_A:
steps:
- call_name_A:
call: FUNCTION_NAME
args:
ARG_1: VALUE_1
result: A
- parallel_name_B:
steps:
- call_name_B:
call: FUNCTION_NAME
args:
ARG_1: VALUE_1
result: B
ここまでよく使いそうな構文を紹介しましたが、他にもよくプログラムで使う構文が用意されていますので、公式のドキュメントを参照してみてください。
Workflowsを実行してみよう
さて、ここまでWorkflowsの作り方を紹介してきましたが、まだどうやって作ったらよいのかわからないという方は、Geminiに聞いてみましょう。
Geminiのコンソール上で、作りたいWorkflowを聞いてみます。
すると、Workflowsの定義と説明まで答えてくれます。
サンプルのワークフローの記載を置き換え、projectId、bucket、dataset、tableを設定してデプロイボタンを押します。
ワークフローが登録できたら、実行します。
無事に成功しました。
用意したcsvのデータがbigqueryのテーブルに入っていることも確認できました。
この後に、dataformやCloudRunで実装したdbtを実行することで、ETL(ELT)といった処理を実装することができます。
まとめ
ワークフローを実現するサービスやOSSはいくつかありますが、Workflowsは最も手軽に実装できるサービスだとと思います。一般的なプログラムの構文は実現できるので、やりたいことはできるのではないでしょうか。また、わからないことはGeminiに聞いてみると答えてくれるかもしれません。
chameleonmeme.com/ きっかけは、偶然同じ現場で働いていたエンジニア3人の 「もっと仕事にのめり込んだり、熱中したいよね」という雑談でした。 営業から開発、サービスの提供まですべての工程を自分たちの手で行い、 気の合う仲間と楽しく仕事をすることで熱中するためにチームをスタートしました。
Discussion