【Google Cloud】 Workflowsのベストプラクティス - 親子ワークフローで実現する堅牢なオーケストレーション
はじめに
Google Cloud Workflowsは、複数のサービスやAPIを組み合わせて、複雑なビジネスプロセスを自動化するためのサーバーレスオーケストレーションサービスです。個々の処理を独立したワークフローとして実装した後、それらを組み合わせて一連の作業を実行する際、親子ワークフローの設計パターンが重要になります。
本記事では、特に一連の作業を一回の実行で完了したい場合に、エラーやリトライに強い親子ワークフローの設計方法を解説します。個々の手順をワークフロー化した次のステップとして、実践的なベストプラクティスを網羅的に紹介します。
以下の記事で紹介するコードは一例です。開発環境と用途に合うものを作成し必ず検証してください。
本記事で学べること:
- 親子ワークフローの設計パターンと選択基準
- エラー処理とリトライ戦略の実装方法
- 直列実行における親子ワークフローの呼び出しパターン
- 実践的なワークフロー設計のベストプラクティス
- 具体的なユースケース(GKEアップデートなど)での実装例
1. 親子ワークフローの基本概念
1.1 親子ワークフローとは
Google Cloud Workflowsでは、1つのワークフロー(親ワークフロー)から別のワークフロー(子ワークフロー)を呼び出すことができます。これにより、複雑な処理をモジュール化し、再利用可能なワークフローを構築できます。
親子ワークフローのメリット:
- モジュール化: 個々の処理を独立したワークフローとして実装
- 再利用性: 同じワークフローを複数の親ワークフローから呼び出し可能
- 保守性: 個々の処理の変更が他のワークフローに影響しない
- テスト容易性: 個々のワークフローを独立してテスト可能
1.2 呼び出しパターンの選択
一連の作業を実行する際、以下の2つのパターンが考えられます:
パターン1: 親が順次呼び出す(推奨)
パターン2: 各子が次の子を呼び出す(非推奨)
推奨パターン: 親が順次呼び出す
理由:
- エラー処理が明確: 親ワークフローで一元的にエラー処理が可能
- 実行フローの可視性: 親ワークフローを見るだけで全体の流れが分かる
- 柔軟性: 実行順序の変更や条件分岐が容易
- デバッグ容易性: どのステップで失敗したかが明確
2. 実践例:GKEアップデートワークフロー
2.1 ユースケースの定義
以下の手順を実行するワークフローを設計します:
- Podのステータス確認
- GKEのアップデート
- Podのステータス確認(アップデート後の確認)
- NewRelicのアラート確認
2.2 子ワークフローの実装
各手順を独立した子ワークフローとして実装します。
2.2.1 Podステータス確認ワークフロー
# check-pod-status.yaml
main:
params: [projectId, clusterName, namespace, podName]
steps:
- check_pod_status:
call: http.post
args:
url: https://container.googleapis.com/v1/projects/${projectId}/locations/${clusterName}/clusters/${clusterName}/pods/${namespace}/${podName}
auth:
type: OAuth2
result: pod_status
- return_result:
return: ${pod_status}
2.2.2 GKEアップデートワークフロー
# update-gke.yaml
main:
params: [projectId, clusterName, nodePoolName, version]
steps:
- update_node_pool:
call: http.post
args:
url: https://container.googleapis.com/v1/projects/${projectId}/locations/${clusterName}/clusters/${clusterName}/nodePools/${nodePoolName}/update
auth:
type: OAuth2
body:
nodeVersion: ${version}
result: update_result
- wait_for_completion:
call: sys.sleep
args:
seconds: 30
- return_result:
return: ${update_result}
2.2.3 NewRelicアラート確認ワークフロー
# check-newrelic-alerts.yaml
main:
params: [apiKey, accountId, timeRange]
steps:
- get_alerts:
call: http.get
args:
url: https://api.newrelic.com/v2/alerts.json
headers:
X-Api-Key: ${apiKey}
query:
account_id: ${accountId}
time_range: ${timeRange}
result: alerts
- return_result:
return: ${alerts}
2.3 親ワークフローの実装(基本版)
親ワークフローが各子ワークフローを順次呼び出すパターン:
# gke-update-orchestrator.yaml
main:
params: [projectId, clusterName, namespace, podName, nodePoolName, version, newRelicApiKey, newRelicAccountId]
steps:
- check_pod_before:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: ${namespace}
podName: ${podName}
result: pod_status_before
- update_gke:
call: workflows.run
args:
name: update-gke
input:
projectId: ${projectId}
clusterName: ${clusterName}
nodePoolName: ${nodePoolName}
version: ${version}
result: update_result
- check_pod_after:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: ${namespace}
podName: ${podName}
result: pod_status_after
- check_newrelic_alerts:
call: workflows.run
args:
name: check-newrelic-alerts
input:
apiKey: ${newRelicApiKey}
accountId: ${newRelicAccountId}
timeRange: "1h"
result: alerts
- return_summary:
return:
pod_status_before: ${pod_status_before}
update_result: ${update_result}
pod_status_after: ${pod_status_after}
alerts: ${alerts}
3. エラー処理とリトライ戦略
3.1 エラー処理の基本
親ワークフローでエラー処理を行うことで、一元的にエラーを管理できます。
3.2 リトライ機能の実装
Workflowsでは、retryパラメータを使用して自動リトライを設定できます。
子ワークフロー呼び出し時のリトライ:
main:
params: [projectId, clusterName, namespace, podName]
steps:
- check_pod_with_retry:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: ${namespace}
podName: ${podName}
retry:
predicate: ${http.default_retry_predicate}
max_retries: 3
backoff:
initial_delay: 2
max_delay: 60
multiplier: 2
result: pod_status
HTTP呼び出し時のリトライ:
main:
params: [projectId, clusterName, nodePoolName, version]
steps:
- update_node_pool_with_retry:
call: http.post
args:
url: https://container.googleapis.com/v1/projects/${projectId}/locations/${clusterName}/clusters/${clusterName}/nodePools/${nodePoolName}/update
auth:
type: OAuth2
body:
nodeVersion: ${version}
retry:
predicate: ${http.default_retry_predicate}
max_retries: 5
backoff:
initial_delay: 5
max_delay: 300
multiplier: 2
result: update_result
3.3 カスタムエラーハンドリング
親ワークフローで、各ステップのエラーを捕捉して適切に処理します。
# gke-update-orchestrator-with-error-handling.yaml
main:
params: [projectId, clusterName, namespace, podName, nodePoolName, version, newRelicApiKey, newRelicAccountId]
steps:
- check_pod_before:
try:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: ${namespace}
podName: ${podName}
result: pod_status_before
except:
as: e
steps:
- handle_error:
assign:
- error_message: ${"Pod status check failed: " + e.message}
- error_step: "check_pod_before"
raise: ${error_message}
- update_gke:
try:
call: workflows.run
args:
name: update-gke
input:
projectId: ${projectId}
clusterName: ${clusterName}
nodePoolName: ${nodePoolName}
version: ${version}
result: update_result
except:
as: e
steps:
- rollback_consideration:
assign:
- error_message: ${"GKE update failed: " + e.message}
- error_step: "update_gke"
# ロールバック処理をここに追加可能
raise: ${error_message}
- check_pod_after:
try:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: ${namespace}
podName: ${podName}
result: pod_status_after
except:
as: e
steps:
- handle_error:
assign:
- error_message: ${"Post-update pod status check failed: " + e.message}
- error_step: "check_pod_after"
raise: ${error_message}
- check_newrelic_alerts:
try:
call: workflows.run
args:
name: check-newrelic-alerts
input:
apiKey: ${newRelicApiKey}
accountId: ${newRelicAccountId}
timeRange: "1h"
result: alerts
except:
as: e
steps:
- handle_error:
assign:
- error_message: ${"NewRelic alert check failed: " + e.message}
- error_step: "check_newrelic_alerts"
# アラート確認の失敗は警告として扱うことも可能
next: continue_workflow
- return_summary:
return:
pod_status_before: ${pod_status_before}
update_result: ${update_result}
pod_status_after: ${pod_status_after}
alerts: ${alerts}
4. ベストプラクティス
4.1 ワークフロー設計の原則
1. 単一責任の原則
各子ワークフローは、1つの明確な責任を持つべきです。
2. パラメータの明確化
各ワークフローは、必要なパラメータを明確に定義します。
main:
params:
- projectId: string
- clusterName: string
- namespace: string
- podName: string
steps:
# ...
3. 戻り値の標準化
各子ワークフローは、統一された形式で結果を返します。
main:
steps:
- return_result:
return:
success: true
data: ${result_data}
timestamp: ${sys.now()}
4.2 エラー処理のベストプラクティス
1. 階層的なエラー処理
2. エラーの分類
エラーを分類し、適切に処理します:
- 一時的エラー: リトライ可能
- 永続的エラー: ロールバックが必要
- 警告: 処理を継続可能
main:
steps:
- handle_error:
switch:
- condition: ${e.code == 429 or e.code == 503}
steps:
- retry_step:
# 一時的エラー: リトライ
- condition: ${e.code >= 400 and e.code < 500}
steps:
- rollback_step:
# 永続的エラー: ロールバック
- condition: ${true}
steps:
- log_warning:
# その他のエラー: 警告として記録
4.3 リトライ戦略のベストプラクティス
1. 指数バックオフ
retry:
predicate: ${http.default_retry_predicate}
max_retries: 5
backoff:
initial_delay: 2 # 初期待機時間(秒)
max_delay: 300 # 最大待機時間(秒)
multiplier: 2 # 指数倍率
2. リトライ可能なエラーの判定
main:
steps:
- define_retry_predicate:
assign:
- retry_predicate: ${e.code == 429 or e.code == 503 or e.code == 500}
- call_with_custom_retry:
call: http.post
args:
url: ${url}
retry:
predicate: ${retry_predicate}
max_retries: 3
3. 部分的なリトライ
特定のステップのみをリトライする場合:
main:
steps:
- retry_loop:
for:
value: attempt
in: ${[1, 2, 3]}
steps:
- try_step:
try:
call: workflows.run
args:
name: critical-step
result: result
except:
as: e
steps:
- check_retry:
switch:
- condition: ${attempt < 3}
next: retry_loop
- condition: ${true}
raise: ${"Max retries exceeded: " + e.message}
4.4 実行フローの可視化
親ワークフローで全体の実行フローを明確にします。
5. 高度なパターン
5.1 条件分岐とループ
親ワークフローで条件分岐やループを使用して、柔軟な実行フローを実現します。
main:
params: [projectId, clusterName, podNames, nodePoolName, version]
steps:
- check_all_pods_before:
for:
value: podName
in: ${podNames}
steps:
- check_pod:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: "default"
podName: ${podName}
result: pod_status
- update_gke:
call: workflows.run
args:
name: update-gke
input:
projectId: ${projectId}
clusterName: ${clusterName}
nodePoolName: ${nodePoolName}
version: ${version}
result: update_result
- wait_for_stabilization:
call: sys.sleep
args:
seconds: 60
- check_all_pods_after:
for:
value: podName
in: ${podNames}
steps:
- check_pod:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: "default"
podName: ${podName}
result: pod_status
- validate_pod_status:
switch:
- condition: ${pod_status.status != "Running"}
raise: ${"Pod " + podName + " is not running"}
5.2 並列実行
独立した処理は並列実行することで、全体の処理時間を短縮できます。
main:
params: [projectId, clusterName, podNames]
steps:
- check_pods_in_parallel:
parallel:
- check_pod_1:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: "default"
podName: ${podNames[0]}
result: pod1_status
- check_pod_2:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: "default"
podName: ${podNames[1]}
result: pod2_status
- check_pod_3:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: "default"
podName: ${podNames[2]}
result: pod3_status
5.3 コールバックを使用した待機
長時間実行される処理の完了を待つ場合、コールバックを使用します。
main:
params: [projectId, clusterName, nodePoolName, version]
steps:
- start_update:
call: workflows.run
args:
name: update-gke-async
input:
projectId: ${projectId}
clusterName: ${clusterName}
nodePoolName: ${nodePoolName}
version: ${version}
result: update_job
- wait_for_completion:
call: workflows.run
args:
name: wait-for-gke-update
input:
callbackUrl: ${update_job.callback_url}
timeout: 3600
result: completion_result
6. 実践的な実装例
6.1 完全なGKEアップデートワークフロー
エラー処理とリトライを含む完全な実装例:
# gke-update-complete.yaml
main:
params:
- projectId: string
- clusterName: string
- namespace: string
- podNames: list
- nodePoolName: string
- version: string
- newRelicApiKey: string
- newRelicAccountId: string
steps:
- initialize:
assign:
- errors: []
- start_time: ${sys.now()}
- check_pods_before:
for:
value: podName
in: ${podNames}
steps:
- check_pod:
try:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: ${namespace}
podName: ${podName}
retry:
predicate: ${http.default_retry_predicate}
max_retries: 3
backoff:
initial_delay: 2
max_delay: 30
multiplier: 2
result: pod_status
except:
as: e
steps:
- record_error:
assign:
- errors: ${errors + [{"step": "check_pods_before", "pod": podName, "error": e.message}]}
raise: ${"Pod check failed: " + podName}
- update_gke:
try:
call: workflows.run
args:
name: update-gke
input:
projectId: ${projectId}
clusterName: ${clusterName}
nodePoolName: ${nodePoolName}
version: ${version}
retry:
predicate: ${http.default_retry_predicate}
max_retries: 5
backoff:
initial_delay: 5
max_delay: 300
multiplier: 2
result: update_result
except:
as: e
steps:
- record_error:
assign:
- errors: ${errors + [{"step": "update_gke", "error": e.message}]}
- rollback_consideration:
# ロールバック処理をここに追加
raise: ${"GKE update failed: " + e.message}
- wait_for_stabilization:
call: sys.sleep
args:
seconds: 60
- check_pods_after:
for:
value: podName
in: ${podNames}
steps:
- check_pod:
try:
call: workflows.run
args:
name: check-pod-status
input:
projectId: ${projectId}
clusterName: ${clusterName}
namespace: ${namespace}
podName: ${podName}
retry:
predicate: ${http.default_retry_predicate}
max_retries: 3
backoff:
initial_delay: 2
max_delay: 30
multiplier: 2
result: pod_status
except:
as: e
steps:
- record_error:
assign:
- errors: ${errors + [{"step": "check_pods_after", "pod": podName, "error": e.message}]}
raise: ${"Post-update pod check failed: " + podName}
- check_newrelic_alerts:
try:
call: workflows.run
args:
name: check-newrelic-alerts
input:
apiKey: ${newRelicApiKey}
accountId: ${newRelicAccountId}
timeRange: "1h"
retry:
predicate: ${http.default_retry_predicate}
max_retries: 2
backoff:
initial_delay: 1
max_delay: 10
multiplier: 2
result: alerts
except:
as: e
steps:
- record_warning:
assign:
- errors: ${errors + [{"step": "check_newrelic_alerts", "error": e.message, "severity": "warning"}]}
# アラート確認の失敗は警告として扱う
next: continue_workflow
- return_summary:
assign:
- end_time: ${sys.now()}
- duration: ${end_time - start_time}
return:
success: ${len(errors) == 0}
errors: ${errors}
duration_seconds: ${duration}
update_result: ${update_result}
alerts: ${alerts}
7. ワークフロー設計のチェックリスト
7.1 設計時の確認事項
- 各子ワークフローは単一責任を持っているか
- 親ワークフローが各子ワークフローを順次呼び出しているか
- エラー処理が親ワークフローで一元管理されているか
- リトライ戦略が適切に設定されているか
- パラメータが明確に定義されているか
- 戻り値が統一された形式か
- ログが適切に記録されているか
7.2 実装時の確認事項
- 一時的エラーと永続的エラーを適切に分類しているか
- ロールバック処理が実装されているか(必要な場合)
- タイムアウトが適切に設定されているか
- 並列実行が可能な処理は並列化しているか
- コールバックを使用すべき箇所で使用しているか
8. まとめ
本記事では、Google Cloud Workflowsを使用した親子ワークフローの設計と実装について紹介しました。
重要なポイント:
- 親が順次呼び出すパターンを推奨: エラー処理が明確で、実行フローの可視性が高い
- エラー処理は親ワークフローで一元管理: 一元的なエラー処理により、保守性が向上
- 適切なリトライ戦略の実装: 指数バックオフを使用したリトライで、一時的エラーに対応
- 単一責任の原則: 各子ワークフローは1つの明確な責任を持つ
- 統一されたパラメータと戻り値: モジュール化と再利用性を高める
次のアクション:
- 既存のワークフローをレビューし、親子ワークフローパターンに適合するか確認
- エラー処理とリトライ戦略を見直す
- 実践的なユースケースで親子ワークフローを実装
- ワークフローの実行ログを確認し、改善点を洗い出す
- チーム内でベストプラクティスを共有
Discussion