Pythonでグラフ画像を作成して、Slackに通知する with AWS
はじめに
わたしはちょっとしたグラフ画像を作成するときに、よく Python を使います。
その作成や、完了した旨の通知を Lambda で実現できれば、楽だなあと思っていました。
今回は、それに取り組みます。
コードに関する情報
実行環境
- Python 3.9.15
- Pipenv version 2022.11.11
- Node.js v18.11.0
- Docker
Node.js は AWS CDK 用、Docker は Lambda コンテナイメージのビルド用です。
Python が 3.9 系なのは、2022/12 時点で、lambda/python イメージの最新版が 3.9 系だからです。
ディレクトリ構成
├── cdk
│ ├── README.md
│ ├── bin
│ ├── cdk.json
│ ├── cdk.out
│ ├── jest.config.js
│ ├── lib
│ ├── node_modules
│ ├── package-lock.json
│ ├── package.json
│ ├── test
│ └── tsconfig.json
└── lambda
├── notification
│ ├── Dockerfile
│ ├── Pipfile
│ ├── Pipfile.lock
│ └── handler.py
└── plot
├── Dockerfile
├── Pipfile
├── Pipfile.lock
└── handler.py
リポジトリ
グラフ作成
Python でグラフ画像を生成する
Python でグラフを描画するなら、matplotlib やそれをベースにした seaborn が候補に挙がります。
今回は複雑なグラフを描画することは目的ではないので、グラフ描画コードは seaborn のサンプルから拝借しましょう。
処理の最後で、描画したグラフをファイルに保存するように修正しています。
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_theme(style="ticks")
# seabornのサンプルデータセットを読み込む
df = sns.load_dataset("anscombe")
# グラフを描画する
sns.lmplot(
data=df, x="x", y="y", col="dataset", hue="dataset",
col_wrap=2, palette="muted", ci=None,
height=4, scatter_kws={"s": 50, "alpha": 1}
)
# 描画したグラフをファイルに保存する
plt.savefig("sample.png")
これを実行してみます。
# ライブラリをインストール
pipenv install seaborn==0.12.1 matplotlib==3.6.2
# 実行
pipenv run python plot.py
同一ディレクトリに下記のグラフ画像ファイルが生成されました。
sample.png
Lambda を作成する
先ほどのコードを元に、Lambda のコードを作成します。
今回は、コンテナイメージの Lambda とします。
作成したグラフ画像は S3 にアップロードするようにしています。
FROM public.ecr.aws/lambda/python:3.9
RUN pip install pipenv
COPY Pipfile Pipfile.lock ./
RUN pipenv install --system
COPY . .
CMD [ "handler.main" ]
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
seaborn = "==0.12.1"
matplotlib = "==3.6.2"
boto3 = "==1.26.14"
[dev-packages]
[requires]
python_version = "3.9"
import seaborn as sns
import matplotlib.pyplot as plt
import boto3
import os
from pathlib import PurePath
def main(event, context):
file_path = "/tmp/sample.png"
plot_and_save_to_file(file_path)
upload_file_to_s3(file_path)
def plot_and_save_to_file(file_path: str):
sns.set_theme(style="ticks")
df = sns.load_dataset("anscombe")
sns.lmplot(
data=df,
x="x",
y="y",
col="dataset",
hue="dataset",
col_wrap=2,
palette="muted",
ci=None,
height=4,
scatter_kws={"s": 50, "alpha": 1},
)
plt.savefig(file_path)
def upload_file_to_s3(file_path: str):
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
s3 = boto3.resource("s3")
obj = s3.Object(S3_BUCKET_NAME, PurePath(file_path).name)
obj.upload_file(file_path)
AWS リソースを作成する
今回は、AWS CDK を用いて、必要な AWS リソースを定義します。
# CDKのコードをセットアップ(言語はTypeScript)
npx cdk init app --language typescript
cdk-stack.ts
を下記のように修正します。
import { Construct } from "constructs";
import { join } from "path";
import {
StackProps,
Stack,
RemovalPolicy,
SecretValue,
Duration,
} from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as s3deploy from "aws-cdk-lib/aws-s3-deployment";
import * as cloudfront from "aws-cdk-lib/aws-cloudfront";
import * as cloudfront_origins from "aws-cdk-lib/aws-cloudfront-origins";
export class CdkStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// ファイルを保存&公開するためのCloudFront&S3を定義
const cloudfrontOAI = new cloudfront.OriginAccessIdentity(
this,
"cloudfront-OAI"
);
const assetBucket = new s3.Bucket(this, "AssetBucket", {
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
assetBucket.addToResourcePolicy(
new iam.PolicyStatement({
actions: ["s3:GetObject"],
resources: [assetBucket.arnForObjects("*")],
principals: [
new iam.CanonicalUserPrincipal(
cloudfrontOAI.cloudFrontOriginAccessIdentityS3CanonicalUserId
),
],
})
);
const distribution = new cloudfront.Distribution(
this,
"AssetDistribution",
{
minimumProtocolVersion: cloudfront.SecurityPolicyProtocol.TLS_V1_2_2021,
defaultBehavior: {
origin: new cloudfront_origins.S3Origin(assetBucket, {
originAccessIdentity: cloudfrontOAI,
}),
compress: true,
allowedMethods: cloudfront.AllowedMethods.ALLOW_GET_HEAD_OPTIONS,
viewerProtocolPolicy:
cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
},
}
);
const { deployedBucket } = new s3deploy.BucketDeployment(
this,
"AssetDeployment",
{
sources: [],
destinationBucket: assetBucket,
distribution,
distributionPaths: ["/*"],
}
);
// グラフを描画するLambda
const plotLambda = new lambda.Function(this, "PlotLambda", {
runtime: lambda.Runtime.FROM_IMAGE,
handler: lambda.Handler.FROM_IMAGE,
// Lambdaコードが存在するディレクトリを指定
code: lambda.Code.fromAssetImage(join(__dirname, "../../lambda/plot")),
environment: {
S3_BUCKET_NAME: deployedBucket.bucketName,
},
timeout: Duration.minutes(3),
});
// LambdaからS3バケットにアップロードできるように許可
deployedBucket.grantPut(plotLambda);
}
}
CDK で定義したリソースをデプロイします。
npm run cdk bootstrap aws://<アカウント番号>/<リージョン>
npm run cdk deploy
Lambda を実行する
動作確認のため、Lambda を実行してみます。
S3 バケットにグラフ画像ファイルが保存されました。
Slack へのメッセージ送信
Python で Slack にメッセージを送る
メッセージの送信先となる Slack アプリを作成します。
Slack アプリの App Manifest は下記になります。
display_information:
name: seaborn-lambda-slack-sample
features:
bot_user:
display_name: seaborn-lambda-slack-sample
always_online: false
oauth_config:
scopes:
bot:
- chat:write # ここで権限を付与。それ以外のパラメータは自由に設定可。
settings:
org_deploy_enabled: false
socket_mode_enabled: false
token_rotation_enabled: false
From an app manifest で作成する場合は、上記の YML をペーストすれば、必要な権限が設定されます。
もしくは、手動で Bot Token Scopes の chat:write を付与してください。
Slack アプリを、ワークスペースにインストールします。
その後、アプリを任意のチャンネルに追加します。
メッセージを送信する Python コードを作成します。
from slack_sdk import WebClient
import os
slack_token = os.environ["SLACK_BOT_TOKEN"]
channel = os.environ["SLACK_CHANNEL_TO_NOTIFY"]
client = WebClient(token=slack_token)
client.chat_postMessage(
channel=channel,
blocks=[
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "ダミーメッセージ"
},
"accessory": {
"type": "image",
"image_url": "https://media.giphy.com/media/SVZGEcYt7brkFUyU90/giphy.gif",
"alt_text": "ダミーテキスト"
}
}
]
)
実行してみます。
# ライブラリをインストール
pipenv install slack-sdk==3.19.4
# 環境変数を設定
export SLACK_BOT_TOKEN=<SlackアプリのBOTトークン>
export SLACK_CHANNEL_TO_NOTIFY=<Slackアプリを招待したチャンネルID>
# 実行
pipenv run python notification.py
Slack に通知が来ました。
Lambda を作成する
先ほどのコードを元に、Lambda のコードを作成します。
Slack アプリの BOT トークンなどは AWS Secrets Manager で管理するようにしました。
FROM public.ecr.aws/lambda/python:3.9
RUN pip install pipenv
COPY Pipfile Pipfile.lock ./
RUN pipenv install --system
COPY . .
CMD [ "handler.main" ]
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
boto3 = "==1.26.14"
slack-sdk = "==3.19.4"
[dev-packages]
[requires]
python_version = "3.9"
from slack_sdk import WebClient
import boto3
import os
import json
def main(event, context):
file_name = "dummy_file_name"
notify_slack(file_name, "ファイルがアップロードされました")
def notify_slack(file_name: str, message: str):
secretsmanager_client = boto3.client("secretsmanager")
secret_id = os.environ["SLACK_CREDENTIALS_SECRET_ID"]
secret_value = secretsmanager_client.get_secret_value(SecretId=secret_id)
secret = json.loads(secret_value["SecretString"])
slack_token = secret["SLACK_BOT_TOKEN"]
channel = secret["SLACK_CHANNEL_TO_NOTIFY"]
client = WebClient(token=slack_token)
image_url = os.environ["CLOUD_FRONT_DISTRIBUTION_URL"] + "/" + file_name
client.chat_postMessage(
channel=channel,
blocks=[
{
"type": "section",
"text": {"type": "mrkdwn", "text": message},
"accessory": {
"type": "image",
"image_url": image_url,
"alt_text": file_name,
},
}
],
)
AWS リソースを変更する
CDK のコードを編集して、Slack へのメッセージ送信に関するリソースを作成します。
// Slackにアクセスするためのシークレット
const secret = new secretsmanager.Secret(this, "Secret", {
removalPolicy: RemovalPolicy.DESTROY,
// デプロイ後に実際の値を手動で設定すること
secretObjectValue: {
SLACK_BOT_TOKEN: SecretValue.unsafePlainText("dummy"),
SLACK_CHANNEL_TO_NOTIFY: SecretValue.unsafePlainText("dummy"),
},
});
// Slackへメッセージを送信するLambda
const notificationLambda = new lambda.Function(this, "NotificationLambda", {
runtime: lambda.Runtime.FROM_IMAGE,
handler: lambda.Handler.FROM_IMAGE,
// Lambdaコードが存在するディレクトリを指定
code: lambda.Code.fromAssetImage(
join(__dirname, "../../lambda/notification")
),
environment: {
SLACK_CREDENTIALS_SECRET_ID: secret.secretName,
CLOUD_FRONT_DISTRIBUTION_URL:
"https://" + distribution.distributionDomainName,
},
});
// Lambdaがシークレットを取得できるように許可
secret.grantRead(notificationLambda);
更新後の cdk-stack.ts
import { Construct } from "constructs";
import { join } from "path";
import {
StackProps,
Stack,
RemovalPolicy,
SecretValue,
Duration,
} from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as s3deploy from "aws-cdk-lib/aws-s3-deployment";
import * as cloudfront from "aws-cdk-lib/aws-cloudfront";
import * as cloudfront_origins from "aws-cdk-lib/aws-cloudfront-origins";
import * as secretsmanager from "aws-cdk-lib/aws-secretsmanager";
export class CdkStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// ファイルを保存&公開するためのCloudFront&S3を定義
const cloudfrontOAI = new cloudfront.OriginAccessIdentity(
this,
"cloudfront-OAI"
);
const assetBucket = new s3.Bucket(this, "AssetBucket", {
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
assetBucket.addToResourcePolicy(
new iam.PolicyStatement({
actions: ["s3:GetObject"],
resources: [assetBucket.arnForObjects("*")],
principals: [
new iam.CanonicalUserPrincipal(
cloudfrontOAI.cloudFrontOriginAccessIdentityS3CanonicalUserId
),
],
})
);
const distribution = new cloudfront.Distribution(
this,
"AssetDistribution",
{
minimumProtocolVersion: cloudfront.SecurityPolicyProtocol.TLS_V1_2_2021,
defaultBehavior: {
origin: new cloudfront_origins.S3Origin(assetBucket, {
originAccessIdentity: cloudfrontOAI,
}),
compress: true,
allowedMethods: cloudfront.AllowedMethods.ALLOW_GET_HEAD_OPTIONS,
viewerProtocolPolicy:
cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
},
}
);
const { deployedBucket } = new s3deploy.BucketDeployment(
this,
"AssetDeployment",
{
sources: [],
destinationBucket: assetBucket,
distribution,
distributionPaths: ["/*"],
}
);
// グラフを描画するLambda
const plotLambda = new lambda.Function(this, "PlotLambda", {
runtime: lambda.Runtime.FROM_IMAGE,
handler: lambda.Handler.FROM_IMAGE,
// Lambdaコードが存在するディレクトリを指定
code: lambda.Code.fromAssetImage(join(__dirname, "../../lambda/plot")),
environment: {
S3_BUCKET_NAME: deployedBucket.bucketName,
},
timeout: Duration.minutes(3),
});
deployedBucket.grantPut(plotLambda);
// Slackにアクセスするためのシークレット
const secret = new secretsmanager.Secret(this, "Secret", {
removalPolicy: RemovalPolicy.DESTROY,
// デプロイ後に実際の値を手動で設定すること
secretObjectValue: {
SLACK_BOT_TOKEN: SecretValue.unsafePlainText("dummy"),
SLACK_CHANNEL_TO_NOTIFY: SecretValue.unsafePlainText("dummy"),
},
});
const = new lambda.Function(this, "NotificationLambda", {
runtime: lambda.Runtime.FROM_IMAGE,
handler: lambda.Handler.FROM_IMAGE,
// Lambdaコードが存在するディレクトリを指定
code: lambda.Code.fromAssetImage(
join(__dirname, "../../lambda/notification")
),
environment: {
SLACK_CREDENTIALS_SECRET_ID: secret.secretName,
CLOUD_FRONT_DISTRIBUTION_URL:
"https://" + distribution.distributionDomainName,
},
});
secret.grantRead(notificationLambda);
}
}
シークレットをコンソールから手動で設定します。
Lambda を実行する
動作確認のため、Lambda を実行してみます。
Slack に通知が来ました。
2 つの Lambda を Step Functions を使って連続で実行する
では、仕上げにグラフ作成の Lambda と Slack へのメッセージ送信の Lambda を連続で実行できるようにしてみます。今回は AWS Step Functions を使って実現します。
Lambda を変更する
1 つ目の Lambda から 2 つ目の Lambda にファイル名を送るように変更します
import seaborn as sns
import matplotlib.pyplot as plt
import boto3
import os
from pathlib import PurePath
def main(event, context):
file_path = "/tmp/sample.png"
plot_and_save_to_file(file_path)
upload_file_to_s3(file_path)
+ return {"file_name": PurePath(file_path).name}
def plot_and_save_to_file(file_path: str):
sns.set_theme(style="ticks")
df = sns.load_dataset("anscombe")
sns.lmplot(
data=df,
x="x",
y="y",
col="dataset",
hue="dataset",
col_wrap=2,
palette="muted",
ci=None,
height=4,
scatter_kws={"s": 50, "alpha": 1},
)
plt.savefig(file_path)
def upload_file_to_s3(file_path: str):
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
s3 = boto3.resource("s3")
obj = s3.Object(S3_BUCKET_NAME, PurePath(file_path).name)
obj.upload_file(file_path)
from slack_sdk import WebClient
import boto3
import os
import json
def main(event, context):
- file_name = "dummy_file_name"
+ file_name = event["Payload"]["file_name"]
notify_slack(file_name, "ファイルがアップロードされました")
def notify_slack(file_name: str, message: str):
secretsmanager_client = boto3.client("secretsmanager")
secret_id = os.environ["SLACK_CREDENTIALS_SECRET_ID"]
secret_value = secretsmanager_client.get_secret_value(SecretId=secret_id)
secret = json.loads(secret_value["SecretString"])
slack_token = secret["SLACK_BOT_TOKEN"]
channel = secret["SLACK_CHANNEL_TO_NOTIFY"]
client = WebClient(token=slack_token)
image_url = os.environ["CLOUD_FRONT_DISTRIBUTION_URL"] + "/" + file_name
client.chat_postMessage(
channel=channel,
blocks=[
{
"type": "section",
"text": {"type": "mrkdwn", "text": message},
"accessory": {
"type": "image",
"image_url": image_url,
"alt_text": file_name,
},
}
],
)
AWS リソースを変更する
CDK のコードを編集して、Step Functions に関するリソースを作成します。
const plotLambdaTask = new stepfunctions_tasks.LambdaInvoke(
this,
"PlotLambdaTask",
{
lambdaFunction: plotLambda,
}
);
const notificationLambdaTask = new stepfunctions_tasks.LambdaInvoke(
this,
"NotificationLambdaTask",
{ lambdaFunction: notificationLambda }
);
const definition = plotLambdaTask.next(notificationLambdaTask);
new stepfunctions.StateMachine(this, "StateMachine", { definition });
更新後の cdk-stack.ts
import { Construct } from "constructs";
import { join } from "path";
import {
StackProps,
Stack,
RemovalPolicy,
SecretValue,
Duration,
} from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as s3deploy from "aws-cdk-lib/aws-s3-deployment";
import * as cloudfront from "aws-cdk-lib/aws-cloudfront";
import * as cloudfront_origins from "aws-cdk-lib/aws-cloudfront-origins";
import * as secretsmanager from "aws-cdk-lib/aws-secretsmanager";
import * as stepfunctions from "aws-cdk-lib/aws-stepfunctions";
import * as stepfunctions_tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
export class CdkStack extends Stack {
constructor(scope: Construct, id: string, props: StackProps) {
super(scope, id, props);
// ファイルを保存&公開するためのCloudFront&S3を定義
const cloudfrontOAI = new cloudfront.OriginAccessIdentity(
this,
"cloudfront-OAI"
);
const assetBucket = new s3.Bucket(this, "AssetBucket", {
removalPolicy: RemovalPolicy.DESTROY,
autoDeleteObjects: true,
});
assetBucket.addToResourcePolicy(
new iam.PolicyStatement({
actions: ["s3:GetObject"],
resources: [assetBucket.arnForObjects("*")],
principals: [
new iam.CanonicalUserPrincipal(
cloudfrontOAI.cloudFrontOriginAccessIdentityS3CanonicalUserId
),
],
})
);
const distribution = new cloudfront.Distribution(
this,
"AssetDistribution",
{
minimumProtocolVersion: cloudfront.SecurityPolicyProtocol.TLS_V1_2_2021,
defaultBehavior: {
origin: new cloudfront_origins.S3Origin(assetBucket, {
originAccessIdentity: cloudfrontOAI,
}),
compress: true,
allowedMethods: cloudfront.AllowedMethods.ALLOW_GET_HEAD_OPTIONS,
viewerProtocolPolicy:
cloudfront.ViewerProtocolPolicy.REDIRECT_TO_HTTPS,
},
}
);
const { deployedBucket } = new s3deploy.BucketDeployment(
this,
"AssetDeployment",
{
sources: [],
destinationBucket: assetBucket,
distribution,
distributionPaths: ["/*"],
}
);
// グラフを描画するLambda
const plotLambda = new lambda.Function(this, "PlotLambda", {
runtime: lambda.Runtime.FROM_IMAGE,
handler: lambda.Handler.FROM_IMAGE,
// Lambdaコードが存在するディレクトリを指定
code: lambda.Code.fromAssetImage(join(__dirname, "../../lambda/plot")),
environment: {
S3_BUCKET_NAME: deployedBucket.bucketName,
},
timeout: Duration.minutes(3),
});
deployedBucket.grantPut(plotLambda);
// Slackにアクセスするためのシークレット
const secret = new secretsmanager.Secret(this, "Secret", {
removalPolicy: RemovalPolicy.DESTROY,
// デプロイ後に実際の値を手動で設定すること
secretObjectValue: {
SLACK_BOT_TOKEN: SecretValue.unsafePlainText("dummy"),
SLACK_CHANNEL_TO_NOTIFY: SecretValue.unsafePlainText("dummy"),
},
});
// Slackへメッセージを送信するLambda
const notificationLambda = new lambda.Function(this, "NotificationLambda", {
runtime: lambda.Runtime.FROM_IMAGE,
handler: lambda.Handler.FROM_IMAGE,
// Lambdaコードが存在するディレクトリを指定
code: lambda.Code.fromAssetImage(
join(__dirname, "../../lambda/notification")
),
environment: {
SLACK_CREDENTIALS_SECRET_ID: secret.secretName,
CLOUD_FRONT_DISTRIBUTION_URL:
"https://" + distribution.distributionDomainName,
},
});
secret.grantRead(notificationLambda);
const plotLambdaTask = new stepfunctions_tasks.LambdaInvoke(
this,
"PlotLambdaTask",
{
lambdaFunction: plotLambda,
}
);
const notificationLambdaTask = new stepfunctions_tasks.LambdaInvoke(
this,
"NotificationLambdaTask",
{ lambdaFunction: notificationLambda }
);
const definition = plotLambdaTask.next(notificationLambdaTask);
new stepfunctions.StateMachine(this, "StateMachine", { definition });
}
}
Step Functions を実行する
Step Functions を実行してみます。
実行が完了しました。
Slack に通知が来ました。
最後に
今回は Step Functions を手動実行しましたが、定期実行するように設定すれば、毎日 DB から値を取得してレポート用のグラフを作成する、のようなことに応用が効くと思います。
Discussion