Goのnet/httpのServer側デザインの良さについて語る(Connection処理編)
あらすじ
前回まではacceptを非同期にしてlistenFDが準備完了までの間は他のGoroutineを動かし(epoll_ctl & gopark & スケジューリング)、
listenFDが準備完了になったら処理をacceptに戻す(epoll_wait & netpoll)、というところを見ていきました。
今回はacceptが完了して、accept由来のfd,acceptFDをゲットし、そのFDをGoroutineで扱うところを見ていきましょう。
やっとListen編で紹介した下の画像の動きですね。
ここでは指針を↑の画像の動きに絞ってみていきたいと思います。
上記画像の動きをするためにはaccept編で見たように、下の3つが必要となります。
- fdをepoll_ctlで監視対象に
- goparkでsleep&スケジューリング
- スケジューラー内のepoll_waitでfdが準備完了になったら中断していたgoroutineを再起動
この三つを中心に据えてみていきましょう。
1. fdをepoll_ctlで監視対象に(acceptFDを監視対象とする)
さて、前回下記のserver.Serveに戻るところまで見ましたが...ここに戻るまでに一つだけ見ておかなくてはいけないところがあります。
1.fdをepoll_ctlで監視対象に
上の動きをserver.Serveに戻るまでにやっているのでまずはそこだけ押さえましょう。
func (srv *Server) Serve(l net.Listener) error {
...
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept() <-★
...
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}
}
一旦話をacceptが成功したところ、fd.Acceptまで戻します。
fd_unix.go
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
//無限ループ
for {
s, rsa, errcall, err := accept(fd.Sysfd) <-★1
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
}
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil { <---★
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
newFDで d = acceptFDを使ってnetFDを作っています。listen編でも同じ流れがありました。あとはlisten編と同じ流れなのですが二つ説明したいことがあります。
listen編で見た流れ↓
1.newFDの生成とinitについて
2.epollへの登録
1.newFDの生成とinitについて
newFDとそのinitは下記のようになっています。
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
ret := &netFD{ <------------★
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
func (fd *FD) Init(net string, pollable bool) error {
err := fd.pd.init(fd)
return err
}
ここでFD.pdはpollDescなのですが★印の時にpollDescは作成されていませんでした。つまりnilですね。
nilからメソッドを実行していることになりますが、Goではポインタレシーバーならnilからのメソッド実行は合法なのでOKです。
知ってはいたのですが実際に利用されているところを見ると少し驚きますね。
さて、このあとpollDesc.initと続いていき,pollDescの生成やepoll_ctlでacceptFDを登録していきます。
一個腑に落ちない点としてFDにpollDescを実際に入れているところが見つからないところです。FDにpollDescを持たせている意味はinitをするためというだけで、実際にFDからはpollDescを操作することはないのでしょうか...?
(詳しい方いたら教えてくだされば助かります。)
2.epollへの登録
pollDesc.init ~と続いていき、runtime_pollServerInitを実行するときの話です。
fd_poll_runtime
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
}
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 { <--------------★
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
netpoll_epoll.go
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC) <---ここでepollを作成
★のatomic.Loadに注目すると、一度netpollInit()を実行し、atomic.Store(&netpollInited, 1)でnetpollInited=1となっていれば実行されないことがわかります。
一度きりの実行なのでepollcreate1でepfdを何度も作らないようにしているので、epoll=epfdは一つ、ということになります。
ここまでacceptFDがいつepollの監視対象になるか見てきました。
次はacceptが終わった後の話~goroutine起動直前の話を少しした後に、各Goroutineに分かれてからの処理のお話に行きましょう。
acceptが終わったら
やっとserver.Serveまで戻ってきましたので話を進めましょう。
func (srv *Server) Serve(l net.Listener) error {
...
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept() <-★
...
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx)
}
}
★のところでacceptが成功するとrw = net.Connを獲得できます。
獲得するところは下記でした。
tcpsock.go
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept() <-★ 前節でみたnetFDが返ってくるのだった、netFDの中にaccept由来のFDが入っている
if err != nil {
return nil, err
}
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}
type TCPConn struct {
conn
}
type conn struct {
fd *netFD
}
ここでnewTCPConnはinterfaceであるnet.Connを継承しています。
そのnewTCPConnのなかにnetFDを入れます。
func (srv *Server) newConn(rwc net.Conn) *conn {
c := &conn{
server: srv,
rwc: rwc,
}
if debugServerConnections {
c.rwc = newLoggingConn("server", c.rwc)
}
return c
}
type conn struct {
server *Server
cancelCtx context.CancelFunc
rwc net.Conn
...
}
newTCPConnをさらにserver.newConnに与えて、conn構造体を作ってから、conn.serveごとにgoroutineを実行するという流れです。
FDやらConnやらがごちゃごちゃしてきたので一度まとめると下図みたいな感じです。
さて、それではconn.serveからGoroutineによって並列処理されているところをみましょう。
conn.serve -> conn.readRequest -> readRequest
func (srv *Server) Serve(l net.Listener) error {
...
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, err := l.Accept()
...
c := srv.newConn(rw)
c.setState(c.rwc, StateNew, runHooks) // before Serve can return
go c.serve(connCtx) <-★
}
}
func (c *conn) serve(ctx context.Context) {
// HTTP/1.x from here on.
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
//conn.rにconnReaderを入れて、それをBufioReaderでwrap
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
//無限ループ、コネクションが切れるまでReadする
for {
w, err := c.readRequest(ctx) -★
...
serverHandler{c.server}.ServeHTTP(w, w.req)
...
}
}
ここでは詳しく見ませんが、serverHandler{c.server}.ServeHTTP(w, w.req)のところであらかじめ登録したHandlerをつかって処理を行っています。
conn.rにconnReaderを入れて、それをBufioReaderでwrapしています。conn.bufrはreadRequestで使います。
// Read next request from connection.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
...
req, err := readRequest(c.bufr) <-★
...
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
req.RemoteAddr = c.remoteAddr
req.TLS = c.tlsState
if body, ok := req.Body.(*body); ok {
body.doEarlyClose = true
}
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose: req.wantsClose(),
}
w.cw.res = w
w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
return w, nil
}
conn.ReadRequestではreadRequestから読み取ったものをresponseとして返しています。
ここの読み取ったものが後のServeHTTPに渡されるわけですね。
今回HTTPのtextをパースしているところはほとんど省いていますので興味のある方はぜひ元ソースをご覧ください。
func readRequest(b *bufio.Reader) (req *Request, err error) {
tp := newTextprotoReader(b)
req = new(Request)
// First line: GET /index.html HTTP/1.0
var s string
if s, err = tp.ReadLine(); err != nil {
return nil, err
}
}
bufio.ReaderをさらにTextprotoReaderでwrapしています。
func newTextprotoReader(br *bufio.Reader) *textproto.Reader {
if v := textprotoReaderPool.Get(); v != nil {
tr := v.(*textproto.Reader)
tr.R = br
return tr
}
return textproto.NewReader(br)
}
下記がReaderのwrapをまとめた図です。
なぜこんなwrapばかりしているかというと、
textprotoReaderはその名の通りtextのReadに便利なメソッドがあるから、
bufioReaderはbufio.Readline等,従来のReadを便利にできるからですね。
Readerの拡張がしたいがためにwrapを挟んでいます。
ここからはRead部分に入っていきましょう。今回bufioが施している様々な工夫は省いていますので、興味のある方はbufioを覗いてみるのも面白いかもしれません。
textproto.Reader.Readline -> textproto.Reader.readLineSlice -> bufio.Reader.ReadLine -> bufio.Reader.ReadSlice -> bufio.Reader.fill
func (r *Reader) ReadLine() (string, error) {
line, err := r.readLineSlice()
return string(line), err
}
func (r *Reader) readLineSlice() ([]byte, error) {
r.closeDot()
var line []byte
for {
l, more, err := r.R.ReadLine()
if err != nil {
return nil, err
}
// Avoid the copy if the first call produced a full line.
if line == nil && !more {
return l, nil
}
line = append(line, l...)
if !more {
break
}
}
return line, nil
}
ここでr.Rはbufio.Readerのことでした。
bufio.go
func (b *Reader) ReadLine() (line []byte, isPrefix bool, err error) {
line, err = b.ReadSlice('\n')
...
}
func (b *Reader) ReadSlice(delim byte) (line []byte, err error) {
s := 0 // search start index
for {
// Search buffer.
if i := bytes.IndexByte(b.buf[b.r+s:b.w], delim); i >= 0 {
i += s
line = b.buf[b.r : b.r+i+1]
b.r += i + 1
break
}
// Pending error?
if b.err != nil {
line = b.buf[b.r:b.w]
b.r = b.w
err = b.readErr()
break
}
// Buffer full?
if b.Buffered() >= len(b.buf) {
b.r = b.w
line = b.buf
err = ErrBufferFull
break
}
s = b.w - b.r // do not rescan area we scanned before
b.fill() // buffer is not full <---------★
}
return
}
func (b *Reader) fill() {
// Slide existing data to beginning.
if b.r > 0 {
copy(b.buf, b.buf[b.r:b.w])
b.w -= b.r
b.r = 0
}
if b.w >= len(b.buf) {
panic("bufio: tried to fill full buffer")
}
// Read new data: try a limited number of times.
for i := maxConsecutiveEmptyReads; i > 0; i-- {
n, err := b.rd.Read(b.buf[b.w:]) <--------★
if n < 0 {
panic(errNegativeRead)
}
b.w += n
if err != nil {
b.err = err
return
}
if n > 0 {
return
}
}
b.err = io.ErrNoProgress
}
b.rdとは&connReader{conn: c}のことでした。いよいよコネクションのReadに迫ってきましたね。
connReader.Read -> conn.Read -> netFD.Read -> FD.Read
func (cr *connReader) Read(p []byte) (n int, err error) {
cr.lock()
if cr.inRead {
cr.unlock()
if cr.conn.hijacked() {
panic("invalid Body.Read call. After hijacked, the original Request must not be used")
}
panic("invalid concurrent Body.Read call")
}
if cr.hitReadLimit() {
cr.unlock()
return 0, io.EOF
}
if len(p) == 0 {
cr.unlock()
return 0, nil
}
if int64(len(p)) > cr.remain {
p = p[:cr.remain]
}
if cr.hasByte {
p[0] = cr.byteBuf[0]
cr.hasByte = false
cr.unlock()
return 1, nil
}
cr.inRead = true
cr.unlock()
n, err = cr.conn.rwc.Read(p) <---------★
cr.lock()
cr.inRead = false
if err != nil {
cr.handleReadError(err)
}
cr.remain -= int64(n)
cr.unlock()
cr.cond.Broadcast()
return n, err
}
いろいろReadが競合しないようにしたりしていますが、今見たいのはconn.rwc.Read=conn.Read(TCPConnが内包しているconnのRead)です。
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
ようやくキーパーソンのwaitReadが出てきました。
例のごとくacceprFDが準備完了でなければsysCall.ReadはerrでwaitReadで待つという流れですね。
waitReadからの流れは前回のAccept編で見たのと同じです。
2. goparkでsleep&スケジューリング
上記でwaitReadからAccept編でみたのと同じように進行します。
詳しくは下記を参照ください。
3. スケジューラー内のepoll_waitでfdが準備完了になったら中断していたgoroutineを再起動
こちらもAccept編でみたのと同じ動きですね。
acceptFDが準備完了になるとスケジューラーでepoll_waitをして、Goroutineが再起動し、
下記の無限ループに戻りReadを再実行します。
その時はacceptFDが準備完了なのでReadは成功し、無限ループを脱出します。
func (fd *FD) Read(p []byte) (int, error) {
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
Readで無事コネクションから情報を読み取れたら、その情報を最初で見たServeHTTPに与えあとは各Hanlderで処理してもらうといった流れがaccept ~ 各Goroutineに分かれてからの流れとなります。
まとめ
コネクション処理編はepollとスケジューラーというより、serveHTTPへ至る道の解説のような形になりました。
epollもスケジューラーももしかしたらコネクション処理編で解説した方が良かったのかもしれません。
accept編と今回で見たようにReadはブロックせず、リクエストが来ない間は別のGoroutineに処理を渡せて、しかも実装者(サーバー作成者)は全く意識することなく利用できます。
実はnet/httpのサーバー側を読んだのは、
ここまで紹介したnet/httpServerをさらに大量リクエスト向けに改善したと言われている下記を見つけたのがきっかけでした。
そもそもnet/httpServerがどんなデザインかわからなかったので、改善といわれても正直ピンとこなかったので今回実装を読んでみることにしました。
正直パフォーマンス改善についての理解はもっと低レイヤから勉強しないといけない気もしますが...ともかくデザイン的にはわかったのでよしとします。
cloudwego/netpollが大量リクエスト向けと書いたのは、これまで見てきた通りnet/httpは1コネクション1Goroutineでした。
なのでリクエストが多いほどGoroutineも多くなり、Goroutineが軽量とは言ってもメモリの使用量や、コンテキストスイッチも多くなってしまいます。
cloudwego/netpollではこれを改善し、生成するGoroutineの数を抑えています。
cloudwego/netpollもepollをふんだんに利用しているのですが、次回の記事では従来のnet/httpと何が違うか、デザインはどうなっているかなんて見て行ければいいなと思っています。
Discussion