Step Functions で人間の承認を待つワークフローのデプロイのチュートリアルをやってみた
Step Functions で人間の承認を待つワークフローのデプロイ - AWS Step Functions
このチュートリアルでは、AWS Step Functions がタスク中に実行を停止し、ユーザーが E メールに返答するまで待機することができる人間による承諾プロジェクトをデプロイする方法を説明します。
上記チュートリアルをやってみました。
ステップ 1: AWS CloudFormation テンプレートを作成する
ドキュメントに記載されているテンプレートソースコードをコピーして、human-approval.yaml というファイル名で保存します。
コード
AWSTemplateFormatVersion: "2010-09-09"
Description: "AWS Step Functions Human based task example. It sends an email with an HTTP URL for approval."
Parameters:
Email:
Type: String
AllowedPattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
ConstraintDescription: Must be a valid email address.
Resources:
# Begin API Gateway Resources
ExecutionApi:
Type: "AWS::ApiGateway::RestApi"
Properties:
Name: "Human approval endpoint"
Description: "HTTP Endpoint backed by API Gateway and Lambda"
FailOnWarnings: true
ExecutionResource:
Type: 'AWS::ApiGateway::Resource'
Properties:
RestApiId: !Ref ExecutionApi
ParentId: !GetAtt "ExecutionApi.RootResourceId"
PathPart: execution
ExecutionMethod:
Type: "AWS::ApiGateway::Method"
Properties:
AuthorizationType: NONE
HttpMethod: GET
Integration:
Type: AWS
IntegrationHttpMethod: POST
Uri: !Sub "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaApprovalFunction.Arn}/invocations"
IntegrationResponses:
- StatusCode: 302
ResponseParameters:
method.response.header.Location: "integration.response.body.headers.Location"
RequestTemplates:
application/json: |
{
"body" : $input.json('$'),
"headers": {
#foreach($header in $input.params().header.keySet())
"$header": "$util.escapeJavaScript($input.params().header.get($header))" #if($foreach.hasNext),#end
#end
},
"method": "$context.httpMethod",
"params": {
#foreach($param in $input.params().path.keySet())
"$param": "$util.escapeJavaScript($input.params().path.get($param))" #if($foreach.hasNext),#end
#end
},
"query": {
#foreach($queryParam in $input.params().querystring.keySet())
"$queryParam": "$util.escapeJavaScript($input.params().querystring.get($queryParam))" #if($foreach.hasNext),#end
#end
}
}
ResourceId: !Ref ExecutionResource
RestApiId: !Ref ExecutionApi
MethodResponses:
- StatusCode: 302
ResponseParameters:
method.response.header.Location: true
ApiGatewayAccount:
Type: 'AWS::ApiGateway::Account'
Properties:
CloudWatchRoleArn: !GetAtt "ApiGatewayCloudWatchLogsRole.Arn"
ApiGatewayCloudWatchLogsRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- apigateway.amazonaws.com
Action:
- 'sts:AssumeRole'
Policies:
- PolicyName: ApiGatewayLogsPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "logs:*"
Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*"
ExecutionApiStage:
DependsOn:
- ApiGatewayAccount
Type: 'AWS::ApiGateway::Stage'
Properties:
DeploymentId: !Ref ApiDeployment
MethodSettings:
- DataTraceEnabled: true
HttpMethod: '*'
LoggingLevel: INFO
ResourcePath: /*
RestApiId: !Ref ExecutionApi
StageName: states
ApiDeployment:
Type: "AWS::ApiGateway::Deployment"
DependsOn:
- ExecutionMethod
Properties:
RestApiId: !Ref ExecutionApi
StageName: DummyStage
# End API Gateway Resources
# Begin
# Lambda that will be invoked by API Gateway
LambdaApprovalFunction:
Type: 'AWS::Lambda::Function'
Properties:
Code:
ZipFile:
Fn::Sub: |
const { SFN: StepFunctions } = require("@aws-sdk/client-sfn");
var redirectToStepFunctions = function(lambdaArn, statemachineName, executionName, callback) {
const lambdaArnTokens = lambdaArn.split(":");
const partition = lambdaArnTokens[1];
const region = lambdaArnTokens[3];
const accountId = lambdaArnTokens[4];
console.log("partition=" + partition);
console.log("region=" + region);
console.log("accountId=" + accountId);
const executionArn = "arn:" + partition + ":states:" + region + ":" + accountId + ":execution:" + statemachineName + ":" + executionName;
console.log("executionArn=" + executionArn);
const url = "https://console.aws.amazon.com/states/home?region=" + region + "#/executions/details/" + executionArn;
callback(null, {
statusCode: 302,
headers: {
Location: url
}
});
};
exports.handler = (event, context, callback) => {
console.log('Event= ' + JSON.stringify(event));
const action = event.query.action;
const taskToken = event.query.taskToken;
const statemachineName = event.query.sm;
const executionName = event.query.ex;
const stepfunctions = new StepFunctions();
var message = "";
if (action === "approve") {
message = { "Status": "Approved! Task approved by ${Email}" };
} else if (action === "reject") {
message = { "Status": "Rejected! Task rejected by ${Email}" };
} else {
console.error("Unrecognized action. Expected: approve, reject.");
callback({"Status": "Failed to process the request. Unrecognized Action."});
}
stepfunctions.sendTaskSuccess({
output: JSON.stringify(message),
taskToken: event.query.taskToken
})
.then(function(data) {
redirectToStepFunctions(context.invokedFunctionArn, statemachineName, executionName, callback);
}).catch(function(err) {
console.error(err, err.stack);
callback(err);
});
}
Description: Lambda function that callback to AWS Step Functions
FunctionName: LambdaApprovalFunction
Handler: index.handler
Role: !GetAtt "LambdaApiGatewayIAMRole.Arn"
Runtime: nodejs18.x
LambdaApiGatewayInvoke:
Type: "AWS::Lambda::Permission"
Properties:
Action: "lambda:InvokeFunction"
FunctionName: !GetAtt "LambdaApprovalFunction.Arn"
Principal: "apigateway.amazonaws.com"
SourceArn: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ExecutionApi}/*"
LambdaApiGatewayIAMRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Action:
- "sts:AssumeRole"
Effect: "Allow"
Principal:
Service:
- "lambda.amazonaws.com"
Policies:
- PolicyName: CloudWatchLogsPolicy
PolicyDocument:
Statement:
- Effect: Allow
Action:
- "logs:*"
Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*"
- PolicyName: StepFunctionsPolicy
PolicyDocument:
Statement:
- Effect: Allow
Action:
- "states:SendTaskFailure"
- "states:SendTaskSuccess"
Resource: "*"
# End Lambda that will be invoked by API Gateway
# Begin state machine that publishes to Lambda and sends an email with the link for approval
HumanApprovalLambdaStateMachine:
Type: AWS::StepFunctions::StateMachine
Properties:
RoleArn: !GetAtt LambdaStateMachineExecutionRole.Arn
DefinitionString:
Fn::Sub: |
{
"StartAt": "Lambda Callback",
"TimeoutSeconds": 3600,
"States": {
"Lambda Callback": {
"Type": "Task",
"Resource": "arn:${AWS::Partition}:states:::lambda:invoke.waitForTaskToken",
"Parameters": {
"FunctionName": "${LambdaHumanApprovalSendEmailFunction.Arn}",
"Payload": {
"ExecutionContext.$": "$$",
"APIGatewayEndpoint": "https://${ExecutionApi}.execute-api.${AWS::Region}.amazonaws.com/states"
}
},
"Next": "ManualApprovalChoiceState"
},
"ManualApprovalChoiceState": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.Status",
"StringEquals": "Approved! Task approved by ${Email}",
"Next": "ApprovedPassState"
},
{
"Variable": "$.Status",
"StringEquals": "Rejected! Task rejected by ${Email}",
"Next": "RejectedPassState"
}
]
},
"ApprovedPassState": {
"Type": "Pass",
"End": true
},
"RejectedPassState": {
"Type": "Pass",
"End": true
}
}
}
SNSHumanApprovalEmailTopic:
Type: AWS::SNS::Topic
Properties:
Subscription:
-
Endpoint: !Sub ${Email}
Protocol: email
LambdaHumanApprovalSendEmailFunction:
Type: "AWS::Lambda::Function"
Properties:
Handler: "index.lambda_handler"
Role: !GetAtt LambdaSendEmailExecutionRole.Arn
Runtime: "nodejs18.x"
Timeout: "25"
Code:
ZipFile:
Fn::Sub: |
console.log('Loading function');
const { SNS } = require("@aws-sdk/client-sns");
exports.lambda_handler = (event, context, callback) => {
console.log('event= ' + JSON.stringify(event));
console.log('context= ' + JSON.stringify(context));
const executionContext = event.ExecutionContext;
console.log('executionContext= ' + executionContext);
const executionName = executionContext.Execution.Name;
console.log('executionName= ' + executionName);
const statemachineName = executionContext.StateMachine.Name;
console.log('statemachineName= ' + statemachineName);
const taskToken = executionContext.Task.Token;
console.log('taskToken= ' + taskToken);
const apigwEndpint = event.APIGatewayEndpoint;
console.log('apigwEndpint = ' + apigwEndpint)
const approveEndpoint = apigwEndpint + "/execution?action=approve&ex=" + executionName + "&sm=" + statemachineName + "&taskToken=" + encodeURIComponent(taskToken);
console.log('approveEndpoint= ' + approveEndpoint);
const rejectEndpoint = apigwEndpint + "/execution?action=reject&ex=" + executionName + "&sm=" + statemachineName + "&taskToken=" + encodeURIComponent(taskToken);
console.log('rejectEndpoint= ' + rejectEndpoint);
const emailSnsTopic = "${SNSHumanApprovalEmailTopic}";
console.log('emailSnsTopic= ' + emailSnsTopic);
var emailMessage = 'Welcome! \n\n';
emailMessage += 'This is an email requiring an approval for a step functions execution. \n\n'
emailMessage += 'Check the following information and click "Approve" link if you want to approve. \n\n'
emailMessage += 'Execution Name -> ' + executionName + '\n\n'
emailMessage += 'Approve ' + approveEndpoint + '\n\n'
emailMessage += 'Reject ' + rejectEndpoint + '\n\n'
emailMessage += 'Thanks for using Step functions!'
const sns = new SNS();
var params = {
Message: emailMessage,
Subject: "Required approval from AWS Step Functions",
TopicArn: emailSnsTopic
};
sns.publish(params)
.then(function(data) {
console.log("MessageID is " + data.MessageId);
callback(null);
}).catch(
function(err) {
console.error(err, err.stack);
callback(err);
});
}
LambdaStateMachineExecutionRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: states.amazonaws.com
Action: "sts:AssumeRole"
Policies:
- PolicyName: InvokeCallbackLambda
PolicyDocument:
Statement:
- Effect: Allow
Action:
- "lambda:InvokeFunction"
Resource:
- !Sub "${LambdaHumanApprovalSendEmailFunction.Arn}"
LambdaSendEmailExecutionRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: "sts:AssumeRole"
Policies:
- PolicyName: CloudWatchLogsPolicy
PolicyDocument:
Statement:
- Effect: Allow
Action:
- "logs:CreateLogGroup"
- "logs:CreateLogStream"
- "logs:PutLogEvents"
Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*"
- PolicyName: SNSSendEmailPolicy
PolicyDocument:
Statement:
- Effect: Allow
Action:
- "SNS:Publish"
Resource:
- !Sub "${SNSHumanApprovalEmailTopic}"
# End state machine that publishes to Lambda and sends an email with the link for approval
Outputs:
ApiGatewayInvokeURL:
Value: !Sub "https://${ExecutionApi}.execute-api.${AWS::Region}.amazonaws.com/states"
StateMachineHumanApprovalArn:
Value: !Ref HumanApprovalLambdaStateMachine
ステップ 2: スタックを作成する
CloudFormation コンソールからステップ 1 のテンプレートをデプロイします。
パラメータには有効な E メールアドレスを入力します。
ステップ 3: Amazon SNS サブスクリプションを承認する
CloudFormation スタックのリソースタブから SNS トピックのリンクをクリックします。
CloudFormation スタックを作成しただけではメールアドレスが承認されません。
ステップ 1 でパラメータに入力した E メールアドレス宛てに以下のようなメールが届いているので、「Confirm subscription」のリンクをクリックします。
メール
タイトル: AWS Notification - Subscription Confirmation
You have chosen to subscribe to the topic:
arn:aws:sns:ap-northeast-1:012345678901:sample-SNSHumanApprovalEmailTopic-o7TZrt2VtErr
To confirm this subscription, click or visit the link below (If this was in error no action is necessary):
Confirm subscription
Please do not reply directly to this email. If you wish to remove yourself from receiving all future SNS subscription confirmation requests please send an email to sns-opt-out
以下の画面が表示されていればメールアドレスの承認完了です。
SNS コンソール側でもステータスが確認済みになりました。
ステップ 4: ステートマシンを実行する
CloudFormation スタックのリソースタブから Step Functions のステートマシンのリンクをクリックします。
「実行を開始」をクリックします。
[入力] ボックスに以下の JSON を入力してワークフローを実行します。
{
"Comment": "Testing the human approval tutorial."
}
Lambda Callback のタスクでワークフローが一時停止中になります。
ステップ 1 でパラメータに入力した E メールアドレス宛てに以下のようなメールが届いているので、「Approve」のリンクをクリックします。
メール
タイトル: Required approval from AWS Step Functions
AWS Notifications <no-reply@sns.amazonaws.com>
Welcome!
This is an email requiring an approval for a step functions execution.
Check the following information and click "Approve" link if you want to approve.
Execution Name -> bae882cc-6707-4d0b-8cde-6d725d9e6cb9
Approve https://o61y6vzty3.execute-api.ap-northeast-1.amazonaws.com/states/execution?action=approve&ex=bae882cc-6707-4d0b-8cde-6d725d9e6cb9&sm=HumanApprovalLambdaStateMachine-GD6QjAdOYn3K&taskToken=xxx
Reject https://o61y6vzty3.execute-api.ap-northeast-1.amazonaws.com/states/execution?action=reject&ex=bae882cc-6707-4d0b-8cde-6d725d9e6cb9&sm=HumanApprovalLambdaStateMachine-GD6QjAdOYn3K&taskToken=xxx
Thanks for using Step functions!
--
If you wish to stop receiving notifications from this topic, please click or visit the link below to unsubscribe:
https://sns.ap-northeast-1.amazonaws.com/unsubscribe.html?SubscriptionArn=arn:aws:sns:ap-northeast-1:012345678901:sample-SNSHumanApprovalEmailTopic-o7TZrt2VtErr:a552a1ab-3709-4e15-bd65-28556e107d3c&Endpoint=xxx@gmail.com
Please do not reply directly to this email. If you have any questions or comments regarding this email, please contact us at https://aws.amazon.com/support
Step Functions コンソールで ApprovedPassState に遷移して終了しました。
再度同じ設定で Step Functions を実行して Reject のリンクをクリックすると、RejectedPassState に遷移することも確認できます。
実装について
処理のフローは以下の実装になっています。
- Step Functions ワークフローを実行する
- 「sample-LambdaHumanApprovalSendEmailFunction」という Lambda 関数を呼び出す
- Step Functions からタスクトークンや API Gateway エンドポイント情報を取得する
- 承認/拒否の URL を生成する
- SNS トピックへメッセージを送信する
- SNS から E メールアドレス宛にメッセージ送信する
- E メールに記載されている承認/拒否のリンクをクリックする
- API Gateway の API を呼び出す
- API Gateway から「LambdaApprovalFunction」という Lambda 関数を呼び出す
- Step Functions ワークフローに承認/拒否の結果を返す
- Step Functions が承認/拒否の結果で分岐
ポイントは承認または拒否されるまで Step Functions ワークフローが一時停止する点です。
より専門的にはタスクトークンのコールバックまで待機するフローと呼ぶこともできます。
一時停止の最大期間は 1 年間です。
Step Functions でサービス統合パターンを検出する - AWS Step Functions
このようなタスクでは、ワークフローの実行が 1 年間のサービスクォータに達するまで Step Functions を一時停止して (状態のスロットリングに関連するクォータ を参照)、外部のプロセスあるいはワークフローが完了するまで待機させることができます
今回の実装以外に、SQS を利用する実装についてもドキュメントに例が記載されているので、あわせてご参照ください。
Amazon SQS、Amazon SNS、Lambda を使ってコールバックパターンの例を作成する - AWS Step Functions
まとめ
今回は Step Functions で人間の承認を待つワークフローのデプロイのチュートリアルをやってみました。
どなたかの参考になれば幸いです。
Discussion