この章について
ここでは、ランタイムの中でチャネルがどう動いているのかについて、runtime
パッケージのコードを読みながら深堀りしていきます。
チャネルの実体
hchan構造体
チャネルの実体はhchan
構造体です。
type hchan struct {
// (一部抜粋)
qcount uint // バッファ内にあるデータ数
dataqsiz uint // バッファ用のメモリの大きさ(何byteか)
buf unsafe.Pointer // バッファ内へのポインタ
elemsize uint16
closed uint32
elemtype *_type // チャネル型
sendx uint // send index
recvx uint // receive index
recvq waitq // 受信待ちしているGの連結リスト
sendq waitq // 送信待ちしているGの連結リスト
}
送受信待ちGのリストについて
チャネルには、そのチャネルからの送受信街をしているGを保存するrecvq
, sendq
フィールドがあります。
このフィールドの型をよくみてみると、waitq
型という見慣れないものであることに気づくかと思います。
type waitq struct {
first *sudog
last *sudog
}
連結リストらしく先頭と最後尾へのポインタが含まれています。
しかし、肝心のリスト要素の型が、g
型ではなくてsudog
型というものであることがわかります。
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
// (一部抜粋)
g *g // Gそのもの
next *sudog // 後要素へのポインタ(連結リストなので)
prev *sudog // 前要素へのポインタ(連結リストなので)
elem unsafe.Pointer // 送受信したい値
c *hchan // 送受信待ちをしている先のチャネル
}
なぜGそのものの連結リストではなくて、わざわざsudog
型を導入したのでしょうか。
その理由は、sudog
型の定義に添えられたコメントに記されています。
sudog is necessary because the g ↔ synchronization object relation is many-to-many.
A g can be on many wait lists, so there may be many sudogs for one g;
and many gs may be waiting on the same synchronization object, so there may be many sudogs for one object.(訳)
sudog
型の必要性は、Gと同期を必要とするオブジェクトとの関係が多対多であることに由来しています。
Gは(select
文などで)たくさんのチャネルからの送受信を待つことがあるので、1つのGに対して複数個のsudog
が必要です。
そして、一つの同期オブジェクト(チャネル等)からの送受信を複数のGが待っていることもあるため、1つの同期オブジェクトに対しても複数個のsudog
が必要です。
つまり、GとチャネルのM:Nの関係をうまく表現するための中間素材としてsudog
が存在するのです。
チャネル動作の裏側
ここからは、チャネルを使った値の送受信やチャネルの作成はどのように行われているのか、ランタイムのコードレベルまで掘り下げてみてみます。
チャネルの作成
Goのコードの中でmake(chan 型名)
と書いた場所があると、バイナリ上では自動でruntime.makechan
関数を呼んでいることに変換されます。
TEXT main.main(SB) /path/to/main.go
// (略)
main.go:4 0x105e1b1 e8ca55faff CALL runtime.makechan(SB)
このruntime.makechan
関数をみてみると、
func makechan(t *chantype, size int) *hchan
hchan
構造体を返す関数でした。ここで、チャネルの実体hchan
にたどり着きました。
特筆すべきなのは、make(chan 型名)
と書いたときに帰ってくるのが*hchan
とポインタであるということです。
元からhchan
のポインタである、ということはつまり「チャネルを別の関数に渡すときに、確実に同じチャネルを参照するようにするためわざわざチャネルのポインタを渡す」というようなことはしなくていいということです。
送信操作
チャネルc
に対して値x
を送るためc <- x
と書かれたとき、呼び出されるのは以下のchansend1
関数です。
// entry point for c <- x from compiled code
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
内部で呼び出しているchansend
関数が、本質的な送信処理をしています。
このchansend
関数は、バッファがに空きがある/ない、受信待ちしているGがある/ないなど、その時々の状況によって挙動が違います。
受信待ちしているGがある
受信待ちしているGがあるのならば、チャネルc
のrecvq
連結リストフィールドにsudog
が1つ以上あるはずです。
そのような場合には、send
関数を呼ぶことで処理をしています。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// (一部抜粋)
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}
肝心のsend
関数は以下のようになっています。
// send processes a send operation on an empty channel c.
// Channel c must be empty and locked.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// (一部抜粋)
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep) // 送信
}
gp := sg.g
goready(gp, skip+1) // Gをrunableにする
}
-
sendDirect
関数で、送信したい値を受信待ちsudog
のelem
フィールドに書き込む -
goready
関数(→内部でready
関数)で、受信待ちしていたGのステータスをGwaiting
からGrunnable
に変更する
送り先チャネルのバッファにまだ空きがある
バッファありチャネルで、そこにまだ空きがあるならば、送信したい値をその中に入れる処理をします。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// (一部抜粋)
if c.qcount < c.dataqsiz {
// cのc.sendx番目のポインタをget
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep) // bufにepを書き込み
// sendxの値を更新
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
return true
}
}
バッファがフル/バッファなしチャネル
バッファがいっぱい、もしくはそもそもバッファなしチャネルだった場合は、その場では送信できません。
その場合はチャネルをブロックして、当該Gを待ちにする必要があります。
何はともあれchansend
関数での処理内容をみてみましょう。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// (一部抜粋)
// Block on the channel. Some receiver will complete our operation for us.
// sudogを作る
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
// sudogをチャネルのsendまちリストに入れる
c.sendq.enqueue(mysg)
// (goparkについては後述)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
}
まずacquireSudog
関数を使って得たsudog
に、「送信待ちをしているG」「送りたい値」といった情報を入れています。
sudog
構造体が完成したら、enqueue
メソッドを使ってチャネルのsendq
フィールドにそれを格納しています。
その後に続くgopark
関数は、以下のようになっています。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
// (一部抜粋)
mp := acquirem() // 今のmをgetする
releasem(mp) // gのstackguard0をstackPreemptに書き換えて、プリエンプとしていいよってフラグにする
mcall(park_m) //引数となっている関数を呼び出す
}
park_m
関数の中では、
// park continuation on g0.
func park_m(gp *g) {
// (一部抜粋)
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
schedule()
}
- Gのステータスを
Grunning
からGwaiting
に変更 -
dropg
関数で、GとMを切り離す - スケジューラによって、Mに新しいGを割り当てる
という処理を行っています。
受信操作
チャネルc
から値を受信する<- c
と書かれたときに、以下のchanrecv1
関数かchanrecv2
関数のどちらかが呼ばれます。の最初のエントリポイントはこれ。
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
内部で呼び出しているchanrecv
関数が、本質的な受信処理をしています。
これも送信の時と同様に、状況によって挙動が異なります。
送信待ちがある
送信待ちしているGがあるのならば、チャネルc
のsendq
連結リストフィールドにsudog
が1つ以上あるはずです。
そのため、sendq
フィールドから受け取ったsudog
を使って、recv
関数にて受信処理を行います。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// (一部抜粋)
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
recv
関数については、このチャネルが
- バッファなしチャネル
- バッファありチャネルで、その内部バッファが埋まっている
のかで挙動がわかれます。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// (一部抜粋)
// bufがないなら直接
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
gp := sg.g
goready(gp, skip+1)
}
バッファなしチャネルだった場合、
-
recvDirect
関数で、受信した値を受け取りたい変数に直接結果を書き込み -
goready
関数で、GのステータスをGrunnable
に変更
バッファありチャネルだった場合、
-
chanbuf
関数で、次に受け取る値がある場所(=buf
のインデックスrecvx
番目)へのポインタをget - 1で手に入れた情報を使って、受信した値を受け取りたい変数に直接結果を書き込み
- 値が受信済みになって空いた
buf
の位置(=buf
のインデックスrecvx
番目)に、送信待ちになっていた値を書き込み -
recvx
の値を更新 -
sendx
の値を、recvx
と同じ値になるように更新 -
goready
関数で、GのステータスをGrunnable
に変更
送信待ちがなく、かつバッファに受信可能な値がある
このような場合では、バッファの中の値を直接受け取るだけでOKです。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// (一部抜粋)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // epにバッファの中身を書き込み
}
// recvxの値を更新
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
}
}
チャネルから受け取れる値がない場合
送信待ちのGもなく、バッファの中にデータがない場合は、その場では値を受信できません。
その場合はチャネルをブロックして、当該Gを待ちにする必要があります。
このような場合、chanrecv
関数ではどのように処理をしているのでしょうか。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// (一部抜粋)
// no sender available: block on this channel.
// sudogを作って設定
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
// 作ったsudogをrecvqに追加
c.recvq.enqueue(mysg)
// (goparkの内容については前述の通り)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
}