💬

go で event bus

2024/10/01に公開
// Package eventbus は、Go アプリケーション向けのシンプルで型安全なイベントバスの実装を提供する。
// このパッケージは、ジェネリクス型を使用したイベントの発行と購読をサポートする。
package eventbus

import (
	"context"
	"errors"
	"sync"
)

// Disposable は購読の終了を行う関数。
type Disposable interface {
	Dispose()
}

type Disposer func()

func (d Disposer) Dispose() {
	d()
}

// Handler は Event 型のイベントを処理する関数。
type Handler[Event any] func(ctx context.Context, event Event) error

// Publisher は Event 型のイベントを発行する関数。
type Publisher[Event any] func(ctx context.Context, event Event) error

// Subscriber は Event 型のイベントを購読する関数。
type Subscriber[Event any] func(handler Handler[Event]) (Disposable, error)

type handler interface {
	canHandle(ctx context.Context, event any) bool
	handle(ctx context.Context, event any) error
}

type handlerImpl struct {
	canHandleFunc func(ctx context.Context, event any) bool
	handleFunc    func(ctx context.Context, event any) error
}

func (h handlerImpl) canHandle(ctx context.Context, event any) bool {
	return h.canHandleFunc(ctx, event)
}

func (h handlerImpl) handle(ctx context.Context, event any) error {
	return h.handleFunc(ctx, event)
}

// EventBus はイベントを発行し、購読を管理するイベントバス。
type EventBus struct {
	handlers map[handler]struct{}
	mu       sync.RWMutex
}

// New は新しい EventBus を作成して返す。
func New() *EventBus {
	return &EventBus{
		handlers: make(map[handler]struct{}),
	}
}

var (
	// Default はデフォルトの EventBus インスタンス。
	Default = New()
)

// publish は登録されたすべてのハンドラーにイベントを送信する。
// いずれかのハンドラーがエラーを返した場合、エラーを返す。
func (bus *EventBus) publish(ctx context.Context, event any) error {
	bus.mu.RLock()
	handlers := make([]handler, 0, len(bus.handlers))
	for h := range bus.handlers {
		handlers = append(handlers, h)
	}
	bus.mu.RUnlock()

	var errs []error
	for _, h := range handlers {
		if h.canHandle(ctx, event) {
			if err := h.handle(ctx, event); err != nil {
				errs = append(errs, err)
			}
		}
	}
	if len(errs) > 0 {
		var multiErr error
		for _, err := range errs {
			multiErr = errors.Join(multiErr, err)
		}
		return multiErr
	}

	return nil
}

// subscribe は新しいハンドラを EventBus に登録する。
// ハンドラーの登録を解除するために使用できる Disposable を返す。
func (bus *EventBus) subscribe(h handler) (Disposable, error) {
	bus.mu.Lock()
	defer bus.mu.Unlock()
	bus.handlers[h] = struct{}{}
	return Disposer(func() {
		bus.mu.Lock()
		defer bus.mu.Unlock()
		delete(bus.handlers, h)
	}), nil
}

// Publish は指定されたイベント型に対する Publisher 関数を返す。
// EventBus が指定されていない場合、デフォルトの EventBus を使用する。
func Publish[Event any](buses ...*EventBus) Publisher[Event] {
	bus := resolveEventBus(buses...)
	return func(ctx context.Context, event Event) error {
		return bus.publish(ctx, event)
	}
}

// Subscribe は指定されたイベント型に対する Subscriber 関数を返す。
// EventBus が指定されていない場合、デフォルトの EventBus を使用する。
func Subscribe[Event any](buses ...*EventBus) Subscriber[Event] {
	bus := resolveEventBus(buses...)
	return func(h Handler[Event]) (Disposable, error) {
		wrapper := func(ctx context.Context, event any) error {
			return h(ctx, event.(Event))
		}
		return bus.subscribe(&handlerImpl{
			canHandleFunc: func(ctx context.Context, event any) bool {
				_, ok := event.(Event)
				return ok
			},
			handleFunc: wrapper,
		})
	}
}

// resolveEventBus は提供された EventBus スライスの最初の要素を返す。
// スライスが空の場合、デフォルトの EventBus を返す。
func resolveEventBus(buses ...*EventBus) *EventBus {
	if len(buses) > 0 {
		return buses[0]
	} else {
		return Default
	}
}

test

package eventbus_test

import (
	"context"
	"errors"
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/KiraboshiSys/Letas/internal/lib/eventbus"
)

type TestEvent struct {
	Message string
}

func TestEventBus(t *testing.T) {
	t.Parallel()

	t.Run("Publish and Subscribe", func(t *testing.T) {
		t.Parallel()

		bus := eventbus.New()
		ctx := context.Background()

		var receivedEvent TestEvent
		subscriber := eventbus.Subscribe[TestEvent](bus)
		disposable, err := subscriber(func(ctx context.Context, event TestEvent) error {
			receivedEvent = event
			return nil
		})
		assert.NoError(t, err)
		defer disposable.Dispose()

		publisher := eventbus.Publish[TestEvent](bus)
		err = publisher(ctx, TestEvent{Message: "Hello, World!"})
		assert.NoError(t, err)

		assert.Equal(t, "Hello, World!", receivedEvent.Message)
	})

	t.Run("Multiple Subscribers", func(t *testing.T) {
		t.Parallel()

		bus := eventbus.New()
		ctx := context.Background()

		count := 0
		subscriber := eventbus.Subscribe[TestEvent](bus)
		for i := 0; i < 3; i++ {
			_, err := subscriber(func(ctx context.Context, event TestEvent) error {
				count++
				return nil
			})
			assert.NoError(t, err)
		}

		publisher := eventbus.Publish[TestEvent](bus)
		err := publisher(ctx, TestEvent{Message: "Hello, World!"})
		assert.NoError(t, err)

		assert.Equal(t, 3, count)
	})

	t.Run("Error Handling", func(t *testing.T) {
		t.Parallel()

		bus := eventbus.New()
		ctx := context.Background()

		subscriber := eventbus.Subscribe[TestEvent](bus)
		_, err := subscriber(func(ctx context.Context, event TestEvent) error {
			return errors.New("handler error")
		})
		assert.NoError(t, err)

		publisher := eventbus.Publish[TestEvent](bus)
		err = publisher(ctx, TestEvent{Message: "Hello, World!"})
		assert.Error(t, err)
		assert.Contains(t, err.Error(), "handler error")
	})

	t.Run("Dispose Subscriber", func(t *testing.T) {
		t.Parallel()

		bus := eventbus.New()
		ctx := context.Background()

		count := 0
		subscriber := eventbus.Subscribe[TestEvent](bus)
		disposable, err := subscriber(func(ctx context.Context, event TestEvent) error {
			count++
			return nil
		})
		assert.NoError(t, err)

		publisher := eventbus.Publish[TestEvent](bus)
		err = publisher(ctx, TestEvent{Message: "Hello, World!"})
		assert.NoError(t, err)
		assert.Equal(t, 1, count)

		disposable.Dispose()

		err = publisher(ctx, TestEvent{Message: "Hello again!"})
		assert.NoError(t, err)
		assert.Equal(t, 1, count) // Count should not increase after disposal
	})

	t.Run("Default EventBus", func(t *testing.T) {
		t.Parallel()

		ctx := context.Background()

		var receivedEvent TestEvent
		subscriber := eventbus.Subscribe[TestEvent]()
		disposable, err := subscriber(func(ctx context.Context, event TestEvent) error {
			receivedEvent = event
			return nil
		})
		assert.NoError(t, err)
		defer disposable.Dispose()

		publisher := eventbus.Publish[TestEvent]()
		err = publisher(ctx, TestEvent{Message: "Hello, Default!"})
		assert.NoError(t, err)

		assert.Equal(t, "Hello, Default!", receivedEvent.Message)
	})
}

Discussion