🐙

RabbitMQ入門: チュートリアル その1

2025/01/12に公開

大量の画像を処理するタスクがあって何とか効率化したい。
分散システムや非同期処理なんかを活用して、なんとかコンカレントに複数のバッチを処理ができないか模索中です。

キューシステムにはRabbitMQやApache Kafkaなどがありますが
今回は学習が楽そうなRabbitMQを使てみたいと思います。

RabbitMQは、分散システムや非同期通信で使用されるメッセージブローカーの一つです。本記事では、RabbitMQを使用してメッセージをキューに送信する基本的な方法を学びます。

以下の流れで説明を進めます。

  1. RabbitMQとは?
  2. チュートリアルの概要
  3. RabbitMQサーバーのセットアップ
  4. 必要なライブラリのインストール
  5. C#コード解説
  6. 結果の確認

RabbitMQとは?

RabbitMQは、メッセージキューイングの役割を担うソフトウェアです。プロデューサー(メッセージを送信する側)からコンシューマー(メッセージを受信する側)にデータを効率的に配信します。システムを非同期にすることでパフォーマンスが向上し、スケーラビリティを確保できます。

このチュートリアルの概要

このチュートリアルでは、以下の内容を実装します。

  • プロデューサーとしてC#アプリケーションを作成。
  • RabbitMQサーバーに接続。
  • キューを作成し、簡単なメッセージを送信。

RabbitMQサーバーのセットアップ

インストールはOSそれぞれのパッケージマネージャで行うことができます。
RabbitMQのサーバーのインストールについては、公式のInstalling RabbitMQを参照すると詳しくかいてあります。

今回はDockerを使ってインストールを行いたいと思います。

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"    # RabbitMQ標準のポート
      - "15672:15672"  # 管理UIのポート
    environment:
      - RABBITMQ_DEFAULT_USER=user     # デフォルトユーザー名
      - RABBITMQ_DEFAULT_PASS=password # デフォルトパスワード
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq # データの永続化
    restart: unless-stopped

volumes:
  rabbitmq_data:
docker compose up

コンテナが起動したら、ブラウザでhttp://localhost:15672にアクセスする。
ログイン画面が表示されればOKです。

必要なライブラリのインストール

今回はC#を使ってアプリケーションを作成するので、nugetを使ってRabbitMQ.Clientをインストールします。

C#コード解説

チュートリアルでは、RabbitMQサーバーにメッセージを送信するSendとメッセージをサーバーから取り出すReceivを作成します。

メッセージの送信

全体のコードは以下の通りです。

Send.cs
using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory {
    HostName = "localhost",
    UserName = "foobar",
    Password = "password"
};
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false,
    arguments: null);

const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);

await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "hello", body: body);
Console.WriteLine($" [x] Sent {message}");

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

コードの詳細

1. サーバーへの接続
var factory = new ConnectionFactory {
    HostName = "localhost",
    UserName = "foobar",
    Password = "password"
};
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

ConnectionFactoryのインスタンスを使ってサーバーに接続を行いconnectionオブジェクトを作成します。
コネクションオブジェクトからChannelオブジェクトを作成します。

2. キューの宣言
channel.QueueDeclare(queue: "hello",
                     durable: false,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

チャネルに対してキューを宣言します。

  • queue: キューの名前(例: hello)。
  • durable: サーバー再起動時にキューを保持するか。
  • exclusive: 接続ごとの専用キューにするか。
  • autoDelete: キューが未使用時に削除されるか。
3. メッセージの送信
const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);

await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "hello", body: body);
  • exchange: メッセージのルーティング方法を指定(今回はデフォルトの空文字)。
  • routingKey: メッセージの送信先キュー。
  • body: メッセージデータ(バイト配列)。

メッセージ受信処理

つづいてサーバーからメッセージを受信するコードを見ていきます。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory {
    HostName = "localhost"
    UserName = "foobar",
    Password = "password"
};
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false,
    arguments: null);

Console.WriteLine(" [*] Waiting for messages.");

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
    return Task.CompletedTask;
};

await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

さて、詳しく見ていきましょう。

コードの解説

1. コネクションの作成とキューの宣言
var factory = new ConnectionFactory {
    HostName = "localhost"
    UserName = "foobar",
    Password = "password"
};
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false,
    arguments: null);

キューを宣言するところまでは送信時と同じですね

2. メッセージ受信のためのイベントハンドラの設定

ConsumerにRecivedAsyncイベントハンドラを登録します。

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
    return Task.CompletedTask;
};

ReceivedAsyncのイベントハンドラの引数の意味は次の通り

model(型: object)
イベントを発生させたコンシューマーモデルへの参照。
通常はAsyncEventingBasicConsumerのインスタンス
チャネルへのアクセスが必要な場合:((AsyncEventingBasicConsumer)model).Channel

ea(型: BasicDeliverEventArgs)
メッセージの配信に関する情報を含むイベント引数

// メッセージの本体
* ea.Body.ToArray()

// メッセージの一意の識別子(Ack/Nack時に使用)
* ea.DeliveryTag

// このメッセージが再配信されたものかどうか
* ea.Redelivered

// メッセージが発行されたエクスチェンジ名
* ea.Exchange

// メッセージのルーティングキー
* ea.RoutingKey

// メッセージのプロパティ(永続性、優先度など)
* ea.BasicProperties
3. メッセージの受信を開始

BasicConsumeAsyncでメッセージの受信を開始します。

await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);

送信結果の確認

管理画面からQueueにメッセージが登録されているか確認できます。

メッセージが送信されるとRabbitMQサーバーにメッセージが追加されたのが確認できます。

メッセージが受信されるとRabbitMQサーバーのメッセージがなくなっているのが確認できます。

Discussion