🎢

Cloud Workflows を触ってみた(Basic)

2021/08/01に公開

はじめに

これまで Google Cloud には AWS Step Functions みたいに複数のサービスをオーケストレーションする役割を持ったサービスが存在しなかったんですが、ついに Google Cloud Workflows っていう名称で登場してきました。去年の夏ごろに Beta 版がリリースされ、今年頭には GA になり結構力を入れて改善し続けている印象が強いと感じています。そして、Workflow から他の Google Cloud サービスとの連携をサポートしてくれる Cloud Workflow Connector っていう機能も Pre-GA として一緒に公開されて複数のサービスを組み合わせる作業が宣言的な書き方で楽になるんじゃないかなと思いました。

リリース情報

  • 2021/01/25
    • Cloud Workflows の GA 版、Cloud Workflow Connector の Pre-GA 版
  • 2020/08/25
    • Cloud Workflows の Beta 版

https://cloud.google.com/workflows/docs/release-notes

ユースケース

  • 複数のサービスをまとめて段階的に処理
  • 一つ以上の API が必要なトランザクション
  • インフラ周りの自動化

登場人物

  • Workflow

    • 複数の Step や Sub Workflow で構成されている
    • YAML と JSON で定義が可能
      • YAML おすすめ
  • Sub Workflow

    • 言葉通り子 Workflow
  • Step

    • 実行単位
  • Call

    • 実行タイプ
  • Result

    • 実行結果
  • Return

    • 実行完了

課金

基準

  • GCP 内部サービスとの通信
    • 1,000 Step あたり $0.01
  • GCP 外部サービスとの通信
    • 1,000 Step あたり $0.025

無料枠

  • GCP 内部サービスとの通信
    • 5,000 Step まで
  • GCP 外部サービスとの通信
    • 2,000 Step まで

ref. https://cloud.google.com/workflows/pricing

サポートされているリージョン(2021/07/31 時点)

  • us-central1 (Iowa)
  • europe-west4 (Netherlands)
  • asia-southeast1 (Singapore)

ref. https://cloud.google.com/workflows/docs/locations

触ってみる

ひとまず、Cloud Workflows 実行に必要なリソースを定義しましょう。

  • 共通
    • Cloud Workflows API を有効化
$ gcloud services enable workflows.googleapis.com

a. gcloud

$ gcloud iam service-accounts create workflow-demo
$ gcloud projects add-iam-policy-binding PROJECT_ID \
    --member "serviceAccount:workflow-demo@PROJECT_ID.iam.gserviceaccount.com" \
    --role "roles/logging.logWriter"
$ gcloud workflows deploy demo --source=demo-workflow.yaml --service-account=workflow-demo@PROJECT_ID.iam.gserviceaccount.com

b. Terraform

Cloud Workflows リソース (google_workflows_workflow) は google provider v3.59.0 以上から使えるんですが、v3.67.0 未満だと yaml (source_contents) を更新するたびに Workflow リソースが削除されてから生成される実装になっているので、v3.67.0 以上のバージョンを利用した方が良いと思います。

resource "google_service_account" "workflow_demo" {
  account_id = "workflow-demo"
}

variable "workflow_demo_roles" {
  default = [
    "roles/logging.logWriter"
  ]
}

resource "google_project_iam_member" "workflow_demo" {
  for_each = toset(var.workflow_demo_roles)
  role     = each.value
  member   = "serviceAccount:${google_service_account.workflow_demo.email}"
}

resource "google_workflows_workflow" "demo" {
  name            = "demo"
  region          = "us-central1"
  service_account = google_service_account.workflow_demo.id
  source_contents = templatefile("../${path.module}/demo-workflow.yaml", {})
}

ref. https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/workflows_workflow, https://github.com/hashicorp/terraform-provider-google/releases/tag/v3.67.0

環境変数の定義

assign を利用します。
Workflow にビルトインされている環境変数を別名で定義できるし、Workflow を実行する時に渡せるパラメーターを受け取って定義することもできます。

main:
  params: [args]
  steps:
  - init:
      assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
	- foo: ${args.foo}
  - finish:
      return: ${foo}
$ gcloud workflows execute demo --data={\"foo\":\"foo\"}
$ gcloud workflows executions wait-last --format=json | jq .result
"\"foo\""

ref. https://cloud.google.com/workflows/docs/reference/environment-variables, https://cloud.google.com/workflows/docs/passing-runtime-arguments

条件分岐の定義

switch を利用します。
普通のプログラミング言語の if-else ステートメントみたいな挙動が実現可能です。

main:
  params: [args]
  steps:
  - init:
      assign:
        - score: ${args.score}
  - check:
      switch:
        - condition: ${score % 2 == 0}
          next: this_is_even
      next: this_is_odd
  - this_is_even:
      return: "Score is even number."
  - this_is_odd:
      return: "Score is odd number."
$ gcloud workflows execute demo --data={\"score\":100}
$ gcloud workflows executions wait-last --format=json | jq .result
"\"Score is even number.\""

ステップ分岐の定義

next を利用します。
普通のプログラミング言語の goto ステートメントみたいな挙動が実現可能です。

main:
  steps:
  - init:
      assign:
        - bar: bar
      next: finish
  - will_be_skipped:
      return: skip
  - finish:
      return: ${bar}
$ gcloud workflows execute demo
$ gcloud workflows executions wait-last --format=json | jq .result
"\"bar\""

一応、next: end を定義したら return value がないまま Workflow が終了されるのでユースケースに合う場合は使える場面があるかなと思います。

main:
  steps:
  - init:
      assign:
        - bar: bar
      next: end
  - finish:
      return: ${bar}

内部関数

https://cloud.google.com/workflows/docs/reference/stdlib/overview

標準ライブラリーが用意されているので、ある程度柔軟な制御はできると思います。

  • base64
  • http
  • json
  • list
  • retry
  • sys
  • text
  • など…

ロギング

sys.log を利用します。
ログメッセージには文字列以外に様々な型も定義できるし、ログのレベルも severity で設定することができます。ただし、Workflow に付けるサービスアカウントの権限に roles/logging.logWriter が必要になるので忘れないようにしましょう。

main:
  steps:
  - init:
      assign:
        - greeting: Hello World
  - logging:
      call: sys.log
      args:
        text: ${greeting}
        severity: INFO
  - finish:
      return: true
$ gcloud workflows execute demo
$ gcloud logging read "resource.type=workflows.googleapis.com/Workflow" --limit 2 --format json | jq '.[].textPayload'
null
"Hello World"

ref. https://cloud.google.com/workflows/docs/reference/stdlib/sys/log

エラーハンドリング

try/except を利用します。

main:
  steps:
  - init:
      assign:
        - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}

  - create_document:
      try:
        call: googleapis.firestore.v1.projects.databases.documents.createDocument
        args:
          parent: ${"projects/" + project_id + "/databases/(default)/documents/demo/v0"}
          collectionId: users
          documentId: johnsmith
          body:
            fields:
              first_name:
                stringValue: John
              last_name:
                stringValue: Smith
              email:
                stringValue: johnsmith@gmail.com
              locale:
                stringValue: english
        result: resp
      except:
        as: e
        steps:
          - known_erros:
              switch:
              - condition: ${e.code == 403}
                return: "403 PERMISSION_DENIED"
          - unknown_errors:
              return: ${e}

  - get_document:
      try:
        call: googleapis.firestore.v1.projects.databases.documents.get
        args:
          name: ${"projects/" + project_id + "/databases/(default)/documents/demo/v0/users/johnsmith"}
          mask:
            fieldPaths:
              - email
        result: resp
      except:
        as: e
        steps:
          - known_erros:
              switch:
              - condition: ${e.code == 401}
                return: "401 UNAUTHENTICATED"
              - condition: ${e.code == 403}
                return: "403 PERMISSION_DENIED"
              - condition: ${e.code == 404}
                return: "404 NOT FOUND"
          - unknown_errors:
              raise: ${e}

  - logging:
      call: sys.log
      args:
        text: ${resp}
        severity: INFO

  - finish:
      return: true

Cloud Firestore への書き込み権限がなくて ${e.code == 403} 条件に引っかかっていますね。

$ gcloud workflows execute demo
$ gcloud workflows executions wait-last --format=json | jq .result
"\"403 PERMISSION_DENIED\""

Workflow のサービスアカウントに roles/datastore.user を付けて再度実行してみましょう。無事に true が返却されました。ログにも Cloud Firestore に書き込んだデータの情報が出力されているので問題なさそうですね。

$ gcloud workflows execute demo
$ gcloud workflows executions wait-last --format=json | jq .result
"\"true\""
$ gcloud logging read "resource.type=workflows.googleapis.com/Workflow" --limit 2 --format json | jq '.[].textPayload'
null
"{\"createTime\":\"2021-07-31T13:30:27.084976Z\",\"fields\":{\"email\":{\"stringValue\":\"johnsmith@gmail.com\"}},\"name\":\"projects/PROJECT_ID/databases/(default)/documents/demo/v0/users/johnsmith\",\"updateTime\":\"2021-07-31T13:30:27.084976Z\"}"

ref. https://cloud.google.com/workflows/docs/reference/syntax/catching-errors

スケジューリング

Cloud Scheduler を組んで置けば定期的な実行が可能になります。

resource "google_service_account" "scheduler_demo" {
  account_id = "scheduler-demo"
}

variable "scheduler_demo_roles" {
  default = [
    "roles/workflows.invoker"
  ]
}

resource "google_project_iam_member" "scheduler_demo" {
  for_each = toset(var.scheduler_demo_roles)
  role     = each.value
  member   = "serviceAccount:${google_service_account.scheduler_demo.email}"
}

resource "google_cloud_scheduler_job" "demo" {
  name             = "workflow-demo-job"
  schedule         = "5 23 * * *"
  time_zone        = "Asia/Tokyo"
  attempt_deadline = "300s"

  retry_config {
    retry_count = 1
  }

  http_target {
    http_method = "POST"
    oauth_token {
      service_account_email = google_service_account.scheduler_demo.email
    }
    uri         = "https://workflowexecutions.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/workflows/demo/executions"
    body        = base64encode("{\"argument\":\"{}\"}")
  }
}

それぞれのリソースについて Cloud Logging を確認してみたんですが問題なさそうですね。

$ gcloud logging read "resource.type=cloud_scheduler_job" --limit 2 --format json
[
  {
    "httpRequest": {
      "status": 200
    },
    ...
  },
  {
    ...
  }
]
$ gcloud logging read "resource.type=workflows.googleapis.com/Workflow" --limit 2 --format json | jq '.[].textPayload'
null
"{\"createTime\":\"2021-07-31T14:05:01.024365Z\",\"fields\":{\"email\":{\"stringValue\":\"johnsmith@gmail.com\"}},\"name\":\"projects/PROJECT_ID/databases/(default)/documents/demo/v0/users/johnsmith\",\"updateTime\":\"2021-07-31T14:05:01.024365Z\"}"

ref. https://cloud.google.com/workflows/docs/schedule-workflow, https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/cloud_scheduler_job

デバッグ

現状、ローカルでデバッグをサポートする仕組みが存在していないので、毎回改修した YAML もしくは JSON を該当する Workflow にデプロイして動作確認を行う必要があります。

# 生成
$ gcloud workflows deploy WORKFLOW_NAME --source=xxx.yaml --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
$ gcloud workflows execute WORKFLOW_NAME
$ gcloud workflows executions wait-last --format=json

# 更新
$ gcloud workflows deploy WORKFLOW_NAME --source=xxx.yaml
$ gcloud workflows execute WORKFLOW_NAME
$ gcloud workflows executions wait-last --format=json

# Cloud Logging 確認
$ gcloud logging read "resource.type=workflows.googleapis.com/Workflow" --limit 5 --format json

Workflow 単体では gcloud だけで問題ないと思いますが、Workflow 以外のリソースもデバッグの対象にしたい場合、Terraform に定義した方が一番楽かなと個人的に思っています。

参考

Discussion