⚙️
[Rust] AWS S3からShift-JIS形式のCSVを読み取る
概要
- 数100MB以上を想定
- Shift-JIS
- ヘッダーが日本語
上記のCSVをS3の特定バケットに保管しているので、SDKを利用して取得します。
コード
Cargo.toml
Cargo.toml
[dependencies]
aws-config = "0.54.1"
aws-sdk-s3 = "0.24.0"
csv-async = { version = "1.2.5", features = ["tokio"] }
encoding_rs = "0.8.32"
serde = "1.0.152"
tokio = { version = "1", features = ["full"] }
S3からオブジェクトを取得する
まずはS3接続のためにaws_sdk_s3::Client
を用意します。
async fn create_client() -> aws_sdk_s3::Client {
let config = aws_config::from_env()
.region(Region::new("us-east-1")) // 適宜変更
.load()
.await;
aws_sdk_s3::Client::new(&config)
}
次に、S3の特定バケットからオブジェクト一覧を取得します。
use aws_sdk_s3::model::Object;
async fn get_objects(client: &aws_sdk_s3::Client) -> Vec<Object> {
let output = client
.list_objects_v2()
.bucket("sample-bucket") // 適宜変更
.send()
.await
.unwrap();
output.contents().unwrap().to_vec()
}
aws_sdk_s3::model::Object
から取得できるkeyを利用して、オブジェクトの中身(CSV)を取得します。
async fn download_object(client: &Client, key: &str) -> GetObjectOutput {
client
.get_object()
.bucket("sample-bucket") // 適宜変更
.key(key)
.send()
.await
.unwrap()
}
バケット内にあるCSVを順次処理する準備が整いました。
async fn run() {
let client = aws::create_client().await;
let objects = aws::get_objects(&client).await;
for object in objects {
let key = object.key.unwrap();
let object = aws::download_object(&client, &key).await;
if object.content_type() == Some("text/csv") {
// CSVデータに対する処理(以下に記載)
}
}
}
CSVを処理する
実際にCSVを読み取り構造体に変換していきます。
まずは、最終目標の構造体を用意します。
use serde::Deserialize;
#[derive(Deserialize, Debug)]
struct ProductClass {
#[serde(rename = "品番")]
id: String,
#[serde(rename = "商品名")]
name: String,
#[serde(rename = "表示用商品名")]
display: String,
}
今回の例では、CSVのヘッダーが日本語なので、serdeに名前を別途設定しています。
CSVを構造体に変換します。
use aws_sdk_s3::output::GetObjectOutput;
use csv_async::{ByteRecord, StringRecord};
use encoding_rs;
async fn read_csv(obj: GetObjectOutput) -> Vec<ProductClass> {
let mut rdr = csv_async::AsyncDeserializer::from_reader(obj.body.into_async_read());
let mut record = ByteRecord::new();
let headers = rdr.byte_headers().await.unwrap().decoded_string_record();
let mut products: Vec<ProductClass> = Vec::new();
while rdr.read_byte_record(&mut record).await.unwrap() {
let row: ProductClass = record
.decoded_string_record()
.deserialize(Some(&headers))
.unwrap();
products.push(row);
}
products
}
trait DecodeByteRecord {
fn decoded_string_record(&self) -> StringRecord;
}
impl DecodeByteRecord for ByteRecord {
fn decoded_string_record(&self) -> StringRecord {
let res: Vec<String> = self
.iter()
.map(|r| encoding_rs::SHIFT_JIS.decode(r).0.to_string())
.collect();
StringRecord::from(res)
}
}
簡単な解説
CSVファイルの容量が大きいので、1行ごとにバイトデータを処理します。
また、Shift-JISをそのままRustの文字列に取り込むとエラーが発生するので、
一旦バイトデータをそのまま取得後、encoding_rsにて変換します。
csv_asyncのデシリアライズメソッドにヘッダーを渡すことで、
先ほどserdeに設定した名前と同列のデータを、構造体の該当フィールドにセットしてくれます。
let row: ProductClass = record
.decoded_string_record()
.deserialize(Some(&headers))
.unwrap();
参考
Discussion