Go + Watermillで実装するイベント駆動通信
はじめに
こんにちは、Lapi(@dragoneena12)です。
最近開発しているプロダクトではイベント駆動による通信パターンを採用しています。
その中で、Goでイベント駆動通信を実装するためのライブラリ Watermill を使った実装をしました。意外とWatermillについて日本語で説明されている記事が少なく苦労したので、今回は試行錯誤の中で学んだWatermillの仕組みや実装例について簡単に説明します。
イベント駆動通信とは
イベント駆動通信とは、ドメイン駆動設計におけるBounded Contextや、マイクロサービス間の通信手法としてよく用いられるパターンです。他に代表的なパターンとしてはリクエスト/レスポンスによる同期/非同期通信があります。
リクエスト/レスポンスによる通信では呼び出し側が呼び出し先のサービスについて知っている必要があります。対してドメイン駆動設計では発行元はあくまでイベント情報をブロードキャストするだけで、それを誰が読んでどう処理したかについては関知しないため、別システムの知識を入れる必要がなくなるという点が最大の特徴です。これによって各サービスが疎結合になり、サービス内部で持つ複雑性を抑えられるほか、自律したチームでの開発が行いやすくなるというメリットがあります。
対してイベント駆動通信には単なるリクエスト/レスポンス型通信になかった多くの課題も存在します。まずイベント情報をブロードキャストするためにキューやメッセージブローカーなどの何らかの仕組みが追加で必要となります。さらにイベントは非同期で処理されることとなるため、非同期処理の様々な問題が発生します。例えばイベントが正常に処理されなかった場合はイベント情報をデッドレターキューに移すなど、通知や再実行の仕組みを用意する必要があります。実際のアプリケーション設計では、このようなトレードオフを考慮して設計する必要があります。
Watermill とは
Watermillはポーランドの企業Three Dots Labsによって開発されている、イベント駆動通信のためのGoライブラリです。イベント駆動通信におけるメッセージ伝達を簡単に扱えるようになるAPIを提供しています。
Watermillは Publisher & Subscriber, Router, CQRS という3つのレベルでAPIを提供しています。開発者のニーズに応じて、好きなレベルのAPIを利用可能です。私たちのチームでは Publisher & Subscriber と Router のAPIを利用しています。今回の記事ではPublisher & Subscriber と Router のAPIについて解説します。
また実際にメッセージを送るバックエンドとしてもKafka, RabbitMQ, PostgreSQL, GoChannelなどさまざまなPub/Subを利用でき、それらをすべて共通のAPIで実装できることが強みとなっています。
Publisher & Subscriber API
Publisher & Subscriber APIは以下のようなインターフェースで定義されています。特定のトピックに対し、メッセージをPublish/Subscribeできます。
type Publisher interface {
Publish(topic string, messages ...*Message) error
Close() error
}
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
Close() error
}
実際の挙動は各実装によって異なりますが、本記事ではPostgreSQL Pub/Subを使って解説します。
topic名 sample_topic のPublisher/Subscriberを作成し、AutoInitializeSchema / InitializeSchema を指定した上でメッセージをやりとりすると、以下の2テーブルが作成されます。
Schema | Name | Type | Owner
--------+-----------------------------------+----------+------------
public | watermill_offsets_sample_topic | table | request_wf
public | watermill_sample_topic | table | request_wf
発行されたイベントは watermill_sample_topic テーブルに追加されていきます。offsetは BIGSERIAL 型が指定されており、自動増分する値となっています。uuid, payload, metadata はMessageに指定した内容が保存されます。 created_at および transaction_id はPublish時に自動で付与されます。
offset | uuid | created_at | payload | metadata | transaction_id
--------+--------------------------------------+----------------------------+---------+----------+----------------
1 | e1eb2143-f32d-4226-9d2d-e4880a07e225 | 2026-01-13 02:24:43.672182 | {...} | {...} | 562797
...
watermill_offsets_sample_topic には下記のようにConsumerGroupごとに offset_acked と last_processed_transaction_id が保存されています。対象のConsumerGroupのSubscriberに対し Message.Ack() を実行すると offset_acked を更新し、対象のメッセージを処理済みとしてマークします。つまりメッセージは残り続ける形になるため、過去に発生したイベントのすべては履歴として watermill_sample_topic テーブルに蓄積されていきます。
consumer_group | offset_acked | last_processed_transaction_id
----------------+--------------+-------------------------------
ConsumerGroupA | 8 | 562845
ConsumerGroupB | 8 | 562845
発行された1つのメッセージを複数のSubscriberから利用したい場合は、ConsumerGroupを分けたSubscriberを複数作成すれば、それぞれのSubscriberからすべてのメッセージを独立に購読できます。
Router API
Publisher & Subscriber APIだけでもイベント駆動通信を実現可能ですが、Watermillではさらに高レイヤーの仕組みとしてRouter APIを提供しています。 Router APIは各SubscriberをHandlerとしてラップし、これによって各種ミドルウェア を仕込むことができるようになります。ミドルウェアの例としては以下のようなものがあります。
- Poison:処理不可能なメッセージがあった場合にそれをPoisonMessageとして別トピックに再発行する。
- Delay On Error:メッセージの処理がエラーとなった際に次回のメッセージ処理を遅延させる。対応しているPub/Subでのみ利用可。
- Timeout:メッセージの処理にタイムアウトを追加する。
Handlerには Handler と NoPublisherHandler の2種類があります。Handlerから新たにメッセージを発行する場合には前者を、発行する必要がない場合には後者を利用します。
Requeuer
1つ発展的なトピックとして Requeuer を紹介します。Watermillでは通常エラーになったメッセージは同じトピック・ConsumerGroupの他のメッセージをブロックし続けます。Poisonミドルウェアなどを利用することでこのようなメッセージをこのようなメッセージをPoisonトピックに退避させることができます。ここでRequeuerを利用すると、Poisonトピックに退避したメッセージを再度元のトピックに流すことができます。これによって他のメッセージをブロックせずにエラーになったメッセージの処理をリトライできます。
PostgreSQLを利用する場合には PostgreSQLDelayedRequeuer を利用することでこのような仕組みを簡単に実装できます。 以下はそのコード例の一部抜粋です。
// DelayedRequeuer用のPublisherを作成
watermillPublisher, err := sql.NewPublisher(
sql.BeginnerFromStdSQL(db),
sql.PublisherConfig{
SchemaAdapter: sql.DefaultPostgreSQLSchema{},
AutoInitializeSchema: false,
},
watermillLogger,
)
if err != nil {
return nil, err
}
delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{
DB: sql.BeginnerFromStdSQL(db),
Publisher: watermillPublisher,
// 最初は10秒、最大3分まで間隔を2倍に伸ばしながらリトライする
DelayOnError: &middleware.DelayOnError{
InitialInterval: 10 * time.Second,
MaxInterval: 3 * time.Minute,
Multiplier: 2,
},
})
if err != nil {
return nil, err
}
r, err := message.NewRouter(message.RouterConfig{}, watermillLogger)
if err != nil {
return nil, err
}
r.AddMiddleware(delayedRequeuer.Middleware()...)
wg.Add(1)
go func() {
defer wg.Done()
if err := r.requeuer.Run(ctx); err != nil {
slog.Error("Requeuer Error")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := r.router.Run(ctx); err != nil {
slog.Error("Router Error")
}
}()
wg.Wait()
おわりに
Message, Publisher, Subscriberなどの基本的なAPIから使い始めることができるので導入しやすいライブラリだなと感じました。今回はPostgreSQL Pub/Subを利用しましたが、テーブル設計やConsumerGroupの設定などをWatermillのデフォルト実装を用いることで素早く実装できるため、自前で実装するよりも手っ取り早くイベント駆動通信を導入したい方にはおすすめです(もちろんカスタマイズも可能です)。
Watermillについて解説されている日本語記事がほとんどなかったので今回の記事を書いてみました。もし誰かのお役に立ったなら幸いです。Goでイベント駆動通信を実現する他のアプローチやライブラリについても聞いてみたいので、経験のある方がいましたらぜひ教えてください!
参考文献
Discussion