diff options
Diffstat (limited to 'libgo/go/net/http/h2_bundle.go')
-rw-r--r-- | libgo/go/net/http/h2_bundle.go | 581 |
1 files changed, 379 insertions, 202 deletions
diff --git a/libgo/go/net/http/h2_bundle.go b/libgo/go/net/http/h2_bundle.go index 373f55098a3..f5a95084d24 100644 --- a/libgo/go/net/http/h2_bundle.go +++ b/libgo/go/net/http/h2_bundle.go @@ -30,6 +30,7 @@ import ( "io/ioutil" "log" "math" + mathrand "math/rand" "net" "net/http/httptrace" "net/textproto" @@ -3909,12 +3910,15 @@ func http2ConfigureServer(s *Server, conf *http2Server) error { } else if s.TLSConfig.CipherSuites != nil { // If they already provided a CipherSuite list, return // an error if it has a bad order or is missing - // ECDHE_RSA_WITH_AES_128_GCM_SHA256. - const requiredCipher = tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 + // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256. haveRequired := false sawBad := false for i, cs := range s.TLSConfig.CipherSuites { - if cs == requiredCipher { + switch cs { + case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + // Alternative MTI cipher to not discourage ECDSA-only servers. + // See http://golang.org/cl/30721 for further information. + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: haveRequired = true } if http2isBadCipher(cs) { @@ -3924,7 +3928,7 @@ func http2ConfigureServer(s *Server, conf *http2Server) error { } } if !haveRequired { - return fmt.Errorf("http2: TLSConfig.CipherSuites is missing HTTP/2-required TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") + return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.") } } @@ -4341,7 +4345,7 @@ func (sc *http2serverConn) condlogf(err error, format string, args ...interface{ if err == nil { return } - if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) { + if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) || err == http2errPrefaceTimeout { // Boring, expected errors. sc.vlogf(format, args...) } else { @@ -4545,8 +4549,13 @@ func (sc *http2serverConn) serve() { } } - if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame { - return + // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY + // with no error code (graceful shutdown), don't start the timer until + // all open streams have been completed. + sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame + gracefulShutdownComplete := sc.goAwayCode == http2ErrCodeNo && sc.curOpenStreams() == 0 + if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != http2ErrCodeNo || gracefulShutdownComplete) { + sc.shutDownIn(http2goAwayTimeout) } } } @@ -4583,8 +4592,11 @@ func (sc *http2serverConn) sendServeMsg(msg interface{}) { } } -// readPreface reads the ClientPreface greeting from the peer -// or returns an error on timeout or an invalid greeting. +var http2errPrefaceTimeout = errors.New("timeout waiting for client preface") + +// readPreface reads the ClientPreface greeting from the peer or +// returns errPrefaceTimeout on timeout, or an error if the greeting +// is invalid. func (sc *http2serverConn) readPreface() error { errc := make(chan error, 1) go func() { @@ -4602,7 +4614,7 @@ func (sc *http2serverConn) readPreface() error { defer timer.Stop() select { case <-timer.C: - return errors.New("timeout waiting for client preface") + return http2errPrefaceTimeout case err := <-errc: if err == nil { if http2VerboseLogs { @@ -4912,30 +4924,31 @@ func (sc *http2serverConn) startGracefulShutdown() { sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) }) } +// After sending GOAWAY, the connection will close after goAwayTimeout. +// If we close the connection immediately after sending GOAWAY, there may +// be unsent data in our kernel receive buffer, which will cause the kernel +// to send a TCP RST on close() instead of a FIN. This RST will abort the +// connection immediately, whether or not the client had received the GOAWAY. +// +// Ideally we should delay for at least 1 RTT + epsilon so the client has +// a chance to read the GOAWAY and stop sending messages. Measuring RTT +// is hard, so we approximate with 1 second. See golang.org/issue/18701. +// +// This is a var so it can be shorter in tests, where all requests uses the +// loopback interface making the expected RTT very small. +// +// TODO: configurable? +var http2goAwayTimeout = 1 * time.Second + func (sc *http2serverConn) startGracefulShutdownInternal() { - sc.goAwayIn(http2ErrCodeNo, 0) + sc.goAway(http2ErrCodeNo) } func (sc *http2serverConn) goAway(code http2ErrCode) { sc.serveG.check() - var forceCloseIn time.Duration - if code != http2ErrCodeNo { - forceCloseIn = 250 * time.Millisecond - } else { - // TODO: configurable - forceCloseIn = 1 * time.Second - } - sc.goAwayIn(code, forceCloseIn) -} - -func (sc *http2serverConn) goAwayIn(code http2ErrCode, forceCloseIn time.Duration) { - sc.serveG.check() if sc.inGoAway { return } - if forceCloseIn != 0 { - sc.shutDownIn(forceCloseIn) - } sc.inGoAway = true sc.needToSendGoAway = true sc.goAwayCode = code @@ -6004,7 +6017,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) { clen = strconv.Itoa(len(p)) } _, hasContentType := rws.snapHeader["Content-Type"] - if !hasContentType && http2bodyAllowedForStatus(rws.status) { + if !hasContentType && http2bodyAllowedForStatus(rws.status) && len(p) > 0 { ctype = DetectContentType(p) } var date string @@ -6172,7 +6185,26 @@ func (w *http2responseWriter) Header() Header { return rws.handlerHeader } +// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode. +func http2checkWriteHeaderCode(code int) { + // Issue 22880: require valid WriteHeader status codes. + // For now we only enforce that it's three digits. + // In the future we might block things over 599 (600 and above aren't defined + // at http://httpwg.org/specs/rfc7231.html#status.codes) + // and we might block under 200 (once we have more mature 1xx support). + // But for now any three digits. + // + // We used to send "HTTP/1.1 000 0" on the wire in responses but there's + // no equivalent bogus thing we can realistically send in HTTP/2, + // so we'll consistently panic instead and help people find their bugs + // early. (We can't return an error from WriteHeader even if we wanted to.) + if code < 100 || code > 999 { + panic(fmt.Sprintf("invalid WriteHeader code %v", code)) + } +} + func (w *http2responseWriter) WriteHeader(code int) { + http2checkWriteHeaderCode(code) rws := w.rws if rws == nil { panic("WriteHeader called after Handler finished") @@ -6605,7 +6637,7 @@ type http2Transport struct { // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to // send in the initial settings frame. It is how many bytes - // of response headers are allow. Unlike the http2 spec, zero here + // of response headers are allowed. Unlike the http2 spec, zero here // means to use a default limit (currently 10MB). If you actually // want to advertise an ulimited value to the peer, Transport // interprets the highest possible value here (0xffffffff or 1<<32-1) @@ -6683,15 +6715,17 @@ type http2ClientConn struct { goAwayDebug string // goAway frame's debug data, retained as a string streams map[uint32]*http2clientStream // client-initiated nextStreamID uint32 + pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams pings map[[8]byte]chan struct{} // in flight ping data to notification channel bw *bufio.Writer br *bufio.Reader fr *http2Framer lastActive time.Time // Settings from peer: (also guarded by mu) - maxFrameSize uint32 - maxConcurrentStreams uint32 - initialWindowSize uint32 + maxFrameSize uint32 + maxConcurrentStreams uint32 + peerMaxHeaderListSize uint64 + initialWindowSize uint32 hbuf bytes.Buffer // HPACK encoder writes into this henc *hpack.Encoder @@ -6735,35 +6769,45 @@ type http2clientStream struct { resTrailer *Header // client's Response.Trailer } -// awaitRequestCancel runs in its own goroutine and waits for the user -// to cancel a RoundTrip request, its context to expire, or for the -// request to be done (any way it might be removed from the cc.streams -// map: peer reset, successful completion, TCP connection breakage, -// etc) -func (cs *http2clientStream) awaitRequestCancel(req *Request) { +// awaitRequestCancel waits for the user to cancel a request or for the done +// channel to be signaled. A non-nil error is returned only if the request was +// canceled. +func http2awaitRequestCancel(req *Request, done <-chan struct{}) error { ctx := http2reqContext(req) if req.Cancel == nil && ctx.Done() == nil { - return + return nil } select { case <-req.Cancel: - cs.cancelStream() - cs.bufPipe.CloseWithError(http2errRequestCanceled) + return http2errRequestCanceled case <-ctx.Done(): + return ctx.Err() + case <-done: + return nil + } +} + +// awaitRequestCancel waits for the user to cancel a request, its context to +// expire, or for the request to be done (any way it might be removed from the +// cc.streams map: peer reset, successful completion, TCP connection breakage, +// etc). If the request is canceled, then cs will be canceled and closed. +func (cs *http2clientStream) awaitRequestCancel(req *Request) { + if err := http2awaitRequestCancel(req, cs.done); err != nil { cs.cancelStream() - cs.bufPipe.CloseWithError(ctx.Err()) - case <-cs.done: + cs.bufPipe.CloseWithError(err) } } func (cs *http2clientStream) cancelStream() { - cs.cc.mu.Lock() + cc := cs.cc + cc.mu.Lock() didReset := cs.didReset cs.didReset = true - cs.cc.mu.Unlock() + cc.mu.Unlock() if !didReset { - cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) + cc.forgetStreamID(cs.ID) } } @@ -6780,6 +6824,13 @@ func (cs *http2clientStream) checkResetOrDone() error { } } +func (cs *http2clientStream) getStartedWrite() bool { + cc := cs.cc + cc.mu.Lock() + defer cc.mu.Unlock() + return cs.startedWrite +} + func (cs *http2clientStream) abortRequestBodyWrite(err error) { if err == nil { panic("nil error") @@ -6848,17 +6899,28 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res } addr := http2authorityAddr(req.URL.Scheme, req.URL.Host) - for { + for retry := 0; ; retry++ { cc, err := t.connPool().GetClientConn(req, addr) if err != nil { t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) return nil, err } http2traceGotConn(req, cc) - res, err := cc.RoundTrip(req) - if err != nil { - if req, err = http2shouldRetryRequest(req, err); err == nil { - continue + res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req) + if err != nil && retry <= 6 { + if req, err = http2shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil { + // After the first retry, do exponential backoff with 10% jitter. + if retry == 0 { + continue + } + backoff := float64(uint(1) << (uint(retry) - 1)) + backoff += backoff * (0.1 * mathrand.Float64()) + select { + case <-time.After(time.Second * time.Duration(backoff)): + continue + case <-http2reqContext(req).Done(): + return nil, http2reqContext(req).Err() + } } } if err != nil { @@ -6879,43 +6941,50 @@ func (t *http2Transport) CloseIdleConnections() { } var ( - http2errClientConnClosed = errors.New("http2: client conn is closed") - http2errClientConnUnusable = errors.New("http2: client conn not usable") - - http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") - http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written") + http2errClientConnClosed = errors.New("http2: client conn is closed") + http2errClientConnUnusable = errors.New("http2: client conn not usable") + http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") ) // shouldRetryRequest is called by RoundTrip when a request fails to get // response headers. It is always called with a non-nil error. // It returns either a request to retry (either the same request, or a // modified clone), or an error if the request can't be replayed. -func http2shouldRetryRequest(req *Request, err error) (*Request, error) { - switch err { - default: +func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) { + if !http2canRetryError(err) { return nil, err - case http2errClientConnUnusable, http2errClientConnGotGoAway: + } + if !afterBodyWrite { return req, nil - case http2errClientConnGotGoAwayAfterSomeReqBody: - // If the Body is nil (or http.NoBody), it's safe to reuse - // this request and its Body. - if req.Body == nil || http2reqBodyIsNoBody(req.Body) { - return req, nil - } - // Otherwise we depend on the Request having its GetBody - // func defined. - getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody - if getBody == nil { - return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error") - } - body, err := getBody() - if err != nil { - return nil, err - } - newReq := *req - newReq.Body = body - return &newReq, nil } + // If the Body is nil (or http.NoBody), it's safe to reuse + // this request and its Body. + if req.Body == nil || http2reqBodyIsNoBody(req.Body) { + return req, nil + } + // Otherwise we depend on the Request having its GetBody + // func defined. + getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody + if getBody == nil { + return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err) + } + body, err := getBody() + if err != nil { + return nil, err + } + newReq := *req + newReq.Body = body + return &newReq, nil +} + +func http2canRetryError(err error) bool { + if err == http2errClientConnUnusable || err == http2errClientConnGotGoAway { + return true + } + if se, ok := err.(http2StreamError); ok { + return se.Code == http2ErrCodeRefusedStream + } + return false } func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) { @@ -6993,17 +7062,18 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) { cc := &http2ClientConn{ - t: t, - tconn: c, - readerDone: make(chan struct{}), - nextStreamID: 1, - maxFrameSize: 16 << 10, // spec default - initialWindowSize: 65535, // spec default - maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. - streams: make(map[uint32]*http2clientStream), - singleUse: singleUse, - wantSettingsAck: true, - pings: make(map[[8]byte]chan struct{}), + t: t, + tconn: c, + readerDone: make(chan struct{}), + nextStreamID: 1, + maxFrameSize: 16 << 10, // spec default + initialWindowSize: 65535, // spec default + maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. + peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. + streams: make(map[uint32]*http2clientStream), + singleUse: singleUse, + wantSettingsAck: true, + pings: make(map[[8]byte]chan struct{}), } if d := t.idleConnTimeout(); d != 0 { cc.idleTimeout = d @@ -7079,6 +7149,8 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { } } +// CanTakeNewRequest reports whether the connection can take a new request, +// meaning it has not been closed or received or sent a GOAWAY. func (cc *http2ClientConn) CanTakeNewRequest() bool { cc.mu.Lock() defer cc.mu.Unlock() @@ -7090,8 +7162,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool { return false } return cc.goAway == nil && !cc.closed && - int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && - cc.nextStreamID < math.MaxInt32 + int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32 } // onIdleTimeout is called from a time.AfterFunc goroutine. It will @@ -7223,8 +7294,13 @@ func http2actualContentLength(req *Request) int64 { } func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { + resp, _, err := cc.roundTrip(req) + return resp, err +} + +func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterReqBodyWrite bool, err error) { if err := http2checkConnHeaders(req); err != nil { - return nil, err + return nil, false, err } if cc.idleTimer != nil { cc.idleTimer.Stop() @@ -7232,15 +7308,14 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { trailers, err := http2commaSeparatedTrailers(req) if err != nil { - return nil, err + return nil, false, err } hasTrailers := trailers != "" cc.mu.Lock() - cc.lastActive = time.Now() - if cc.closed || !cc.canTakeNewRequestLocked() { + if err := cc.awaitOpenSlotForRequest(req); err != nil { cc.mu.Unlock() - return nil, http2errClientConnUnusable + return nil, false, err } body := req.Body @@ -7274,7 +7349,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) if err != nil { cc.mu.Unlock() - return nil, err + return nil, false, err } cs := cc.newStream() @@ -7286,7 +7361,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { cc.wmu.Lock() endStream := !hasBody && !hasTrailers - werr := cc.writeHeaders(cs.ID, endStream, hdrs) + werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) cc.wmu.Unlock() http2traceWroteHeaders(cs.trace) cc.mu.Unlock() @@ -7300,7 +7375,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { // Don't bother sending a RST_STREAM (our write already failed; // no need to keep writing) http2traceWroteRequest(cs.trace, werr) - return nil, werr + return nil, false, werr } var respHeaderTimer <-chan time.Time @@ -7319,7 +7394,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { bodyWritten := false ctx := http2reqContext(req) - handleReadLoopResponse := func(re http2resAndError) (*Response, error) { + handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) { res := re.res if re.err != nil || res.StatusCode > 299 { // On error or status code 3xx, 4xx, 5xx, etc abort any @@ -7335,19 +7410,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { cs.abortRequestBodyWrite(http2errStopReqBodyWrite) } if re.err != nil { - if re.err == http2errClientConnGotGoAway { - cc.mu.Lock() - if cs.startedWrite { - re.err = http2errClientConnGotGoAwayAfterSomeReqBody - } - cc.mu.Unlock() - } cc.forgetStreamID(cs.ID) - return nil, re.err + return nil, cs.getStartedWrite(), re.err } res.Request = req res.TLS = cc.tlsState - return res, nil + return res, false, nil } for { @@ -7355,42 +7423,42 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { case re := <-readLoopResCh: return handleReadLoopResponse(re) case <-respHeaderTimer: - cc.forgetStreamID(cs.ID) if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } - return nil, http2errTimeout + cc.forgetStreamID(cs.ID) + return nil, cs.getStartedWrite(), http2errTimeout case <-ctx.Done(): select { case re := <-readLoopResCh: return handleReadLoopResponse(re) default: } - cc.forgetStreamID(cs.ID) if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } - return nil, ctx.Err() + cc.forgetStreamID(cs.ID) + return nil, cs.getStartedWrite(), ctx.Err() case <-req.Cancel: select { case re := <-readLoopResCh: return handleReadLoopResponse(re) default: } - cc.forgetStreamID(cs.ID) if !hasBody || bodyWritten { cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) } else { bodyWriter.cancel() cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) } - return nil, http2errRequestCanceled + cc.forgetStreamID(cs.ID) + return nil, cs.getStartedWrite(), http2errRequestCanceled case <-cs.peerReset: select { case re := <-readLoopResCh: @@ -7400,7 +7468,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { // processResetStream already removed the // stream from the streams map; no need for // forgetStreamID. - return nil, cs.resetErr + return nil, cs.getStartedWrite(), cs.resetErr case err := <-bodyWriter.resc: // Prefer the read loop's response, if available. Issue 16102. select { @@ -7409,7 +7477,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { default: } if err != nil { - return nil, err + return nil, cs.getStartedWrite(), err } bodyWritten = true if d := cc.responseHeaderTimeout(); d != 0 { @@ -7421,14 +7489,52 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { } } +// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. +// Must hold cc.mu. +func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error { + var waitingForConn chan struct{} + var waitingForConnErr error // guarded by cc.mu + for { + cc.lastActive = time.Now() + if cc.closed || !cc.canTakeNewRequestLocked() { + return http2errClientConnUnusable + } + if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { + if waitingForConn != nil { + close(waitingForConn) + } + return nil + } + // Unfortunately, we cannot wait on a condition variable and channel at + // the same time, so instead, we spin up a goroutine to check if the + // request is canceled while we wait for a slot to open in the connection. + if waitingForConn == nil { + waitingForConn = make(chan struct{}) + go func() { + if err := http2awaitRequestCancel(req, waitingForConn); err != nil { + cc.mu.Lock() + waitingForConnErr = err + cc.cond.Broadcast() + cc.mu.Unlock() + } + }() + } + cc.pendingRequests++ + cc.cond.Wait() + cc.pendingRequests-- + if waitingForConnErr != nil { + return waitingForConnErr + } + } +} + // requires cc.wmu be held -func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { +func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error { first := true // first frame written (HEADERS is first, then CONTINUATION) - frameSize := int(cc.maxFrameSize) for len(hdrs) > 0 && cc.werr == nil { chunk := hdrs - if len(chunk) > frameSize { - chunk = chunk[:frameSize] + if len(chunk) > maxFrameSize { + chunk = chunk[:maxFrameSize] } hdrs = hdrs[len(chunk):] endHeaders := len(hdrs) == 0 @@ -7536,17 +7642,26 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos var trls []byte if hasTrailers { cc.mu.Lock() - defer cc.mu.Unlock() - trls = cc.encodeTrailers(req) + trls, err = cc.encodeTrailers(req) + cc.mu.Unlock() + if err != nil { + cc.writeStreamReset(cs.ID, http2ErrCodeInternal, err) + cc.forgetStreamID(cs.ID) + return err + } } + cc.mu.Lock() + maxFrameSize := int(cc.maxFrameSize) + cc.mu.Unlock() + cc.wmu.Lock() defer cc.wmu.Unlock() // Two ways to send END_STREAM: either with trailers, or // with an empty DATA frame. if len(trls) > 0 { - err = cc.writeHeaders(cs.ID, true, trls) + err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls) } else { err = cc.fr.WriteData(cs.ID, true, nil) } @@ -7640,62 +7755,86 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail } } - // 8.1.2.3 Request Pseudo-Header Fields - // The :path pseudo-header field includes the path and query parts of the - // target URI (the path-absolute production and optionally a '?' character - // followed by the query production (see Sections 3.3 and 3.4 of - // [RFC3986]). - cc.writeHeader(":authority", host) - cc.writeHeader(":method", req.Method) - if req.Method != "CONNECT" { - cc.writeHeader(":path", path) - cc.writeHeader(":scheme", req.URL.Scheme) - } - if trailers != "" { - cc.writeHeader("trailer", trailers) - } - - var didUA bool - for k, vv := range req.Header { - lowKey := strings.ToLower(k) - switch lowKey { - case "host", "content-length": - // Host is :authority, already sent. - // Content-Length is automatic, set below. - continue - case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive": - // Per 8.1.2.2 Connection-Specific Header - // Fields, don't send connection-specific - // fields. We have already checked if any - // are error-worthy so just ignore the rest. - continue - case "user-agent": - // Match Go's http1 behavior: at most one - // User-Agent. If set to nil or empty string, - // then omit it. Otherwise if not mentioned, - // include the default (below). - didUA = true - if len(vv) < 1 { + enumerateHeaders := func(f func(name, value string)) { + // 8.1.2.3 Request Pseudo-Header Fields + // The :path pseudo-header field includes the path and query parts of the + // target URI (the path-absolute production and optionally a '?' character + // followed by the query production (see Sections 3.3 and 3.4 of + // [RFC3986]). + f(":authority", host) + f(":method", req.Method) + if req.Method != "CONNECT" { + f(":path", path) + f(":scheme", req.URL.Scheme) + } + if trailers != "" { + f("trailer", trailers) + } + + var didUA bool + for k, vv := range req.Header { + if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") { + // Host is :authority, already sent. + // Content-Length is automatic, set below. continue - } - vv = vv[:1] - if vv[0] == "" { + } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") || + strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") || + strings.EqualFold(k, "keep-alive") { + // Per 8.1.2.2 Connection-Specific Header + // Fields, don't send connection-specific + // fields. We have already checked if any + // are error-worthy so just ignore the rest. continue + } else if strings.EqualFold(k, "user-agent") { + // Match Go's http1 behavior: at most one + // User-Agent. If set to nil or empty string, + // then omit it. Otherwise if not mentioned, + // include the default (below). + didUA = true + if len(vv) < 1 { + continue + } + vv = vv[:1] + if vv[0] == "" { + continue + } + + } + + for _, v := range vv { + f(k, v) } } - for _, v := range vv { - cc.writeHeader(lowKey, v) + if http2shouldSendReqContentLength(req.Method, contentLength) { + f("content-length", strconv.FormatInt(contentLength, 10)) + } + if addGzipHeader { + f("accept-encoding", "gzip") + } + if !didUA { + f("user-agent", http2defaultUserAgent) } } - if http2shouldSendReqContentLength(req.Method, contentLength) { - cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10)) - } - if addGzipHeader { - cc.writeHeader("accept-encoding", "gzip") - } - if !didUA { - cc.writeHeader("user-agent", http2defaultUserAgent) + + // Do a first pass over the headers counting bytes to ensure + // we don't exceed cc.peerMaxHeaderListSize. This is done as a + // separate pass before encoding the headers to prevent + // modifying the hpack state. + hlSize := uint64(0) + enumerateHeaders(func(name, value string) { + hf := hpack.HeaderField{Name: name, Value: value} + hlSize += uint64(hf.Size()) + }) + + if hlSize > cc.peerMaxHeaderListSize { + return nil, http2errRequestHeaderListSize } + + // Header list size is ok. Write the headers. + enumerateHeaders(func(name, value string) { + cc.writeHeader(strings.ToLower(name), value) + }) + return cc.hbuf.Bytes(), nil } @@ -7722,17 +7861,29 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool { } // requires cc.mu be held. -func (cc *http2ClientConn) encodeTrailers(req *Request) []byte { +func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) { cc.hbuf.Reset() + + hlSize := uint64(0) + for k, vv := range req.Trailer { + for _, v := range vv { + hf := hpack.HeaderField{Name: k, Value: v} + hlSize += uint64(hf.Size()) + } + } + if hlSize > cc.peerMaxHeaderListSize { + return nil, http2errRequestHeaderListSize + } + for k, vv := range req.Trailer { - // Transfer-Encoding, etc.. have already been filter at the + // Transfer-Encoding, etc.. have already been filtered at the // start of RoundTrip lowKey := strings.ToLower(k) for _, v := range vv { cc.writeHeader(lowKey, v) } } - return cc.hbuf.Bytes() + return cc.hbuf.Bytes(), nil } func (cc *http2ClientConn) writeHeader(name, value string) { @@ -7780,7 +7931,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr cc.idleTimer.Reset(cc.idleTimeout) } close(cs.done) - cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl + // Wake up checkResetOrDone via clientStream.awaitFlowControl and + // wake up RoundTrip if there is a pending request. + cc.cond.Broadcast() } return cs } @@ -7788,17 +7941,12 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. type http2clientConnReadLoop struct { cc *http2ClientConn - activeRes map[uint32]*http2clientStream // keyed by streamID closeWhenIdle bool } // readLoop runs in its own goroutine and reads and dispatches frames. func (cc *http2ClientConn) readLoop() { - rl := &http2clientConnReadLoop{ - cc: cc, - activeRes: make(map[uint32]*http2clientStream), - } - + rl := &http2clientConnReadLoop{cc: cc} defer rl.cleanup() cc.readerErr = rl.run() if ce, ok := cc.readerErr.(http2ConnectionError); ok { @@ -7853,10 +8001,8 @@ func (rl *http2clientConnReadLoop) cleanup() { } else if err == io.EOF { err = io.ErrUnexpectedEOF } - for _, cs := range rl.activeRes { - cs.bufPipe.CloseWithError(err) - } for _, cs := range cc.streams { + cs.bufPipe.CloseWithError(err) // no-op if already closed select { case cs.resc <- http2resAndError{err: err}: default: @@ -7879,8 +8025,9 @@ func (rl *http2clientConnReadLoop) run() error { cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) } if se, ok := err.(http2StreamError); ok { - if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil { + if cs := cc.streamByID(se.StreamID, false); cs != nil { cs.cc.writeStreamReset(cs.ID, se.Code, err) + cs.cc.forgetStreamID(cs.ID) if se.Cause == nil { se.Cause = cc.fr.errDetail } @@ -7933,7 +8080,7 @@ func (rl *http2clientConnReadLoop) run() error { } return err } - if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 { + if rl.closeWhenIdle && gotReply && maybeIdle { cc.closeIfIdle() } } @@ -7941,13 +8088,31 @@ func (rl *http2clientConnReadLoop) run() error { func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error { cc := rl.cc - cs := cc.streamByID(f.StreamID, f.StreamEnded()) + cs := cc.streamByID(f.StreamID, false) if cs == nil { // We'd get here if we canceled a request while the // server had its response still in flight. So if this // was just something we canceled, ignore it. return nil } + if f.StreamEnded() { + // Issue 20521: If the stream has ended, streamByID() causes + // clientStream.done to be closed, which causes the request's bodyWriter + // to be closed with an errStreamClosed, which may be received by + // clientConn.RoundTrip before the result of processing these headers. + // Deferring stream closure allows the header processing to occur first. + // clientConn.RoundTrip may still receive the bodyWriter error first, but + // the fix for issue 16102 prioritises any response. + // + // Issue 22413: If there is no request body, we should close the + // stream before writing to cs.resc so that the stream is closed + // immediately once RoundTrip returns. + if cs.req.Body != nil { + defer cc.forgetStreamID(f.StreamID) + } else { + cc.forgetStreamID(f.StreamID) + } + } if !cs.firstByte { if cs.trace != nil { // TODO(bradfitz): move first response byte earlier, @@ -7971,6 +8136,7 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro } // Any other error type is a stream error. cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err) + cc.forgetStreamID(cs.ID) cs.resc <- http2resAndError{err: err} return nil // return nil from process* funcs to keep conn alive } @@ -7978,9 +8144,6 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro // (nil, nil) special case. See handleResponse docs. return nil } - if res.Body != http2noBody { - rl.activeRes[cs.ID] = cs - } cs.resTrailer = &res.Trailer cs.resc <- http2resAndError{res: res} return nil @@ -8000,11 +8163,11 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http status := f.PseudoValue("status") if status == "" { - return nil, errors.New("missing status pseudo header") + return nil, errors.New("malformed response from server: missing status pseudo header") } statusCode, err := strconv.Atoi(status) if err != nil { - return nil, errors.New("malformed non-numeric status pseudo header") + return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header") } if statusCode == 100 { @@ -8202,6 +8365,7 @@ func (b http2transportResponseBody) Close() error { } cs.bufPipe.BreakWithError(http2errClosedResponseBody) + cc.forgetStreamID(cs.ID) return nil } @@ -8236,7 +8400,23 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { } return nil } + if !cs.firstByte { + cc.logf("protocol error: received DATA before a HEADERS frame") + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + }) + return nil + } if f.Length > 0 { + if cs.req.Method == "HEAD" && len(data) > 0 { + cc.logf("protocol error: received DATA on a HEAD request") + rl.endStreamError(cs, http2StreamError{ + StreamID: f.StreamID, + Code: http2ErrCodeProtocol, + }) + return nil + } // Check connection-level flow control. cc.mu.Lock() if cs.inflow.available() >= int32(f.Length) { @@ -8298,11 +8478,10 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err err = io.EOF code = cs.copyTrailers } - cs.bufPipe.closeWithErrorAndCode(err, code) - delete(rl.activeRes, cs.ID) if http2isConnectionCloseRequest(cs.req) { rl.closeWhenIdle = true } + cs.bufPipe.closeWithErrorAndCode(err, code) select { case cs.resc <- http2resAndError{err: err}: @@ -8350,6 +8529,8 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error cc.maxFrameSize = s.Val case http2SettingMaxConcurrentStreams: cc.maxConcurrentStreams = s.Val + case http2SettingMaxHeaderListSize: + cc.peerMaxHeaderListSize = uint64(s.Val) case http2SettingInitialWindowSize: // Values above the maximum flow-control // window size of 2^31-1 MUST be treated as a @@ -8427,7 +8608,6 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er cs.bufPipe.CloseWithError(err) cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl } - delete(rl.activeRes, cs.ID) return nil } @@ -8516,6 +8696,7 @@ func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, var ( http2errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit") + http2errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit") http2errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers") ) @@ -8741,11 +8922,7 @@ type http2writeGoAway struct { func (p *http2writeGoAway) writeFrame(ctx http2writeContext) error { err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil) - if p.code != 0 { - ctx.Flush() // ignore error: we're hanging up on them anyway - time.Sleep(50 * time.Millisecond) - ctx.CloseConn() - } + ctx.Flush() // ignore error: we're hanging up on them anyway return err } |