🔊

Goで複数channelにブロードキャストするライブラリを作った話

2023/10/05に公開

これです:

https://github.com/Maki-Daisuke/go-broadcast-channel

Usage

こんなかんじの、ごく当たり前の雰囲気のAPIで使えます:

// バッファゼロ、タイムアウト1秒で初期化する
b := broadcast.New[int](0).WithTimeout(1 * time.Second)
defer b.Close()  // 使い終わったら、必ずClose()しましょう。ゾンビgoroutineが残ります。

// 受信者を2つ作って、WaitGroupで終了を待つ
wg := &sync.WaitGroup{}
wg.Add(2)

ch1 := make(chan int)  // 自分で受信用channelを作って
b.Subscribe(ch1)       // サブスクライブする
go func() {
    for v := range ch1 {
        // Do something.
    }
    wg.Done()
}()

ch2 := make(chan int)
b.Subscribe(ch2)
go func() {
    for v := range ch2 {
        // Do another thing.
    }
    wg.Done()
}()

// 値を2つブロードキャストする
b.Chan() <- 1
b.Chan() <- 2
// Closeすると、すべての受信側channelもcloseされる
b.Close()  //※deferでもう一度Closeが呼ばれてしまっても安全です

wg.Wait()

こんなの何番煎じだよ?ってかんじですね。でも、作ったらからにはそれなりに理由があるんです。
当然、作る前には既存のライブラリをいくつか調べたのですが、残念ながら自分の求める要件を満たすものがなかったのです。
今回の要件は次の通り:

Design Goals

型安全である

せっかくあるんだから、Genericsを使えと。
既存のライブラリが比較的歴史の長いものが多く、また後方互換性のためにあえてGenericsを使っていないということもあると思います。
この点は、後発ライブラリのメリットですね。

受信側がブロックしてしまっても安全である

既存の実装では、こんなかんじ↓でchannelのリストをforループで回してひとつずつsendするという単純な実装ばかりでした()。

for _, c := range subscribers {
    c <- v
}

この実装では、受信側channelのどれかひとつがブロックするとブロードキャスト処理全体がブロックしてしまいます。いつまでたってもchannelを読みに来ないgoroutineが一人いるだけで、全員が迷惑をこうむるわけですね。

この問題を避けるため、本ライブラリではひとつずつ逐次処理するのではなく、selectを使って受け取り準備ができている受信者から順不同で送信します。
さらにタイムアウトを導入することで、指定した時間内に受信しなかった受信者を強制的に除外する仕組みとなっています。(※デフォルトではタイムアウトは無限⇒全体がブロックします)

また上の単純な実装では、何らかの理由で受信側channelがcloseされた場合にruntime panicになる点も考慮されていませんね。

channelをAPIとして使う

多くのライブラリでは Send(v) や Publish(v) のようなメソッド呼び出しで値を送信するAPIを採用していました。これらは直感的である一方で、同期的な呼び出しであるというデメリットがあります。
サーバープログラムを書いたことがある方ならわかるでしょうが、 select を使って複数の通信処理を同時に待ちたいという場面がけっこうあるのです。
そのため、送信・受信の双方でchannelをAPIとして採用しています:

// 送信側
b.Chan() <- v

// 受信側
ch := make(chan T)
b.Subscribe(ch)
v := <-ch

channelなので、好きなときに select 文の中で使うことができるというわけです:

select {
//送信
case b.Chan() <- v:
	...
//受信
case v := <-ch:
	...
//defaultをつければnon-blockingにできる
default:
	...
}

「PubSubじゃあかんの?」

channelではなくコールバック関数を使った、こんなかんじのPubSubライブラリも存在しますよね:

b := NewBroadcaster()
b.Subscribe(func(v interface{}){
    // Do something.
})
b.Publish(someVal)

人によってはこちらのほうが慣れている/直感的に理解しやすいということもあるでしょう。
ですが単純にコールバックを使った設計は、backpressure問題におちいりやすいというデメリットがあります。これがコールバックを採用しなかった理由です。

backpressure問題というのは、あるデータ処理が前段と後段に分かれているとき(例:読み込みと書き出し)後段のスループットが前段のスループットよりも遅いとデータがバッファに滞留してしまう問題です。
たとえばNode.js初期のI/Oはコールバック型のAPIだったのですが、このbackpressure問題がいたるところで起こったために、Stream APIが標準APIとなったという歴史があります。

詳しくは下記の記事を参照してください:

https://medium.com/@jayphelps/backpressure-explained-the-flow-of-data-through-software-2350b3e77ce7

https://nodejs.org/en/docs/guides/backpressuring-in-streams

終わりに

ぶっちゃけ上記の要件を満たす既存のライブラリがあったら教えてください。
オレオレ実装をメンテしたいわけではないので…

Discussion