📝

Elastic Beanstalk でチュートリアルレベルのワーカー環境を作成してみた

に公開

ドキュメントにはワーカー環境を作成できるサンプルアプリがなさそうだったので作ってみました。

ワーカー環境とは

Elastic Beanstalk worker environments - AWS Elastic Beanstalk

If your AWS Elastic Beanstalk application performs operations or workflows that take a long time to complete, you can offload those tasks to a dedicated worker environment. Decoupling your web application front end from a process that performs blocking operations is a common way to ensure that your application stays responsive under load.

ウェブアプリケーション側で処理するには時間がかかる処理をワーカー環境に任せて、ウェブアプリケーション側とワーカー環境で処理を分担できる機能です。
ウェブアプリケーション側からは SQS キューにメッセージを送信し、ワーカー環境側は SQS キューからメッセージを取得して処理します。

今回はコンソール上からワーカー環境の作成と SQS キューへのメッセージ送信をやってみました。

アプリケーションの作成

今回使用するアプリケーションの構造は以下の通りです。

nodejs-worker-app/
├── node_modules/
├── index.js
├── package-lock.json
├── package.json
└── Procfile

アプリケーション作成手順

まずはアプリケーション用のディレクトリを作成して Node.js アプリの初期化やライブラリのインストールを実行します。

$ mkdir nodejs-worker-app: cd nodejs-worker-app

$ npm init -y
$ npm install express body-parser

続いて以下の内容で index.js ファイルを作成します。

index.js
const express = require('express');
const bodyParser = require('body-parser');
const app = express();
const port = process.env.PORT || 8080;

app.use(bodyParser.json());

app.post('/', (req, res) => {
  console.log('Received message from SQS:', req.body);
  res.status(200).send('Message processed');
});

app.listen(port, () => {
  console.log(`Worker running on port ${port}`);
});

Elastic Beanstalk のワーカー環境ではデーモンプロセスが実行されています。
デーモンは SQS キューからメッセージを取得し、ローカルポート 80 の http://localhost/ に POST メソッドでメッセージを送信します。
上記 index.js では POST メソッドに送信されたメッセージを取得してログ出力しています。

続いて、以下の内容で Procfile ファイルを作成します。

Procfile
web: node index.js

Procfile ではアプリケーションを起動するコマンドを指定できます。
また、長時間継続的に実行されるアプリケーションプロセスには Procfile を使用することが推奨されています。

Configuring custom start commands with a Procfile on Elastic Beanstalk - AWS Elastic Beanstalk

You can include a file that's called Procfile at the root of your source bundle to specify the command that starts your application.

Buildfile and Procfile - AWS Elastic Beanstalk

Use a Procfile for long-running application processes that shouldn't exit.

上記のディレクトリおよびファイルを zip 化します。
ただし、zip 化する際にルートディレクトリである nodejs-worker-app は含まないように zip 化する必要があります。
Create an Elastic Beanstalk application source bundle - AWS Elastic Beanstalk

Not include a parent folder or top-level directory (subdirectories are fine)

$ zip -r nodejs-worker-app.zip ./*

Elastic Beanstalk 環境へのデプロイ

Elastic Beanstalk コンソールから上記アプリケーションをデプロイします。
環境枠はワーカー環境を選択してください。

プラットフォームは Node.js の最新バージョンを選択しました。

アプリケーションコードはローカル環境で作成した上記の zip ファイルをアップロードします。
バージョンラベルには任意の値を入力してください。

プリセットは「単一インスタンス (無料利用枠の対象)」を選択しました。

その他の設定はほとんどデフォルト値ですが、「更新、モニタリング、ログ記録を設定 - オプション」で「CloudWatch ログへのインスタンスログのストリーミング」を有効化します。

CloudWatch Logs へのログストリーミングを有効化することで CloudWatch コンソールからログを確認できます。

以上の設定で環境を作成します。
数分でデプロイが完了します。

動作確認

デフォルト設定ではデプロイ中にワーカー用の SQS キューが自動的に作成されます。

ワーカーキューのリンクをクリックすると SQS コンソールに移動できます。
SQS コンソールの「メッセージを送受信」からメッセージを送信してみます。

今回のアプリケーションでは JSON 形式でメッセージを受信する想定なので、以下のような JSON 形式のメッセージを送信します。

{
  "key": "value"
}

メッセージを送信後、CloudWatch コンソールからロググループを選択し、末尾が「web.stdout.log」のロググループを検索します。

ログストリームをクリックして、ログに上記 JSON が出力されていればワーカー環境でメッセージを処理できています。

May  4 13:20:54 ip-172-31-41-83 web[2296]: Received message from SQS: { key: 'value' }

また、末尾が「nginx/access.log」を確認すると以下のように POST リクエストを受信している記録も確認できます。

127.0.0.1 - - [04/May/2025:13:20:54 +0000] "POST / HTTP/1.1" 200 17 "-" "aws-sqsd/3.0.4" "-"

ワーカー環境ではワーカーキューのほかにデッドレターキューも作成されており、ワーカーキューでの処理が失敗したメッセージはデッドレターキューに移動します。

デフォルト設定ではワーカーキューの最大再試行回数は 10 回なので 10 回までメッセージの取得がリトライされますが、10 回以上失敗した場合にはデッドレターキューに送信されます。
Configure a dead-letter queue using the Amazon SQS console - Amazon Simple Queue Service

Set the Maximum receives value, which defines how many times a message can be received before being sent to the dead-letter queue (valid range: 1 to 1,000).

ただし、デフォルトではデッドレターキューからメッセージを受信する Lambda 関数やコンシューマーアプリケーションは設定されません。
そのため、デッドレターキューからメッセージを受信して再試行するロジックについては独自に実装が必要です。

デッドレターキューにメッセージを移動してみる

デッドレターキューにメッセージを移動して Lambda 関数で失敗したメッセージを出力するロジックを追加してみます。

まず、Elastic Beanstalk の設定からワーカーの最大再試行回数を 1 、エラー可視性タイムアウトを 10 秒に設定して、ワーカーが処理に失敗したメッセージをすぐにデッドレターキューに送信するようにします。

次に、以下のコードで Node.js の Lambda 関数を作成します。

index.js
export const handler = async (event) => {
  // TODO implement
  console.log(event,null,2)
  const response = {
    statusCode: 200,
    body: JSON.stringify('Hello from Lambda!'),
  };
  return response;
};

最後に SQS キューのデッドレターキューの Lambda トリガーから上記 Lambda 関数を設定します。

デッドレターキューの動作確認

ワーカーキューに JSON 形式ではない test という文字列を送信してみます。

web.stdout.log を確認するとエラーになっていることが確認できます。

SyntaxError: Unexpected token 't', "test" is not valid JSON

nginx/access.log には 400 エラーが記録されています。

127.0.0.1 - - [04/May/2025:13:35:16 +0000] "POST / HTTP/1.1" 400 1027 "-" "aws-sqsd/3.0.4" "-"

1 分ほど経過するとデッドレターキューにメッセージが送信され、Lambda の実行ログに処理に失敗したメッセージが記録されます。

Records: [
    {
      messageId: 'efaba87e-505b-482c-95a0-d41f59700db7',
      receiptHandle: 'AQEBRA/a4zLLxWFmGXqy/4MVjW+6z+660rF2db8Xo0DBSvh2qHrzyR4Jx/bLW2Ez1GQlP6jBzCCwtgfnOBzVol4x0iBNdWnLUnR+QccNrGELf2WXfcCqveJx7AJgKFPQaddVgoVZqyMnRlT5jv/JC2MbHjKmNDpdid6Rddpc9gF/2wdmqiQnrqoiLUnGqNurs4vYAAod7pw8dGrL/fR3cUxBpyfkcCDarBVBEf+DkafLDdot4+Oq/nKxA+XsXRjEFaqMxQVTm0o04zDyA3AP/tt92YFLVhU4Ip6lLcvF5lNrymRBSGzGn1ESmLPzd5Oe3NC99f64R/7VK4qtFweYdWS/Sz1I4wf+IgXegRKKQjYdocS5MNXyv5/7JI4A7kqzaVzEaUq+KRiBMJyesZwHLdNdc7WspXvaQtJAXPHQ9hVSd6auxxqHXq3UUtGtExEqZBqFYQA2Z//Afj+vF+0qgBxL7g==',
      body: 'test',
      attributes: [Object],
      messageAttributes: {},
      md5OfBody: '098f6bcd4621d373cade4e832627b4f6',
      eventSource: 'aws:sqs',
      eventSourceARN: 'arn:aws:sqs:ap-northeast-1:012345678901:awseb-e-2d8yvrzymb-stack-AWSEBWorkerDeadLetterQueue-RomF1niKjACY',
      awsRegion: 'ap-northeast-1'
    }
  ]

今回は失敗したメッセージをログ出力するだけでしたが、必要に応じて再度ワーカーキューにメッセージを送信してリトライするというロジックも考えられます。

まとめ

今回は Elastic Beanstalk でチュートリアルレベルのワーカー環境を作成してみました。
どなたかの参考になれば幸いです。

参考資料

Discussion