🦀

Amazon ECS コンテナから AWS SDK for Rust で Amazon SQS キューをポーリングする

2022/02/11に公開

はじめに

Amazon ECS サービスを Amazon SQS の残メッセージ数でスケールさせる構成を検証する機会がありました。コンテナ内のアプリケーションは去年 Developer Preview になった Rust SDK で書いてみました。以前試した時は動かすだけでいっぱいいっぱいだったのですが、今回はもう少しちゃんとしようということでユニットテストを導入しました。ソースコードはこちら
※ Rust SDK は Developer Preview なので本番導入は控えておいた方がいいでしょう。

やってみる

以前書いた AWS CDK の歩き方と途中まで同じです。今回は ECS Service は一つだけでスタックを分けるとかえって見通しが悪くなりそうなので、一つのスタックにどんどんリソースを定義していくことにします。

プロジェクトの初期化

$ npm -g install aws-cdk@2.0.0
$ mkdir fargate-sqs-scaling
$ cd fargate-sqs-scaling
$ cdk init --language typescript
...
✅ All done!

Auto Scaling の設定

ECS Task のスケーリング設定はドキュメントのこのページで解説されています。 FargateService なり Ec2Service なりから autoScaleTaskCount を呼ぶことで設定できるようです。 autoScaleTaskCount ScalableTaskCount を返し、このクラスに定義された scaleOnXXX メソッドを呼ぶことでタスク数を増減できます(たとえば scaleOnCpuUtilization だと CPU 使用率に応じてスケールさせることができます)。今回は SQS のメッセージ数に応じてスケールさせるので、 scaleOnMetric メソッドを使ってメトリクスには queue.metricApproximateNumberOfMessagesVisible() を指定します。

Rust SDK

去年 Rust SDK が Developer Preview で発表された時に記事を書いたので基本的な部分はそちらに譲るとして、ここではユニットテストをどうするか見て行きます。実は issue が立っていて、そこに書いているように aws_smithy_client::test_connection::TestConnection を使って http のリクエストとレスポンスをモックすればいいです。注意として test-util feature flag を有効にしないといけませんでした。ということで Cargo.toml はこんな感じになります。

[dependencies]
ctrlc = "3.2.0"
aws-config = "0.3.0"
aws-sdk-sqs = "0.3.0"
aws-smithy-client = { version = "0.33.1", features = ["test-util"]}
aws-smithy-http = "0.33.1"
http = "0.2.6"
tokio = { version = "1", features = ["full"] }

そしてモックしたクライアントを返す関数は以下のようにかけます。 /data/receive_message.xml には SQS の API レファレンス の Sample Response を書いています。

fn mock_client() -> Client {
    let creds = Credentials::new(
        "TESTCLIENT",
        "testsecretkey",
        Some("testsessiontoken".to_string()),
        None,
        "mock",
    );
    let config = Config::builder()
        .credentials_provider(creds)
        .region(Region::new("us-east-1"))
        .build();
    let data =
        fs::read_to_string("./data/receive_message.xml").expect("Failed to read mock response");
    let conn = TestConnection::new(vec![(
        http::Request::builder()
            .uri(Uri::from_static("https://sqs.us-east-1.amazonaws.com/"))
            .body(SdkBody::from(r#"{"NumberOfBytes":64}"#))
            .unwrap(),
        http::Response::builder()
            .status(http::StatusCode::from_u16(200).unwrap())
            .body(SdkBody::from(data))
            .unwrap(),
    )]);
    let conn = DynConnector::new(conn);
    let client = Client::from_conf_conn(config, conn);
    client
}

SQS からメッセージを受信する関数にこのモックしたクライアントを渡してテストできます。

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_receive_message() {
        env::set_var("SQS_URL", "TestSqsUrl");
        let sqs_url = get_sqs_url();
        let client = mock_client();
        let messages = get_messages(sqs_url, client);
        let body = extract_body(&messages.unwrap()[0]);
        assert_eq!(body, "This is a test message".to_string());
    }
}

CDK のコード

CDK のコードは普通ですね。以前の CDK で ECS 環境を作った記事とあんま変わらなくて、スタックひとつにまとめて SQS キューと AutoScaling の設定を追加したのが差分です。

import { Stack, StackProps, Duration, RemovalPolicy, Tags } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import {
    aws_ec2 as ec2,
    aws_ecs as ecs,
    aws_iam as iam,
    aws_logs as logs,
} from 'aws-cdk-lib';

export class EcsSqsScalingStack extends Stack {
    constructor(scope: Construct, id: string, props?: StackProps) {
        super(scope, id, props);

        const queue = new sqs.Queue(this, 'EcsSqsScalingQueue', {
            visibilityTimeout: Duration.seconds(300),
            removalPolicy: RemovalPolicy.DESTROY,
        });

        // ECS Cluster
        const vpc = new ec2.Vpc(this, 'VPC', {});
        Tags.of(vpc).add('Name', 'SQSVPC');

        const cluster = new ecs.Cluster(this, 'Cluster', {
            vpc: vpc,
        });

        const ECSExecPolicyStatement = new iam.PolicyStatement({
            sid: 'allowECSExec',
            resources: ['*'],
            actions: [
                'ssmmessages:CreateControlChannel',
                'ssmmessages:CreateDataChannel',
                'ssmmessages:OpenControlChannel',
                'ssmmessages:OpenDataChannel',
                'logs:CreateLogStream',
                'logs:DescribeLogGroups',
                'logs:DescribeLogStreams',
                'logs:PutLogEvents',
            ],
        });

        const taskRole = new iam.Role(this, 'TaskRole', {
            assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
            managedPolicies: [
                {
                    managedPolicyArn:
                        'arn:aws:iam::aws:policy/AmazonSQSFullAccess',
                },
            ],
        });
        taskRole.addToPolicy(ECSExecPolicyStatement);

        const taskExecutionRole = new iam.Role(this, 'TaskExecutionRole', {
            assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
            managedPolicies: [
                {
                    managedPolicyArn:
                        'arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy',
                },
            ],
        });

        const logGroup = new logs.LogGroup(this, 'LogGroup', {
            logGroupName: 'cdk-sqs-scaling',
            removalPolicy: RemovalPolicy.DESTROY,
        });

        const taskDefinition = new ecs.FargateTaskDefinition(this, 'TaskDef', {
            memoryLimitMiB: 512,
            cpu: 256,
            executionRole: taskExecutionRole,
            taskRole: taskRole,
        });

        const image = new ecs.AssetImage('image');

        taskDefinition.addContainer('Queue-consumer', {
            image: image,
            environment: {
                SQS_URL: queue.queueUrl,
            },
            logging: ecs.LogDriver.awsLogs({
                streamPrefix: 'rust-queue-consumer',
                logGroup: logGroup,
            }),
        });

        const service = new ecs.FargateService(this, 'Service', {
            cluster: cluster,
            assignPublicIp: false,
            taskDefinition: taskDefinition,
            enableExecuteCommand: true,
        });

        const scaling = service.autoScaleTaskCount({
            maxCapacity: 5,
            minCapacity: 1,
        });

        scaling.scaleOnMetric('QueueDepthScaling', {
            metric: queue.metricApproximateNumberOfMessagesVisible(),
            scalingSteps: [
                {
                    change: 2,
                    lower: 4,
                },
                {
                    change: -2,
                    upper: 3,
                },
            ],
        });
    }
}

デプロイして動作確認

cdk deploy して作成される SQS キューにメッセージをいくつか送信してみてください。メッセージ数に応じて ECS タスクがスケールするのが確認できます。

おわりに

去年触った時はユニットテストの書き方がわからずに四苦八苦してましたが、これでとりあえず動くユニットテストは書けるようになったのでこれから検証する時は Rust SDK を使っていきたいです。

Discussion