🎉

ActiveMQ Artemisを使用した非同期処理を構築してみた

2023/02/12に公開

本記事の目的

本記事のゴールは、メッセージキューイングによる非同期処理を構築し、ActiveMQ Artemis を利用したシステムを実際に動作確認してみることです。
Spring Boot × JMS を用いてパブリッシャーとサブスクライバーをそれぞれ構築し、キューへのメッセージ登録・キューからのメッセージ取得を確認してみましょう!

なお、サンプルプロジェクトは Github においてありますのでこちらと合わせて確認いただけたらと思います。

https://github.com/yuuLab/active-mq

ActiveMQ Artemis とは?

ActiveMQ は Java ベースのメッセージブローカーです。
ActiveMQ には 2023 年 2 月現在、”Classic”と”Artemis”と呼ばれる二種類が提供されており、Artemis は次世代型のメッセージブローカーという位置付けです。
マイクロサービスアーキテクチャで使われるようなシステム間で非同期に情報をやりとりする際に利用されます。

メッセージキューイングシステムとしては、最近だと Amazon SQS や Amazon MQ などのクラウドサービスがよく利用されているかと思います。

ActiveMQ Artemis のインストール

まずは以下手順に従い、ActiveMQ を開発マシンにインストール&サーバー起動まで行います。
なお、公式ドキュメント > Latest User Documentation > Using the Serverに詳細が載っているのでそちらからでも OK です。

  1. Apache ActiveMQの公式サイトから Artemis 版をダウンロードする。

  2. 解凍後、${ARTEMIS_HOME}/bin 配下に移動して以下コマンドを実行し、任意の名前でブローカーを作成する。その後ユーザー名やパスワードの入力が促されるので適当に入力します。(例では mybroker

    ${ARTEMIS_HOME}/artemis create mybroker
    
  3. mybroker ディレクトリが作成されているので、run コマンドで ActiveMQ サーバーを起動します。

    ${ARTEMIS_HOME}/mybroker/bin/artemis run
    
  4. 出力されたアドレスにアクセスします。(以下のようにログが出力されていれば問題なし。)

    2023-02-11 18:15:48,252 INFO [org.apache.activemq.artemis] AMQ241004 :Artemis Console available at http://localhost:8161/console
    
  5. アクセスすると以下のようなログイン画面が出てくるので、ブローカー作成時に入力したユーザー名・パスワードを指定してログインします。

ログインできたら ActiveMQ の準備は完了です!

パブリッシャー側の構築(メッセージ送信)

Java Message Service (JMS)を利用して、メッセージをキューに送信するアプリケーションを作成します。
JMS は JNDI を使用するため、まずはプロパティファイルjndi.propertiesを用意しましょう。
今回はTestQueueという名前のキューに対してメッセージを送信することとします。

<project-dir>/src/main/resouces/jndi.properties

# JNDI初期コンテキストファクトリーの設定
java.naming.factory.initial = org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory

# JMS接続ファクトリー(接続作成のためのエントリーポイント:URL)
# connectionFactory.<factory-name> = <connection-uri>
connectionFactory.ConnectionFactory=tcp://localhost:61616

# キューオブジェクトをJNDIに登録
# queue.[JNDI名] = [キューの名前]
# queue.<queue-lookup-name> = <queue-name>
queue.Queue = TestQueue
# キューの数だけ追記する。
# ※ キューではなくトピックの場合は、topic.<topic-lookup-name> = <topic-name>

あとは Java 側の実装でプロパティファイルの設定からキューに接続し、データを送信します。
今回は動作確認なので、送信データはメソッドの引数でもらい、それをそのまま MQ に送信する仕様とします。

/src/main/java/com/yuulab/activemq/producer/Pruducer.java

public class Producer {
  /**
  *  キューにオブジェクトデータを送信する。
  *
  * @param data 送信データ
  * @return JMS Message ID
  */
  public String send(Serializable data) {
    InitialContext initialContext;
    QueueConnectionFactory factory;
    try {
      // JNDIの設定からFactory生成
      initialContext = new InitialContext();
      factory = (QueueConnectionFactory) initialContext.lookup("ConnectionFactory");
    } catch (NamingException e) {
      throw new RuntimeException("名前解決に失敗しました。", e);
    }

    // connection, session 生成
    try (QueueConnection connection = factory.createQueueConnection();
          QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);) {
      connection.start();
      // create message
      ObjectMessage message = session.createObjectMessage(data);
      // send
      Queue originalQueue = (Queue) initialContext.lookup("Queue");
      session.createSender(originalQueue).send(message);
      System.out.println("ActiveMQにメッセージを送信しました。" + message.getJMSMessageID());
      connection.close();
      return message.getJMSMessageID();
    } catch (JMSException | NamingException e) {
      throw new RuntimeException("送信に失敗しました。", e);
    }
  }
}

さて、リファクタリングの余地はありそうですが基本的な作りは以上で完了です。実際にメッセージを送ってみましょう。
API 経由で氏名情報を送り、それをブローカーに登録する流れと仮定します。
ですので突貫ですが Controller と送信用データのレコードクラスを作成しておきます。MQ には First Name, Last Name, Random ID を連携する仕様とします。

/src/main/java/com/yuulab/activemq/presentation/TestController.java

@RestController
public class TestController {

  @GetMapping(value = "/test")
  public Status get(@RequestParam(name = "fistName", defaultValue = "taro") String firstName,
      @RequestParam(name = "lastName", defaultValue = "tanaka") String lastName) {
    Producer pub = new Producer();
    TestQueueData data = new TestQueueData(this.generateId(), firstName, lastName);
    pub.send(data);

    return new Status("OK");
  }

  @Getter
  class Status {
    private final String status;

    public Status(String status) {
    this.status = status;
  }
}

  private String generateId() {
    SecureRandom sr;
    try {
      sr = SecureRandom.getInstance("SHA1PRNG");
      return String.valueOf(sr.nextInt());
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException("IDの生成に失敗しました。", e);
    }
  }
}

/src/main/java/com/yuulab/activemq/data/TestQueueData.java

public record TestQueueData(
  String id,
  String firstName,
  String lastName) implements Serializable {
}

以上で準備 OK です。サーバーを起動してリクエストを送信します。
なお、キューはあらかじめ作成しておく必要はなく、指定した名前のキューが存在しなければ勝手に作成してくれるみたいです。(?)
ですので今回だとTestQueueという名前のキューが作成され、そこにメッセージが登録されていくことになります。

では、動かして確かめてみましょう。API サーバーを起動後、以下 curl コマンド実行してみます。

curl -i -X GET \
 'http://localhost:8080/test?firstName=sato&lastName=jiro'

サーバーログを見ると、メッセージ送信が成功していることがわかります。

.....
[2m2023-02-11T19:16:31.562+09:00[0;39m [32m INFO[0;39m [35m16968[0;39m [2m---[0;39m [2m[nio-8080-exec-1][0;39m [36mo.a.c.c.C.[Tomcat].[localhost].[/]      [0;39m [2m:[0;39m Initializing Spring DispatcherServlet 'dispatcherServlet'
[2m2023-02-11T19:16:31.563+09:00[0;39m [32m INFO[0;39m [35m16968[0;39m [2m---[0;39m [2m[nio-8080-exec-1][0;39m [36mo.s.web.servlet.DispatcherServlet       [0;39m [2m:[0;39m Initializing Servlet 'dispatcherServlet'
[2m2023-02-11T19:16:31.565+09:00[0;39m [32m INFO[0;39m [35m16968[0;39m [2m---[0;39m [2m[nio-8080-exec-1][0;39m [36mo.s.web.servlet.DispatcherServlet       [0;39m [2m:[0;39m Completed initialization in 1 ms
ActiveMQにメッセージを送信しました。ID:284e8734-a9f5-11ed-8b67-7278e6dd7973

それでは ActiveMQ の管理画面を確認して、メッセージが登録されているかどうかみてみます。
管理画面から Queue の情報を見ると、以下のキャプチャの通りTestQueueという名前がついたキューが作成され、Message Countが 1 になっているのがわかるかと思います。
DLQ,ExpiryQueueは ActiveMQ 起動時に最初からいる今回とは関係ないキューであることに注意。

メッセージのキューイングに無事成功です!

Message Countの数字部分のリンクをクリックしてさらに詳細を確認します。

登録されているデータは、Object タイプのメッセージで、User ID もログに出力されていた ID と一致しています。
なお、現状キューから取り出すアプリケーションがない状態なので、登録されたメッセージはそのまま保持されています。

次はメッセージの受信側を構築していきます!

サブスクライバー側の構築(メッセージ受信)

受け取り側は Spring 様のおかけでけっこうシンプルです。Spring の別プロジェクトを用意し、application.propertiesに設定を記載します。

<project-dir>/src/main/resouces/application.properties

spring.artemis.mode=native
spring.artemis.broker-url=tcp://localhost:61616
# port被らないように設定しておく
server.port=8081

あとは以下のようなリスナークラスを用意してあげれば、自動的にキューにメッセージが追加されたらそれを拾ってくれます。
/src/main/java/com/yuulab/activemq/consumer/TestQueueListener

@Component
public class TestQueueListener {

  /**
   * TestQueueからメッセージを受け取る。
   * @param data メッセージ
   * @throws InterruptedException
   */
  @JmsListener(destination = "TestQueue")
  public void receive(TestQueueData data) throws InterruptedException {
    System.out.println("メッセージを受信しました。" + data);
    // dummy sleep
    Thread.sleep(10000L);
  }
}

さて、アプリケーションを起動して、最初に登録したメッセージを取り出してログに出力されることを確認します。
以下アプリケーション起動後のログ内容です。すぐに先ほどキューに登録したメッセージを拾えていることがわかります。

...
[2m2023-02-11T21:13:46.654+09:00[0;39m [32m INFO[0;39m [35m24331[0;39m [2m---[0;39m [2m[  restartedMain][0;39m [36mo.s.b.w.embedded.tomcat.TomcatWebServer [0;39m [2m:[0;39m Tomcat started on port(s): 8081 (http) with context path ''
[2m2023-02-11T21:13:47.300+09:00[0;39m [32m INFO[0;39m [35m24331[0;39m [2m---[0;39m [2m[  restartedMain][0;39m [36mc.y.a.ActiveMqConsumerApplication       [0;39m [2m:[0;39m Started ActiveMqConsumerApplication in 2.917 seconds (process running for 3.798)
メッセージを受信しました。TestQueueData[id=-2117026429, firstName=taro, lastName=jiro]

簡単ですね、、フレームワークは偉大です。

最後に

メッセージキューイング処理の基本的な機能の構築は以上となります。
実際のシステムでは要件に合わせ、キューの数やメッセージ取り出し後の処理、エラーハンドリング、失敗したメッセージの扱い、メッセージの永続化など考える必要がさらにたくさんあるかと思います。

ただ、まずは動くものを作ってみるというのも重要なので、すぐに動くメッセージキューイング処理を作りたい場合にこちらの記事/コードがお役に立てると嬉しいです。
今度機会があれば Amazon SQS を利用したメッセージキューイングのサンプル作ってみたいと思います。

ではまたお会いしましょう!

参考記事

https://activemq.apache.org/components/artemis/documentation/

https://terasolunaorg.github.io/guideline/5.3.0.RELEASE/ja/ArchitectureInDetail/MessagingDetail/JMS.html

https://access.redhat.com/documentation/ja-jp/red_hat_amq/7.7/html/using_the_amq_core_protocol_jms_client/index

https://spring.pleiades.io/guides/gs/messaging-jms/

https://spring.pleiades.io/spring-boot/docs/current/reference/html/messaging.html

https://www.techscore.com/tech/Java/JavaEE/JMS/

GitHubで編集を提案

Discussion