📌

S3 Selectをaws-sdk-rustで行う

2024/07/15に公開

S3を準備

S3 Bucketを作成

CloudFormationを用いて以下を作成します。

  • S3 Bucket
  • 作成したS3 Bucketからデータを取得するためのIAMユーザー
# CloudFormation template for this sample. It defines:
#   - S3Bucket: a private bucket that holds the JSON Lines test data
#   - S3AccessUser: an IAM user that can list that bucket and get its objects
#     (this is the user the Rust program authenticates as)
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  # No BucketName property is set; the actual bucket name is exposed via the
  # BucketName output and is what BUCKET_NAME must be set to at run time.
  S3Bucket:
    Type: "AWS::S3::Bucket"
    Properties:
      # All four public-access block settings are enabled.
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      # Object ownership rule: BucketOwnerEnforced.
      OwnershipControls:
        Rules:
          - ObjectOwnership: BucketOwnerEnforced
      # Object versioning is turned on.
      VersioningConfiguration:
        Status: Enabled
      # Default server-side encryption uses AES256, with BucketKeyEnabled set.
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
            BucketKeyEnabled: true
  # IAM user with an inline policy scoped to S3Bucket only.
  # NOTE: this template does not create an access key for the user (there is
  # no AWS::IAM::AccessKey resource), so one has to be issued separately before
  # it can be put into ~/.aws/credentials.
  S3AccessUser:
    Type: "AWS::IAM::User"
    Properties:
      Path: /
      UserName: S3AccessUser
      Policies:
        - PolicyName: S3AccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Read-only access. Both the bucket ARN and its "/*" object ARN
              # are listed, presumably so that s3:ListBucket (bucket-level) and
              # s3:GetObject (object-level) each match a resource.
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:ListBucket"
                Resource:
                  - !Sub "arn:aws:s3:::${S3Bucket}"
                  - !Sub "arn:aws:s3:::${S3Bucket}/*"

# Values needed to run the sample: BucketName goes into BUCKET_NAME, and
# UserName identifies the IAM user whose access key goes into the profile.
Outputs:
  BucketName:
    Value: !Ref S3Bucket
    Description: Name of S3 bucket
  UserName:
    Value: !Ref S3AccessUser
    Description: Name of IAM user

上記のテンプレートを使い、マネジメントコンソールからCloudFormation Stackを作成します。

データの準備

テストデータはJSON Linesファイルで用意してみます。
以下のファイルを、作成したBucketにアップロードします。

test.jsonl
{ "id": 1, "name": "aaaa" }
{ "id": 2, "name": "bbbb" }
{ "id": 3, "name": "cccc" }
{ "id": 4, "name": "dddd" }

~/.aws/credentials の設定

テンプレートではIAMユーザーのアクセスキーは作成されないため、マネジメントコンソールから S3AccessUser のアクセスキーを発行し、以下のように設定します。

[test-user]
aws_access_key_id = 作成されたIAMユーザーのアクセスキー
aws_secret_access_key = 作成されたIAMユーザーのシークレットアクセスキー

Rust

依存関係

Cargo.toml
[dependencies]
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.40.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.120"

コード

main.rs
use std::{env, error::Error};

use aws_sdk_s3::{
    types::{
        InputSerialization, JsonInput, JsonOutput, OutputSerialization,
        SelectObjectContentEventStream,
    },
    Client,
};
use serde::{de::IgnoredAny, Deserialize};
use serde_json::from_str;

/// One row of the JSON Lines object, e.g. `{ "id": 1, "name": "aaaa" }`.
///
/// Deserialized from each line that S3 Select returns; printed via `Debug`.
#[derive(Debug, Deserialize)]
pub struct Record {
    /// Value of the `id` key.
    pub id: i32,
    /// Value of the `name` key.
    pub name: String,
}

/// Catch-all error type for this sample: any error value, boxed as a trait object.
type AppError = Box<dyn std::error::Error>;

/// Parses one line of S3 Select JSON output into a [`Record`], buffering the
/// pieces of a record that arrived split across several calls.
///
/// `buf` carries text left over from previous calls:
/// - If `buf` is empty and `line` alone is a record, it is returned directly
///   and `buf` is left untouched.
/// - Otherwise `line` is appended to `buf`. Once the accumulated text is valid
///   JSON it is deserialized, `buf` is cleared, and the record is returned.
///
/// Returns `Ok(Some(record))` when a record is complete and `Ok(None)` while
/// more input is still needed.
///
/// # Errors
///
/// Returns an error when the text is complete, valid JSON that does not
/// deserialize into a [`Record`]. In that case `buf` keeps its contents.
fn parse_line_bufferd(buf: &mut String, line: &str) -> Result<Option<Record>, AppError> {
    if buf.is_empty() {
        // Fast path: a self-contained line is parsed without copying it into `buf`.
        let parsed = try_parse_record(line)?;
        if parsed.is_none() {
            buf.push_str(line);
        }
        return Ok(parsed);
    }

    buf.push_str(line);
    let parsed = try_parse_record(buf)?;
    if parsed.is_some() {
        buf.clear();
    }
    Ok(parsed)
}

/// Parses `text` as a [`Record`].
///
/// Returns `Ok(None)` when `text` is not (yet) valid JSON, such as the first
/// half of a split record. Returns an error when it is valid JSON of the
/// wrong shape.
fn try_parse_record(text: &str) -> Result<Option<Record>, AppError> {
    match from_str::<Record>(text) {
        Ok(record) => Ok(Some(record)),
        // The second, shape-agnostic parse only runs on failure. It tells
        // "incomplete JSON" apart from "complete JSON that is not a Record".
        Err(_) if !is_valid_json(text) => Ok(None),
        Err(err) => Err(err.into()),
    }
}

/// Returns `true` when `data` parses as JSON of any shape.
///
/// The parsed value is discarded via `IgnoredAny`; only success or failure matters.
fn is_valid_json(data: impl AsRef<str>) -> bool {
    let candidate: &str = data.as_ref();
    let parsed: Result<IgnoredAny, _> = from_str(candidate);
    parsed.is_ok()
}

/// Runs `SELECT * FROM s3object s` with S3 Select against the JSON Lines object
/// named by the `BUCKET_NAME` and `OBJECT_KEY` environment variables. Every
/// returned row is deserialized into a [`Record`].
///
/// # Errors
///
/// Returns an error if:
/// - either environment variable is unset or not valid Unicode;
/// - the `SelectObjectContent` request or its event stream fails;
/// - the returned rows are not valid UTF-8 or not valid records;
/// - the stream ends with unparsed record text left over.
async fn handler(client: &Client) -> Result<Vec<Record>, AppError> {
    let bucket_name = env::var("BUCKET_NAME").map_err(|_| "BUCKET_NAME is not set")?;
    let object_key = env::var("OBJECT_KEY").map_err(|_| "OBJECT_KEY is not set")?;

    let mut output = client
        .select_object_content()
        .bucket(bucket_name)
        .key(object_key)
        .expression_type(aws_sdk_s3::types::ExpressionType::Sql)
        .expression("SELECT * FROM s3object s")
        .input_serialization(
            InputSerialization::builder()
                .json(
                    JsonInput::builder()
                        .r#type(aws_sdk_s3::types::JsonType::Lines)
                        .build(),
                )
                .build(),
        )
        .output_serialization(
            OutputSerialization::builder()
                .json(JsonOutput::builder().build())
                .build(),
        )
        .send()
        .await?;

    let mut processed_records: Vec<Record> = Vec::new();
    // Record text that parse_line_bufferd has not been able to complete yet.
    let mut buf = String::new();
    // Raw payload bytes after the last '\n' seen so far. A Records event may end
    // anywhere, even inside a multi-byte UTF-8 character. So bytes are decoded
    // only up to a '\n', which is ASCII and therefore always a char boundary.
    let mut pending: Vec<u8> = Vec::new();

    while let Some(event) = output.payload.recv().await? {
        if let SelectObjectContentEventStream::Records(records) = event {
            if let Some(payload) = records.payload() {
                pending.extend_from_slice(payload.as_ref());
            }
            if let Some(last_newline) = pending.iter().rposition(|&b| b == b'\n') {
                parse_lines(&pending[..=last_newline], &mut buf, &mut processed_records)?;
                pending.drain(..=last_newline);
            }
        }
    }

    // The final row is not necessarily followed by a newline.
    parse_lines(&pending, &mut buf, &mut processed_records)?;

    // Anything still buffered is a truncated or malformed record. Report it
    // instead of silently dropping it.
    if !buf.is_empty() {
        return Err(format!(
            "S3 Select output ended with {} bytes of unparsed record data",
            buf.len()
        )
        .into());
    }

    Ok(processed_records)
}

/// Decodes `bytes` (whole lines only) as UTF-8 and appends every record that
/// [`parse_line_bufferd`] completes to `out`.
fn parse_lines(bytes: &[u8], buf: &mut String, out: &mut Vec<Record>) -> Result<(), AppError> {
    let text = std::str::from_utf8(bytes)?;
    for line in text.lines() {
        if let Some(record) = parse_line_bufferd(buf, line)? {
            out.push(record);
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), AppError> {
    let config = aws_config::load_from_env().await;
    let client = aws_sdk_s3::Client::new(&config);

    let records = handler(&client).await?;
    println!("{:?}", records);
    Ok(())
}

実行

以下のコマンドで実行します。

AWS_PROFILE=test-user \
BUCKET_NAME=作成されたBucket名 \
OBJECT_KEY=test.jsonl \
cargo run

以下のように出力されれば成功です。

[Record { id: 1, name: "aaaa" }, Record { id: 2, name: "bbbb" }, Record { id: 3, name: "cccc" }, Record { id: 4, name: "dddd" }]

WHERE句も試してみます。
クエリを変えるには、main.rs の `handler` 関数内で `.expression(...)` に渡しているSQL文を書き換えます。

  • idが1のレコードを取得
SELECT * FROM s3object s WHERE s.id = 1
[Record { id: 1, name: "aaaa" }]
  • idが1より大きいレコードを取得
SELECT * FROM s3object s WHERE s.id > 1
[Record { id: 2, name: "bbbb" }, Record { id: 3, name: "cccc" }, Record { id: 4, name: "dddd" }]
  • nameにbが含まれるレコードを取得
SELECT * FROM s3object s WHERE s.name LIKE '%b%'
[Record { id: 2, name: "bbbb" }]

読んでいただきありがとうございました。

Discussion