🌊

Goのnet/httpのclientでなぜresponseBodyをClose、読み切らなくてはいけないのか

2021/09/27に公開

いきなり結論

結論としては、

  1. responseBodyをCloseしないとコネクションがブロックしてしまい再利用されず、古い接続が残ったまま、新しく接続するたびに新しいGoroutineとファイルディスクリプタを作ってしまう
  2. responseBodyを読み切らないとkeepAliveされずコネクションが終了してしまい再利用されず、接続のたびに新しい接続を作ってしまう。

ということなのですが、コードではどうなっているか見てみましょう。

http.Get ~ client.sendまで

http.Get -> client.Get -> client.Do -> client.sendまで

 http.Get
 func Get(url string) (resp *Response, err error) {
     return DefaultClient.Get(url)
 }
 
 ここのDefaultClientはグローバル変数なので使いまわししています。
 
 client.Get
 func (c *Client) Get(url string) (resp *Response, err error) {
     req, err := NewRequest("GET", url, nil)
     if err != nil {
         return nil, err
     }
     return c.Do(req)
}

client.Do
func (c *Client) Do(req *Request) (*Response, error) {
     return c.do(req)
 }
 
 func (c *Client) do(req *Request) (retres *Response, reterr error) {
	
	for {
		reqs = append(reqs, req)
		var err error
		var didTimeout func() bool
		
		//sendはここから
		if resp, didTimeout, err = c.send(req, deadline); err != nil { 
			// c.send() always closes req.Body
			reqBodyClosed = true
			if !deadline.IsZero() && didTimeout() {
				err = &httpError{
					// TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancellation/
					err:     err.Error() + " (Client.Timeout exceeded while awaiting headers)",
					timeout: true,
				}
			}
			return nil, uerr(err)
		}

		var shouldRedirect bool
		redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
		if !shouldRedirect {
			return resp, nil
		}

		req.closeBody()
	}
}

client.send
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	if c.Jar != nil {
		for _, cookie := range c.Jar.Cookies(req.URL) {
			req.AddCookie(cookie)
		}
	}
	resp, didTimeout, err = send(req, c.transport(), deadline)
	if err != nil {
		return nil, didTimeout, err
	}
	if c.Jar != nil {
		if rc := resp.Cookies(); len(rc) > 0 {
			c.Jar.SetCookies(req.URL, rc)
		}
	}
	return resp, nil, nil
	}

func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
 
 	resp, err = rt.RoundTrip(req)
 
 	return resp, nil, nil
 }
 
DefaultClientを使っていたりして、RoundTripperがnilの時は、下記でDefaultTransportを使うようになっている、defaultをうまく使っているなと感じる
 func (c *Client) transport() RoundTripper {
 	if c.Transport != nil {
 		return c.Transport
 	}
 	return DefaultTransport
 }

defaultTransport.RoundTrip ~ persistConnection.roundTripまで

defaultTransport.RoundTrip -> defaultTransport.getConn -> persistConnection.roundTrip

 rtはclient.transport()由来であったRoundTripperはinterfaceなのでdefaultTransport  が嫌ならClient生成時にTransportを指定してあげれば良い。
 ここではdefaultTransportとして話を進める。
  tr := &http.Transport{
        MaxIdleConns:       10,
        IdleConnTimeout:    30 * time.Second,
        DisableCompression: true,
    }
    client := &http.Client{
        Transport: tr,
    }
    
 func (t *Transport) roundTrip(req *Request) (*Response, error) {
 	
 	ctx := req.Context()
 	
 	for {
 	
 	//forループの最初でseolectはさんでctxのチェックをするのもよくあるパターン
 		select {
 		case <-ctx.Done():
 			req.closeBody()
 			return nil, ctx.Err()
 		default:
 		}
 
 		// treq gets modified by roundTrip, so we need to recreate for each retry.
 		treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
 		cm, err := t.connectMethodForRequest(treq)
 		if err != nil {
 			req.closeBody()
 			return nil, err
 		}
 
 		// Get the cached or newly-created connection to either the
 		// host (for http or https), the http proxy, or the http proxy
 		// pre-CONNECTed to https server. In any case, we'll be ready
 		// to send it requests.
 		pconn, err := t.getConn(treq, cm)  <-if err != nil {
 			t.setReqCanceler(cancelKey, nil)
 			req.closeBody()
 			return nil, err
 		}
 
 		var resp *Response
 		if pconn.alt != nil {
 			// HTTP/2 path.
 			t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
 			resp, err = pconn.alt.RoundTrip(req)
 		} else {
 			resp, err = pconn.roundTrip(treq)   <-}
 	
 	}
 }
 
 getConn 
 getConnではコネクションを獲得している。
 transportごとにコネクションのQueueがあってこのQueueをつかってコネクションを使いまわしている,いつこのQueueに追加されるかが今回の表題の一つのカギとなってくる、追加タイミングは後々
 
 下記のどちらかの道でコネクションを獲得、queueForIdleConnではqueueから使いまわされたコネクションを獲得するものと覚えておいて、今回は新規コネクション生成の方を見ていく
 getConn -> queueForIdleConn or
         -> queueForDial -> dialConnFor -> dialConn -func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
 	
 	w := &wantConn{
 		cm:         cm,
 		key:        cm.key(),
 		ctx:        ctx,
 		ready:      make(chan struct{}, 1),
 		beforeDial: testHookPrePendingDial,
 		afterDial:  testHookPostPendingDial,
 	}
 	
 	// Queue for idle connection.
 	if delivered := t.queueForIdleConn(w); delivered { <-☆
 		pc := w.pc
 		// Trace only for HTTP/1.
 		// HTTP/2 calls trace.GotConn itself.
 		if pc.alt == nil && trace != nil && trace.GotConn != nil {
 			trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
 		}
 		// set request canceler to some non-nil function so we
 		// can detect whether it was cleared between now and when
 		// we enter roundTrip
 		t.setReqCanceler(treq.cancelKey, func(error) {})
 		return pc, nil
 	}
 
 	cancelc := make(chan error, 1)
 	t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
 
 	// Queue for permission to dial.
 	t.queueForDial(w)
 
 	// Wait for completion or cancellation.
 	select {
 	case <-w.ready:
 		// Trace success but only for HTTP/1.
 		// HTTP/2 calls trace.GotConn itself.
 		if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
 			trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
 		}
 		if w.err != nil {
 			// If the request has been cancelled, that's probably
 			// what caused w.err; if so, prefer to return the
 			// cancellation error (see golang.org/issue/16049).
 			select {
 			case <-req.Cancel:
 				return nil, errRequestCanceledConn
 			case <-req.Context().Done():
 				return nil, req.Context().Err()
 			case err := <-cancelc:
 				if err == errRequestCanceled {
 					err = errRequestCanceledConn
 				}
 				return nil, err
 			default:
 				// return below
 			}
 		}
 		return w.pc, w.err
 	case <-req.Cancel:
 		return nil, errRequestCanceledConn
 	case <-req.Context().Done():
 		return nil, req.Context().Err()
 	case err := <-cancelc:
 		if err == errRequestCanceled {
 			err = errRequestCanceledConn
 		}
 		return nil, err
 	}
 }
 
 コネクション生成の本体はdialConn
 
 func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
 	pconn = &persistConn{
 		t:             t,
 		cacheKey:      cm.key(),
 		reqch:         make(chan requestAndChan, 1),
 		writech:       make(chan writeRequest, 1),
 		closech:       make(chan struct{}),
 		writeErrCh:    make(chan error, 1),
 		writeLoopDone: make(chan struct{}),
 	}
 	trace := httptrace.ContextClientTrace(ctx)
 	wrapErr := func(err error) error {
 		if cm.proxyURL != nil {
 			// Return a typed error, per Issue 16997
 			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
 		}
 		return err
 	}
 	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
 		var err error
 		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
 		if err != nil {
 			return nil, wrapErr(err)
 		}
 		if tc, ok := pconn.conn.(*tls.Conn); ok {
 			// Handshake here, in case DialTLS didn't. TLSNextProto below
 			// depends on it for knowing the connection state.
 			if trace != nil && trace.TLSHandshakeStart != nil {
 				trace.TLSHandshakeStart()
 			}
 			if err := tc.Handshake(); err != nil {
 				go pconn.conn.Close()
 				if trace != nil && trace.TLSHandshakeDone != nil {
 					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
 				}
 				return nil, err
 			}
 			cs := tc.ConnectionState()
 			if trace != nil && trace.TLSHandshakeDone != nil {
 				trace.TLSHandshakeDone(cs, nil)
 			}
 			pconn.tlsState = &cs
 		}
 	} else {
 		conn, err := t.dial(ctx, "tcp", cm.addr())
 		if err != nil {
 			return nil, wrapErr(err)
 		}
 		pconn.conn = conn
 		if cm.scheme() == "https" {
 			var firstTLSHost string
 			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
 				return nil, wrapErr(err)
 			}
 			if err = pconn.addTLS(firstTLSHost, trace); err != nil {
 				return nil, wrapErr(err)
 			}
 		}
 	}
 
 	...
    	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())  <-これは将来的にconn.Readをしてサーバーから読み取り
    	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) <-これは将来的にconn.Writeをしてサーバーへ書き込み
 
 	go pconn.readLoop()
 	go pconn.writeLoop()
 	return pconn, nil
 }
 
 dialConnで押さえておきたいのは
 1.persistConnはtransport.dial()から取得した実際のコネクションのwrapperということ
 2.コネクション新規作成時にtransport.dial()をしてファイルディスクリプタを生成しているということ
 3.同じくコネクション新規作成時にreadLoop,writeLoopというGoroutineを起動していること
   readLoop,writeLoopはpersistConnection.roundTripと協調して受信、送信を行っている

roundtripとreadLoop,writeLoopが協調するということだけここでは押さえておく。

原因の箇所、readLoop

ようやくreadLoopまで来たので問題となる箇所を見ていく。

func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default value, if not changed below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)   
	}()

	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is used to block caller goroutines reading from Response.Body
	// at EOF until this goroutines has (potentially) added the connection
	// back to the idle pool.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Read this once, before loop starts. (to avoid races in tests)
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true <-3
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit for response bodies

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
			// Don't do keep-alive on error if either party requested a close
			// or we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		if !hasBody || bodyWritable {
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)

			// Put the idle conn back into the pool before we send the response
			// so if they process it quickly and make another request, they'll
			// get this same conn. But we use the unbuffered channel 'rc'
			// to guarantee that persistConn.roundTrip got out of its select
			// potentially waiting for this persistConn to close.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace)

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			// Now that they've read from the unbuffered channel, they're safely
			// out of the select that also waits on this goroutine to die, so
			// we're allowed to exit now if needed (if alive is false)
			testHookReadLoopBeforeNextRead()
			continue
		}

		waitForBodyRead := make(chan bool, 2)   <-2
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // will be closed by deferred call at the end of the function
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before looping back to the top of this function and peeking on
		// the bufio.Reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancellation or death)
		select {  <-1
		case bodyEOF := <-waitForBodyRead:
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace)  
			if bodyEOF {
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
		case <-pc.closech:
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
}

responseBodyをCloseしないといけないわけ

まずは1から見ていこう。

  1. responseBodyをCloseしないとコネクションがブロックしてしまい再利用されず、古い接続が残ったまま、新しく接続するたびに新しいGoroutineとファイルディスクリプタを作ってしまう

一番最後のあたりのselect文でbodyを読むまで待っている部分がある(読み切ったかどうかはcase内部で判定)、ここのwaitForBodyReadがブロックしてしまうというのが問題で、
ここがブロックしてしまえば
tryPutIdleConnでコネクションをTransportのQueueに入れて再利用ができない。

1
 select {
		case bodyEOF := <-waitForBodyRead:
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace) 
			if bodyEOF {
				eofc <- struct{}{}
			}

どんな時にブロックしてしまうかというと、下記のbodyがresp.bodyなのだが、fn,earlyCloseFnでwaitForBodyReadに値を入れていることがわかる。
fn,earlyCloseFnはbody.Closeしたときに発火するのでbodyをCloseしなければ、無駄にコネクションが作られ、ファイルディスクリプタとGoroutineの無駄になってしまう。

2
 waitForBodyRead := make(chan bool, 2)
 body := &bodyEOFSignal{
 			body: resp.Body,
 			earlyCloseFn: func() error {
 				waitForBodyRead <- false
 				<-eofc // will be closed by deferred call at the end of the function
 				return nil
 
 			},
 			fn: func(err error) error {  
 				isEOF := err == io.EOF
 				waitForBodyRead <- isEOF
 				if isEOF {
 					<-eofc // see comment above eofc declaration
 				} else if err != nil {
 					if cerr := pc.canceled(); cerr != nil {
 						return cerr
 					}
 				}
 				return err
 			},
 		}

func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true
	if es.earlyCloseFn != nil && es.rerr != io.EOF {
		return es.earlyCloseFn()
	}
	err := es.body.Close()
	return es.condfn(err)
}

 	

responseBodyを読み切らないといけないわけ

次に、
2. responseBodyを読み切らないとkeepAliveされずコネクションが終了してしまい再利用されず、接続のたびに新しい接続を作ってしまう。
を見ていこう。これもreadLoopが関わる、BodyをCloseしたとしてcase式の中へ行ったときにaliveとあるが、これはkeepAliveのこと

alive = trueである限りreadLoopからは抜けることはない。

bodyを読み切った時は★4のbodyEOF=trueで、
tryPutIdleConnでtransportのqueueにコネクションを入れる。
次回以降clientでsendした場合はTransportのコネクションを使うことができ、新しくコネクションを作るわけではないのでreadLoopもqueueから取り出したコネクションがまだ動いている状態なのでそれを使える。

ただ、bodyを最後まで読み切らないと、★4のbodyEOFがfalseとなり、tryPutIdleConnも実行されず、alive=falseとなってreadLoopを抜けてしまう。
なのでコネクションの再利用が行われず、keepAliveも終了してしまうという形

func (pc *persistConn) readLoop() {
	
        ...
	
	alive := true <-3
	for alive {
		...
		
		select {
		case bodyEOF := <-waitForBodyRead:
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) //            before pc might return to idle pool
			alive = alive &&
				bodyEOF && <-4
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace) 
			if bodyEOF {
				eofc <- struct{}{}
			}
        }

以上が、
resp.body.Close()とbodyを最後まで読み切ったほうが良い理由となっています。
やらなければいけないとはわかっていても実際なぜなのかよくわかっていなかったので勉強になりました。

参考
https://tutuz-tech.hatenablog.com/entry/2020/03/22/160529
https://www.sambaiz.net/article/53/
https://qiita.com/kitauji/items/b4e8a48c75bf01ccc9f0

Discussion