Goのnet/httpのclientでなぜresponseBodyをClose、読み切らなくてはいけないのか
いきなり結論
結論としては、
- responseBodyをCloseしないとコネクションがブロックしてしまい再利用されず、古い接続が残ったまま、新しく接続するたびに新しいGoroutineとファイルディスクリプタを作ってしまう
- responseBodyを読み切らないとkeepAliveされずコネクションが終了してしまい再利用されず、接続のたびに新しい接続を作ってしまう。
ということなのですが、コードではどうなっているか見てみましょう。
http.Get ~ client.sendまで
http.Get -> client.Get -> client.Do -> client.sendまで
http.Get
func Get(url string) (resp *Response, err error) {
return DefaultClient.Get(url)
}
ここのDefaultClientはグローバル変数なので使いまわししています。
client.Get
func (c *Client) Get(url string) (resp *Response, err error) {
req, err := NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
return c.Do(req)
}
client.Do
func (c *Client) Do(req *Request) (*Response, error) {
return c.do(req)
}
func (c *Client) do(req *Request) (retres *Response, reterr error) {
for {
reqs = append(reqs, req)
var err error
var didTimeout func() bool
//sendはここから
if resp, didTimeout, err = c.send(req, deadline); err != nil {
// c.send() always closes req.Body
reqBodyClosed = true
if !deadline.IsZero() && didTimeout() {
err = &httpError{
// TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancellation/
err: err.Error() + " (Client.Timeout exceeded while awaiting headers)",
timeout: true,
}
}
return nil, uerr(err)
}
var shouldRedirect bool
redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0])
if !shouldRedirect {
return resp, nil
}
req.closeBody()
}
}
client.send
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
if c.Jar != nil {
for _, cookie := range c.Jar.Cookies(req.URL) {
req.AddCookie(cookie)
}
}
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
if c.Jar != nil {
if rc := resp.Cookies(); len(rc) > 0 {
c.Jar.SetCookies(req.URL, rc)
}
}
return resp, nil, nil
}
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
resp, err = rt.RoundTrip(req)
return resp, nil, nil
}
DefaultClientを使っていたりして、RoundTripperがnilの時は、下記でDefaultTransportを使うようになっている、defaultをうまく使っているなと感じる
func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}
defaultTransport.RoundTrip ~ persistConnection.roundTripまで
defaultTransport.RoundTrip -> defaultTransport.getConn -> persistConnection.roundTrip
rtはclient.transport()由来であったRoundTripperはinterfaceなのでdefaultTransport が嫌ならClient生成時にTransportを指定してあげれば良い。
ここではdefaultTransportとして話を進める。
tr := &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
DisableCompression: true,
}
client := &http.Client{
Transport: tr,
}
func (t *Transport) roundTrip(req *Request) (*Response, error) {
ctx := req.Context()
for {
//forループの最初でseolectはさんでctxのチェックをするのもよくあるパターン
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// Get the cached or newly-created connection to either the
// host (for http or https), the http proxy, or the http proxy
// pre-CONNECTed to https server. In any case, we'll be ready
// to send it requests.
pconn, err := t.getConn(treq, cm) <- ☆
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
}
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq) <-☆
}
}
}
getConn
getConnではコネクションを獲得している。
transportごとにコネクションのQueueがあってこのQueueをつかってコネクションを使いまわしている,いつこのQueueに追加されるかが今回の表題の一つのカギとなってくる、追加タイミングは後々
下記のどちらかの道でコネクションを獲得、queueForIdleConnではqueueから使いまわされたコネクションを獲得するものと覚えておいて、今回は新規コネクション生成の方を見ていく
getConn -> queueForIdleConn or
-> queueForDial -> dialConnFor -> dialConn -☆
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx,
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
// Queue for idle connection.
if delivered := t.queueForIdleConn(w); delivered { <-☆
pc := w.pc
// Trace only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
}
// set request canceler to some non-nil function so we
// can detect whether it was cleared between now and when
// we enter roundTrip
t.setReqCanceler(treq.cancelKey, func(error) {})
return pc, nil
}
cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })
// Queue for permission to dial.
t.queueForDial(w)
// Wait for completion or cancellation.
select {
case <-w.ready:
// Trace success but only for HTTP/1.
// HTTP/2 calls trace.GotConn itself.
if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
}
if w.err != nil {
// If the request has been cancelled, that's probably
// what caused w.err; if so, prefer to return the
// cancellation error (see golang.org/issue/16049).
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
コネクション生成の本体はdialConn
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
pconn = &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
// Return a typed error, per Issue 16997
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
if cm.scheme() == "https" && t.hasCustomTLSDialer() {
var err error
pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
if tc, ok := pconn.conn.(*tls.Conn); ok {
// Handshake here, in case DialTLS didn't. TLSNextProto below
// depends on it for knowing the connection state.
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
if err := tc.Handshake(); err != nil {
go pconn.conn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return nil, err
}
cs := tc.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
pconn.tlsState = &cs
}
} else {
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
...
pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) <-これは将来的にconn.Readをしてサーバーから読み取り
pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) <-これは将来的にconn.Writeをしてサーバーへ書き込み
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
dialConnで押さえておきたいのは
1.persistConnはtransport.dial()から取得した実際のコネクションのwrapperということ
2.コネクション新規作成時にtransport.dial()をしてファイルディスクリプタを生成しているということ
3.同じくコネクション新規作成時にreadLoop,writeLoopというGoroutineを起動していること
readLoop,writeLoopはpersistConnection.roundTripと協調して受信、送信を行っている
roundtripとreadLoop,writeLoopが協調するということだけここでは押さえておく。
原因の箇所、readLoop
ようやくreadLoopまで来たので問題となる箇所を見ていく。
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}
// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
eofc := make(chan struct{})
defer close(eofc) // unblock reader on errors
// Read this once, before loop starts. (to avoid races in tests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()
alive := true <- ★3
for alive {
pc.readLimit = pc.maxHeaderResponseSize()
_, err := pc.br.Peek(1)
pc.mu.Lock()
if pc.numExpectedResponses == 0 {
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())
var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace)
} else {
err = transportReadFromServerError{err}
closeErr = err
}
if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
pc.readLimit = maxInt64 // effectively no limit for response bodies
pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()
bodyWritable := resp.bodyIsWritable()
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}
if !hasBody || bodyWritable {
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)
// Put the idle conn back into the pool before we send the response
// so if they process it quickly and make another request, they'll
// get this same conn. But we use the unbuffered channel 'rc'
// to guarantee that persistConn.roundTrip got out of its select
// potentially waiting for this persistConn to close.
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyWritable {
closeErr = errCallerOwnsConn
}
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
// we're allowed to exit now if needed (if alive is false)
testHookReadLoopBeforeNextRead()
continue
}
waitForBodyRead := make(chan bool, 2) <-★2
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
resp.Body = body
if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancellation or death)
select { <-★1
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
}
responseBodyをCloseしないといけないわけ
まずは1から見ていこう。
- responseBodyをCloseしないとコネクションがブロックしてしまい再利用されず、古い接続が残ったまま、新しく接続するたびに新しいGoroutineとファイルディスクリプタを作ってしまう
一番最後のあたりのselect文でbodyを読むまで待っている部分がある(読み切ったかどうかはcase内部で判定)、ここのwaitForBodyReadがブロックしてしまうというのが問題で、
ここがブロックしてしまえば
tryPutIdleConnでコネクションをTransportのQueueに入れて再利用ができない。
★1
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
どんな時にブロックしてしまうかというと、下記のbodyがresp.bodyなのだが、fn,earlyCloseFnでwaitForBodyReadに値を入れていることがわかる。
fn,earlyCloseFnはbody.Closeしたときに発火するのでbodyをCloseしなければ、無駄にコネクションが作られ、ファイルディスクリプタとGoroutineの無駄になってしまう。
★2
waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
func (es *bodyEOFSignal) Close() error {
es.mu.Lock()
defer es.mu.Unlock()
if es.closed {
return nil
}
es.closed = true
if es.earlyCloseFn != nil && es.rerr != io.EOF {
return es.earlyCloseFn()
}
err := es.body.Close()
return es.condfn(err)
}
responseBodyを読み切らないといけないわけ
次に、
2. responseBodyを読み切らないとkeepAliveされずコネクションが終了してしまい再利用されず、接続のたびに新しい接続を作ってしまう。
を見ていこう。これもreadLoopが関わる、BodyをCloseしたとしてcase式の中へ行ったときにaliveとあるが、これはkeepAliveのこと
alive = trueである限りreadLoopからは抜けることはない。
bodyを読み切った時は★4のbodyEOF=trueで、
tryPutIdleConnでtransportのqueueにコネクションを入れる。
次回以降clientでsendした場合はTransportのコネクションを使うことができ、新しくコネクションを作るわけではないのでreadLoopもqueueから取り出したコネクションがまだ動いている状態なのでそれを使える。
ただ、bodyを最後まで読み切らないと、★4のbodyEOFがfalseとなり、tryPutIdleConnも実行されず、alive=falseとなってreadLoopを抜けてしまう。
なのでコネクションの再利用が行われず、keepAliveも終了してしまうという形
func (pc *persistConn) readLoop() {
...
alive := true <- ★3
for alive {
...
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF && <-★4
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
}
以上が、
resp.body.Close()とbodyを最後まで読み切ったほうが良い理由となっています。
やらなければいけないとはわかっていても実際なぜなのかよくわかっていなかったので勉強になりました。
参考
Discussion