🐶

AWS StepFunctionsでAWS Batchの実行失敗をSlackに通知する

2022/11/01に公開約5,900字

はじめに

AWS StepFunctionsで起きたエラーをSlackに通知する場合、AWSChatbotは利用できません。
LambdaからSlackのWebhookURLに対してPOSTして通知させるCDKコードを書きました。

動作するすべてのコードは下記にアップロードしています。
https://github.com/inunekousapon/batch-stepfunction-test

参考)
https://dev.classmethod.jp/articles/slack-notify-incoming-webhook-using-lambda/

動作方法

AWSアカウントIDとリージョン、SlackのWebhookURLを利用します。またAWSCLIの設定は予め必要となります。

AWSアカウントIDはマネジメントコンソールから確認できます。

WebhookURLは下記の方法で作れます。
https://slack.com/intl/ja-jp/help/articles/115005265063-Slack-での-Incoming-Webhook-の利用

python -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt

export AWS_ACCOUNT_ID=<<your aws account id>>
export REGION=<<your aws region>>
export SLACK_WEBHOOK_URL=<<slack notification webhook url>>

cdk bootstrap
cdk deploy

構成

  • batch
    AWS Batchで動作するDockerの定義が入っています。

  • slack_notification
    Slackに通知するLambdaのコードが入っています。

  • cdk
    CDKの定義が入っています。

Batch

50%の確率で失敗するBatchの定義をしています。

hello.py
import random

choice = random.choice([True, False])

if choice:
    print("hello world")
else:
    raise Exception("error occur.")
Dockerfile
FROM python:3
WORKDIR /app
COPY . /app
CMD ["python","hello.py"]

slack_notification

LambdaからSlackにエラー通知するスクリプトです。
このプログラムは配列ジョブのケースなので、単一ジョブの場合はリンクを差し替える必要があります。

index.py
import json
import urllib3
import pprint
import os

http = urllib3.PoolManager()



def handler(event, context):
    url = os.environ["SLACK_WEBHOOK_URL"]
    rec = json.loads(event["Records"][0]["Sns"]["Message"])

    region = "us-east-1"
    job_id = None
    if "Container" in rec and "Environment" in rec["Container"]:
        for env in rec["Container"]["Environment"]:
            if "Name" in env and "Value" in env and env["Name"] == "AWS_REGION":
                region = env["Value"]
    if "JobId" in rec:
        job_id = rec["JobId"]
        batch_url = f'https://{region}.console.aws.amazon.com/batch/home?region={region}#jobs/array-job/{job_id}'
        output_message = f"""*AWS Batch failed to execute*
```
        {pprint.pformat(rec)}
```
Link: <{batch_url}>
"""
    else:
        output_message = f"""*AWS Batch failed to execute*
```
{pprint.pformat(rec)}
```
"""
    message_blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": output_message,
            },
        },
    ]

    msg = {
        "blocks": message_blocks,
    }

    encoded_msg = json.dumps(msg).encode("utf-8")
    resp = http.request("POST", url, body=encoded_msg)

    print(
        {
            "message": event["Records"][0]["Sns"]["Message"],
            "status_code": resp.status,
            "response": resp.data,
        }
    )

cdk

Batch用InstanceProfileの定義です。

instance_profile.py
from aws_cdk import aws_iam as iam
from constructs import Construct


class InstanceProfile(Construct):
    '''
    Custom construct for the Instance Profile resource.
    Used to wrap the Instance Role construct.
    '''
    @property
    def profile_arn(self):
        if self._instance is None:
            self._instance = self._create_instance()
        return self._instance.attr_arn

    def attach_role(self, role):
        self._roles.append(role.role_name)

    def _create_instance(self):
        return iam.CfnInstanceProfile(self,
            self._id + "cfn-instance-profile",
            roles=self._roles
        )

    def __init__(self, scope: Construct, id: str):
        super().__init__(scope, id)
        self._roles = []
        self._instance = None
        self._id = id

cdk_stack.pyは長いので抜粋します。
まず、動作させるために下記のライブラリが必要になります。requirements.txtに追記があるので忘れずにpip installしてください。

aws-cdk.aws-batch-alpha
aws-cdk.aws_lambda_python_alpha

Batchが動作するVPCを作成しています。NATGatewayは0ですべてパブリックなサブネットで動作させます。

参考)
https://dev.classmethod.jp/articles/reduce-unnecessary-costs-for-nat-gateway/

vpc = ec2.Vpc(
    self, "batch-job-vpc", nat_gateways=0, max_azs=1
)

指定ディレクトリからECRのコンテナイメージにDockerイメージをアップする定義です。

docker_image_asset = ecr_assets.DockerImageAsset(
    self,
    "ecr-docker-image-asset",
    directory="./batch",
)
docker_container_image = ecs.ContainerImage.from_docker_image_asset(
    docker_image_asset
)

ここはStepFunctionsの定義です。
Batchを実行して成功したらsuccess_passを通して終了させ、失敗したらSNSで失敗を通知させます。

submit_job = stepfunctions_tasks.BatchSubmitJob(
    self,
    "batch-submit-job",
    job_definition_arn=batch_job_definition.job_definition_arn,
    job_queue_arn=job_queue.job_queue_arn,
    array_size=5,
    job_name="Job",
)
topic = sns.Topic(
    self,
    "batch-notification-topic",
    display_name="chatbot-batch-notification-topic",
    topic_name="ChatbotBatchNotificationTopic",
)
sns_publish_task = stepfunctions_tasks.SnsPublish(
    self,
    "batch-fail-publish",
    topic=topic,
    message=stepfunctions.TaskInput.from_json_path_at("$.Cause"),
)

success_pass = stepfunctions.Pass(self, "Succeeded")        
submit_job.add_catch(sns_publish_task, errors=["States.ALL"])

# Create Chain
stepfunction_definition = submit_job.next(success_pass)

# Create State Machine
stepfunction_state_machine = stepfunctions.StateMachine(
    self,
    "StateMachine",
    definition=stepfunction_definition,
    timeout=Duration.minutes(5),
)

Pythonで書いたSlackにエラーを通知するコードをLambda化してSNSのサブスクリプションに追加するコードです。

slack_notification_func = aws_lambda_python.PythonFunction(
    self,
    "slack-notification",
    entry="./slack_notification",
    runtime=aws_lambda.Runtime.PYTHON_3_8,
    environment={
	"SLACK_WEBHOOK_URL", os.environ["SLACK_WEBHOOK_URL"]
    }
)
slack_notification_func.add_event_source(
    aws_lambda_event_sources.SnsEventSource(topic)
)

おわりに

当初Chatbotを利用してSlack通知しようとして動作させることができず非常に苦労しました。
今回なんとか最小構成でCDK化できたのでなにかの役にたてば幸いです。

Discussion

ログインするとコメントできます