summaryrefslogtreecommitdiff
path: root/libgo/go/net/http/h2_bundle.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/net/http/h2_bundle.go')
-rw-r--r--libgo/go/net/http/h2_bundle.go581
1 files changed, 379 insertions, 202 deletions
diff --git a/libgo/go/net/http/h2_bundle.go b/libgo/go/net/http/h2_bundle.go
index 373f55098a3..f5a95084d24 100644
--- a/libgo/go/net/http/h2_bundle.go
+++ b/libgo/go/net/http/h2_bundle.go
@@ -30,6 +30,7 @@ import (
"io/ioutil"
"log"
"math"
+ mathrand "math/rand"
"net"
"net/http/httptrace"
"net/textproto"
@@ -3909,12 +3910,15 @@ func http2ConfigureServer(s *Server, conf *http2Server) error {
} else if s.TLSConfig.CipherSuites != nil {
// If they already provided a CipherSuite list, return
// an error if it has a bad order or is missing
- // ECDHE_RSA_WITH_AES_128_GCM_SHA256.
- const requiredCipher = tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
+ // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
haveRequired := false
sawBad := false
for i, cs := range s.TLSConfig.CipherSuites {
- if cs == requiredCipher {
+ switch cs {
+ case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ // Alternative MTI cipher to not discourage ECDSA-only servers.
+ // See http://golang.org/cl/30721 for further information.
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
haveRequired = true
}
if http2isBadCipher(cs) {
@@ -3924,7 +3928,7 @@ func http2ConfigureServer(s *Server, conf *http2Server) error {
}
}
if !haveRequired {
- return fmt.Errorf("http2: TLSConfig.CipherSuites is missing HTTP/2-required TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256")
+ return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.")
}
}
@@ -4341,7 +4345,7 @@ func (sc *http2serverConn) condlogf(err error, format string, args ...interface{
if err == nil {
return
}
- if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) {
+ if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) || err == http2errPrefaceTimeout {
// Boring, expected errors.
sc.vlogf(format, args...)
} else {
@@ -4545,8 +4549,13 @@ func (sc *http2serverConn) serve() {
}
}
- if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame {
- return
+ // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
+ // with no error code (graceful shutdown), don't start the timer until
+ // all open streams have been completed.
+ sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame
+ gracefulShutdownComplete := sc.goAwayCode == http2ErrCodeNo && sc.curOpenStreams() == 0
+ if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != http2ErrCodeNo || gracefulShutdownComplete) {
+ sc.shutDownIn(http2goAwayTimeout)
}
}
}
@@ -4583,8 +4592,11 @@ func (sc *http2serverConn) sendServeMsg(msg interface{}) {
}
}
-// readPreface reads the ClientPreface greeting from the peer
-// or returns an error on timeout or an invalid greeting.
+var http2errPrefaceTimeout = errors.New("timeout waiting for client preface")
+
+// readPreface reads the ClientPreface greeting from the peer or
+// returns errPrefaceTimeout on timeout, or an error if the greeting
+// is invalid.
func (sc *http2serverConn) readPreface() error {
errc := make(chan error, 1)
go func() {
@@ -4602,7 +4614,7 @@ func (sc *http2serverConn) readPreface() error {
defer timer.Stop()
select {
case <-timer.C:
- return errors.New("timeout waiting for client preface")
+ return http2errPrefaceTimeout
case err := <-errc:
if err == nil {
if http2VerboseLogs {
@@ -4912,30 +4924,31 @@ func (sc *http2serverConn) startGracefulShutdown() {
sc.shutdownOnce.Do(func() { sc.sendServeMsg(http2gracefulShutdownMsg) })
}
+// After sending GOAWAY, the connection will close after goAwayTimeout.
+// If we close the connection immediately after sending GOAWAY, there may
+// be unsent data in our kernel receive buffer, which will cause the kernel
+// to send a TCP RST on close() instead of a FIN. This RST will abort the
+// connection immediately, whether or not the client had received the GOAWAY.
+//
+// Ideally we should delay for at least 1 RTT + epsilon so the client has
+// a chance to read the GOAWAY and stop sending messages. Measuring RTT
+// is hard, so we approximate with 1 second. See golang.org/issue/18701.
+//
+// This is a var so it can be shorter in tests, where all requests uses the
+// loopback interface making the expected RTT very small.
+//
+// TODO: configurable?
+var http2goAwayTimeout = 1 * time.Second
+
func (sc *http2serverConn) startGracefulShutdownInternal() {
- sc.goAwayIn(http2ErrCodeNo, 0)
+ sc.goAway(http2ErrCodeNo)
}
func (sc *http2serverConn) goAway(code http2ErrCode) {
sc.serveG.check()
- var forceCloseIn time.Duration
- if code != http2ErrCodeNo {
- forceCloseIn = 250 * time.Millisecond
- } else {
- // TODO: configurable
- forceCloseIn = 1 * time.Second
- }
- sc.goAwayIn(code, forceCloseIn)
-}
-
-func (sc *http2serverConn) goAwayIn(code http2ErrCode, forceCloseIn time.Duration) {
- sc.serveG.check()
if sc.inGoAway {
return
}
- if forceCloseIn != 0 {
- sc.shutDownIn(forceCloseIn)
- }
sc.inGoAway = true
sc.needToSendGoAway = true
sc.goAwayCode = code
@@ -6004,7 +6017,7 @@ func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) {
clen = strconv.Itoa(len(p))
}
_, hasContentType := rws.snapHeader["Content-Type"]
- if !hasContentType && http2bodyAllowedForStatus(rws.status) {
+ if !hasContentType && http2bodyAllowedForStatus(rws.status) && len(p) > 0 {
ctype = DetectContentType(p)
}
var date string
@@ -6172,7 +6185,26 @@ func (w *http2responseWriter) Header() Header {
return rws.handlerHeader
}
+// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
+func http2checkWriteHeaderCode(code int) {
+ // Issue 22880: require valid WriteHeader status codes.
+ // For now we only enforce that it's three digits.
+ // In the future we might block things over 599 (600 and above aren't defined
+ // at http://httpwg.org/specs/rfc7231.html#status.codes)
+ // and we might block under 200 (once we have more mature 1xx support).
+ // But for now any three digits.
+ //
+ // We used to send "HTTP/1.1 000 0" on the wire in responses but there's
+ // no equivalent bogus thing we can realistically send in HTTP/2,
+ // so we'll consistently panic instead and help people find their bugs
+ // early. (We can't return an error from WriteHeader even if we wanted to.)
+ if code < 100 || code > 999 {
+ panic(fmt.Sprintf("invalid WriteHeader code %v", code))
+ }
+}
+
func (w *http2responseWriter) WriteHeader(code int) {
+ http2checkWriteHeaderCode(code)
rws := w.rws
if rws == nil {
panic("WriteHeader called after Handler finished")
@@ -6605,7 +6637,7 @@ type http2Transport struct {
// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
// send in the initial settings frame. It is how many bytes
- // of response headers are allow. Unlike the http2 spec, zero here
+ // of response headers are allowed. Unlike the http2 spec, zero here
// means to use a default limit (currently 10MB). If you actually
// want to advertise an ulimited value to the peer, Transport
// interprets the highest possible value here (0xffffffff or 1<<32-1)
@@ -6683,15 +6715,17 @@ type http2ClientConn struct {
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*http2clientStream // client-initiated
nextStreamID uint32
+ pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
bw *bufio.Writer
br *bufio.Reader
fr *http2Framer
lastActive time.Time
// Settings from peer: (also guarded by mu)
- maxFrameSize uint32
- maxConcurrentStreams uint32
- initialWindowSize uint32
+ maxFrameSize uint32
+ maxConcurrentStreams uint32
+ peerMaxHeaderListSize uint64
+ initialWindowSize uint32
hbuf bytes.Buffer // HPACK encoder writes into this
henc *hpack.Encoder
@@ -6735,35 +6769,45 @@ type http2clientStream struct {
resTrailer *Header // client's Response.Trailer
}
-// awaitRequestCancel runs in its own goroutine and waits for the user
-// to cancel a RoundTrip request, its context to expire, or for the
-// request to be done (any way it might be removed from the cc.streams
-// map: peer reset, successful completion, TCP connection breakage,
-// etc)
-func (cs *http2clientStream) awaitRequestCancel(req *Request) {
+// awaitRequestCancel waits for the user to cancel a request or for the done
+// channel to be signaled. A non-nil error is returned only if the request was
+// canceled.
+func http2awaitRequestCancel(req *Request, done <-chan struct{}) error {
ctx := http2reqContext(req)
if req.Cancel == nil && ctx.Done() == nil {
- return
+ return nil
}
select {
case <-req.Cancel:
- cs.cancelStream()
- cs.bufPipe.CloseWithError(http2errRequestCanceled)
+ return http2errRequestCanceled
case <-ctx.Done():
+ return ctx.Err()
+ case <-done:
+ return nil
+ }
+}
+
+// awaitRequestCancel waits for the user to cancel a request, its context to
+// expire, or for the request to be done (any way it might be removed from the
+// cc.streams map: peer reset, successful completion, TCP connection breakage,
+// etc). If the request is canceled, then cs will be canceled and closed.
+func (cs *http2clientStream) awaitRequestCancel(req *Request) {
+ if err := http2awaitRequestCancel(req, cs.done); err != nil {
cs.cancelStream()
- cs.bufPipe.CloseWithError(ctx.Err())
- case <-cs.done:
+ cs.bufPipe.CloseWithError(err)
}
}
func (cs *http2clientStream) cancelStream() {
- cs.cc.mu.Lock()
+ cc := cs.cc
+ cc.mu.Lock()
didReset := cs.didReset
cs.didReset = true
- cs.cc.mu.Unlock()
+ cc.mu.Unlock()
if !didReset {
- cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
+ cc.forgetStreamID(cs.ID)
}
}
@@ -6780,6 +6824,13 @@ func (cs *http2clientStream) checkResetOrDone() error {
}
}
+func (cs *http2clientStream) getStartedWrite() bool {
+ cc := cs.cc
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return cs.startedWrite
+}
+
func (cs *http2clientStream) abortRequestBodyWrite(err error) {
if err == nil {
panic("nil error")
@@ -6848,17 +6899,28 @@ func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Res
}
addr := http2authorityAddr(req.URL.Scheme, req.URL.Host)
- for {
+ for retry := 0; ; retry++ {
cc, err := t.connPool().GetClientConn(req, addr)
if err != nil {
t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
return nil, err
}
http2traceGotConn(req, cc)
- res, err := cc.RoundTrip(req)
- if err != nil {
- if req, err = http2shouldRetryRequest(req, err); err == nil {
- continue
+ res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req)
+ if err != nil && retry <= 6 {
+ if req, err = http2shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil {
+ // After the first retry, do exponential backoff with 10% jitter.
+ if retry == 0 {
+ continue
+ }
+ backoff := float64(uint(1) << (uint(retry) - 1))
+ backoff += backoff * (0.1 * mathrand.Float64())
+ select {
+ case <-time.After(time.Second * time.Duration(backoff)):
+ continue
+ case <-http2reqContext(req).Done():
+ return nil, http2reqContext(req).Err()
+ }
}
}
if err != nil {
@@ -6879,43 +6941,50 @@ func (t *http2Transport) CloseIdleConnections() {
}
var (
- http2errClientConnClosed = errors.New("http2: client conn is closed")
- http2errClientConnUnusable = errors.New("http2: client conn not usable")
-
- http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
- http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
+ http2errClientConnClosed = errors.New("http2: client conn is closed")
+ http2errClientConnUnusable = errors.New("http2: client conn not usable")
+ http2errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
// shouldRetryRequest is called by RoundTrip when a request fails to get
// response headers. It is always called with a non-nil error.
// It returns either a request to retry (either the same request, or a
// modified clone), or an error if the request can't be replayed.
-func http2shouldRetryRequest(req *Request, err error) (*Request, error) {
- switch err {
- default:
+func http2shouldRetryRequest(req *Request, err error, afterBodyWrite bool) (*Request, error) {
+ if !http2canRetryError(err) {
return nil, err
- case http2errClientConnUnusable, http2errClientConnGotGoAway:
+ }
+ if !afterBodyWrite {
return req, nil
- case http2errClientConnGotGoAwayAfterSomeReqBody:
- // If the Body is nil (or http.NoBody), it's safe to reuse
- // this request and its Body.
- if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
- return req, nil
- }
- // Otherwise we depend on the Request having its GetBody
- // func defined.
- getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
- if getBody == nil {
- return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error")
- }
- body, err := getBody()
- if err != nil {
- return nil, err
- }
- newReq := *req
- newReq.Body = body
- return &newReq, nil
}
+ // If the Body is nil (or http.NoBody), it's safe to reuse
+ // this request and its Body.
+ if req.Body == nil || http2reqBodyIsNoBody(req.Body) {
+ return req, nil
+ }
+ // Otherwise we depend on the Request having its GetBody
+ // func defined.
+ getBody := http2reqGetBody(req) // Go 1.8: getBody = req.GetBody
+ if getBody == nil {
+ return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
+ }
+ body, err := getBody()
+ if err != nil {
+ return nil, err
+ }
+ newReq := *req
+ newReq.Body = body
+ return &newReq, nil
+}
+
+func http2canRetryError(err error) bool {
+ if err == http2errClientConnUnusable || err == http2errClientConnGotGoAway {
+ return true
+ }
+ if se, ok := err.(http2StreamError); ok {
+ return se.Code == http2ErrCodeRefusedStream
+ }
+ return false
}
func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) {
@@ -6993,17 +7062,18 @@ func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) {
func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) {
cc := &http2ClientConn{
- t: t,
- tconn: c,
- readerDone: make(chan struct{}),
- nextStreamID: 1,
- maxFrameSize: 16 << 10, // spec default
- initialWindowSize: 65535, // spec default
- maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
- streams: make(map[uint32]*http2clientStream),
- singleUse: singleUse,
- wantSettingsAck: true,
- pings: make(map[[8]byte]chan struct{}),
+ t: t,
+ tconn: c,
+ readerDone: make(chan struct{}),
+ nextStreamID: 1,
+ maxFrameSize: 16 << 10, // spec default
+ initialWindowSize: 65535, // spec default
+ maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough.
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
+ streams: make(map[uint32]*http2clientStream),
+ singleUse: singleUse,
+ wantSettingsAck: true,
+ pings: make(map[[8]byte]chan struct{}),
}
if d := t.idleConnTimeout(); d != 0 {
cc.idleTimeout = d
@@ -7079,6 +7149,8 @@ func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) {
}
}
+// CanTakeNewRequest reports whether the connection can take a new request,
+// meaning it has not been closed or received or sent a GOAWAY.
func (cc *http2ClientConn) CanTakeNewRequest() bool {
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -7090,8 +7162,7 @@ func (cc *http2ClientConn) canTakeNewRequestLocked() bool {
return false
}
return cc.goAway == nil && !cc.closed &&
- int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) &&
- cc.nextStreamID < math.MaxInt32
+ int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
@@ -7223,8 +7294,13 @@ func http2actualContentLength(req *Request) int64 {
}
func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
+ resp, _, err := cc.roundTrip(req)
+ return resp, err
+}
+
+func (cc *http2ClientConn) roundTrip(req *Request) (res *Response, gotErrAfterReqBodyWrite bool, err error) {
if err := http2checkConnHeaders(req); err != nil {
- return nil, err
+ return nil, false, err
}
if cc.idleTimer != nil {
cc.idleTimer.Stop()
@@ -7232,15 +7308,14 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
trailers, err := http2commaSeparatedTrailers(req)
if err != nil {
- return nil, err
+ return nil, false, err
}
hasTrailers := trailers != ""
cc.mu.Lock()
- cc.lastActive = time.Now()
- if cc.closed || !cc.canTakeNewRequestLocked() {
+ if err := cc.awaitOpenSlotForRequest(req); err != nil {
cc.mu.Unlock()
- return nil, http2errClientConnUnusable
+ return nil, false, err
}
body := req.Body
@@ -7274,7 +7349,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen)
if err != nil {
cc.mu.Unlock()
- return nil, err
+ return nil, false, err
}
cs := cc.newStream()
@@ -7286,7 +7361,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
- werr := cc.writeHeaders(cs.ID, endStream, hdrs)
+ werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
cc.wmu.Unlock()
http2traceWroteHeaders(cs.trace)
cc.mu.Unlock()
@@ -7300,7 +7375,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
// Don't bother sending a RST_STREAM (our write already failed;
// no need to keep writing)
http2traceWroteRequest(cs.trace, werr)
- return nil, werr
+ return nil, false, werr
}
var respHeaderTimer <-chan time.Time
@@ -7319,7 +7394,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
bodyWritten := false
ctx := http2reqContext(req)
- handleReadLoopResponse := func(re http2resAndError) (*Response, error) {
+ handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
res := re.res
if re.err != nil || res.StatusCode > 299 {
// On error or status code 3xx, 4xx, 5xx, etc abort any
@@ -7335,19 +7410,12 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
}
if re.err != nil {
- if re.err == http2errClientConnGotGoAway {
- cc.mu.Lock()
- if cs.startedWrite {
- re.err = http2errClientConnGotGoAwayAfterSomeReqBody
- }
- cc.mu.Unlock()
- }
cc.forgetStreamID(cs.ID)
- return nil, re.err
+ return nil, cs.getStartedWrite(), re.err
}
res.Request = req
res.TLS = cc.tlsState
- return res, nil
+ return res, false, nil
}
for {
@@ -7355,42 +7423,42 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
case <-respHeaderTimer:
- cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
- return nil, http2errTimeout
+ cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), http2errTimeout
case <-ctx.Done():
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
default:
}
- cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
- return nil, ctx.Err()
+ cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), ctx.Err()
case <-req.Cancel:
select {
case re := <-readLoopResCh:
return handleReadLoopResponse(re)
default:
}
- cc.forgetStreamID(cs.ID)
if !hasBody || bodyWritten {
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel)
}
- return nil, http2errRequestCanceled
+ cc.forgetStreamID(cs.ID)
+ return nil, cs.getStartedWrite(), http2errRequestCanceled
case <-cs.peerReset:
select {
case re := <-readLoopResCh:
@@ -7400,7 +7468,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
// processResetStream already removed the
// stream from the streams map; no need for
// forgetStreamID.
- return nil, cs.resetErr
+ return nil, cs.getStartedWrite(), cs.resetErr
case err := <-bodyWriter.resc:
// Prefer the read loop's response, if available. Issue 16102.
select {
@@ -7409,7 +7477,7 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
default:
}
if err != nil {
- return nil, err
+ return nil, cs.getStartedWrite(), err
}
bodyWritten = true
if d := cc.responseHeaderTimeout(); d != 0 {
@@ -7421,14 +7489,52 @@ func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) {
}
}
+// awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams.
+// Must hold cc.mu.
+func (cc *http2ClientConn) awaitOpenSlotForRequest(req *Request) error {
+ var waitingForConn chan struct{}
+ var waitingForConnErr error // guarded by cc.mu
+ for {
+ cc.lastActive = time.Now()
+ if cc.closed || !cc.canTakeNewRequestLocked() {
+ return http2errClientConnUnusable
+ }
+ if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
+ if waitingForConn != nil {
+ close(waitingForConn)
+ }
+ return nil
+ }
+ // Unfortunately, we cannot wait on a condition variable and channel at
+ // the same time, so instead, we spin up a goroutine to check if the
+ // request is canceled while we wait for a slot to open in the connection.
+ if waitingForConn == nil {
+ waitingForConn = make(chan struct{})
+ go func() {
+ if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
+ cc.mu.Lock()
+ waitingForConnErr = err
+ cc.cond.Broadcast()
+ cc.mu.Unlock()
+ }
+ }()
+ }
+ cc.pendingRequests++
+ cc.cond.Wait()
+ cc.pendingRequests--
+ if waitingForConnErr != nil {
+ return waitingForConnErr
+ }
+ }
+}
+
// requires cc.wmu be held
-func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error {
+func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
first := true // first frame written (HEADERS is first, then CONTINUATION)
- frameSize := int(cc.maxFrameSize)
for len(hdrs) > 0 && cc.werr == nil {
chunk := hdrs
- if len(chunk) > frameSize {
- chunk = chunk[:frameSize]
+ if len(chunk) > maxFrameSize {
+ chunk = chunk[:maxFrameSize]
}
hdrs = hdrs[len(chunk):]
endHeaders := len(hdrs) == 0
@@ -7536,17 +7642,26 @@ func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Clos
var trls []byte
if hasTrailers {
cc.mu.Lock()
- defer cc.mu.Unlock()
- trls = cc.encodeTrailers(req)
+ trls, err = cc.encodeTrailers(req)
+ cc.mu.Unlock()
+ if err != nil {
+ cc.writeStreamReset(cs.ID, http2ErrCodeInternal, err)
+ cc.forgetStreamID(cs.ID)
+ return err
+ }
}
+ cc.mu.Lock()
+ maxFrameSize := int(cc.maxFrameSize)
+ cc.mu.Unlock()
+
cc.wmu.Lock()
defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
if len(trls) > 0 {
- err = cc.writeHeaders(cs.ID, true, trls)
+ err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
} else {
err = cc.fr.WriteData(cs.ID, true, nil)
}
@@ -7640,62 +7755,86 @@ func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trail
}
}
- // 8.1.2.3 Request Pseudo-Header Fields
- // The :path pseudo-header field includes the path and query parts of the
- // target URI (the path-absolute production and optionally a '?' character
- // followed by the query production (see Sections 3.3 and 3.4 of
- // [RFC3986]).
- cc.writeHeader(":authority", host)
- cc.writeHeader(":method", req.Method)
- if req.Method != "CONNECT" {
- cc.writeHeader(":path", path)
- cc.writeHeader(":scheme", req.URL.Scheme)
- }
- if trailers != "" {
- cc.writeHeader("trailer", trailers)
- }
-
- var didUA bool
- for k, vv := range req.Header {
- lowKey := strings.ToLower(k)
- switch lowKey {
- case "host", "content-length":
- // Host is :authority, already sent.
- // Content-Length is automatic, set below.
- continue
- case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive":
- // Per 8.1.2.2 Connection-Specific Header
- // Fields, don't send connection-specific
- // fields. We have already checked if any
- // are error-worthy so just ignore the rest.
- continue
- case "user-agent":
- // Match Go's http1 behavior: at most one
- // User-Agent. If set to nil or empty string,
- // then omit it. Otherwise if not mentioned,
- // include the default (below).
- didUA = true
- if len(vv) < 1 {
+ enumerateHeaders := func(f func(name, value string)) {
+ // 8.1.2.3 Request Pseudo-Header Fields
+ // The :path pseudo-header field includes the path and query parts of the
+ // target URI (the path-absolute production and optionally a '?' character
+ // followed by the query production (see Sections 3.3 and 3.4 of
+ // [RFC3986]).
+ f(":authority", host)
+ f(":method", req.Method)
+ if req.Method != "CONNECT" {
+ f(":path", path)
+ f(":scheme", req.URL.Scheme)
+ }
+ if trailers != "" {
+ f("trailer", trailers)
+ }
+
+ var didUA bool
+ for k, vv := range req.Header {
+ if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") {
+ // Host is :authority, already sent.
+ // Content-Length is automatic, set below.
continue
- }
- vv = vv[:1]
- if vv[0] == "" {
+ } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") ||
+ strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") ||
+ strings.EqualFold(k, "keep-alive") {
+ // Per 8.1.2.2 Connection-Specific Header
+ // Fields, don't send connection-specific
+ // fields. We have already checked if any
+ // are error-worthy so just ignore the rest.
continue
+ } else if strings.EqualFold(k, "user-agent") {
+ // Match Go's http1 behavior: at most one
+ // User-Agent. If set to nil or empty string,
+ // then omit it. Otherwise if not mentioned,
+ // include the default (below).
+ didUA = true
+ if len(vv) < 1 {
+ continue
+ }
+ vv = vv[:1]
+ if vv[0] == "" {
+ continue
+ }
+
+ }
+
+ for _, v := range vv {
+ f(k, v)
}
}
- for _, v := range vv {
- cc.writeHeader(lowKey, v)
+ if http2shouldSendReqContentLength(req.Method, contentLength) {
+ f("content-length", strconv.FormatInt(contentLength, 10))
+ }
+ if addGzipHeader {
+ f("accept-encoding", "gzip")
+ }
+ if !didUA {
+ f("user-agent", http2defaultUserAgent)
}
}
- if http2shouldSendReqContentLength(req.Method, contentLength) {
- cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10))
- }
- if addGzipHeader {
- cc.writeHeader("accept-encoding", "gzip")
- }
- if !didUA {
- cc.writeHeader("user-agent", http2defaultUserAgent)
+
+ // Do a first pass over the headers counting bytes to ensure
+ // we don't exceed cc.peerMaxHeaderListSize. This is done as a
+ // separate pass before encoding the headers to prevent
+ // modifying the hpack state.
+ hlSize := uint64(0)
+ enumerateHeaders(func(name, value string) {
+ hf := hpack.HeaderField{Name: name, Value: value}
+ hlSize += uint64(hf.Size())
+ })
+
+ if hlSize > cc.peerMaxHeaderListSize {
+ return nil, http2errRequestHeaderListSize
}
+
+ // Header list size is ok. Write the headers.
+ enumerateHeaders(func(name, value string) {
+ cc.writeHeader(strings.ToLower(name), value)
+ })
+
return cc.hbuf.Bytes(), nil
}
@@ -7722,17 +7861,29 @@ func http2shouldSendReqContentLength(method string, contentLength int64) bool {
}
// requires cc.mu be held.
-func (cc *http2ClientConn) encodeTrailers(req *Request) []byte {
+func (cc *http2ClientConn) encodeTrailers(req *Request) ([]byte, error) {
cc.hbuf.Reset()
+
+ hlSize := uint64(0)
+ for k, vv := range req.Trailer {
+ for _, v := range vv {
+ hf := hpack.HeaderField{Name: k, Value: v}
+ hlSize += uint64(hf.Size())
+ }
+ }
+ if hlSize > cc.peerMaxHeaderListSize {
+ return nil, http2errRequestHeaderListSize
+ }
+
for k, vv := range req.Trailer {
- // Transfer-Encoding, etc.. have already been filter at the
+ // Transfer-Encoding, etc.. have already been filtered at the
// start of RoundTrip
lowKey := strings.ToLower(k)
for _, v := range vv {
cc.writeHeader(lowKey, v)
}
}
- return cc.hbuf.Bytes()
+ return cc.hbuf.Bytes(), nil
}
func (cc *http2ClientConn) writeHeader(name, value string) {
@@ -7780,7 +7931,9 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr
cc.idleTimer.Reset(cc.idleTimeout)
}
close(cs.done)
- cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
+ // Wake up checkResetOrDone via clientStream.awaitFlowControl and
+ // wake up RoundTrip if there is a pending request.
+ cc.cond.Broadcast()
}
return cs
}
@@ -7788,17 +7941,12 @@ func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStr
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type http2clientConnReadLoop struct {
cc *http2ClientConn
- activeRes map[uint32]*http2clientStream // keyed by streamID
closeWhenIdle bool
}
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *http2ClientConn) readLoop() {
- rl := &http2clientConnReadLoop{
- cc: cc,
- activeRes: make(map[uint32]*http2clientStream),
- }
-
+ rl := &http2clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(http2ConnectionError); ok {
@@ -7853,10 +8001,8 @@ func (rl *http2clientConnReadLoop) cleanup() {
} else if err == io.EOF {
err = io.ErrUnexpectedEOF
}
- for _, cs := range rl.activeRes {
- cs.bufPipe.CloseWithError(err)
- }
for _, cs := range cc.streams {
+ cs.bufPipe.CloseWithError(err) // no-op if already closed
select {
case cs.resc <- http2resAndError{err: err}:
default:
@@ -7879,8 +8025,9 @@ func (rl *http2clientConnReadLoop) run() error {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
if se, ok := err.(http2StreamError); ok {
- if cs := cc.streamByID(se.StreamID, true /*ended; remove it*/); cs != nil {
+ if cs := cc.streamByID(se.StreamID, false); cs != nil {
cs.cc.writeStreamReset(cs.ID, se.Code, err)
+ cs.cc.forgetStreamID(cs.ID)
if se.Cause == nil {
se.Cause = cc.fr.errDetail
}
@@ -7933,7 +8080,7 @@ func (rl *http2clientConnReadLoop) run() error {
}
return err
}
- if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 {
+ if rl.closeWhenIdle && gotReply && maybeIdle {
cc.closeIfIdle()
}
}
@@ -7941,13 +8088,31 @@ func (rl *http2clientConnReadLoop) run() error {
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
cc := rl.cc
- cs := cc.streamByID(f.StreamID, f.StreamEnded())
+ cs := cc.streamByID(f.StreamID, false)
if cs == nil {
// We'd get here if we canceled a request while the
// server had its response still in flight. So if this
// was just something we canceled, ignore it.
return nil
}
+ if f.StreamEnded() {
+ // Issue 20521: If the stream has ended, streamByID() causes
+ // clientStream.done to be closed, which causes the request's bodyWriter
+ // to be closed with an errStreamClosed, which may be received by
+ // clientConn.RoundTrip before the result of processing these headers.
+ // Deferring stream closure allows the header processing to occur first.
+ // clientConn.RoundTrip may still receive the bodyWriter error first, but
+ // the fix for issue 16102 prioritises any response.
+ //
+ // Issue 22413: If there is no request body, we should close the
+ // stream before writing to cs.resc so that the stream is closed
+ // immediately once RoundTrip returns.
+ if cs.req.Body != nil {
+ defer cc.forgetStreamID(f.StreamID)
+ } else {
+ cc.forgetStreamID(f.StreamID)
+ }
+ }
if !cs.firstByte {
if cs.trace != nil {
// TODO(bradfitz): move first response byte earlier,
@@ -7971,6 +8136,7 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro
}
// Any other error type is a stream error.
cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err)
+ cc.forgetStreamID(cs.ID)
cs.resc <- http2resAndError{err: err}
return nil // return nil from process* funcs to keep conn alive
}
@@ -7978,9 +8144,6 @@ func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) erro
// (nil, nil) special case. See handleResponse docs.
return nil
}
- if res.Body != http2noBody {
- rl.activeRes[cs.ID] = cs
- }
cs.resTrailer = &res.Trailer
cs.resc <- http2resAndError{res: res}
return nil
@@ -8000,11 +8163,11 @@ func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http
status := f.PseudoValue("status")
if status == "" {
- return nil, errors.New("missing status pseudo header")
+ return nil, errors.New("malformed response from server: missing status pseudo header")
}
statusCode, err := strconv.Atoi(status)
if err != nil {
- return nil, errors.New("malformed non-numeric status pseudo header")
+ return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
}
if statusCode == 100 {
@@ -8202,6 +8365,7 @@ func (b http2transportResponseBody) Close() error {
}
cs.bufPipe.BreakWithError(http2errClosedResponseBody)
+ cc.forgetStreamID(cs.ID)
return nil
}
@@ -8236,7 +8400,23 @@ func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error {
}
return nil
}
+ if !cs.firstByte {
+ cc.logf("protocol error: received DATA before a HEADERS frame")
+ rl.endStreamError(cs, http2StreamError{
+ StreamID: f.StreamID,
+ Code: http2ErrCodeProtocol,
+ })
+ return nil
+ }
if f.Length > 0 {
+ if cs.req.Method == "HEAD" && len(data) > 0 {
+ cc.logf("protocol error: received DATA on a HEAD request")
+ rl.endStreamError(cs, http2StreamError{
+ StreamID: f.StreamID,
+ Code: http2ErrCodeProtocol,
+ })
+ return nil
+ }
// Check connection-level flow control.
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
@@ -8298,11 +8478,10 @@ func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err err
err = io.EOF
code = cs.copyTrailers
}
- cs.bufPipe.closeWithErrorAndCode(err, code)
- delete(rl.activeRes, cs.ID)
if http2isConnectionCloseRequest(cs.req) {
rl.closeWhenIdle = true
}
+ cs.bufPipe.closeWithErrorAndCode(err, code)
select {
case cs.resc <- http2resAndError{err: err}:
@@ -8350,6 +8529,8 @@ func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error
cc.maxFrameSize = s.Val
case http2SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
+ case http2SettingMaxHeaderListSize:
+ cc.peerMaxHeaderListSize = uint64(s.Val)
case http2SettingInitialWindowSize:
// Values above the maximum flow-control
// window size of 2^31-1 MUST be treated as a
@@ -8427,7 +8608,6 @@ func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) er
cs.bufPipe.CloseWithError(err)
cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl
}
- delete(rl.activeRes, cs.ID)
return nil
}
@@ -8516,6 +8696,7 @@ func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode,
var (
http2errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
+ http2errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
http2errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
)
@@ -8741,11 +8922,7 @@ type http2writeGoAway struct {
func (p *http2writeGoAway) writeFrame(ctx http2writeContext) error {
err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil)
- if p.code != 0 {
- ctx.Flush() // ignore error: we're hanging up on them anyway
- time.Sleep(50 * time.Millisecond)
- ctx.CloseConn()
- }
+ ctx.Flush() // ignore error: we're hanging up on them anyway
return err
}