🐕

ログラスの非同期処理基盤の設計と導入

2024/05/23に公開

こんにちは、ログラスでエンジニアをしております南部です。
唐突ですが、みなさんは「非同期処理」の実装を行っていますか?

非同期処理とは、プログラムの処理を他の処理と並行して行う方法です。これにより、ユーザーの操作がブロックされることなく、バックグラウンドで時間のかかる処理を実行することができます。

Loglassではユーザーの大量のデータを取り扱うこともあり、非同期処理を多く利用しています。
今までは単一のAPIで同期処理・非同期処理どちらも行っていましたが、今回独立したインフラに処理を投げておける非同期処理基盤を導入する活動を行いました。

最近本格導入に至りましたので、背景から設計・工夫したことを共有できればと思います。

導入背景

実はLoglassでは、非同期処理基盤の導入前も非同期での処理は実装されています。
例えば、お客様のデータを取り込む際など、UIをブロックしないように処理を裏側で行うようになっています。
これらはKotlinのCoroutinesを用いてアプリの中で実装されていました。

「すでに実装されてるなら非同期処理基盤はいらないのでは?」と思われるかもしれませんが、多くの企業に導入し始めていただいている中で、インフラへの負荷が課題になってきていました。

現状、単一のAPIで重い処理も軽い処理も行われています。
そのため、重い処理がインフラの負荷を高め、関係ない他の処理にまで影響を与える可能性があります。

つまり、このままだとある特定のユーザーの処理が他のユーザーの体験にまで影響しうる状況でした。
こういった課題は、まだ顕著になっていない今の段階で潰しておくべきだと考えました。

また、KotlinのCoroutinesを用いた実装は煩雑性が高く、きちんと実装しないとデーモンのようにメモリ上に処理の残骸が残り続けてしまう場合があります。
きちんと実装すれば良いとはいえ、現状できていない箇所もあり、こちらもインフラのリソースを圧迫していくので課題になっていました。

上記を整理すると、要求は以下になります。

  1. APIとは別のインフラで重い処理を実行して欲しい
  2. 処理は非同期に行いたいので、インフラレベルで完了を待たずに実行できるようにして欲しい

以上から、既存APIから独立した非同期処理基盤を作ろうと思い至りました。

設計

LoglassのバックエンドはSpringBootを使って開発されています。
そのため、前提としてSpringBootに乗せる形で開発をしようと考えていました。

また、既存APIで実装されている処理を速やかに非同期処理基盤に移行していけるようにするため、できるだけ移行コストの低い設計にする必要がありました。
幸い、LoglassのコードはDDDで設計されており、別のインフラからユースケース層だけを呼び出すことができれば、おおよそ要件を満たせることが分かっていました。
この点において、コードがレイヤーできれいに分かれている恩恵を大きく受けました。
いくつかの記事[1][2]を参考にさせていただき、最終的に以下のような構成を提案しました。

非同期処理基盤の構成

QueueをポーリングするWorkerをECS上で別のアプリケーションとして立てるような設計になっています。
処理の順を追って記述します。

  1. 既存のAPIはAWSのSQSにタスクを投げる。
    • この際に、排他制御のためのDB更新も行う
    • 排他制御は、対象タスクの実行ステータスを実行中にする
  2. WorkerがQueueをポーリングする
  3. Workerがタスクを受け取ると、既存APIのユースケース層の処理を実行する
    • Workerはマルチスレッドで複数タスクを並列に処理する
  4. Queueのタスクが異常終了または、タイムアウトするとDead Letter Queue(以降DLQ)にタスクが移動する
    • SQSの設定(Redrive Policy)で、1度でもエラーしたら次にはDLQに移動する
    • タイムアウトは、SQSの可視性タイムアウトによって実現している
  5. WorkerがDLQをポーリングする
  6. 対象タスクの実行ステータスをエラーに倒す
    • ユーザーは再実行ができるようになる

このようにすることで、既存APIから非同期処理基盤にタスクを移行するときに追加で実装するものを最小限に抑えるようにしました。
また、1度SQSにタスクを外だししているので、今後Workerではなく別のアプリを使って処理を行うように変更することも容易にできます。

Spring Cloud AWS

SQSの操作をSpringBootのアプリ内から行うためのライブラリとして、Spring Cloud AWSを用いました。
https://awspring.io/

この後の内容にも関わってくるため、このライブラリが行うことについて記述します。
非同期処理基盤でこのライブラリが行うことは以下です。

  1. 既存APIからSQSにタスクを送信する
  2. WorkerでSQSをポーリングする

既存APIからSQSにタスクを送信する

このライブラリはSqsTemplateというクラスを用いてSQSへの送信を行います。
ドキュメントはこちらです。

SqsTemlateは以下のようなコードでBeanとして登録しています。(実際に使用しているコードとは異なります)

    @Bean
    fun sqsTemplate(): SqsTemplate {
        return SqsTemplate.builder()
            .sqsAsyncClient(sqsAsyncClient())
            .configure { option ->
                option.queueNotFoundStrategy(QueueNotFoundStrategy.FAIL)
            }
            .build()
    }

このBeanを使用し、以下のようなコードでSQSに送信しています。

sqsTemplate.send { to -> to.queue("my-queue-name").payload(workerTask).headers(headers) }

WorkerでSQSをポーリングする

WorkerがSQSをポーリングするには@SqsListenerというアノテーションを使用します。
ドキュメントはこちらです。

このアノテーションは以下のように使用します。

    @SqsListener({"${my.queue.url}"})
    public void listenTwoQueues(String message) {
        System.out.println(message);
    }

これだけで、指定したキューをポーリングするようになってくれます。
また、必要であれば引数でacknowledgeModeを指定できます。
処理が正常終了したときだけキューからメッセージを削除、などを選択できます。詳しくはこちらのドキュメントを参照してください。

開発体験

今回作成する非同期処理基盤は、今後Loglassの他の処理も乗せていきたいものです。
そのため、私が所属するフィーチャーチーム外の人が実装することも想定されます。
そこで、誰が実装しても同じように、かつ安全に実装できるような設計にする必要がありました。
特に、行レベルセキュリティについて工夫をしたので紹介します。

行レベルセキュリティ

LoglassのDBはマルチテナントの単一DBになっています。
マルチテナントとは、複数のお客様が同じシステムを共有する方式です。このため、他のテナントのデータに誤ってアクセスしないよう、行レベルセキュリティ(RLS: Row Level Security)を導入しています。RLSにより、SQLのwhere句に必ずテナントIDが入るようにして、データの漏洩を防ぎます。

既存のAPIについては以前弊社の松岡が登壇した内容[3]の通りにRLSを意識せずに開発者が実装できるようになっています。
スライドにはありませんが、APIでは認証基盤からテナントIDを引くようになっています。
しかし、非同期処理基盤はテナントIDを引く元はSQSしかなく、SQSのメッセージにテナントIDを入れ込む他ありません。

よって、今後の安全な開発のために、開発者が意識せず以下を満たせることを要件としました。

  • SQSに送信するメッセージに必ずテナントIDが入る
  • SQSからタスクを受けたWorkerの処理がRLSを勝手に考慮する

SQSに送信するメッセージに必ずテナントIDが入るようにする

この要件を満たすために、以下を作成しました。

  • Workerに送信するタスクの継承用クラス(WorkerTask)
    • このクラスでテナントIDはabstractで宣言されています
  • SQSへメッセージを送信するためのインターフェース(WorkerTaskSender)
    • その実装クラス(WorkerTaskSqsSender)

実装者は、必ずWorkerTaskクラスを継承したクラスに処理の実行に必要な情報を渡します。
そして、そのWorkerTaskWorkerTaskSenderを通して送信するようになります。
これにより、要件を満たすようになりました。

これらは既存API側に実装されており、以下の層に実装されています。

- ユースケース層
    - WorkerTask
    - WorkerTaskSender
- インフラ層
    - WorkerTaskSqsSender

SQSからタスクを受けたWorkerの処理がRLSを勝手に考慮する

実装者が処理の実装に集中できるように、RLSについての考慮は勝手にしてくれるような構成にしたいという要件です。
この要件を満たすために、AOPを利用しようと考えました。
AOPを利用すると、実装者が明示的に実装しなくても特定の処理を走らせることができます。
AOPに関する詳しい説明は省きますが、Adviceというものをある処理が始まる前や後に挟み込むことができます。
定義しておけば自動で挟み込まれるため、今回の要件にはぴったりでした。

既存APIでもAOPを利用してRLSを考慮しており、@Transactionalというアノテーションが付与されている場合、処理が始まる前に以下のSQLクエリをDBに投げるようなAdviceが実装されていました。

SET LOCAL app.current_tenant = '${tenantId.value}'

このクエリが実行されることで、Session内ではRLSを突破することができるようになります。
今回、Workerが呼び出すユースケース層には@Transactionalが原則付与されているため、このAdviceを利用しない手はありませんでした。

上述の通り、既存APIのAdviceではテナントIDをユーザーの認証情報から引いています。
非同期処理をする上では、SQSのメッセージからテナントIDを取得して保持しておかないといけません。
逆に言えば、その部分もAOPで実装できれば実装者はRLSを意識しなくてよくなります。

そこで利用しようと考えたのが、SQSをポーリングするために付与する@SqsListenerというアノテーションでした。これは上述のライブラリで実装されているものです。
各処理でQueueをポーリングするため、実装者はこのアノテーションを必ず付与することになります。
最終的に、以下のような流れで要件を満たしました。

RLS突破の流れ

今回、スレッドごとに処理が走るので、テナントIDを保存する先はThreadLocalとしました。
以上の設計にしたことで、@SqsListenerを付与したクラスが既存APIのユースケース層を呼び出している限り、開発者はRLSを意識しないで処理を非同期処理基盤に移行することができるようになりました。

テスト

本番に導入するにあたって、当然ながらテストを行う必要があります。
しかし今回は、移行する処理自体のコードはほとんど変更がありません。そのため、処理自体のテストを実施し直す必要はありませんでした

そこで、テストを行う観点を、既存APIで動作する場合と非同期処理基盤で動作する場合の差分箇所に絞ることができました。
大枠でいうと、テストの対象となる動作の差分は以下となります。

  • 既存APIはタスクをSQSに投げるだけとなる
    • 投げる直前に、排他制御のためにDBの実行ステータスを更新する
  • ユーザーがタスクを実行してから処理されるまでにタイムラグが発生する
    • その間に実行タスクの設定が変更されうる
  • WorkerがSQSからタスクを受け取って処理する
  • 異常終了した場合にはDLQにタスクが移動する
    • 移動後、実行ステータスがエラーになる処理が走る

これらを、SQSの可視性タイムアウトや、配信遅延時間を一時的に調整するなどして動作検証しました。
処理自体のテストを行う必要がなかったので、実際のテスト実行は2時間程度で完了しました。

リリース

今回、既存APIで今まで同期的に処理していたタスクを基盤に移行する形で本格導入しました。
導入にあたって、既存のAPIのエンドポイントに追加で別の非同期処理基盤用のエンドポイントを用意しました。
そして、フィーチャートグルでどちらのエンドポイントを使用するかを制御するようにしました。 (下図参照)

フィーチャートグルによる制御

こうすることで、以下のメリットを享受し、安心・安全のリリースを実現しました。

  • コードは事前にデプロイしておけるので、リリース時にはトグルを更新するのみ
  • 検証用テナントで動作確認をしてから顧客環境に反映できる
  • 何かあった場合には、トグルを全テナントでOFFにすれば影響を最小限にできる

まとめと今後の展望

今回導入した基盤は、これが完成形だとは考えていません。
あくまで既存APIの処理を置き換えていくコストを最小化し、別インフラに処理を切り出すという要件を叶えるためのものです。

今後、Workerのインフラでも負荷に耐えられないというような状況になった場合には、何らかの施策をしないといけません。
例えば、ネイティブイメージを使って1タスク1コンテナで処理を実行するなどです。
ですが、ひとまずは今回の基盤を拡張する形でインフラ的な負荷としても問題なく動作することが検証で分かっています。
SQSに積まれたタスクの先がどう処理されるかは切り替えられますので、YAGNIの精神で必要になったら考えれば良いと思っています。

今回は、今まで単一のAPIで処理を行っていたものを、処理を外出しできる口ができたということ自体が大きな成果であると考えています。

今後もより良いプロダクトにするために、機能の開発だけでなく、こういったインフラを用いたユーザー体験の向上にも目を向けていけると良いなと思っています。

今回の記事が、同様の課題に直面している他のエンジニアの参考になれば幸いです。

脚注
  1. https://speakerdeck.com/recruitengineers/jjugccc2023spring ↩︎

  2. https://debugroom.github.io/mynavi-doc-draft/cloud_native/aws-sqs-2.html ↩︎

  3. https://www.slideshare.net/koichiromatsuoka/postgresql-springaop ↩︎

株式会社ログラス テックブログ

Discussion