Parallel/Map内でエラー処理するけどマシン全体は失敗させる
AWS Step FunctionsのParallel/Mapステート内でエラーをキャッチしてすべてのブランチ/イテレーションは完了させるけど、ステートマシン全体は失敗させる方法です。
はじめに
Parallel/Mapステート内でエラーが発生した場合、デフォルトでは、その時点で他のブランチ/イテレーションの実行がキャンセルされステートマシン全体が失敗します。そのため、一部の処理が失敗してもすべての処理を完了させたい場合は、Parallel/Mapステート内でエラーをキャッチする処理が必要になります。しかし、そうするとステートマシン全体としては成功で完了するため、ステートマシンの失敗を監視していても一部の処理でエラーがあったことに気づけない可能性があります。そこで、対策として以下の方法が考えられます。
- 内部のエラー処理の中で通知する
- すべての処理が終わった後でステートマシン全体を失敗させる
1の方が素直な方法である気はしますが、ここでは、2の方法を取り上げます。
戦略
ParallelステートもMapステートも、内部のすべてのブランチ/イテレーションの結果を束ねたJSON配列を出力します。これを利用して、Parallel/Mapステートの後で結果判定を行い、ブランチ/イテレーションの結果にエラーが含まれていた場合はFailステートに移行してステートマシン全体が失敗するようにします。なお、JSON配列のままでは判定が難しいため、JSON配列を文字列に変換してそこにErrorが含まれるかというざっくり判定を行います。
Parallelステートの例

1番目のブランチのInvocationFailsステートは失敗するようになっていますが、エラーはキャッチされてブランチとしては成功で終わります。ブランチの結果は、InvocationFailsステートの出力がFallbackステートによってスルーされるため{"Error": ..., "Cause": ...}です。2番目のブランチAnotherBranchは単純に成功します。
Parallelステートの出力は、ResultSelectorを使用して{"ParallelResultString": JSON配列を文字列にしたもの}になるようにしています。次のIfPrallelResultIncludesErrorステートで、$.ParallelResultStringが*Error*にマッチするかを判定しています。
{
"Comment": "A description of my state machine",
"StartAt": "Parallel",
"States": {
"Parallel": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "InvocationFails",
"States": {
"InvocationFails": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "function-you-cannot-invoke"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fallback"
}
],
"End": true
},
"Fallback": {
"Type": "Succeed"
}
}
},
{
"StartAt": "AnotherBranch",
"States": {
"AnotherBranch": {
"Type": "Pass",
"End": true,
"Result": {
"AnotherBranchOutput": "foobar"
}
}
}
}
],
"Next": "IfPrallelResultIncludesError",
"ResultSelector": {
"ParallelResultString.$": "States.JsonToString($)"
}
},
"IfPrallelResultIncludesError": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.ParallelResultString",
"StringMatches": "*Error*",
"Next": "Fail"
}
],
"Default": "Success"
},
"Fail": {
"Type": "Fail"
},
"Success": {
"Type": "Succeed"
}
}
}
実行結果

想定どおり、Parallelステートの処理はすべて完了しましたが、ステートマシンは失敗しました。
IfPrallelResultIncludesErrorステートには、以下のような入力がありました。
{
"ParallelResultString": "[{\"Error\":\"Lambda.AWSLambdaException\",\"Cause\":\"User: arn:aws:sts::123456789012:assumed-role/StepFunctionsRoleWithNoPermissions/qykZQHtyrmbixgiHPVyPKmrruQeCrceC is not authorized to perform: lambda:InvokeFunction on resource: arn:aws:lambda:ap-northeast-1:123456789012:function:function-you-cannot-invoke because no identity-based policy allows the lambda:InvokeFunction action (Service: AWSLambda; Status Code: 403; Error Code: AccessDeniedException; Request ID: c5d59aec-cc04-4e31-ac04-b25526f50330; Proxy: null)\"},{\"AnotherBranchOutput\":\"foobar\"}]"
}
文字列化する前のParallelステートの結果は[{"Error": ..., "Cause": ...}, {...}]の形式だったことが分かります。
Mapステートの例

Parallelステートの例と同様の考え方です。
{
"Comment": "A description of my state machine",
"StartAt": "Map",
"States": {
"Map": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "InvocationFails",
"States": {
"InvocationFails": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "function-you-cannot-invoke"
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "Fallback"
}
],
"End": true
},
"Fallback": {
"Type": "Succeed"
}
}
},
"ResultSelector": {
"MapResultString.$": "States.JsonToString($)"
},
"Next": "IfMapResultIncludesError"
},
"IfMapResultIncludesError": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.MapResultString",
"StringMatches": "*Error*",
"Next": "Fail"
}
],
"Default": "Success"
},
"Fail": {
"Type": "Fail"
},
"Success": {
"Type": "Succeed"
}
}
}
実行結果
MapステートへのJSON配列の入力が必要になるため、例として[1, 2]を入力します。以下にその結果を示します。

こちらも、Mapステートの処理はすべて完了しましたが、ステートマシンは失敗しました。
IfMapResultIncludesErrorステートには、以下のような入力がありました。
{
"MapResultString": "[{\"Error\":\"Lambda.AWSLambdaException\",\"Cause\":\"User: arn:aws:sts::123456789012:assumed-role/StepFunctionsRoleWithNoPermissions/DtWzIFZEbVWeVYGlSpxilCjBwYznSCZi is not authorized to perform: lambda:InvokeFunction on resource: arn:aws:lambda:ap-northeast-1:123456789012:function:function-you-cannot-invoke because no identity-based policy allows the lambda:InvokeFunction action (Service: AWSLambda; Status Code: 403; Error Code: AccessDeniedException; Request ID: 202d051f-4c44-4fca-b23a-2defa3508929; Proxy: null)\"},{\"Error\":\"Lambda.AWSLambdaException\",\"Cause\":\"User: arn:aws:sts::123456789012:assumed-role/StepFunctionsRoleWithNoPermissions/xMXgsZgollEOElGhLVIgdEQVVrieQDcc is not authorized to perform: lambda:InvokeFunction on resource: arn:aws:lambda:ap-northeast-1:123456789012:function:function-you-cannot-invoke because no identity-based policy allows the lambda:InvokeFunction action (Service: AWSLambda; Status Code: 403; Error Code: AccessDeniedException; Request ID: 2608a009-3f35-49ea-8dc6-061a5ec179fb; Proxy: null)\"}]"
}
文字列化する前のMapステートの結果は[{"Error": ..., "Cause": ...}, {"Error": ..., "Cause", ...}]の形式だったことが分かります。
参考
こちらの内容とほぼ同じです。Mapの場合のCDKの例が示されています。
Discussion