📖
RustでAWSのDynamoDBで一覧を大量に楽して取得したい
目次
「RustでAWSのS3で一覧を大量に楽して取得したい」のDynamoDB版です。
DynamoDBでも、scanで大量のデータを取得する場合はページング処理が必要になります。このページング処理をストリームとして楽に扱う方法を紹介します。
コード
get_stream
肝の部分はこちらになります。into_paginatorでページング処理を行い、その結果をinto_stream_03xでストリーム化しています。
/// Scans `table_name` and returns the resulting items as a single stream.
///
/// `filter_expression` and `expression_value` are forwarded unchanged to the scan
/// request; passing `None` for both scans without a filter.
///
/// The request goes through the SDK paginator (`into_paginator().items()`), is
/// converted into a `futures` 0.3 `Stream` by `into_stream_03x`, and every SDK
/// error is converted into `anyhow::Error`.
pub fn get_stream(
    &self,
    table_name: &str,
    filter_expression: Option<String>,
    expression_value: Option<HashMap<String, AttributeValue>>,
) -> impl Stream<Item = Result<HashMap<String, AttributeValue>, anyhow::Error>> {
    // Describe the scan first ...
    let scan = self
        .client
        .scan()
        .table_name(table_name)
        .set_filter_expression(filter_expression)
        .set_expression_attribute_values(expression_value);

    // ... then let the paginator yield individual items instead of whole pages.
    let items = scan.into_paginator().items().send();

    items.into_stream_03x().err_into()
}
全体
Cargo.toml
# Manifest of the example crate whose sources are the dynamo.rs and main.rs listings.
[package]
name = "dynamo"
version = "0.1.0"
edition = "2021"
[dependencies]
# Error type returned by the fallible functions of the example.
anyhow = "1"
# Source of `PaginationStreamExt` (`into_stream_03x`), imported in dynamo.rs.
aws-smithy-types-convert = { version = "0.60.2", features = ["convert-streams"] }
# Loads the shared AWS configuration and provides `Region`.
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
# DynamoDB client, request builders and model types.
aws-sdk-dynamodb = "1.16.0"
# `Stream` and `TryStreamExt` (`err_into`, `try_next`).
futures-util = "0.3.30"
# Async runtime behind `#[tokio::main]` in main.rs.
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
dynamo.rs
use std::collections::HashMap;
use aws_config::Region;
use aws_sdk_dynamodb::{
operation::{
create_table::CreateTableOutput, delete_table::DeleteTableOutput, put_item::PutItemOutput,
},
types::{
AttributeDefinition, AttributeValue, KeySchemaElement, KeyType, ProvisionedThroughput,
ScalarAttributeType,
},
};
use aws_smithy_types_convert::stream::PaginationStreamExt;
use futures_util::{Stream, TryStreamExt};
/// Wrapper around the AWS SDK DynamoDB client used by this example.
///
/// `Debug` and `Clone` are derived so the wrapper can be logged and handed to
/// several tasks, mirroring what the wrapped SDK client already supports.
#[derive(Debug, Clone)]
pub struct Client {
    /// Underlying SDK client every request is sent through.
    client: aws_sdk_dynamodb::client::Client,
}
impl Client {
    /// Builds a client that talks to a DynamoDB endpoint at `http://localhost:8000`.
    ///
    /// The placeholder credentials (`"xxx"`) and the `us-east-1` region are handed to
    /// the configuration loader directly instead of being written into the process
    /// environment. `std::env::set_var` is not safe to call while other threads may
    /// access the environment, and `main` awaits this constructor on a multi-threaded
    /// Tokio runtime. Not touching the environment also avoids overwriting real
    /// credentials that may be present there.
    pub async fn new() -> Self {
        let credentials =
            aws_sdk_dynamodb::config::Credentials::new("xxx", "xxx", None, None, "static");
        let config = aws_config::from_env()
            .region(Region::from_static("us-east-1"))
            .credentials_provider(credentials)
            .load()
            .await;
        let builder = aws_sdk_dynamodb::config::Builder::from(&config)
            .endpoint_url("http://localhost:8000")
            .region(Some(Region::from_static("us-east-1")));
        Self {
            client: aws_sdk_dynamodb::Client::from_conf(builder.build()),
        }
    }

    /// Creates `table_name` with a single string-typed hash key named `key`.
    ///
    /// The table is provisioned with one read and one write capacity unit.
    ///
    /// # Errors
    ///
    /// Returns an error if one of the request parts cannot be built or if the
    /// `CreateTable` request fails.
    pub async fn create_table(
        &self,
        table_name: &str,
        key: &str,
    ) -> anyhow::Result<CreateTableOutput> {
        let attribute = AttributeDefinition::builder()
            .attribute_name(key)
            .attribute_type(ScalarAttributeType::S)
            .build()?;
        let key_schema = KeySchemaElement::builder()
            .attribute_name(key)
            .key_type(KeyType::Hash)
            .build()?;
        let throughput = ProvisionedThroughput::builder()
            .read_capacity_units(1)
            .write_capacity_units(1)
            .build()?;

        let output = self
            .client
            .create_table()
            .table_name(table_name)
            .key_schema(key_schema)
            .attribute_definitions(attribute)
            .provisioned_throughput(throughput)
            .send()
            .await?;
        Ok(output)
    }

    /// Deletes `table_name`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `DeleteTable` request fails.
    pub async fn delete_table(&self, table_name: &str) -> anyhow::Result<DeleteTableOutput> {
        let output = self
            .client
            .delete_table()
            .table_name(table_name)
            .send()
            .await?;
        Ok(output)
    }

    /// Puts the item `data` into `table_name`.
    ///
    /// # Errors
    ///
    /// Returns an error if the `PutItem` request fails.
    pub async fn create(
        &self,
        table_name: &str,
        data: HashMap<String, AttributeValue>,
    ) -> anyhow::Result<PutItemOutput> {
        let output = self
            .client
            .put_item()
            .table_name(table_name)
            .set_item(Some(data))
            .send()
            .await?;
        Ok(output)
    }

    /// Scans `table_name` and returns the resulting items as a single stream.
    ///
    /// `filter_expression` and `expression_value` are forwarded unchanged to the scan
    /// request; passing `None` for both scans without a filter.
    ///
    /// The request goes through the SDK paginator (`into_paginator().items()`), is
    /// converted into a `futures` 0.3 `Stream` by `into_stream_03x`, and every SDK
    /// error is converted into `anyhow::Error`.
    pub fn get_stream(
        &self,
        table_name: &str,
        filter_expression: Option<String>,
        expression_value: Option<HashMap<String, AttributeValue>>,
    ) -> impl Stream<Item = Result<HashMap<String, AttributeValue>, anyhow::Error>> {
        self.client
            .scan()
            .table_name(table_name)
            .set_filter_expression(filter_expression)
            .set_expression_attribute_values(expression_value)
            .into_paginator()
            .items()
            .send()
            .into_stream_03x()
            .err_into()
    }
}
main.rs
use aws_sdk_dynamodb::types::AttributeValue;
use dynamo::Client;
use futures_util::TryStreamExt;
use std::collections::HashMap;
pub mod dynamo;
/// Name of the table that `main` creates, fills, scans and finally deletes.
const TABLE_NAME: &str = "test_table";
/// Runs the example end to end: create the table, insert the sample items, print
/// every item returned by the scan stream, then delete the table.
///
/// Any failing step aborts the run and its error is returned from `main`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::new().await;
    client.create_table(TABLE_NAME, "id").await?;

    // Each item stores the decimal string of 1 through 1099 as both `id` and `value`.
    for n in 1..1100 {
        let text = n.to_string();
        let item = HashMap::from([
            ("id".to_string(), AttributeValue::S(text.clone())),
            ("value".to_string(), AttributeValue::S(text)),
        ]);
        client.create(TABLE_NAME, item).await?;
    }

    // Pull items off the stream one at a time; the first error stops the loop.
    let mut items = client.get_stream(TABLE_NAME, None, None);
    while let Some(record) = items.try_next().await? {
        println!("{:?}", record);
    }

    client.delete_table(TABLE_NAME).await?;
    Ok(())
}
まとめ
コードはこちらになります。
Discussion