🛝

Amazon S3 Tables と Rust で戯れる

2024/12/07に公開

この記事は株式会社LabBase テックカレンダー Advent Calendar 2024 7日目の記事です。

ありがたいことに Advent Calendar の時期には AWS re:Invent があります。
AWS re:Invent で発表されたサービスを触れば記事が書けるのです。
ありがたいですねぇ。正月は笑っている方が絶対いいですからね。

今回は新しく発表された Amazon S3 Tables を触ります。

https://aws.amazon.com/jp/blogs/aws/new-amazon-s3-tables-storage-optimized-for-analytics-workloads/

ただし、単純に Amazon S3 Tables を公式のチュートリアルに沿ってやってみたするのではなく、iceberg-rust という Apache Iceberg を Rust で触るためのツールを Amazon S3 Tables に対応するように実装しながら Amazon S3 Tables と戯れます。

Amazon S3 Tables(Table じゃないよ、Tables)

Amazon S3 はみなさんご存知の何でも入るバケツですね。そのバケツが今回テーブルになりました。
意味わからないですね、すみません。
Amazon S3 Tables は Apache Iceberg のマネージドサービスであり、Amazon S3 Tables を利用してテーブルデータを SQL で読み書きができます。テーブルデータといっても、OLAP などの分析ワークロードに適したストレージサービスとなります。

Apache Iceberg

株式会社LabBase には Iceberg Mind というバリューがありますが関係ありません。
Apache Iceberg とはデータレイクのためのテーブル形式の一つで、データレイク上にテーブルデータを構築するためにオブジェクトストレージにどのようにファイルを配置するかを規定した仕様になります。Apache Iceberg について詳しくは他の人が書かれた記事がわかりやすいので読んでください。Apache Iceberg を利用することで、GDPR等の法規制に対応したデータレイクを構築することができます。

元々 Apache Iceberg を利用するには従来の S3 バケットを利用可能ですが、Amazon S3 Tables では Apache Iceberg を運用する上で必要となる Compaction などのメンテナンスタスクを自動で実行してくれます。ただし、その分料金も取られます。

Amazon S3 Tables を触る

公式のチュートリアルがあります。
Spark を利用するため EMR Cluster を立てています。ちょっと嫌です。
EMR Cluster がなくても Spark の環境を用意すればいいですね。
こっちのドキュメントはそうです。
でも Rust を使わないと CTO に怒られます。

iceberg-rust

https://github.com/apache/iceberg-rust
ありました。Rust で Apache Iceberg を触れそうなもの。Apache のお墨付きです。
でもまだだめです。ぽっと出の Amazon S3 Tables には iceberg-rust が対応していません。

Apache Iceberg を利用するには Catalog が必要です。
Catalog とはテーブルのメタデータを管理しています。
先に紹介した Spark を利用するチュートリアルでは、AWS が用意してくれたライブラリを利用していました。これは Amazon S3 Tables API を利用して、Apache Iceberg Catalog として利用できるようにしてくれています。

ということは、同じようなことをすれば iceberg-rust でも Amazon S3 Tables が触れそうです。

iceberg-rust は書き込みがフルサポートされていない

困りました。iceberg-rust ではまだ書き込みがフルサポートされていません
なので今回は泣く泣くチュートリアルで INSERT まで実行したテーブルからデータの読み取りをしたいと思います。

Catalog の実装

iceberg-rust では AWS GlueHive Metastore のための Catalog 実装があります。
これらの実装を眺めると、iceberg-rust には Catalog という trait があり table に対する CRUD のような操作を実装しています。
つまり、Amazon S3 Tables API を利用して Catalog trait を impl すれば良さそうです。

load_table

Apache Iceberg テーブルからデータを読み取るには Table 構造体が必要です。
Table 構造体を生成するには、load_table を実装します。
既存の Catalog 実装を見ると、下記が必要そうです。

  • metadata_location
  • FileIO

まず、metadata_location ですが、GetTableMetadataLocation API から取得できます。
AWS CLI で GetTableMetadataLocation API を叩くと以下のようなレスポンスが帰ってきました。

{
    "versionToken": "61118935834374733ed5",
    "metadataLocation": "s3://1cfa2546-e521-44f3-zaeqouojs434bisib3mqs3ran861huse1b--table-s3/metadata/00000-f6bfe416-d9bd-4031-94d8-9a3801d6ef82.metadata.json",
    "warehouseLocation": "s3://1cfa2546-e521-44f3-zaeqouojs434bisib3mqs3ran861huse1b--table-s3"
}

この metadataLocation は Amazon S3 Tables の Table buckets 上に保存されている JSON ファイルで、Apache Iceberg の Table Metadata になります。
また、GetObject API を利用して取得できます。

Amazon S3 Tables では、テーブルを作成すると Table buckets が作成され、
Table buckets に Table Metadata ファイルやデータファイルが格納されるようです。
また、このバケットには特定の S3 API のみが有効で、例えば ListObjects などは利用できません。

 $ aws s3 ls s3://1cfa2546-e521-44f3-zaeqouojs434bisib3mqs3ran861huse1b--table-s3

An error occurred (MethodNotAllowed) when calling the ListObjectsV2 operation: The specified method is not allowed against this resource.

load_table の話に戻ります。
次に FileIO 構造体が必要ですが、FileIO は実際に S3 バケットとファイルのやり取りをするものです。
FileIO の作成には S3 バケットの URI がわかれば良いので、GetTableMetadataLocation API のレスポンスの warehouseLocation が利用できそうです。

これらを実装すると以下のようになります。

use std::collections::HashMap;
use std::fmt::Debug;
use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::TableMetadata;
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
use typed_builder::TypedBuilder;
use crate::error::from_aws_sdk_error;
use crate::utils::{create_sdk_config, validate_namespace};

#[derive(Debug, TypedBuilder)]
pub struct S3TablesCatalogConfig {
    #[builder(default, setter(strip_option(fallback = uri_opt)))]
    uri: Option<String>,
    bucket_arn: String,
    #[builder(default)]
    props: HashMap<String, String>,
}

struct S3TablesClient(aws_sdk_s3tables::Client);

pub struct S3TablesCatalog {
    config: S3TablesCatalogConfig,
    client: S3TablesClient,
}

impl Debug for S3TablesCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("S3TablesCatalog")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

impl S3TablesCatalog {
    pub async fn new(config: S3TablesCatalogConfig) -> Result<Self> {
        let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
        let client = aws_sdk_s3tables::Client::new(&sdk_config);
        Ok(S3TablesCatalog {
            config,
            client: S3TablesClient(client),
        })
    }
}

#[async_trait]
impl Catalog for S3TablesCatalog {
    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
        let namespace_name = validate_namespace(table.namespace())?;
        let table_name = table.name();

        let builder = self
            .client
            .0
            .get_table()
            .table_bucket_arn(self.config.bucket_arn.clone())
            .namespace(&namespace_name)
            .name(table_name);

        let s3tables_table_output = builder.send().await.map_err(from_aws_sdk_error)?;

        let metadata_location = s3tables_table_output.metadata_location().unwrap();
        let warehouse_location = s3tables_table_output.warehouse_location();

        let file_io = FileIO::from_path(warehouse_location)?
            .with_props(&self.config.props)
            .build()?;
        let input_file = file_io.new_input(&metadata_location)?;
        let metadata_content = input_file.read().await?;
        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;

        Table::builder()
            .file_io(file_io)
            .metadata_location(metadata_location)
            .metadata(metadata)
            .identifier(TableIdent::new(
                NamespaceIdent::new(namespace_name),
                table_name.to_string(),
            ))
            .build()
    }
}

Apache Iceberg テーブルからデータを読み取る

チュートリアルでは次のようなデータを入れています。

    INSERT INTO s3tablesbucket.example_namespace.example_table 
    VALUES 
        (1, 'ABC', 100), 
        (2, 'XYZ', 200)

このデータを表示するには次のようなコードを書きます。

async fn get_catalog() -> S3TablesCatalog {
    let props = HashMap::from([(AWS_REGION_NAME.to_string(), "us-east-1".to_string())]);

    let config = S3TablesCatalogConfig::builder()
        .bucket_arn(
            "arn:aws:s3tables:us-east-1:111122223333:bucket/amzn-s3-demo-bucket1".to_string(),
        )
        .props(props.clone())
        .build();

    S3TablesCatalog::new(config).await.unwrap()
}

#[tokio::test]
async fn test_load_table() -> Result<()> {
    let catalog = get_catalog().await;
    let table = catalog
        .load_table(&TableIdent::new(
            NamespaceIdent::new("example_namespace".into()),
            "example_table".to_string(),
        ))
        .await?;

    let batch_stream = table
        .scan()
        .select_all()
        .build()
        .unwrap()
        .to_arrow()
        .await
        .unwrap();
    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

    println!("{:?}", batches);

    Ok(())
}

これを動かすと次のような出力になりました!

---- test_load_table stdout ----
[RecordBatch { schema: Schema { fields: [Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, Field { name: "value", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }], metadata: {} }, columns: [PrimitiveArray<Int32>
[
  2,
], StringArray
[
  "XYZ",
], PrimitiveArray<Int32>
[
  200,
]], row_count: 1 }, RecordBatch { schema: Schema { fields: [Field { name: "id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, Field { name: "value", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }], metadata: {} }, columns: [PrimitiveArray<Int32>
[
  1,
], StringArray
[
  "ABC",
], PrimitiveArray<Int32>
[
  100,
]], row_count: 1 }]

無事データが読み込めていそうです。

まとめ

今回のようにチュートリアルに沿って新しいサービスを触れるだけでなく、既存のツールで不足している機能を自分で実装を進めながらサービスを触ることでよりサービスに対する理解が深めることができました。
iceberg-rust における Amazon S3 Tables Catalog 実装は issue が出ているので、時間があるときに実装を進めて Pull Request を出したいと思います。

参考

Discussion