💫

マイクロサービスを支えるメッセージング技術:Redis Streams

2021/12/17に公開

はじめに

この記事は、airCloset Advent Calendar 2021 16 日目の記事です!
ここ最近は、複数サービス間の連携やマイクロサービスの設計に関わらせてもらうことが多く、そこでの技術選定について紹介させていただきます。

実現したいこと

さて、やりたいことは次のことでした。

  • ユーザーの認証基盤を外だしし、複数サービスで共通のアカウントでログインできるようにする
  • 複数サービスで同じ項目のデータを、共通化したい

※認証基盤の話はここでは割愛します。

解決したい問題

このアプローチとして、共通の API/DB を使って外だしすることを検討しました。いわゆるマイクロサービスです。

「よっしゃ、API 作るでー!」と、気合を入れた矢先、次のような懸念を抱えました

  1. パフォーマンス: ユーザー関連のアクセスはマイクロサービス化した際の読み取り速度の懸念がある。
  2. 参照箇所が多すぎるため、全ての箇所で API 通信に切り替えるにはトランザクションの問題やスコープの洗い出し、バグのリスクが高い。

大規模な外科手術のごとく、全アプリケーションからデータ参照を洗い出して、マイクロサービスに置き換える作業はとってもリスキーであり、膨大な時間となることは明白でした。
懸念を解消する検証の時間もそこまで確保できなさそうだったので、現実的なラインを目指すことにします。

そこで、段階的なマイクロサービス移行方法を検討しました。

考えたのは、メッセージを受け取ってそのメッセージで各クライアント DB へ書き込みを行うサービスを作ることでした。(データを同期するサービス)

このアプローチでは、速度的なパフォーマンスの問題が解消される一方、全面的にマイクロサービスに移行できるわけではなく、一部移行・一部既存の状態は生まれてしまいます。

例えば、このマイクロサービスを利用したいアプリケーションが増えたときに、データ同期サービスの改修が発生してしまいます。マイクロサービスはクライアントアプリケーションの知識をゼロないし最小に留めるべきだと考えています。

構成でいうとこのようになります。

「せっかく共通の項目を共通 DB に移すのに、クライアント DB への参照を残す・・・?」

今回は「各アプリケーションとマイクロサービス間でデータの整合性を担保する」ことを目的としました。冗長さがパフォーマンスの利点を生むのだと捉えて、マイクロサービス DB への更新とともに、既存 DB への更新を行うことにしました。

ここでやっと本題です。

サービス間のメッセージング

社内でこれまで redis を使用してきたこともあり、「redis の pub/sub の仕組みを使ってうまくメッセージングできないか」と考えました。

Pub/Sub vs Streams

redis にはメッセージ処理に適している Pub/Sub の仕組みと、 Streams の仕組みがあります。

Pub/Sub より Streams が後発なだけあって、いくつかの点で優れているようです

  • Pub/Sub
    • メッセージは保存されないため、subscribe していない期間のメッセージを受け取る事ができない
      • Subscriber が存在するかどうかに関係なく、publish メッセージは揮発する
  • Streams
    • データを取得しても過去のデータを残せる
      • 非同期的に後からメッセージを受け取ることができる
      • Subscriber が死んでいたとしても復旧時にメッセージを回収できる
    • メモリの使用量に注意が必要

結論からいうと、Streams を採用しました。
いくつかの要件を満たすことができたためです。

Pub / Sub

redis-cliでのコマンドはとてもシンプルです。

127.0.0.1:36379> SUBSCRIBE channel [channel ...]
127.0.0.1:36379> PUBLISH channel message

参考: https://redis.io/topics/pubsub

Streams

Streams は Pub/Sub の後発で、Redis v5 から使えるようになりました。

XADDを使ってメッセージキューを追加します。

127.0.0.1:36379> XADD key ID field string [field string ...]
127.0.0.1:36379> XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

a-streamという Stream に上限 10 コのキューを作成し、message key に helloの value を入れてみます

127.0.0.1:36379> XADD a-stream MAXLEN 10 * message hello
"1639666330803-0"

このキューを読み取るには XREAD を使います。

127.0.0.1:36379> XREAD STREAMS a-stream 0
1) 1) "a-stream"
   2) 1) 1) "1639666330803-0"
         2) 1) "message"
            2) "hello"

STREAM <stream-name> <読み込むID>の形でコマンドを叩きます。
0 を指定すると先頭から読み込まれます。

127.0.0.1:36379> XADD a-stream MAXLEN 10 * message world
"1639666566585-0"

127.0.0.1:36379> XREAD STREAMS a-stream 0
1) 1) "a-stream"
   2) 1) 1) "1639666330803-0"
         2) 1) "message"
            2) "hello"
      2) 1) "1639666566585-0"
         2) 1) "message"
            2) "world"

127.0.0.1:36379> XREAD STREAMS a-stream 1639666330803-0
1) 1) "a-stream"
   2) 1) 1) "1639666566585-0"
         2) 1) "message"
            2) "world"

先程の Pub/Sub のように、キューを待ち受けるには、 BLOCK を使います

XREAD BLOCK 0 STREAMS a-stream $

ここでの $ は特殊な ID で、XREADを始めたタイミングから後に追加されたものを受け取るようになります。

一度、XADDで追加されたメッセージを受け取ったあと、すぐにプロセスが終了してしまいます。

この記事にある通り、「起動時には $ で待ち受け、それ以降は最後に受け取った ID を指定するというコード」でキューの動きを実現できます

async function consume(startId = "$") {
  let nextId = startId ? startId : "$";

  const res = await redis.xread(
    "BLOCK",
    "0",
    "COUNT",
    "20",
    "STREAMS",
    "stream-sample",
    startId
  );
  for (const [, val] of res) {
    for (const [id, messages] of val) {
      nextId = id;
      // 何かしらの処理
    }
  }

  await consume(nextId);
}

consume();
未読 / 既読管理

これまでの XADD XREAD でメッセージの受け取りはできましたが、複数プロセスで動いたときの挙動はどうなるでしょうか。

このように、同時に subscribe している状態となってしまい、同時に複数プロセスで扱うことは難しいです。

そこで、XGROUP / XREADGROUP コマンドを使います。

XREAD との違いは、先にXGROUPでグループを作る必要があることです。

127.0.0.1:36379> XGROUP CREATE a-stream mygroup01 $
OK

group1メンバーのAliceが他のメンバーによてtまだ読まれていないデータを読むコマンドです。

XREADGROUP GROUP group1 Alice STREAMS a-stream >

>は「他のコンシューマに送信されていない新しいメッセージ」を指定します。

XADDをいくつか叩いて、XREADGROUPを叩くと、未読を全件取得し、二度目は未読0件だったことがわかります。

先程のロジックと組み合わせると、次のようになります。

export async function subscribeStream() {
  // xreadgroupを使うためにgroupを作成する.
  await redis.xgroup('CREATE', 'a-stream', 'group1', '$', 'MKSTREAM').catch(() =>

  while (true) {
    // xreadgroupで未読のメッセージのみをsubscribeする
    const reply = await redis.xreadgroup('GROUP', 'group1', 'consumer', 'BLOCK', '0', 'COUNT', '0', 'STREAMS', 'a-stream', '>');

    for (const [, val] of reply) {
      for (const [id, messages] of val) {
        // 何かしらの処理
      }
    }
  }
}

MKSTREAMは「ストリームが存在しない場合、オプションの MKSTREAM サブコマンドを最後の引数として使用して、ストリームを自動的に作成する」オプションです。

エラーを回避するため、おまじない程度に指定しましたが、特に意識しなくてよいかもしれません。


直感的な Pub / Sub と異なり、Streams は機能が豊富な一方で、$やら>の特殊な記号があるため、体で覚えていく必要がありますね。

ぜひ興味が湧いたら触ってみてください!


文献

Discussion