🎃

Step Functions と Lake Formation を使った異なる AWS アカウントへの Aurora のデータ共有

2022/12/09に公開

この記事は Safie Engineers' Blog! Advent Calendar 9日目の記事です。

セーフィーのインフラグループに所属している佐伯です。普段は AWS のあれこれを担当しており最近は SRE 的な活動もはじめています。今回はせっかくなので Zenn で書いてみることにしました。

はじめに

AWS Step Functions と AWS Lake Formation を使って異なる AWS アカウントへ Aurora のデータを共有する仕組みを作ったので書いてみました。なお、細かい設定についてはかなり省いています。このエントリで伝えたいのは「こういった課題があって解決したくてマネージドサービスを利用した構成で実装してみたけど、制約や制限があるのでこういった設計をしました」という部分が主な内容なのでご了承ください。

背景

セーフィーではサービスレベルの向上などを目的として様々なデータを収集、分析するデータ分析基盤があり、サービスで利用している Amazon Aurora についても必要なデータをデータ分析基盤へ連携しています。

サービスとデータ分析基盤が稼働する AWS アカウントは別アカウントであり、これまでは Jenkins ジョブ(中身はシェルスクリプト)で以下の処理を日次で実行していました。

  • サービス AWS アカウント
    1. 対象の Aurora クラスターのクローンを作成
    2. 連携するテーブルのみを残し、且つセンシティブなカラムをマスキング
    3. クローンで作成した Aurora クラスターからスナップショットを作成
    4. 作成したスナップショットをデータ分析基盤 AWS アカウントへ共有
  • データ分析基盤 AWS アカウント
    1. スナップショットから S3 エクスポートを実行し、S3 バケットに保存
    2. Python スクリプト等で Redshift にデータをインポート

課題

これまでの仕組みでは以下の課題がありました。

  • Aurora クラスターごとにクローンを作成しているため単純に時間がかかる
  • 不要なテーブルを削除しており連携するテーブルを追加する度にシェルスクリプトの変更が必要
  • これは大した問題ではないが Aurora クラスターの作成/削除が頻繁に行われるため CloudWatch の AWS/RDS ネームスペースのディメンションが増加し、CloudWatch コンソールから確認したいメトリクスを探す際ストレスが溜まる

上記課題を解決するため何かいい感じの方法はないかと GitHub を調べてみたところ参考になりそうなリポジトリを発見しました。

上記リポジトリの構成をベースにもっといい感じにできないかなと検討した結果、AWS Step Functions と AWS Lake Formation を使うことにしました。

構成概要

ざっくりとですが以下のような構成です。

S3 エクスポートを使用している関係上、どうしても S3 エクスポート完了までには時間がかかります。そのため、この仕組みではニアリアルタイムなデータ分析が要件の場合はマッチしませんので別の方法を選択する必要があります。

Amazon EventBridge

現状は日次でデータを共有できれば問題ないとのことなので、Aurora クラスターの自動スナップショット作成完了を EventBridge で検知し Step Functions をトリガーすることにしました。EventBridge のイベントパターンは以下のようにしています。

{
  "source": ["aws.rds"]
  "detail-type": ["RDS DB Cluster Snapshot Event"],
  "detail": {
    "Message": ["Automated cluster snapshot created"],
    "SourceIdentifier": [{
      "prefix": "rds:${AURORA_CLUSTER_NAME}"
    }]
  }
}

また、アップデートによりスナップショットを作成せずとも S3 エクスポートができるようになっているので、これから作るのであれば自動スナップショット作成完了をトリガーにしなくてもいいかなとも思います。

AWS Step Functions

実行履歴の確認やマネジメントコンソールから簡単に再実行できるため Step Functions で実装することを選択しました。

また、S3 エクスポートは成功したが後続のタスクで失敗した場合など、同じスナップショットから再度 S3 エクスポートを実行することはできないので、S3 エクスポートを実行する State Machine と S3 オブジェクトのコピー&Glue クローラーを実行する State Machine を分割し、ネストされたワークフローを作成しています。ワークフローは以下の通りです。

Main State Machine

以下は Step Functions コンソールの Workflow Studio でエクスポートしたワークフローの図です。

Step Functions で特別難しいことはしていないので、ASL の定義は省略させていただきました。簡単に処理の流れは以下のとおりです。

  1. Lambda Function で S3 エクスポートを実行
  2. Choice State での分岐
    • S3 エクスポートを実行できた場合: DescribeExportTasks を定期的に実行しエクスポート完了まで待機
    • S3 エクスポートを実行できなかった場合: 10分待機し再度 Lambda Function を実行
  3. S3 エクスポート完了後、Sub State Machine を実行

Lambda Function

AWS API をコールするだけの部分は Step Functions から AWS SDK 統合で直接 AWS API をコールしていますが、S3 エクスポートを実行する部分は「アカウントにつき最大 5 つまで」という同時実行数制限があるため、同時実行数を制御、もし同時実行数を超えてエラーとなった場合はエラーハンドリングするために Lambda Function で実行しています。

Amazon S3 への DB スナップショットデータのエクスポート - Amazon Relational Database Service
1 つのアカウントにつき、最大 5 つの DB スナップショットエクスポートタスクを同時に実行できます。

弊社のサーバーサイドでは Python がメインで使われてることもあり、Lambda Function は Python で書きました。以下はサンプルコードとなります。この Lambda Function のレスポンスをワークフローの後続のタスクで参照することになります。

import json
import logging
import os
import re

import boto3
from botocore.exceptions import ClientError


class JsonFormatter:
    def format(self, record):
        return json.dumps(vars(record))


logging.basicConfig()
logging.getLogger().handlers[0].setFormatter(JsonFormatter())

logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

S3_BUCKET = os.getenv("S3_BUCKET")
S3_EXPORT_IAM_ROLE_ARN = os.getenv("S3_EXPORT_IAM_ROLE_ARN")
KMS_KEY_ARN = os.getenv("KMS_KEY_ARN")
EXPORT_TASK_LIMIT = int(os.getenv("EXPORT_TASK_LIMIT"))

client = boto3.client("rds")


def get_running_task_count():
    response = client.describe_export_tasks(
        Filters=[
            {
                "Name": "status",
                "Values": [
                    "in_progress",
                    "starting",
                ],
            },
        ],
    )

    count = 0
    for export_task in response["ExportTasks"]:
        logger.info(
            f"Export Task: {export_task['ExportTaskIdentifier']} is {export_task['Status']}"
        )
        count += 1

    logger.info(f"Export tasks running count is {count}")
    return count


def start_export_task(id, snapshot, cluster_id, date):
    try:
        response = client.start_export_task(
            ExportTaskIdentifier=id,
            SourceArn=snapshot,
            S3BucketName=S3_BUCKET,
            S3Prefix=f"s3-export/{cluster_id}/{date}",
            IamRoleArn=S3_EXPORT_IAM_ROLE_ARN,
            KmsKeyId=KMS_KEY_ARN,
        )

        logger.info(f"start_export_task API response={response}")

        result = {
            "ExportTask": {
                "ClusterIdentifier": cluster_id,
                "ExportTaskIdentifier": response["ExportTaskIdentifier"],
                "SourceArn": response["SourceArn"],
                "S3Bucket": response["S3Bucket"],
                "S3Prefix": response["S3Prefix"],
                "Skipped": False,
            }
        }

        return result
    except ClientError as e:
        if e.response["Error"]["Code"] == "ExportTaskLimitReachedFault":
            logger.info(f"start_export_task API response={e.response}")

            result = {"ExportTask": {"ClusterIdentifier": cluster_id, "Skipped": True}}

            return result
        else:
            raise e


def lambda_handler(event, context):
    logger.info(f"Triggerd event={event}")

    source_snapshot = event["detail"]["SourceArn"]
    source_id = event["detail"]["SourceIdentifier"].replace("rds:", "")
    cluster_id = re.sub("-\d{4}-\d{2}-\d{2}-\d{2}-\d{2}", "", source_id)
    date = re.findall("\d{4}-\d{2}-\d{2}", source_id)[0]

    count = get_running_task_count()

    if count < EXPORT_TASK_LIMIT:
        logger.info(
            f"Running export tasks is less than {EXPORT_TASK_LIMIT}, to execute start export task"
        )
        result = start_export_task(source_id, source_snapshot, cluster_id, date)
    else:
        logger.info(
            f"Running export tasks is {EXPORT_TASK_LIMIT} or more, to skip start export task"
        )
        result = {"ExportTask": {"ClusterIdentifier": cluster_id, "Skipped": True}}

    if "ExportTaskIdentifier" in result["ExportTask"]:
        return result
    else:
        return {**event, **result}

Sub State Machine

Main State Machine から実行される Sub State Machine は S3 エクスポートで作成された Parquet ファイルを Glue クローラーがクロールするパスへコピーし、Glue クローラーの実行を行います。

簡単に処理の流れは以下のとおりです。

  1. S3 エクスポートで作成された Parquet ファイルを Glue クローラーが参照するパスへコピー
  2. Glue クローラーを実行
  3. Glue クローラーのステータスが READY になるまで60秒おきにステータスを確認
  4. Glue クローラーのステータスが READY になった後に LastCrawlInfo を確認
    • SUCCEEDED だった場合は State Machine を Success State で終了
    • FAILED, CANCELLED だった場合は State Machine を Fail State で終了

Lambda Function

Lambda Function では Glue クローラーがクロールするパスへ Parquet ファイルのコピーを行っています。

S3 エクスポートでエクスポートされるデータの命名規則にはふたつの規則があり、以下ドキュメントに記載の通り、"現在の規則", "以前の規則"のふたつの規則があります。どちらでエクスポートされても対応できるようパスを組み立て、且つ Glue クローラーがパーティションを作成できるよう date=YYYY-MM-DD なども追加しています。

また、対象の Aurora クラスターによってはエクスポートされるファイル数が多く、シーケンシャルに実行すると Lambda のタイムアウトのハードリミットである15分ではコピーを完了できなかったため、asyncio で非同期処理にしています。厳密にどの程度のファイル数まで大丈夫かというのは計測していませんが、9000ファイル程度であれば6分程度でコピーできることは確認しました(Lambda Function のメモリは 1024MB)

以下はサンプルコートです。

import asyncio
import json
import logging
import os
import re

import boto3


class JsonFormatter:
    def format(self, record):
        return json.dumps(vars(record))


logging.basicConfig()
logging.getLogger().handlers[0].setFormatter(JsonFormatter())

logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))

TO_BUCKET = os.getenv("TO_BUCKET")
TO_PREFIX = os.getenv("TO_PREFIX")

s3 = boto3.resource("s3")


def copy_obcjet(from_bucket, to_bucket, from_key, to_key):
    source = {"Bucket": from_bucket, "Key": from_key}
    s3.Object(to_bucket, to_key).copy_from(CopySource=source)
    logger.info(f"Copy from s3://{from_bucket}/{from_key} to s3://{to_bucket}/{to_key}")


async def async_copy(from_bucket, to_bucket, keys):
    loop = asyncio.get_running_loop()

    await asyncio.gather(
        *[
            loop.run_in_executor(
                None, copy_obcjet, from_bucket, to_bucket, key["from"], key["to"]
            )
            for key in keys
        ]
    )


def lambda_handler(event, context):
    logger.info(f"Triggerd event={event}")

    cluster_id = event["ExportTask"]["ClusterIdentifier"]
    export_task_id = event["ExportTask"]["ExportTaskIdentifier"]
    from_bucket = event["ExportTask"]["S3Bucket"]
    s3_prefix = event["ExportTask"]["S3Prefix"]
    date = "date=" + s3_prefix.split("/")[-1]
    from_prefix = f"{s3_prefix}/{export_task_id}"
    to_prefix = f"{TO_PREFIX}/{cluster_id}"

    keys = []
    for obj in s3.Bucket(from_bucket).objects.filter(Prefix=from_prefix):
        if obj.key.endswith(".gz.parquet"):
            to_key = obj.key.replace(from_prefix, to_prefix, 1)
            to_key = os.path.join(
                to_key.split("/")[0],
                to_key.split("/")[1],
                to_key.split("/")[2],
                to_key.split("/")[3],
                date,
                os.path.split(to_key)[1],
            )

            keys.append(dict({"from": obj.key, "to": to_key}))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_copy(from_bucket, TO_BUCKET, keys))

    return event

なお、コピー元とコピーした Parquet ファイルは一定期間経過後に削除するよう S3 のライフサイクルポリシーを設定しています。

AWS Lake Formation

公式ブログや参考になる記事はたくさんあるため細かい設定については省略しています。少し検討したところや制限などについて記載します。AWS Lake Formation でのデータ共有に関しては以下公式ブログが参考になると思います。

共有方法について

AWS Lake Formation では LF-TBAC(Lake Formation tag-based access control) 方式とデータフィルターを使用してデータ分析基盤 AWS アカウントへ権限を付与します。なぜ複数の方法を使用しているかというと、センシティブなカラムはデータ分析基盤 AWS アカウントからは参照できないようにしたいという要件があり、列フィルタリングを行う必要がありました。

以下ドキュメントに記載の通り、Glue からの参照において列フィルタリングを行うにはデータフィルターを使用する必要があります。そのため列フィルタリングが不要なテーブルは LF-TBAC で共有、列フィルタリングが必要なテーブルはデータフィルターで共有という方法を採用しています。

列レベルのフィルタリングに関する注意点と制限 - AWS Lake Formation
AWS Glue ETL ジョブは、データフィルター (セルレベルのセキュリティ) を使用することによる列フィルタリングのみをサポートします。

Glue から参照しないのであればデータフィルターを使う必要はなく、LF-TBAC 方式でも特定カラムだけ LF-Tag 設定を上書きすることで列フィルタリングは可能です。

LF-Tag

どういったタグを設定するかというのは迷いましたが、LF-Tag の変更はカジュアルにできるのであまり難しく考えずとりあえず機密レベルと AWS アカウントを起点に以下のような設計にしました。なお、現状 confidential: sensitive は利用してないです。

Key Value 用途
confidential sensitive 自アカウントであっても特定のプリンシパルのみ参照可能
confidential private 自アカウントのプリンシパルのみ参照できるデータ
confidential public 他アカウントへ共有するデータ

データカタログへの LF-Tag 設定

明示的に共有したいテーブルのみデータ分析基盤 AWS アカウントから参照できるようにしたかったため、データカタログのデータベースには confidential: private の LF-Tag を設定、共有したいテーブルのみ confidential: public で LF-Tag を上書きすることにしました。

データフィルター

参照させたくないカラムが存在するテーブルは LF-Tag ではなくデータフィルターで共有の設定を行います。データフィルターに関しては以下の記載があったため include columns で参照可能なカラムを指定し作成しました。

列レベルのフィルタリングに関する注意点と制限 - AWS Lake Formation
grant オプションと列フィルタリングを伴う SELECT を付与するには、除外リストではなく、包含リストを使用する必要があります grant オプションを使用しない場合は、包含リストまたは除外リストのどちらでも使用することができます。

余談ですが、terraform-provider-aws がデータフィルターには対応しておらず、AWS CLI の入力ファイルを Git 管理することでなんとかしています。

課題が解決できたか

実行時間

2022/12 時点で日次で16の Aurora クラスターを対象に Step Functions の処理を実行しています。S3 エクスポート、Parquet ファイルのコピー、データカタログの更新を1時間程度で完了できているので早いとは言えずとも日次でデータを共有する弊社のユースケースにおいては許容範囲だと思っています。

メンテナンスコスト

S3 エクスポートでは全てのテーブルをエクスポートし、データカタログを更新しているので共有するテーブルを追加する場合は LF-Tag を設定 or データフィルタの作成、権限付与するのみなのでその部分ではメンテナンスコストは下がりました。

ただし、新たに何か要件がでてきた場合など Step Functions のワークフローや Lambda Function の処理を変更する必要もでてきたため、この仕組を作った人以外変更しづらいといった状況が発生しているかもしれません。(それはシェルスクリプトでも同じかもしれませんが)

まとめ

普段は主に ECS などのインフラ系サービスを触ることが多く、あまり馴染みのないサービスを今回触ってみることになりました。AWS Lake Formation、使ってみる前は「なんか落とし穴がありそう...」とか思ってたんですが、意外と罠にハマることもなくやりたいことを実現できたので良いサービスだと感じてます。(といっても今回は一部の機能しか使ってないですが)

また、Step Functions でワークフローを作ったことで再実行がしやすくデバッグ等も捗りました。失敗時の Slack 通知も Amazon EventBridge -> Amazon SNS -> AWS Chatbot でできる(通知内容は情報量がなくて微妙に感じてはいますが)ため、運用も少しは楽になったと思います。

以上、この記事がどなたかの参考になればと思います。

Discussion