📌

ライブラリ利用者に panic を recover させない

2023/05/01に公開

Go の準標準パッケージに singleflight という重複した関数の呼び出しを抑止するものがある。キャッシュの有効期限が切れたときにキャッシュ更新のための同一リクエストが Origin サーバーに何回も発行されてしまい、Origin サーバーに高負荷がかかってしまうという Thundering Herd 問題への対応に用いられる。

package main

import (
	"fmt"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

var group singleflight.Group

func something() {
	time.Sleep(500 * time.Millisecond)
	fmt.Println("something")
}

func doSomething(n int) {
	something()
	fmt.Printf("result %d\n", n)
}

func doSomethingWithSingleFlight(n int) {
	v, _, _ := group.Do("something", func() (interface{}, error) {
		something()
		return time.Now(), nil
	})
	fmt.Printf("result %d: %s\n", n, v)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			// doSomething(i)
			doSomethingWithSingleFlight(i)
		}()
	}
	wg.Wait()
    }
# doSomething => something は指定回数分実行される
something
something
result 2
something
result 1
something
something
result 0
result 4
result 3
# doSomethingWithSingleFlight => something は一回のみ実行される
something
result 4: 2009-11-10 23:00:00.5 +0000 UTC m=+0.500000001
result 3: 2009-11-10 23:00:00.5 +0000 UTC m=+0.500000001
result 2: 2009-11-10 23:00:00.5 +0000 UTC m=+0.500000001
result 1: 2009-11-10 23:00:00.5 +0000 UTC m=+0.500000001
result 0: 2009-11-10 23:00:00.5 +0000 UTC m=+0.500000001

playground

singleflight.Group.Do の内部処理は以下の通り。

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
  g.mu.Lock()
  if g.m == nil {
    g.m = make(map[string]*call)
  }
  if c, ok := g.m[key]; ok {
    c.dups++
    g.mu.Unlock()
    c.wg.Wait()

    if e, ok := c.err.(*panicError); ok {
      panic(e)
    } else if c.err == errGoexit {
      runtime.Goexit()
    }
    return c.val, c.err, true
  }
  c := new(call)
  c.wg.Add(1)
  g.m[key] = c
  g.mu.Unlock()

  g.doCall(c, key, fn)
  return c.val, c.err, c.dups > 0
}

やっていることは簡単で、

  • 引数に呼び出される関数を識別する key と関数を受け取る
  • 最初の Do 実行であれば、内部の関数呼び出し管理用の map を初期化
  • 関数呼び出し管理用 map を key で参照
  • すでに存在すれば過去に呼び出し済みであると判定できるので、結果を待って返す
  • 呼び出したことがなければ、関数呼び出しを表す call 構造体を生成し、関数呼び出し管理用の map に登録して呼び出し

go panic

関数の実行自体は以下の doCall で行われる。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
  normalReturn := false
  recovered := false

  // use double-defer to distinguish panic from runtime.Goexit,
  // more details see https://golang.org/cl/134395
  defer func() {
    // the given function invoked runtime.Goexit
    if !normalReturn && !recovered {
      c.err = errGoexit
    }

    g.mu.Lock()
    defer g.mu.Unlock()
    c.wg.Done()
    if g.m[key] == c {
      delete(g.m, key)
    }

    if e, ok := c.err.(*panicError); ok {
      // In order to prevent the waiting channels from being blocked forever,
      // needs to ensure that this panic cannot be recovered.
      if len(c.chans) > 0 {
        go panic(e)
        select {} // Keep this goroutine around so that it will appear in the crash dump.
      } else {
        panic(e)
      }
    } else if c.err == errGoexit {
      // Already in the process of goexit, no need to call again
    } else {
      // Normal return
      for _, ch := range c.chans {
        ch <- Result{c.val, c.err, c.dups > 0}
      }
    }
  }()

  func() {
    defer func() {
      if !normalReturn {
        // Ideally, we would wait to take a stack trace until we've determined
        // whether this is a panic or a runtime.Goexit.
        //
        // Unfortunately, the only way we can distinguish the two is to see
        // whether the recover stopped the goroutine from terminating, and by
        // the time we know that, the part of the stack trace relevant to the
        // panic has been discarded.
        if r := recover(); r != nil {
          c.err = newPanicError(r)
        }
      }
    }()

    c.val, c.err = fn()
    normalReturn = true
  }()

  if !normalReturn {
    recovered = true
  }
}

singleflight は、 Do のほかに結果をチャネルで返す DoChan も提供している。ここで DoChan が返すチャネルは singleflight(送信側)では閉じられない(ドキュメントにも記載されている)。

doCall では、関数呼び出しの結果 panic やエラーが発生したときの処理も行っているが、あえて go panic(e) で非同期に panic を発生させて select {} でブロックしている。
recover は同じ goroutine 上で発生した panic しか回復できないため、このように go panic で新しく作成した goroutine 上で panic を発生させることにより、DoChan 呼び出し側(チャネル受信側)が recover によって panic を回復させられないようになっている。

DoChan 呼び出し側で回復できてしまうと、 DoChan で開いたチャネルにデータが送信もクローズもされずに受信側が永遠にブロックされてしまうので、これを防いでいる。

go panic というかたちを初めて見たのでなるほどなとなった(中で非同期処理を行うライブラリを書くなんてほぼないだろうけど)。

参考

Discussion