🌊

Pythonでグラフ画像を作成して、Slackに通知する with AWS

2022/12/04に公開約21,600字

はじめに

わたしはちょっとしたグラフ画像を作成するときに、よく Python を使います。
その作成や、完了した旨の通知を Lambda で実現できれば、楽だなあと思っていました。
今回は、それに取り組みます。

コードに関する情報

実行環境

  • Python 3.9.15
  • Pipenv version 2022.11.11
  • Node.js v18.11.0
  • Docker

Node.js は AWS CDK 用、Docker は Lambda コンテナイメージのビルド用です。
Python が 3.9 系なのは、2022/12 時点で、lambda/python イメージの最新版が 3.9 系だからです。

ディレクトリ構成

├── cdk
│   ├── README.md
│   ├── bin
│   ├── cdk.json
│   ├── cdk.out
│   ├── jest.config.js
│   ├── lib
│   ├── node_modules
│   ├── package-lock.json
│   ├── package.json
│   ├── test
│   └── tsconfig.json
└── lambda
    ├── notification
    │   ├── Dockerfile
    │   ├── Pipfile
    │   ├── Pipfile.lock
    │   └── handler.py
    └── plot
        ├── Dockerfile
        ├── Pipfile
        ├── Pipfile.lock
        └── handler.py

リポジトリ

https://github.com/hirokisakabe/seaborn-lambda-slack-sample

グラフ作成

Python でグラフ画像を生成する

Python でグラフを描画するなら、matplotlib やそれをベースにした seaborn が候補に挙がります。

今回は複雑なグラフを描画することは目的ではないので、グラフ描画コードは seaborn のサンプルから拝借しましょう。

処理の最後で、描画したグラフをファイルに保存するように修正しています。

plot.py
import seaborn as sns
import matplotlib.pyplot as plt

sns.set_theme(style="ticks")

# seabornのサンプルデータセットを読み込む
df = sns.load_dataset("anscombe")

# グラフを描画する
sns.lmplot(
    data=df, x="x", y="y", col="dataset", hue="dataset",
    col_wrap=2, palette="muted", ci=None,
    height=4, scatter_kws={"s": 50, "alpha": 1}
)

# 描画したグラフをファイルに保存する
plt.savefig("sample.png")

これを実行してみます。

terminal
# ライブラリをインストール
pipenv install seaborn==0.12.1 matplotlib==3.6.2

# 実行
pipenv run python plot.py

同一ディレクトリに下記のグラフ画像ファイルが生成されました。


sample.png

Lambda を作成する

先ほどのコードを元に、Lambda のコードを作成します。

今回は、コンテナイメージの Lambda とします。

作成したグラフ画像は S3 にアップロードするようにしています。

Dockerfile
FROM public.ecr.aws/lambda/python:3.9

RUN pip install pipenv

COPY Pipfile Pipfile.lock ./

RUN pipenv install --system

COPY . .

CMD [ "handler.main" ]
Pipfile
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
seaborn = "==0.12.1"
matplotlib = "==3.6.2"
boto3 = "==1.26.14"

[dev-packages]

[requires]
python_version = "3.9"
handler.py
import seaborn as sns
import matplotlib.pyplot as plt
import boto3
import os
from pathlib import PurePath


def main(event, context):
    file_path = "/tmp/sample.png"

    plot_and_save_to_file(file_path)

    upload_file_to_s3(file_path)


def plot_and_save_to_file(file_path: str):
    sns.set_theme(style="ticks")

    df = sns.load_dataset("anscombe")

    sns.lmplot(
        data=df,
        x="x",
        y="y",
        col="dataset",
        hue="dataset",
        col_wrap=2,
        palette="muted",
        ci=None,
        height=4,
        scatter_kws={"s": 50, "alpha": 1},
    )

    plt.savefig(file_path)


def upload_file_to_s3(file_path: str):
    S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

    s3 = boto3.resource("s3")

    obj = s3.Object(S3_BUCKET_NAME, PurePath(file_path).name)

    obj.upload_file(file_path)

AWS リソースを作成する

今回は、AWS CDK を用いて、必要な AWS リソースを定義します。

terminal
# CDKのコードをセットアップ(言語はTypeScript)
npx cdk init app --language typescript

cdk-stack.tsを下記のように修正します。

cdk-stack.ts
import { Construct } from "constructs";
import { join } from "path";
import {
  StackProps,
  Stack,
  RemovalPolicy,
  SecretValue,
  Duration,
} from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as s3deploy from "aws-cdk-lib/aws-s3-deployment";
import * as cloudfront from "aws-cdk-lib/aws-cloudfront";
import * as cloudfront_origins from "aws-cdk-lib/aws-cloudfront-origins";

export class CdkStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // ファイルを保存&公開するためのCloudFront&S3を定義

    const cloudfrontOAI = new cloudfront.OriginAccessIdentity(
      this,
      "cloudfront-OAI"
    );

    const assetBucket = new s3.Bucket(this, "AssetBucket", {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    assetBucket.addToResourcePolicy(
      new iam.PolicyStatement({
        actions: ["s3:GetObject"],
        resources: [assetBucket.arnForObjects("*")],
        principals: [
          new iam.CanonicalUserPrincipal(
            cloudfrontOAI.cloudFrontOriginAccessIdentityS3CanonicalUserId
          ),
        ],
      })
    );

    const distribution = new cloudfront.Distribution(
      this,
      "AssetDistribution",
      {
        minimumProtocolVersion: cloudfront.SecurityPolicyProtocol.TLS_V1_2_2021,
        defaultBehavior: {
          origin: new cloudfront_origins.S3Origin(assetBucket, {
            originAccessIdentity: cloudfrontOAI,
          }),
          compress: true,
          allowedMethods: cloudfront.AllowedMethods.ALLOW_GET_HEAD_OPTIONS,
          viewerProtocolPolicy:
            cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
        },
      }
    );

    const { deployedBucket } = new s3deploy.BucketDeployment(
      this,
      "AssetDeployment",
      {
        sources: [],
        destinationBucket: assetBucket,
        distribution,
        distributionPaths: ["/*"],
      }
    );

    // グラフを描画するLambda
    const plotLambda = new lambda.Function(this, "PlotLambda", {
      runtime: lambda.Runtime.FROM_IMAGE,
      handler: lambda.Handler.FROM_IMAGE,
      // Lambdaコードが存在するディレクトリを指定
      code: lambda.Code.fromAssetImage(join(__dirname, "../../lambda/plot")),
      environment: {
        S3_BUCKET_NAME: deployedBucket.bucketName,
      },
      timeout: Duration.minutes(3),
    });

    // LambdaからS3バケットにアップロードできるように許可
    deployedBucket.grantPut(plotLambda);
  }
}

CDK で定義したリソースをデプロイします。

terminal
npm run cdk bootstrap aws://<アカウント番号>/<リージョン>

npm run cdk deploy

Lambda を実行する

動作確認のため、Lambda を実行してみます。

S3 バケットにグラフ画像ファイルが保存されました。

Slack へのメッセージ送信

Python で Slack にメッセージを送る

メッセージの送信先となる Slack アプリを作成します。

Slack アプリの App Manifest は下記になります。

display_information:
  name: seaborn-lambda-slack-sample
features:
  bot_user:
    display_name: seaborn-lambda-slack-sample
    always_online: false
oauth_config:
  scopes:
    bot:
      - chat:write # ここで権限を付与。それ以外のパラメータは自由に設定可。
settings:
  org_deploy_enabled: false
  socket_mode_enabled: false
  token_rotation_enabled: false

From an app manifest で作成する場合は、上記の YML をペーストすれば、必要な権限が設定されます。

もしくは、手動で Bot Token Scopes の chat:write を付与してください。

Slack アプリを、ワークスペースにインストールします。

その後、アプリを任意のチャンネルに追加します。

メッセージを送信する Python コードを作成します。

notification.py
from slack_sdk import WebClient
import os

slack_token = os.environ["SLACK_BOT_TOKEN"]
channel = os.environ["SLACK_CHANNEL_TO_NOTIFY"]

client = WebClient(token=slack_token)

client.chat_postMessage(
    channel=channel,
    blocks=[
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": "ダミーメッセージ"
            },
            "accessory": {
                "type": "image",
                "image_url": "https://media.giphy.com/media/SVZGEcYt7brkFUyU90/giphy.gif",
                "alt_text": "ダミーテキスト"
            }
        }
    ]
)

実行してみます。

terminal
# ライブラリをインストール
pipenv install slack-sdk==3.19.4

# 環境変数を設定
export SLACK_BOT_TOKEN=<SlackアプリのBOTトークン>
export SLACK_CHANNEL_TO_NOTIFY=<Slackアプリを招待したチャンネルID>

# 実行
pipenv run python notification.py

Slack に通知が来ました。

Lambda を作成する

先ほどのコードを元に、Lambda のコードを作成します。

Slack アプリの BOT トークンなどは AWS Secrets Manager で管理するようにしました。

Dockerfile
FROM public.ecr.aws/lambda/python:3.9

RUN pip install pipenv

COPY Pipfile Pipfile.lock ./

RUN pipenv install --system

COPY . .

CMD [ "handler.main" ]
Pipfile
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
boto3 = "==1.26.14"
slack-sdk = "==3.19.4"

[dev-packages]

[requires]
python_version = "3.9"
handler.py
from slack_sdk import WebClient
import boto3
import os
import json


def main(event, context):
    file_name = "dummy_file_name"

    notify_slack(file_name, "ファイルがアップロードされました")


def notify_slack(file_name: str, message: str):
    secretsmanager_client = boto3.client("secretsmanager")

    secret_id = os.environ["SLACK_CREDENTIALS_SECRET_ID"]

    secret_value = secretsmanager_client.get_secret_value(SecretId=secret_id)
    secret = json.loads(secret_value["SecretString"])

    slack_token = secret["SLACK_BOT_TOKEN"]
    channel = secret["SLACK_CHANNEL_TO_NOTIFY"]

    client = WebClient(token=slack_token)

    image_url = os.environ["CLOUD_FRONT_DISTRIBUTION_URL"] + "/" + file_name

    client.chat_postMessage(
        channel=channel,
        blocks=[
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": message},
                "accessory": {
                    "type": "image",
                    "image_url": image_url,
                    "alt_text": file_name,
                },
            }
        ],
    )

AWS リソースを変更する

CDK のコードを編集して、Slack へのメッセージ送信に関するリソースを作成します。

cdk-stack.tsの追加内容
// Slackにアクセスするためのシークレット
const secret = new secretsmanager.Secret(this, "Secret", {
  removalPolicy: RemovalPolicy.DESTROY,
  // デプロイ後に実際の値を手動で設定すること
  secretObjectValue: {
    SLACK_BOT_TOKEN: SecretValue.unsafePlainText("dummy"),
    SLACK_CHANNEL_TO_NOTIFY: SecretValue.unsafePlainText("dummy"),
  },
});

// Slackへメッセージを送信するLambda
const notificationLambda = new lambda.Function(this, "NotificationLambda", {
  runtime: lambda.Runtime.FROM_IMAGE,
  handler: lambda.Handler.FROM_IMAGE,
  // Lambdaコードが存在するディレクトリを指定
  code: lambda.Code.fromAssetImage(
    join(__dirname, "../../lambda/notification")
  ),
  environment: {
    SLACK_CREDENTIALS_SECRET_ID: secret.secretName,
    CLOUD_FRONT_DISTRIBUTION_URL:
      "https://" + distribution.distributionDomainName,
  },
});

// Lambdaがシークレットを取得できるように許可
secret.grantRead(notificationLambda);
更新後の cdk-stack.ts
cdk-stack.ts
import { Construct } from "constructs";
import { join } from "path";
import {
  StackProps,
  Stack,
  RemovalPolicy,
  SecretValue,
  Duration,
} from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as s3deploy from "aws-cdk-lib/aws-s3-deployment";
import * as cloudfront from "aws-cdk-lib/aws-cloudfront";
import * as cloudfront_origins from "aws-cdk-lib/aws-cloudfront-origins";
import * as secretsmanager from "aws-cdk-lib/aws-secretsmanager";

export class CdkStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // ファイルを保存&公開するためのCloudFront&S3を定義

    const cloudfrontOAI = new cloudfront.OriginAccessIdentity(
      this,
      "cloudfront-OAI"
    );

    const assetBucket = new s3.Bucket(this, "AssetBucket", {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    assetBucket.addToResourcePolicy(
      new iam.PolicyStatement({
        actions: ["s3:GetObject"],
        resources: [assetBucket.arnForObjects("*")],
        principals: [
          new iam.CanonicalUserPrincipal(
            cloudfrontOAI.cloudFrontOriginAccessIdentityS3CanonicalUserId
          ),
        ],
      })
    );

    const distribution = new cloudfront.Distribution(
      this,
      "AssetDistribution",
      {
        minimumProtocolVersion: cloudfront.SecurityPolicyProtocol.TLS_V1_2_2021,
        defaultBehavior: {
          origin: new cloudfront_origins.S3Origin(assetBucket, {
            originAccessIdentity: cloudfrontOAI,
          }),
          compress: true,
          allowedMethods: cloudfront.AllowedMethods.ALLOW_GET_HEAD_OPTIONS,
          viewerProtocolPolicy:
            cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
        },
      }
    );

    const { deployedBucket } = new s3deploy.BucketDeployment(
      this,
      "AssetDeployment",
      {
        sources: [],
        destinationBucket: assetBucket,
        distribution,
        distributionPaths: ["/*"],
      }
    );

    // グラフを描画するLambda
    const plotLambda = new lambda.Function(this, "PlotLambda", {
      runtime: lambda.Runtime.FROM_IMAGE,
      handler: lambda.Handler.FROM_IMAGE,
      // Lambdaコードが存在するディレクトリを指定
      code: lambda.Code.fromAssetImage(join(__dirname, "../../lambda/plot")),
      environment: {
        S3_BUCKET_NAME: deployedBucket.bucketName,
      },
      timeout: Duration.minutes(3),
    });

    deployedBucket.grantPut(plotLambda);

    // Slackにアクセスするためのシークレット
    const secret = new secretsmanager.Secret(this, "Secret", {
      removalPolicy: RemovalPolicy.DESTROY,
      // デプロイ後に実際の値を手動で設定すること
      secretObjectValue: {
        SLACK_BOT_TOKEN: SecretValue.unsafePlainText("dummy"),
        SLACK_CHANNEL_TO_NOTIFY: SecretValue.unsafePlainText("dummy"),
      },
    });

    const  = new lambda.Function(this, "NotificationLambda", {
      runtime: lambda.Runtime.FROM_IMAGE,
      handler: lambda.Handler.FROM_IMAGE,
      // Lambdaコードが存在するディレクトリを指定
      code: lambda.Code.fromAssetImage(
        join(__dirname, "../../lambda/notification")
      ),
      environment: {
        SLACK_CREDENTIALS_SECRET_ID: secret.secretName,
        CLOUD_FRONT_DISTRIBUTION_URL:
          "https://" + distribution.distributionDomainName,
      },
    });

    secret.grantRead(notificationLambda);
  }
}

シークレットをコンソールから手動で設定します。

Lambda を実行する

動作確認のため、Lambda を実行してみます。

Slack に通知が来ました。

2 つの Lambda を Step Functions を使って連続で実行する

では、仕上げにグラフ作成の Lambda と Slack へのメッセージ送信の Lambda を連続で実行できるようにしてみます。今回は AWS Step Functions を使って実現します。

Lambda を変更する

1 つ目の Lambda から 2 つ目の Lambda にファイル名を送るように変更します

plot/handler.py
import seaborn as sns
import matplotlib.pyplot as plt
import boto3
import os
from pathlib import PurePath


def main(event, context):
    file_path = "/tmp/sample.png"

    plot_and_save_to_file(file_path)

    upload_file_to_s3(file_path)

+   return {"file_name": PurePath(file_path).name}


def plot_and_save_to_file(file_path: str):
    sns.set_theme(style="ticks")

    df = sns.load_dataset("anscombe")

    sns.lmplot(
        data=df,
        x="x",
        y="y",
        col="dataset",
        hue="dataset",
        col_wrap=2,
        palette="muted",
        ci=None,
        height=4,
        scatter_kws={"s": 50, "alpha": 1},
    )

    plt.savefig(file_path)


def upload_file_to_s3(file_path: str):
    S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

    s3 = boto3.resource("s3")

    obj = s3.Object(S3_BUCKET_NAME, PurePath(file_path).name)

    obj.upload_file(file_path)

nofitication/handler.py
from slack_sdk import WebClient
import boto3
import os
import json


def main(event, context):
-   file_name = "dummy_file_name"
+   file_name = event["Payload"]["file_name"]

    notify_slack(file_name, "ファイルがアップロードされました")


def notify_slack(file_name: str, message: str):
    secretsmanager_client = boto3.client("secretsmanager")

    secret_id = os.environ["SLACK_CREDENTIALS_SECRET_ID"]

    secret_value = secretsmanager_client.get_secret_value(SecretId=secret_id)
    secret = json.loads(secret_value["SecretString"])

    slack_token = secret["SLACK_BOT_TOKEN"]
    channel = secret["SLACK_CHANNEL_TO_NOTIFY"]

    client = WebClient(token=slack_token)

    image_url = os.environ["CLOUD_FRONT_DISTRIBUTION_URL"] + "/" + file_name

    client.chat_postMessage(
        channel=channel,
        blocks=[
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": message},
                "accessory": {
                    "type": "image",
                    "image_url": image_url,
                    "alt_text": file_name,
                },
            }
        ],
    )

AWS リソースを変更する

CDK のコードを編集して、Step Functions に関するリソースを作成します。

cdk-stack.tsの追加内容
const plotLambdaTask = new stepfunctions_tasks.LambdaInvoke(
  this,
  "PlotLambdaTask",
  {
    lambdaFunction: plotLambda,
  }
);

const notificationLambdaTask = new stepfunctions_tasks.LambdaInvoke(
  this,
  "NotificationLambdaTask",
  { lambdaFunction: notificationLambda }
);

const definition = plotLambdaTask.next(notificationLambdaTask);

new stepfunctions.StateMachine(this, "StateMachine", { definition });
更新後の cdk-stack.ts
cdk-stack.ts
import { Construct } from "constructs";
import { join } from "path";
import {
  StackProps,
  Stack,
  RemovalPolicy,
  SecretValue,
  Duration,
} from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as s3deploy from "aws-cdk-lib/aws-s3-deployment";
import * as cloudfront from "aws-cdk-lib/aws-cloudfront";
import * as cloudfront_origins from "aws-cdk-lib/aws-cloudfront-origins";
import * as secretsmanager from "aws-cdk-lib/aws-secretsmanager";
import * as stepfunctions from "aws-cdk-lib/aws-stepfunctions";
import * as stepfunctions_tasks from "aws-cdk-lib/aws-stepfunctions-tasks";

export class CdkStack extends Stack {
  constructor(scope: Construct, id: string, props: StackProps) {
    super(scope, id, props);

    // ファイルを保存&公開するためのCloudFront&S3を定義

    const cloudfrontOAI = new cloudfront.OriginAccessIdentity(
      this,
      "cloudfront-OAI"
    );

    const assetBucket = new s3.Bucket(this, "AssetBucket", {
      removalPolicy: RemovalPolicy.DESTROY,
      autoDeleteObjects: true,
    });

    assetBucket.addToResourcePolicy(
      new iam.PolicyStatement({
        actions: ["s3:GetObject"],
        resources: [assetBucket.arnForObjects("*")],
        principals: [
          new iam.CanonicalUserPrincipal(
            cloudfrontOAI.cloudFrontOriginAccessIdentityS3CanonicalUserId
          ),
        ],
      })
    );

    const distribution = new cloudfront.Distribution(
      this,
      "AssetDistribution",
      {
        minimumProtocolVersion: cloudfront.SecurityPolicyProtocol.TLS_V1_2_2021,
        defaultBehavior: {
          origin: new cloudfront_origins.S3Origin(assetBucket, {
            originAccessIdentity: cloudfrontOAI,
          }),
          compress: true,
          allowedMethods: cloudfront.AllowedMethods.ALLOW_GET_HEAD_OPTIONS,
          viewerProtocolPolicy:
            cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
        },
      }
    );

    const { deployedBucket } = new s3deploy.BucketDeployment(
      this,
      "AssetDeployment",
      {
        sources: [],
        destinationBucket: assetBucket,
        distribution,
        distributionPaths: ["/*"],
      }
    );

    // グラフを描画するLambda
    const plotLambda = new lambda.Function(this, "PlotLambda", {
      runtime: lambda.Runtime.FROM_IMAGE,
      handler: lambda.Handler.FROM_IMAGE,
      // Lambdaコードが存在するディレクトリを指定
      code: lambda.Code.fromAssetImage(join(__dirname, "../../lambda/plot")),
      environment: {
        S3_BUCKET_NAME: deployedBucket.bucketName,
      },
      timeout: Duration.minutes(3),
    });

    deployedBucket.grantPut(plotLambda);

    // Slackにアクセスするためのシークレット
    const secret = new secretsmanager.Secret(this, "Secret", {
      removalPolicy: RemovalPolicy.DESTROY,
      // デプロイ後に実際の値を手動で設定すること
      secretObjectValue: {
        SLACK_BOT_TOKEN: SecretValue.unsafePlainText("dummy"),
        SLACK_CHANNEL_TO_NOTIFY: SecretValue.unsafePlainText("dummy"),
      },
    });

    // Slackへメッセージを送信するLambda
    const notificationLambda = new lambda.Function(this, "NotificationLambda", {
      runtime: lambda.Runtime.FROM_IMAGE,
      handler: lambda.Handler.FROM_IMAGE,
      // Lambdaコードが存在するディレクトリを指定
      code: lambda.Code.fromAssetImage(
        join(__dirname, "../../lambda/notification")
      ),
      environment: {
        SLACK_CREDENTIALS_SECRET_ID: secret.secretName,
        CLOUD_FRONT_DISTRIBUTION_URL:
          "https://" + distribution.distributionDomainName,
      },
    });

    secret.grantRead(notificationLambda);

    const plotLambdaTask = new stepfunctions_tasks.LambdaInvoke(
      this,
      "PlotLambdaTask",
      {
        lambdaFunction: plotLambda,
      }
    );

    const notificationLambdaTask = new stepfunctions_tasks.LambdaInvoke(
      this,
      "NotificationLambdaTask",
      { lambdaFunction: notificationLambda }
    );

    const definition = plotLambdaTask.next(notificationLambdaTask);

    new stepfunctions.StateMachine(this, "StateMachine", { definition });
  }
}

Step Functions を実行する

Step Functions を実行してみます。

実行が完了しました。

Slack に通知が来ました。

最後に

今回は Step Functions を手動実行しましたが、定期実行するように設定すれば、毎日 DB から値を取得してレポート用のグラフを作成する、のようなことに応用が効くと思います。

Discussion

ログインするとコメントできます