🗂

【Google Cloud】 Workflowsのベストプラクティス - 親子ワークフローで実現する堅牢なオーケストレーション

に公開

はじめに

Google Cloud Workflowsは、複数のサービスやAPIを組み合わせて、複雑なビジネスプロセスを自動化するためのサーバーレスオーケストレーションサービスです。個々の処理を独立したワークフローとして実装した後、それらを組み合わせて一連の作業を実行する際、親子ワークフローの設計パターンが重要になります。

本記事では、特に一連の作業を一回の実行で完了したい場合に、エラーやリトライに強い親子ワークフローの設計方法を解説します。個々の手順をワークフロー化した次のステップとして、実践的なベストプラクティスを網羅的に紹介します。

以下の記事で紹介するコードは一例です。開発環境と用途に合うものを作成し必ず検証してください。

本記事で学べること:

  • 親子ワークフローの設計パターンと選択基準
  • エラー処理とリトライ戦略の実装方法
  • 直列実行における親子ワークフローの呼び出しパターン
  • 実践的なワークフロー設計のベストプラクティス
  • 具体的なユースケース(GKEアップデートなど)での実装例

1. 親子ワークフローの基本概念

1.1 親子ワークフローとは

Google Cloud Workflowsでは、1つのワークフロー(親ワークフロー)から別のワークフロー(子ワークフロー)を呼び出すことができます。これにより、複雑な処理をモジュール化し、再利用可能なワークフローを構築できます。

親子ワークフローのメリット

  • モジュール化: 個々の処理を独立したワークフローとして実装
  • 再利用性: 同じワークフローを複数の親ワークフローから呼び出し可能
  • 保守性: 個々の処理の変更が他のワークフローに影響しない
  • テスト容易性: 個々のワークフローを独立してテスト可能

1.2 呼び出しパターンの選択

一連の作業を実行する際、以下の2つのパターンが考えられます:

パターン1: 親が順次呼び出す(推奨)

パターン2: 各子が次の子を呼び出す(非推奨)

推奨パターン: 親が順次呼び出す

理由:

  • エラー処理が明確: 親ワークフローで一元的にエラー処理が可能
  • 実行フローの可視性: 親ワークフローを見るだけで全体の流れが分かる
  • 柔軟性: 実行順序の変更や条件分岐が容易
  • デバッグ容易性: どのステップで失敗したかが明確

2. 実践例:GKEアップデートワークフロー

2.1 ユースケースの定義

以下の手順を実行するワークフローを設計します:

  1. Podのステータス確認
  2. GKEのアップデート
  3. Podのステータス確認(アップデート後の確認)
  4. NewRelicのアラート確認

2.2 子ワークフローの実装

各手順を独立した子ワークフローとして実装します。

2.2.1 Podステータス確認ワークフロー

# check-pod-status.yaml
main:
  params: [projectId, clusterName, namespace, podName]
  steps:
    - check_pod_status:
        call: http.post
        args:
          url: https://container.googleapis.com/v1/projects/${projectId}/locations/${clusterName}/clusters/${clusterName}/pods/${namespace}/${podName}
          auth:
            type: OAuth2
        result: pod_status
    
    - return_result:
        return: ${pod_status}

2.2.2 GKEアップデートワークフロー

# update-gke.yaml
main:
  params: [projectId, clusterName, nodePoolName, version]
  steps:
    - update_node_pool:
        call: http.post
        args:
          url: https://container.googleapis.com/v1/projects/${projectId}/locations/${clusterName}/clusters/${clusterName}/nodePools/${nodePoolName}/update
          auth:
            type: OAuth2
          body:
            nodeVersion: ${version}
        result: update_result
    
    - wait_for_completion:
        call: sys.sleep
        args:
          seconds: 30
    
    - return_result:
        return: ${update_result}

2.2.3 NewRelicアラート確認ワークフロー

# check-newrelic-alerts.yaml
main:
  params: [apiKey, accountId, timeRange]
  steps:
    - get_alerts:
        call: http.get
        args:
          url: https://api.newrelic.com/v2/alerts.json
          headers:
            X-Api-Key: ${apiKey}
          query:
            account_id: ${accountId}
            time_range: ${timeRange}
        result: alerts
    
    - return_result:
        return: ${alerts}

2.3 親ワークフローの実装(基本版)

親ワークフローが各子ワークフローを順次呼び出すパターン:

# gke-update-orchestrator.yaml
main:
  params: [projectId, clusterName, namespace, podName, nodePoolName, version, newRelicApiKey, newRelicAccountId]
  steps:
    - check_pod_before:
        call: workflows.run
        args:
          name: check-pod-status
          input:
            projectId: ${projectId}
            clusterName: ${clusterName}
            namespace: ${namespace}
            podName: ${podName}
        result: pod_status_before
    
    - update_gke:
        call: workflows.run
        args:
          name: update-gke
          input:
            projectId: ${projectId}
            clusterName: ${clusterName}
            nodePoolName: ${nodePoolName}
            version: ${version}
        result: update_result
    
    - check_pod_after:
        call: workflows.run
        args:
          name: check-pod-status
          input:
            projectId: ${projectId}
            clusterName: ${clusterName}
            namespace: ${namespace}
            podName: ${podName}
        result: pod_status_after
    
    - check_newrelic_alerts:
        call: workflows.run
        args:
          name: check-newrelic-alerts
          input:
            apiKey: ${newRelicApiKey}
            accountId: ${newRelicAccountId}
            timeRange: "1h"
        result: alerts
    
    - return_summary:
        return:
          pod_status_before: ${pod_status_before}
          update_result: ${update_result}
          pod_status_after: ${pod_status_after}
          alerts: ${alerts}

3. エラー処理とリトライ戦略

3.1 エラー処理の基本

親ワークフローでエラー処理を行うことで、一元的にエラーを管理できます。

3.2 リトライ機能の実装

Workflowsでは、retryパラメータを使用して自動リトライを設定できます。

子ワークフロー呼び出し時のリトライ

main:
  params: [projectId, clusterName, namespace, podName]
  steps:
    - check_pod_with_retry:
        call: workflows.run
        args:
          name: check-pod-status
          input:
            projectId: ${projectId}
            clusterName: ${clusterName}
            namespace: ${namespace}
            podName: ${podName}
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 3
          backoff:
            initial_delay: 2
            max_delay: 60
            multiplier: 2
        result: pod_status

HTTP呼び出し時のリトライ

main:
  params: [projectId, clusterName, nodePoolName, version]
  steps:
    - update_node_pool_with_retry:
        call: http.post
        args:
          url: https://container.googleapis.com/v1/projects/${projectId}/locations/${clusterName}/clusters/${clusterName}/nodePools/${nodePoolName}/update
          auth:
            type: OAuth2
          body:
            nodeVersion: ${version}
        retry:
          predicate: ${http.default_retry_predicate}
          max_retries: 5
          backoff:
            initial_delay: 5
            max_delay: 300
            multiplier: 2
        result: update_result

3.3 カスタムエラーハンドリング

親ワークフローで、各ステップのエラーを捕捉して適切に処理します。

# gke-update-orchestrator-with-error-handling.yaml
main:
  params: [projectId, clusterName, namespace, podName, nodePoolName, version, newRelicApiKey, newRelicAccountId]
  steps:
    - check_pod_before:
        try:
          call: workflows.run
          args:
            name: check-pod-status
            input:
              projectId: ${projectId}
              clusterName: ${clusterName}
              namespace: ${namespace}
              podName: ${podName}
          result: pod_status_before
        except:
          as: e
          steps:
            - handle_error:
                assign:
                  - error_message: ${"Pod status check failed: " + e.message}
                  - error_step: "check_pod_before"
                raise: ${error_message}
    
    - update_gke:
        try:
          call: workflows.run
          args:
            name: update-gke
            input:
              projectId: ${projectId}
              clusterName: ${clusterName}
              nodePoolName: ${nodePoolName}
              version: ${version}
          result: update_result
        except:
          as: e
          steps:
            - rollback_consideration:
                assign:
                  - error_message: ${"GKE update failed: " + e.message}
                  - error_step: "update_gke"
                  # ロールバック処理をここに追加可能
                raise: ${error_message}
    
    - check_pod_after:
        try:
          call: workflows.run
          args:
            name: check-pod-status
            input:
              projectId: ${projectId}
              clusterName: ${clusterName}
              namespace: ${namespace}
              podName: ${podName}
          result: pod_status_after
        except:
          as: e
          steps:
            - handle_error:
                assign:
                  - error_message: ${"Post-update pod status check failed: " + e.message}
                  - error_step: "check_pod_after"
                raise: ${error_message}
    
    - check_newrelic_alerts:
        try:
          call: workflows.run
          args:
            name: check-newrelic-alerts
            input:
              apiKey: ${newRelicApiKey}
              accountId: ${newRelicAccountId}
              timeRange: "1h"
          result: alerts
        except:
          as: e
          steps:
            - handle_error:
                assign:
                  - error_message: ${"NewRelic alert check failed: " + e.message}
                  - error_step: "check_newrelic_alerts"
                  # アラート確認の失敗は警告として扱うことも可能
                next: continue_workflow
    
    - return_summary:
        return:
          pod_status_before: ${pod_status_before}
          update_result: ${update_result}
          pod_status_after: ${pod_status_after}
          alerts: ${alerts}

4. ベストプラクティス

4.1 ワークフロー設計の原則

1. 単一責任の原則

各子ワークフローは、1つの明確な責任を持つべきです。

2. パラメータの明確化

各ワークフローは、必要なパラメータを明確に定義します。

main:
  params: 
    - projectId: string
    - clusterName: string
    - namespace: string
    - podName: string
  steps:
    # ...

3. 戻り値の標準化

各子ワークフローは、統一された形式で結果を返します。

main:
  steps:
    - return_result:
        return:
          success: true
          data: ${result_data}
          timestamp: ${sys.now()}

4.2 エラー処理のベストプラクティス

1. 階層的なエラー処理

2. エラーの分類

エラーを分類し、適切に処理します:

  • 一時的エラー: リトライ可能
  • 永続的エラー: ロールバックが必要
  • 警告: 処理を継続可能
main:
  steps:
    - handle_error:
        switch:
          - condition: ${e.code == 429 or e.code == 503}
            steps:
              - retry_step:
                  # 一時的エラー: リトライ
          - condition: ${e.code >= 400 and e.code < 500}
            steps:
              - rollback_step:
                  # 永続的エラー: ロールバック
          - condition: ${true}
            steps:
              - log_warning:
                  # その他のエラー: 警告として記録

4.3 リトライ戦略のベストプラクティス

1. 指数バックオフ

retry:
  predicate: ${http.default_retry_predicate}
  max_retries: 5
  backoff:
    initial_delay: 2      # 初期待機時間(秒)
    max_delay: 300       # 最大待機時間(秒)
    multiplier: 2       # 指数倍率

2. リトライ可能なエラーの判定

main:
  steps:
    - define_retry_predicate:
        assign:
          - retry_predicate: ${e.code == 429 or e.code == 503 or e.code == 500}
    
    - call_with_custom_retry:
        call: http.post
        args:
          url: ${url}
        retry:
          predicate: ${retry_predicate}
          max_retries: 3

3. 部分的なリトライ

特定のステップのみをリトライする場合:

main:
  steps:
    - retry_loop:
        for:
          value: attempt
          in: ${[1, 2, 3]}
          steps:
            - try_step:
                try:
                  call: workflows.run
                  args:
                    name: critical-step
                  result: result
                except:
                  as: e
                  steps:
                    - check_retry:
                        switch:
                          - condition: ${attempt < 3}
                            next: retry_loop
                          - condition: ${true}
                            raise: ${"Max retries exceeded: " + e.message}

4.4 実行フローの可視化

親ワークフローで全体の実行フローを明確にします。

5. 高度なパターン

5.1 条件分岐とループ

親ワークフローで条件分岐やループを使用して、柔軟な実行フローを実現します。

main:
  params: [projectId, clusterName, podNames, nodePoolName, version]
  steps:
    - check_all_pods_before:
        for:
          value: podName
          in: ${podNames}
          steps:
            - check_pod:
                call: workflows.run
                args:
                  name: check-pod-status
                  input:
                    projectId: ${projectId}
                    clusterName: ${clusterName}
                    namespace: "default"
                    podName: ${podName}
                result: pod_status
    
    - update_gke:
        call: workflows.run
        args:
          name: update-gke
          input:
            projectId: ${projectId}
            clusterName: ${clusterName}
            nodePoolName: ${nodePoolName}
            version: ${version}
        result: update_result
    
    - wait_for_stabilization:
        call: sys.sleep
        args:
          seconds: 60
    
    - check_all_pods_after:
        for:
          value: podName
          in: ${podNames}
          steps:
            - check_pod:
                call: workflows.run
                args:
                  name: check-pod-status
                  input:
                    projectId: ${projectId}
                    clusterName: ${clusterName}
                    namespace: "default"
                    podName: ${podName}
                result: pod_status
            - validate_pod_status:
                switch:
                  - condition: ${pod_status.status != "Running"}
                    raise: ${"Pod " + podName + " is not running"}

5.2 並列実行

独立した処理は並列実行することで、全体の処理時間を短縮できます。

main:
  params: [projectId, clusterName, podNames]
  steps:
    - check_pods_in_parallel:
        parallel:
          - check_pod_1:
              call: workflows.run
              args:
                name: check-pod-status
                input:
                  projectId: ${projectId}
                  clusterName: ${clusterName}
                  namespace: "default"
                  podName: ${podNames[0]}
              result: pod1_status
          - check_pod_2:
              call: workflows.run
              args:
                name: check-pod-status
                input:
                  projectId: ${projectId}
                  clusterName: ${clusterName}
                  namespace: "default"
                  podName: ${podNames[1]}
              result: pod2_status
          - check_pod_3:
              call: workflows.run
              args:
                name: check-pod-status
                input:
                  projectId: ${projectId}
                  clusterName: ${clusterName}
                  namespace: "default"
                  podName: ${podNames[2]}
              result: pod3_status

5.3 コールバックを使用した待機

長時間実行される処理の完了を待つ場合、コールバックを使用します。

main:
  params: [projectId, clusterName, nodePoolName, version]
  steps:
    - start_update:
        call: workflows.run
        args:
          name: update-gke-async
          input:
            projectId: ${projectId}
            clusterName: ${clusterName}
            nodePoolName: ${nodePoolName}
            version: ${version}
        result: update_job
    
    - wait_for_completion:
        call: workflows.run
        args:
          name: wait-for-gke-update
          input:
            callbackUrl: ${update_job.callback_url}
            timeout: 3600
        result: completion_result

6. 実践的な実装例

6.1 完全なGKEアップデートワークフロー

エラー処理とリトライを含む完全な実装例:

# gke-update-complete.yaml
main:
  params: 
    - projectId: string
    - clusterName: string
    - namespace: string
    - podNames: list
    - nodePoolName: string
    - version: string
    - newRelicApiKey: string
    - newRelicAccountId: string
  steps:
    - initialize:
        assign:
          - errors: []
          - start_time: ${sys.now()}
    
    - check_pods_before:
        for:
          value: podName
          in: ${podNames}
          steps:
            - check_pod:
                try:
                  call: workflows.run
                  args:
                    name: check-pod-status
                    input:
                      projectId: ${projectId}
                      clusterName: ${clusterName}
                      namespace: ${namespace}
                      podName: ${podName}
                    retry:
                      predicate: ${http.default_retry_predicate}
                      max_retries: 3
                      backoff:
                        initial_delay: 2
                        max_delay: 30
                        multiplier: 2
                  result: pod_status
                except:
                  as: e
                  steps:
                    - record_error:
                        assign:
                          - errors: ${errors + [{"step": "check_pods_before", "pod": podName, "error": e.message}]}
                        raise: ${"Pod check failed: " + podName}
    
    - update_gke:
        try:
          call: workflows.run
          args:
            name: update-gke
            input:
              projectId: ${projectId}
              clusterName: ${clusterName}
              nodePoolName: ${nodePoolName}
              version: ${version}
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 5
              backoff:
                initial_delay: 5
                max_delay: 300
                multiplier: 2
          result: update_result
        except:
          as: e
          steps:
            - record_error:
                assign:
                  - errors: ${errors + [{"step": "update_gke", "error": e.message}]}
            - rollback_consideration:
                # ロールバック処理をここに追加
                raise: ${"GKE update failed: " + e.message}
    
    - wait_for_stabilization:
        call: sys.sleep
        args:
          seconds: 60
    
    - check_pods_after:
        for:
          value: podName
          in: ${podNames}
          steps:
            - check_pod:
                try:
                  call: workflows.run
                  args:
                    name: check-pod-status
                    input:
                      projectId: ${projectId}
                      clusterName: ${clusterName}
                      namespace: ${namespace}
                      podName: ${podName}
                    retry:
                      predicate: ${http.default_retry_predicate}
                      max_retries: 3
                      backoff:
                        initial_delay: 2
                        max_delay: 30
                        multiplier: 2
                  result: pod_status
                except:
                  as: e
                  steps:
                    - record_error:
                        assign:
                          - errors: ${errors + [{"step": "check_pods_after", "pod": podName, "error": e.message}]}
                        raise: ${"Post-update pod check failed: " + podName}
    
    - check_newrelic_alerts:
        try:
          call: workflows.run
          args:
            name: check-newrelic-alerts
            input:
              apiKey: ${newRelicApiKey}
              accountId: ${newRelicAccountId}
              timeRange: "1h"
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 2
              backoff:
                initial_delay: 1
                max_delay: 10
                multiplier: 2
          result: alerts
        except:
          as: e
          steps:
            - record_warning:
                assign:
                  - errors: ${errors + [{"step": "check_newrelic_alerts", "error": e.message, "severity": "warning"}]}
                # アラート確認の失敗は警告として扱う
                next: continue_workflow
    
    - return_summary:
        assign:
          - end_time: ${sys.now()}
          - duration: ${end_time - start_time}
        return:
          success: ${len(errors) == 0}
          errors: ${errors}
          duration_seconds: ${duration}
          update_result: ${update_result}
          alerts: ${alerts}

7. ワークフロー設計のチェックリスト

7.1 設計時の確認事項

  • 各子ワークフローは単一責任を持っているか
  • 親ワークフローが各子ワークフローを順次呼び出しているか
  • エラー処理が親ワークフローで一元管理されているか
  • リトライ戦略が適切に設定されているか
  • パラメータが明確に定義されているか
  • 戻り値が統一された形式か
  • ログが適切に記録されているか

7.2 実装時の確認事項

  • 一時的エラーと永続的エラーを適切に分類しているか
  • ロールバック処理が実装されているか(必要な場合)
  • タイムアウトが適切に設定されているか
  • 並列実行が可能な処理は並列化しているか
  • コールバックを使用すべき箇所で使用しているか

8. まとめ

本記事では、Google Cloud Workflowsを使用した親子ワークフローの設計と実装について紹介しました。

重要なポイント

  1. 親が順次呼び出すパターンを推奨: エラー処理が明確で、実行フローの可視性が高い
  2. エラー処理は親ワークフローで一元管理: 一元的なエラー処理により、保守性が向上
  3. 適切なリトライ戦略の実装: 指数バックオフを使用したリトライで、一時的エラーに対応
  4. 単一責任の原則: 各子ワークフローは1つの明確な責任を持つ
  5. 統一されたパラメータと戻り値: モジュール化と再利用性を高める

次のアクション

  1. 既存のワークフローをレビューし、親子ワークフローパターンに適合するか確認
  2. エラー処理とリトライ戦略を見直す
  3. 実践的なユースケースで親子ワークフローを実装
  4. ワークフローの実行ログを確認し、改善点を洗い出す
  5. チーム内でベストプラクティスを共有

参考資料

Discussion