💫

マイクロサービスを支えるメッセージング技術:Redis Streams

6 min read

はじめに

この記事は、airCloset Advent Calendar 2021 16日目の記事です!
ここ最近は、複数サービス間の連携やマイクロサービスの設計に関わらせてもらうことが多く、そこでの技術選定について紹介させていただきます。

実現したいこと

さて、やりたいことは次のことでした。

  • ユーザーの認証基盤を外だしし、複数サービスで共通のアカウントでログインできるようにする
  • 複数サービスで同じ項目のデータを、共通化したい

※認証基盤の話はここでは割愛します。

解決したい問題

このアプローチとして、共通のAPI/DBを使って外だしすることを検討しました。いわゆるマイクロサービスです。

「よっしゃ、API作るでー!」と、気合を入れた矢先、次のような懸念を抱えました

  1. パフォーマンス: ユーザー関連のアクセスはマイクロサービス化した際の読み取り速度の懸念がある。
  2. 参照箇所が多すぎるため、全ての箇所でAPI通信に切り替えるにはトランザクションの問題やスコープの洗い出し、バグのリスクが高い。

大規模な外科手術のごとく、全アプリケーションからデータ参照を洗い出して、マイクロサービスに置き換える作業はとってもリスキーであり、膨大な時間となることは明白でした。
懸念を解消する検証の時間もそこまで確保できなさそうだったので、現実的なラインを目指すことにします。

そこで、段階的なマイクロサービス移行方法を検討しました。

考えたのは、メッセージを受け取ってそのメッセージで各クライアントDBへ書き込みを行うサービスを作ることでした。(データを同期するサービス)

このアプローチでは、速度的なパフォーマンスの問題が解消される一方、全面的にマイクロサービスに移行できるわけではなく、一部移行・一部既存の状態は生まれてしまいます。

例えば、このマイクロサービスを利用したいアプリケーションが増えたときに、データ同期サービスの改修が発生してしまいます。マイクロサービスはクライアントアプリケーションの知識をゼロないし最小に留めるべきだと考えています。

構成でいうとこのようになります。

「せっかく共通の項目を共通DBに移すのに、クライアントDBへの参照を残す・・・?」

今回は「各アプリケーションとマイクロサービス間でデータの整合性を担保する」ことを目的としました。冗長さがパフォーマンスの利点を生むのだと捉えて、マイクロサービスDBへの更新とともに、既存DBへの更新を行うことにしました。

ここでやっと本題です。

サービス間のメッセージング

社内でこれまでredisを使用してきたこともあり、「redisのpub/subの仕組みを使ってうまくメッセージングできないか」と考えました。

Pub/Sub vs Streams

redisにはメッセージ処理に適している Pub/Sub の仕組みと、 Streams の仕組みがあります。

Pub/SubよりStreamsが後発なだけあって、いくつかの点で優れているようです

  • Pub/Sub
    • メッセージは保存されないため、subscribeしていない期間のメッセージを受け取る事ができない
      • Subscriberが存在するかどうかに関係なく、publishメッセージは揮発する
  • Streams
    • データを取得しても過去のデータを残せる
      • 非同期的に後からメッセージを受け取ることができる
      • Subscriberが死んでいたとしても復旧時にメッセージを回収できる
    • メモリの使用量に注意が必要

結論からいうと、Streamsを採用しました。
いくつかの要件を満たすことができたためです。

Pub / Sub

redis-cliでのコマンドはとてもシンプルです。

127.0.0.1:36379> SUBSCRIBE channel [channel ...]
127.0.0.1:36379> PUBLISH channel message

参考: https://redis.io/topics/pubsub

Streams

StreamsはPub/Subの後発で、Redis v5から使えるようになりました。

XADDを使ってメッセージキューを追加します。

127.0.0.1:36379> XADD key ID field string [field string ...]
127.0.0.1:36379> XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

a-streamというStreamに上限10コのキューを作成し、message key に helloの value を入れてみます

127.0.0.1:36379> XADD a-stream MAXLEN 10 * message hello
"1639666330803-0"

このキューを読み取るには XREAD を使います。

127.0.0.1:36379> XREAD STREAMS a-stream 0
1) 1) "a-stream"
   2) 1) 1) "1639666330803-0"
         2) 1) "message"
            2) "hello"

STREAM <stream-name> <読み込むID>の形でコマンドを叩きます。
0を指定すると先頭から読み込まれます。

127.0.0.1:36379> XADD a-stream MAXLEN 10 * message world
"1639666566585-0"

127.0.0.1:36379> XREAD STREAMS a-stream 0
1) 1) "a-stream"
   2) 1) 1) "1639666330803-0"
         2) 1) "message"
            2) "hello"
      2) 1) "1639666566585-0"
         2) 1) "message"
            2) "world"
	    
127.0.0.1:36379> XREAD STREAMS a-stream 1639666330803-0
1) 1) "a-stream"
   2) 1) 1) "1639666566585-0"
         2) 1) "message"
            2) "world"

先程のPub/Subのように、キューを待ち受けるには、 BLOCK を使います

XREAD BLOCK 0 STREAMS a-stream $

ここでの $ は特殊なIDで、XREADを始めたタイミングから後に追加されたものを受け取るようになります。

一度、XADDで追加されたメッセージを受け取ったあと、すぐにプロセスが終了してしまいます。

この記事にある通り、「起動時には $ で待ち受け、それ以降は最後に受け取った ID を指定するというコード」でキューの動きを実現できます

async function consume(startId = '$') {
  let nextId = startId ? startId : '$'

  const res = await redis.xread('BLOCK', '0', 'COUNT', '20', 'STREAMS', 'stream-sample', startId)
  for (const [, val] of res) {
    for (const [id, messages] of val) {
      nextId = id
      // 何かしらの処理
    }
  }

  await consume(nextId)
}

consume()
未読 / 既読管理

これまでの XADD XREAD でメッセージの受け取りはできましたが、複数プロセスで動いたときの挙動はどうなるでしょうか。

このように、同時にsubscribeしている状態となってしまい、同時に複数プロセスで扱うことは難しいです。

そこで、XGROUP / XREADGROUP コマンドを使います。

XREADとの違いは、先にXGROUPでグループを作る必要があることです。

127.0.0.1:36379> XGROUP CREATE a-stream mygroup01 $
OK

group1メンバーのAliceが他のメンバーによてtまだ読まれていないデータを読むコマンドです。

XREADGROUP GROUP group1 Alice STREAMS a-stream >

>は「他のコンシューマに送信されていない新しいメッセージ」を指定します。

XADDをいくつか叩いて、XREADGROUPを叩くと、未読を全件取得し、二度目は未読0件だったことがわかります。

先程のロジックと組み合わせると、次のようになります。

export async function subscribeStream() {
  // xreadgroupを使うためにgroupを作成する.
  await redis.xgroup('CREATE', 'a-stream', 'group1', '$', 'MKSTREAM').catch(() => 

  while (true) {
    // xreadgroupで未読のメッセージのみをsubscribeする
    const reply = await redis.xreadgroup('GROUP', 'group1', 'consumer', 'BLOCK', '0', 'COUNT', '0', 'STREAMS', 'a-stream', '>');

    for (const [, val] of reply) {
      for (const [id, messages] of val) {
        // 何かしらの処理
      }
    }
  }
}

MKSTREAMは「ストリームが存在しない場合、オプションのMKSTREAMサブコマンドを最後の引数として使用して、ストリームを自動的に作成する」オプションです。

エラーを回避するため、おまじない程度に指定しましたが、特に意識しなくてよいかもしれません。


直感的なPub / Sub と異なり、Streamsは機能が豊富な一方で、$やら>の特殊な記号があるため、体で覚えていく必要がありますね。

ぜひ興味が湧いたら触ってみてください!


文献

Discussion

ログインするとコメントできます