🍧

次世代データレイクの定番になるかもしれない S3 Tables を使ってみた

に公開

S3 Tables とは

re:Invent 2024 で発表されたデータ分析に特化したストレージです。Apache Iceberg 形式のデータテーブルに最適化されています。

https://aws.amazon.com/jp/blogs/news/new-amazon-s3-tables-storage-optimized-for-analytics-workloads/

S3 Tables の何がうれしいのか

Iceberg 形式のデータを時前の汎用 S3 バケットに保存するのと比べ、以下メリットがあります。

  • クエリ速度 3 倍、トランザクション速度 10 倍
  • コンパクション・スナップショット・参照切れファイル削除の自動化
  • Athena, Redshift, EMR, Glue 等分析サービスとシームレスな連携

https://aws.amazon.com/jp/s3/features/tables/

Apache Iceberg とは

2017 年に Netflix が開発したデータ形式で、OTF (Open Table Format) のひとつです。

https://pages.awscloud.com/rs/112-TZM-766/images/AWS-Black-Belt_2023_Datalake-Format-On-AWS_0516_v1.pdf

Apache Iceberg の特徴

  • 同時書き込み、読み込みの一貫性 (ACID)
  • データ操作が広告
  • タイムトラベル
  • スキーマ・パーティションの進化(エボリューション)
  • Hidden パーティショニング

https://github.com/lawofcycles/apache-iceberg-101-ja

S3 Tables 使ってみた

やったこと

  • EC2 作成 & Spark インストール
  • Table Bucket 作成
  • Namespace & Table作成(Spark シェル)
  • データの挿入(Sparkシェル)
  • Lake Formation で Datacatalog のアクセス権限付与
  • Athena でクエリ

コマンドや実行結果等

Spark インストール
wget https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xvf spark-3.5.3-bin-hadoop3.tgz

vi ~/.bashrc
export SPARK_HOME=/home/ec2-user/environment/s3tables/spark-3.5.3-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin
source ~/.bashrc

spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.3
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 17.0.13
Branch HEAD
Compiled by user haejoon.lee on 2024-09-09T05:20:05Z
Revision 32232e9ed33bb16b93ad58cfde8b82e0f07c0970
Url https://github.com/apache/spark
Type --help for more information.
S3 Tables 接続
 spark-shell \
 --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.3,software.amazon.awssdk:s3tables:2.29.26,software.amazon.awssdk:s3:2.29.26,software.amazon.awssdk:sts:2.29.26,software.amazon.awssdk:kms:2.29.26,software.amazon.awssdk:dynamodb:2.29.26,software.amazon.awssdk:kms:2.29.26,software.amazon.awssdk:glue:2.29.26,org.apache.hadoop:hadoop-aws:3.3.4 \
 --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog \
 --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \
 --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:us-east-1:xxxxxxxxxxxx:bucket/test-s3tables \
 --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
 --conf spark.driver.extraJavaOptions="-Djava.security.manager=allow"
データ操作
spark.sql("create namespace if not exists s3tablesbucket.sample_ns")
spark.sql("create table if not exists s3tablesbucket.sample_ns.sales ( product string, amount int, timestamp timestamp) using iceberg ")
spark.sql("""
    INSERT INTO s3tablesbucket.sample_ns.sales
    VALUES 
        ('Laptop', 1200, timestamp '2023-12-01 10:30:00'),
        ('Smartphone', 800, timestamp '2023-12-01 11:15:00'),
        ('Headphones', 150, timestamp '2023-12-02 09:45:00'),
        ('Monitor', 350, timestamp '2023-12-02 14:20:00'),
        ('Keyboard', 80, timestamp '2023-12-03 16:30:00'),
        ('Mouse', 45, timestamp '2023-12-03 16:35:00'),
        ('Tablet', 500, timestamp '2023-12-04 13:15:00'),
        ('Printer', 250, timestamp '2023-12-04 15:45:00'),
        ('Speaker', 120, timestamp '2023-12-05 10:20:00'),
        ('Webcam', 90, timestamp '2023-12-05 11:30:00')
     """)
spark.sql("select * from s3tablesbucket.sample_ns.sales").show()
+----------+------+-------------------+                                         
|   product|amount|          timestamp|
+----------+------+-------------------+
|    Laptop|  1200|2023-12-01 10:30:00|
|Smartphone|   800|2023-12-01 11:15:00|
|Headphones|   150|2023-12-02 09:45:00|
|   Monitor|   350|2023-12-02 14:20:00|
|  Keyboard|    80|2023-12-03 16:30:00|
|     Mouse|    45|2023-12-03 16:35:00|
|    Tablet|   500|2023-12-04 13:15:00|
|   Printer|   250|2023-12-04 15:45:00|
|   Speaker|   120|2023-12-05 10:20:00|
|    Webcam|    90|2023-12-05 11:30:00|
+----------+------+-------------------+
タイムトラベル
spark.sql("update s3tablesbucket.sample_ns.sales SET amount = 0")
spark.sql("select * from s3tablesbucket.sample_ns.sales").show()
+----------+------+-------------------+
|   product|amount|          timestamp|
+----------+------+-------------------+
|     Mouse|     0|2023-12-03 16:35:00|
|    Tablet|     0|2023-12-04 13:15:00|
|   Printer|     0|2023-12-04 15:45:00|
|   Speaker|     0|2023-12-05 10:20:00|
|    Webcam|     0|2023-12-05 11:30:00|
|    Laptop|     0|2023-12-01 10:30:00|
|Smartphone|     0|2023-12-01 11:15:00|
|Headphones|     0|2023-12-02 09:45:00|
|   Monitor|     0|2023-12-02 14:20:00|
|  Keyboard|     0|2023-12-03 16:30:00|
+----------+------+-------------------+
spark.sql("select * from s3tablesbucket.sample_ns.sales timestamp as of '2024-12-12 00:00:00';").show()
+----------+------+-------------------+                                         
|   product|amount|          timestamp|
+----------+------+-------------------+
|    Laptop|  1200|2023-12-01 10:30:00|
|Smartphone|   800|2023-12-01 11:15:00|
|Headphones|   150|2023-12-02 09:45:00|
|   Monitor|   350|2023-12-02 14:20:00|
|  Keyboard|    80|2023-12-03 16:30:00|
|     Mouse|    45|2023-12-03 16:35:00|
|    Tablet|   500|2023-12-04 13:15:00|
|   Printer|   250|2023-12-04 15:45:00|
|   Speaker|   120|2023-12-05 10:20:00|
|    Webcam|    90|2023-12-05 11:30:00|
+----------+------+-------------------+
spark.sql("select * from s3tablesbucket.sample_ns.sales.snapshots").show()
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-12-09 12:32:...|4933515383696854367|               NULL|   append|s3://96d13727-a49...|{spark.app.id -> ...|
|2024-12-12 08:56:...|3396824992302167843|4933515383696854367|overwrite|s3://96d13727-a49...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
spark.sql(f"CALL s3tablesbucket.system.rollback_to_snapshot('s3tablesbucket.sample_ns.sales', 4933515383696854367)")
spark.sql("select * from s3tablesbucket.sample_ns.sales").show()
+----------+------+-------------------+                                         
|   product|amount|          timestamp|
+----------+------+-------------------+
|    Laptop|  1200|2023-12-01 10:30:00|
|Smartphone|   800|2023-12-01 11:15:00|
|Headphones|   150|2023-12-02 09:45:00|
|   Monitor|   350|2023-12-02 14:20:00|
|  Keyboard|    80|2023-12-03 16:30:00|
|     Mouse|    45|2023-12-03 16:35:00|
|    Tablet|   500|2023-12-04 13:15:00|
|   Printer|   250|2023-12-04 15:45:00|
|   Speaker|   120|2023-12-05 10:20:00|
|    Webcam|    90|2023-12-05 11:30:00|
+----------+------+-------------------+

パフォーマンス測定

Go で適当なデータを作成し S3 上のデータを Athena でクエリ、速度を計測します。
1,000 行 × 1,000 ファイル = 1,000,000 レコードのデータでは以下結果となりました。

センサーデータ作成プログラム
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"time"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

// SensorRecord represents a single sensor data record
type SensorRecord struct {
	SensorID int64   `parquet:"name=sensor_id, type=INT64"`
	XAxis    float64 `parquet:"name=x_axis, type=DOUBLE"`
	YAxis    float64 `parquet:"name=y_axis, type=DOUBLE"`
}

// generateSensorRecords creates a slice of sensor records
func generateSensorRecords(numRecords int) []SensorRecord {
	records := make([]SensorRecord, numRecords)
	rand.Seed(time.Now().UnixNano())

	for i := 0; i < numRecords; i++ {
		records[i] = SensorRecord{
			SensorID: rand.Int63n(1000),    // Random sensor ID between 0-999
			XAxis:    rand.Float64() * 100, // Random X value between 0-100
			YAxis:    rand.Float64() * 100, // Random Y value between 0-100
		}
	}

	return records
}

// writeParquetFile writes sensor records to a parquet file
func writeParquetFile(records []SensorRecord, filename string) error {
	// Create local file writer
	fw, err := local.NewLocalFileWriter(filename)
	if err != nil {
		return fmt.Errorf("create file writer error: %v", err)
	}
	defer fw.Close()

	// Create parquet writer
	pw, err := writer.NewParquetWriter(fw, new(SensorRecord), 4)
	if err != nil {
		return fmt.Errorf("create parquet writer error: %v", err)
	}

	// Write records
	for _, record := range records {
		if err = pw.Write(record); err != nil {
			return fmt.Errorf("write record error: %v", err)
		}
	}

	// Close the writer
	if err = pw.WriteStop(); err != nil {
		return fmt.Errorf("write stop error: %v", err)
	}

	return nil
}

// uploadToS3 uploads a file to the specified S3 bucket
func uploadToS3(bucketName, filename string) error {
	// Load the AWS SDK configuration
	cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion("us-east-1"))
	if err != nil {
		return fmt.Errorf("unable to load SDK config: %v", err)
	}

	// Create an S3 client
	client := s3.NewFromConfig(cfg)

	// Open the file
	file, err := os.Open(filename)
	if err != nil {
		return fmt.Errorf("unable to open file: %v", err)
	}
	defer file.Close()

	// Upload the file to S3
	_, err = client.PutObject(context.TODO(), &s3.PutObjectInput{
		Bucket: &bucketName,
		Key:    &filename,
		Body:   file,
	})

	return err
}

func main() {
	// Configuration parameters
	recordsPerFile := 1000
	numFiles := 1000
	s3BucketName := "test-s3tables-parquet"

	// Generate and upload multiple files
	for i := 0; i < numFiles; i++ {
		// Generate records
		records := generateSensorRecords(recordsPerFile)

		// Create filename
		filename := fmt.Sprintf("sensor_data_%d.parquet", i+1)

		// Write parquet file
		err := writeParquetFile(records, filename)
		if err != nil {
			fmt.Printf("Error writing parquet file %s: %v\n", filename, err)
			continue
		}

		// Upload to S3
		err = uploadToS3(s3BucketName, filename)
		if err != nil {
			fmt.Printf("Error uploading file %s to S3: %v\n", filename, err)
		}
	}

	fmt.Println("Sensor data generation and upload completed.")
}
S3 Tables に既存データをロード
spark.sql( 
""" CREATE TABLE IF NOT EXISTS s3tablesbucket.sample_ns.sensor_data ( 
    sensor_id BIGINT, 
    x_axis DOUBLE, 
    y_axis DOUBLE 
) 
USING iceberg """
)
val data_file_location = "s3a://test-s3tables-parquet/"
val data_file = spark.read.option("header","true").option("recursiveFileLookup","true").parquet(data_file_location)
data_file.writeTo("s3tablesbucket.sample_ns.sensor_data").using("Iceberg").tableProperty("format-version", "2").createOrReplace()
Athena のクエリ
select * from "sensor_data" where x_axis > 90 and y_axis > 90
ファイル形式 クエリ速度
parquet 5.94 sec
iceberg 7.15 sec
s3tables 6.73 sec

S3 Tables が最速とはなりませんでした。AWS サポートに問い合わせてみたものの簡単には解決せず…。データの特性やクエリによっては、必ずしも他のファイル形式と比べて早くなるわけではないようです。実利用する際はパフォーマンス測定を忘れず検証に組み込みましょう。

Discussion