💬
RabbitMQチュートリアル:作業キュー (Task Queues) その2
Work Queues
チュートリアルその1ではシンプルな送信、受信のサンプルでした。次は複数のタスクを複数のコンシューマーで処理する例を見てみましょう。
プロデューサーとキューとコンシューマ
- プロデューサーは、メッセージを送信するアプリケーションです。
- キューは、プロデューサーからコンシューマーにメッセージが移動する際に、一時的にメッセージを保存する場所です。キューは、プロデューサーからのメッセージを保持し、コンシューマーに配信します。コンシューマーが応答を送信しない場合、キューは応答を受信するまでメッセージを保持します
- コンシューマ はキューからメッセージを受け取り、処理を行います。
キューを使うメリット
ワークキューを利用することで、リソースを大量に消費するタスクをすぐに実行するのを避け、後で完了するようにスケジュールすることができるようになります。
各タスクをメッセージとしてパッケージ化し、キューに送信します。バックグラウンドで実行されているワーカープロセスがタスクを取得して実行します。複数のワーカーがいる場合、タスクはそれらの間で分散されます。
プロデューサーの実装
プロデューサはメッセージ(タスク)をキューに送信します。
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
autoDelete: false, arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties
{
Persistent = true
};
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "task_queue", mandatory: true,
basicProperties: properties, body: body);
Console.WriteLine($" [x] Sent {message}");
static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}
ポイント解説
- キューの宣言: QueueDeclareメソッドでtask_queueという名前のキューを作成。
- durable: QueueDeclareAsynに渡しているdurableがtrueを指定。このことによりキューが永続化され、サーバーが再起動した後でもキューが再現されます。
- Persistent: BasicPublishAsyncメソッドに渡しているPropetiesにPersistent=trueを指定、これによりメッセージが永続化されます。
ワーカーの実装
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
autoDelete: false, arguments: null);
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
int dots = message.Split('.').Length - 1;
await Task.Delay(dots * 1000);
Console.WriteLine(" [x] Done");
// here channel could also be accessed as ((AsyncEventingBasicConsumer)sender).Channel
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};
await channel.BasicConsumeAsync("task_queue", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
ポイント解説
- BasicQos: BasicQoc(Quality of Service)でコンシューマが一度に扱えるメッセージの数を1つに設定
- autoAck: コンシューマを実行する際の引数のautoAckにfalseを指定して、Acknowledgement(確認応答)の送信を手動に指定
Acknowledgement(確認応答)について
RabbitMQでは、メッセージをキューから取り出して処理が完了したことをRabbitMQに通知する必要があります。この通知を「Acknowledgement」(確認応答)と呼びます。確認応答を使用することで、次のようなメリットがあります
- メッセージの損失防止
ワーカーがメッセージを処理中にクラッシュしても、確認応答が送られていなければ、RabbitMQはそのメッセージを未処理とみなし、別のワーカーに再送信します。 - 確実なメッセージ処理
メッセージが処理完了したことをRabbitMQに通知することで、正常に処理されたメッセージのみがキューから削除されます。
autoAckの挙動の違い
自動応答(autoAck: true):
メッセージを受け取ると即座にRabbitMQに完了通知を送信。
ワーカーが処理に失敗してもメッセージが失われる可能性あり。
手動応答(autoAck: false):
確実に処理が完了した後で応答を送信。
メッセージが確実に処理される。
ディスパッチ方式の違い
ディスパッチの課題
デフォルトでは、RabbitMQはメッセージをラウンドロビン方式でコンシューマーに配信します。ラウンドロビン方式では、常にコンシューマが決められた順に実行されます。負荷の重いコンシューマがあると後続のコンシューマに待機時間が発生することになります。
フェアディスパッチの仕組み
フェアディスパッチは、RabbitMQが「コンシューマーが処理可能になるまで新しいタスクを送らない」設定を有効にするものです。この設定を行うことで、各ワーカーが効率的にタスクを処理できるようになります。
Discussion