📡

PostgreSQL LISTEN/NOTIFY, Goを利用したリアルタイム配信

2024/03/27に公開

はじめに

本記事では、PostgreSQLのLISTEN/NOTIFY機能とGoを組み合わせた、メッセージをリアルタイム配信するための仕組み・実装を紹介します。

私たちが開発しているMiROHA eConsentでは本記事で紹介する仕組みを利用して、ユーザが見ている文書のページをリアルタイムに知らせる機能をリリースしました。

MiROHA eConsentは、治験業務支援サービス MiROHAのシステムの一部で 、治験の同意取得プロセスをオンラインのみで完結させることができるプロダクトです。医師とCRC[1]・被験者が同じルームに入室し、ビデオ通話を繋ぎながら治験に関する説明から同意署名、署名した文書のダウンロードまで一気通貫で行うことができます。

MiROHAの全体像
MiROHAシステムの全体図。医師/CRCと被験者の間で同意取得ができる機能を提供しているのがMiROHA eConsentになります。

プロダクトの課題として、「被験者の方が文書のどのページを閲覧しているかがわからず、説明がやりにくい」という声がありました。この課題を解決すべく、Google スライドのように閲覧ページのカーソルを表示できる機能を提供することに決めました。

この機能を実現するためにPostgreSQLのLISTEN/NOTIFYを利用しました。この機能の詳細およびGoでの使い方、その他の選択肢との比較を紹介します。

PostgreSQL LISTEN/NOTIFY

LISTEN, NOTIFYはそれぞれPostgreSQL標準で利用できるコマンドです。これを利用してPostgreSQLデータベースを通してリアルタイムなメッセージの送受信ができます。

簡単な例をpsqlで試してみましょう。2つのターミナルのセッション(Alice, Bobとします)でpsqlを実行して、PostgreSQLに接続します。

まずAliceが適当なチャンネルを指定してLISTENを実行します。

-- Alice
LISTEN foo_channel;

次にBobがそのチャンネルに対して、'Hello!'というペイロードでNOTIFYを実行します。

-- Bob
NOTIFY foo_channel, 'Hello!';

そしてAliceでコマンド空打ち(;)すると、受け取ったメッセージの確認ができます。

-- Alice
;
-- Asynchronous notification "foo_channel" with payload "Hello!" received from server process with PID 5963.

psqlの場合は入力待ちが発生するため空打ちを行わないと確認できませんが、各言語のクライアントを利用すれば即時通知を受け取ることが可能です。

なお、LISTEN foo_channel;を実行しているセッションが複数存在すればそのすべてに通知が届き、LISTENしているセッションが存在しなければどこにも通知は届きません。

また、PG_NOTIFYを利用すればチャンネル名・ペイロードとして動的な値が渡せます。

-- Bob
SELECT PG_NOTIFY('foo_' || 'channel', 'Hello' || 'World!');

-- Alice
;
-- Asynchronous notification "foo_channel" with payload "HelloWorld!" received from server process with PID 5963.

JSON_BUILD_OBJECTを利用すればJSONをペイロードとして送ることも可能です。

-- Bob
SELECT PG_NOTIFY('foo_channel', JSON_BUILD_OBJECT('hoge', 'fuga', 'bar', 12345)::text);

-- Alice
;
-- Asynchronous notification "foo_channel" with payload "{"hoge" : "fuga", "bar" : 12345}" received from server process with PID 5963.

なお、ペイロードは8,000 bytes未満までという制限があるので注意です。

実装例

PostgreSQL LISTEN/NOTIFYとGoを組み合わせた実装例を紹介します。サンプルコードはこちらに置いております。

https://github.com/abekoh/pg_notify_sample

紹介する例は、WebSocketを経由してJSONを相互に送受信できるアプリケーションです。指定したルーム内でJSONを、チャットのように複数ユーザ同士が送受信できます。

以下は動作させた例です。ターミナルでWebSocketの通信するためwebsocatを使ってます。

# Alice
websocat 'ws://localhost:8080/ws?room_id=e5c87218-676b-4185-9335-3fe48eff6a38'
{"foo", "bar"} # JSONを入力
# Bob
websocat 'ws://localhost:8080/ws?room_id=e5c87218-676b-4185-9335-3fe48eff6a38'
# {"foo", "bar"} # JSONを受け取る

全体像

全体像は次のようなイメージです。

Overview

PostgreSQLのTRIGGERの仕組みと、Goの4つのgoroutine(Writer, Reader, Listener, Dispatcher)が作用し合う仕組みになります。通信開始→メッセージ送受信→切断の流れは次の通りです。

  1. 通信開始
    1. Alice, BobがWebSocket通信を開始
    2. ReaderのchannelをDispatcherに登録(register)
    3. WebSocket接続ごとにWriter, Readerが開始される(便宜上Alice,Bobの所有物とする)
  2. メッセージ送受信
    1. Aliceがメッセージを書き込む
    2. AliceのWriterがeventsにINSERTを実行
    3. TRIGGER実行、new_eventsにNOTIFYで通知
    4. Listenerがnew_eventsをLISTEN→Dispatcherにchannelで通知
    5. DispatcherがReaderにchannelで通知
    6. BobのReaderがeventsをSELECTしてメッセージ取得
    7. Bobがメッセージを受信
  3. 切断
    1. Alice, BobがWebSocket通信を切断
    2. ReaderのchannelをDispatcherから登録解除(unregister)

詳細のデータベース定義、コードを見ていきましょう。

データベース定義

CREATE TABLE IF NOT EXISTS events (
    event_id uuid PRIMARY KEY,
    room_id uuid NOT NULL,
    client_id uuid NOT NULL,
    message jsonb NOT NULL,
    created_at timestamp with time zone DEFAULT now()
);

CREATE OR REPLACE FUNCTION notify_event ()
    RETURNS TRIGGER
    AS $$
DECLARE
BEGIN
    PERFORM
        PG_NOTIFY('new_events', JSON_BUILD_OBJECT('event_id', NEW.event_id, 'room_id', NEW.room_id, 'client_id', NEW.client_id)::text);
    RETURN NEW;
END;
$$
LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER notify_event_trigger
    AFTER INSERT ON events
    FOR EACH ROW
    EXECUTE FUNCTION notify_event ();

イベント、ルーム、クライアントそれぞれのIDを設定可能にしてます。メッセージはjsonb型でスキーマレスにしております。

通知はIDの情報のみで、messageは含めておりません。前述の通りNOTIFYのペイロードには8,000 bytesの制限があるため、メッセージの大きさに懸念がある場合は通知を受け取った側で取得するようにします。

Listener

listener := &pgxlisten.Listener{
	Connect: func(ctx context.Context) (*pgx.Conn, error) {
		c, err := db.Acquire(ctx)
		if err != nil {
			return nil, err
		}
		return c.Conn(), nil
	},
}
newEventCh := make(chan NewEventsPayload, 10)
listener.Handle("new_events", pgxlisten.HandlerFunc(
	func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error {
		var payload NewEventsPayload
		if err := json.Unmarshal([]byte(notification.Payload), &payload); err != nil {
			return fmt.Errorf("unmarshal payload: %w", err)
		}
		newEventCh <- payload
		return nil
	}),
)

// Listener
go func() {
	slog.Info("listening from new_events channel")
	if err := listener.Listen(ctx); err != nil {
		slog.Error("failed to listen", "error", err)
	}
}()

PostgreSQLに対してLISTENを実行する箇所です。今回は jackc/pgx/v5, jackc/pgxlisten を利用しております。HTTPサーバのようにハンドラ定義・Listen実行という形で実装できます。

LISTENは1プロセス内で1つのコネクションのみで行うようにしております。「ルームごとにPostgreSQLの通知チャンネルを定義→LISTENする」という実装にしてしまうとコネクションを使い切ってしまい、他のクエリを一切受け付けなくなってしまうのでこのような工夫が必要です。

Dispatcher

// Dispatcher
go func() {
	roomClientMap := make(map[RoomID]map[ClientID]chan<- EventID)
	for {
		select {
		case <-ctx.Done():
			return
		case registerReq := <-registerCh:
			slog.Info("client registered", "client_id", registerReq.ClientID, "room_id", registerReq.RoomID)
			clientMap, ok := roomClientMap[registerReq.RoomID]
			if !ok {
				roomClientMap[registerReq.RoomID] = make(map[ClientID]chan<- EventID)
				clientMap = roomClientMap[registerReq.RoomID]
			}
			clientMap[registerReq.ClientID] = registerReq.Ch
		case unregisterReq := <-unregisterCh:
			slog.Info("client unregistered", "client_id", unregisterReq.ClientID, "room_id", unregisterReq.RoomID)
			clientMap, ok := roomClientMap[unregisterReq.RoomID]
			if !ok {
				continue
			}
			delete(clientMap, unregisterReq.ClientID)
			if len(clientMap) == 0 {
				delete(roomClientMap, unregisterReq.RoomID)
			}
		case payload := <-newEventCh:
			slog.Info("notify event", "event_id", payload.EventID, "client_id", payload.ClientID, "room_id", payload.RoomID)
			clientMap, ok := roomClientMap[payload.RoomID]
			if !ok {
				continue
			}
			for clientID, ch := range clientMap {
				if clientID != payload.ClientID {
					select {
					case ch <- payload.EventID:
					default:
						slog.Warn("failed to send event to client", "event_id", payload.EventID, "client_id", clientID, "room_id", payload.RoomID)
					}
				}
			}
		}
	}
}()

Dispatcherでは、Readerに送信するchannelの登録・解除、およびListenerから受け取った通知のReaderへの振り分けを行います。

roomClientMapというマップを定義し、それぞれのルーム内のクライアントのchannelを取得できるようにしています。今回は送信者自身にはメッセージを送らないようにしているため、その分岐も含まれます。登録・解除・通知すべてを1つのgoroutineに収めることで、ロックを取らずに処理することが可能になります。

Readerへの通知の箇所について、もしchannelが詰まっている場合は送信失敗、Warningログを出すようにしています。メッセージを確実に送信したい場合はNGですが、ロストが許される場合はこういった逃げも可能です。

Writer / Reader

clientID := ClientID(uuid.NewString())
slog.Info("client connected", "remote_addr", r.RemoteAddr, "client_id", clientID)

receiveCh := make(chan EventID, 100)
registerCh <- RegisterRequest{RoomID: roomID, ClientID: clientID, Ch: receiveCh}

ctx, cancel := context.WithCancel(context.Background())

// Writer
go func() {
	defer func() {
		cancel()
		wsConn.Close()
		close(receiveCh)
		unregisterCh <- UnregisterRequest{RoomID: roomID, ClientID: clientID}
		slog.Info("client disconnected", "client_id", clientID)
	}()
	for {
		_, msg, err := wsConn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				slog.Error("unexpected close error", "error", err)
			}
			break
		}

		var data any
		if err := json.Unmarshal(msg, &data); err != nil {
			slog.Error("failed to unmarshal message", "error", err)
			continue
		}

		eventID := EventID(uuid.NewString())
		slog.Debug("write event", "event_id", eventID, "client_id", clientID, "room_id", roomID)

		if _, err := db.Exec(ctx, `INSERT INTO events (event_id, room_id, client_id, message) VALUES ($1, $2, $3, $4)`,
			eventID, roomID, clientID, msg); err != nil {
			slog.Error("failed to insert event", "error", err, "event_id", eventID, "client_id", clientID, "room_id", roomID)
			continue
		}
	}
}()
// Reader
go func() {
	for {
		select {
		case <-ctx.Done():
			return
		case eventID, ok := <-receiveCh:
			if !ok {
				break
			}
			var msg json.RawMessage
			if err := db.QueryRow(ctx, `SELECT message FROM events WHERE event_id = $1`, eventID).Scan(&msg); err != nil {
				slog.Error("failed to query event", "error", err, "event_id", eventID, "client_id", clientID, "room_id", roomID)
				continue
			}
			slog.Debug("read message", "event_id", eventID, "client_id", clientID, "room_id", roomID)
			if err := wsConn.WriteMessage(websocket.TextMessage, msg); err != nil {
				slog.Error("failed to write message", "error", err, "event_id", eventID, "client_id", clientID, "room_id", roomID)
				break
			}
		}
	}
}()

Writer, Readerはクライアントごとに固有のものです。WebSocket接続開始時、最初にDispatcherに対して登録を依頼、その後にWriter, Readerが生成・実行されます。

WriterはWebSocketのメッセージを受取、INSERTでデータを挿入する働きを行います。また、WebSocket切断時の処理もここに含めてます。

ReaderはDispatcherから通知を受信、メッセージ本体を取得、WebSocketを通してユーザに返却しています。

他の選択肢との比較

最後に、他に取り得る選択肢の紹介と、それらの比較について述べます。

1プロセスのみで動かす

安直ですがシンプルな案として、Goアプリケーション1プロセスで動かすことが挙げられます。プロセス内でgoroutineのやり取りだけの簡単な実装です。

しかし、冗長化されていないため可用性が著しく低い構成になります。いつサーバダウンしても構わないレベルのシステムであればよいですが、多くの場合は許されないでしょう。

Redis Pub/Sub

今回と同様のケースで多く考えられる選択肢として、Redisを採用することが挙げられます。Redis Pub/Subの機能を用いることで、まさにPostgreSQL LISTEN/NOTIFYと同様のことが可能です。詳細は省きますが、大量のリクエストが予想されるシステムにも適用しやすい選択肢となります。

Redisの採用は有効である反面、ミドルウェアの管理コスト・クラウドの金銭的コストを受け入れる必要があります。

ミドルウェアが増えると、もちろんその環境設定・アプリケーションに追加する接続設定など、管理するために考えるところが増えます。また、本番運用に乗せると監視設定も追加する必要があります。パフォーマンス要件のため必須であれば投資せざるを得ないですが、そうでない場合は余計な負担が増えるだけです。

また、クラウドのマネージドサービスを利用する場合は金銭的コストも無視できません。例えばAmazon ElastiCache for Redisのサーバレスを利用する場合、最低でも毎月$90程度かかります。それが本当に必要な費用であるか、慎重に検討しなければなりません。

比較

その他、AWS環境でAmazon SNS, AWS AppSyncなどを利用するパターンも考えられましたが、煩雑で本題から逸れそうなので割愛します。他に良いアイデアがあればコメントいただけると嬉しいです。

上記を踏まえて、PostgreSQL LISTEN/NOTIFYを利用する優位性は以下が挙げられます。

  • 可用性を落とすことなく実装できる。
    • 例えばAmazon RDSなどのマネージドサービスを利用している場合、そのSLAレベルに依存した状態を保つことができる。
  • 管理コスト・金銭的コストが増大しすぎない。
    • もちろんメッセージをロストせず送受信できているか、コネクションが飽和しないかなど気をつけなければならない点は増える。
  • データのスキーマ定義などPostgreSQLのエコシステムをそのまま利用できる。

一方で、以下の注意点が挙げられます。

  • スケールが困難。リクエストの増大が予想されるシステムであれば採用はおすすめしない。
  • 筆者が観測する限りマイナーな機能であるため、知見があまり存在しない。

MiROHA eConsentは同時リクエストがそれほど予想されず、爆発的に増大することもないシステムで、管理コストも抑えめにしたいということからこの選定が最適だと判断しました。採用する際は要件に合うのか綿密に調査・実験することをおすすめします。

まとめ

PostgreSQLのLISTEN/NOTIFY機能とGoを組み合わせた、メッセージをリアルタイム配信するための仕組み・実装を紹介しました。同様の機能の開発を検討する際、参考になれば幸いです。

今回の例では素のWebSocketを利用しましたが、MiROHA eConsentではGraphQL Subscription / Server-Sent Eventsを利用するなど、他にも工夫した点があります。それも折を見て紹介できればと思います。

参考


MICINではメンバーを大募集しています。
「とりあえず話を聞いてみたい」でも大歓迎ですので、お気軽にご応募ください!

https://recruit.micin.jp/

脚注
  1. CRCとは、Clinical Research Coordinatorの略で、治験コーディネーターとも呼ばれています。 医療機関において、治験責任医師・分担医師の指示のもとに、医学的判断を伴わない業務や、治験に係わる事務的業務、業務を行うチーム内の調整等、治験業務全般をサポートします。 ( https://www.jasmo.org/recruit/job/ より引用 ) ↩︎

株式会社MICIN

Discussion