⚙️

[Rust] AWS S3からShift-JIS形式のCSVを読み取る

2023/02/22に公開

概要

  • 数100MB以上を想定
  • Shift-JIS
  • ヘッダーが日本語

上記のCSVをS3の特定バケットに保管しているので、SDKを利用して取得します。
https://github.com/awslabs/aws-sdk-rust

コード

Cargo.toml

Cargo.toml
[dependencies]
aws-config = "0.54.1"
aws-sdk-s3 = "0.24.0"
csv-async = { version = "1.2.5", features = ["tokio"] }
encoding_rs = "0.8.32"
serde = "1.0.152"
tokio = { version = "1", features = ["full"] }

S3からオブジェクトを取得する

まずはS3接続のためにaws_sdk_s3::Clientを用意します。

async fn create_client() -> aws_sdk_s3::Client {
    let config = aws_config::from_env()
        .region(Region::new("us-east-1")) // 適宜変更
        .load()
        .await;
    aws_sdk_s3::Client::new(&config)
}

次に、S3の特定バケットからオブジェクト一覧を取得します。

use aws_sdk_s3::model::Object;

async fn get_objects(client: &aws_sdk_s3::Client) -> Vec<Object> {
    let output = client
        .list_objects_v2()
        .bucket("sample-bucket") // 適宜変更
        .send()
        .await
        .unwrap();
    output.contents().unwrap().to_vec()
}

aws_sdk_s3::model::Objectから取得できるkeyを利用して、オブジェクトの中身(CSV)を取得します。

async fn download_object(client: &Client, key: &str) -> GetObjectOutput {
    client
        .get_object()
        .bucket("sample-bucket") // 適宜変更
        .key(key)
        .send()
        .await
        .unwrap()
}

バケット内にあるCSVを順次処理する準備が整いました。

async fn run() {
    let client = aws::create_client().await;
    let objects = aws::get_objects(&client).await;
    for object in objects {
        let key = object.key.unwrap();
        let object = aws::download_object(&client, &key).await;
        if object.content_type() == Some("text/csv") {
            // CSVデータに対する処理(以下に記載)
        }
    }
}

CSVを処理する

実際にCSVを読み取り構造体に変換していきます。
まずは、最終目標の構造体を用意します。

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct ProductClass {
    #[serde(rename = "品番")]
    id: String,
    #[serde(rename = "商品名")]
    name: String,
    #[serde(rename = "表示用商品名")]
    display: String,
}

今回の例では、CSVのヘッダーが日本語なので、serdeに名前を別途設定しています。

CSVを構造体に変換します。

use aws_sdk_s3::output::GetObjectOutput;
use csv_async::{ByteRecord, StringRecord};
use encoding_rs;

async fn read_csv(obj: GetObjectOutput) -> Vec<ProductClass> {
    let mut rdr = csv_async::AsyncDeserializer::from_reader(obj.body.into_async_read());
    let mut record = ByteRecord::new();
    let headers = rdr.byte_headers().await.unwrap().decoded_string_record();
    let mut products: Vec<ProductClass> = Vec::new();
    while rdr.read_byte_record(&mut record).await.unwrap() {
        let row: ProductClass = record
            .decoded_string_record()
            .deserialize(Some(&headers))
            .unwrap();
        products.push(row);
    }
    products
}

trait DecodeByteRecord {
    fn decoded_string_record(&self) -> StringRecord;
}

impl DecodeByteRecord for ByteRecord {
    fn decoded_string_record(&self) -> StringRecord {
        let res: Vec<String> = self
            .iter()
            .map(|r| encoding_rs::SHIFT_JIS.decode(r).0.to_string())
            .collect();
        StringRecord::from(res)
    }
}

簡単な解説
CSVファイルの容量が大きいので、1行ごとにバイトデータを処理します。
また、Shift-JISをそのままRustの文字列に取り込むとエラーが発生するので、
一旦バイトデータをそのまま取得後、encoding_rsにて変換します。

csv_asyncのデシリアライズメソッドにヘッダーを渡すことで、
先ほどserdeに設定した名前と同列のデータを、構造体の該当フィールドにセットしてくれます。

let row: ProductClass = record
            .decoded_string_record()
            .deserialize(Some(&headers))
            .unwrap();

https://docs.rs/csv-async/latest/csv_async/

参考

https://users.rust-lang.org/t/stream-csv-file-from-hyper-body-into-deserializer/54565/1

Discussion