Chapter 08

チャネルの内部構造

さき(H.Saki)
さき(H.Saki)
2021.06.18に更新

この章について

ここでは、ランタイムの中でチャネルがどう動いているのかについて、runtimeパッケージのコードを読みながら深堀りしていきます。

チャネルの実体

hchan構造体

チャネルの実体はhchan構造体です。

type hchan struct {
	// (一部抜粋)
	qcount   uint           // バッファ内にあるデータ数
	dataqsiz uint           // バッファ用のメモリの大きさ(何byteか)
	buf      unsafe.Pointer // バッファ内へのポインタ
	elemsize uint16
	closed   uint32
	elemtype *_type // チャネル型
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // 受信待ちしているGの連結リスト
	sendq    waitq  // 送信待ちしているGの連結リスト
}

出典:runtime/chan.go

送受信待ちGのリストについて

チャネルには、そのチャネルからの送受信街をしているGを保存するrecvq, sendqフィールドがあります。
このフィールドの型をよくみてみると、waitq型という見慣れないものであることに気づくかと思います。

type waitq struct {
	first *sudog
	last  *sudog
}

出典:runtime/chan.go

連結リストらしく先頭と最後尾へのポインタが含まれています。
しかし、肝心のリスト要素の型が、g型ではなくてsudog型というものであることがわかります。

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
	// (一部抜粋)
	g    *g  // Gそのもの
	next *sudog // 後要素へのポインタ(連結リストなので)
	prev *sudog // 前要素へのポインタ(連結リストなので)
	elem unsafe.Pointer // 送受信したい値
	c    *hchan // 送受信待ちをしている先のチャネル
}

出典:runtime/runtime2.go

なぜGそのものの連結リストではなくて、わざわざsudog型を導入したのでしょうか。
その理由は、sudog型の定義に添えられたコメントに記されています。

sudog is necessary because the g ↔ synchronization object relation is many-to-many.
A g can be on many wait lists, so there may be many sudogs for one g;
and many gs may be waiting on the same synchronization object, so there may be many sudogs for one object.

(訳)sudog型の必要性は、Gと同期を必要とするオブジェクトとの関係が多対多であることに由来しています。
Gは(select文などで)たくさんのチャネルからの送受信を待つことがあるので、1つのGに対して複数個のsudogが必要です。
そして、一つの同期オブジェクト(チャネル等)からの送受信を複数のGが待っていることもあるため、1つの同期オブジェクトに対しても複数個のsudogが必要です。

出典:runtime/runtime2.go

つまり、GとチャネルのM:Nの関係をうまく表現するための中間素材としてsudogが存在するのです。

チャネル動作の裏側

ここからは、チャネルを使った値の送受信やチャネルの作成はどのように行われているのか、ランタイムのコードレベルまで掘り下げてみてみます。

チャネルの作成

Goのコードの中でmake(chan 型名)と書いた場所があると、バイナリ上では自動でruntime.makechan関数を呼んでいることに変換されます。

TEXT main.main(SB) /path/to/main.go
// (略)
  main.go:4		0x105e1b1		e8ca55faff		CALL runtime.makechan(SB)		

このruntime.makechan関数をみてみると、

func makechan(t *chantype, size int) *hchan

出典:runtime/chan.go

hchan構造体を返す関数でした。ここで、チャネルの実体hchanにたどり着きました。

特筆すべきなのは、make(chan 型名)と書いたときに帰ってくるのが*hchanとポインタであるということです。
元からhchanのポインタである、ということはつまり「チャネルを別の関数に渡すときに、確実に同じチャネルを参照するようにするためわざわざチャネルのポインタを渡す」というようなことはしなくていいということです。

送信操作

チャネルcに対して値xを送るためc <- xと書かれたとき、呼び出されるのは以下のchansend1関数です。

// entry point for c <- x from compiled code
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

出典:runtime/chan.go

内部で呼び出しているchansend関数が、本質的な送信処理をしています。
このchansend関数は、バッファがに空きがある/ない、受信待ちしているGがある/ないなど、その時々の状況によって挙動が違います。

受信待ちしているGがある

受信待ちしているGがあるのならば、チャネルcrecvq連結リストフィールドにsudogが1つ以上あるはずです。

そのような場合には、send関数を呼ぶことで処理をしています。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // (一部抜粋)
    if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
}

出典:runtime/chan.go

肝心のsend関数は以下のようになっています。

// send processes a send operation on an empty channel c.
// Channel c must be empty and locked.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // (一部抜粋)
    if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep) // 送信
	}
	gp := sg.g
    goready(gp, skip+1) // Gをrunableにする
}

出典:runtime/chan.go

  1. sendDirect関数で、送信したい値を受信待ちsudogelemフィールドに書き込む
  2. goready関数(→内部でready関数)で、受信待ちしていたGのステータスをGwaitingからGrunnableに変更する

送り先チャネルのバッファにまだ空きがある

バッファありチャネルで、そこにまだ空きがあるならば、送信したい値をその中に入れる処理をします。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // (一部抜粋)
    if c.qcount < c.dataqsiz {
        // cのc.sendx番目のポインタをget
		qp := chanbuf(c, c.sendx)
		typedmemmove(c.elemtype, qp, ep) // bufにepを書き込み
		// sendxの値を更新
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		return true
	}
}

出典:runtime/chan.go

バッファがフル/バッファなしチャネル

バッファがいっぱい、もしくはそもそもバッファなしチャネルだった場合は、その場では送信できません。
その場合はチャネルをブロックして、当該Gを待ちにする必要があります。

何はともあれchansend関数での処理内容をみてみましょう。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // (一部抜粋)
    // Block on the channel. Some receiver will complete our operation for us.

	// sudogを作る
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.g = gp
	// sudogをチャネルのsendまちリストに入れる
    c.sendq.enqueue(mysg)
	// (goparkについては後述)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
}

出典:runtime/chan.go

まずacquireSudog関数を使って得たsudogに、「送信待ちをしているG」「送りたい値」といった情報を入れています。
sudog構造体が完成したら、enqueueメソッドを使ってチャネルのsendqフィールドにそれを格納しています。

その後に続くgopark関数は、以下のようになっています。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	// (一部抜粋)
	mp := acquirem() // 今のmをgetする
	releasem(mp) // gのstackguard0をstackPreemptに書き換えて、プリエンプとしていいよってフラグにする
	mcall(park_m) //引数となっている関数を呼び出す
}

出典:runtime/proc.go

  1. releasem関数で、Gをプリエンプトしていいというフラグを立てる
  2. mcall関数の引数であるpark_m関数を呼び出す

park_m関数の中では、

// park continuation on g0.
func park_m(gp *g) {
	// (一部抜粋)
	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()
	schedule()
}

出典:runtime/proc.go

  1. GのステータスをGrunningからGwaitingに変更
  2. dropg関数で、GとMを切り離す
  3. スケジューラによって、Mに新しいGを割り当てる

という処理を行っています。

受信操作

チャネルcから値を受信する<- cと書かれたときに、以下のchanrecv1関数かchanrecv2関数のどちらかが呼ばれます。の最初のエントリポイントはこれ。

func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

出典:runtime/chan.go

内部で呼び出しているchanrecv関数が、本質的な受信処理をしています。
これも送信の時と同様に、状況によって挙動が異なります。

送信待ちがある

送信待ちしているGがあるのならば、チャネルcsendq連結リストフィールドにsudogが1つ以上あるはずです。

そのため、sendqフィールドから受け取ったsudogを使って、recv関数にて受信処理を行います。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // (一部抜粋)
    if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
}

出典:runtime/chan.go

recv関数については、このチャネルが

  • バッファなしチャネル
  • バッファありチャネルで、その内部バッファが埋まっている

のかで挙動がわかれます。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // (一部抜粋)
    // bufがないなら直接
	if c.dataqsiz == 0 {
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
        // Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        // copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx
	}
	gp := sg.g
	goready(gp, skip+1)
}

出典:runtime/chan.go

バッファなしチャネルだった場合、

  1. recvDirect関数で、受信した値を受け取りたい変数に直接結果を書き込み
  2. goready関数で、GのステータスをGrunnableに変更

バッファありチャネルだった場合、

  1. chanbuf関数で、次に受け取る値がある場所(=bufのインデックスrecvx番目)へのポインタをget
  2. 1で手に入れた情報を使って、受信した値を受け取りたい変数に直接結果を書き込み
  3. 値が受信済みになって空いたbufの位置(=bufのインデックスrecvx番目)に、送信待ちになっていた値を書き込み
  4. recvxの値を更新
  5. sendxの値を、recvxと同じ値になるように更新
  6. goready関数で、GのステータスをGrunnableに変更

送信待ちがなく、かつバッファに受信可能な値がある

このような場合では、バッファの中の値を直接受け取るだけでOKです。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // (一部抜粋)
    if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp) // epにバッファの中身を書き込み
		}
		// recvxの値を更新
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
	}
}

出典:runtime/chan.go

チャネルから受け取れる値がない場合

送信待ちのGもなく、バッファの中にデータがない場合は、その場では値を受信できません。
その場合はチャネルをブロックして、当該Gを待ちにする必要があります。

このような場合、chanrecv関数ではどのように処理をしているのでしょうか。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // (一部抜粋)
    // no sender available: block on this channel.

	// sudogを作って設定
	gp := getg()
	mysg := acquireSudog()
    mysg.elem = ep
    mysg.g = gp

	// 作ったsudogをrecvqに追加
    c.recvq.enqueue(mysg)

	// (goparkの内容については前述の通り)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
}

出典:runtime/chan.go