🌐

Goのnet/httpのServer側デザインの良さについて語る(Connection処理編)

2021/10/07に公開

あらすじ

https://zenn.dev/cube/articles/0731eb7c14bb5d

前回まではacceptを非同期にしてlistenFDが準備完了までの間は他のGoroutineを動かし(epoll_ctl & gopark & スケジューリング)、
listenFDが準備完了になったら処理をacceptに戻す(epoll_wait & netpoll)、というところを見ていきました。
今回はacceptが完了して、accept由来のfd,acceptFDをゲットし、そのFDをGoroutineで扱うところを見ていきましょう。
やっとListen編で紹介した下の画像の動きですね。

ここでは指針を↑の画像の動きに絞ってみていきたいと思います。
上記画像の動きをするためにはaccept編で見たように、下の3つが必要となります。

  1. fdをepoll_ctlで監視対象に
  2. goparkでsleep&スケジューリング
  3. スケジューラー内のepoll_waitでfdが準備完了になったら中断していたgoroutineを再起動
    この三つを中心に据えてみていきましょう。

1. fdをepoll_ctlで監視対象に(acceptFDを監視対象とする)

さて、前回下記のserver.Serveに戻るところまで見ましたが...ここに戻るまでに一つだけ見ておかなくてはいけないところがあります。
1.fdをepoll_ctlで監視対象に
上の動きをserver.Serveに戻るまでにやっているのでまずはそこだけ押さえましょう。

func (srv *Server) Serve(l net.Listener) error {
	...
	

	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept() <-...
		
		
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}

一旦話をacceptが成功したところ、fd.Acceptまで戻します。

fd_unix.go
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	//無限ループ
	for {
		s, rsa, errcall, err := accept(fd.Sysfd) <-1
		if err == nil {
			return s, rsa, "", err
		}
		switch err {

		case syscall.EAGAIN:
			if fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {  
					continue
				}
			}	
	}
}

func (fd *netFD) accept() (netfd *netFD, err error) {
	d, rsa, errcall, err := fd.pfd.Accept()
	if err != nil {
		if errcall != "" {
			err = wrapSyscallError(errcall, err)
		}
		return nil, err
	}

	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
	if err = netfd.init(); err != nil { <---★
		netfd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
	ret := &netFD{
		pfd: poll.FD{
			Sysfd:         sysfd,
			IsStream:      sotype == syscall.SOCK_STREAM,
			ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
		},
		family: family,
		sotype: sotype,
		net:    net,
	}
	return ret, nil
}

newFDで d = acceptFDを使ってnetFDを作っています。listen編でも同じ流れがありました。あとはlisten編と同じ流れなのですが二つ説明したいことがあります。

listen編で見た流れ↓
https://zenn.dev/cube/articles/4c75bc8455ef92#socket-->-netfd.listenstream-->-netfd.init-->-fd.init-->-polldesc.init

1.newFDの生成とinitについて

2.epollへの登録

1.newFDの生成とinitについて

newFDとそのinitは下記のようになっています。

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
	ret := &netFD{         <------------★
		pfd: poll.FD{
			Sysfd:         sysfd,
			IsStream:      sotype == syscall.SOCK_STREAM,
			ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
		},
		family: family,
		sotype: sotype,
		net:    net,
	}
	return ret, nil
}

func (fd *netFD) init() error {
	return fd.pfd.Init(fd.net, true)
}

func (fd *FD) Init(net string, pollable bool) error {
	
	err := fd.pd.init(fd)
	
	return err
}

ここでFD.pdはpollDescなのですが★印の時にpollDescは作成されていませんでした。つまりnilですね。
nilからメソッドを実行していることになりますが、Goではポインタレシーバーならnilからのメソッド実行は合法なのでOKです。
知ってはいたのですが実際に利用されているところを見ると少し驚きますね。

さて、このあとpollDesc.initと続いていき,pollDescの生成やepoll_ctlでacceptFDを登録していきます。

一個腑に落ちない点としてFDにpollDescを実際に入れているところが見つからないところです。FDにpollDescを持たせている意味はinitをするためというだけで、実際にFDからはpollDescを操作することはないのでしょうか...?
(詳しい方いたら教えてくだされば助かります。)

2.epollへの登録

pollDesc.init ~と続いていき、runtime_pollServerInitを実行するときの話です。

fd_poll_runtime
func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
}

func poll_runtime_pollServerInit() {
	netpollGenericInit()
}

func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {  <--------------lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}

netpoll_epoll.go
func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC)  <---ここでepollを作成

★のatomic.Loadに注目すると、一度netpollInit()を実行し、atomic.Store(&netpollInited, 1)でnetpollInited=1となっていれば実行されないことがわかります。
一度きりの実行なのでepollcreate1でepfdを何度も作らないようにしているので、epoll=epfdは一つ、ということになります。

ここまでacceptFDがいつepollの監視対象になるか見てきました。
次はacceptが終わった後の話~goroutine起動直前の話を少しした後に、各Goroutineに分かれてからの処理のお話に行きましょう。

acceptが終わったら

やっとserver.Serveまで戻ってきましたので話を進めましょう。

func (srv *Server) Serve(l net.Listener) error {
	...
	

	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept() <-...
		
		
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}

★のところでacceptが成功するとrw = net.Connを獲得できます。
獲得するところは下記でした。

tcpsock.go
func (l *TCPListener) Accept() (Conn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	c, err := l.accept()
	if err != nil {
		return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
	}
	return c, nil
}

func (ln *TCPListener) accept() (*TCPConn, error) {
	fd, err := ln.fd.accept() <-★ 前節でみたnetFDが返ってくるのだった、netFDの中にaccept由来のFDが入っている
	if err != nil {
		return nil, err
	}
	tc := newTCPConn(fd)
	if ln.lc.KeepAlive >= 0 {
		setKeepAlive(fd, true)
		ka := ln.lc.KeepAlive
		if ln.lc.KeepAlive == 0 {
			ka = defaultTCPKeepAlive
		}
		setKeepAlivePeriod(fd, ka)
	}
	return tc, nil
}

func newTCPConn(fd *netFD) *TCPConn {
	c := &TCPConn{conn{fd}}
	setNoDelay(c.fd, true)
	return c
}

type TCPConn struct {
	conn
}

type conn struct {
	fd *netFD
}

ここでnewTCPConnはinterfaceであるnet.Connを継承しています。
そのnewTCPConnのなかにnetFDを入れます。

func (srv *Server) newConn(rwc net.Conn) *conn {
	c := &conn{
		server: srv,
		rwc:    rwc,
	}
	if debugServerConnections {
		c.rwc = newLoggingConn("server", c.rwc)
	}
	return c
}

type conn struct {
	server *Server
	cancelCtx context.CancelFunc
	rwc net.Conn
        
	...
}

newTCPConnをさらにserver.newConnに与えて、conn構造体を作ってから、conn.serveごとにgoroutineを実行するという流れです。

FDやらConnやらがごちゃごちゃしてきたので一度まとめると下図みたいな感じです。

さて、それではconn.serveからGoroutineによって並列処理されているところをみましょう。

conn.serve -> conn.readRequest -> readRequest

func (srv *Server) Serve(l net.Listener) error {
	...
	

	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept()
		
		...
		
		
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx) <-}
}

func (c *conn) serve(ctx context.Context) {
	
	// HTTP/1.x from here on.

	ctx, cancelCtx := context.WithCancel(ctx)
	c.cancelCtx = cancelCtx
	defer cancelCtx()
	
        //conn.rにconnReaderを入れて、それをBufioReaderでwrap
	c.r = &connReader{conn: c} 
	c.bufr = newBufioReader(c.r)
	c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
        
	
	//無限ループ、コネクションが切れるまでReadする
	for {
		w, err := c.readRequest(ctx) -...
	
		serverHandler{c.server}.ServeHTTP(w, w.req)
		
		...
	}
}

ここでは詳しく見ませんが、serverHandler{c.server}.ServeHTTP(w, w.req)のところであらかじめ登録したHandlerをつかって処理を行っています。
conn.rにconnReaderを入れて、それをBufioReaderでwrapしています。conn.bufrはreadRequestで使います。

// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
	...
	
	req, err := readRequest(c.bufr) <-...

	ctx, cancelCtx := context.WithCancel(ctx)
	req.ctx = ctx
	req.RemoteAddr = c.remoteAddr
	req.TLS = c.tlsState
	if body, ok := req.Body.(*body); ok {
		body.doEarlyClose = true
	}

	

	w = &response{
		conn:          c,
		cancelCtx:     cancelCtx,
		req:           req,
		reqBody:       req.Body,
		handlerHeader: make(Header),
		contentLength: -1,
		closeNotifyCh: make(chan bool, 1),

		// We populate these ahead of time so we're not
		// reading from req.Header after their Handler starts
		// and maybe mutates it (Issue 14940)
		wants10KeepAlive: req.wantsHttp10KeepAlive(),
		wantsClose:       req.wantsClose(),
	}
	
	w.cw.res = w
	w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
	return w, nil
}

conn.ReadRequestではreadRequestから読み取ったものをresponseとして返しています。
ここの読み取ったものが後のServeHTTPに渡されるわけですね。
今回HTTPのtextをパースしているところはほとんど省いていますので興味のある方はぜひ元ソースをご覧ください。

  func readRequest(b *bufio.Reader) (req *Request, err error) {
	tp := newTextprotoReader(b)
	req = new(Request)

	// First line: GET /index.html HTTP/1.0
	var s string
	if s, err = tp.ReadLine(); err != nil {
		return nil, err
	}
	
}

bufio.ReaderをさらにTextprotoReaderでwrapしています。

func newTextprotoReader(br *bufio.Reader) *textproto.Reader {
	if v := textprotoReaderPool.Get(); v != nil {
		tr := v.(*textproto.Reader)
		tr.R = br
		return tr
	}
	return textproto.NewReader(br)
}

下記がReaderのwrapをまとめた図です。

なぜこんなwrapばかりしているかというと、
textprotoReaderはその名の通りtextのReadに便利なメソッドがあるから、
bufioReaderはbufio.Readline等,従来のReadを便利にできるからですね。
Readerの拡張がしたいがためにwrapを挟んでいます。

ここからはRead部分に入っていきましょう。今回bufioが施している様々な工夫は省いていますので、興味のある方はbufioを覗いてみるのも面白いかもしれません。

textproto.Reader.Readline -> textproto.Reader.readLineSlice -> bufio.Reader.ReadLine -> bufio.Reader.ReadSlice -> bufio.Reader.fill

func (r *Reader) ReadLine() (string, error) {
	line, err := r.readLineSlice()
	return string(line), err
}

func (r *Reader) readLineSlice() ([]byte, error) {
	r.closeDot()
	var line []byte
	for {
		l, more, err := r.R.ReadLine()
		if err != nil {
			return nil, err
		}
		// Avoid the copy if the first call produced a full line.
		if line == nil && !more {
			return l, nil
		}
		line = append(line, l...)
		if !more {
			break
		}
	}
	return line, nil
}

ここでr.Rはbufio.Readerのことでした。

bufio.go

func (b *Reader) ReadLine() (line []byte, isPrefix bool, err error) {
	line, err = b.ReadSlice('\n')
	...
}

func (b *Reader) ReadSlice(delim byte) (line []byte, err error) {
s := 0 // search start index
	for {
		// Search buffer.
		if i := bytes.IndexByte(b.buf[b.r+s:b.w], delim); i >= 0 {
			i += s
			line = b.buf[b.r : b.r+i+1]
			b.r += i + 1
			break
		}

		// Pending error?
		if b.err != nil {
			line = b.buf[b.r:b.w]
			b.r = b.w
			err = b.readErr()
			break
		}

		// Buffer full?
		if b.Buffered() >= len(b.buf) {
			b.r = b.w
			line = b.buf
			err = ErrBufferFull
			break
		}

		s = b.w - b.r // do not rescan area we scanned before

		b.fill() // buffer is not full  <---------★
	}
	return
}

func (b *Reader) fill() {
	// Slide existing data to beginning.
	if b.r > 0 {
		copy(b.buf, b.buf[b.r:b.w])
		b.w -= b.r
		b.r = 0
	}

	if b.w >= len(b.buf) {
		panic("bufio: tried to fill full buffer")
	}

	// Read new data: try a limited number of times.
	for i := maxConsecutiveEmptyReads; i > 0; i-- {
		n, err := b.rd.Read(b.buf[b.w:]) <--------if n < 0 {
			panic(errNegativeRead)
		}
		b.w += n
		if err != nil {
			b.err = err
			return
		}
		if n > 0 {
			return
		}
	}
	b.err = io.ErrNoProgress
}

b.rdとは&connReader{conn: c}のことでした。いよいよコネクションのReadに迫ってきましたね。

connReader.Read -> conn.Read -> netFD.Read -> FD.Read

func (cr *connReader) Read(p []byte) (n int, err error) {
	cr.lock()
	if cr.inRead {
		cr.unlock()
		if cr.conn.hijacked() {
			panic("invalid Body.Read call. After hijacked, the original Request must not be used")
		}
		panic("invalid concurrent Body.Read call")
	}
	if cr.hitReadLimit() {
		cr.unlock()
		return 0, io.EOF
	}
	if len(p) == 0 {
		cr.unlock()
		return 0, nil
	}
	if int64(len(p)) > cr.remain {
		p = p[:cr.remain]
	}
	if cr.hasByte {
		p[0] = cr.byteBuf[0]
		cr.hasByte = false
		cr.unlock()
		return 1, nil
	}
	cr.inRead = true
	cr.unlock()
	n, err = cr.conn.rwc.Read(p) <---------★

	cr.lock()
	cr.inRead = false
	if err != nil {
		cr.handleReadError(err)
	}
	cr.remain -= int64(n)
	cr.unlock()

	cr.cond.Broadcast()
	return n, err
}

いろいろReadが競合しないようにしたりしていますが、今見たいのはconn.rwc.Read=conn.Read(TCPConnが内包しているconnのRead)です。

func (c *conn) Read(b []byte) (int, error) {
	if !c.ok() {
		return 0, syscall.EINVAL
	}
	n, err := c.fd.Read(b)
	if err != nil && err != io.EOF {
		err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
	}
	return n, err
}

func (fd *netFD) Read(p []byte) (n int, err error) {
	n, err = fd.pfd.Read(p)
	runtime.KeepAlive(fd)
	return n, wrapSyscallError(readSyscallName, err)
}

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()
	if len(p) == 0 {
		// If the caller wanted a zero byte read, return immediately
		// without trying (but after acquiring the readLock).
		// Otherwise syscall.Read returns 0, nil which looks like
		// io.EOF.
		// TODO(bradfitz): make it wait for readability? (Issue 15735)
		return 0, nil
	}
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}


ようやくキーパーソンのwaitReadが出てきました。
例のごとくacceprFDが準備完了でなければsysCall.ReadはerrでwaitReadで待つという流れですね。
waitReadからの流れは前回のAccept編で見たのと同じです。

2. goparkでsleep&スケジューリング

上記でwaitReadからAccept編でみたのと同じように進行します。
詳しくは下記を参照ください。
https://zenn.dev/cube/articles/0731eb7c14bb5d#polldesc.waitread-->-polldesc.wait-->-poll_runtime_pollwait-->-netpollblock-->-gopark

3. スケジューラー内のepoll_waitでfdが準備完了になったら中断していたgoroutineを再起動

こちらもAccept編でみたのと同じ動きですね。
https://zenn.dev/cube/articles/0731eb7c14bb5d#step2-スケジューリングで新しいgを取ってくる-1

acceptFDが準備完了になるとスケジューラーでepoll_waitをして、Goroutineが再起動し、
下記の無限ループに戻りReadを再実行します。
その時はacceptFDが準備完了なのでReadは成功し、無限ループを脱出します。

func (fd *FD) Read(p []byte) (int, error) {
	
	for {
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

Readで無事コネクションから情報を読み取れたら、その情報を最初で見たServeHTTPに与えあとは各Hanlderで処理してもらうといった流れがaccept ~ 各Goroutineに分かれてからの流れとなります。

まとめ

コネクション処理編はepollとスケジューラーというより、serveHTTPへ至る道の解説のような形になりました。
epollもスケジューラーももしかしたらコネクション処理編で解説した方が良かったのかもしれません。
accept編と今回で見たようにReadはブロックせず、リクエストが来ない間は別のGoroutineに処理を渡せて、しかも実装者(サーバー作成者)は全く意識することなく利用できます。

実はnet/httpのサーバー側を読んだのは、
ここまで紹介したnet/httpServerをさらに大量リクエスト向けに改善したと言われている下記を見つけたのがきっかけでした。
https://github.com/cloudwego/netpoll

そもそもnet/httpServerがどんなデザインかわからなかったので、改善といわれても正直ピンとこなかったので今回実装を読んでみることにしました。
正直パフォーマンス改善についての理解はもっと低レイヤから勉強しないといけない気もしますが...ともかくデザイン的にはわかったのでよしとします。

cloudwego/netpollが大量リクエスト向けと書いたのは、これまで見てきた通りnet/httpは1コネクション1Goroutineでした。
なのでリクエストが多いほどGoroutineも多くなり、Goroutineが軽量とは言ってもメモリの使用量や、コンテキストスイッチも多くなってしまいます。
cloudwego/netpollではこれを改善し、生成するGoroutineの数を抑えています。

cloudwego/netpollもepollをふんだんに利用しているのですが、次回の記事では従来のnet/httpと何が違うか、デザインはどうなっているかなんて見て行ければいいなと思っています。

Discussion