🐰

Java + QuarkusでAsynchronous Message Passingなるものをしてみむとてするなり

2023/03/26に公開

こんにちは! :)

みなさんこんにちは!
私は普段、ある企業の新米TechLeadとして、いくつかのプロジェクトのインフラやアプリケーションのデザインやレビュー、そして実際のコーディングなど担当しております。
私がその部署に異動したタイミングで、すでにいくつかのプロダクトが存在していたのですが、あるプロジェクトでJavaのバックエンドアプリケーションでQuarkusというフレームワークを利用することになりました。
ライブラリやドキュメントもかなり充実しており、とてもエキサイティングなフレームワークです。
この記事では、そのQuarkusフレームワークと親和性の高いVertxライブラリのEventBusを利用したAsynchronous Message Passing、その中でEventBusのPubSubロジックをどのように実装したかをメモとして残したいと思います。

Quarkusってどんなんなん?


Quarkusの特筆すべき技術的優位点は主に以下の2つです。

  • コンテナ・ファースト(Container First)
  • コンティニュアム(Continuum)

1つ目のコンテナ・ファースについてはいずれ別途記事を書きたいと思います。
2つ目のコンティニュアムについては、これを正確に説明するのは大変ですが、基本的にはReactive(リアクティブ)アーキテクチャを主軸においた同期的処理・非同期処理を直感的に実装できる方法が提供されており、ちょっとしたアノテーションの付与で思い通りに実装できることを目的としています。

ほんで、なにしたかったかというと?

ドキュメントを読んでReactiveええやん!ええやん!となっていたのですが、まず、プロトタイプのサービスを作成した段階では、全てのREST APIで提供される機能は慣例的、普通の同期処理でした。
つまり、HTTPのリクエストを受け付けたタイミングでValidationチェックを行い、問題なければ、データベースへの操作、そして、必要であれば、外部のサービスへのリクエストをいくつか投げて、ぜーーーーんぶ成功してから、クライアントに20Xを返す仕様になっていました。

ここで、実際の要件を確認すると、いくつかの処理、とくに外部のシステムに依存する操作は、非同期で実施して、その操作の成功・失敗に関わらずクライアントに20xを返した方がいいということがわかりました。

とくに目新しいこともなく今までは、このような場合、外部システムへのリクエストを行うロジックを非同期処理にしたり、Kafkaのようなメッセージングサービスを間に介してリクエストを投げて、非同期に処理するようにしていましたが、このアプリの規模も小さいので、できるだけ自己完結でいきたい、かつ、ひとつの処理が成功したら同時に2つの処理を呼び出し、それぞれの処理の成功・失敗によって取るべき挙動が異なる、非同期に処理を呼び出してさらにそこから処理が成功したら、非同期に別のロジックを非同期にスタートする。。。みたいな、ちょっとだけ複雑な感じなので、でした。
この非同期に動かしたロジックは今後の要件から、複数増えたり、減ったりしそうなので、そのあたりの拡張性も気になるところでした。

このようなロジックを実装するには、いくつかの方法がありますが、ノンブロッキングな処理をQuarkusが提供するReactiveアプリケーション内で実装するために、vertxエクステンションが提供するEvent Busを利用したpublish/subscribeロジックで実装してみようということに、なんやかんやでなりました。

いっちょやってみよ〜

今回の実装のサンプルプログラムはこちらのGitHubレポジトリにて公開しています。詳細はそちらを参照してください。

こちらの公式ドキュメントを参考にしました。
公式ドキュメントにあるように、以下のコマンドでMavenプロジェクトを生成します。

mvn io.quarkus.platform:quarkus-maven-plugin:2.16.5.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=vertx-http-quickstart \
    -Dextensions='vertx,resteasy-reactive' \
    -DnoCode
cd vertx-http-quickstart

pom.xmlができあがるだけで、javaフォルダ以下は空ですね。。
以下のように実装していきます。

Reactive REST Endpointの仕様:

@Path("/async")
public class SampleResource {

  @Inject
  SampleController controller;

  private final Logger logger = LoggerFactory.getLogger(this.getClass());

  @GET
  @Produces(MediaType.TEXT_PLAIN)
  @Path("{name}")
  public Uni<String> greeting(String name) {
    logger.info("Logic A: request received");
    return controller.startEvent(name);
  }

}

ControllerクラスにてValidationなどなどを行ったあとに、Serviceクラス(上の絵でいうLogic B)にて以下のようなロジックを実装します。

  @Inject
  EventBus eventBus;

  private final Logger logger = LoggerFactory.getLogger(this.getClass());

  @Inject
  SampleRepository repository;

  public Uni<String> startService(String name) {

    final var message = new LogicCDMessage(name);
    eventBus.publish(TOPIC_CD, message);

    logger.info("Logic E: do something against the database");
    return repository.doSomeThingInDatabase(name);
  }

みて頂いて分かる通り、eventBus.publish()で1つ目の引数でトピック名、そして、2つ目の引数で渡したいメッセージをセットします。

そして、repository.doSomeThingInDatabase(name)で、エンドポイントで受け付けたnameの値をLogic Eにて以下のように変形して単純にクライアントにかえしています。

@ApplicationScoped
public class SampleRepositoryImpl implements SampleRepository {

  @Override
  public Uni<String> doSomeThingInDatabase(String name) {
    return Uni.createFrom().item(() -> String.format("Hello %s!!", name));
  }
}

一方で、上の絵のLogic C,D,G,Hは特定のメッセージをSubscribeして非同期に処理を呼び出したいので、
C,DはTOPIC_CDをサブスクライブして、
G,HはTOPIC_GHをサブスクライブするように設定します。
具体的には@ConsumeEventアノテーションでサブスクライブしたいTOPICを引数に渡して起動させます。
以下はLogic Dを実装したEventConsumerクラスです。

@Singleton
public class LogicDEventConsumer extends LogicEventConsumerBase {

  @Override
  String getLogicName() {
    return "Logic D";
  }

  @Inject
  EventBus eventBus;

  @ConsumeEvent(TOPIC_CD)
  public void consume(final LogicCDMessage message) {
    logger.info("{}: FIRED. Received message: 'name: {}'", getLogicName(), message.name());
    logger.info("{}: Do something asynchronously", getLogicName());
    logger.info("{}: DONE successfully", getLogicName());

    var nextMessage = new LogicGHMessage(message.name());
    eventBus.publish(TOPIC_GH, nextMessage);
  }
}

このconsumeメソッド内で新たにメッセージをpublishしてロジックG,Hを呼び出します。

やってみた結果

以下のコマンドでMavenのQuarkusアプリをLive Codingモードで起動することができます。

 ./mvnw compile quarkus:dev

アスキーアートで QUARKUSと表示されていますね(正直よめない)

ブラウザで単純にhttp://0.0.0.0:9000/async/hogehogeにアクセスしてみます。(name=hogehote)
すると、すぐにHello hogehoge!!とかえってきます!ええやん。
ログは以下のようになっています。


Logic A -> Logic Bとシーケンシャルにロジックが走ってます。
次に、Logic Eが動き、さっさとクライアントにレスポンスを返しています。
その後、Logic C, Dが起動されており、そろぞれの処理が終了後、
Logic G,Hがスタートして、無事終了しています。
つまり、上の絵の狙い通りに動いています!!ええやん!ええやん!

おしまい

ここまで読んでくださいましてありがとうございました。
記事を書くのは結構時間かかるし、伝わるように書くのはとてもむずかしいですね。。。
今後も"してみむとてするなりメモ"をできるだけ書き続けようとおもいます。また、皆様の素晴らしい記事も今後参考にしながら、日々研鑽していきたいと思います。

GitHubで編集を提案

Discussion