😎

RustでAWSのS3で一覧を大量に楽して取得したい

2024/03/14に公開

目的

AWSのS3にたくさんオブジェクトがあって、それを全部取得したいです。
この時ListObjectsV2を使って取得すると1000個づつ取得できますが、次のページを取得する必要があります。
ここではページングを勝手にやってくれて、ストリームで処理する方法を紹介します。

コード

ストリーム取得部分

今回のコードの肝はここになります。
into_paginatorでページ送りを自動でやってくれます。
またinto_stream_03xでストリームが取得できるようになります。

fn get_stream(
    client: &Client,
    bucket_name: &str,
    prefix: Option<String>,
) -> impl Stream<Item = anyhow::Result<ListObjectsV2Output>> {
    client
        .list_objects_v2()
        .bucket(bucket_name)
        .set_prefix(prefix)
        .into_paginator()
        .send()
        .into_stream_03x()
        .err_into()
}

全体

Cargo.toml
[package]
name = "s3"
version = "0.1.0"
edition = "2021"

[dependencies]
anyhow = "1"
aws-smithy-types-convert = { version = "0.60.2", features = ["convert-streams"] }
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.17.0"
futures-util = "0.3.30"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
main.rs
use aws_config::Region;
use aws_sdk_s3::{
    config::Credentials,
    operation::{list_objects_v2::ListObjectsV2Output, put_object::PutObjectOutput},
    primitives::ByteStream,
    Client,
};
use aws_smithy_types_convert::stream::PaginationStreamExt;
use futures_util::{Stream, TryStreamExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let bucket_name = "my-bucket";
    let credentials_provider = Credentials::new("admin123", "admin123", None, None, "example");
    let config = aws_sdk_s3::Config::builder()
        .behavior_version_latest()
        .credentials_provider(credentials_provider)
        .region(Region::new("ap-northeast-1"))
        .force_path_style(true)
        .endpoint_url("http://localhost:9000")
        .build();
    let client = Client::from_conf(config);
    for i in 1..1100 {
        let _ = put_object(&client, bucket_name, &i.to_string(), i.to_string()).await?;
    }
    let mut stream = get_stream(&client, bucket_name, None);
    while let Some(item) = stream.try_next().await? {
        println!("{}", item.contents().len());
        for line in item.contents() {
            println!("{:?}", line.key());
        }
    }
    Ok(())
}

async fn put_object(
    client: &Client,
    bucket_name: &str,
    key: &str,
    data: String,
) -> anyhow::Result<PutObjectOutput> {
    client
        .put_object()
        .set_bucket(Some(bucket_name.to_owned()))
        .set_key(Some(key.to_owned()))
        .set_body(Some(ByteStream::from(data.into_bytes())))
        .send()
        .await
        .map_err(|e| e.into())
}

fn get_stream(
    client: &Client,
    bucket_name: &str,
    prefix: Option<String>,
) -> impl Stream<Item = anyhow::Result<ListObjectsV2Output>> {
    client
        .list_objects_v2()
        .bucket(bucket_name)
        .set_prefix(prefix)
        .into_paginator()
        .send()
        .into_stream_03x()
        .err_into()
}

まとめ

コードはこちらにあります。
ストリームなので全部をメモリに持たない安心感もありますね。

Discussion