AWS StepFunctionsでAWS Batchの実行失敗をSlackに通知する
はじめに
AWS StepFunctionsで起きたエラーをSlackに通知する場合、AWSChatbotは利用できません。
LambdaからSlackのWebhookURLに対してPOSTして通知させるCDKコードを書きました。
動作するすべてのコードは下記にアップロードしています。
参考)
動作方法
AWSアカウントIDとリージョン、SlackのWebhookURLを利用します。またAWSCLIの設定は予め必要となります。
AWSアカウントIDはマネジメントコンソールから確認できます。
WebhookURLは下記の方法で作れます。
python -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
export AWS_ACCOUNT_ID=<<your aws account id>>
export REGION=<<your aws region>>
export SLACK_WEBHOOK_URL=<<slack notification webhook url>>
cdk bootstrap
cdk deploy
構成
-
batch
AWS Batchで動作するDockerの定義が入っています。 -
slack_notification
Slackに通知するLambdaのコードが入っています。 -
cdk
CDKの定義が入っています。
Batch
50%の確率で失敗するBatchの定義をしています。
import random
choice = random.choice([True, False])
if choice:
print("hello world")
else:
raise Exception("error occur.")
FROM python:3
WORKDIR /app
COPY . /app
CMD ["python","hello.py"]
slack_notification
LambdaからSlackにエラー通知するスクリプトです。
このプログラムは配列ジョブのケースなので、単一ジョブの場合はリンクを差し替える必要があります。
import json
import urllib3
import pprint
import os
http = urllib3.PoolManager()
def handler(event, context):
url = os.environ["SLACK_WEBHOOK_URL"]
rec = json.loads(event["Records"][0]["Sns"]["Message"])
region = "us-east-1"
job_id = None
if "Container" in rec and "Environment" in rec["Container"]:
for env in rec["Container"]["Environment"]:
if "Name" in env and "Value" in env and env["Name"] == "AWS_REGION":
region = env["Value"]
if "JobId" in rec:
job_id = rec["JobId"]
batch_url = f'https://{region}.console.aws.amazon.com/batch/home?region={region}#jobs/array-job/{job_id}'
output_message = f"""*AWS Batch failed to execute*
```
{pprint.pformat(rec)}
```
Link: <{batch_url}>
"""
else:
output_message = f"""*AWS Batch failed to execute*
```
{pprint.pformat(rec)}
```
"""
message_blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": output_message,
},
},
]
msg = {
"blocks": message_blocks,
}
encoded_msg = json.dumps(msg).encode("utf-8")
resp = http.request("POST", url, body=encoded_msg)
print(
{
"message": event["Records"][0]["Sns"]["Message"],
"status_code": resp.status,
"response": resp.data,
}
)
cdk
Batch用InstanceProfileの定義です。
from aws_cdk import aws_iam as iam
from constructs import Construct
class InstanceProfile(Construct):
'''
Custom construct for the Instance Profile resource.
Used to wrap the Instance Role construct.
'''
@property
def profile_arn(self):
if self._instance is None:
self._instance = self._create_instance()
return self._instance.attr_arn
def attach_role(self, role):
self._roles.append(role.role_name)
def _create_instance(self):
return iam.CfnInstanceProfile(self,
self._id + "cfn-instance-profile",
roles=self._roles
)
def __init__(self, scope: Construct, id: str):
super().__init__(scope, id)
self._roles = []
self._instance = None
self._id = id
cdk_stack.pyは長いので抜粋します。
まず、動作させるために下記のライブラリが必要になります。requirements.txtに追記があるので忘れずにpip installしてください。
aws-cdk.aws-batch-alpha
aws-cdk.aws_lambda_python_alpha
Batchが動作するVPCを作成しています。NATGatewayは0ですべてパブリックなサブネットで動作させます。
参考)
vpc = ec2.Vpc(
self, "batch-job-vpc", nat_gateways=0, max_azs=1
)
指定ディレクトリからECRのコンテナイメージにDockerイメージをアップする定義です。
docker_image_asset = ecr_assets.DockerImageAsset(
self,
"ecr-docker-image-asset",
directory="./batch",
)
docker_container_image = ecs.ContainerImage.from_docker_image_asset(
docker_image_asset
)
ここはStepFunctionsの定義です。
Batchを実行して成功したらsuccess_passを通して終了させ、失敗したらSNSで失敗を通知させます。
submit_job = stepfunctions_tasks.BatchSubmitJob(
self,
"batch-submit-job",
job_definition_arn=batch_job_definition.job_definition_arn,
job_queue_arn=job_queue.job_queue_arn,
array_size=5,
job_name="Job",
)
topic = sns.Topic(
self,
"batch-notification-topic",
display_name="chatbot-batch-notification-topic",
topic_name="ChatbotBatchNotificationTopic",
)
sns_publish_task = stepfunctions_tasks.SnsPublish(
self,
"batch-fail-publish",
topic=topic,
message=stepfunctions.TaskInput.from_json_path_at("$.Cause"),
)
success_pass = stepfunctions.Pass(self, "Succeeded")
submit_job.add_catch(sns_publish_task, errors=["States.ALL"])
# Create Chain
stepfunction_definition = submit_job.next(success_pass)
# Create State Machine
stepfunction_state_machine = stepfunctions.StateMachine(
self,
"StateMachine",
definition=stepfunction_definition,
timeout=Duration.minutes(5),
)
Pythonで書いたSlackにエラーを通知するコードをLambda化してSNSのサブスクリプションに追加するコードです。
slack_notification_func = aws_lambda_python.PythonFunction(
self,
"slack-notification",
entry="./slack_notification",
runtime=aws_lambda.Runtime.PYTHON_3_8,
environment={
"SLACK_WEBHOOK_URL", os.environ["SLACK_WEBHOOK_URL"]
}
)
slack_notification_func.add_event_source(
aws_lambda_event_sources.SnsEventSource(topic)
)
おわりに
当初Chatbotを利用してSlack通知しようとして動作させることができず非常に苦労しました。
今回なんとか最小構成でCDK化できたのでなにかの役にたてば幸いです。
Discussion