💬

RabbitMQチュートリアル:作業キュー (Task Queues) その2

に公開

Work Queues

チュートリアルその1ではシンプルな送信、受信のサンプルでした。次は複数のタスクを複数のコンシューマーで処理する例を見てみましょう。

プロデューサーとキューとコンシューマ

  • プロデューサーは、メッセージを送信するアプリケーションです。
  • キューは、プロデューサーからコンシューマーにメッセージが移動する際に、一時的にメッセージを保存する場所です。キューは、プロデューサーからのメッセージを保持し、コンシューマーに配信します。コンシューマーが応答を送信しない場合、キューは応答を受信するまでメッセージを保持します
  • コンシューマ はキューからメッセージを受け取り、処理を行います。

キューを使うメリット

ワークキューを利用することで、リソースを大量に消費するタスクをすぐに実行するのを避け、後で完了するようにスケジュールすることができるようになります。
各タスクをメッセージとしてパッケージ化し、キューに送信します。バックグラウンドで実行されているワーカープロセスがタスクを取得して実行します。複数のワーカーがいる場合、タスクはそれらの間で分散されます。

プロデューサーの実装

プロデューサはメッセージ(タスク)をキューに送信します。

using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
    autoDelete: false, arguments: null);

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = new BasicProperties
{
    Persistent = true
};

await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "task_queue", mandatory: true,
    basicProperties: properties, body: body);
Console.WriteLine($" [x] Sent {message}");

static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

ポイント解説

  • キューの宣言: QueueDeclareメソッドでtask_queueという名前のキューを作成。
  • durable: QueueDeclareAsynに渡しているdurableがtrueを指定。このことによりキューが永続化され、サーバーが再起動した後でもキューが再現されます。
  • Persistent: BasicPublishAsyncメソッドに渡しているPropetiesにPersistent=trueを指定、これによりメッセージが永続化されます。

ワーカーの実装

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
    autoDelete: false, arguments: null);

await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);

Console.WriteLine(" [*] Waiting for messages.");

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
    byte[] body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");

    int dots = message.Split('.').Length - 1;
    await Task.Delay(dots * 1000);

    Console.WriteLine(" [x] Done");

    // here channel could also be accessed as ((AsyncEventingBasicConsumer)sender).Channel
    await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};

await channel.BasicConsumeAsync("task_queue", autoAck: false, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

ポイント解説

  • BasicQos: BasicQoc(Quality of Service)でコンシューマが一度に扱えるメッセージの数を1つに設定
  • autoAck: コンシューマを実行する際の引数のautoAckにfalseを指定して、Acknowledgement(確認応答)の送信を手動に指定

Acknowledgement(確認応答)について

RabbitMQでは、メッセージをキューから取り出して処理が完了したことをRabbitMQに通知する必要があります。この通知を「Acknowledgement」(確認応答)と呼びます。確認応答を使用することで、次のようなメリットがあります

  1. メッセージの損失防止
    ワーカーがメッセージを処理中にクラッシュしても、確認応答が送られていなければ、RabbitMQはそのメッセージを未処理とみなし、別のワーカーに再送信します。
  2. 確実なメッセージ処理
    メッセージが処理完了したことをRabbitMQに通知することで、正常に処理されたメッセージのみがキューから削除されます。

autoAckの挙動の違い

自動応答(autoAck: true):

メッセージを受け取ると即座にRabbitMQに完了通知を送信。
ワーカーが処理に失敗してもメッセージが失われる可能性あり。

手動応答(autoAck: false):

確実に処理が完了した後で応答を送信。
メッセージが確実に処理される。

ディスパッチ方式の違い

ディスパッチの課題

デフォルトでは、RabbitMQはメッセージをラウンドロビン方式でコンシューマーに配信します。ラウンドロビン方式では、常にコンシューマが決められた順に実行されます。負荷の重いコンシューマがあると後続のコンシューマに待機時間が発生することになります。

フェアディスパッチの仕組み

フェアディスパッチは、RabbitMQが「コンシューマーが処理可能になるまで新しいタスクを送らない」設定を有効にするものです。この設定を行うことで、各ワーカーが効率的にタスクを処理できるようになります。

Discussion