🔔

Snowpipeとタスクのエラー通知をサクッと設定するよ

2022/12/11に公開

こちらは Snowflake Advent Calender 2日目の記事となっています。
盛大に遅刻しており非常に申し訳ないです。。。

これは何?

Snowpipeのエラー通知」と「タスクのエラー通知」を Terraform でサクッと設定してみるお話です。

はじめに

Snowpipe でデータを自動ロードさせたり、タスクをスケジューリングして定期的にデータを処理させるなど、Snowpipe とタスクを使用する場面はかなり多いと思います。
便利で頼れる機能たちなのですが、はたして元気に Loaded になってくれているのか、気付かぬうちに Failed になってはいないか、気になりますよね。運用上、異常が発生していたらすぐに発見したいところです。しかしながら、ずっと張り付いてステータスを監視するわけにもいきませんよね。

そんな悩みを抱える全ての運用マンに、ぜひともご紹介したいのが「Snowpipeのエラー通知」と「タスクのエラー通知」でございます。
その名の通り Snowpipe とタスクでエラーが発生した時に通知を送れる機能で、監視作業が非常に捗るものとなっています。AWS の場合は SNS を使用するので、メールで通知を送信したり、Lambda に送信して見やすく加工することもできます。

今回の記事では、そんなありがたみ溢れる便利機能を Terraform で一気に設定してしまおうと思います。

ざっくり構成

こんな感じです。

こちらもざっくりですが、このような流れでエラーを通知します:

  1. Snowflake Snowpipe またはタスクでエラーが起きると
  2. エラーメッセージが AWS SNS トピックにパブリッシュされて
  3. サブスクライブされている配信先に通知が飛ぶ

どんな通知が来るの?

メールを通知先にすると、こんな感じでエラーメッセージが飛んできます。これはタスクでエラーが発生した場合のもの。

メッセージは JSON ですので、ちょっと整えて Slack に通知を飛ばすこともできます。

やること

公式ドキュメントの手順に従うと、次の手順になります。

  • AWS側
    1. SNS トピック作成
      • トピックにメッセージを発行できるユーザーを「全員」 or 「指定されたAWSアカウントのみ」に設定
    2. IAM ポリシー作成
    3. IAM ロール作成
    4. SNS トピックにSnowflakeからのアクセスを許可する(Snowfklake側で [🌟] 対応後)
    5. SNS トピックのメッセージをお好きな配信先にサブスクライブ
  • Snowflake側
    1. [🌟] Notification Integration作成
    2. Snowpipe またはタスクでエラー通知を有効化する

terraforming するので、だいたい一気に進みます。

やってみよう

それでは、Let's Terraforming!

今回の検証環境はこちら↓になります。

ツール等 バージョン
Terraform 1.1.0
Snowflake provider 0.52.0
AWS provider バージョン

AWS SNS トピック作成

まずは AWS SNS トピックを作成します。
このとき、デフォルトのアクセスポリシーでは、トピックの所有者のみがメッセージ発行とサブスクライブを行える状態になっています。そのため、アクセスポリシーでトピックにメッセージを発行できるユーザーを「指定されたAWSアカウントのみ」に設定します。[1]

ソースコード

variables

variable "snowflake_aws_id" {
  type        = string
  description = "Snowflakeアカウントが使用しているAWS アカウントID"
}

variable "aws_sns_topic_name" {
  type        = string
  description = "SNS トピック名"
}

data

data "aws_caller_identity" "current" {}
data "aws_availability_zones" "current" {}

resources

# 
# SNS topic
# 

resource "aws_sns_topic" "notif_sns_topic" {
  name         = var.aws_sns_topic_name
  display_name = var.aws_sns_topic_name
  policy = jsonencode(
    {
      "Version" : "2008-10-17",
      "Id" : "__default_policy_ID",
      "Statement" : [
        {
          "Sid" : "__default_statement_ID",
          "Effect" : "Allow",
          "Principal" : {
            "AWS" : "*"
          },
          "Action" : [
            "SNS:GetTopicAttributes",
            "SNS:SetTopicAttributes",
            "SNS:AddPermission",
            "SNS:RemovePermission",
            "SNS:DeleteTopic",
            "SNS:Subscribe",
            "SNS:ListSubscriptionsByTopic",
            "SNS:Publish"
          ],
          "Resource" : "arn:aws:sns:${data.aws_availability_zones.current.id}:${data.aws_caller_identity.current.account_id}:${var.aws_sns_topic_name}",
          "Condition" : {
            "StringEquals" : {
              "AWS:SourceOwner" : "${data.aws_caller_identity.current.account_id}"
            }
          }
        },
        # トピックにメッセージを発行できるユーザー:「指定されたAWSアカウントのみ」
        {
          "Sid" : "__console_pub_0",
          "Effect" : "Allow",
          "Principal" : {
            "AWS" : "arn:aws:iam::${var.snowflake_aws_id}:root"
          },
          "Action" : "SNS:Publish",
          "Resource" : ""
        }
      ]
    }
  )
}

AWS IAM ポリシー/ロール

次に、SNS トピックにメッセージを公開する権限を付与する IAM ポリシーIAM ロールを作成します。
IAM ロールに対して、後でSNS トピックに Snowflake からのアクセスを許可するために、IAM ユーザー ARM と外部IDを IAM ロールの信頼関係に設定する必要がありますが、terraform コードでは snowflake_notification_integration リソースを参照して IAM ユーザー ARN と外部IDを自動的に取得できるようにしておきます。

ソースコード

variables

variable "aws_iam_policy_name" {
  type        = string
  description = "AWS IAM ポリシー名"
}

variable "aws_iam_role_name" {
  type        = string
  description = "IAM ロール名"
}

resources

# 
# IAM policy
# 

data "aws_iam_policy_document" "notif_policy_document" {
  statement {
    effect = "Allow"
    actions = [
      "sns:Publish",
    ]
    resources = [
      aws_sns_topic.notif_sns_topic.arn
    ]
  }
}

module "notif_policy" {
  source  = "terraform-aws-modules/iam/aws//modules/iam-policy"
  version = "4.1.0"

  name = var.aws_iam_policy_name
  path = "/"

  policy = data.aws_iam_policy_document.notif_policy_document.json
}

# 
# IAM role
# 

data "aws_iam_policy_document" "notif_role_policy_document" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]
    condition {
      test     = "StringEquals"
      variable = "sts:ExternalId"
      values   = [snowflake_notification_integration.notif_int.aws_sns_external_id]
    }
    principals {
      type        = "AWS"
      identifiers = [snowflake_notification_integration.notif_int.aws_sns_iam_user_arn]
    }
  }
}

resource "aws_iam_role" "notif_role" {
  name               = var.aws_iam_role_name
  assume_role_policy = data.aws_iam_policy_document.notif_role_policy_document.json
  # Terraformの外でポリシーがアタッチされる可能性がある場合は、aws_iam_role_policy_attachment でアタッチするべし
  managed_policy_arns = [
    module.notif_policy.arn,
  ]
}

Snowflake Notification integration

先ほど作成した AWS SNS トピックを参照する Snowflake notification integration を作成します。

ソースコード

variables

variable "snowflake_notif_int_name" {
  type        = string
  description = "Notification integration名"
}

# 前段で記載したものと同じ
# variable "aws_sns_topic_name" { ... }
# variable "aws_iam_role_name" { ... }

data

data "aws_region" "current" {}

# 前段で記載したものと同じ
# data "aws_caller_identity" "current" {}

resources

# 
# Notification integration(AWS SNS)
# 

resource "snowflake_notification_integration" "notif_int" {
  provider = snowflake.accountadmin
  name     = var.snowflake_notif_int_name
  comment  = "Snowpipe・タスク エラー通知のためのnotification integration"

  enabled   = true
  type      = "QUEUE"
  direction = "OUTBOUND"

  # AWS_SNS
  notification_provider = "AWS_SNS"
  # AWS側の情報。決め打ち可能
  aws_sns_role_arn = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:role/${var.aws_iam_role_name}"
  aws_sns_topic_arn = "arn:aws:sns:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${var.aws_sns_topic_name}"
}

以上の terraform コードで、エラー通知に必要なリソースがそろいます。お好きな variable を設定して、terraform aplly すれば、エラー通知の準備は完了です。

Snowflake Snowpipeまたはタスクでエラー通知を有効化

あとは、作成した notification integration を error_integration に設定すればOKです。
Terraform コードで設定する場合も、error_integrationに指定してあげればOKです。
試しに、絶対にエラーになるタスクを起動して、通知が来るのを確認してみましょう。

こんなタスクを作成します。

create or replace task "TEST_TASK_NOTIF"
  error_integration = notif_int -- 作成した notification integration
  schedule = '60 minutes'
as
  select dummy; -- dummy って何やねん、ということで絶対にエラーになる

タスクに設定したステートメントを実行すると、当然エラーになります。

SELECT dummy;
-- output
Error: invalid identifier 'DUMMY' (line 6)

それでは、タスクを実行しちゃいましょう。

execute task "TEST_TASK_NOTIF";
-- output
status
Task TEST_TASK_NOTIF is scheduled to run immediately.

ちょっとだけ待っていると。。。

といった感じで、エラー通知が届きます。
エラーを発生させたタスクの名前が taskName で、発生したエラーの内容は errorMessage で確認てきますね。

通知が届くだけでも十分ですが、生のメッセージはちょっと読みづらいかなー。。。と思ったときは、Lambda でちょっと加工して Slack に流す、なんてこともできます。

Lambdaソースコード
import json
import logging
import os
from datetime import datetime
from dateutil import tz
from urllib.request import Request, urlopen
from textwrap import dedent


# 環境変数の読み込み
log_level = os.environ["LOG_LEVEL"]
web_hook = os.environ["WEB_HOOK"]
slack_channel = os.environ["SLACK_CHANNEL"]
slack_username = os.environ["SLACK_USERNAME"]

# loggerの初期設定
logger = logging.getLogger()
logger.setLevel(log_level)


def notify_slack(msgs, web_hook, slack_channel, slack_username):
    """
    slackにメッセージを投稿する
    Args:
        msgs(str): 投稿するメッセージ
        web_hook (str): 投稿するslackのwebhook URL
        slack_channel (str): 投稿するslackのチャンネル名
        slack_username (str): 投稿するslackのユーザ名
    """

    send_data = {
        "channel": slack_channel,
        "username": slack_username,
        "text": msgs
    }
    payload = "payload=" + json.dumps(send_data)
    request = Request(
        web_hook,
        data=payload.encode("utf-8"),
        method="POST"
    )

    with urlopen(request) as response:
        response_body = response.read().decode("utf-8")
    logger.info(f"response:  {response_body}")

    return response_body


def get_datetime_jst(dt: str) -> datetime:
    return datetime.strptime(
        dt, '%Y-%m-%dT%H:%M:%S.%f%z'
    ).astimezone(
        tz.gettz('Asia/Tokyo')
    )


def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))
    msg = json.loads(event["Records"][0]["Sns"]["Message"])
    err_msg = msg['messages'][0]  # Note: 2つ以上の要素が入ってきたことはないが、、、
    logger.info(f'msg: {msg}')

    if pipe_name := msg.get("pipeName"):
        table_name = msg["tableName"]
        timestamp = msg["timestamp"]
        timezone_jst = get_datetime_jst(timestamp)
        file_name = f'{msg["stageLocation"]}{err_msg["fileName"]}'
        err = err_msg["firstError"].replace("\n", " ")

        msg = dedent(
            f"""
            :warning: *Snowpipe Failed* :warning:
            ```
            Task       {pipe_name}
            Table      {table_name}
            File       {file_name}
            Timestamp  {timestamp}
                 (JST  {timezone_jst})
            Error      {err}
            ```
            """
        ).strip()

    elif task_name := msg.get("taskName"):
        timestamp = msg["timestamp"]
        timezone_jst = get_datetime_jst(timestamp)
        query_id = f'{err_msg["queryId"]}'
        err = err_msg["errorMessage"].replace("\n", " ")

        msg = dedent(
            f"""
            :warning: *Task Failed* :warning:
            ```
            Task:      {task_name}
            Query ID:  {query_id}
            Timestamp: {timestamp}
                 (JST: {timezone_jst})
            Error:     {err}
            ```
            """
        ).strip()
    logger.info(msg)

    try:
        notify_slack(msg, web_hook, slack_channel, slack_username)
        logger.info("Successful notification via slack")
    except Exception as e:
        logger.exception("Failed to notify slack: %s.", e)
        return "ng"

    return "ok"

メッセージはこんな感じです。すっきりしましたね。

Snowpipe の場合はこんな感じ。

おわりに

Snowpipe とタスクのエラー通知を設定しておくと、安心してワークフロー運用ができて、とても良い感じです。
初手で設定しておきたいですね!

脚注
  1. 「全員」でもエラー通知はできるかもですが、解放しすぎなので試したことはないです。 ↩︎

Discussion