🧊

AWSで実現するWrite-Audit-Publish(WAP)パターン(Apache Icebergでのbranch処理)

2024/12/10に公開

はじめに

過去の記事で、データのガバナンスを中心に解説しました。
その中で、データの品質確保が課題となると述べたと思います。
ビジネスにおいて価値のあるデータを提供するためにはデータの信頼性や品質を確保することが重要となります。
データの品質確保にはWrite-Audit-Publish(WAP)パターンData Contract等様々な方法があります。
今回はデータに対して監査や承認を行うことで品質を高める手法のWrite-Audit-Publish (WAP)パターンについて解説し、AWSでの簡易的な実装例を紹介したいと思います。
※育休中の隙間で作成した短時間クオリティの記事です。話題のApache Icebergを触ってみたかっただけという話だったりします。

Write-Audit-Publish(WAP)パターンとは

Write-Audit-Publish (WAP)パターンは、2017年にNetflixによって提唱されたデータパイプライン設計のアプローチです。
このパターンではデータの処理フローを以下の3つのステージに分けて管理します。

Write: データの書き込みステージ。新しいデータがシステムに取り込まれる。

Audit: データの検証ステージ。データ品質や整合性の検証を行い、基準を満たさないデータを識別する。

Publish: 本番公開ステージ。検証済みのデータを公開し、消費者に利用可能にする。

Netflixはこのパターンをデータレイク環境での課題解決のために採用し特にデータ品質の保証や透明性の向上といった課題に対応しました。
WAPパターンを利用することでデータの不整合を未然に防ぎ信頼性の高いデータパイプラインを構築することが可能となります。

Open Table Format(OTF) とは

WAPパターンを実現する際に、重要な要素があります。
それがOpen Table Format(OTF)と呼ばれるテーブルフォーマットです。
OTFはデータレイクに最適化されたテーブルフォーマットで様々な特徴があります。

主な特徴としては
データフォーマットの標準化: 分散データレイク環境で統一された操作性の提供が可能となる。
バージョニングとタイムトラベル: データの状態を履歴として保持し、過去の状態に簡単に戻すことが可能となる。
スキーマ進化のサポート: 柔軟にスキーマ変更が可能で、変更に強いデータ構造を構築することが可能となる。

これ以外にも様々な特徴がありますが、データレイクを構築する上で今後主流となる形式です。
AWS re:Invent 2024でもOTFを代表するApache IcebergへのS3最適化が大きな話題となっていたかと思います。

https://aws.amazon.com/about-aws/whats-new/2024/12/amazon-s3-tables-apache-iceberg-tables-analytics-workloads/

Netflixを含む多くの企業がこのフォーマットを採用し、データ管理の効率化や整合性確保につなげています。
OTFには前述したようなApache IcebergやApache HudiそしてDelta Lake等があります。
今回の実装例ではこのOTFの中でも特に有力なフォーマットのApache Icebergを使用したいと思います。

※個人的にはDatabricksからOTFに入ったのでDelta Lakeに思い入れがあったりします。

Apache Icebergとは

Apache IcebergはNetflixにより提唱されたテーブルフォーマットです。
SparkやFlink等の様々なコンピューティングエンジンより操作が可能で、効率的にデータを扱うことができます。

Apache Iceberg は、巨大な分析データセット用のオープンテーブル形式です。Icebergは、SQL テーブルと同じように機能する高性能なテーブル形式を使用して、Spark、Trino、PrestoDB、Flink、Hive、Impala などのコンピューティング エンジンにテーブルを追加します。

https://iceberg.apache.org/docs/nightly/

Apache Iceberg には以下のような特徴があります。

スキーマの進化:スキーマの追加、削除、更新、名前の変更をサポートし、変更に強いデータ構造を構築することが可能となります。
非表示のパーティショニング:ユーザーのミスによる誤った結果や極端に遅いクエリの発生を防止することが可能となります。
パーティションレイアウトの進化:データ量やクエリパターンの変化に応じてテーブルのレイアウトを更新することが可能となります。これにより柔軟にデータを扱うことができます。
タイムトラベル:過去のデータのスナップショットを使用し、スナップショットを取った時点のデータにアクセスることが可能となります。これにより変更履歴の追跡やデータの復元ができます。
バージョンロールバック:トランザクションのサポートを通じてデータ整合性と信頼性を保証しますることが可能となります。過去のバージョンにロールバックして問題の修正ができます。

このような仕組みを活用することでWAPパターンの「Write」、「Audit」および「Publish」フェーズで、品質チェックや整合性の担保を行うことが可能となります。

AWSでの構成例

実装上のポイント

AWSで実現するにあたり、Apache Icebergのbranchの仕組みを使用します。

https://iceberg.apache.org/docs/latest/branching/#audit-branch

構成図

ソースデータからデータレイクに書き込み(Write)、なにかしらの品質チェック(Audit)、チェックOKのデータをProdに公開(Publish)します。

サンプルコード

今回は1つのNotebook内で処理を完結させました。
実際に適用する場合は処理を分けてGlueのジョブでフローを作成したほうがいいと思います。

また、サンプルということでAudit処理は厳密なものは行っていません。(何の意味があるAuditなんだ…といった感じの処理にしてます。)

サンプルデータは下記から取得しています。(サンプルとしてよく使われるNYCのタクシーのデータです)
https://aws.amazon.com/marketplace/pp/prodview-okyonroqg5b2u

https://github.com/dataPenginPenguin/WAP-sample

ちなみにipynb形式で作成しています。(ipynb形式でうまくソースを埋め込む方法が分からなかった…githubでnotebookの実行結果を見てもらうのがいいかも)

ポイントは以下です。

  1. WAP処理の有効可処理
  2. fast_forwardでのデータの登録処理
%session_id_prefix native-iceberg-dataframe-
%glue_version 5.0
%idle_timeout 60
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
}

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit


# Sparkセッションの設定
spark = SparkSession.builder \
    .appName("GlueNotebook-Iceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://waptest20241208/warehouse/") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()

# データの読み込み
rawDataPath = "s3://waptest20241208/raw_data/"
auditDf = spark.read.format("parquet").load(f"{rawDataPath}/nyc_tlc/yellow/mar2024/")

# Iceberg のブランチ設定
prodTable = "glue_catalog.nyc.yellow_taxi_trips"
auditBranch = "dataAudit_202403"

# WAP 処理を有効化
spark.sql(f"ALTER TABLE {prodTable} SET TBLPROPERTIES ('write.wap.enabled'='true')")
spark.sql(f"ALTER TABLE {prodTable} CREATE BRANCH {auditBranch}")
spark.conf.set("spark.wap.branch", auditBranch)

# Write
# データに年と月を追加
auditDf = auditDf.withColumn("month", lit(3)).withColumn("year", lit(2024))

# データを書き込み
auditDf.writeTo(prodTable).append()

# 中間データの確認
# Auditブランチ
spark.table(prodTable).groupBy("year", "month").count().show()

# mainブランチ
spark.read.option("BRANCH", "main").table(prodTable).groupBy("year", "month").count().show()

from pyspark.sql.functions import col
# Auditブランチからデータのロード
auditData = spark.read.option("BRANCH", auditBranch).table(prodTable)

# Audit
monthDf = auditData.filter(col("month") >= 2)

if monthDf.isEmpty():
    # データの登録をスキップ
    # SNSで通知するのがいいかなと
    print("No data found for month = 2. Skipping registration and sending notification.")
else:
    # データの登録
    spark.sql(f"""CALL glue_catalog.system.fast_forward('{prodTable}', 'main', '{auditBranch}')""")
    print("Data registered successfully.")

# データの登録を確認

# Auditブランチ
spark.table(prodTable).groupBy("year", "month").count().show()

# mainブランチ
spark.read.option("BRANCH", "main").table(prodTable).groupBy("year", "month").count().show()


# Audit/Publish
from pyspark.sql.functions import col
# Auditブランチからデータのロード
auditData = spark.read.option("BRANCH", auditBranch).table(prodTable)

# Audit
monthDf = auditData.filter(col("month") >= 2)

if monthDf.isEmpty():
    # データの登録をスキップ
    # SNSで通知するのがいいかなと
    print("No data found for month = 2. Skipping registration and sending notification.")
else:
    # データの登録
    spark.sql(f"""CALL glue_catalog.system.fast_forward('{prodTable}', 'main', '{auditBranch}')""")
    print("Data registered successfully.")

# データの登録を確認
# Auditブランチ
spark.table(prodTable).groupBy("year", "month").count().show()

# mainブランチ
spark.read.option("BRANCH", "main").table(prodTable).groupBy("year", "month").count().show()

# 後処理
spark.sql(f"ALTER TABLE {prodTable} UNSET TBLPROPERTIES ('write.wap.enabled')")
spark.sql(f"ALTER TABLE {prodTable} DROP BRANCH {auditBranch}")
%stop_session

実行結果

中間データの確認

下記のようにauditブランチに2024/03のデータが格納

最終データの登録を確認

下記のようにmainブランチに2024/03のデータが格納

ちなみにAudit処理についてAWS Glue Data Qualityでうまいことできないかと考えましたが、Notebookだと下記のようにルールセットをコード内で作成する必要があったためうーん…といった感じでした。(ルールの追加・変更のたびにコードをいじりたくないなと。ルールセットをS3等の外部に配置して取得する方法はできそう。)
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/data-quality-gs-studio-notebooks.html

おまけ

WAPパターンとは別にGlueのETL JobでAWS Glue Data Qualityでのチェックを行えばそもそもこのパターンの実装は不要な気がします。
WAPパターンの強みとしては「Write」、「Audit」および「Publish」のステージを明確に分けることができる点とタイムトラベル機能により履歴をさかのぼることができる点だと思います。
この強みが要件として必要な場合にはWAPパターンを検討するのが良いのかなと思います。

まとめ

データの品質確保のWrite-Audit-Publish(WAP)パターンを紹介しました。
そもそもコンシューマ側のアプリケーションでデータの品質の担保が必要とは思いますが、データ基盤側でも品質を保証することで、よりデータの品質を向上させることが可能です。

今回は簡易版となりますが、他のサービスと組み合わせることでより高度なWAP処理を行うことができます。
もしデータ基盤側で品質の担保が必要という要件が出た場合にはWAPパターンでの実装を検討していただければと思います。

Discussion