Snowpipeとタスクのエラー通知をサクッと設定するよ
こちらは Snowflake Advent Calender 2日目の記事となっています。
盛大に遅刻しており非常に申し訳ないです。。。
これは何?
「Snowpipeのエラー通知」と「タスクのエラー通知」を Terraform でサクッと設定してみるお話です。
はじめに
Snowpipe でデータを自動ロードさせたり、タスクをスケジューリングして定期的にデータを処理させるなど、Snowpipe とタスクを使用する場面はかなり多いと思います。
便利で頼れる機能たちなのですが、はたして元気に Loaded
になってくれているのか、気付かぬうちに Failed
になってはいないか、気になりますよね。運用上、異常が発生していたらすぐに発見したいところです。しかしながら、ずっと張り付いてステータスを監視するわけにもいきませんよね。
そんな悩みを抱える全ての運用マンに、ぜひともご紹介したいのが「Snowpipeのエラー通知」と「タスクのエラー通知」でございます。
その名の通り Snowpipe とタスクでエラーが発生した時に通知を送れる機能で、監視作業が非常に捗るものとなっています。AWS の場合は SNS を使用するので、メールで通知を送信したり、Lambda に送信して見やすく加工することもできます。
今回の記事では、そんなありがたみ溢れる便利機能を Terraform で一気に設定してしまおうと思います。
ざっくり構成
こんな感じです。
こちらもざっくりですが、このような流れでエラーを通知します:
- Snowflake Snowpipe またはタスクでエラーが起きると
- エラーメッセージが AWS SNS トピックにパブリッシュされて
- サブスクライブされている配信先に通知が飛ぶ
どんな通知が来るの?
メールを通知先にすると、こんな感じでエラーメッセージが飛んできます。これはタスクでエラーが発生した場合のもの。
メッセージは JSON ですので、ちょっと整えて Slack に通知を飛ばすこともできます。
やること
公式ドキュメントの手順に従うと、次の手順になります。
- AWS側
- SNS トピック作成
- トピックにメッセージを発行できるユーザーを「全員」 or 「指定されたAWSアカウントのみ」に設定
- IAM ポリシー作成
- IAM ロール作成
- SNS トピックにSnowflakeからのアクセスを許可する(Snowfklake側で [🌟] 対応後)
- SNS トピックのメッセージをお好きな配信先にサブスクライブ
- SNS トピック作成
- Snowflake側
- [🌟] Notification Integration作成
- Snowpipe またはタスクでエラー通知を有効化する
terraforming するので、だいたい一気に進みます。
やってみよう
それでは、Let's Terraforming!
今回の検証環境はこちら↓になります。
ツール等 | バージョン |
---|---|
Terraform | 1.1.0 |
Snowflake provider | 0.52.0 |
AWS provider | バージョン |
AWS SNS トピック作成
まずは AWS SNS トピックを作成します。
このとき、デフォルトのアクセスポリシーでは、トピックの所有者のみがメッセージ発行とサブスクライブを行える状態になっています。そのため、アクセスポリシーでトピックにメッセージを発行できるユーザーを「指定されたAWSアカウントのみ」に設定します。[1]
ソースコード
variables
variable "snowflake_aws_id" {
type = string
description = "Snowflakeアカウントが使用しているAWS アカウントID"
}
variable "aws_sns_topic_name" {
type = string
description = "SNS トピック名"
}
data
data "aws_caller_identity" "current" {}
data "aws_availability_zones" "current" {}
resources
#
# SNS topic
#
resource "aws_sns_topic" "notif_sns_topic" {
name = var.aws_sns_topic_name
display_name = var.aws_sns_topic_name
policy = jsonencode(
{
"Version" : "2008-10-17",
"Id" : "__default_policy_ID",
"Statement" : [
{
"Sid" : "__default_statement_ID",
"Effect" : "Allow",
"Principal" : {
"AWS" : "*"
},
"Action" : [
"SNS:GetTopicAttributes",
"SNS:SetTopicAttributes",
"SNS:AddPermission",
"SNS:RemovePermission",
"SNS:DeleteTopic",
"SNS:Subscribe",
"SNS:ListSubscriptionsByTopic",
"SNS:Publish"
],
"Resource" : "arn:aws:sns:${data.aws_availability_zones.current.id}:${data.aws_caller_identity.current.account_id}:${var.aws_sns_topic_name}",
"Condition" : {
"StringEquals" : {
"AWS:SourceOwner" : "${data.aws_caller_identity.current.account_id}"
}
}
},
# トピックにメッセージを発行できるユーザー:「指定されたAWSアカウントのみ」
{
"Sid" : "__console_pub_0",
"Effect" : "Allow",
"Principal" : {
"AWS" : "arn:aws:iam::${var.snowflake_aws_id}:root"
},
"Action" : "SNS:Publish",
"Resource" : ""
}
]
}
)
}
AWS IAM ポリシー/ロール
次に、SNS トピックにメッセージを公開する権限を付与する IAM ポリシーとIAM ロールを作成します。
IAM ロールに対して、後でSNS トピックに Snowflake からのアクセスを許可するために、IAM ユーザー ARM と外部IDを IAM ロールの信頼関係に設定する必要がありますが、terraform コードでは snowflake_notification_integration
リソースを参照して IAM ユーザー ARN と外部IDを自動的に取得できるようにしておきます。
ソースコード
variables
variable "aws_iam_policy_name" {
type = string
description = "AWS IAM ポリシー名"
}
variable "aws_iam_role_name" {
type = string
description = "IAM ロール名"
}
resources
#
# IAM policy
#
data "aws_iam_policy_document" "notif_policy_document" {
statement {
effect = "Allow"
actions = [
"sns:Publish",
]
resources = [
aws_sns_topic.notif_sns_topic.arn
]
}
}
module "notif_policy" {
source = "terraform-aws-modules/iam/aws//modules/iam-policy"
version = "4.1.0"
name = var.aws_iam_policy_name
path = "/"
policy = data.aws_iam_policy_document.notif_policy_document.json
}
#
# IAM role
#
data "aws_iam_policy_document" "notif_role_policy_document" {
statement {
effect = "Allow"
actions = ["sts:AssumeRole"]
condition {
test = "StringEquals"
variable = "sts:ExternalId"
values = [snowflake_notification_integration.notif_int.aws_sns_external_id]
}
principals {
type = "AWS"
identifiers = [snowflake_notification_integration.notif_int.aws_sns_iam_user_arn]
}
}
}
resource "aws_iam_role" "notif_role" {
name = var.aws_iam_role_name
assume_role_policy = data.aws_iam_policy_document.notif_role_policy_document.json
# Terraformの外でポリシーがアタッチされる可能性がある場合は、aws_iam_role_policy_attachment でアタッチするべし
managed_policy_arns = [
module.notif_policy.arn,
]
}
Snowflake Notification integration
先ほど作成した AWS SNS トピックを参照する Snowflake notification integration を作成します。
ソースコード
variables
variable "snowflake_notif_int_name" {
type = string
description = "Notification integration名"
}
# 前段で記載したものと同じ
# variable "aws_sns_topic_name" { ... }
# variable "aws_iam_role_name" { ... }
data
data "aws_region" "current" {}
# 前段で記載したものと同じ
# data "aws_caller_identity" "current" {}
resources
#
# Notification integration(AWS SNS)
#
resource "snowflake_notification_integration" "notif_int" {
provider = snowflake.accountadmin
name = var.snowflake_notif_int_name
comment = "Snowpipe・タスク エラー通知のためのnotification integration"
enabled = true
type = "QUEUE"
direction = "OUTBOUND"
# AWS_SNS
notification_provider = "AWS_SNS"
# AWS側の情報。決め打ち可能
aws_sns_role_arn = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/${var.aws_iam_role_name}"
aws_sns_topic_arn = "arn:aws:sns:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${var.aws_sns_topic_name}"
}
以上の terraform コードで、エラー通知に必要なリソースがそろいます。お好きな variable を設定して、terraform aplly
すれば、エラー通知の準備は完了です。
Snowflake Snowpipeまたはタスクでエラー通知を有効化
あとは、作成した notification integration を error_integration
に設定すればOKです。
Terraform コードで設定する場合も、error_integration
に指定してあげればOKです。
試しに、絶対にエラーになるタスクを起動して、通知が来るのを確認してみましょう。
こんなタスクを作成します。
create or replace task "TEST_TASK_NOTIF"
error_integration = notif_int -- 作成した notification integration
schedule = '60 minutes'
as
select dummy; -- dummy って何やねん、ということで絶対にエラーになる
タスクに設定したステートメントを実行すると、当然エラーになります。
SELECT dummy;
-- output
Error: invalid identifier 'DUMMY' (line 6)
それでは、タスクを実行しちゃいましょう。
execute task "TEST_TASK_NOTIF";
-- output
status
Task TEST_TASK_NOTIF is scheduled to run immediately.
ちょっとだけ待っていると。。。
といった感じで、エラー通知が届きます。
エラーを発生させたタスクの名前が taskName
で、発生したエラーの内容は errorMessage
で確認てきますね。
通知が届くだけでも十分ですが、生のメッセージはちょっと読みづらいかなー。。。と思ったときは、Lambda でちょっと加工して Slack に流す、なんてこともできます。
Lambdaソースコード
import json
import logging
import os
from datetime import datetime
from dateutil import tz
from urllib.request import Request, urlopen
from textwrap import dedent
# 環境変数の読み込み
log_level = os.environ["LOG_LEVEL"]
web_hook = os.environ["WEB_HOOK"]
slack_channel = os.environ["SLACK_CHANNEL"]
slack_username = os.environ["SLACK_USERNAME"]
# loggerの初期設定
logger = logging.getLogger()
logger.setLevel(log_level)
def notify_slack(msgs, web_hook, slack_channel, slack_username):
"""
slackにメッセージを投稿する
Args:
msgs(str): 投稿するメッセージ
web_hook (str): 投稿するslackのwebhook URL
slack_channel (str): 投稿するslackのチャンネル名
slack_username (str): 投稿するslackのユーザ名
"""
send_data = {
"channel": slack_channel,
"username": slack_username,
"text": msgs
}
payload = "payload=" + json.dumps(send_data)
request = Request(
web_hook,
data=payload.encode("utf-8"),
method="POST"
)
with urlopen(request) as response:
response_body = response.read().decode("utf-8")
logger.info(f"response: {response_body}")
return response_body
def get_datetime_jst(dt: str) -> datetime:
return datetime.strptime(
dt, '%Y-%m-%dT%H:%M:%S.%f%z'
).astimezone(
tz.gettz('Asia/Tokyo')
)
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
msg = json.loads(event["Records"][0]["Sns"]["Message"])
err_msg = msg['messages'][0] # Note: 2つ以上の要素が入ってきたことはないが、、、
logger.info(f'msg: {msg}')
if pipe_name := msg.get("pipeName"):
table_name = msg["tableName"]
timestamp = msg["timestamp"]
timezone_jst = get_datetime_jst(timestamp)
file_name = f'{msg["stageLocation"]}{err_msg["fileName"]}'
err = err_msg["firstError"].replace("\n", " ")
msg = dedent(
f"""
:warning: *Snowpipe Failed* :warning:
```
Task {pipe_name}
Table {table_name}
File {file_name}
Timestamp {timestamp}
(JST {timezone_jst})
Error {err}
```
"""
).strip()
elif task_name := msg.get("taskName"):
timestamp = msg["timestamp"]
timezone_jst = get_datetime_jst(timestamp)
query_id = f'{err_msg["queryId"]}'
err = err_msg["errorMessage"].replace("\n", " ")
msg = dedent(
f"""
:warning: *Task Failed* :warning:
```
Task: {task_name}
Query ID: {query_id}
Timestamp: {timestamp}
(JST: {timezone_jst})
Error: {err}
```
"""
).strip()
logger.info(msg)
try:
notify_slack(msg, web_hook, slack_channel, slack_username)
logger.info("Successful notification via slack")
except Exception as e:
logger.exception("Failed to notify slack: %s.", e)
return "ng"
return "ok"
メッセージはこんな感じです。すっきりしましたね。
Snowpipe の場合はこんな感じ。
おわりに
Snowpipe とタスクのエラー通知を設定しておくと、安心してワークフロー運用ができて、とても良い感じです。
初手で設定しておきたいですね!
-
「全員」でもエラー通知はできるかもですが、解放しすぎなので試したことはないです。 ↩︎
Discussion