😽

Go: 2つのワーカーのメッセージを橋渡し

に公開

「2つのワーカースレッドのメッセージを橋渡しする」という実装が、Go で書いたら意外と難しかったのでまとめてみました。

  • メインスレッドはワーカーからデータを受け取り、もう一方のワーカーに送信する
  • ワーカーは2つだけ (Hub 型のブロードキャストは考えない)
  • 特定のデータの場合、処理を切り替えるので、チャンネルを直接つなげるのではなく、メインスレッドを介してデータを送受信する
  • ワーカーはデータを受け取るほか、数秒間隔で定期的にデータを送信する

まとめたと言いつつ、終了時の channel の扱いがまだおかしい気がする...

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"
)

func worker(ctx context.Context, id int, chRecv <-chan int, chSend chan<- int) {
	defer close(chSend) // channel は書き込み側で閉じる
	i := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d: Stopping\n", id)
			return
		case data, ok := <-chRecv:
			if !ok {
				fmt.Printf("Worker %d: Recv channel closed\n", id)
				return
			}
			fmt.Printf("Worker %d: Received %d\n", id, data)
		// 1〜3秒の間でランダムに待つ(float秒)
		case <-time.After(time.Duration(1_000+rand.Float64()*2_000) * time.Millisecond):
			select {
			// time.After 中にキャンセルされている可能性があるので、再度 ctx.Done() を確認する
			case <-ctx.Done():
				fmt.Printf("Worker %d: Stopping (after timer)\n", id)
				return
			case chSend <- i:
				i++
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// bridge(main) -> worker
	toWorker1 := make(chan int, 5)
	toWorker2 := make(chan int, 5)
	// worker -> bridge(main)
	fromWorker1 := make(chan int)
	fromWorker2 := make(chan int)

	go worker(ctx, 1, toWorker1, fromWorker1)
	go worker(ctx, 2, toWorker2, fromWorker2)

    // この値を超えたら終了する、ということにしておく
	const threshold = 10

	for fromWorker1 != nil || fromWorker2 != nil {
		select {
		case v, ok := <-fromWorker1:
			if !ok {
				fmt.Println("fromWorker1 closed")
				fromWorker1 = nil // channelはすでに閉じられているのでnilにし、以降のselectを無効化
				continue
			}
			fmt.Printf("Received from worker1: %v\n", v)
			// cancel後はdrainだけして転送はしない
			if ctx.Err() == nil {
				if v > threshold {
					fmt.Println("Data from worker1 is greater than threshold, stopping...")
					cancel()
					// ch1Send をdrainするため、nilにしない
				} else {
					select {
					case toWorker2 <- v:
					case <-ctx.Done():
					}
				}
			}
		case v, ok := <-fromWorker2:
			if !ok {
				fmt.Println("fromWorker2 closed")
				fromWorker2 = nil
				continue
			}
			fmt.Printf("Received from worker2: %v\n", v)
			if ctx.Err() == nil {
				if v > threshold {
					fmt.Println("Data from worker2 is greater than threshold, stopping...")
					cancel()
				} else {
					select {
					case toWorker1 <- v:
					case <-ctx.Done():
					}
				}
			}
		}
	}

	fmt.Println("Main: All workers done, exiting.")
}

Discussion