🐻

cloudwego/netpollについて語る

2021/10/16に公開

はじめに

前回?のnet/httpについてはこちらから
https://zenn.dev/cube/articles/4c75bc8455ef92

前提としてepollの知識と、goの標準パッケージであるnet/httpの知識があった方が読みやすいので、お時間があれば↑の記事を読んでいただければと思います。

従来のnet/httpは1コネクションに対し、1Goroutine作られてしまいます。
なので大量リクエストの時にはGoroutineも大量に作られてしまい、結果メモリ使用量だったり、コンテキストスイッチがより多く発生してしまうという問題があります。

Goroutineの生成個数を減らし、上記の問題を解決するのがcloudwego/netpollです。

net/httpとのデザインの違い

net/httpは下記のような形でした。

それに対してcloudwego/netpollは下記のような形です。

step1

step2

step3

net/httpとの最大の違いはGoroutineの数が一定になっていることです。

これはnet/httpでは1つのacceptFDに1つのGoroutineが紐づいているのに対し、
cloudwego/netpollでは生成した一定数のGoroutineがそれぞれepollを持っていて、
epollにacceptFDが紐づいていくので、一つのGoroutineで複数のacceptFDを扱えるので一定のGoroutineで済むようになっています。

次節は実際にソースコードを読んでいきましょう。
その際に以下の疑問が解決するように読んでいきたいと思います。

  1. 一定個のGoroutineを作るとあるがどのように作っているか、またその個数は?
  2. ランダムに選んだGのepollにlistenFDを登録するとあるが、ランダムに選ぶとはどういうことか?
  3. listenFDをGoroutineのもつepollに登録するところ
  4. acceptFDをGoroutineのもつepollに登録するところ
  5. Goroutineがepoll_waitでFDの準備完了を検知するところ

実際のソースコードはこちらです。
https://github.com/cloudwego/netpoll

下記に使用例があったので、使用例を足掛かりにソースコードを読んでいきましょう。
https://golangrepo.com/repo/cloudwego-netpoll-go-network

サンプル

package main
 
 import (
 	"context"
 	"fmt"
 	"time"
 
 	"github.com/cloudwego/netpoll"
 )
 
 func main() {
 	network, address := "tcp", "127.0.0.1:8888"
        
	//listenFDの生成
 	listener, err := netpoll.CreateListener(network, address)
 	if err != nil {
 		panic("create netpoll listener fail")
 	}
        
	//handlerをonRequestにセット、onRequestはacceptFDを読み込むときに起動します。
 	var onRequest netpoll.OnRequest = handler
 
 	var opts = []netpoll.Option{
 		netpoll.WithReadTimeout(1 * time.Second),
 		netpoll.WithIdleTimeout(10 * time.Minute),
 		netpoll.WithOnPrepare(nil),
 	}
        
	//eventLoopを作成
 	eventLoop, err := netpoll.NewEventLoop(onRequest, opts...)
 	if err != nil {
 		panic("create netpoll event-loop fail")
 	}
        
	//eventLoopを起動
 	err = eventLoop.Serve(listener)
 	if err != nil {
 		panic("netpoll server exit")
 	}
 }
 
 func handler(ctx context.Context, connection netpoll.Connection) error {
 	reader := connection.Reader()
 	buf, err := reader.Peek(reader.Len())
 	if err != nil {
 		return err
 	}
 
 	n, err := connection.Write(buf)
 
 	if err != nil {
 		return err
 	}
 	if n != len(buf) {
 		return fmt.Errorf("write failed: %v < %v", n, len(buf))
 	}
 
 	return nil
 }

前段 listenFD、eventLoopの生成,eventLoopの起動

ここでは前段としてlistenFD、eventLoopの生成,eventLoopの起動までを見ていきます。

 func main() {
	//listenFDの生成
 	listener, err := netpoll.CreateListener(network, address)
	
	 //eventLoopを作成
 	eventLoop, err := netpoll.NewEventLoop(onRequest, opts...)
	}
	
	//eventLoopを起動
 	err = eventLoop.Serve(listener)
func CreateListener(network, addr string) (l Listener, err error) {
	if network == "udp" {
		// TODO: udp listener.
		return udpListener(network, addr)
	}
	// tcp, tcp4, tcp6, unix
	ln, err := net.Listen(network, addr)
	if err != nil {
		return nil, err
	}
	return ConvertListener(ln)
}

func ConvertListener(l net.Listener) (nl Listener, err error) {
	if tmp, ok := l.(Listener); ok {
		return tmp, nil
	}
	ln := &listener{}
	ln.ln = l
	ln.addr = l.Addr()
	err = ln.parseFD()
	if err != nil {
		return nil, err
	}
	return ln, syscall.SetNonblock(ln.fd, true)
}

net.Listen以降の動きは下記で見ているので今回は省略します。
https://zenn.dev/cube/articles/4c75bc8455ef92#listenandserve-->-net.listen-->-listenconfig.listen-->-syslistener.listentcp-->-internetsocket

CreateListenerではlistenerにnet.Listen由来のFD等の情報を詰め込んでいます。

func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) {
	opt := &options{}
	for _, do := range ops {
		do.f(opt)
	}
	return &eventLoop{
		opt:     opt,
		prepare: opt.prepare(onRequest),
		stop:    make(chan error, 1),
	}, nil
}

eventLoopを作っているだけです。

func (evl *eventLoop) Serve(ln net.Listener) error {
	npln, err := ConvertListener(ln)
	if err != nil {
		return err
	}
	evl.Lock()
	evl.svr = newServer(npln, evl.prepare, evl.quit)
	evl.svr.Run()
	evl.Unlock()
	return evl.waitQuit()
}
 

return evl.waitQuit()のところは下記のようになっていて、stopChanにて受信待ちをしています。

func (evl *eventLoop) waitQuit() error {
	return <-evl.stop
}

ここまでで準備は終わりです。
listener,eventLoop,serverを作り、serverをRunするところまで来ました。ここからが本編です。

本編 server.Runからスタート

server.Run -> pollmanager.init

func (evl *eventLoop) Serve(ln net.Listener) error {
	evl.svr = newServer(npln, evl.prepare, evl.quit)
	evl.svr.Run()
}

server.Runから見ていきましょう。

func (s *server) Run() (err error) {
	s.operator = FDOperator{
		FD:     s.ln.Fd(), <----listenFDのこと
		OnRead: s.OnRead,
		OnHup:  s.OnHup,
	}
	s.operator.poll = pollmanager.Pick()
	err = s.operator.Control(PollReadable)
	if err != nil {
		s.quit(err)
	}
	return err
}

FDOperatorとかいうわけがわからないものが出てきましたが、今は一旦放置します。
後々見ていきますが、今少し説明するとFDOperatorはepollとfdをつなぐ役割を持っています。(epollにfdを登録したり)

pollmanagerはpackage netpollのグローバル変数です。
cloudwego/netpollはコードも少ないのでpackage netpollにすべてのファイルがある構成です。
なのでpollmanagerはどこからでも見える状態です。
ここでpollmanager.Pickを見ていく際に面倒なのが、poll_manager.goにinit()が存在することです。
init()が起動するのはそのパッケージがimportされた時なのですが、今回は全ファイルnetpollなので、netpollをimportした時にpoll_manger.goのinit()は起動していることになります。

なのでpollmanager.Pickを見る前にinitを見ていきましょう。

init()は下記が詳しいです。
https://qiita.com/YusukeIwaki/items/f1f92c23d7ee0ca8dc7a

pollmanager.init ~ pollmanager.SetNumLoops

// manage all pollers
var pollmanager *manager

func init() {
	pollmanager = &manager{}
	pollmanager.SetLoadBalance(RoundRobin)
	pollmanager.SetNumLoops(defaultNumLoops())
}

type manager struct {
	NumLoops int
	balance  loadbalance // load balancing method
	polls    []Poll      // all the polls
}

pollの役割について少し解説すると、大雑把に言えばpoll = epollを持ったGと言えます。
これもFDOperatorと同様後々詳しく動きを見ていきます。
pollManagerの役割としてはアルゴリズムに従ってpollmanager.pollsの中からpollManager.Pickでpollを選ぶのが役割です。

ここのSetNumLoopsが1.の答え、SetLoadBalanceが2.の答えとなっています。

SetLoadBalanceはアルゴリズムを引数にとって、manager.balanceにアルゴリズムに応じた構造体をくっつけるのが役割です。
ここで選択したアルゴリズムによってmanager.Pickの挙動が変わり、アルゴリズムに沿ってpollが選ばれます。(デフォルトはラウンドロビン)

func (m *manager) SetLoadBalance(lb LoadBalance) error {
	if m.balance != nil && m.balance.LoadBalance() == lb {
		return nil
	}
	m.balance = newLoadbalance(lb, m.polls)
	return nil
}

func newLoadbalance(lb LoadBalance, polls []Poll) loadbalance {
	switch lb {
	case Random:
		return newRandomLB(polls)
	case RoundRobin:
		return newRoundRobinLB(polls)
	}
	return newRoundRobinLB(polls)
}

func newRoundRobinLB(polls []Poll) loadbalance {
	return &roundRobinLB{polls: polls, pollSize: len(polls)}
}

type roundRobinLB struct {
	polls    []Poll
	accepted uintptr // accept counter
	pollSize int
}

SetNumLoopsは以下のようになっています。

func defaultNumLoops() int {
	procs := runtime.GOMAXPROCS(0)
	loops := 1
	if procs > 4 {
		loops = procs
	}
	return loops
}

func (m *manager) SetNumLoops(numLoops int) error {
	if numLoops < 1 {
		return fmt.Errorf("set invaild numLoops[%d]", numLoops)
	}
	// if less than, reset all; else new the delta.
	if numLoops < m.NumLoops {
		m.NumLoops = numLoops
		return m.Reset()
	}
	m.NumLoops = numLoops
	return m.Run()
}

func (m *manager) Run() error {
	// new poll to fill delta.
	for idx := len(m.polls); idx < m.NumLoops; idx++ {
		var poll = openPoll()
		m.polls = append(m.polls, poll)
		go poll.Wait()
	}
	// LoadBalance must be set before calling Run, otherwise it will panic.
	m.balance.Rebalance(m.polls)
	return nil
}

ここではruntime.GOMAXPROCS(0)でCPUのコア数を返し、その数をmanager.NumLoopsとしています、コア数4以下だったらloopsは1です。
manager.Runにて現在m.pollsは0なので、m.NumLoopsの数のpollを作ります。

そののちに各pollをそれぞれgoroutineでpoll.Wait()としています。
poll.Waitはepoll_waitを回していて、
ここが5. Goroutineがepoll_waitでFDの準備完了を検知するところの答えとなっているところですね。

 func (p *defaultPoll) Wait() (err error) {
 	// init
 	var caps, msec, n = barriercap, -1, 0
 	p.Reset(128, caps)
 	// wait
 	for {
 		if n == p.size && p.size < 128*1024 {
 			p.Reset(p.size<<1, caps)
 		}
 		n, err = EpollWait(p.fd, p.events, msec)  <-ここでp.fdとはepollfdのことでp.eventsはepollfd -> eventsと使うバッファ
 		if err != nil && err != syscall.EINTR {
 			return err
 		}
 		if n <= 0 {
 			msec = -1
 			runtime.Gosched()  <-ずっとepoll回してPに居座ることはない、pollに紐づFDがないならここで処理を譲るし、時間経過で他のpollにスケジューリングされるのでもしずっと処理が回ってこないpollがあって一生処理されないFD、リクエストがあるのではという心配はいらない
 			continue
 		}
 		msec = 0
 		if p.Handler(p.events[:n]) {
 			return nil
 		}
 	}
 }

openPollではpollの作成とepollの作成を行っています。

func openPoll() Poll {
 	return openDefaultPoll()
 }
 
 func openDefaultPoll() *defaultPoll {
 	var poll = defaultPoll{}
 	poll.buf = make([]byte, 8)
 	var p, err = syscall.EpollCreate1(0)  <-ここでpollごとにepollfdを作っている
 	if err != nil {
 		panic(err)
 	}
 	poll.fd = p    <--poll.fd = epoll
	
 	var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0) <-eventfd2はepollとは関係なく作ったeventfdに対して通知をする、または通知を待つことができる 、
 	if e0 != 0 {
 		syscall.Close(p)
 		panic(err)
 	}
 
 	poll.Reset = poll.reset
 	poll.Handler = poll.handler
 
 	poll.wop = &FDOperator{FD: int(r0)}
 	poll.Control(poll.wop, PollReadable)
 	return &poll
 }

ここまでserver.Run ~ manager.SetNumLoopsまで見てきました。
主にやったことはpollの生成、pollManager.Pick時に使うアルゴリズムの決定です。
次はserver.Runに戻って、manager.Pickから先に進みましょう。

server.Run -> manager.Pick -> FDOperator.Control

func (s *server) Run() (err error) {
	s.operator = FDOperator{
		FD:     s.ln.Fd(), <----listenFDのこと
		OnRead: s.OnRead,
		OnHup:  s.OnHup,
	}
	s.operator.poll = pollmanager.Pick()
	err = s.operator.Control(PollReadable)
	if err != nil {
		s.quit(err)
	}
	return err
}
func (m *manager) Pick() Poll {
	return m.balance.Pick()
}

m.balanceはinit()の時にSetLoadBalanceでstruct roundRobinLBをセットしていました。

func (b *roundRobinLB) Pick() (poll Poll) {
	idx := int(atomic.AddUintptr(&b.accepted, 1)) % b.pollSize
	return b.polls[idx]
}

Pickでpollを選んだあとはFDOperator.Controlです。
前にepollとfdを結びつける役割と書きましたがここで詳しく見ていきます。

func (op *FDOperator) Control(event PollEvent) error {
	return op.poll.Control(op, event)
}

ここでpollはPickで選んだpollです。

func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
	var op int
	var evt epollevent
	*(**FDOperator)(unsafe.Pointer(&evt.data)) = operator
	
	switch event {
	case PollReadable:
		operator.inuse()
		op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
	...
	}
	return EpollCtl(p.fd, op, operator.FD, &evt)
}

poll.Controlではepoll_ctlでpollが持つepoll = p.fdにoperator.FDを登録しています。
ここで現在のoperatorは下記のように作られていました。

s.operator = FDOperator{
		FD:     s.ln.Fd(), <----listenFDのこと
		OnRead: s.OnRead,
		OnHup:  s.OnHup,
	}

なのでoperator.FDはlistenFDを指します。

さらに、evt.dataをoperatorとしていることで後々epoll_waitでFDと結びついたoperatorを取り出すことができるようにしています。
net/httpではpollDescを結び付けてGoroutineを取り出していました。

eventはPollReadableであったので、ここでやっていることは、listenFDが読み取り可能になった時(リクエストがあった時)をepoll_waitで監視するということです。

実際に読み取り可能になったら各pollはforループでepoll_waitを回していたので、そのpollからFDと結びついたoperatorを使って様々な処理をしていくわけですね。

ここが3. listenFDをGoroutineのもつepollに登録するところの答えです。

epollの基本とnet/httpのepollの扱いは下記を参照ください。
https://zenn.dev/cube/articles/4c75bc8455ef92#どうやってepollを使って非同期ioを実現するか、epollの使い方

https://zenn.dev/cube/articles/4c75bc8455ef92#polldesc.init-~-netpollopen

さてラウンドロビンで選んだpollのepollにlistenFDを登録するところまで見ました。
次はepoll_waitでlistenFDが準備完了になった時の動きを見ていきましょう。

poll.Wait -> EpollWait -> poll.Handler -> server.OnRead

 func (p *defaultPoll) Wait() (err error) {
 	// init
 	var caps, msec, n = barriercap, -1, 0
 	p.Reset(128, caps)
 	// wait
 	for {
 		if n == p.size && p.size < 128*1024 {
 			p.Reset(p.size<<1, caps)
 		}
 		n, err = EpollWait(p.fd, p.events, msec) 
 		if err != nil && err != syscall.EINTR {
 			return err
 		}
 		if n <= 0 {
 			msec = -1
 			runtime.Gosched() 
 			continue
 		}
 		msec = 0
 		if p.Handler(p.events[:n]) {
 			return nil
 		}
 	}
 }
 
 func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
	var r0 uintptr
	var _p0 = unsafe.Pointer(&events[0])
	if msec == 0 {
		r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
	} else {
		r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
	}
	if err == syscall.Errno(0) {
		err = nil
	}
	return int(r0), err
}

ここっでp.fd = pollごとに作ったepollのことでした。
EpolLWaitはシステムコールのepoll_waitを実行しています。eventsに準備完了したfdと結びつくevtがコピーされます。
evtはstruct epolleventでEpollCtl(p.fd, op, operator.FD, &evt)のときに使ったevtです。

poll.Handlerですが、これはpoll.handlerです。openDefaultPollの時にセットしました。

func (p *defaultPoll) handler(events []epollevent) (closed bool) {
 	var hups []*FDOperator // TODO: maybe can use sync.Pool
 	for i := range events {
 		var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data)) -//ここのdataはcontrolで登録したOperator
 		
 		
 		switch {
 		...
		
 		case events[i].events&syscall.EPOLLIN != 0:
 			// for non-connection                 
			//listenFDの時はここ
 			if operator.OnRead != nil {
 				operator.OnRead(p)  -break
 			}
 			// only for connection
			//acceptFDの時はここ
 			var bs = operator.Inputs(p.barriers[i].bs)
 			if len(bs) == 0 {
 				break
 			}
 			var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
 			operator.InputAck(n)
 			if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
 				log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
 				hups = append(hups, operator)
 			}
 		
 	return false
 }

★で取り出したoperatorはserver.Runであった下記でした。
s.operator = FDOperator{
FD: s.ln.Fd(), <----listenFDのこと
OnRead: s.OnRead,
OnHup: s.OnHup,
}
OnRead=s.OnReadとなっているので、OnRead != nilとなり、☆でOnReadが起動します。

func (s *server) OnRead(p Poll) error {
	// accept socket
	conn, err := s.ln.Accept()
	if err != nil {
		// shut down
		if strings.Contains(err.Error(), "closed") {
			s.operator.Control(PollDetach)
			s.quit(err)
			return err
		}
		log.Println("accept conn failed:", err.Error())
		return err
	}
	if conn == nil {
		return nil
	}
	// store & register connection
	var connection = &connection{}
	connection.init(conn.(Conn), s.prepare)
	if !connection.IsActive() {
		return nil
	}
	var fd = conn.(Conn).Fd()
	connection.AddCloseCallback(func(connection Connection) error {
		s.connections.Delete(fd)
		return nil
	})
	s.connections.Store(fd, connection)
	return nil
}

さて、ここでやっとAcceptが出てきましたね。
ここからacceptFDをPickで選んだ適当なpollのepollに登録することになっていきます。

次節からはacceptFDを登録していくところを見ていきましょう。

connection.init -> connection.checkNetFD -> connecition.initOperator

 func (c *connection) init(conn Conn, prepare OnPrepare) (err error) {
   	// conn must be *netFD{}
   	c.checkNetFD(conn)
   
   	c.initFDOperator()
   	syscall.SetNonblock(c.fd, true)
   
   	// init buffer, barrier, finalizer
   	c.readTrigger = make(chan int, 1)
   	c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
   	c.inputBarrier, c.outputBarrier = barrierPool.Get().(*barrier), barrierPool.Get().(*barrier)
   	c.setFinalizer()
   
   	// check zero-copy
   	if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
   		c.supportZeroCopy = true
   	}
   	return c.onPrepare(prepare)
   }

connection.initからゲットしたacceptFDを登録していきます。

onPrepareから実際にacceptFDを登録していくのですが、少しcheckNetFD,initFDOperatorを見ていきます。

func (c *connection) checkNetFD(conn Conn) {
   	if nfd, ok := conn.(*netFD); ok {
   		c.netFD = *nfd
   		return
   	}
   	c.netFD = netFD{
   		fd:         conn.Fd(),
   		localAddr:  conn.LocalAddr(),
   		remoteAddr: conn.RemoteAddr(),
   	}
   }

checkNetFdでconnection.netFDにセット、conn.Fd()はacceptFD

func (c *connection) initFDOperator() {
   	op := allocop() //ここからfdOperatorが割り振られる
   	op.FD = c.fd //Accept()由来のconn,connection.fdはcheckNetFDでconnからとった
   	op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup //listenFDの時に作ったoperatorと違ってOnReadがnilとなっている
   	op.Inputs, op.InputAck = c.inputs, c.inputAck
   	op.Outputs, op.OutputAck = c.outputs, c.outputAck
   
   	// if connection has been registered, must reuse poll here. 
     if c.pd != nil && c.pd.operator != nil {
   		op.poll = c.pd.operator.poll
   	}
   	c.operator = op
   }   
   
   func allocop() *FDOperator {
	return opcache.alloc()
  }
   
   func (c *operatorCache) alloc() *FDOperator {
   	c.lock()
   	if c.first == nil {
   		const opSize = unsafe.Sizeof(FDOperator{})
   		n := block4k / opSize
   		if n == 0 {
   			n = 1
   		}
   		// Must be in non-GC memory because can be referenced
   		// only from epoll/kqueue internals.
   		for i := uintptr(0); i < n; i++ {
   			pd := &FDOperator{}
   			c.cache = append(c.cache, pd)
   			pd.next = c.first
   			c.first = pd
   		}
   	}
   	op := c.first
   	c.first = op.next
   	c.unlock()
   	return op
   }

connection.opにoperatorを割り当てています。
serverで作ったoperatorと違ってOnReadがnilになっているのでepoll_wait -> handlerの時が違う分岐になってきます。

connection.onPrepare -> connection.register

 func (c *connection) init(conn Conn, prepare OnPrepare) (err error) {
   	return c.onPrepare(prepare)
   }

initの第二引数のprepareはserver.prepareで、NewEventLoopのopt.prepare(onRequest)のこと
OnRequestはユーザーが最初にOnRequest = handlerとしたやつですね。

// NewEventLoop .
func NewEventLoop(onRequest OnRequest, ops ...Option) (EventLoop, error) {
	opt := &options{}
	for _, do := range ops {
		do.f(opt)
	}
	return &eventLoop{
		opt:     opt,
		prepare: opt.prepare(onRequest),
		stop:    make(chan error, 1),
	}, nil
}

func (opt *options) prepare(onRequest OnRequest) OnPrepare { 
 	return func(connection Connection) context.Context {
 		connection.SetOnRequest(onRequest)
 		connection.SetReadTimeout(opt.readTimeout)
 		connection.SetIdleTimeout(opt.idleTimeout)
 		if opt.onPrepare != nil {
 			return opt.onPrepare(connection)
 		}
 		return context.Background()
 	}
 }
  func (c *connection) onPrepare(prepare OnPrepare) (err error) {
   	// calling prepare first and then register.
   	if prepare != nil {
   		c.ctx = prepare(c)
   	}
   	// prepare may close the connection.
   	if c.IsActive() {
   		return c.register()
   	}
   	return nil
   }

prepare(c)のところで connection.SetOnRequestが起動、ユーザーが定義したOnRequestをconnectionにセットします。
connectionはonEvent構造体を埋め込んでいて、SetOnRequestはonEventのメソッドです。

type onEvent struct {
	ctx       context.Context
	process   atomic.Value // value is OnRequest
	callbacks atomic.Value // value is latest *callbackNode
}

 func (on *onEvent) SetOnRequest(onReq OnRequest) error {
 	if onReq != nil {
 		on.process.Store(onReq)
 	}
 	return nil
 }

processにOnRequestを入れていて、atomic.Valueはinterfaceなのでなんでも入ります。
ここでprocessにいれたOnRequestはacceptFDが準備完了したときに使います。
OnRequest時にまたやりましょう。

 func (c *connection) register() (err error) {
   	if c.operator.poll != nil {
   		err = c.operator.Control(PollModReadable)
   	} else {
   		c.operator.poll = pollmanager.Pick() //pickで適当に選んだpollとacceptFDを紐づける、accpetFDが準備完了したら紐づいたpollが反応する
   		err = c.operator.Control(PollReadable)
   	}
   	if err != nil {
   		log.Println("connection register failed:", err.Error())
   		c.Close()
   		return Exception(ErrConnClosed, err.Error())
   	}
   	return nil
   }

ifの方のc.operator.poll != nilの状況がいつ来るのかがわかりませんでした(調査不足)
ただ今回は初回でregisterに入ってきたときなので、elseを見ていけば大丈夫です。
elseの方では、これまで見てきたpollmanager.Pickとoperator.Controlがありますね。

今connection.operatorはinitFDOperatorでつくったoperatorが入っていて、operator.FDはacceptFDとなっていました。

なのでlistenFDのときやったようにacceptFDを対応するpollにoperatorを媒介として結びつけます。
operator.Control以降の流れは依然と同じです。

ここが4. acceptFDをGoroutineのもつepollに登録するところ の答えです。

最後にacceptFDの準備完了を検知した後でユーザーが定義したOnRequest = handlerを使うところを見てみましょう。

EpollWait -> poll.handler -> operator.InputAck

 func (p *defaultPoll) Wait() (err error) {
 	
 	
 	// wait
 	for {
 		
 		n, err = EpollWait(p.fd, p.events, msec) 
 		if err != nil && err != syscall.EINTR {
 			return err
 		}
 		if n <= 0 {
 			msec = -1
 			runtime.Gosched() 
 			continue
 		}
 		msec = 0
 		if p.Handler(p.events[:n]) {
 			return nil
 		}
 	}
 }
 
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
 	var hups []*FDOperator // TODO: maybe can use sync.Pool
 	for i := range events {
 		var operator = *(**FDOperator)(unsafe.Pointer(&events[i].data)) -//ここのdataはcontrolで登録したOperator
 		
 		
 		switch {
 		...
		
 		case events[i].events&syscall.EPOLLIN != 0:
 			// for non-connection                 
			//listenFDの時はここ
 			if operator.OnRead != nil {
 				operator.OnRead(p)  -break
 			}
 			// only for connection
			//acceptFDの時はここ
 			var bs = operator.Inputs(p.barriers[i].bs)
 			if len(bs) == 0 {
 				break
 			}
 			var n, err = readv(operator.FD, bs, p.barriers[i].ivs)
 			operator.InputAck(n)
 			if err != nil && err != syscall.EAGAIN && err != syscall.EINTR {
 				log.Printf("readv(fd=%d) failed: %s", operator.FD, err.Error())
 				hups = append(hups, operator)
 			}
 		
 	return false
 }

acceptFDの時に対応するFDOperatorをconnection.initOperatorで作成したときにOnRead=nilであったこと思い出すと、operator.Inputs以降が動くことがわかります。

実際にユーザーが定義したOnRequestが利用されるのはoperator.InputAckです。

func (c *connection) inputAck(n int) (err error) {
 	if n < 0 {
 		n = 0
 	}
 	leftover := atomic.AddInt32(&c.waitReadSize, int32(-n))
 	err = c.inputBuffer.BookAck(n, leftover <= 0)
 	c.triggerRead()
 	c.onRequest()
 	return err
 }
 
 func (c *connection) onRequest() (err error) {
	var process = c.process.Load()
	
	var task = func() {
		if c.ctx == nil {
			c.ctx = context.Background()
		}
		var handler = process.(OnRequest)  
	START:
		// NOTE: loop processing, which is useful for streaming.
		for c.Reader().Len() > 0 && c.IsActive() {
			// Single request processing, blocking allowed.
			handler(c.ctx, c)
		}
		// Handling callback if connection has been closed.
		if !c.IsActive() {
			c.closeCallback(false)
			return
		}
		// Double check when exiting.
		c.unlock(processing)
		if c.Reader().Len() > 0 {
			if !c.lock(processing) {
				return
			}
			goto START
		}
	}
	
	runTask(c.ctx, task)
	return nil
}

connection.onRequestではconnection.registerでproccessにロードしたユーザー定義のOnRequestをprocess = c.process.Load()で取り出しています。

その後runTask内でtask関数を発火させ、task関数内のhandler(c.ctx, c)でユーザー定義のOnRequestをようやく発火となっています。

おわりに

ここまでコードを追って実際どのような工夫が凝らされているか見てきました。
ハイライトは個人的にはepollを複数のGに持たせ、pollmanager,FDOperator,pollという要素を使いうまく複数のコネクションを一つのGで扱えるようにしていたところだと思います。
疑問点があったのでコードを詳しく追ってみましたが、細かい部分の動きというより、epollを複数のGに持たせるというデザインを学べたことが一番の収穫でした。

こういうデザインレベルでの工夫を凝らしているコードをもっと学んでいきたいなと感じたプロジェクトでした。

Discussion