🍧
次世代データレイクの定番になるかもしれない S3 Tables を使ってみた
S3 Tables とは
re:Invent 2024 で発表されたデータ分析に特化したストレージです。Apache Iceberg 形式のデータテーブルに最適化されています。
S3 Tables の何がうれしいのか
Iceberg 形式のデータを時前の汎用 S3 バケットに保存するのと比べ、以下メリットがあります。
- クエリ速度 3 倍、トランザクション速度 10 倍
- コンパクション・スナップショット・参照切れファイル削除の自動化
- Athena, Redshift, EMR, Glue 等分析サービスとシームレスな連携
Apache Iceberg とは
2017 年に Netflix が開発したデータ形式で、OTF (Open Table Format) のひとつです。
Apache Iceberg の特徴
- 同時書き込み、読み込みの一貫性 (ACID)
- データ操作が広告
- タイムトラベル
- スキーマ・パーティションの進化(エボリューション)
- Hidden パーティショニング
S3 Tables 使ってみた
やったこと
- EC2 作成 & Spark インストール
- Table Bucket 作成
- Namespace & Table作成(Spark シェル)
- データの挿入(Sparkシェル)
- Lake Formation で Datacatalog のアクセス権限付与
- Athena でクエリ
コマンドや実行結果等
Spark インストール
wget https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xvf spark-3.5.3-bin-hadoop3.tgz
vi ~/.bashrc
export SPARK_HOME=/home/ec2-user/environment/s3tables/spark-3.5.3-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin
source ~/.bashrc
spark-shell --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.3
/_/
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 17.0.13
Branch HEAD
Compiled by user haejoon.lee on 2024-09-09T05:20:05Z
Revision 32232e9ed33bb16b93ad58cfde8b82e0f07c0970
Url https://github.com/apache/spark
Type --help for more information.
S3 Tables 接続
spark-shell \
--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3,software.amazon.awssdk:s3tables:2.29.26,software.amazon.awssdk:s3:2.29.26,software.amazon.awssdk:sts:2.29.26,software.amazon.awssdk:kms:2.29.26,software.amazon.awssdk:dynamodb:2.29.26,software.amazon.awssdk:kms:2.29.26,software.amazon.awssdk:glue:2.29.26,org.apache.hadoop:hadoop-aws:3.3.4 \
--conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
--conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:us-east-1:xxxxxxxxxxxx:bucket/test-s3tables \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.driver.extraJavaOptions="-Djava.security.manager=allow"
データ操作
spark.sql("create namespace if not exists s3tablesbucket.sample_ns")
spark.sql("create table if not exists s3tablesbucket.sample_ns.sales ( product string, amount int, timestamp timestamp) using iceberg ")
spark.sql("""
INSERT INTO s3tablesbucket.sample_ns.sales
VALUES
('Laptop', 1200, timestamp '2023-12-01 10:30:00'),
('Smartphone', 800, timestamp '2023-12-01 11:15:00'),
('Headphones', 150, timestamp '2023-12-02 09:45:00'),
('Monitor', 350, timestamp '2023-12-02 14:20:00'),
('Keyboard', 80, timestamp '2023-12-03 16:30:00'),
('Mouse', 45, timestamp '2023-12-03 16:35:00'),
('Tablet', 500, timestamp '2023-12-04 13:15:00'),
('Printer', 250, timestamp '2023-12-04 15:45:00'),
('Speaker', 120, timestamp '2023-12-05 10:20:00'),
('Webcam', 90, timestamp '2023-12-05 11:30:00')
""")
spark.sql("select * from s3tablesbucket.sample_ns.sales").show()
+----------+------+-------------------+
| product|amount| timestamp|
+----------+------+-------------------+
| Laptop| 1200|2023-12-01 10:30:00|
|Smartphone| 800|2023-12-01 11:15:00|
|Headphones| 150|2023-12-02 09:45:00|
| Monitor| 350|2023-12-02 14:20:00|
| Keyboard| 80|2023-12-03 16:30:00|
| Mouse| 45|2023-12-03 16:35:00|
| Tablet| 500|2023-12-04 13:15:00|
| Printer| 250|2023-12-04 15:45:00|
| Speaker| 120|2023-12-05 10:20:00|
| Webcam| 90|2023-12-05 11:30:00|
+----------+------+-------------------+
タイムトラベル
spark.sql("update s3tablesbucket.sample_ns.sales SET amount = 0")
spark.sql("select * from s3tablesbucket.sample_ns.sales").show()
+----------+------+-------------------+
| product|amount| timestamp|
+----------+------+-------------------+
| Mouse| 0|2023-12-03 16:35:00|
| Tablet| 0|2023-12-04 13:15:00|
| Printer| 0|2023-12-04 15:45:00|
| Speaker| 0|2023-12-05 10:20:00|
| Webcam| 0|2023-12-05 11:30:00|
| Laptop| 0|2023-12-01 10:30:00|
|Smartphone| 0|2023-12-01 11:15:00|
|Headphones| 0|2023-12-02 09:45:00|
| Monitor| 0|2023-12-02 14:20:00|
| Keyboard| 0|2023-12-03 16:30:00|
+----------+------+-------------------+
spark.sql("select * from s3tablesbucket.sample_ns.sales timestamp as of '2024-12-12 00:00:00';").show()
+----------+------+-------------------+
| product|amount| timestamp|
+----------+------+-------------------+
| Laptop| 1200|2023-12-01 10:30:00|
|Smartphone| 800|2023-12-01 11:15:00|
|Headphones| 150|2023-12-02 09:45:00|
| Monitor| 350|2023-12-02 14:20:00|
| Keyboard| 80|2023-12-03 16:30:00|
| Mouse| 45|2023-12-03 16:35:00|
| Tablet| 500|2023-12-04 13:15:00|
| Printer| 250|2023-12-04 15:45:00|
| Speaker| 120|2023-12-05 10:20:00|
| Webcam| 90|2023-12-05 11:30:00|
+----------+------+-------------------+
spark.sql("select * from s3tablesbucket.sample_ns.sales.snapshots").show()
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
| committed_at| snapshot_id| parent_id|operation| manifest_list| summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-12-09 12:32:...|4933515383696854367| NULL| append|s3://96d13727-a49...|{spark.app.id -> ...|
|2024-12-12 08:56:...|3396824992302167843|4933515383696854367|overwrite|s3://96d13727-a49...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
spark.sql(f"CALL s3tablesbucket.system.rollback_to_snapshot('s3tablesbucket.sample_ns.sales', 4933515383696854367)")
spark.sql("select * from s3tablesbucket.sample_ns.sales").show()
+----------+------+-------------------+
| product|amount| timestamp|
+----------+------+-------------------+
| Laptop| 1200|2023-12-01 10:30:00|
|Smartphone| 800|2023-12-01 11:15:00|
|Headphones| 150|2023-12-02 09:45:00|
| Monitor| 350|2023-12-02 14:20:00|
| Keyboard| 80|2023-12-03 16:30:00|
| Mouse| 45|2023-12-03 16:35:00|
| Tablet| 500|2023-12-04 13:15:00|
| Printer| 250|2023-12-04 15:45:00|
| Speaker| 120|2023-12-05 10:20:00|
| Webcam| 90|2023-12-05 11:30:00|
+----------+------+-------------------+
パフォーマンス測定
Go で適当なデータを作成し S3 上のデータを Athena でクエリ、速度を計測します。
1,000 行 × 1,000 ファイル = 1,000,000 レコードのデータでは以下結果となりました。
センサーデータ作成プログラム
package main
import (
"context"
"fmt"
"math/rand"
"os"
"time"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)
// SensorRecord represents a single sensor data record
type SensorRecord struct {
SensorID int64 `parquet:"name=sensor_id, type=INT64"`
XAxis float64 `parquet:"name=x_axis, type=DOUBLE"`
YAxis float64 `parquet:"name=y_axis, type=DOUBLE"`
}
// generateSensorRecords creates a slice of sensor records
func generateSensorRecords(numRecords int) []SensorRecord {
records := make([]SensorRecord, numRecords)
rand.Seed(time.Now().UnixNano())
for i := 0; i < numRecords; i++ {
records[i] = SensorRecord{
SensorID: rand.Int63n(1000), // Random sensor ID between 0-999
XAxis: rand.Float64() * 100, // Random X value between 0-100
YAxis: rand.Float64() * 100, // Random Y value between 0-100
}
}
return records
}
// writeParquetFile writes sensor records to a parquet file
func writeParquetFile(records []SensorRecord, filename string) error {
// Create local file writer
fw, err := local.NewLocalFileWriter(filename)
if err != nil {
return fmt.Errorf("create file writer error: %v", err)
}
defer fw.Close()
// Create parquet writer
pw, err := writer.NewParquetWriter(fw, new(SensorRecord), 4)
if err != nil {
return fmt.Errorf("create parquet writer error: %v", err)
}
// Write records
for _, record := range records {
if err = pw.Write(record); err != nil {
return fmt.Errorf("write record error: %v", err)
}
}
// Close the writer
if err = pw.WriteStop(); err != nil {
return fmt.Errorf("write stop error: %v", err)
}
return nil
}
// uploadToS3 uploads a file to the specified S3 bucket
func uploadToS3(bucketName, filename string) error {
// Load the AWS SDK configuration
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
if err != nil {
return fmt.Errorf("unable to load SDK config: %v", err)
}
// Create an S3 client
client := s3.NewFromConfig(cfg)
// Open the file
file, err := os.Open(filename)
if err != nil {
return fmt.Errorf("unable to open file: %v", err)
}
defer file.Close()
// Upload the file to S3
_, err = client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: &bucketName,
Key: &filename,
Body: file,
})
return err
}
func main() {
// Configuration parameters
recordsPerFile := 1000
numFiles := 1000
s3BucketName := "test-s3tables-parquet"
// Generate and upload multiple files
for i := 0; i < numFiles; i++ {
// Generate records
records := generateSensorRecords(recordsPerFile)
// Create filename
filename := fmt.Sprintf("sensor_data_%d.parquet", i+1)
// Write parquet file
err := writeParquetFile(records, filename)
if err != nil {
fmt.Printf("Error writing parquet file %s: %v\n", filename, err)
continue
}
// Upload to S3
err = uploadToS3(s3BucketName, filename)
if err != nil {
fmt.Printf("Error uploading file %s to S3: %v\n", filename, err)
}
}
fmt.Println("Sensor data generation and upload completed.")
}
S3 Tables に既存データをロード
spark.sql(
""" CREATE TABLE IF NOT EXISTS s3tablesbucket.sample_ns.sensor_data (
sensor_id BIGINT,
x_axis DOUBLE,
y_axis DOUBLE
)
USING iceberg """
)
val data_file_location = "s3a://test-s3tables-parquet/"
val data_file = spark.read.option("header","true").option("recursiveFileLookup","true").parquet(data_file_location)
data_file.writeTo("s3tablesbucket.sample_ns.sensor_data").using("Iceberg").tableProperty("format-version", "2").createOrReplace()
Athena のクエリ
select * from "sensor_data" where x_axis > 90 and y_axis > 90
| ファイル形式 | クエリ速度 |
|---|---|
| parquet | 5.94 sec |
| iceberg | 7.15 sec |
| s3tables | 6.73 sec |
S3 Tables が最速とはなりませんでした。AWS サポートに問い合わせてみたものの簡単には解決せず…。データの特性やクエリによっては、必ずしも他のファイル形式と比べて早くなるわけではないようです。実利用する際はパフォーマンス測定を忘れず検証に組み込みましょう。
Discussion