Amazon SQS を AWS SDK for Rust で操作してみる
記事の概要
こんにちは!
ファスト株式会社の yutak です。
今回はAWSが提供しているマネージド型メッセージキューサービスのAmazon SQSをRustから操作していきたいと思います。
クレートに関しては、AWSが提供しているAWS SDK for Rustを利用していきます。
前提条件
前提条件は以下です。
- Rustとcargoがインストールされていること
- AWSのアクセスキーやシークレットなどのクレデンシャルが作成されていること
動作環境
今回は、以下の環境で動作させました。
- Rustバージョン: 1.81.0
利用するクレートは以下です。
- aws-sdk-sqs: v1.43.0
- aws-config: 1.5.6
- tokio: 1.40.0
SQSに対して操作すること
今回、SQSに対して操作することは以下です。
- キューの作成
- メッセージの送受信
- メッセージの削除
事前準備
SQSをRustから利用するための準備をします。
1. プロジェクト作成
cargo new
でプロジェクトを作成していきます。
cargo new sqs-playground
プロジェクト名は sqs-playground
としています。
2. クレートの追加
利用するクレートを追加します。
Cargo.tomlのdependenciesが以下のようになっていればOKです。
[dependencies]
aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-sqs = "1.43.0"
dotenvy = "0.15.7"
tokio = { version = "1.40.0", features = ["full"] }
それぞれのクレートの説明を簡単にします。
aws-config
AWS SDKの設定を管理するためのクレートです。リージョンやクレデンシャルの設定に使用します。
aws-sdk-sqs
Amazon SQSのAPIを使用するための特定のSDKクレートです。
tokio
非同期ランタイムを提供するクレートです。
tokioが今のところデファクトスタンダードとなっていると認識しています。
クレートが必要な理由としては、Rustは実行時の非同期ランタイムが組み込まれていません。
AWS SDK for Rustはasync/awaitベースの非同期APIを使用しているため、非同期ランタイムを提供するtokioが必要になってきます。
今回は、features = ["full"]を指定することで、tokioの全機能を使用可能にしています。
3. AWSの情報をプロジェクトに追加
今回は、.env
ファイルを作成して、そこにAWSのアクセスキーやシークレット、リージョンを設定します。
AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_IDをいれます>
AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEYをいれます>
AWS_REGION=<AWS_ACCESS_KEY_IDをいれます>
実際にSQSを操作していきます。
キューの作成
実装
キューの作成は以下のように行えます。
use aws_sdk_sqs::Client;
#[tokio::main]
async fn main() {
// 環境変数読み込み
dotenvy::dotenv().expect("Failed to load env");
let sdk_config = aws_config::load_from_env().await;
// SQS clientの作成
let sqs_client = Client::new(&sdk_config);
let queue_name = "create-queue-test-from-rs";
let created_queue = sqs_client.create_queue()
.queue_name(queue_name)
.send()
.await?;
match created_queue {
Ok(output) => println!("Successfully created queue: {:?}", output),
Err(err) => println!("Failed to created queue: {:?}", err),
}
}
上記のコードでは、
- dotenvyで環境変数を読み込み
- aws_config::load_from_env()でAWS設定を読み込み
- Client::new()でSQSクライアントを作成
- create_queue()メソッドでキューを作成し、結果を処理
をおこなっています。
コード実行
実際に cargo run
でコードを実行してみます。
❯ cargo run
Compiling sqs-playground v0.1.0 (/Users/yutak/workspace/rust_sandbox/sqs-playground)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.02s
Running `target/debug/sqs-playground`
CreateQueueOutput { queue_url: Some("https://sqs.ap-northeast-1.amazonaws.com/account_id/create-queue-test-from-rs"), _request_id: Some("request_id") }
キュー作成が成功すると、create_queueメソッドはCreatedQueueOutput構造体返します。
queue_url
フィールドに作成したキューのURLが格納されています。
メッセージの送信
先ほど作成したqueueにメッセージを送信してみます。
実装
// use文は変わらずなので、省略...
#[tokio::main]
async fn main() {
// 環境変数読み込み省略...
// SQS clientの作成は省略...
let queue_url = "https://sqs.ap-northeast-1.amazonaws.com/account_id/create-queue-test-from-rs";
let send_message = sqs_client.send_message()
.queue_url(queue_url)
.message_body("TEST MESSAGE".to_string())
.send()
.await;
match send_message {
Ok(output) => println!("Successfully sent message: {:?}", output),
Err(err) => println!("Failed to send message: {:?}", err),
}
}
send_messageメソッドが提供されているので、このメソッドを利用してキューにメッセージを送信します。
今回のサンプルコードでは実装していませんが、メッセージの属性なども設定することが可能です。
コード実行
❯ cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Running `target/debug/sqs-playground`
SendMessageOutput { md5_of_message_body: Some("uid"), md5_of_message_attributes: None, md5_of_message_system_attributes: None, message_id: Some("uuid"), sequence_number: None, _request_id: Some("uuid") }
SendMessageOutput構造体が返され、その中にmessage_idなどが格納されています。
メッセージ受信
先ほど、送信したメッセージを実際に受信してみます。
実装
// use文は変わらずなので、省略...
#[tokio::main]
async fn main() {
// 環境変数読み込み省略...
// SQS clientの作成は省略...
let queue_url = "https://sqs.ap-northeast-1.amazonaws.com/account_id/create-queue-test-from-rs";
let messages = sqs_client.receive_message()
.queue_url(queue_url)
.send()
.await;
match messages {
Ok(mess) => {
println!("Message received: {:?}", mess);
}
Err(err) => println!("{:?}", err),
}
}
メッセージの受信はreceive_message()メソッドを使います。
queue_url()でurlを指定します。
コード実行
❯ cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.32s
Running `target/debug/sqs-playground`
Message received: ReceiveMessageOutput { messages: Some([Message { message_id: Some("message_id"), receipt_handle: Some("receipt_handle"), md5_of_body: Some("md5_of_body"), body: Some("TEST MESSAGE"), attributes: None, md5_of_message_attributes: None, message_attributes: None }]), _request_id: Some("request_id") }
受信に成功すると、ReceiveMessageOutput構造体にmessage_idやbodyなどのフィールドに値が格納されます。
メッセージの内容は、bodyフィールドの値で確認することができます。
body: Some("TEST MESSAGE")
上記の実行結果を抜粋すると、先程送信したメッセージの本文が格納されていることがわかります。
メッセージ削除
最後に、先程送信したメッセージを削除してみます。
実装
// use文は変わらずなので、省略...
#[tokio::main]
async fn main() {
// 環境変数読み込み省略...
// SQS clientの作成は省略...
let queue_url = "https://sqs.ap-northeast-1.amazonaws.com/account_id/create-queue-test-from-rs";
let receipt_handle = "receipt_handle";
let delete_message_result = sqs_client.delete_message()
.queue_url(queue_url)
.receipt_handle(receipt_handle)
.send()
.await;
match delete_message_result {
Ok(res) => println!("Successfully deleted message: {:?}", res),
Err(err) => println!("Failed to delete message: {:?}", err),
}
}
メッセージの削除は、delete_message()メソッドを利用します。
削除メッセージの対象は、receipt_handleを利用して特定します。
コード実行
❯ cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.72s
Running `target/debug/sqs-playground`
Successfully deleted message: DeleteMessageOutput { _request_id: Some("request_id") }
メッセージ削除に成功すると、DeleteMessageOutput構造体を返します。
まとめ
AWS SDK for Rust を利用して、SQSへキューの作成やメッセージの送受信、削除を簡単にですがおこないました。
今回は説明しなかったデットレターキューへの操作、可視性タイムアウトやメッセージの属性の設定、エラーハンドリングなどを詳細に行うことでプロダクションでも十分利用できると思います。
RustアプリケーションからSQSを操作してみたいときに参考になりますと幸いです。
Discussion