😽
Go: 2つのワーカーのメッセージを橋渡し
「2つのワーカースレッドのメッセージを橋渡しする」という実装が、Go で書いたら意外と難しかったのでまとめてみました。
- メインスレッドはワーカーからデータを受け取り、もう一方のワーカーに送信する
- ワーカーは2つだけ (Hub 型のブロードキャストは考えない)
- 特定のデータの場合、処理を切り替えるので、チャンネルを直接つなげるのではなく、メインスレッドを介してデータを送受信する
- ワーカーはデータを受け取るほか、数秒間隔で定期的にデータを送信する
まとめたと言いつつ、終了時の channel の扱いがまだおかしい気がする...
package main
import (
"context"
"fmt"
"math/rand"
"time"
)
func worker(ctx context.Context, id int, chRecv <-chan int, chSend chan<- int) {
defer close(chSend) // channel は書き込み側で閉じる
i := 0
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: Stopping\n", id)
return
case data, ok := <-chRecv:
if !ok {
fmt.Printf("Worker %d: Recv channel closed\n", id)
return
}
fmt.Printf("Worker %d: Received %d\n", id, data)
// 1〜3秒の間でランダムに待つ(float秒)
case <-time.After(time.Duration(1_000+rand.Float64()*2_000) * time.Millisecond):
select {
// time.After 中にキャンセルされている可能性があるので、再度 ctx.Done() を確認する
case <-ctx.Done():
fmt.Printf("Worker %d: Stopping (after timer)\n", id)
return
case chSend <- i:
i++
}
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// bridge(main) -> worker
toWorker1 := make(chan int, 5)
toWorker2 := make(chan int, 5)
// worker -> bridge(main)
fromWorker1 := make(chan int)
fromWorker2 := make(chan int)
go worker(ctx, 1, toWorker1, fromWorker1)
go worker(ctx, 2, toWorker2, fromWorker2)
// この値を超えたら終了する、ということにしておく
const threshold = 10
for fromWorker1 != nil || fromWorker2 != nil {
select {
case v, ok := <-fromWorker1:
if !ok {
fmt.Println("fromWorker1 closed")
fromWorker1 = nil // channelはすでに閉じられているのでnilにし、以降のselectを無効化
continue
}
fmt.Printf("Received from worker1: %v\n", v)
// cancel後はdrainだけして転送はしない
if ctx.Err() == nil {
if v > threshold {
fmt.Println("Data from worker1 is greater than threshold, stopping...")
cancel()
// ch1Send をdrainするため、nilにしない
} else {
select {
case toWorker2 <- v:
case <-ctx.Done():
}
}
}
case v, ok := <-fromWorker2:
if !ok {
fmt.Println("fromWorker2 closed")
fromWorker2 = nil
continue
}
fmt.Printf("Received from worker2: %v\n", v)
if ctx.Err() == nil {
if v > threshold {
fmt.Println("Data from worker2 is greater than threshold, stopping...")
cancel()
} else {
select {
case toWorker1 <- v:
case <-ctx.Done():
}
}
}
}
}
fmt.Println("Main: All workers done, exiting.")
}
Discussion