Go Conference 2025 「ENABLE THIS CODE by your REVIEW」4問目解説

に公開

こんにちは。ナレッジワークの Middleware グループでバックエンドエンジニアをしている aita です。

この記事は Go Conference 2025 のナレッジワークのブースで展示された ENABLE THIS CODE by your REVIEW の4問目の解説です。

パネル

レビューポイント

以下は複数のファイルを並行で処理するコードですが、複数の問題を抱えています。

※ ブースでの展示時では誤っていたレビューポイントの行番号を修正しました

01	// CountWordFrequency は行毎に単語が書かれたファイルに含まれる単語の出現頻度をカウントする
02	func CountWordFrequency(
03		ctx context.Context,
04		filenames []string,
05		conccurency int,
06	) (stats map[string]int, err error) {
07		stats = make(map[string]int)
08	
09		ctx, cancel := context.WithCancel(ctx)
10		defer cancel()
11	
12		onError := func(e error) {
13			err = e
14			cancel()
15		}
16	
17		ch := make(chan string)
18		var wg sync.WaitGroup
19		for i := 0; i < conccurency; i++ {
20			wg.Add(1)
21			go func() {
22				defer wg.Done()
23				for {
24					select {
25					case <-ctx.Done():
26						return
27	
28					case filename, ok := <-ch:
29						if !ok {
30							return
31						}
32						file, err := os.Open(filename)
33						if err != nil {
34							onError(err)
35							return
36						}
37						defer file.Close()
38	
39						r := bufio.NewReader(file)
40						for {
41							line, err := r.ReadString('\n')
42							if err != nil {
43								onError(err)
44								return
45							}
46	
47							word := strings.TrimSpace(line)
48							if word == "" {
49								continue
50							}
51							stats[word]++
52						}
53					}
54				}
55			}()
56		}
57	
58		for _, filename := range filenames {
59			ch <- filename
60		}
61	
62		wg.Wait()
63		close(ch)
64	
65		return stats, err
66	}

L24: onError での err の代入時のデータ競合

onError は外側の名前付き戻り値 err を更新しますが、これは複数のワーカーから並行に呼ばれ得るためデータ競合になります。 sync.Once で最初のエラーのみ記録するなど、err の更新でデータ競合が起越さないための工夫が必要です。

L30 for ループが range over int になっていない

Go 1.22 以降では range over int 構文(例: for i := range conccurency { ... })が利用でき、より簡潔に書けます。これにより、ループカウンタ変数の定義や条件式・インクリメント式が不要になり、可読性が向上します。ただし、i を利用しない場合は特に range conccurency と書くだけで済むため、無駄な変数を作らず意図も明確になります。

L48 defer file.Close() がループで行われている

ワーカー内でファイル毎に defer file.Close() を行うと、ゴルーチン終了時までクローズされず、ファイル数が多い場合に FD(ファイルディスクリプタ)を大量に保持してしまいます。ループ内の defer はアンチパターンです。推奨は「そのファイルの処理が終わった直後に file.Close() を明示的に呼ぶ」か、func processFile(...) error { defer f.Close(); ... } のように関数スコープを分けて defer の寿命を短くすることです。

L62 stats の更新処理のデータ競合

stats は通常の map[string]int で、複数のゴルーチンから同時に stats[word]++ を行うとデータ競合が発生します。Go の map は並行書き込みに対してスレッドセーフではなく、ランタイムパニックや壊れた値の読み書きが起き得ます。データ競合の回避のため、sync.Mutexstats へのアクセスを保護するなどの対策が必要です。

L69-70 コンテキストキャンセル時に送信側でハングする可能性

ワーカーは ctx.Done() を監視して早期終了しますが、プロデューサ側(ファイル名を送る側)は for _, filename := range filenames { ch <- filename } とブロッキング送信のみです。もし途中で onError が呼ばれて cancel() が呼び出されると、ワーカーは早々に受信を止めますが、送信側は ch <- でブロックしてハングします。回避するには送信時にも select { case ch <- filename: case <-ctx.Done(): break } を入れて中断可能にし、送信ループを抜けたら close(ch) する、という「生産者側のキャンセル対応」が必要です。

L73-74 WaitGroup の Wait と チャンネルの close の順序の誤り

wg.Wait() の後に close(ch) を実行していますが、ワーカーは「チャネルが閉じられるか、コンテキストがキャンセルされる」まで受信待ちを続けます。wg.Wait() はワーカーの終了を待つため、ワーカー終了にはチャネル close が必要です。しかし、 close は wg.Wait() の後に呼ばれていて、相互待ちでデッドロックします。正解は「全ての送信を終えたら close(ch) を先に行い、その後に wg.Wait() を呼ぶ必要があります。

追加ポイント

ブースの資料から漏れてしまいましたが、さらにレビューポイントがあります。

L33: io.EOF のチェック

buf.ReadLine('\n') はファイルの終端に到達するとエラーとして io.EOF が返ります。そのため err != nil だけではチェックが不足しています。エラーが io.EOF の時はリターンせずに処理を継続する必要があります。

修正例

以上のポイントを踏まえて修正した例が次となります

// CountWordFrequency は行毎に単語が書かれたファイルに含まれる単語の出現頻度をカウントする
func CountWordFrequency(
	ctx context.Context,
	filenames []string,
	concurrency int,
) (stats map[string]int, err error) {
	if concurrency <= 0 {
		return nil, fmt.Errorf("concurrency must be greater than 0")
	}

	stats = make(map[string]int)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var errOnce sync.Once
	onError := func(e error) {
		errOnce.Do(func() {
			err = e
			cancel()
		})
	}

	var mu sync.Mutex

	ch := make(chan string)
	var wg sync.WaitGroup
	for range concurrency {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return

				case filename, ok := <-ch:
					if !ok {
						return
					}
					if e := countWordOnFile(filename, &mu, stats); e != nil {
						onError(e)
						return
					}
				}
			}
		}()
	}

loop:
	for _, filename := range filenames {
		select {
		case <-ctx.Done():
			break loop
		case ch <- filename:
		}
	}

	close(ch)
	wg.Wait()

	return stats, err
}

func countWordOnFile(filename string, mu *sync.Mutex, stats map[string]int) error {
	file, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer file.Close()

	r := bufio.NewReader(file)
	next := true
	for next {
		line, err := r.ReadString('\n')
		if err != nil {
			if !errors.Is(err, io.EOF) {
				return err
			}
			next = false
		}

		word := strings.TrimSpace(line)
		if word == "" {
			continue
		}
		mu.Lock()
		stats[word]++
		mu.Unlock()
	}
	return nil
}

さらに改善するなら

キャンセル伝播が countWordOnFile 内に効かない

ctx がキャンセルされても、今開いているファイルの読み込みが最後まで続きます。エラー発生時に「なるべく早く止めたい」なら、countWordOnFile に ctx を渡して適宜 ctx.Done() を確認するのが良いです。

func countWordOnFile(ctx context.Context, filename string, mu *sync.Mutex, stats map[string]int) error {
    (中略)

	r := bufio.NewReader(file)
	next := true
	for next {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

    (中略)

チャネルのバッファサイズ

これは厳密には問題ないですが、concurrency に合わせてバッファを付けると送信側の待ちが減りスムーズです。

ch := make(chan string, 10)

ロック粒度

stats[word]++ のたびにグローバルロックを取るので競合が多いとスループットが落ちます。各ワーカーでローカル map[string]int に集計→最後にマージする方式が高速です。

Close のエラーチェック

軽微ですが Close が返すエラーを無視してしまっています。名前付き返り値を利用して次のようにエラーを返すことができます。

func countWordOnFile(ctx context.Context, filename string, mu *sync.Mutex, stats map[string]int) (err error) {
	file, err := os.Open(filename)
	if err != nil {
		return err
	}
    defer func() {
        err = file.Close()
    }()

まとめ

Go らしい問題を意識して作成しましたがいかがでしたでしょうか? お楽しみいただけたなら幸いです。

プロダクションのコードでは errgroup.Group を利用することが多いと思います。Goを業務で書いていても、意外と goroutine, channel, context.Context を組み合わせたコードを書く機会は少ないと思います。context.Context を用いたキャンセルは自分でも書いていて新鮮でした。

株式会社ナレッジワーク

Discussion