*: revendor and regenerate protobuf files
This commit is contained in:
52
vendor/google.golang.org/grpc/transport/control.go
generated
vendored
52
vendor/google.golang.org/grpc/transport/control.go
generated
vendored
@@ -35,7 +35,9 @@ package transport
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
@@ -44,8 +46,18 @@ const (
|
||||
// The default value of flow control window size in HTTP2 spec.
|
||||
defaultWindowSize = 65535
|
||||
// The initial window size for flow control.
|
||||
initialWindowSize = defaultWindowSize // for an RPC
|
||||
initialConnWindowSize = defaultWindowSize * 16 // for a connection
|
||||
initialWindowSize = defaultWindowSize // for an RPC
|
||||
initialConnWindowSize = defaultWindowSize * 16 // for a connection
|
||||
infinity = time.Duration(math.MaxInt64)
|
||||
defaultClientKeepaliveTime = infinity
|
||||
defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
|
||||
defaultMaxStreamsClient = 100
|
||||
defaultMaxConnectionIdle = infinity
|
||||
defaultMaxConnectionAge = infinity
|
||||
defaultMaxConnectionAgeGrace = infinity
|
||||
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
|
||||
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
|
||||
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
|
||||
)
|
||||
|
||||
// The following defines various control items which could flow through
|
||||
@@ -73,6 +85,8 @@ type resetStream struct {
|
||||
func (*resetStream) item() {}
|
||||
|
||||
type goAway struct {
|
||||
code http2.ErrCode
|
||||
debugData []byte
|
||||
}
|
||||
|
||||
func (*goAway) item() {}
|
||||
@@ -111,35 +125,9 @@ func newQuotaPool(q int) *quotaPool {
|
||||
return qb
|
||||
}
|
||||
|
||||
// add adds n to the available quota and tries to send it on acquire.
|
||||
func (qb *quotaPool) add(n int) {
|
||||
qb.mu.Lock()
|
||||
defer qb.mu.Unlock()
|
||||
qb.quota += n
|
||||
if qb.quota <= 0 {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case qb.c <- qb.quota:
|
||||
qb.quota = 0
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// cancel cancels the pending quota sent on acquire, if any.
|
||||
func (qb *quotaPool) cancel() {
|
||||
qb.mu.Lock()
|
||||
defer qb.mu.Unlock()
|
||||
select {
|
||||
case n := <-qb.c:
|
||||
qb.quota += n
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// reset cancels the pending quota sent on acquired, incremented by v and sends
|
||||
// add cancels the pending quota sent on acquired, incremented by v and sends
|
||||
// it back on acquire.
|
||||
func (qb *quotaPool) reset(v int) {
|
||||
func (qb *quotaPool) add(v int) {
|
||||
qb.mu.Lock()
|
||||
defer qb.mu.Unlock()
|
||||
select {
|
||||
@@ -151,6 +139,10 @@ func (qb *quotaPool) reset(v int) {
|
||||
if qb.quota <= 0 {
|
||||
return
|
||||
}
|
||||
// After the pool has been created, this is the only place that sends on
|
||||
// the channel. Since mu is held at this point and any quota that was sent
|
||||
// on the channel has been retrieved, we know that this code will always
|
||||
// place any positive quota value on the channel.
|
||||
select {
|
||||
case qb.c <- qb.quota:
|
||||
qb.quota = 0
|
||||
|
17
vendor/google.golang.org/grpc/transport/handler_server.go
generated
vendored
17
vendor/google.golang.org/grpc/transport/handler_server.go
generated
vendored
@@ -53,6 +53,7 @@ import (
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// NewServerHandlerTransport returns a ServerTransport handling gRPC
|
||||
@@ -182,7 +183,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
|
||||
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
|
||||
err := ht.do(func() {
|
||||
ht.writeCommonHeaders(s)
|
||||
|
||||
@@ -192,10 +193,13 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, statusCode codes.Code,
|
||||
ht.rw.(http.Flusher).Flush()
|
||||
|
||||
h := ht.rw.Header()
|
||||
h.Set("Grpc-Status", fmt.Sprintf("%d", statusCode))
|
||||
if statusDesc != "" {
|
||||
h.Set("Grpc-Message", encodeGrpcMessage(statusDesc))
|
||||
h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
|
||||
if m := st.Message(); m != "" {
|
||||
h.Set("Grpc-Message", encodeGrpcMessage(m))
|
||||
}
|
||||
|
||||
// TODO: Support Grpc-Status-Details-Bin
|
||||
|
||||
if md := s.Trailer(); len(md) > 0 {
|
||||
for k, vv := range md {
|
||||
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
|
||||
@@ -234,6 +238,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
|
||||
// and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
|
||||
h.Add("Trailer", "Grpc-Status")
|
||||
h.Add("Trailer", "Grpc-Message")
|
||||
// TODO: Support Grpc-Status-Details-Bin
|
||||
|
||||
if s.sendCompress != "" {
|
||||
h.Set("Grpc-Encoding", s.sendCompress)
|
||||
@@ -268,7 +273,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
})
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
|
||||
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
|
||||
// With this transport type there will be exactly 1 stream: this HTTP request.
|
||||
|
||||
var ctx context.Context
|
||||
@@ -314,7 +319,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
|
||||
if req.TLS != nil {
|
||||
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
|
||||
}
|
||||
ctx = metadata.NewContext(ctx, ht.headerMD)
|
||||
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
|
||||
ctx = peer.NewContext(ctx, pr)
|
||||
s.ctx = newContextWithStream(ctx, s)
|
||||
s.dec = &recvBufferReader{ctx: s.ctx, recv: s.buf}
|
||||
|
386
vendor/google.golang.org/grpc/transport/http2_client.go
generated
vendored
386
vendor/google.golang.org/grpc/transport/http2_client.go
generated
vendored
@@ -35,12 +35,12 @@ package transport
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
@@ -49,17 +49,24 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// http2Client implements the ClientTransport interface with HTTP2.
|
||||
type http2Client struct {
|
||||
target string // server name/addr
|
||||
userAgent string
|
||||
conn net.Conn // underlying communication channel
|
||||
authInfo credentials.AuthInfo // auth info about the connection
|
||||
nextID uint32 // the next stream ID to be used
|
||||
ctx context.Context
|
||||
target string // server name/addr
|
||||
userAgent string
|
||||
md interface{}
|
||||
conn net.Conn // underlying communication channel
|
||||
remoteAddr net.Addr
|
||||
localAddr net.Addr
|
||||
authInfo credentials.AuthInfo // auth info about the connection
|
||||
nextID uint32 // the next stream ID to be used
|
||||
|
||||
// writableChan synchronizes write access to the transport.
|
||||
// A writer acquires the write lock by sending a value on writableChan
|
||||
@@ -75,6 +82,8 @@ type http2Client struct {
|
||||
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
|
||||
// that the server sent GoAway on this transport.
|
||||
goAway chan struct{}
|
||||
// awakenKeepalive is used to wake up keepalive when after it has gone dormant.
|
||||
awakenKeepalive chan struct{}
|
||||
|
||||
framer *framer
|
||||
hBuf *bytes.Buffer // the buffer for HPACK encoding
|
||||
@@ -94,6 +103,13 @@ type http2Client struct {
|
||||
|
||||
creds []credentials.PerRPCCredentials
|
||||
|
||||
// Boolean to keep track of reading activity on transport.
|
||||
// 1 is true and 0 is false.
|
||||
activity uint32 // Accessed atomically.
|
||||
kp keepalive.ClientParameters
|
||||
|
||||
statsHandler stats.Handler
|
||||
|
||||
mu sync.Mutex // guard the following variables
|
||||
state transportState // the state of underlying connection
|
||||
activeStreams map[uint32]*Stream
|
||||
@@ -105,9 +121,12 @@ type http2Client struct {
|
||||
goAwayID uint32
|
||||
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
|
||||
prevGoAwayID uint32
|
||||
// goAwayReason records the http2.ErrCode and debug data received with the
|
||||
// GoAway frame.
|
||||
goAwayReason GoAwayReason
|
||||
}
|
||||
|
||||
func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Context, addr string) (net.Conn, error) {
|
||||
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
|
||||
if fn != nil {
|
||||
return fn(ctx, addr)
|
||||
}
|
||||
@@ -145,10 +164,13 @@ func isTemporary(err error) bool {
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// and starts to receive messages on it. Non-nil error returns if construction
|
||||
// fails.
|
||||
func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
|
||||
func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
|
||||
scheme := "http"
|
||||
conn, err := dial(opts.Dialer, ctx, addr)
|
||||
conn, err := dial(ctx, opts.Dialer, addr.Addr)
|
||||
if err != nil {
|
||||
if opts.FailOnNonTempDialError {
|
||||
return nil, connectionErrorf(isTemporary(err), err, "transport: %v", err)
|
||||
}
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
// Any further errors will close the underlying connection
|
||||
@@ -160,7 +182,7 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
||||
var authInfo credentials.AuthInfo
|
||||
if creds := opts.TransportCredentials; creds != nil {
|
||||
scheme = "https"
|
||||
conn, authInfo, err = creds.ClientHandshake(ctx, addr, conn)
|
||||
conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
|
||||
if err != nil {
|
||||
// Credentials handshake errors are typically considered permanent
|
||||
// to avoid retrying on e.g. bad certificates.
|
||||
@@ -168,22 +190,31 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
||||
return nil, connectionErrorf(temp, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
ua := primaryUA
|
||||
if opts.UserAgent != "" {
|
||||
ua = opts.UserAgent + " " + ua
|
||||
kp := opts.KeepaliveParams
|
||||
// Validate keepalive parameters.
|
||||
if kp.Time == 0 {
|
||||
kp.Time = defaultClientKeepaliveTime
|
||||
}
|
||||
if kp.Timeout == 0 {
|
||||
kp.Timeout = defaultClientKeepaliveTimeout
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
t := &http2Client{
|
||||
target: addr,
|
||||
userAgent: ua,
|
||||
conn: conn,
|
||||
authInfo: authInfo,
|
||||
ctx: ctx,
|
||||
target: addr.Addr,
|
||||
userAgent: opts.UserAgent,
|
||||
md: addr.Metadata,
|
||||
conn: conn,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
authInfo: authInfo,
|
||||
// The client initiated stream id is odd starting from 1.
|
||||
nextID: 1,
|
||||
writableChan: make(chan int, 1),
|
||||
shutdownChan: make(chan struct{}),
|
||||
errorChan: make(chan struct{}),
|
||||
goAway: make(chan struct{}),
|
||||
awakenKeepalive: make(chan struct{}, 1),
|
||||
framer: newFramer(conn),
|
||||
hBuf: &buf,
|
||||
hEnc: hpack.NewEncoder(&buf),
|
||||
@@ -194,8 +225,24 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
||||
state: reachable,
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
creds: opts.PerRPCCredentials,
|
||||
maxStreams: math.MaxInt32,
|
||||
maxStreams: defaultMaxStreamsClient,
|
||||
streamsQuota: newQuotaPool(defaultMaxStreamsClient),
|
||||
streamSendQuota: defaultWindowSize,
|
||||
kp: kp,
|
||||
statsHandler: opts.StatsHandler,
|
||||
}
|
||||
// Make sure awakenKeepalive can't be written upon.
|
||||
// keepalive routine will make it writable, if need be.
|
||||
t.awakenKeepalive <- struct{}{}
|
||||
if t.statsHandler != nil {
|
||||
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
})
|
||||
connBegin := &stats.ConnBegin{
|
||||
Client: true,
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
// Start the reader goroutine for incoming message. Each transport has
|
||||
// a dedicated goroutine which reads HTTP2 frame from network. Then it
|
||||
@@ -231,6 +278,9 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
|
||||
}
|
||||
}
|
||||
go t.controller()
|
||||
if t.kp.Time != infinity {
|
||||
go t.keepalive()
|
||||
}
|
||||
t.writableChan <- 0
|
||||
return t, nil
|
||||
}
|
||||
@@ -264,16 +314,17 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
return s
|
||||
}
|
||||
|
||||
// NewStream creates a stream and register it into the transport as "active"
|
||||
// NewStream creates a stream and registers it into the transport as "active"
|
||||
// streams.
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
|
||||
pr := &peer.Peer{
|
||||
Addr: t.conn.RemoteAddr(),
|
||||
Addr: t.remoteAddr,
|
||||
}
|
||||
// Attach Auth info if there is any.
|
||||
if t.authInfo != nil {
|
||||
pr.AuthInfo = t.authInfo
|
||||
}
|
||||
userCtx := ctx
|
||||
ctx = peer.NewContext(ctx, pr)
|
||||
authData := make(map[string]string)
|
||||
for _, c := range t.creds {
|
||||
@@ -311,21 +362,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
t.mu.Unlock()
|
||||
return nil, ErrConnClosing
|
||||
}
|
||||
checkStreamsQuota := t.streamsQuota != nil
|
||||
t.mu.Unlock()
|
||||
if checkStreamsQuota {
|
||||
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Returns the quota balance back.
|
||||
if sq > 1 {
|
||||
t.streamsQuota.add(sq - 1)
|
||||
}
|
||||
sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Returns the quota balance back.
|
||||
if sq > 1 {
|
||||
t.streamsQuota.add(sq - 1)
|
||||
}
|
||||
if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
|
||||
// Return the quota back now because there is no stream returned to the caller.
|
||||
if _, ok := err.(StreamError); ok && checkStreamsQuota {
|
||||
if _, ok := err.(StreamError); ok {
|
||||
t.streamsQuota.add(1)
|
||||
}
|
||||
return nil, err
|
||||
@@ -333,9 +381,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
t.mu.Lock()
|
||||
if t.state == draining {
|
||||
t.mu.Unlock()
|
||||
if checkStreamsQuota {
|
||||
t.streamsQuota.add(1)
|
||||
}
|
||||
t.streamsQuota.add(1)
|
||||
// Need to make t writable again so that the rpc in flight can still proceed.
|
||||
t.writableChan <- 0
|
||||
return nil, ErrStreamDrain
|
||||
@@ -345,18 +391,19 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
return nil, ErrConnClosing
|
||||
}
|
||||
s := t.newStream(ctx, callHdr)
|
||||
s.clientStatsCtx = userCtx
|
||||
t.activeStreams[s.id] = s
|
||||
// If the number of active streams change from 0 to 1, then check if keepalive
|
||||
// has gone dormant. If so, wake it up.
|
||||
if len(t.activeStreams) == 1 {
|
||||
select {
|
||||
case t.awakenKeepalive <- struct{}{}:
|
||||
t.framer.writePing(false, false, [8]byte{})
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
|
||||
// Reset t.streamsQuota to the right value.
|
||||
var reset bool
|
||||
if !checkStreamsQuota && t.streamsQuota != nil {
|
||||
reset = true
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if reset {
|
||||
t.streamsQuota.reset(-1)
|
||||
}
|
||||
|
||||
// HPACK encodes various headers. Note that once WriteField(...) is
|
||||
// called, the corresponding headers/continuation frame has to be sent
|
||||
@@ -388,7 +435,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
hasMD bool
|
||||
endHeaders bool
|
||||
)
|
||||
if md, ok := metadata.FromContext(ctx); ok {
|
||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||
hasMD = true
|
||||
for k, v := range md {
|
||||
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
|
||||
@@ -400,7 +447,18 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
}
|
||||
}
|
||||
}
|
||||
if md, ok := t.md.(*metadata.MD); ok {
|
||||
for k, v := range *md {
|
||||
if isReservedHeader(k) {
|
||||
continue
|
||||
}
|
||||
for _, entry := range v {
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
|
||||
}
|
||||
}
|
||||
}
|
||||
first := true
|
||||
bufLen := t.hBuf.Len()
|
||||
// Sends the headers in a single batch even when they span multiple frames.
|
||||
for !endHeaders {
|
||||
size := t.hBuf.Len()
|
||||
@@ -435,6 +493,17 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
if t.statsHandler != nil {
|
||||
outHeader := &stats.OutHeader{
|
||||
Client: true,
|
||||
WireLength: bufLen,
|
||||
FullMethod: callHdr.Method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: callHdr.SendCompress,
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.clientStatsCtx, outHeader)
|
||||
}
|
||||
t.writableChan <- 0
|
||||
return s, nil
|
||||
}
|
||||
@@ -442,15 +511,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
// CloseStream clears the footprint of a stream when the stream is not needed any more.
|
||||
// This must not be executed in reader's goroutine.
|
||||
func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
var updateStreams bool
|
||||
t.mu.Lock()
|
||||
if t.activeStreams == nil {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if t.streamsQuota != nil {
|
||||
updateStreams = true
|
||||
}
|
||||
delete(t.activeStreams, s.id)
|
||||
if t.state == draining && len(t.activeStreams) == 0 {
|
||||
// The transport is draining and s is the last live stream on t.
|
||||
@@ -459,10 +524,27 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
return
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if updateStreams {
|
||||
t.streamsQuota.add(1)
|
||||
}
|
||||
// rstStream is true in case the stream is being closed at the client-side
|
||||
// and the server needs to be intimated about it by sending a RST_STREAM
|
||||
// frame.
|
||||
// To make sure this frame is written to the wire before the headers of the
|
||||
// next stream waiting for streamsQuota, we add to streamsQuota pool only
|
||||
// after having acquired the writableChan to send RST_STREAM out (look at
|
||||
// the controller() routine).
|
||||
var rstStream bool
|
||||
var rstError http2.ErrCode
|
||||
defer func() {
|
||||
// In case, the client doesn't have to send RST_STREAM to server
|
||||
// we can safely add back to streamsQuota pool now.
|
||||
if !rstStream {
|
||||
t.streamsQuota.add(1)
|
||||
return
|
||||
}
|
||||
t.controlBuf.put(&resetStream{s.id, rstError})
|
||||
}()
|
||||
s.mu.Lock()
|
||||
rstStream = s.rstStream
|
||||
rstError = s.rstError
|
||||
if q := s.fc.resetPendingData(); q > 0 {
|
||||
if n := t.fc.onRead(q); n > 0 {
|
||||
t.controlBuf.put(&windowUpdate{0, n})
|
||||
@@ -478,8 +560,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
}
|
||||
s.state = streamDone
|
||||
s.mu.Unlock()
|
||||
if se, ok := err.(StreamError); ok && se.Code != codes.DeadlineExceeded {
|
||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeCancel})
|
||||
if _, ok := err.(StreamError); ok {
|
||||
rstStream = true
|
||||
rstError = http2.ErrCodeCancel
|
||||
}
|
||||
}
|
||||
|
||||
@@ -513,6 +596,12 @@ func (t *http2Client) Close() (err error) {
|
||||
s.mu.Unlock()
|
||||
s.write(recvMsg{err: ErrConnClosing})
|
||||
}
|
||||
if t.statsHandler != nil {
|
||||
connEnd := &stats.ConnEnd{
|
||||
Client: true,
|
||||
}
|
||||
t.statsHandler.HandleConn(t.ctx, connEnd)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -570,19 +659,14 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
|
||||
var p []byte
|
||||
if r.Len() > 0 {
|
||||
size := http2MaxFrameLen
|
||||
s.sendQuotaPool.add(0)
|
||||
// Wait until the stream has some quota to send the data.
|
||||
sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.sendQuotaPool.add(0)
|
||||
// Wait until the transport has some quota to send the data.
|
||||
tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
|
||||
if err != nil {
|
||||
if _, ok := err.(StreamError); ok || err == io.EOF {
|
||||
t.sendQuotaPool.cancel()
|
||||
}
|
||||
return err
|
||||
}
|
||||
if sq < size {
|
||||
@@ -692,7 +776,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
||||
}
|
||||
|
||||
func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
size := len(f.Data())
|
||||
size := f.Header().Length
|
||||
if err := t.fc.onData(uint32(size)); err != nil {
|
||||
t.notifyError(connectionErrorf(true, err, "%v", err))
|
||||
return
|
||||
@@ -706,6 +790,11 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
return
|
||||
}
|
||||
if size > 0 {
|
||||
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
||||
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
||||
t.controlBuf.put(&windowUpdate{0, w})
|
||||
}
|
||||
}
|
||||
s.mu.Lock()
|
||||
if s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
@@ -716,22 +805,27 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
return
|
||||
}
|
||||
if err := s.fc.onData(uint32(size)); err != nil {
|
||||
s.state = streamDone
|
||||
s.statusCode = codes.Internal
|
||||
s.statusDesc = err.Error()
|
||||
close(s.done)
|
||||
s.rstStream = true
|
||||
s.rstError = http2.ErrCodeFlowControl
|
||||
s.finish(status.New(codes.Internal, err.Error()))
|
||||
s.mu.Unlock()
|
||||
s.write(recvMsg{err: io.EOF})
|
||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
|
||||
return
|
||||
}
|
||||
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
||||
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
||||
t.controlBuf.put(&windowUpdate{s.id, w})
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
// TODO(bradfitz, zhaoq): A copy is required here because there is no
|
||||
// guarantee f.Data() is consumed before the arrival of next frame.
|
||||
// Can this copy be eliminated?
|
||||
data := make([]byte, size)
|
||||
copy(data, f.Data())
|
||||
s.write(recvMsg{data: data})
|
||||
if len(f.Data()) > 0 {
|
||||
data := make([]byte, len(f.Data()))
|
||||
copy(data, f.Data())
|
||||
s.write(recvMsg{data: data})
|
||||
}
|
||||
}
|
||||
// The server has closed the stream without sending trailers. Record that
|
||||
// the read direction is closed, and set the status appropriately.
|
||||
@@ -741,10 +835,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.state = streamDone
|
||||
s.statusCode = codes.Internal
|
||||
s.statusDesc = "server closed the stream without sending trailers"
|
||||
close(s.done)
|
||||
s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
|
||||
s.mu.Unlock()
|
||||
s.write(recvMsg{err: io.EOF})
|
||||
}
|
||||
@@ -760,18 +851,16 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.state = streamDone
|
||||
if !s.headerDone {
|
||||
close(s.headerChan)
|
||||
s.headerDone = true
|
||||
}
|
||||
s.statusCode, ok = http2ErrConvTab[http2.ErrCode(f.ErrCode)]
|
||||
statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
|
||||
if !ok {
|
||||
grpclog.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
|
||||
s.statusCode = codes.Unknown
|
||||
statusCode = codes.Unknown
|
||||
}
|
||||
s.statusDesc = fmt.Sprintf("stream terminated by RST_STREAM with error code: %d", f.ErrCode)
|
||||
close(s.done)
|
||||
s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode))
|
||||
s.mu.Unlock()
|
||||
s.write(recvMsg{err: io.EOF})
|
||||
}
|
||||
@@ -790,12 +879,18 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
||||
}
|
||||
|
||||
func (t *http2Client) handlePing(f *http2.PingFrame) {
|
||||
if f.IsAck() { // Do nothing.
|
||||
return
|
||||
}
|
||||
pingAck := &ping{ack: true}
|
||||
copy(pingAck.data[:], f.Data[:])
|
||||
t.controlBuf.put(pingAck)
|
||||
}
|
||||
|
||||
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
|
||||
grpclog.Printf("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
|
||||
}
|
||||
t.mu.Lock()
|
||||
if t.state == reachable || t.state == draining {
|
||||
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 {
|
||||
@@ -817,6 +912,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
t.mu.Unlock()
|
||||
return
|
||||
default:
|
||||
t.setGoAwayReason(f)
|
||||
}
|
||||
t.goAwayID = f.LastStreamID
|
||||
close(t.goAway)
|
||||
@@ -824,6 +920,26 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
t.mu.Unlock()
|
||||
}
|
||||
|
||||
// setGoAwayReason sets the value of t.goAwayReason based
|
||||
// on the GoAway frame received.
|
||||
// It expects a lock on transport's mutext to be held by
|
||||
// the caller.
|
||||
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
|
||||
t.goAwayReason = NoReason
|
||||
switch f.ErrCode {
|
||||
case http2.ErrCodeEnhanceYourCalm:
|
||||
if string(f.DebugData()) == "too_many_pings" {
|
||||
t.goAwayReason = TooManyPings
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) GetGoAwayReason() GoAwayReason {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return t.goAwayReason
|
||||
}
|
||||
|
||||
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
||||
id := f.Header().StreamID
|
||||
incr := f.Increment
|
||||
@@ -844,21 +960,38 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
}
|
||||
var state decodeState
|
||||
for _, hf := range frame.Fields {
|
||||
state.processHeaderField(hf)
|
||||
}
|
||||
if state.err != nil {
|
||||
s.mu.Lock()
|
||||
if !s.headerDone {
|
||||
close(s.headerChan)
|
||||
s.headerDone = true
|
||||
if err := state.processHeaderField(hf); err != nil {
|
||||
s.mu.Lock()
|
||||
if !s.headerDone {
|
||||
close(s.headerChan)
|
||||
s.headerDone = true
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.write(recvMsg{err: err})
|
||||
// Something wrong. Stops reading even when there is remaining.
|
||||
return
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.write(recvMsg{err: state.err})
|
||||
// Something wrong. Stops reading even when there is remaining.
|
||||
return
|
||||
}
|
||||
|
||||
endStream := frame.StreamEnded()
|
||||
var isHeader bool
|
||||
defer func() {
|
||||
if t.statsHandler != nil {
|
||||
if isHeader {
|
||||
inHeader := &stats.InHeader{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.clientStatsCtx, inHeader)
|
||||
} else {
|
||||
inTrailer := &stats.InTrailer{
|
||||
Client: true,
|
||||
WireLength: int(frame.Header().Length),
|
||||
}
|
||||
t.statsHandler.HandleRPC(s.clientStatsCtx, inTrailer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.mu.Lock()
|
||||
if !endStream {
|
||||
@@ -870,6 +1003,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
}
|
||||
close(s.headerChan)
|
||||
s.headerDone = true
|
||||
isHeader = true
|
||||
}
|
||||
if !endStream || s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
@@ -879,10 +1013,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
if len(state.mdata) > 0 {
|
||||
s.trailer = state.mdata
|
||||
}
|
||||
s.statusCode = state.statusCode
|
||||
s.statusDesc = state.statusDesc
|
||||
close(s.done)
|
||||
s.state = streamDone
|
||||
s.finish(state.status())
|
||||
s.mu.Unlock()
|
||||
s.write(recvMsg{err: io.EOF})
|
||||
}
|
||||
@@ -910,6 +1041,7 @@ func (t *http2Client) reader() {
|
||||
t.notifyError(err)
|
||||
return
|
||||
}
|
||||
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
||||
sf, ok := frame.(*http2.SettingsFrame)
|
||||
if !ok {
|
||||
t.notifyError(err)
|
||||
@@ -920,6 +1052,7 @@ func (t *http2Client) reader() {
|
||||
// loop to keep reading incoming messages on this transport.
|
||||
for {
|
||||
frame, err := t.framer.readFrame()
|
||||
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
|
||||
if err != nil {
|
||||
// Abort an active stream if the http2.Framer returns a
|
||||
// http2.StreamError. This can happen only if the server's response
|
||||
@@ -971,21 +1104,15 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
|
||||
s.Val = math.MaxInt32
|
||||
}
|
||||
t.mu.Lock()
|
||||
reset := t.streamsQuota != nil
|
||||
if !reset {
|
||||
t.streamsQuota = newQuotaPool(int(s.Val) - len(t.activeStreams))
|
||||
}
|
||||
ms := t.maxStreams
|
||||
t.maxStreams = int(s.Val)
|
||||
t.mu.Unlock()
|
||||
if reset {
|
||||
t.streamsQuota.reset(int(s.Val) - ms)
|
||||
}
|
||||
t.streamsQuota.add(int(s.Val) - ms)
|
||||
case http2.SettingInitialWindowSize:
|
||||
t.mu.Lock()
|
||||
for _, stream := range t.activeStreams {
|
||||
// Adjust the sending quota for each stream.
|
||||
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
|
||||
stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
|
||||
}
|
||||
t.streamSendQuota = s.Val
|
||||
t.mu.Unlock()
|
||||
@@ -1013,6 +1140,12 @@ func (t *http2Client) controller() {
|
||||
t.framer.writeSettings(true, i.ss...)
|
||||
}
|
||||
case *resetStream:
|
||||
// If the server needs to be to intimated about stream closing,
|
||||
// then we need to make sure the RST_STREAM frame is written to
|
||||
// the wire before the headers of the next stream waiting on
|
||||
// streamQuota. We ensure this by adding to the streamsQuota pool
|
||||
// only after having acquired the writableChan to send RST_STREAM.
|
||||
t.streamsQuota.add(1)
|
||||
t.framer.writeRSTStream(true, i.streamID, i.code)
|
||||
case *flushIO:
|
||||
t.framer.flushWrite()
|
||||
@@ -1032,6 +1165,61 @@ func (t *http2Client) controller() {
|
||||
}
|
||||
}
|
||||
|
||||
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
|
||||
func (t *http2Client) keepalive() {
|
||||
p := &ping{data: [8]byte{}}
|
||||
timer := time.NewTimer(t.kp.Time)
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
|
||||
timer.Reset(t.kp.Time)
|
||||
continue
|
||||
}
|
||||
// Check if keepalive should go dormant.
|
||||
t.mu.Lock()
|
||||
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
|
||||
// Make awakenKeepalive writable.
|
||||
<-t.awakenKeepalive
|
||||
t.mu.Unlock()
|
||||
select {
|
||||
case <-t.awakenKeepalive:
|
||||
// If the control gets here a ping has been sent
|
||||
// need to reset the timer with keepalive.Timeout.
|
||||
case <-t.shutdownChan:
|
||||
return
|
||||
}
|
||||
} else {
|
||||
t.mu.Unlock()
|
||||
// Send ping.
|
||||
t.controlBuf.put(p)
|
||||
}
|
||||
|
||||
// By the time control gets here a ping has been sent one way or the other.
|
||||
timer.Reset(t.kp.Timeout)
|
||||
select {
|
||||
case <-timer.C:
|
||||
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
|
||||
timer.Reset(t.kp.Time)
|
||||
continue
|
||||
}
|
||||
t.Close()
|
||||
return
|
||||
case <-t.shutdownChan:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
return
|
||||
}
|
||||
case <-t.shutdownChan:
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) Error() <-chan struct{} {
|
||||
return t.errorChan
|
||||
}
|
||||
|
406
vendor/google.golang.org/grpc/transport/http2_server.go
generated
vendored
406
vendor/google.golang.org/grpc/transport/http2_server.go
generated
vendored
@@ -38,18 +38,26 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/tap"
|
||||
)
|
||||
|
||||
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
|
||||
@@ -58,9 +66,13 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
|
||||
|
||||
// http2Server implements the ServerTransport interface with HTTP2.
|
||||
type http2Server struct {
|
||||
ctx context.Context
|
||||
conn net.Conn
|
||||
remoteAddr net.Addr
|
||||
localAddr net.Addr
|
||||
maxStreamID uint32 // max stream ID ever seen
|
||||
authInfo credentials.AuthInfo // auth info about the connection
|
||||
inTapHandle tap.ServerInHandle
|
||||
// writableChan synchronizes write access to the transport.
|
||||
// A writer acquires the write lock by receiving a value on writableChan
|
||||
// and releases it by sending on writableChan.
|
||||
@@ -82,21 +94,46 @@ type http2Server struct {
|
||||
// sendQuotaPool provides flow control to outbound message.
|
||||
sendQuotaPool *quotaPool
|
||||
|
||||
stats stats.Handler
|
||||
|
||||
// Flag to keep track of reading activity on transport.
|
||||
// 1 is true and 0 is false.
|
||||
activity uint32 // Accessed atomically.
|
||||
// Keepalive and max-age parameters for the server.
|
||||
kp keepalive.ServerParameters
|
||||
|
||||
// Keepalive enforcement policy.
|
||||
kep keepalive.EnforcementPolicy
|
||||
// The time instance last ping was received.
|
||||
lastPingAt time.Time
|
||||
// Number of times the client has violated keepalive ping policy so far.
|
||||
pingStrikes uint8
|
||||
// Flag to signify that number of ping strikes should be reset to 0.
|
||||
// This is set whenever data or header frames are sent.
|
||||
// 1 means yes.
|
||||
resetPingStrikes uint32 // Accessed atomically.
|
||||
|
||||
mu sync.Mutex // guard the following
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
// the per-stream outbound flow control window size set by the peer.
|
||||
streamSendQuota uint32
|
||||
// idle is the time instant when the connection went idle.
|
||||
// This is either the begining of the connection or when the number of
|
||||
// RPCs go down to 0.
|
||||
// When the connection is busy, this value is set to 0.
|
||||
idle time.Time
|
||||
}
|
||||
|
||||
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
|
||||
// returned if something goes wrong.
|
||||
func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
|
||||
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
|
||||
framer := newFramer(conn)
|
||||
// Send initial settings as connection preface to client.
|
||||
var settings []http2.Setting
|
||||
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
|
||||
// permitted in the HTTP2 spec.
|
||||
maxStreams := config.MaxStreams
|
||||
if maxStreams == 0 {
|
||||
maxStreams = math.MaxUint32
|
||||
} else {
|
||||
@@ -119,14 +156,40 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
kp := config.KeepaliveParams
|
||||
if kp.MaxConnectionIdle == 0 {
|
||||
kp.MaxConnectionIdle = defaultMaxConnectionIdle
|
||||
}
|
||||
if kp.MaxConnectionAge == 0 {
|
||||
kp.MaxConnectionAge = defaultMaxConnectionAge
|
||||
}
|
||||
// Add a jitter to MaxConnectionAge.
|
||||
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
|
||||
if kp.MaxConnectionAgeGrace == 0 {
|
||||
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
|
||||
}
|
||||
if kp.Time == 0 {
|
||||
kp.Time = defaultServerKeepaliveTime
|
||||
}
|
||||
if kp.Timeout == 0 {
|
||||
kp.Timeout = defaultServerKeepaliveTimeout
|
||||
}
|
||||
kep := config.KeepalivePolicy
|
||||
if kep.MinTime == 0 {
|
||||
kep.MinTime = defaultKeepalivePolicyMinTime
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
t := &http2Server{
|
||||
ctx: context.Background(),
|
||||
conn: conn,
|
||||
authInfo: authInfo,
|
||||
remoteAddr: conn.RemoteAddr(),
|
||||
localAddr: conn.LocalAddr(),
|
||||
authInfo: config.AuthInfo,
|
||||
framer: framer,
|
||||
hBuf: &buf,
|
||||
hEnc: hpack.NewEncoder(&buf),
|
||||
maxStreams: maxStreams,
|
||||
inTapHandle: config.InTapHandle,
|
||||
controlBuf: newRecvBuffer(),
|
||||
fc: &inFlow{limit: initialConnWindowSize},
|
||||
sendQuotaPool: newQuotaPool(defaultWindowSize),
|
||||
@@ -135,14 +198,27 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
|
||||
shutdownChan: make(chan struct{}),
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
streamSendQuota: defaultWindowSize,
|
||||
stats: config.StatsHandler,
|
||||
kp: kp,
|
||||
idle: time.Now(),
|
||||
kep: kep,
|
||||
}
|
||||
if t.stats != nil {
|
||||
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
})
|
||||
connBegin := &stats.ConnBegin{}
|
||||
t.stats.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
go t.controller()
|
||||
go t.keepalive()
|
||||
t.writableChan <- 0
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// operateHeader takes action on the decoded headers.
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) (close bool) {
|
||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
|
||||
buf := newRecvBuffer()
|
||||
s := &Stream{
|
||||
id: frame.Header().StreamID,
|
||||
@@ -153,13 +229,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
|
||||
var state decodeState
|
||||
for _, hf := range frame.Fields {
|
||||
state.processHeaderField(hf)
|
||||
}
|
||||
if err := state.err; err != nil {
|
||||
if se, ok := err.(StreamError); ok {
|
||||
t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
|
||||
if err := state.processHeaderField(hf); err != nil {
|
||||
if se, ok := err.(StreamError); ok {
|
||||
t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
|
||||
}
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if frame.StreamEnded() {
|
||||
@@ -168,12 +243,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
}
|
||||
s.recvCompress = state.encoding
|
||||
if state.timeoutSet {
|
||||
s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
|
||||
s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
|
||||
} else {
|
||||
s.ctx, s.cancel = context.WithCancel(context.TODO())
|
||||
s.ctx, s.cancel = context.WithCancel(t.ctx)
|
||||
}
|
||||
pr := &peer.Peer{
|
||||
Addr: t.conn.RemoteAddr(),
|
||||
Addr: t.remoteAddr,
|
||||
}
|
||||
// Attach Auth info if there is any.
|
||||
if t.authInfo != nil {
|
||||
@@ -186,7 +261,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
s.ctx = newContextWithStream(s.ctx, s)
|
||||
// Attach the received metadata to the context.
|
||||
if len(state.mdata) > 0 {
|
||||
s.ctx = metadata.NewContext(s.ctx, state.mdata)
|
||||
s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
|
||||
}
|
||||
|
||||
s.dec = &recvBufferReader{
|
||||
@@ -195,6 +270,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
}
|
||||
s.recvCompress = state.encoding
|
||||
s.method = state.method
|
||||
if t.inTapHandle != nil {
|
||||
var err error
|
||||
info := &tap.Info{
|
||||
FullMethodName: state.method,
|
||||
}
|
||||
s.ctx, err = t.inTapHandle(s.ctx, info)
|
||||
if err != nil {
|
||||
// TODO: Log the real error.
|
||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
|
||||
return
|
||||
}
|
||||
}
|
||||
t.mu.Lock()
|
||||
if t.state != reachable {
|
||||
t.mu.Unlock()
|
||||
@@ -214,17 +301,33 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
t.maxStreamID = s.id
|
||||
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
|
||||
t.activeStreams[s.id] = s
|
||||
if len(t.activeStreams) == 1 {
|
||||
t.idle = time.Time{}
|
||||
}
|
||||
t.mu.Unlock()
|
||||
s.windowHandler = func(n int) {
|
||||
t.updateWindow(s, uint32(n))
|
||||
}
|
||||
s.ctx = traceCtx(s.ctx, s.method)
|
||||
if t.stats != nil {
|
||||
s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
|
||||
inHeader := &stats.InHeader{
|
||||
FullMethod: s.method,
|
||||
RemoteAddr: t.remoteAddr,
|
||||
LocalAddr: t.localAddr,
|
||||
Compression: s.recvCompress,
|
||||
WireLength: int(frame.Header().Length),
|
||||
}
|
||||
t.stats.HandleRPC(s.ctx, inHeader)
|
||||
}
|
||||
handle(s)
|
||||
return
|
||||
}
|
||||
|
||||
// HandleStreams receives incoming streams using the given handler. This is
|
||||
// typically run in a separate goroutine.
|
||||
func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
||||
// traceCtx attaches trace to ctx and returns the new context.
|
||||
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
|
||||
// Check the validity of client preface.
|
||||
preface := make([]byte, len(clientPreface))
|
||||
if _, err := io.ReadFull(t.conn, preface); err != nil {
|
||||
@@ -248,6 +351,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
||||
t.Close()
|
||||
return
|
||||
}
|
||||
atomic.StoreUint32(&t.activity, 1)
|
||||
sf, ok := frame.(*http2.SettingsFrame)
|
||||
if !ok {
|
||||
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
|
||||
@@ -258,6 +362,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
||||
|
||||
for {
|
||||
frame, err := t.framer.readFrame()
|
||||
atomic.StoreUint32(&t.activity, 1)
|
||||
if err != nil {
|
||||
if se, ok := err.(http2.StreamError); ok {
|
||||
t.mu.Lock()
|
||||
@@ -279,7 +384,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
||||
}
|
||||
switch frame := frame.(type) {
|
||||
case *http2.MetaHeadersFrame:
|
||||
if t.operateHeaders(frame, handle) {
|
||||
if t.operateHeaders(frame, handle, traceCtx) {
|
||||
t.Close()
|
||||
break
|
||||
}
|
||||
@@ -334,7 +439,7 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
||||
}
|
||||
|
||||
func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
size := len(f.Data())
|
||||
size := f.Header().Length
|
||||
if err := t.fc.onData(uint32(size)); err != nil {
|
||||
grpclog.Printf("transport: http2Server %v", err)
|
||||
t.Close()
|
||||
@@ -349,6 +454,11 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
return
|
||||
}
|
||||
if size > 0 {
|
||||
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
||||
if w := t.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
||||
t.controlBuf.put(&windowUpdate{0, w})
|
||||
}
|
||||
}
|
||||
s.mu.Lock()
|
||||
if s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
@@ -364,13 +474,20 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
|
||||
return
|
||||
}
|
||||
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
||||
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
||||
t.controlBuf.put(&windowUpdate{s.id, w})
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
// TODO(bradfitz, zhaoq): A copy is required here because there is no
|
||||
// guarantee f.Data() is consumed before the arrival of next frame.
|
||||
// Can this copy be eliminated?
|
||||
data := make([]byte, size)
|
||||
copy(data, f.Data())
|
||||
s.write(recvMsg{data: data})
|
||||
if len(f.Data()) > 0 {
|
||||
data := make([]byte, len(f.Data()))
|
||||
copy(data, f.Data())
|
||||
s.write(recvMsg{data: data})
|
||||
}
|
||||
}
|
||||
if f.Header().Flags.Has(http2.FlagDataEndStream) {
|
||||
// Received the end of stream from the client.
|
||||
@@ -404,10 +521,50 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
||||
t.controlBuf.put(&settings{ack: true, ss: ss})
|
||||
}
|
||||
|
||||
const (
|
||||
maxPingStrikes = 2
|
||||
defaultPingTimeout = 2 * time.Hour
|
||||
)
|
||||
|
||||
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||
if f.IsAck() { // Do nothing.
|
||||
return
|
||||
}
|
||||
pingAck := &ping{ack: true}
|
||||
copy(pingAck.data[:], f.Data[:])
|
||||
t.controlBuf.put(pingAck)
|
||||
|
||||
now := time.Now()
|
||||
defer func() {
|
||||
t.lastPingAt = now
|
||||
}()
|
||||
// A reset ping strikes means that we don't need to check for policy
|
||||
// violation for this ping and the pingStrikes counter should be set
|
||||
// to 0.
|
||||
if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
|
||||
t.pingStrikes = 0
|
||||
return
|
||||
}
|
||||
t.mu.Lock()
|
||||
ns := len(t.activeStreams)
|
||||
t.mu.Unlock()
|
||||
if ns < 1 && !t.kep.PermitWithoutStream {
|
||||
// Keepalive shouldn't be active thus, this new ping should
|
||||
// have come after atleast defaultPingTimeout.
|
||||
if t.lastPingAt.Add(defaultPingTimeout).After(now) {
|
||||
t.pingStrikes++
|
||||
}
|
||||
} else {
|
||||
// Check if keepalive policy is respected.
|
||||
if t.lastPingAt.Add(t.kep.MinTime).After(now) {
|
||||
t.pingStrikes++
|
||||
}
|
||||
}
|
||||
|
||||
if t.pingStrikes > maxPingStrikes {
|
||||
// Send goaway and close the connection.
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")})
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
||||
@@ -426,6 +583,13 @@ func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) e
|
||||
first := true
|
||||
endHeaders := false
|
||||
var err error
|
||||
defer func() {
|
||||
if err == nil {
|
||||
// Reset ping strikes when seding headers since that might cause the
|
||||
// peer to send ping.
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
}
|
||||
}()
|
||||
// Sends the headers in a single batch.
|
||||
for !endHeaders {
|
||||
size := t.hBuf.Len()
|
||||
@@ -462,6 +626,14 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.headerOk = true
|
||||
if md.Len() > 0 {
|
||||
if s.header.Len() > 0 {
|
||||
s.header = metadata.Join(s.header, md)
|
||||
} else {
|
||||
s.header = md
|
||||
}
|
||||
}
|
||||
md = s.header
|
||||
s.mu.Unlock()
|
||||
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
|
||||
return err
|
||||
@@ -481,9 +653,16 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
|
||||
}
|
||||
}
|
||||
bufLen := t.hBuf.Len()
|
||||
if err := t.writeHeaders(s, t.hBuf, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if t.stats != nil {
|
||||
outHeader := &stats.OutHeader{
|
||||
WireLength: bufLen,
|
||||
}
|
||||
t.stats.HandleRPC(s.Context(), outHeader)
|
||||
}
|
||||
t.writableChan <- 0
|
||||
return nil
|
||||
}
|
||||
@@ -492,8 +671,8 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
// There is no further I/O operations being able to perform on this stream.
|
||||
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
||||
// OK is adopted.
|
||||
func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
|
||||
var headersSent bool
|
||||
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
var headersSent, hasHeader bool
|
||||
s.mu.Lock()
|
||||
if s.state == streamDone {
|
||||
s.mu.Unlock()
|
||||
@@ -502,7 +681,16 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
|
||||
if s.headerOk {
|
||||
headersSent = true
|
||||
}
|
||||
if s.header.Len() > 0 {
|
||||
hasHeader = true
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if !headersSent && hasHeader {
|
||||
t.WriteHeader(s, nil)
|
||||
headersSent = true
|
||||
}
|
||||
|
||||
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -514,9 +702,24 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
|
||||
t.hEnc.WriteField(
|
||||
hpack.HeaderField{
|
||||
Name: "grpc-status",
|
||||
Value: strconv.Itoa(int(statusCode)),
|
||||
Value: strconv.Itoa(int(st.Code())),
|
||||
})
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(statusDesc)})
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
|
||||
|
||||
if p := st.Proto(); p != nil && len(p.Details) > 0 {
|
||||
stBytes, err := proto.Marshal(p)
|
||||
if err != nil {
|
||||
// TODO: return error instead, when callers are able to handle it.
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for k, v := range metadata.New(map[string]string{"grpc-status-details-bin": (string)(stBytes)}) {
|
||||
for _, entry := range v {
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Attach the trailer metadata.
|
||||
for k, v := range s.trailer {
|
||||
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
|
||||
@@ -527,10 +730,17 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
|
||||
}
|
||||
}
|
||||
bufLen := t.hBuf.Len()
|
||||
if err := t.writeHeaders(s, t.hBuf, true); err != nil {
|
||||
t.Close()
|
||||
return err
|
||||
}
|
||||
if t.stats != nil {
|
||||
outTrailer := &stats.OutTrailer{
|
||||
WireLength: bufLen,
|
||||
}
|
||||
t.stats.HandleRPC(s.Context(), outTrailer)
|
||||
}
|
||||
t.closeStream(s)
|
||||
t.writableChan <- 0
|
||||
return nil
|
||||
@@ -538,7 +748,7 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
|
||||
|
||||
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
|
||||
// is returns if it fails (e.g., framing error, transport error).
|
||||
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
|
||||
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
|
||||
// TODO(zhaoq): Support multi-writers for a single stream.
|
||||
var writeHeaderFrame bool
|
||||
s.mu.Lock()
|
||||
@@ -548,49 +758,32 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
|
||||
}
|
||||
if !s.headerOk {
|
||||
writeHeaderFrame = true
|
||||
s.headerOk = true
|
||||
}
|
||||
s.mu.Unlock()
|
||||
if writeHeaderFrame {
|
||||
if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
|
||||
return err
|
||||
}
|
||||
t.hBuf.Reset()
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
|
||||
if s.sendCompress != "" {
|
||||
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
|
||||
}
|
||||
p := http2.HeadersFrameParam{
|
||||
StreamID: s.id,
|
||||
BlockFragment: t.hBuf.Bytes(),
|
||||
EndHeaders: true,
|
||||
}
|
||||
if err := t.framer.writeHeaders(false, p); err != nil {
|
||||
t.Close()
|
||||
return connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
t.writableChan <- 0
|
||||
t.WriteHeader(s, nil)
|
||||
}
|
||||
defer func() {
|
||||
if err == nil {
|
||||
// Reset ping strikes when sending data since this might cause
|
||||
// the peer to send ping.
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
}
|
||||
}()
|
||||
r := bytes.NewBuffer(data)
|
||||
for {
|
||||
if r.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
size := http2MaxFrameLen
|
||||
s.sendQuotaPool.add(0)
|
||||
// Wait until the stream has some quota to send the data.
|
||||
sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.sendQuotaPool.add(0)
|
||||
// Wait until the transport has some quota to send the data.
|
||||
tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
|
||||
if err != nil {
|
||||
if _, ok := err.(StreamError); ok {
|
||||
t.sendQuotaPool.cancel()
|
||||
}
|
||||
return err
|
||||
}
|
||||
if sq < size {
|
||||
@@ -658,7 +851,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
for _, stream := range t.activeStreams {
|
||||
stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
|
||||
stream.sendQuotaPool.add(int(s.Val - t.streamSendQuota))
|
||||
}
|
||||
t.streamSendQuota = s.Val
|
||||
}
|
||||
@@ -666,6 +859,91 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
|
||||
}
|
||||
}
|
||||
|
||||
// keepalive running in a separate goroutine does the following:
|
||||
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
|
||||
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
|
||||
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
|
||||
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
|
||||
// after an additional duration of keepalive.Timeout.
|
||||
func (t *http2Server) keepalive() {
|
||||
p := &ping{}
|
||||
var pingSent bool
|
||||
maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
|
||||
maxAge := time.NewTimer(t.kp.MaxConnectionAge)
|
||||
keepalive := time.NewTimer(t.kp.Time)
|
||||
// NOTE: All exit paths of this function should reset their
|
||||
// respecitve timers. A failure to do so will cause the
|
||||
// following clean-up to deadlock and eventually leak.
|
||||
defer func() {
|
||||
if !maxIdle.Stop() {
|
||||
<-maxIdle.C
|
||||
}
|
||||
if !maxAge.Stop() {
|
||||
<-maxAge.C
|
||||
}
|
||||
if !keepalive.Stop() {
|
||||
<-keepalive.C
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-maxIdle.C:
|
||||
t.mu.Lock()
|
||||
idle := t.idle
|
||||
if idle.IsZero() { // The connection is non-idle.
|
||||
t.mu.Unlock()
|
||||
maxIdle.Reset(t.kp.MaxConnectionIdle)
|
||||
continue
|
||||
}
|
||||
val := t.kp.MaxConnectionIdle - time.Since(idle)
|
||||
if val <= 0 {
|
||||
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
|
||||
// Gracefully close the connection.
|
||||
t.state = draining
|
||||
t.mu.Unlock()
|
||||
t.Drain()
|
||||
// Reseting the timer so that the clean-up doesn't deadlock.
|
||||
maxIdle.Reset(infinity)
|
||||
return
|
||||
}
|
||||
t.mu.Unlock()
|
||||
maxIdle.Reset(val)
|
||||
case <-maxAge.C:
|
||||
t.mu.Lock()
|
||||
t.state = draining
|
||||
t.mu.Unlock()
|
||||
t.Drain()
|
||||
maxAge.Reset(t.kp.MaxConnectionAgeGrace)
|
||||
select {
|
||||
case <-maxAge.C:
|
||||
// Close the connection after grace period.
|
||||
t.Close()
|
||||
// Reseting the timer so that the clean-up doesn't deadlock.
|
||||
maxAge.Reset(infinity)
|
||||
case <-t.shutdownChan:
|
||||
}
|
||||
return
|
||||
case <-keepalive.C:
|
||||
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
|
||||
pingSent = false
|
||||
keepalive.Reset(t.kp.Time)
|
||||
continue
|
||||
}
|
||||
if pingSent {
|
||||
t.Close()
|
||||
// Reseting the timer so that the clean-up doesn't deadlock.
|
||||
keepalive.Reset(infinity)
|
||||
return
|
||||
}
|
||||
pingSent = true
|
||||
t.controlBuf.put(p)
|
||||
keepalive.Reset(t.kp.Timeout)
|
||||
case <-t.shutdownChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// controller running in a separate goroutine takes charge of sending control
|
||||
// frames (e.g., window update, reset stream, setting, etc.) to the server.
|
||||
func (t *http2Server) controller() {
|
||||
@@ -697,7 +975,10 @@ func (t *http2Server) controller() {
|
||||
sid := t.maxStreamID
|
||||
t.state = draining
|
||||
t.mu.Unlock()
|
||||
t.framer.writeGoAway(true, sid, http2.ErrCodeNo, nil)
|
||||
t.framer.writeGoAway(true, sid, i.code, i.debugData)
|
||||
if i.code == http2.ErrCodeEnhanceYourCalm {
|
||||
t.Close()
|
||||
}
|
||||
case *flushIO:
|
||||
t.framer.flushWrite()
|
||||
case *ping:
|
||||
@@ -735,6 +1016,10 @@ func (t *http2Server) Close() (err error) {
|
||||
for _, s := range streams {
|
||||
s.cancel()
|
||||
}
|
||||
if t.stats != nil {
|
||||
connEnd := &stats.ConnEnd{}
|
||||
t.stats.HandleConn(t.ctx, connEnd)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -743,6 +1028,9 @@ func (t *http2Server) Close() (err error) {
|
||||
func (t *http2Server) closeStream(s *Stream) {
|
||||
t.mu.Lock()
|
||||
delete(t.activeStreams, s.id)
|
||||
if len(t.activeStreams) == 0 {
|
||||
t.idle = time.Now()
|
||||
}
|
||||
if t.state == draining && len(t.activeStreams) == 0 {
|
||||
defer t.Close()
|
||||
}
|
||||
@@ -766,9 +1054,21 @@ func (t *http2Server) closeStream(s *Stream) {
|
||||
}
|
||||
|
||||
func (t *http2Server) RemoteAddr() net.Addr {
|
||||
return t.conn.RemoteAddr()
|
||||
return t.remoteAddr
|
||||
}
|
||||
|
||||
func (t *http2Server) Drain() {
|
||||
t.controlBuf.put(&goAway{})
|
||||
t.controlBuf.put(&goAway{code: http2.ErrCodeNo})
|
||||
}
|
||||
|
||||
var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func getJitter(v time.Duration) time.Duration {
|
||||
if v == infinity {
|
||||
return 0
|
||||
}
|
||||
// Generate a jitter between +/- 10% of the value.
|
||||
r := int64(v / 10)
|
||||
j := rgen.Int63n(2*r) - r
|
||||
return time.Duration(j)
|
||||
}
|
||||
|
77
vendor/google.golang.org/grpc/transport/http_util.go
generated
vendored
77
vendor/google.golang.org/grpc/transport/http_util.go
generated
vendored
@@ -44,16 +44,17 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
spb "google.golang.org/genproto/googleapis/rpc/status"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const (
|
||||
// The primary user agent
|
||||
primaryUA = "grpc-go/1.0"
|
||||
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
|
||||
http2MaxFrameLen = 16384 // 16KB frame
|
||||
// http://http2.github.io/http2-spec/#SettingValues
|
||||
@@ -92,13 +93,15 @@ var (
|
||||
// Records the states during HPACK decoding. Must be reset once the
|
||||
// decoding of the entire headers are finished.
|
||||
type decodeState struct {
|
||||
err error // first error encountered decoding
|
||||
|
||||
encoding string
|
||||
// statusCode caches the stream status received from the trailer
|
||||
// the server sent. Client side only.
|
||||
statusCode codes.Code
|
||||
statusDesc string
|
||||
// statusGen caches the stream status received from the trailer the server
|
||||
// sent. Client side only. Do not access directly. After all trailers are
|
||||
// parsed, use the status method to retrieve the status.
|
||||
statusGen *status.Status
|
||||
// rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
|
||||
// intended for direct access outside of parsing.
|
||||
rawStatusCode int32
|
||||
rawStatusMsg string
|
||||
// Server side only fields.
|
||||
timeoutSet bool
|
||||
timeout time.Duration
|
||||
@@ -121,6 +124,7 @@ func isReservedHeader(hdr string) bool {
|
||||
"grpc-message",
|
||||
"grpc-status",
|
||||
"grpc-timeout",
|
||||
"grpc-status-details-bin",
|
||||
"te":
|
||||
return true
|
||||
default:
|
||||
@@ -139,12 +143,6 @@ func isWhitelistedPseudoHeader(hdr string) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *decodeState) setErr(err error) {
|
||||
if d.err == nil {
|
||||
d.err = err
|
||||
}
|
||||
}
|
||||
|
||||
func validContentType(t string) bool {
|
||||
e := "application/grpc"
|
||||
if !strings.HasPrefix(t, e) {
|
||||
@@ -158,56 +156,62 @@ func validContentType(t string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (d *decodeState) processHeaderField(f hpack.HeaderField) {
|
||||
func (d *decodeState) status() *status.Status {
|
||||
if d.statusGen == nil {
|
||||
// No status-details were provided; generate status using code/msg.
|
||||
d.statusGen = status.New(codes.Code(d.rawStatusCode), d.rawStatusMsg)
|
||||
}
|
||||
return d.statusGen
|
||||
}
|
||||
|
||||
func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
|
||||
switch f.Name {
|
||||
case "content-type":
|
||||
if !validContentType(f.Value) {
|
||||
d.setErr(streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
|
||||
return
|
||||
return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
|
||||
}
|
||||
case "grpc-encoding":
|
||||
d.encoding = f.Value
|
||||
case "grpc-status":
|
||||
code, err := strconv.Atoi(f.Value)
|
||||
if err != nil {
|
||||
d.setErr(streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
|
||||
return
|
||||
return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
|
||||
}
|
||||
d.statusCode = codes.Code(code)
|
||||
d.rawStatusCode = int32(code)
|
||||
case "grpc-message":
|
||||
d.statusDesc = decodeGrpcMessage(f.Value)
|
||||
d.rawStatusMsg = decodeGrpcMessage(f.Value)
|
||||
case "grpc-status-details-bin":
|
||||
_, v, err := metadata.DecodeKeyValue("grpc-status-details-bin", f.Value)
|
||||
if err != nil {
|
||||
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
|
||||
}
|
||||
s := &spb.Status{}
|
||||
if err := proto.Unmarshal([]byte(v), s); err != nil {
|
||||
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
|
||||
}
|
||||
d.statusGen = status.FromProto(s)
|
||||
case "grpc-timeout":
|
||||
d.timeoutSet = true
|
||||
var err error
|
||||
d.timeout, err = decodeTimeout(f.Value)
|
||||
if err != nil {
|
||||
d.setErr(streamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
|
||||
return
|
||||
if d.timeout, err = decodeTimeout(f.Value); err != nil {
|
||||
return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
|
||||
}
|
||||
case ":path":
|
||||
d.method = f.Value
|
||||
default:
|
||||
if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
|
||||
if f.Name == "user-agent" {
|
||||
i := strings.LastIndex(f.Value, " ")
|
||||
if i == -1 {
|
||||
// There is no application user agent string being set.
|
||||
return
|
||||
}
|
||||
// Extract the application user agent string.
|
||||
f.Value = f.Value[:i]
|
||||
}
|
||||
if d.mdata == nil {
|
||||
d.mdata = make(map[string][]string)
|
||||
}
|
||||
k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
|
||||
if err != nil {
|
||||
grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
d.mdata[k] = append(d.mdata[k], v)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type timeoutUnit uint8
|
||||
@@ -379,6 +383,9 @@ func newFramer(conn net.Conn) *framer {
|
||||
writer: bufio.NewWriterSize(conn, http2IOBufSize),
|
||||
}
|
||||
f.fr = http2.NewFramer(f.writer, f.reader)
|
||||
// Opt-in to Frame reuse API on framer to reduce garbage.
|
||||
// Frames aren't safe to read from after a subsequent call to ReadFrame.
|
||||
f.fr.SetReuseFrames()
|
||||
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
|
||||
return f
|
||||
}
|
||||
|
51
vendor/google.golang.org/grpc/transport/pre_go16.go
generated
vendored
51
vendor/google.golang.org/grpc/transport/pre_go16.go
generated
vendored
@@ -1,51 +0,0 @@
|
||||
// +build !go1.6
|
||||
|
||||
/*
|
||||
* Copyright 2016, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// dialContext connects to the address on the named network.
|
||||
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
|
||||
var dialer net.Dialer
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
dialer.Timeout = deadline.Sub(time.Now())
|
||||
}
|
||||
return dialer.Dial(network, address)
|
||||
}
|
131
vendor/google.golang.org/grpc/transport/transport.go
generated
vendored
131
vendor/google.golang.org/grpc/transport/transport.go
generated
vendored
@@ -45,10 +45,14 @@ import (
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/tap"
|
||||
)
|
||||
|
||||
// recvMsg represents the received msg from the transport. All transport
|
||||
@@ -167,6 +171,11 @@ type Stream struct {
|
||||
id uint32
|
||||
// nil for client side Stream.
|
||||
st ServerTransport
|
||||
// clientStatsCtx keeps the user context for stats handling.
|
||||
// It's only valid on client side. Server side stats context is same as s.ctx.
|
||||
// All client side stats collection should use the clientStatsCtx (instead of the stream context)
|
||||
// so that all the generated stats for a particular RPC can be associated in the processing phase.
|
||||
clientStatsCtx context.Context
|
||||
// ctx is the associated context of the stream.
|
||||
ctx context.Context
|
||||
// cancel is always nil for client side Stream.
|
||||
@@ -204,9 +213,13 @@ type Stream struct {
|
||||
// true iff headerChan is closed. Used to avoid closing headerChan
|
||||
// multiple times.
|
||||
headerDone bool
|
||||
// the status received from the server.
|
||||
statusCode codes.Code
|
||||
statusDesc string
|
||||
// the status error received from the server.
|
||||
status *status.Status
|
||||
// rstStream indicates whether a RST_STREAM frame needs to be sent
|
||||
// to the server to signify that this stream is closing.
|
||||
rstStream bool
|
||||
// rstError is the error that needs to be sent along with the RST_STREAM frame.
|
||||
rstError http2.ErrCode
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
@@ -266,29 +279,37 @@ func (s *Stream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
// TraceContext recreates the context of s with a trace.Trace.
|
||||
func (s *Stream) TraceContext(tr trace.Trace) {
|
||||
s.ctx = trace.NewContext(s.ctx, tr)
|
||||
}
|
||||
|
||||
// Method returns the method for the stream.
|
||||
func (s *Stream) Method() string {
|
||||
return s.method
|
||||
}
|
||||
|
||||
// StatusCode returns statusCode received from the server.
|
||||
func (s *Stream) StatusCode() codes.Code {
|
||||
return s.statusCode
|
||||
// Status returns the status received from the server.
|
||||
func (s *Stream) Status() *status.Status {
|
||||
return s.status
|
||||
}
|
||||
|
||||
// StatusDesc returns statusDesc received from the server.
|
||||
func (s *Stream) StatusDesc() string {
|
||||
return s.statusDesc
|
||||
// SetHeader sets the header metadata. This can be called multiple times.
|
||||
// Server side only.
|
||||
func (s *Stream) SetHeader(md metadata.MD) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.headerOk || s.state == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
s.header = metadata.Join(s.header, md)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
||||
// by the server. This can be called multiple times. Server side only.
|
||||
func (s *Stream) SetTrailer(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.trailer = metadata.Join(s.trailer, md)
|
||||
@@ -312,6 +333,20 @@ func (s *Stream) Read(p []byte) (n int, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// finish sets the stream's state and status, and closes the done channel.
|
||||
// s.mu must be held by the caller. st must always be non-nil.
|
||||
func (s *Stream) finish(st *status.Status) {
|
||||
s.status = st
|
||||
s.state = streamDone
|
||||
close(s.done)
|
||||
}
|
||||
|
||||
// GoString is implemented by Stream so context.String() won't
|
||||
// race when printing %#v.
|
||||
func (s *Stream) GoString() string {
|
||||
return fmt.Sprintf("<stream: %p, %v>", s, s.method)
|
||||
}
|
||||
|
||||
// The key to save transport.Stream in the context.
|
||||
type streamKey struct{}
|
||||
|
||||
@@ -337,27 +372,52 @@ const (
|
||||
draining
|
||||
)
|
||||
|
||||
// NewServerTransport creates a ServerTransport with conn or non-nil error
|
||||
// if it fails.
|
||||
func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
|
||||
return newHTTP2Server(conn, maxStreams, authInfo)
|
||||
// ServerConfig consists of all the configurations to establish a server transport.
|
||||
type ServerConfig struct {
|
||||
MaxStreams uint32
|
||||
AuthInfo credentials.AuthInfo
|
||||
InTapHandle tap.ServerInHandle
|
||||
StatsHandler stats.Handler
|
||||
KeepaliveParams keepalive.ServerParameters
|
||||
KeepalivePolicy keepalive.EnforcementPolicy
|
||||
}
|
||||
|
||||
// ConnectOptions covers all relevant options for dialing a server.
|
||||
// NewServerTransport creates a ServerTransport with conn or non-nil error
|
||||
// if it fails.
|
||||
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
|
||||
return newHTTP2Server(conn, config)
|
||||
}
|
||||
|
||||
// ConnectOptions covers all relevant options for communicating with the server.
|
||||
type ConnectOptions struct {
|
||||
// UserAgent is the application user agent.
|
||||
UserAgent string
|
||||
// Authority is the :authority pseudo-header to use. This field has no effect if
|
||||
// TransportCredentials is set.
|
||||
Authority string
|
||||
// Dialer specifies how to dial a network address.
|
||||
Dialer func(context.Context, string) (net.Conn, error)
|
||||
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
|
||||
FailOnNonTempDialError bool
|
||||
// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
|
||||
PerRPCCredentials []credentials.PerRPCCredentials
|
||||
// TransportCredentials stores the Authenticator required to setup a client connection.
|
||||
TransportCredentials credentials.TransportCredentials
|
||||
// KeepaliveParams stores the keepalive parameters.
|
||||
KeepaliveParams keepalive.ClientParameters
|
||||
// StatsHandler stores the handler for stats.
|
||||
StatsHandler stats.Handler
|
||||
}
|
||||
|
||||
// TargetInfo contains the information of the target such as network address and metadata.
|
||||
type TargetInfo struct {
|
||||
Addr string
|
||||
Metadata interface{}
|
||||
}
|
||||
|
||||
// NewClientTransport establishes the transport with the required ConnectOptions
|
||||
// and returns it to the caller.
|
||||
func NewClientTransport(ctx context.Context, target string, opts ConnectOptions) (ClientTransport, error) {
|
||||
func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
|
||||
return newHTTP2Client(ctx, target, opts)
|
||||
}
|
||||
|
||||
@@ -433,6 +493,9 @@ type ClientTransport interface {
|
||||
// receives the draining signal from the server (e.g., GOAWAY frame in
|
||||
// HTTP/2).
|
||||
GoAway() <-chan struct{}
|
||||
|
||||
// GetGoAwayReason returns the reason why GoAway frame was received.
|
||||
GetGoAwayReason() GoAwayReason
|
||||
}
|
||||
|
||||
// ServerTransport is the common interface for all gRPC server-side transport
|
||||
@@ -442,7 +505,7 @@ type ClientTransport interface {
|
||||
// Write methods for a given Stream will be called serially.
|
||||
type ServerTransport interface {
|
||||
// HandleStreams receives incoming streams using the given handler.
|
||||
HandleStreams(func(*Stream))
|
||||
HandleStreams(func(*Stream), func(context.Context, string) context.Context)
|
||||
|
||||
// WriteHeader sends the header metadata for the given stream.
|
||||
// WriteHeader may not be called on all streams.
|
||||
@@ -452,10 +515,9 @@ type ServerTransport interface {
|
||||
// Write may not be called on all streams.
|
||||
Write(s *Stream, data []byte, opts *Options) error
|
||||
|
||||
// WriteStatus sends the status of a stream to the client.
|
||||
// WriteStatus is the final call made on a stream and always
|
||||
// occurs.
|
||||
WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error
|
||||
// WriteStatus sends the status of a stream to the client. WriteStatus is
|
||||
// the final call made on a stream and always occurs.
|
||||
WriteStatus(s *Stream, st *status.Status) error
|
||||
|
||||
// Close tears down the transport. Once it is called, the transport
|
||||
// should not be accessed any more. All the pending streams and their
|
||||
@@ -521,6 +583,8 @@ var (
|
||||
ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
|
||||
)
|
||||
|
||||
// TODO: See if we can replace StreamError with status package errors.
|
||||
|
||||
// StreamError is an error that only affects one stream within a connection.
|
||||
type StreamError struct {
|
||||
Code codes.Code
|
||||
@@ -528,7 +592,7 @@ type StreamError struct {
|
||||
}
|
||||
|
||||
func (e StreamError) Error() string {
|
||||
return fmt.Sprintf("stream error: code = %d desc = %q", e.Code, e.Desc)
|
||||
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
|
||||
}
|
||||
|
||||
// ContextErr converts the error from context package into a StreamError.
|
||||
@@ -569,3 +633,16 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-
|
||||
return i, nil
|
||||
}
|
||||
}
|
||||
|
||||
// GoAwayReason contains the reason for the GoAway frame received.
|
||||
type GoAwayReason uint8
|
||||
|
||||
const (
|
||||
// Invalid indicates that no GoAway frame is received.
|
||||
Invalid GoAwayReason = 0
|
||||
// NoReason is the default value when GoAway frame is received.
|
||||
NoReason GoAwayReason = 1
|
||||
// TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
|
||||
// was recieved and that the debug data said "too_many_pings".
|
||||
TooManyPings GoAwayReason = 2
|
||||
)
|
||||
|
Reference in New Issue
Block a user