AWS Glue × Apache Iceberg で構築する更新可能なデータレイクテーブル
こんにちは。シンプルフォーム株式会社 にてインフラエンジニアをしています、山岸です。
社内向けに運用しているデータ分析基盤について現状抱えているいくつかの課題を克服すべく、最近は更改に向けた検証に取り組んでいます。今回は取り組みの一つである「AWS Glue と Apache Iceberg によるデータレイクテーブル構築」についてご紹介したいと思います。
概要
- 当社ではデータ分析基盤の ETL 処理に AWS Glue を使用しています。社内のデータ分析業務等のため、RDS データベース等のデータソースから日次で S3 上に構築された DWH に連携しています。
- 現行のデータ分析基盤では、DB テーブル上のデータを毎日全件洗い替えています。このような処理方法は ETL 実装や問題発生時の復旧が簡単である一方、ETL 処理のコスト効率が悪く、データ量の増加に伴って処理時間も長くなっていきます。
- プロダクトの利用実績増加に伴い、これまでの処理方法では ETL にかかるコスト・処理時間が肥大化してきたため、この度 Apache Iceberg を用いて更新可能なデータレイクテーブルの構築に取り組みました。本記事ではその内容について、ETL の実装も交えながらご紹介します。
背景・課題感
現行のデータ分析基盤について
まず、現行のデータ分析基盤について少しご紹介したいと思います。
データ分析基盤は、データソースに負荷をかけず、また様々なデータを統合的に分析することを可能にします。当社では、データ分析基盤の各レイヤー(データレイク/DWH/データマート)をそれぞれ S3 上に構築しており、Glue クローラーでデータカタログ化しています。
BI ツールとして ECS サービス上に構築した Redash を使用しており、データソースとして Redshift Serverless を追加しています。Redshift Spectrum の機能を使用して、Glue データカタログを Redshift の外部テーブルとして登録しておくことで、データ実体を S3 に置いたまま Redshift での分析が可能になります。
各種データソースからのデータ取得や、レイヤー間データ転送のための ETL 処理を Glue ジョブで実装しています。DWH には、データレイクに蓄積された各種アプリケーションログやメトリクスデータの他、RDS からも連携対象として指定されたテーブル群のデータを S3 出力しています。
課題感
冒頭にも述べた通り、上記のような定期的にデータを全件洗い替えるようなアーキテクチャはシンプル、かつ何か問題があった際の復旧が簡単である一方、更新のないレコードに対しても毎回処理の対象になってしまうため、ETL のコスト効率が良くありません。データ量の増加に合わせて ETL ワークフロー全体の処理時間が長くなり、また金銭的コストも重くなっていきます。
レコードの更新日時を格納するカラムを用意しておいて、増分 ETL を実装すれば良いのではないかと思われるかもしれません。確かにデータソースにおけるレコードの更新や削除といったイベントも全てレコード挿入として表現するようなテーブルであればそのような実装も可能ですが、通常の DB テーブルにおいては OLAP 向けに列志向フォーマットで保存されたファイルの一部だけを更新するというのは容易ではありません。
とはいえ問題が深刻化する前に、何とか UPSERT 可能なテーブルの構築と ETL 実装を実現したいという想いがありました。そこで検討したのが、今回ご紹介する Apache Iceberg をはじめとしたデータレイクテーブルフォーマットです。
Apache Iceberg とは
詳しく説明し出すとそれだけで一記事になってしまうので深くは立ち入りませんが、本旨から外れない程度に概要を説明したいと思います。
Apache Iceberg は、大規模な分析ワークロードのために設計された、いわゆる「レイクハウスアーキテクチャ」におけるテーブルフォーマットの一つです。Netflix によって開発され、現在は Apache Software Foundation に寄贈されてオープンソースプロジェクトとなっています。
OLAP ワークロードでは一般的である列志向フォーマットでデータを保存・蓄積しつつも、従来のデータレイクでは持ち得なかった以下のような機能をサポートします。
- ACID トランザクション
- スキーマの進化
- 隠れパーティション
- テーブルスナップショット / タイムトラベルクエリ
レイクハウスアーキテクチャにおいて使用される代表的なテーブルフォーマットとして、Iceberg の他に Apache Hudi や Delta Lake があります。
参考記事
参考にさせて頂いた記事のうちのいくつかを以下に記載しておきます。
- データレイクとデータウェアハウスとは?それぞれの強み・弱みと次世代のデータ管理システム「データレイクハウス」を解説 - Databricks Blog
- データウェアハウスのあらゆるニーズに対応:Icebergによるオープンデータレイクハウス - Cloudera Blog
- Icebergテーブル - Snowflake Documentation
- 5 Compelling Reasons to Choose Apache Iceberg - Snowflake Blog
- How Apache Iceberg enables ACID compliance for data lakes - Sumeet Tandure @Snowflake, Medium
- Comparison of Data Lake Table Formats (Apache Iceberg, Apache Hudi and Delta Lake) - Dremio Blog
実装
アーキテクチャの全体像
以下のようなアーキテクチャを実装しました。
プロダクト用アカウントに存在するアプリケーション DB から、Iceberg テーブルに書き込むデータを 1 つめの Glue ジョブで中間データ用バケットにエクスポートし、これを 2 つ目の Glue ジョブで Iceberg テーブルに書き込みます。初回のテーブル作成と以降のテーブル更新で、ETL 処理の内容は若干異なりますが、ワークフローの構造はほぼ共通です。
- (初回のテーブル作成)Export Full → Create Iceberg Table
- (以降のテーブル更新)Export Diff → Upsert Iceberg Table
ETL ワークフローは、分析アカウント側に作成された AWS Step Functions (SFN) ステートマシンとして実装しています。エクスポート用 Glue ジョブロールの信頼関係に SFN ステートマシン実行ロールを追加しておくことで、クロスアカウントでの Glue ジョブ実行が可能です。
Glue ジョブの共通設定
Glue ジョブで Iceberg テーブルを扱う上で、テーブル作成時・更新時とも必要となる設定について以下に説明します。[1]
--datalake-formats
ジョブパラメータ
組み込みで利用可能な Glue ジョブパラメータ の 1 つである --datalake-formats
を指定します。値は [ hudi
, delta
, iceberg
] から指定可能ですが、今回は iceberg
を指定します。
SparkConf
以下のように SparkConf オブジェクトを作成し、SparkConf.set() メソッドで各種パラメータを設定します。
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", f"s3://{DLH_BUCKET_NAME}/iceberg/")
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.sql.catalog.glue_catalog.warehouse
の値は、ご自身の環境に合わせて設定してください。上記のような設定で、Glue テーブル {database_name}.{table_name}
を作成すると、S3 上では以下のようなパス構造になります。
s3://{DLH_BUCKET_NAME}/
└── iceberg/
└── {database_name}.db/
├── {table_name}/
│ ├── data/
│ └── metadata/
└── ...
テーブル作成ワークフロー
テーブル作成ワークフローから実行される Glue ジョブの実装について以下に説明します。
全量エクスポートジョブ(Export Full)
全量エクスポートでは、データソースであるアプリケーション DB のデータを全てロードし、中間データ用バケットに書き込みます。データ量が大きい場合には、Glue ジョブのワーカータイプやワーカー数を調整します。
def export(table_name) -> None:
# Glue データカタログ経由でアプリケーション DB テーブルのデータをロード
df = glueContext.create_dynamic_frame.from_catalog(
database=SRC_GLUE_DATABASE_NAME,
table_name=f"{SRC_GLUE_TABLE_PREFIX}_{table_name}",
).toDF()
# Parquet 形式で S3 に出力
dst_s3_prefix = os.path.join("path", "to", "target")
df.write.mode("overwrite").parquet(
f"s3://{DST_BUCKET_NAME}/{dst_s3_prefix}/"
)
return
テーブル作成ジョブ(Create Iceberg Table)
中間データ用バケットに出力したデータを PySpark DataFrame として読み込み、TempView を作成して CTAS (CREATE TABLE AS SELECT) クエリを実行します。Glue テーブルが既に存在しているとエラーになるため、以下の実装では事前にテーブルが存在しないことを確認しています。
def create_iceberg_table(df: DataFrame, table_name: str) -> bool:
if not check_if_table_exists(table_name=table_name):
# PySpark DataFrame から TempView の作成
df.createOrReplaceTempView(f"tmp_{table_name}")
# Iceverg V2 を使用した CTAS クエリの実行
glue_table_name = f"{GLUE_TABLE_PREFIX}_{table_name}"
table_uri = f"glue_catalog.{GLUE_DATABASE_NAME}.{glue_table_name}"
ctas_query = f"""
CREATE TABLE {table_uri}
USING iceberg
TBLPROPERTIES ("format-version"="2")
AS SELECT * FROM tmp_{table_name}
"""
spark.sql(ctas_query)
return True
else:
return False
上記実装を見ての通り、CTAS クエリの中で Iceberg V2 テーブルとして作成することを指定します。Table properties として、format-version
以外にも、ファイル保存のフォーマットや圧縮方式などを設定できます。
プロパティ名 | 説明 | デフォルト値 |
---|---|---|
write.format.default |
テーブルのデフォルトファイルフォーマット | parquet |
write.parquet.compression-codec |
Parquet の場合の圧縮方式 | zstd |
テーブル更新ワークフロー
テーブル更新ワークフローから実行される Glue ジョブの実装について以下に説明します。
差分エクスポートジョブ(Export Diff)
差分エクスポートにおいても、アプリケーション DB からロードしたデータを中間データ用バケットに出力する点は全量エクスポートと同じです。
異なる点としては、差分エクスポートでは sampleQuery
オプション [2] を使用して、Glue ジョブワーカーにデータを読み込む前の段階で updated_at
列(レコード更新時タイムスタンプ)によるフィルタリングを実装しています。
def create_updated_at_where_clause(start_time: str, end_time: str) -> str:
"""updated_at の条件句を生成する"""
if start_time != "-" and end_time != "-":
return f"updated_at >= '{start_time}' AND updated_at <= '{end_time}'"
elif start_time != "-" and end_time == "-":
return f"updated_at >= '{start_time}'"
elif start_time == "-" and end_time != "-":
return f"updated_at <= '{end_time}'"
else:
raise ValueError("Either 'start_time' or 'end_time' must be not null.")
def export(table_name: str) -> None:
# sampleQuery の WHERE 句に埋め込む文字列を生成
updated_at_where_clause = create_updated_at_where_clause(
start_time=TARGET_START_TIME,
end_time=TARGET_END_TIME,
)
query = f"""
SELECT * FROM {SRC_GLUE_TABLE_PREFIX}.{table_name}
WHERE {updated_at_where_clause}
"""
# Glue データカタログ経由でアプリケーション DB テーブルのデータをロード
# (sampleQuery オプションを与えることで、Glue ジョブワーカーが読み込む対象データを絞る)
df_diff = glueContext.create_dynamic_frame.from_catalog(
database=SRC_GLUE_DATABASE_NAME,
table_name=f"{SRC_GLUE_TABLE_PREFIX}_{table_name}",
additional_options={
"sampleQuery": query,
}
).toDF()
# 差分レコードが存在する場合、Parquet 形式で S3 に出力
diff_count = df_diff.count()
if diff_count != 0:
dst_s3_prefix = os.path.join("path", "to", "target")
df_diff.write.mode("overwrite").parquet(
f"s3://{DST_BUCKET_NAME}/{dst_s3_prefix}/"
)
return
sampleQuery
については以前の記事でも取り扱っているので、良ければ併せてご覧ください。
- Glue DynamicFrame 生成時のカラム SELECT でパフォーマンス改善した話 - @simpleform, Zenn
テーブル更新ジョブ(Upsert Iceberg Table)
さて、本記事の肝でもあるテーブル更新ジョブです。一般的な UPSERT 処理と同様、Iceberg テーブルにおいても MERGE INTO
ステートメントを使用できます。
当社では原則すべての DB テーブルにサロゲートキー id
カラムを定義していますが、以下の実装は「差分 TempView の ID が既存のテーブル内に存在すれば UPDATE、存在しなければ INSERT を実行する」という処理になります。
def upsert_iceberg_table(df_diff: DataFrame, table_name: str):
# TempView の作成
df_diff.createOrReplaceTempView(diff_view_name)
# UPSERT クエリの実行
iceberg_table_uri = f"glue_catalog.{GLUE_DATABASE_NAME}.{GLUE_TABLE_PREFIX}_{table_name}"
diff_view_name = f"diff_{table_name}"
upsert_query = f"""
MERGE INTO {iceberg_table_uri} AS existing_table
USING {diff_view_name} AS diff_view
ON existing_table.id = diff_view.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
"""
spark.sql(upsert_query)
return
実装に関する説明は以上です。
落穂拾い
AWS サービスの対応状況
当社のデータ分析基盤の技術スタックに含まれる AWS Glue および Amazon Redshift について、Apache Iceberg テーブルとの対応状況に言及しておきます。(Amazon Athena や Amazon EMR など、その他のサービスについては公式ドキュメントをご確認ください)
AWS Glue
Apache Iceberg の OSS 最新バージョンは 1.5.0
となっていますが、公式ドキュメント によると Glue バージョンの対応状況は以下の通りです。Glue 4.0 では最新メジャーバージョンである 1.x がサポートされています。
AWS Glue バージョン | サポートされる Iceberg バージョン |
---|---|
4.0 |
1.0.0 |
3.0 |
0.13.1 |
また、Glue クローラーによる Iceberg テーブルのスキーマ抽出もサポートされています。[3]
Amazon Redshift
Iceberg V1, V2 テーブルに対して、Redshift Spectrum を使用した外部表としての読み取りがサポートされています。ただし、読み取り専用でテーブルデータソースへの書き込みはできないなど、いくつか考慮すべき事項もあります。詳細は 公式ドキュメント をご確認ください。
テーブル作成時のオプション設定
本題とは直接関係ないので実装例では割愛しましたが、テーブル作成ジョブの中でテーブル作成と同時に行う処理として、以下のようなものを組み込んでいます。
- 自動コンパクションの設定 ... Iceberg テーブルの読み取り性能を保つために必要な「コンパクション処理」を自動で行ってくれる Glue Data Catalog の機能 [4] の設定。
- テーブル単位の LF-Tag の設定 ... AWS Lake Formation によるきめ細かなアクセス制御 (FGAC) を簡易に設定するための LF-Tag の設定。
いずれもテーブルの増加に合わせて手動、または IaC で継続的に管理するのは難しいかなと感じた設定です。プロジェクトの状況や要件に合わせて検討してみても良いかもしれません。
さいごに
AWS Glue と Apache Iceberg によるデータレイクテーブルの ETL について書いてみました。Iceberg についてはまだまだ触りたてということもあり、まだ挙動を確認できていない機能も多いので、知見が溜まってきたらまたアウトプットしてみたいと思います。読者様においても今回の記事が検討の一助になれば幸いです。
最後まで読んで頂き、ありがとうございました。
-
Iceberg フレームワークの有効化 - AWS Glue ユーザーガイド ↩︎
-
JDBC ソースを操作するときのプッシュダウン - AWS Glue ユーザーガイド ↩︎
-
Introducing AWS Glue crawler and create table support for Apache Iceberg format - AWS Big Data Blog ↩︎
-
AWS Glue Data Catalog now supports automatic compaction of Apache Iceberg tables - AWS News Blog ↩︎
リアルタイム法人調査システム「SimpleCheck」を開発・運営するシンプルフォーム株式会社の開発チームのメンバーが、日々の開発で得た知見や試してみた技術などについて発信していきます。 Publication 運用への移行前の記事は zenn.dev/simpleform からご覧ください。
Discussion