💬
go で event bus
// Package eventbus は、Go アプリケーション向けのシンプルで型安全なイベントバスの実装を提供する。
// このパッケージは、ジェネリクス型を使用したイベントの発行と購読をサポートする。
package eventbus
import (
"context"
"errors"
"sync"
)
// Disposable は購読の終了を行う関数。
type Disposable interface {
Dispose()
}
type Disposer func()
func (d Disposer) Dispose() {
d()
}
// Handler は Event 型のイベントを処理する関数。
type Handler[Event any] func(ctx context.Context, event Event) error
// Publisher は Event 型のイベントを発行する関数。
type Publisher[Event any] func(ctx context.Context, event Event) error
// Subscriber は Event 型のイベントを購読する関数。
type Subscriber[Event any] func(handler Handler[Event]) (Disposable, error)
type handler interface {
canHandle(ctx context.Context, event any) bool
handle(ctx context.Context, event any) error
}
type handlerImpl struct {
canHandleFunc func(ctx context.Context, event any) bool
handleFunc func(ctx context.Context, event any) error
}
func (h handlerImpl) canHandle(ctx context.Context, event any) bool {
return h.canHandleFunc(ctx, event)
}
func (h handlerImpl) handle(ctx context.Context, event any) error {
return h.handleFunc(ctx, event)
}
// EventBus はイベントを発行し、購読を管理するイベントバス。
type EventBus struct {
handlers map[handler]struct{}
mu sync.RWMutex
}
// New は新しい EventBus を作成して返す。
func New() *EventBus {
return &EventBus{
handlers: make(map[handler]struct{}),
}
}
var (
// Default はデフォルトの EventBus インスタンス。
Default = New()
)
// publish は登録されたすべてのハンドラーにイベントを送信する。
// いずれかのハンドラーがエラーを返した場合、エラーを返す。
func (bus *EventBus) publish(ctx context.Context, event any) error {
bus.mu.RLock()
handlers := make([]handler, 0, len(bus.handlers))
for h := range bus.handlers {
handlers = append(handlers, h)
}
bus.mu.RUnlock()
var errs []error
for _, h := range handlers {
if h.canHandle(ctx, event) {
if err := h.handle(ctx, event); err != nil {
errs = append(errs, err)
}
}
}
if len(errs) > 0 {
var multiErr error
for _, err := range errs {
multiErr = errors.Join(multiErr, err)
}
return multiErr
}
return nil
}
// subscribe は新しいハンドラを EventBus に登録する。
// ハンドラーの登録を解除するために使用できる Disposable を返す。
func (bus *EventBus) subscribe(h handler) (Disposable, error) {
bus.mu.Lock()
defer bus.mu.Unlock()
bus.handlers[h] = struct{}{}
return Disposer(func() {
bus.mu.Lock()
defer bus.mu.Unlock()
delete(bus.handlers, h)
}), nil
}
// Publish は指定されたイベント型に対する Publisher 関数を返す。
// EventBus が指定されていない場合、デフォルトの EventBus を使用する。
func Publish[Event any](buses ...*EventBus) Publisher[Event] {
bus := resolveEventBus(buses...)
return func(ctx context.Context, event Event) error {
return bus.publish(ctx, event)
}
}
// Subscribe は指定されたイベント型に対する Subscriber 関数を返す。
// EventBus が指定されていない場合、デフォルトの EventBus を使用する。
func Subscribe[Event any](buses ...*EventBus) Subscriber[Event] {
bus := resolveEventBus(buses...)
return func(h Handler[Event]) (Disposable, error) {
wrapper := func(ctx context.Context, event any) error {
return h(ctx, event.(Event))
}
return bus.subscribe(&handlerImpl{
canHandleFunc: func(ctx context.Context, event any) bool {
_, ok := event.(Event)
return ok
},
handleFunc: wrapper,
})
}
}
// resolveEventBus は提供された EventBus スライスの最初の要素を返す。
// スライスが空の場合、デフォルトの EventBus を返す。
func resolveEventBus(buses ...*EventBus) *EventBus {
if len(buses) > 0 {
return buses[0]
} else {
return Default
}
}
test
package eventbus_test
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/KiraboshiSys/Letas/internal/lib/eventbus"
)
type TestEvent struct {
Message string
}
func TestEventBus(t *testing.T) {
t.Parallel()
t.Run("Publish and Subscribe", func(t *testing.T) {
t.Parallel()
bus := eventbus.New()
ctx := context.Background()
var receivedEvent TestEvent
subscriber := eventbus.Subscribe[TestEvent](bus)
disposable, err := subscriber(func(ctx context.Context, event TestEvent) error {
receivedEvent = event
return nil
})
assert.NoError(t, err)
defer disposable.Dispose()
publisher := eventbus.Publish[TestEvent](bus)
err = publisher(ctx, TestEvent{Message: "Hello, World!"})
assert.NoError(t, err)
assert.Equal(t, "Hello, World!", receivedEvent.Message)
})
t.Run("Multiple Subscribers", func(t *testing.T) {
t.Parallel()
bus := eventbus.New()
ctx := context.Background()
count := 0
subscriber := eventbus.Subscribe[TestEvent](bus)
for i := 0; i < 3; i++ {
_, err := subscriber(func(ctx context.Context, event TestEvent) error {
count++
return nil
})
assert.NoError(t, err)
}
publisher := eventbus.Publish[TestEvent](bus)
err := publisher(ctx, TestEvent{Message: "Hello, World!"})
assert.NoError(t, err)
assert.Equal(t, 3, count)
})
t.Run("Error Handling", func(t *testing.T) {
t.Parallel()
bus := eventbus.New()
ctx := context.Background()
subscriber := eventbus.Subscribe[TestEvent](bus)
_, err := subscriber(func(ctx context.Context, event TestEvent) error {
return errors.New("handler error")
})
assert.NoError(t, err)
publisher := eventbus.Publish[TestEvent](bus)
err = publisher(ctx, TestEvent{Message: "Hello, World!"})
assert.Error(t, err)
assert.Contains(t, err.Error(), "handler error")
})
t.Run("Dispose Subscriber", func(t *testing.T) {
t.Parallel()
bus := eventbus.New()
ctx := context.Background()
count := 0
subscriber := eventbus.Subscribe[TestEvent](bus)
disposable, err := subscriber(func(ctx context.Context, event TestEvent) error {
count++
return nil
})
assert.NoError(t, err)
publisher := eventbus.Publish[TestEvent](bus)
err = publisher(ctx, TestEvent{Message: "Hello, World!"})
assert.NoError(t, err)
assert.Equal(t, 1, count)
disposable.Dispose()
err = publisher(ctx, TestEvent{Message: "Hello again!"})
assert.NoError(t, err)
assert.Equal(t, 1, count) // Count should not increase after disposal
})
t.Run("Default EventBus", func(t *testing.T) {
t.Parallel()
ctx := context.Background()
var receivedEvent TestEvent
subscriber := eventbus.Subscribe[TestEvent]()
disposable, err := subscriber(func(ctx context.Context, event TestEvent) error {
receivedEvent = event
return nil
})
assert.NoError(t, err)
defer disposable.Dispose()
publisher := eventbus.Publish[TestEvent]()
err = publisher(ctx, TestEvent{Message: "Hello, Default!"})
assert.NoError(t, err)
assert.Equal(t, "Hello, Default!", receivedEvent.Message)
})
}
Discussion