vendor: revendor

This commit is contained in:
Eric Chiang
2016-12-22 11:39:28 -08:00
parent d87a4c35b9
commit 1451213dd7
268 changed files with 484 additions and 59530 deletions

76
vendor/google.golang.org/appengine/appengine.go generated vendored Normal file
View File

@@ -0,0 +1,76 @@
// Copyright 2011 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
// Package appengine provides basic functionality for Google App Engine.
//
// For more information on how to write Go apps for Google App Engine, see:
// https://cloud.google.com/appengine/docs/go/
package appengine // import "google.golang.org/appengine"
import (
"net/http"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/appengine/internal"
)
// IsDevAppServer reports whether the App Engine app is running in the
// development App Server.
func IsDevAppServer() bool {
return internal.IsDevAppServer()
}
// NewContext returns a context for an in-flight HTTP request.
// This function is cheap.
func NewContext(req *http.Request) context.Context {
return WithContext(context.Background(), req)
}
// WithContext returns a copy of the parent context
// and associates it with an in-flight HTTP request.
// This function is cheap.
func WithContext(parent context.Context, req *http.Request) context.Context {
return internal.WithContext(parent, req)
}
// TODO(dsymonds): Add a Call function here? Otherwise other packages can't access internal.Call.
// BlobKey is a key for a blobstore blob.
//
// Conceptually, this type belongs in the blobstore package, but it lives in
// the appengine package to avoid a circular dependency: blobstore depends on
// datastore, and datastore needs to refer to the BlobKey type.
type BlobKey string
// GeoPoint represents a location as latitude/longitude in degrees.
type GeoPoint struct {
Lat, Lng float64
}
// Valid returns whether a GeoPoint is within [-90, 90] latitude and [-180, 180] longitude.
func (g GeoPoint) Valid() bool {
return -90 <= g.Lat && g.Lat <= 90 && -180 <= g.Lng && g.Lng <= 180
}
// APICallFunc defines a function type for handling an API call.
// See WithCallOverride.
type APICallFunc func(ctx context.Context, service, method string, in, out proto.Message) error
// WithAPICallFunc returns a copy of the parent context
// that will cause API calls to invoke f instead of their normal operation.
//
// This is intended for advanced users only.
func WithAPICallFunc(ctx context.Context, f APICallFunc) context.Context {
return internal.WithCallOverride(ctx, internal.CallOverrideFunc(f))
}
// APICall performs an API call.
//
// This is not intended for general use; it is exported for use in conjunction
// with WithAPICallFunc.
func APICall(ctx context.Context, service, method string, in, out proto.Message) error {
return internal.Call(ctx, service, method, in, out)
}

56
vendor/google.golang.org/appengine/appengine_vm.go generated vendored Normal file
View File

@@ -0,0 +1,56 @@
// Copyright 2015 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
// +build !appengine
package appengine
import (
"golang.org/x/net/context"
"google.golang.org/appengine/internal"
)
// The comment below must not be changed.
// It is used by go-app-builder to recognise that this package has
// the Main function to use in the synthetic main.
// The gophers party all night; the rabbits provide the beats.
// Main is the principal entry point for an app running in App Engine "flexible environment".
// It installs a trivial health checker if one isn't already registered,
// and starts listening on port 8080 (overridden by the $PORT environment
// variable).
//
// See https://cloud.google.com/appengine/docs/flexible/custom-runtimes#health_check_requests
// for details on how to do your own health checking.
//
// Main never returns.
//
// Main is designed so that the app's main package looks like this:
//
// package main
//
// import (
// "google.golang.org/appengine"
//
// _ "myapp/package0"
// _ "myapp/package1"
// )
//
// func main() {
// appengine.Main()
// }
//
// The "myapp/packageX" packages are expected to register HTTP handlers
// in their init functions.
func Main() {
internal.Main()
}
// BackgroundContext returns a context not associated with a request.
// This should only be used when not servicing a request.
// This only works in App Engine "flexible environment".
func BackgroundContext() context.Context {
return internal.BackgroundContext()
}

46
vendor/google.golang.org/appengine/errors.go generated vendored Normal file
View File

@@ -0,0 +1,46 @@
// Copyright 2011 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
// This file provides error functions for common API failure modes.
package appengine
import (
"fmt"
"google.golang.org/appengine/internal"
)
// IsOverQuota reports whether err represents an API call failure
// due to insufficient available quota.
func IsOverQuota(err error) bool {
callErr, ok := err.(*internal.CallError)
return ok && callErr.Code == 4
}
// MultiError is returned by batch operations when there are errors with
// particular elements. Errors will be in a one-to-one correspondence with
// the input elements; successful elements will have a nil entry.
type MultiError []error
func (m MultiError) Error() string {
s, n := "", 0
for _, e := range m {
if e != nil {
if n == 0 {
s = e.Error()
}
n++
}
}
switch n {
case 0:
return "(0 errors)"
case 1:
return s
case 2:
return s + " (and 1 other error)"
}
return fmt.Sprintf("%s (and %d other errors)", s, n-1)
}

142
vendor/google.golang.org/appengine/identity.go generated vendored Normal file
View File

@@ -0,0 +1,142 @@
// Copyright 2011 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package appengine
import (
"time"
"golang.org/x/net/context"
"google.golang.org/appengine/internal"
pb "google.golang.org/appengine/internal/app_identity"
modpb "google.golang.org/appengine/internal/modules"
)
// AppID returns the application ID for the current application.
// The string will be a plain application ID (e.g. "appid"), with a
// domain prefix for custom domain deployments (e.g. "example.com:appid").
func AppID(c context.Context) string { return internal.AppID(c) }
// DefaultVersionHostname returns the standard hostname of the default version
// of the current application (e.g. "my-app.appspot.com"). This is suitable for
// use in constructing URLs.
func DefaultVersionHostname(c context.Context) string {
return internal.DefaultVersionHostname(c)
}
// ModuleName returns the module name of the current instance.
func ModuleName(c context.Context) string {
return internal.ModuleName(c)
}
// ModuleHostname returns a hostname of a module instance.
// If module is the empty string, it refers to the module of the current instance.
// If version is empty, it refers to the version of the current instance if valid,
// or the default version of the module of the current instance.
// If instance is empty, ModuleHostname returns the load-balancing hostname.
func ModuleHostname(c context.Context, module, version, instance string) (string, error) {
req := &modpb.GetHostnameRequest{}
if module != "" {
req.Module = &module
}
if version != "" {
req.Version = &version
}
if instance != "" {
req.Instance = &instance
}
res := &modpb.GetHostnameResponse{}
if err := internal.Call(c, "modules", "GetHostname", req, res); err != nil {
return "", err
}
return *res.Hostname, nil
}
// VersionID returns the version ID for the current application.
// It will be of the form "X.Y", where X is specified in app.yaml,
// and Y is a number generated when each version of the app is uploaded.
// It does not include a module name.
func VersionID(c context.Context) string { return internal.VersionID(c) }
// InstanceID returns a mostly-unique identifier for this instance.
func InstanceID() string { return internal.InstanceID() }
// Datacenter returns an identifier for the datacenter that the instance is running in.
func Datacenter(c context.Context) string { return internal.Datacenter(c) }
// ServerSoftware returns the App Engine release version.
// In production, it looks like "Google App Engine/X.Y.Z".
// In the development appserver, it looks like "Development/X.Y".
func ServerSoftware() string { return internal.ServerSoftware() }
// RequestID returns a string that uniquely identifies the request.
func RequestID(c context.Context) string { return internal.RequestID(c) }
// AccessToken generates an OAuth2 access token for the specified scopes on
// behalf of service account of this application. This token will expire after
// the returned time.
func AccessToken(c context.Context, scopes ...string) (token string, expiry time.Time, err error) {
req := &pb.GetAccessTokenRequest{Scope: scopes}
res := &pb.GetAccessTokenResponse{}
err = internal.Call(c, "app_identity_service", "GetAccessToken", req, res)
if err != nil {
return "", time.Time{}, err
}
return res.GetAccessToken(), time.Unix(res.GetExpirationTime(), 0), nil
}
// Certificate represents a public certificate for the app.
type Certificate struct {
KeyName string
Data []byte // PEM-encoded X.509 certificate
}
// PublicCertificates retrieves the public certificates for the app.
// They can be used to verify a signature returned by SignBytes.
func PublicCertificates(c context.Context) ([]Certificate, error) {
req := &pb.GetPublicCertificateForAppRequest{}
res := &pb.GetPublicCertificateForAppResponse{}
if err := internal.Call(c, "app_identity_service", "GetPublicCertificatesForApp", req, res); err != nil {
return nil, err
}
var cs []Certificate
for _, pc := range res.PublicCertificateList {
cs = append(cs, Certificate{
KeyName: pc.GetKeyName(),
Data: []byte(pc.GetX509CertificatePem()),
})
}
return cs, nil
}
// ServiceAccount returns a string representing the service account name, in
// the form of an email address (typically app_id@appspot.gserviceaccount.com).
func ServiceAccount(c context.Context) (string, error) {
req := &pb.GetServiceAccountNameRequest{}
res := &pb.GetServiceAccountNameResponse{}
err := internal.Call(c, "app_identity_service", "GetServiceAccountName", req, res)
if err != nil {
return "", err
}
return res.GetServiceAccountName(), err
}
// SignBytes signs bytes using a private key unique to your application.
func SignBytes(c context.Context, bytes []byte) (keyName string, signature []byte, err error) {
req := &pb.SignForAppRequest{BytesToSign: bytes}
res := &pb.SignForAppResponse{}
if err := internal.Call(c, "app_identity_service", "SignForApp", req, res); err != nil {
return "", nil, err
}
return res.GetKeyName(), res.GetSignatureBytes(), nil
}
func init() {
internal.RegisterErrorCodeMap("app_identity_service", pb.AppIdentityServiceError_ErrorCode_name)
internal.RegisterErrorCodeMap("modules", modpb.ModulesServiceError_ErrorCode_name)
}

View File

@@ -1,9 +0,0 @@
// Copyright 2014 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
// +build race
package internal
func init() { raceDetector = true }

View File

@@ -1,467 +0,0 @@
// Copyright 2014 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
// +build !appengine
package internal
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"os/exec"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/golang/protobuf/proto"
netcontext "golang.org/x/net/context"
basepb "google.golang.org/appengine/internal/base"
remotepb "google.golang.org/appengine/internal/remote_api"
)
const testTicketHeader = "X-Magic-Ticket-Header"
func init() {
ticketHeader = testTicketHeader
}
type fakeAPIHandler struct {
hang chan int // used for RunSlowly RPC
LogFlushes int32 // atomic
}
func (f *fakeAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
writeResponse := func(res *remotepb.Response) {
hresBody, err := proto.Marshal(res)
if err != nil {
http.Error(w, fmt.Sprintf("Failed encoding API response: %v", err), 500)
return
}
w.Write(hresBody)
}
if r.URL.Path != "/rpc_http" {
http.NotFound(w, r)
return
}
hreqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("Bad body: %v", err), 500)
return
}
apiReq := &remotepb.Request{}
if err := proto.Unmarshal(hreqBody, apiReq); err != nil {
http.Error(w, fmt.Sprintf("Bad encoded API request: %v", err), 500)
return
}
if *apiReq.RequestId != "s3cr3t" {
writeResponse(&remotepb.Response{
RpcError: &remotepb.RpcError{
Code: proto.Int32(int32(remotepb.RpcError_SECURITY_VIOLATION)),
Detail: proto.String("bad security ticket"),
},
})
return
}
if got, want := r.Header.Get(dapperHeader), "trace-001"; got != want {
writeResponse(&remotepb.Response{
RpcError: &remotepb.RpcError{
Code: proto.Int32(int32(remotepb.RpcError_BAD_REQUEST)),
Detail: proto.String(fmt.Sprintf("trace info = %q, want %q", got, want)),
},
})
return
}
service, method := *apiReq.ServiceName, *apiReq.Method
var resOut proto.Message
if service == "actordb" && method == "LookupActor" {
req := &basepb.StringProto{}
res := &basepb.StringProto{}
if err := proto.Unmarshal(apiReq.Request, req); err != nil {
http.Error(w, fmt.Sprintf("Bad encoded request: %v", err), 500)
return
}
if *req.Value == "Doctor Who" {
res.Value = proto.String("David Tennant")
}
resOut = res
}
if service == "errors" {
switch method {
case "Non200":
http.Error(w, "I'm a little teapot.", 418)
return
case "ShortResponse":
w.Header().Set("Content-Length", "100")
w.Write([]byte("way too short"))
return
case "OverQuota":
writeResponse(&remotepb.Response{
RpcError: &remotepb.RpcError{
Code: proto.Int32(int32(remotepb.RpcError_OVER_QUOTA)),
Detail: proto.String("you are hogging the resources!"),
},
})
return
case "RunSlowly":
// TestAPICallRPCFailure creates f.hang, but does not strobe it
// until Call returns with remotepb.RpcError_CANCELLED.
// This is here to force a happens-before relationship between
// the httptest server handler and shutdown.
<-f.hang
resOut = &basepb.VoidProto{}
}
}
if service == "logservice" && method == "Flush" {
// Pretend log flushing is slow.
time.Sleep(50 * time.Millisecond)
atomic.AddInt32(&f.LogFlushes, 1)
resOut = &basepb.VoidProto{}
}
encOut, err := proto.Marshal(resOut)
if err != nil {
http.Error(w, fmt.Sprintf("Failed encoding response: %v", err), 500)
return
}
writeResponse(&remotepb.Response{
Response: encOut,
})
}
func setup() (f *fakeAPIHandler, c *context, cleanup func()) {
f = &fakeAPIHandler{}
srv := httptest.NewServer(f)
u, err := url.Parse(srv.URL + apiPath)
if err != nil {
panic(fmt.Sprintf("url.Parse(%q): %v", srv.URL+apiPath, err))
}
return f, &context{
req: &http.Request{
Header: http.Header{
ticketHeader: []string{"s3cr3t"},
dapperHeader: []string{"trace-001"},
},
},
apiURL: u,
}, srv.Close
}
func TestAPICall(t *testing.T) {
_, c, cleanup := setup()
defer cleanup()
req := &basepb.StringProto{
Value: proto.String("Doctor Who"),
}
res := &basepb.StringProto{}
err := Call(toContext(c), "actordb", "LookupActor", req, res)
if err != nil {
t.Fatalf("API call failed: %v", err)
}
if got, want := *res.Value, "David Tennant"; got != want {
t.Errorf("Response is %q, want %q", got, want)
}
}
func TestAPICallRPCFailure(t *testing.T) {
f, c, cleanup := setup()
defer cleanup()
testCases := []struct {
method string
code remotepb.RpcError_ErrorCode
}{
{"Non200", remotepb.RpcError_UNKNOWN},
{"ShortResponse", remotepb.RpcError_UNKNOWN},
{"OverQuota", remotepb.RpcError_OVER_QUOTA},
{"RunSlowly", remotepb.RpcError_CANCELLED},
}
f.hang = make(chan int) // only for RunSlowly
for _, tc := range testCases {
ctx, _ := netcontext.WithTimeout(toContext(c), 100*time.Millisecond)
err := Call(ctx, "errors", tc.method, &basepb.VoidProto{}, &basepb.VoidProto{})
ce, ok := err.(*CallError)
if !ok {
t.Errorf("%s: API call error is %T (%v), want *CallError", tc.method, err, err)
continue
}
if ce.Code != int32(tc.code) {
t.Errorf("%s: ce.Code = %d, want %d", tc.method, ce.Code, tc.code)
}
if tc.method == "RunSlowly" {
f.hang <- 1 // release the HTTP handler
}
}
}
func TestAPICallDialFailure(t *testing.T) {
// See what happens if the API host is unresponsive.
// This should time out quickly, not hang forever.
_, c, cleanup := setup()
defer cleanup()
// Reset the URL to the production address so that dialing fails.
c.apiURL = apiURL()
start := time.Now()
err := Call(toContext(c), "foo", "bar", &basepb.VoidProto{}, &basepb.VoidProto{})
const max = 1 * time.Second
if taken := time.Since(start); taken > max {
t.Errorf("Dial hang took too long: %v > %v", taken, max)
}
if err == nil {
t.Error("Call did not fail")
}
}
func TestDelayedLogFlushing(t *testing.T) {
f, c, cleanup := setup()
defer cleanup()
http.HandleFunc("/quick_log", func(w http.ResponseWriter, r *http.Request) {
logC := WithContext(netcontext.Background(), r)
fromContext(logC).apiURL = c.apiURL // Otherwise it will try to use the default URL.
Logf(logC, 1, "It's a lovely day.")
w.WriteHeader(200)
w.Write(make([]byte, 100<<10)) // write 100 KB to force HTTP flush
})
r := &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Path: "/quick_log",
},
Header: c.req.Header,
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
w := httptest.NewRecorder()
// Check that log flushing does not hold up the HTTP response.
start := time.Now()
handleHTTP(w, r)
if d := time.Since(start); d > 10*time.Millisecond {
t.Errorf("handleHTTP took %v, want under 10ms", d)
}
const hdr = "X-AppEngine-Log-Flush-Count"
if h := w.HeaderMap.Get(hdr); h != "1" {
t.Errorf("%s header = %q, want %q", hdr, h, "1")
}
if f := atomic.LoadInt32(&f.LogFlushes); f != 0 {
t.Errorf("After HTTP response: f.LogFlushes = %d, want 0", f)
}
// Check that the log flush eventually comes in.
time.Sleep(100 * time.Millisecond)
if f := atomic.LoadInt32(&f.LogFlushes); f != 1 {
t.Errorf("After 100ms: f.LogFlushes = %d, want 1", f)
}
}
func TestRemoteAddr(t *testing.T) {
var addr string
http.HandleFunc("/remote_addr", func(w http.ResponseWriter, r *http.Request) {
addr = r.RemoteAddr
})
testCases := []struct {
headers http.Header
addr string
}{
{http.Header{"X-Appengine-User-Ip": []string{"10.5.2.1"}}, "10.5.2.1:80"},
{http.Header{"X-Appengine-Remote-Addr": []string{"1.2.3.4"}}, "1.2.3.4:80"},
{http.Header{"X-Appengine-Remote-Addr": []string{"1.2.3.4:8080"}}, "1.2.3.4:8080"},
{
http.Header{"X-Appengine-Remote-Addr": []string{"2401:fa00:9:1:7646:a0ff:fe90:ca66"}},
"[2401:fa00:9:1:7646:a0ff:fe90:ca66]:80",
},
{
http.Header{"X-Appengine-Remote-Addr": []string{"[::1]:http"}},
"[::1]:http",
},
{http.Header{}, "127.0.0.1:80"},
}
for _, tc := range testCases {
r := &http.Request{
Method: "GET",
URL: &url.URL{Scheme: "http", Path: "/remote_addr"},
Header: tc.headers,
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
handleHTTP(httptest.NewRecorder(), r)
if addr != tc.addr {
t.Errorf("Header %v, got %q, want %q", tc.headers, addr, tc.addr)
}
}
}
func TestPanickingHandler(t *testing.T) {
http.HandleFunc("/panic", func(http.ResponseWriter, *http.Request) {
panic("whoops!")
})
r := &http.Request{
Method: "GET",
URL: &url.URL{Scheme: "http", Path: "/panic"},
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
rec := httptest.NewRecorder()
handleHTTP(rec, r)
if rec.Code != 500 {
t.Errorf("Panicking handler returned HTTP %d, want HTTP %d", rec.Code, 500)
}
}
var raceDetector = false
func TestAPICallAllocations(t *testing.T) {
if raceDetector {
t.Skip("not running under race detector")
}
// Run the test API server in a subprocess so we aren't counting its allocations.
u, cleanup := launchHelperProcess(t)
defer cleanup()
c := &context{
req: &http.Request{
Header: http.Header{
ticketHeader: []string{"s3cr3t"},
dapperHeader: []string{"trace-001"},
},
},
apiURL: u,
}
req := &basepb.StringProto{
Value: proto.String("Doctor Who"),
}
res := &basepb.StringProto{}
var apiErr error
avg := testing.AllocsPerRun(100, func() {
ctx, _ := netcontext.WithTimeout(toContext(c), 100*time.Millisecond)
if err := Call(ctx, "actordb", "LookupActor", req, res); err != nil && apiErr == nil {
apiErr = err // get the first error only
}
})
if apiErr != nil {
t.Errorf("API call failed: %v", apiErr)
}
// Lots of room for improvement...
// TODO(djd): Reduce maximum to 85 once the App Engine SDK is based on 1.6.
const min, max float64 = 70, 90
if avg < min || max < avg {
t.Errorf("Allocations per API call = %g, want in [%g,%g]", avg, min, max)
}
}
func launchHelperProcess(t *testing.T) (apiURL *url.URL, cleanup func()) {
cmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
stdin, err := cmd.StdinPipe()
if err != nil {
t.Fatalf("StdinPipe: %v", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
t.Fatalf("StdoutPipe: %v", err)
}
if err := cmd.Start(); err != nil {
t.Fatalf("Starting helper process: %v", err)
}
scan := bufio.NewScanner(stdout)
var u *url.URL
for scan.Scan() {
line := scan.Text()
if hp := strings.TrimPrefix(line, helperProcessMagic); hp != line {
var err error
u, err = url.Parse(hp)
if err != nil {
t.Fatalf("Failed to parse %q: %v", hp, err)
}
break
}
}
if err := scan.Err(); err != nil {
t.Fatalf("Scanning helper process stdout: %v", err)
}
if u == nil {
t.Fatal("Helper process never reported")
}
return u, func() {
stdin.Close()
if err := cmd.Wait(); err != nil {
t.Errorf("Helper process did not exit cleanly: %v", err)
}
}
}
const helperProcessMagic = "A lovely helper process is listening at "
// This isn't a real test. It's used as a helper process.
func TestHelperProcess(*testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
}
defer os.Exit(0)
f := &fakeAPIHandler{}
srv := httptest.NewServer(f)
defer srv.Close()
fmt.Println(helperProcessMagic + srv.URL + apiPath)
// Wait for stdin to be closed.
io.Copy(ioutil.Discard, os.Stdin)
}
func TestBackgroundContext(t *testing.T) {
environ := []struct {
key, value string
}{
{"GAE_LONG_APP_ID", "my-app-id"},
{"GAE_MINOR_VERSION", "067924799508853122"},
{"GAE_MODULE_INSTANCE", "0"},
{"GAE_MODULE_NAME", "default"},
{"GAE_MODULE_VERSION", "20150612t184001"},
}
for _, v := range environ {
old := os.Getenv(v.key)
os.Setenv(v.key, v.value)
v.value = old
}
defer func() { // Restore old environment after the test completes.
for _, v := range environ {
if v.value == "" {
os.Unsetenv(v.key)
continue
}
os.Setenv(v.key, v.value)
}
}()
ctx, key := fromContext(BackgroundContext()), "X-Magic-Ticket-Header"
if g, w := ctx.req.Header.Get(key), "my-app-id/default.20150612t184001.0"; g != w {
t.Errorf("%v = %q, want %q", key, g, w)
}
// Check that using the background context doesn't panic.
req := &basepb.StringProto{
Value: proto.String("Doctor Who"),
}
res := &basepb.StringProto{}
Call(BackgroundContext(), "actordb", "LookupActor", req, res) // expected to fail
}

View File

@@ -1,34 +0,0 @@
// Copyright 2011 Google Inc. All Rights Reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package internal
import (
"testing"
)
func TestAppIDParsing(t *testing.T) {
testCases := []struct {
in string
partition, domain, displayID string
}{
{"simple-app-id", "", "", "simple-app-id"},
{"domain.com:domain-app-id", "", "domain.com", "domain-app-id"},
{"part~partition-app-id", "part", "", "partition-app-id"},
{"part~domain.com:display", "part", "domain.com", "display"},
}
for _, tc := range testCases {
part, dom, dis := parseFullAppID(tc.in)
if part != tc.partition {
t.Errorf("partition of %q: got %q, want %q", tc.in, part, tc.partition)
}
if dom != tc.domain {
t.Errorf("domain of %q: got %q, want %q", tc.in, dom, tc.domain)
}
if dis != tc.displayID {
t.Errorf("displayID of %q: got %q, want %q", tc.in, dis, tc.displayID)
}
}
}

View File

@@ -1,33 +0,0 @@
// Built-in base types for API calls. Primarily useful as return types.
syntax = "proto2";
option go_package = "base";
package appengine.base;
message StringProto {
required string value = 1;
}
message Integer32Proto {
required int32 value = 1;
}
message Integer64Proto {
required int64 value = 1;
}
message BoolProto {
required bool value = 1;
}
message DoubleProto {
required double value = 1;
}
message BytesProto {
required bytes value = 1 [ctype=CORD];
}
message VoidProto {
}

View File

@@ -1,541 +0,0 @@
syntax = "proto2";
option go_package = "datastore";
package appengine;
message Action{}
message PropertyValue {
optional int64 int64Value = 1;
optional bool booleanValue = 2;
optional string stringValue = 3;
optional double doubleValue = 4;
optional group PointValue = 5 {
required double x = 6;
required double y = 7;
}
optional group UserValue = 8 {
required string email = 9;
required string auth_domain = 10;
optional string nickname = 11;
optional string federated_identity = 21;
optional string federated_provider = 22;
}
optional group ReferenceValue = 12 {
required string app = 13;
optional string name_space = 20;
repeated group PathElement = 14 {
required string type = 15;
optional int64 id = 16;
optional string name = 17;
}
}
}
message Property {
enum Meaning {
NO_MEANING = 0;
BLOB = 14;
TEXT = 15;
BYTESTRING = 16;
ATOM_CATEGORY = 1;
ATOM_LINK = 2;
ATOM_TITLE = 3;
ATOM_CONTENT = 4;
ATOM_SUMMARY = 5;
ATOM_AUTHOR = 6;
GD_WHEN = 7;
GD_EMAIL = 8;
GEORSS_POINT = 9;
GD_IM = 10;
GD_PHONENUMBER = 11;
GD_POSTALADDRESS = 12;
GD_RATING = 13;
BLOBKEY = 17;
ENTITY_PROTO = 19;
INDEX_VALUE = 18;
};
optional Meaning meaning = 1 [default = NO_MEANING];
optional string meaning_uri = 2;
required string name = 3;
required PropertyValue value = 5;
required bool multiple = 4;
optional bool searchable = 6 [default=false];
enum FtsTokenizationOption {
HTML = 1;
ATOM = 2;
}
optional FtsTokenizationOption fts_tokenization_option = 8;
optional string locale = 9 [default = "en"];
}
message Path {
repeated group Element = 1 {
required string type = 2;
optional int64 id = 3;
optional string name = 4;
}
}
message Reference {
required string app = 13;
optional string name_space = 20;
required Path path = 14;
}
message User {
required string email = 1;
required string auth_domain = 2;
optional string nickname = 3;
optional string federated_identity = 6;
optional string federated_provider = 7;
}
message EntityProto {
required Reference key = 13;
required Path entity_group = 16;
optional User owner = 17;
enum Kind {
GD_CONTACT = 1;
GD_EVENT = 2;
GD_MESSAGE = 3;
}
optional Kind kind = 4;
optional string kind_uri = 5;
repeated Property property = 14;
repeated Property raw_property = 15;
optional int32 rank = 18;
}
message CompositeProperty {
required int64 index_id = 1;
repeated string value = 2;
}
message Index {
required string entity_type = 1;
required bool ancestor = 5;
repeated group Property = 2 {
required string name = 3;
enum Direction {
ASCENDING = 1;
DESCENDING = 2;
}
optional Direction direction = 4 [default = ASCENDING];
}
}
message CompositeIndex {
required string app_id = 1;
required int64 id = 2;
required Index definition = 3;
enum State {
WRITE_ONLY = 1;
READ_WRITE = 2;
DELETED = 3;
ERROR = 4;
}
required State state = 4;
optional bool only_use_if_required = 6 [default = false];
}
message IndexPostfix {
message IndexValue {
required string property_name = 1;
required PropertyValue value = 2;
}
repeated IndexValue index_value = 1;
optional Reference key = 2;
optional bool before = 3 [default=true];
}
message IndexPosition {
optional string key = 1;
optional bool before = 2 [default=true];
}
message Snapshot {
enum Status {
INACTIVE = 0;
ACTIVE = 1;
}
required int64 ts = 1;
}
message InternalHeader {
optional string qos = 1;
}
message Transaction {
optional InternalHeader header = 4;
required fixed64 handle = 1;
required string app = 2;
optional bool mark_changes = 3 [default = false];
}
message Query {
optional InternalHeader header = 39;
required string app = 1;
optional string name_space = 29;
optional string kind = 3;
optional Reference ancestor = 17;
repeated group Filter = 4 {
enum Operator {
LESS_THAN = 1;
LESS_THAN_OR_EQUAL = 2;
GREATER_THAN = 3;
GREATER_THAN_OR_EQUAL = 4;
EQUAL = 5;
IN = 6;
EXISTS = 7;
}
required Operator op = 6;
repeated Property property = 14;
}
optional string search_query = 8;
repeated group Order = 9 {
enum Direction {
ASCENDING = 1;
DESCENDING = 2;
}
required string property = 10;
optional Direction direction = 11 [default = ASCENDING];
}
enum Hint {
ORDER_FIRST = 1;
ANCESTOR_FIRST = 2;
FILTER_FIRST = 3;
}
optional Hint hint = 18;
optional int32 count = 23;
optional int32 offset = 12 [default = 0];
optional int32 limit = 16;
optional CompiledCursor compiled_cursor = 30;
optional CompiledCursor end_compiled_cursor = 31;
repeated CompositeIndex composite_index = 19;
optional bool require_perfect_plan = 20 [default = false];
optional bool keys_only = 21 [default = false];
optional Transaction transaction = 22;
optional bool compile = 25 [default = false];
optional int64 failover_ms = 26;
optional bool strong = 32;
repeated string property_name = 33;
repeated string group_by_property_name = 34;
optional bool distinct = 24;
optional int64 min_safe_time_seconds = 35;
repeated string safe_replica_name = 36;
optional bool persist_offset = 37 [default=false];
}
message CompiledQuery {
required group PrimaryScan = 1 {
optional string index_name = 2;
optional string start_key = 3;
optional bool start_inclusive = 4;
optional string end_key = 5;
optional bool end_inclusive = 6;
repeated string start_postfix_value = 22;
repeated string end_postfix_value = 23;
optional int64 end_unapplied_log_timestamp_us = 19;
}
repeated group MergeJoinScan = 7 {
required string index_name = 8;
repeated string prefix_value = 9;
optional bool value_prefix = 20 [default=false];
}
optional Index index_def = 21;
optional int32 offset = 10 [default = 0];
optional int32 limit = 11;
required bool keys_only = 12;
repeated string property_name = 24;
optional int32 distinct_infix_size = 25;
optional group EntityFilter = 13 {
optional bool distinct = 14 [default=false];
optional string kind = 17;
optional Reference ancestor = 18;
}
}
message CompiledCursor {
optional group Position = 2 {
optional string start_key = 27;
repeated group IndexValue = 29 {
optional string property = 30;
required PropertyValue value = 31;
}
optional Reference key = 32;
optional bool start_inclusive = 28 [default=true];
}
}
message Cursor {
required fixed64 cursor = 1;
optional string app = 2;
}
message Error {
enum ErrorCode {
BAD_REQUEST = 1;
CONCURRENT_TRANSACTION = 2;
INTERNAL_ERROR = 3;
NEED_INDEX = 4;
TIMEOUT = 5;
PERMISSION_DENIED = 6;
BIGTABLE_ERROR = 7;
COMMITTED_BUT_STILL_APPLYING = 8;
CAPABILITY_DISABLED = 9;
TRY_ALTERNATE_BACKEND = 10;
SAFE_TIME_TOO_OLD = 11;
}
}
message Cost {
optional int32 index_writes = 1;
optional int32 index_write_bytes = 2;
optional int32 entity_writes = 3;
optional int32 entity_write_bytes = 4;
optional group CommitCost = 5 {
optional int32 requested_entity_puts = 6;
optional int32 requested_entity_deletes = 7;
};
optional int32 approximate_storage_delta = 8;
optional int32 id_sequence_updates = 9;
}
message GetRequest {
optional InternalHeader header = 6;
repeated Reference key = 1;
optional Transaction transaction = 2;
optional int64 failover_ms = 3;
optional bool strong = 4;
optional bool allow_deferred = 5 [default=false];
}
message GetResponse {
repeated group Entity = 1 {
optional EntityProto entity = 2;
optional Reference key = 4;
optional int64 version = 3;
}
repeated Reference deferred = 5;
optional bool in_order = 6 [default=true];
}
message PutRequest {
optional InternalHeader header = 11;
repeated EntityProto entity = 1;
optional Transaction transaction = 2;
repeated CompositeIndex composite_index = 3;
optional bool trusted = 4 [default = false];
optional bool force = 7 [default = false];
optional bool mark_changes = 8 [default = false];
repeated Snapshot snapshot = 9;
enum AutoIdPolicy {
CURRENT = 0;
SEQUENTIAL = 1;
}
optional AutoIdPolicy auto_id_policy = 10 [default = CURRENT];
}
message PutResponse {
repeated Reference key = 1;
optional Cost cost = 2;
repeated int64 version = 3;
}
message TouchRequest {
optional InternalHeader header = 10;
repeated Reference key = 1;
repeated CompositeIndex composite_index = 2;
optional bool force = 3 [default = false];
repeated Snapshot snapshot = 9;
}
message TouchResponse {
optional Cost cost = 1;
}
message DeleteRequest {
optional InternalHeader header = 10;
repeated Reference key = 6;
optional Transaction transaction = 5;
optional bool trusted = 4 [default = false];
optional bool force = 7 [default = false];
optional bool mark_changes = 8 [default = false];
repeated Snapshot snapshot = 9;
}
message DeleteResponse {
optional Cost cost = 1;
repeated int64 version = 3;
}
message NextRequest {
optional InternalHeader header = 5;
required Cursor cursor = 1;
optional int32 count = 2;
optional int32 offset = 4 [default = 0];
optional bool compile = 3 [default = false];
}
message QueryResult {
optional Cursor cursor = 1;
repeated EntityProto result = 2;
optional int32 skipped_results = 7;
required bool more_results = 3;
optional bool keys_only = 4;
optional bool index_only = 9;
optional bool small_ops = 10;
optional CompiledQuery compiled_query = 5;
optional CompiledCursor compiled_cursor = 6;
repeated CompositeIndex index = 8;
repeated int64 version = 11;
}
message AllocateIdsRequest {
optional InternalHeader header = 4;
optional Reference model_key = 1;
optional int64 size = 2;
optional int64 max = 3;
repeated Reference reserve = 5;
}
message AllocateIdsResponse {
required int64 start = 1;
required int64 end = 2;
optional Cost cost = 3;
}
message CompositeIndices {
repeated CompositeIndex index = 1;
}
message AddActionsRequest {
optional InternalHeader header = 3;
required Transaction transaction = 1;
repeated Action action = 2;
}
message AddActionsResponse {
}
message BeginTransactionRequest {
optional InternalHeader header = 3;
required string app = 1;
optional bool allow_multiple_eg = 2 [default = false];
}
message CommitResponse {
optional Cost cost = 1;
repeated group Version = 3 {
required Reference root_entity_key = 4;
required int64 version = 5;
}
}

View File

@@ -1,58 +0,0 @@
// Copyright 2014 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package internal
import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
)
func TestInstallingHealthChecker(t *testing.T) {
try := func(desc string, mux *http.ServeMux, wantCode int, wantBody string) {
installHealthChecker(mux)
srv := httptest.NewServer(mux)
defer srv.Close()
resp, err := http.Get(srv.URL + "/_ah/health")
if err != nil {
t.Errorf("%s: http.Get: %v", desc, err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("%s: reading body: %v", desc, err)
return
}
if resp.StatusCode != wantCode {
t.Errorf("%s: got HTTP %d, want %d", desc, resp.StatusCode, wantCode)
return
}
if wantBody != "" && string(body) != wantBody {
t.Errorf("%s: got HTTP body %q, want %q", desc, body, wantBody)
return
}
}
// If there's no handlers, or only a root handler, a health checker should be installed.
try("empty mux", http.NewServeMux(), 200, "ok")
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "root handler")
})
try("mux with root handler", mux, 200, "ok")
// If there's a custom health check handler, one should not be installed.
mux = http.NewServeMux()
mux.HandleFunc("/_ah/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(418)
io.WriteString(w, "I'm short and stout!")
})
try("mux with custom health checker", mux, 418, "I'm short and stout!")
}

View File

@@ -1,150 +0,0 @@
syntax = "proto2";
option go_package = "log";
package appengine;
message LogServiceError {
enum ErrorCode {
OK = 0;
INVALID_REQUEST = 1;
STORAGE_ERROR = 2;
}
}
message UserAppLogLine {
required int64 timestamp_usec = 1;
required int64 level = 2;
required string message = 3;
}
message UserAppLogGroup {
repeated UserAppLogLine log_line = 2;
}
message FlushRequest {
optional bytes logs = 1;
}
message SetStatusRequest {
required string status = 1;
}
message LogOffset {
optional bytes request_id = 1;
}
message LogLine {
required int64 time = 1;
required int32 level = 2;
required string log_message = 3;
}
message RequestLog {
required string app_id = 1;
optional string module_id = 37 [default="default"];
required string version_id = 2;
required bytes request_id = 3;
optional LogOffset offset = 35;
required string ip = 4;
optional string nickname = 5;
required int64 start_time = 6;
required int64 end_time = 7;
required int64 latency = 8;
required int64 mcycles = 9;
required string method = 10;
required string resource = 11;
required string http_version = 12;
required int32 status = 13;
required int64 response_size = 14;
optional string referrer = 15;
optional string user_agent = 16;
required string url_map_entry = 17;
required string combined = 18;
optional int64 api_mcycles = 19;
optional string host = 20;
optional double cost = 21;
optional string task_queue_name = 22;
optional string task_name = 23;
optional bool was_loading_request = 24;
optional int64 pending_time = 25;
optional int32 replica_index = 26 [default = -1];
optional bool finished = 27 [default = true];
optional bytes clone_key = 28;
repeated LogLine line = 29;
optional bool lines_incomplete = 36;
optional bytes app_engine_release = 38;
optional int32 exit_reason = 30;
optional bool was_throttled_for_time = 31;
optional bool was_throttled_for_requests = 32;
optional int64 throttled_time = 33;
optional bytes server_name = 34;
}
message LogModuleVersion {
optional string module_id = 1 [default="default"];
optional string version_id = 2;
}
message LogReadRequest {
required string app_id = 1;
repeated string version_id = 2;
repeated LogModuleVersion module_version = 19;
optional int64 start_time = 3;
optional int64 end_time = 4;
optional LogOffset offset = 5;
repeated bytes request_id = 6;
optional int32 minimum_log_level = 7;
optional bool include_incomplete = 8;
optional int64 count = 9;
optional string combined_log_regex = 14;
optional string host_regex = 15;
optional int32 replica_index = 16;
optional bool include_app_logs = 10;
optional int32 app_logs_per_request = 17;
optional bool include_host = 11;
optional bool include_all = 12;
optional bool cache_iterator = 13;
optional int32 num_shards = 18;
}
message LogReadResponse {
repeated RequestLog log = 1;
optional LogOffset offset = 2;
optional int64 last_end_time = 3;
}
message LogUsageRecord {
optional string version_id = 1;
optional int32 start_time = 2;
optional int32 end_time = 3;
optional int64 count = 4;
optional int64 total_size = 5;
optional int32 records = 6;
}
message LogUsageRequest {
required string app_id = 1;
repeated string version_id = 2;
optional int32 start_time = 3;
optional int32 end_time = 4;
optional uint32 resolution_hours = 5 [default = 1];
optional bool combine_versions = 6;
optional int32 usage_version = 7;
optional bool versions_only = 8;
}
message LogUsageResponse {
repeated LogUsageRecord usage = 1;
optional LogUsageRecord summary = 2;
}

View File

@@ -1,58 +0,0 @@
// Copyright 2014 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
// +build !appengine
package internal
import (
"sync"
"testing"
"time"
netcontext "golang.org/x/net/context"
basepb "google.golang.org/appengine/internal/base"
)
func TestDialLimit(t *testing.T) {
// Fill up semaphore with false acquisitions to permit only two TCP connections at a time.
// We don't replace limitSem because that results in a data race when net/http lazily closes connections.
nFake := cap(limitSem) - 2
for i := 0; i < nFake; i++ {
limitSem <- 1
}
defer func() {
for i := 0; i < nFake; i++ {
<-limitSem
}
}()
f, c, cleanup := setup() // setup is in api_test.go
defer cleanup()
f.hang = make(chan int)
// If we make two RunSlowly RPCs (which will wait for f.hang to be strobed),
// then the simple Non200 RPC should hang.
var wg sync.WaitGroup
wg.Add(2)
for i := 0; i < 2; i++ {
go func() {
defer wg.Done()
Call(toContext(c), "errors", "RunSlowly", &basepb.VoidProto{}, &basepb.VoidProto{})
}()
}
time.Sleep(50 * time.Millisecond) // let those two RPCs start
ctx, _ := netcontext.WithTimeout(toContext(c), 50*time.Millisecond)
err := Call(ctx, "errors", "Non200", &basepb.VoidProto{}, &basepb.VoidProto{})
if err != errTimeout {
t.Errorf("Non200 RPC returned with err %v, want errTimeout", err)
}
// Drain the two RunSlowly calls.
f.hang <- 1
f.hang <- 1
wg.Wait()
}

View File

@@ -1,40 +0,0 @@
#!/bin/bash -e
#
# This script rebuilds the generated code for the protocol buffers.
# To run this you will need protoc and goprotobuf installed;
# see https://github.com/golang/protobuf for instructions.
PKG=google.golang.org/appengine
function die() {
echo 1>&2 $*
exit 1
}
# Sanity check that the right tools are accessible.
for tool in go protoc protoc-gen-go; do
q=$(which $tool) || die "didn't find $tool"
echo 1>&2 "$tool: $q"
done
echo -n 1>&2 "finding package dir... "
pkgdir=$(go list -f '{{.Dir}}' $PKG)
echo 1>&2 $pkgdir
base=$(echo $pkgdir | sed "s,/$PKG\$,,")
echo 1>&2 "base: $base"
cd $base
# Run protoc once per package.
for dir in $(find $PKG/internal -name '*.proto' | xargs dirname | sort | uniq); do
echo 1>&2 "* $dir"
protoc --go_out=. $dir/*.proto
done
for f in $(find $PKG/internal -name '*.pb.go'); do
# Remove proto.RegisterEnum calls.
# These cause duplicate registration panics when these packages
# are used on classic App Engine. proto.RegisterEnum only affects
# parsing the text format; we don't care about that.
# https://code.google.com/p/googleappengine/issues/detail?id=11670#c17
sed -i '/proto.RegisterEnum/d' $f
done

View File

@@ -1,44 +0,0 @@
syntax = "proto2";
option go_package = "remote_api";
package remote_api;
message Request {
required string service_name = 2;
required string method = 3;
required bytes request = 4;
optional string request_id = 5;
}
message ApplicationError {
required int32 code = 1;
required string detail = 2;
}
message RpcError {
enum ErrorCode {
UNKNOWN = 0;
CALL_NOT_FOUND = 1;
PARSE_ERROR = 2;
SECURITY_VIOLATION = 3;
OVER_QUOTA = 4;
REQUEST_TOO_LARGE = 5;
CAPABILITY_DISABLED = 6;
FEATURE_DISABLED = 7;
BAD_REQUEST = 8;
RESPONSE_TOO_LARGE = 9;
CANCELLED = 10;
REPLAY_ERROR = 11;
DEADLINE_EXCEEDED = 12;
}
required int32 code = 1;
optional string detail = 2;
}
message Response {
optional bytes response = 1;
optional bytes exception = 2;
optional ApplicationError application_error = 3;
optional bytes java_exception = 4;
optional RpcError rpc_error = 5;
}

View File

@@ -1,64 +0,0 @@
syntax = "proto2";
option go_package = "urlfetch";
package appengine;
message URLFetchServiceError {
enum ErrorCode {
OK = 0;
INVALID_URL = 1;
FETCH_ERROR = 2;
UNSPECIFIED_ERROR = 3;
RESPONSE_TOO_LARGE = 4;
DEADLINE_EXCEEDED = 5;
SSL_CERTIFICATE_ERROR = 6;
DNS_ERROR = 7;
CLOSED = 8;
INTERNAL_TRANSIENT_ERROR = 9;
TOO_MANY_REDIRECTS = 10;
MALFORMED_REPLY = 11;
CONNECTION_ERROR = 12;
}
}
message URLFetchRequest {
enum RequestMethod {
GET = 1;
POST = 2;
HEAD = 3;
PUT = 4;
DELETE = 5;
PATCH = 6;
}
required RequestMethod Method = 1;
required string Url = 2;
repeated group Header = 3 {
required string Key = 4;
required string Value = 5;
}
optional bytes Payload = 6 [ctype=CORD];
optional bool FollowRedirects = 7 [default=true];
optional double Deadline = 8;
optional bool MustValidateServerCertificate = 9 [default=true];
}
message URLFetchResponse {
optional bytes Content = 1;
required int32 StatusCode = 2;
repeated group Header = 3 {
required string Key = 4;
required string Value = 5;
}
optional bool ContentWasTruncated = 6 [default=false];
optional int64 ExternalBytesSent = 7;
optional int64 ExternalBytesReceived = 8;
optional string FinalUrl = 9;
optional int64 ApiCpuMilliseconds = 10 [default=0];
optional int64 ApiBytesSent = 11 [default=0];
optional int64 ApiBytesReceived = 12 [default=0];
}

25
vendor/google.golang.org/appengine/namespace.go generated vendored Normal file
View File

@@ -0,0 +1,25 @@
// Copyright 2012 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package appengine
import (
"fmt"
"regexp"
"golang.org/x/net/context"
"google.golang.org/appengine/internal"
)
// Namespace returns a replacement context that operates within the given namespace.
func Namespace(c context.Context, namespace string) (context.Context, error) {
if !validNamespace.MatchString(namespace) {
return nil, fmt.Errorf("appengine: namespace %q does not match /%s/", namespace, validNamespace)
}
return internal.NamespacedContext(c, namespace), nil
}
// validNamespace matches valid namespace names.
var validNamespace = regexp.MustCompile(`^[0-9A-Za-z._-]{0,100}$`)

20
vendor/google.golang.org/appengine/timeout.go generated vendored Normal file
View File

@@ -0,0 +1,20 @@
// Copyright 2013 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package appengine
import "golang.org/x/net/context"
// IsTimeoutError reports whether err is a timeout error.
func IsTimeoutError(err error) bool {
if err == context.DeadlineExceeded {
return true
}
if t, ok := err.(interface {
IsTimeout() bool
}); ok {
return t.IsTimeout()
}
return false
}

View File

@@ -1,18 +0,0 @@
language: go
go:
- 1.5.4
- 1.6.3
- 1.7
go_import_path: google.golang.org/grpc
before_install:
- go get -u golang.org/x/tools/cmd/goimports github.com/golang/lint/golint github.com/axw/gocov/gocov github.com/mattn/goveralls golang.org/x/tools/cmd/cover
script:
- '! gofmt -s -d -l . 2>&1 | read'
- '! goimports -l . | read'
- '! golint ./... | grep -vE "(_string|\.pb)\.go:"'
- '! go tool vet -all . 2>&1 | grep -vE "constant [0-9]+ not a string in call to Errorf" | grep -vF .pb.go:' # https://github.com/golang/protobuf/issues/214
- make test testrace

View File

@@ -1,46 +0,0 @@
# How to contribute
We definitely welcome patches and contribution to grpc! Here are some guidelines
and information about how to do so.
## Sending patches
### Getting started
1. Check out the code:
$ go get google.golang.org/grpc
$ cd $GOPATH/src/google.golang.org/grpc
1. Create a fork of the grpc-go repository.
1. Add your fork as a remote:
$ git remote add fork git@github.com:$YOURGITHUBUSERNAME/grpc-go.git
1. Make changes, commit them.
1. Run the test suite:
$ make test
1. Push your changes to your fork:
$ git push fork ...
1. Open a pull request.
## Legal requirements
In order to protect both you and ourselves, you will need to sign the
[Contributor License Agreement](https://cla.developers.google.com/clas).
## Filing Issues
When filing an issue, make sure to answer these five questions:
1. What version of Go are you using (`go version`)?
2. What operating system and processor architecture are you using?
3. What did you do?
4. What did you expect to see?
5. What did you see instead?
### Contributing code
Unless otherwise noted, the Go source files are distributed under the BSD-style license found in the LICENSE file.

View File

@@ -1,52 +0,0 @@
all: test testrace
deps:
go get -d -v google.golang.org/grpc/...
updatedeps:
go get -d -v -u -f google.golang.org/grpc/...
testdeps:
go get -d -v -t google.golang.org/grpc/...
updatetestdeps:
go get -d -v -t -u -f google.golang.org/grpc/...
build: deps
go build google.golang.org/grpc/...
proto:
@ if ! which protoc > /dev/null; then \
echo "error: protoc not installed" >&2; \
exit 1; \
fi
go get -u -v github.com/golang/protobuf/protoc-gen-go
# use $$dir as the root for all proto files in the same directory
for dir in $$(git ls-files '*.proto' | xargs -n1 dirname | uniq); do \
protoc -I $$dir --go_out=plugins=grpc:$$dir $$dir/*.proto; \
done
test: testdeps
go test -v -cpu 1,4 google.golang.org/grpc/...
testrace: testdeps
go test -v -race -cpu 1,4 google.golang.org/grpc/...
clean:
go clean -i google.golang.org/grpc/...
coverage: testdeps
./coverage.sh --coveralls
.PHONY: \
all \
deps \
updatedeps \
testdeps \
updatetestdeps \
build \
proto \
test \
testrace \
clean \
coverage

View File

@@ -1,32 +0,0 @@
#gRPC-Go
[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc)
The Go implementation of [gRPC](http://www.grpc.io/): A high performance, open source, general RPC framework that puts mobile and HTTP/2 first. For more information see the [gRPC Quick Start](http://www.grpc.io/docs/) guide.
Installation
------------
To install this package, you need to install Go and setup your Go workspace on your computer. The simplest way to install the library is to run:
```
$ go get google.golang.org/grpc
```
Prerequisites
-------------
This requires Go 1.5 or later .
Constraints
-----------
The grpc package should only depend on standard Go packages and a small number of exceptions. If your contribution introduces new dependencies which are NOT in the [list](http://godoc.org/google.golang.org/grpc?imports), you need a discussion with gRPC-Go authors and consultants.
Documentation
-------------
See [API documentation](https://godoc.org/google.golang.org/grpc) for package and API descriptions and find examples in the [examples directory](examples/).
Status
------
GA

View File

@@ -1,11 +0,0 @@
package grpc
import "testing"
func TestBackoffConfigDefaults(t *testing.T) {
b := BackoffConfig{}
setDefaults(&b)
if b != DefaultBackoffConfig {
t.Fatalf("expected BackoffConfig to pickup default parameters: %v != %v", b, DefaultBackoffConfig)
}
}

View File

@@ -1,438 +0,0 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"fmt"
"math"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/naming"
)
type testWatcher struct {
// the channel to receives name resolution updates
update chan *naming.Update
// the side channel to get to know how many updates in a batch
side chan int
// the channel to notifiy update injector that the update reading is done
readDone chan int
}
func (w *testWatcher) Next() (updates []*naming.Update, err error) {
n := <-w.side
if n == 0 {
return nil, fmt.Errorf("w.side is closed")
}
for i := 0; i < n; i++ {
u := <-w.update
if u != nil {
updates = append(updates, u)
}
}
w.readDone <- 0
return
}
func (w *testWatcher) Close() {
}
// Inject naming resolution updates to the testWatcher.
func (w *testWatcher) inject(updates []*naming.Update) {
w.side <- len(updates)
for _, u := range updates {
w.update <- u
}
<-w.readDone
}
type testNameResolver struct {
w *testWatcher
addr string
}
func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
r.w = &testWatcher{
update: make(chan *naming.Update, 1),
side: make(chan int, 1),
readDone: make(chan int),
}
r.w.side <- 1
r.w.update <- &naming.Update{
Op: naming.Add,
Addr: r.addr,
}
go func() {
<-r.w.readDone
}()
return r.w, nil
}
func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver) {
var servers []*server
for i := 0; i < numServers; i++ {
s := newTestServer()
servers = append(servers, s)
go s.start(t, 0, maxStreams)
s.wait(t, 2*time.Second)
}
// Point to server[0]
addr := "127.0.0.1:" + servers[0].port
return servers, &testNameResolver{
addr: addr,
}
}
func TestNameDiscovery(t *testing.T) {
// Start 2 servers on 2 ports.
numServers := 2
servers, r := startServers(t, numServers, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
req := "port"
var reply string
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
}
// Inject the name resolution change to remove servers[0] and add servers[1].
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Delete,
Addr: "127.0.0.1:" + servers[0].port,
})
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: "127.0.0.1:" + servers[1].port,
})
r.w.inject(updates)
// Loop until the rpcs in flight talks to servers[1].
for {
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
cc.Close()
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}
func TestEmptyAddrs(t *testing.T) {
servers, r := startServers(t, 1, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
var reply string
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
}
// Inject name resolution change to remove the server so that there is no address
// available after that.
u := &naming.Update{
Op: naming.Delete,
Addr: "127.0.0.1:" + servers[0].port,
}
r.w.inject([]*naming.Update{u})
// Loop until the above updates apply.
for {
time.Sleep(10 * time.Millisecond)
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
break
}
}
cc.Close()
servers[0].stop()
}
func TestRoundRobin(t *testing.T) {
// Start 3 servers on 3 ports.
numServers := 3
servers, r := startServers(t, numServers, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
// Add servers[1] to the service discovery.
u := &naming.Update{
Op: naming.Add,
Addr: "127.0.0.1:" + servers[1].port,
}
r.w.inject([]*naming.Update{u})
req := "port"
var reply string
// Loop until servers[1] is up
for {
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
// Add server2[2] to the service discovery.
u = &naming.Update{
Op: naming.Add,
Addr: "127.0.0.1:" + servers[2].port,
}
r.w.inject([]*naming.Update{u})
// Loop until both servers[2] are up.
for {
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
break
}
time.Sleep(10 * time.Millisecond)
}
// Check the incoming RPCs served in a round-robin manner.
for i := 0; i < 10; i++ {
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[i%numServers].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
}
}
cc.Close()
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}
func TestCloseWithPendingRPC(t *testing.T) {
servers, r := startServers(t, 1, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
var reply string
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
}
// Remove the server.
updates := []*naming.Update{{
Op: naming.Delete,
Addr: "127.0.0.1:" + servers[0].port,
}}
r.w.inject(updates)
// Loop until the above update applies.
for {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
break
}
time.Sleep(10 * time.Millisecond)
}
// Issue 2 RPCs which should be completed with error status once cc is closed.
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
var reply string
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
go func() {
defer wg.Done()
var reply string
time.Sleep(5 * time.Millisecond)
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
time.Sleep(5 * time.Millisecond)
cc.Close()
wg.Wait()
servers[0].stop()
}
func TestGetOnWaitChannel(t *testing.T) {
servers, r := startServers(t, 1, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
// Remove all servers so that all upcoming RPCs will block on waitCh.
updates := []*naming.Update{{
Op: naming.Delete,
Addr: "127.0.0.1:" + servers[0].port,
}}
r.w.inject(updates)
for {
var reply string
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
break
}
time.Sleep(10 * time.Millisecond)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var reply string
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
}
}()
// Add a connected server to get the above RPC through.
updates = []*naming.Update{{
Op: naming.Add,
Addr: "127.0.0.1:" + servers[0].port,
}}
r.w.inject(updates)
// Wait until the above RPC succeeds.
wg.Wait()
cc.Close()
servers[0].stop()
}
func TestOneServerDown(t *testing.T) {
// Start 2 servers.
numServers := 2
servers, r := startServers(t, numServers, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
// Add servers[1] to the service discovery.
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: "127.0.0.1:" + servers[1].port,
})
r.w.inject(updates)
req := "port"
var reply string
// Loop until servers[1] is up
for {
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
var wg sync.WaitGroup
numRPC := 100
sleepDuration := 10 * time.Millisecond
wg.Add(1)
go func() {
time.Sleep(sleepDuration)
// After sleepDuration, kill server[0].
servers[0].stop()
wg.Done()
}()
// All non-failfast RPCs should not block because there's at least one connection available.
for i := 0; i < numRPC; i++ {
wg.Add(1)
go func() {
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
wg.Done()
}()
}
wg.Wait()
cc.Close()
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}
func TestOneAddressRemoval(t *testing.T) {
// Start 2 servers.
numServers := 2
servers, r := startServers(t, numServers, math.MaxUint32)
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
// Add servers[1] to the service discovery.
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: "127.0.0.1:" + servers[1].port,
})
r.w.inject(updates)
req := "port"
var reply string
// Loop until servers[1] is up
for {
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
var wg sync.WaitGroup
numRPC := 100
sleepDuration := 10 * time.Millisecond
wg.Add(1)
go func() {
time.Sleep(sleepDuration)
// After sleepDuration, delete server[0].
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Delete,
Addr: "127.0.0.1:" + servers[0].port,
})
r.w.inject(updates)
wg.Done()
}()
// All non-failfast RPCs should not fail because there's at least one connection available.
for i := 0; i < numRPC; i++ {
wg.Add(1)
go func() {
var reply string
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
wg.Done()
}()
}
wg.Wait()
cc.Close()
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}

View File

@@ -1,293 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"fmt"
"io"
"math"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/transport"
)
var (
expectedRequest = "ping"
expectedResponse = "pong"
weirdError = "format verbs: %v%s"
sizeLargeErr = 1024 * 1024
canceled = 0
)
type testCodec struct {
}
func (testCodec) Marshal(v interface{}) ([]byte, error) {
return []byte(*(v.(*string))), nil
}
func (testCodec) Unmarshal(data []byte, v interface{}) error {
*(v.(*string)) = string(data)
return nil
}
func (testCodec) String() string {
return "test"
}
type testStreamHandler struct {
port string
t transport.ServerTransport
}
func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
p := &parser{r: s}
for {
pf, req, err := p.recvMsg(math.MaxInt32)
if err == io.EOF {
break
}
if err != nil {
return
}
if pf != compressionNone {
t.Errorf("Received the mistaken message format %d, want %d", pf, compressionNone)
return
}
var v string
codec := testCodec{}
if err := codec.Unmarshal(req, &v); err != nil {
t.Errorf("Failed to unmarshal the received message: %v", err)
return
}
if v == "weird error" {
h.t.WriteStatus(s, codes.Internal, weirdError)
return
}
if v == "canceled" {
canceled++
h.t.WriteStatus(s, codes.Internal, "")
return
}
if v == "port" {
h.t.WriteStatus(s, codes.Internal, h.port)
return
}
if v != expectedRequest {
h.t.WriteStatus(s, codes.Internal, strings.Repeat("A", sizeLargeErr))
return
}
}
// send a response back to end the stream.
reply, err := encode(testCodec{}, &expectedResponse, nil, nil)
if err != nil {
t.Errorf("Failed to encode the response: %v", err)
return
}
h.t.Write(s, reply, &transport.Options{})
h.t.WriteStatus(s, codes.OK, "")
}
type server struct {
lis net.Listener
port string
startedErr chan error // sent nil or an error after server starts
mu sync.Mutex
conns map[transport.ServerTransport]bool
}
func newTestServer() *server {
return &server{startedErr: make(chan error, 1)}
}
// start starts server. Other goroutines should block on s.startedErr for further operations.
func (s *server) start(t *testing.T, port int, maxStreams uint32) {
var err error
if port == 0 {
s.lis, err = net.Listen("tcp", "localhost:0")
} else {
s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port))
}
if err != nil {
s.startedErr <- fmt.Errorf("failed to listen: %v", err)
return
}
_, p, err := net.SplitHostPort(s.lis.Addr().String())
if err != nil {
s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
return
}
s.port = p
s.conns = make(map[transport.ServerTransport]bool)
s.startedErr <- nil
for {
conn, err := s.lis.Accept()
if err != nil {
return
}
st, err := transport.NewServerTransport("http2", conn, maxStreams, nil)
if err != nil {
continue
}
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
st.Close()
return
}
s.conns[st] = true
s.mu.Unlock()
h := &testStreamHandler{
port: s.port,
t: st,
}
go st.HandleStreams(func(s *transport.Stream) {
go h.handleStream(t, s)
})
}
}
func (s *server) wait(t *testing.T, timeout time.Duration) {
select {
case err := <-s.startedErr:
if err != nil {
t.Fatal(err)
}
case <-time.After(timeout):
t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
}
}
func (s *server) stop() {
s.lis.Close()
s.mu.Lock()
for c := range s.conns {
c.Close()
}
s.conns = nil
s.mu.Unlock()
}
func setUp(t *testing.T, port int, maxStreams uint32) (*server, *ClientConn) {
server := newTestServer()
go server.start(t, port, maxStreams)
server.wait(t, 2*time.Second)
addr := "localhost:" + server.port
cc, err := Dial(addr, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
return server, cc
}
func TestInvoke(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
}
cc.Close()
server.stop()
}
func TestInvokeLargeErr(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
req := "hello"
err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
if _, ok := err.(*rpcError); !ok {
t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
}
if Code(err) != codes.Internal || len(ErrorDesc(err)) != sizeLargeErr {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want an error of code %d and desc size %d", err, codes.Internal, sizeLargeErr)
}
cc.Close()
server.stop()
}
// TestInvokeErrorSpecialChars checks that error messages don't get mangled.
func TestInvokeErrorSpecialChars(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
req := "weird error"
err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc)
if _, ok := err.(*rpcError); !ok {
t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
}
if got, want := ErrorDesc(err), weirdError; got != want {
t.Fatalf("grpc.Invoke(_, _, _, _, _) error = %q, want %q", got, want)
}
cc.Close()
server.stop()
}
// TestInvokeCancel checks that an Invoke with a canceled context is not sent.
func TestInvokeCancel(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
req := "canceled"
for i := 0; i < 100; i++ {
ctx, cancel := context.WithCancel(context.Background())
cancel()
Invoke(ctx, "/foo/bar", &req, &reply, cc)
}
if canceled != 0 {
t.Fatalf("received %d of 100 canceled requests", canceled)
}
cc.Close()
server.stop()
}
// TestInvokeCancelClosedNonFail checks that a canceled non-failfast RPC
// on a closed client will terminate.
func TestInvokeCancelClosedNonFailFast(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
cc.Close()
req := "hello"
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := Invoke(ctx, "/foo/bar", &req, &reply, cc, FailFast(false)); err == nil {
t.Fatalf("canceled invoke on closed connection should fail")
}
server.stop()
}

View File

@@ -1,190 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
)
const tlsDir = "testdata/"
func TestDialTimeout(t *testing.T) {
conn, err := Dial("Non-Existent.Server:80", WithTimeout(time.Millisecond), WithBlock(), WithInsecure())
if err == nil {
conn.Close()
}
if err != ErrClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
}
}
func TestTLSDialTimeout(t *testing.T) {
creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com")
if err != nil {
t.Fatalf("Failed to create credentials %v", err)
}
conn, err := Dial("Non-Existent.Server:80", WithTransportCredentials(creds), WithTimeout(time.Millisecond), WithBlock())
if err == nil {
conn.Close()
}
if err != ErrClientConnTimeout {
t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, ErrClientConnTimeout)
}
}
func TestTLSServerNameOverwrite(t *testing.T) {
overwriteServerName := "over.write.server.name"
creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", overwriteServerName)
if err != nil {
t.Fatalf("Failed to create credentials %v", err)
}
conn, err := Dial("Non-Existent.Server:80", WithTransportCredentials(creds))
if err != nil {
t.Fatalf("Dial(_, _) = _, %v, want _, <nil>", err)
}
conn.Close()
if conn.authority != overwriteServerName {
t.Fatalf("%v.authority = %v, want %v", conn, conn.authority, overwriteServerName)
}
}
func TestDialContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure()); err != context.Canceled {
t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled)
}
}
// blockingBalancer mimics the behavior of balancers whose initialization takes a long time.
// In this test, reading from blockingBalancer.Notify() blocks forever.
type blockingBalancer struct {
ch chan []Address
}
func newBlockingBalancer() Balancer {
return &blockingBalancer{ch: make(chan []Address)}
}
func (b *blockingBalancer) Start(target string, config BalancerConfig) error {
return nil
}
func (b *blockingBalancer) Up(addr Address) func(error) {
return nil
}
func (b *blockingBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
return Address{}, nil, nil
}
func (b *blockingBalancer) Notify() <-chan []Address {
return b.ch
}
func (b *blockingBalancer) Close() error {
close(b.ch)
return nil
}
func TestDialWithBlockingBalancer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
dialDone := make(chan struct{})
go func() {
DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithBalancer(newBlockingBalancer()))
close(dialDone)
}()
cancel()
<-dialDone
}
func TestCredentialsMisuse(t *testing.T) {
tlsCreds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com")
if err != nil {
t.Fatalf("Failed to create authenticator %v", err)
}
// Two conflicting credential configurations
if _, err := Dial("Non-Existent.Server:80", WithTransportCredentials(tlsCreds), WithBlock(), WithInsecure()); err != errCredentialsConflict {
t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errCredentialsConflict)
}
rpcCreds, err := oauth.NewJWTAccessFromKey(nil)
if err != nil {
t.Fatalf("Failed to create credentials %v", err)
}
// security info on insecure connection
if _, err := Dial("Non-Existent.Server:80", WithPerRPCCredentials(rpcCreds), WithBlock(), WithInsecure()); err != errTransportCredentialsMissing {
t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing)
}
}
func TestWithBackoffConfigDefault(t *testing.T) {
testBackoffConfigSet(t, &DefaultBackoffConfig)
}
func TestWithBackoffConfig(t *testing.T) {
b := BackoffConfig{MaxDelay: DefaultBackoffConfig.MaxDelay / 2}
expected := b
setDefaults(&expected) // defaults should be set
testBackoffConfigSet(t, &expected, WithBackoffConfig(b))
}
func TestWithBackoffMaxDelay(t *testing.T) {
md := DefaultBackoffConfig.MaxDelay / 2
expected := BackoffConfig{MaxDelay: md}
setDefaults(&expected)
testBackoffConfigSet(t, &expected, WithBackoffMaxDelay(md))
}
func testBackoffConfigSet(t *testing.T, expected *BackoffConfig, opts ...DialOption) {
opts = append(opts, WithInsecure())
conn, err := Dial("foo:80", opts...)
if err != nil {
t.Fatalf("unexpected error dialing connection: %v", err)
}
if conn.dopts.bs == nil {
t.Fatalf("backoff config not set")
}
actual, ok := conn.dopts.bs.(BackoffConfig)
if !ok {
t.Fatalf("unexpected type of backoff config: %#v", conn.dopts.bs)
}
if actual != *expected {
t.Fatalf("unexpected backoff config on connection: %v, want %v", actual, expected)
}
conn.Close()
}

View File

@@ -1,17 +0,0 @@
#!/bin/bash
# This script serves as an example to demonstrate how to generate the gRPC-Go
# interface and the related messages from .proto file.
#
# It assumes the installation of i) Google proto buffer compiler at
# https://github.com/google/protobuf (after v2.6.1) and ii) the Go codegen
# plugin at https://github.com/golang/protobuf (after 2015-02-20). If you have
# not, please install them first.
#
# We recommend running this script at $GOPATH/src.
#
# If this is not what you need, feel free to make your own scripts. Again, this
# script is for demonstration purpose.
#
proto=$1
protoc --go_out=plugins=grpc:. $proto

View File

@@ -1,47 +0,0 @@
#!/bin/bash
set -e
workdir=.cover
profile="$workdir/cover.out"
mode=set
end2endtest="google.golang.org/grpc/test"
generate_cover_data() {
rm -rf "$workdir"
mkdir "$workdir"
for pkg in "$@"; do
if [ $pkg == "google.golang.org/grpc" -o $pkg == "google.golang.org/grpc/transport" -o $pkg == "google.golang.org/grpc/metadata" -o $pkg == "google.golang.org/grpc/credentials" ]
then
f="$workdir/$(echo $pkg | tr / -)"
go test -covermode="$mode" -coverprofile="$f.cover" "$pkg"
go test -covermode="$mode" -coverpkg "$pkg" -coverprofile="$f.e2e.cover" "$end2endtest"
fi
done
echo "mode: $mode" >"$profile"
grep -h -v "^mode:" "$workdir"/*.cover >>"$profile"
}
show_cover_report() {
go tool cover -${1}="$profile"
}
push_to_coveralls() {
goveralls -coverprofile="$profile"
}
generate_cover_data $(go list ./...)
show_cover_report func
case "$1" in
"")
;;
--html)
show_cover_report html ;;
--coveralls)
push_to_coveralls ;;
*)
echo >&2 "error: invalid option: $1" ;;
esac
rm -rf "$workdir"

View File

@@ -1,61 +0,0 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package credentials
import (
"testing"
)
func TestTLSOverrideServerName(t *testing.T) {
expectedServerName := "server.name"
c := NewTLS(nil)
c.OverrideServerName(expectedServerName)
if c.Info().ServerName != expectedServerName {
t.Fatalf("c.Info().ServerName = %v, want %v", c.Info().ServerName, expectedServerName)
}
}
func TestTLSClone(t *testing.T) {
expectedServerName := "server.name"
c := NewTLS(nil)
c.OverrideServerName(expectedServerName)
cc := c.Clone()
if cc.Info().ServerName != expectedServerName {
t.Fatalf("cc.Info().ServerName = %v, want %v", cc.Info().ServerName, expectedServerName)
}
cc.OverrideServerName("")
if c.Info().ServerName != expectedServerName {
t.Fatalf("Change in clone should not affect the original, c.Info().ServerName = %v, want %v", c.Info().ServerName, expectedServerName)
}
}

View File

@@ -1,139 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package metadata
import (
"reflect"
"testing"
)
const binaryValue = string(128)
func TestEncodeKeyValue(t *testing.T) {
for _, test := range []struct {
// input
kin string
vin string
// output
kout string
vout string
}{
{"key", "abc", "key", "abc"},
{"KEY", "abc", "key", "abc"},
{"key-bin", "abc", "key-bin", "YWJj"},
{"key-bin", binaryValue, "key-bin", "woA="},
} {
k, v := encodeKeyValue(test.kin, test.vin)
if k != test.kout || !reflect.DeepEqual(v, test.vout) {
t.Fatalf("encodeKeyValue(%q, %q) = %q, %q, want %q, %q", test.kin, test.vin, k, v, test.kout, test.vout)
}
}
}
func TestDecodeKeyValue(t *testing.T) {
for _, test := range []struct {
// input
kin string
vin string
// output
kout string
vout string
err error
}{
{"a", "abc", "a", "abc", nil},
{"key-bin", "Zm9vAGJhcg==", "key-bin", "foo\x00bar", nil},
{"key-bin", "woA=", "key-bin", binaryValue, nil},
{"a", "abc,efg", "a", "abc,efg", nil},
{"key-bin", "Zm9vAGJhcg==,Zm9vAGJhcg==", "key-bin", "foo\x00bar,foo\x00bar", nil},
} {
k, v, err := DecodeKeyValue(test.kin, test.vin)
if k != test.kout || !reflect.DeepEqual(v, test.vout) || !reflect.DeepEqual(err, test.err) {
t.Fatalf("DecodeKeyValue(%q, %q) = %q, %q, %v, want %q, %q, %v", test.kin, test.vin, k, v, err, test.kout, test.vout, test.err)
}
}
}
func TestPairsMD(t *testing.T) {
for _, test := range []struct {
// input
kv []string
// output
md MD
size int
}{
{[]string{}, MD{}, 0},
{[]string{"k1", "v1", "k2-bin", binaryValue}, New(map[string]string{
"k1": "v1",
"k2-bin": binaryValue,
}), 2},
} {
md := Pairs(test.kv...)
if !reflect.DeepEqual(md, test.md) {
t.Fatalf("Pairs(%v) = %v, want %v", test.kv, md, test.md)
}
if md.Len() != test.size {
t.Fatalf("Pairs(%v) generates md of size %d, want %d", test.kv, md.Len(), test.size)
}
}
}
func TestCopy(t *testing.T) {
const key, val = "key", "val"
orig := Pairs(key, val)
copy := orig.Copy()
if !reflect.DeepEqual(orig, copy) {
t.Errorf("copied value not equal to the original, got %v, want %v", copy, orig)
}
orig[key][0] = "foo"
if v := copy[key][0]; v != val {
t.Errorf("change in original should not affect copy, got %q, want %q", v, val)
}
}
func TestJoin(t *testing.T) {
for _, test := range []struct {
mds []MD
want MD
}{
{[]MD{}, MD{}},
{[]MD{Pairs("foo", "bar")}, Pairs("foo", "bar")},
{[]MD{Pairs("foo", "bar"), Pairs("foo", "baz")}, Pairs("foo", "bar", "foo", "baz")},
{[]MD{Pairs("foo", "bar"), Pairs("foo", "baz"), Pairs("zip", "zap")}, Pairs("foo", "bar", "foo", "baz", "zip", "zap")},
} {
md := Join(test.mds...)
if !reflect.DeepEqual(md, test.want) {
t.Errorf("context's metadata is %v, want %v", md, test.want)
}
}
}

View File

@@ -1,234 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"bytes"
"io"
"math"
"reflect"
"testing"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
perfpb "google.golang.org/grpc/test/codec_perf"
"google.golang.org/grpc/transport"
)
func TestSimpleParsing(t *testing.T) {
bigMsg := bytes.Repeat([]byte{'x'}, 1<<24)
for _, test := range []struct {
// input
p []byte
// outputs
err error
b []byte
pt payloadFormat
}{
{nil, io.EOF, nil, compressionNone},
{[]byte{0, 0, 0, 0, 0}, nil, nil, compressionNone},
{[]byte{0, 0, 0, 0, 1, 'a'}, nil, []byte{'a'}, compressionNone},
{[]byte{1, 0}, io.ErrUnexpectedEOF, nil, compressionNone},
{[]byte{0, 0, 0, 0, 10, 'a'}, io.ErrUnexpectedEOF, nil, compressionNone},
// Check that messages with length >= 2^24 are parsed.
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
} {
buf := bytes.NewReader(test.p)
parser := &parser{r: buf}
pt, b, err := parser.recvMsg(math.MaxInt32)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
}
}
}
func TestMultipleParsing(t *testing.T) {
// Set a byte stream consists of 3 messages with their headers.
p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
b := bytes.NewReader(p)
parser := &parser{r: b}
wantRecvs := []struct {
pt payloadFormat
data []byte
}{
{compressionNone, []byte("a")},
{compressionNone, []byte("bc")},
{compressionNone, []byte("d")},
}
for i, want := range wantRecvs {
pt, data, err := parser.recvMsg(math.MaxInt32)
if err != nil || pt != want.pt || !reflect.DeepEqual(data, want.data) {
t.Fatalf("after %d calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, <nil>",
i, p, pt, data, err, want.pt, want.data)
}
}
pt, data, err := parser.recvMsg(math.MaxInt32)
if err != io.EOF {
t.Fatalf("after %d recvMsgs calls, parser{%v}.recvMsg(_) = %v, %v, %v\nwant _, _, %v",
len(wantRecvs), p, pt, data, err, io.EOF)
}
}
func TestEncode(t *testing.T) {
for _, test := range []struct {
// input
msg proto.Message
cp Compressor
// outputs
b []byte
err error
}{
{nil, nil, []byte{0, 0, 0, 0, 0}, nil},
} {
b, err := encode(protoCodec{}, test.msg, nil, nil)
if err != test.err || !bytes.Equal(b, test.b) {
t.Fatalf("encode(_, _, %v, _) = %v, %v\nwant %v, %v", test.cp, b, err, test.b, test.err)
}
}
}
func TestCompress(t *testing.T) {
for _, test := range []struct {
// input
data []byte
cp Compressor
dc Decompressor
// outputs
err error
}{
{make([]byte, 1024), &gzipCompressor{}, &gzipDecompressor{}, nil},
} {
b := new(bytes.Buffer)
if err := test.cp.Do(b, test.data); err != test.err {
t.Fatalf("Compressor.Do(_, %v) = %v, want %v", test.data, err, test.err)
}
if b.Len() >= len(test.data) {
t.Fatalf("The compressor fails to compress data.")
}
if p, err := test.dc.Do(b); err != nil || !bytes.Equal(test.data, p) {
t.Fatalf("Decompressor.Do(%v) = %v, %v, want %v, <nil>", b, p, err, test.data)
}
}
}
func TestToRPCErr(t *testing.T) {
for _, test := range []struct {
// input
errIn error
// outputs
errOut *rpcError
}{
{transport.StreamError{codes.Unknown, ""}, Errorf(codes.Unknown, "").(*rpcError)},
{transport.ErrConnClosing, Errorf(codes.Internal, transport.ErrConnClosing.Desc).(*rpcError)},
} {
err := toRPCErr(test.errIn)
rpcErr, ok := err.(*rpcError)
if !ok {
t.Fatalf("toRPCErr{%v} returned type %T, want %T", test.errIn, err, rpcError{})
}
if *rpcErr != *test.errOut {
t.Fatalf("toRPCErr{%v} = %v \nwant %v", test.errIn, err, test.errOut)
}
}
}
func TestContextErr(t *testing.T) {
for _, test := range []struct {
// input
errIn error
// outputs
errOut transport.StreamError
}{
{context.DeadlineExceeded, transport.StreamError{codes.DeadlineExceeded, context.DeadlineExceeded.Error()}},
{context.Canceled, transport.StreamError{codes.Canceled, context.Canceled.Error()}},
} {
err := transport.ContextErr(test.errIn)
if err != test.errOut {
t.Fatalf("ContextErr{%v} = %v \nwant %v", test.errIn, err, test.errOut)
}
}
}
func TestErrorsWithSameParameters(t *testing.T) {
const description = "some description"
e1 := Errorf(codes.AlreadyExists, description).(*rpcError)
e2 := Errorf(codes.AlreadyExists, description).(*rpcError)
if e1 == e2 {
t.Fatalf("Error interfaces should not be considered equal - e1: %p - %v e2: %p - %v", e1, e1, e2, e2)
}
if Code(e1) != Code(e2) || ErrorDesc(e1) != ErrorDesc(e2) {
t.Fatalf("Expected errors to have same code and description - e1: %p - %v e2: %p - %v", e1, e1, e2, e2)
}
}
// bmEncode benchmarks encoding a Protocol Buffer message containing mSize
// bytes.
func bmEncode(b *testing.B, mSize int) {
msg := &perfpb.Buffer{Body: make([]byte, mSize)}
encoded, _ := encode(protoCodec{}, msg, nil, nil)
encodedSz := int64(len(encoded))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
encode(protoCodec{}, msg, nil, nil)
}
b.SetBytes(encodedSz)
}
func BenchmarkEncode1B(b *testing.B) {
bmEncode(b, 1)
}
func BenchmarkEncode1KiB(b *testing.B) {
bmEncode(b, 1024)
}
func BenchmarkEncode8KiB(b *testing.B) {
bmEncode(b, 8*1024)
}
func BenchmarkEncode64KiB(b *testing.B) {
bmEncode(b, 64*1024)
}
func BenchmarkEncode512KiB(b *testing.B) {
bmEncode(b, 512*1024)
}
func BenchmarkEncode1MiB(b *testing.B) {
bmEncode(b, 1024*1024)
}

View File

@@ -1,113 +0,0 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"net"
"reflect"
"strings"
"testing"
)
type emptyServiceServer interface{}
type testServer struct{}
func TestStopBeforeServe(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to create listener: %v", err)
}
server := NewServer()
server.Stop()
err = server.Serve(lis)
if err != ErrServerStopped {
t.Fatalf("server.Serve() error = %v, want %v", err, ErrServerStopped)
}
// server.Serve is responsible for closing the listener, even if the
// server was already stopped.
err = lis.Close()
if got, want := ErrorDesc(err), "use of closed network connection"; !strings.Contains(got, want) {
t.Errorf("Close() error = %q, want %q", got, want)
}
}
func TestGetServiceInfo(t *testing.T) {
testSd := ServiceDesc{
ServiceName: "grpc.testing.EmptyService",
HandlerType: (*emptyServiceServer)(nil),
Methods: []MethodDesc{
{
MethodName: "EmptyCall",
Handler: nil,
},
},
Streams: []StreamDesc{
{
StreamName: "EmptyStream",
Handler: nil,
ServerStreams: false,
ClientStreams: true,
},
},
Metadata: []int{0, 2, 1, 3},
}
server := NewServer()
server.RegisterService(&testSd, &testServer{})
info := server.GetServiceInfo()
want := map[string]ServiceInfo{
"grpc.testing.EmptyService": {
Methods: []MethodInfo{
{
Name: "EmptyCall",
IsClientStream: false,
IsServerStream: false,
},
{
Name: "EmptyStream",
IsClientStream: true,
IsServerStream: false,
}},
Metadata: []int{0, 2, 1, 3},
},
}
if !reflect.DeepEqual(info, want) {
t.Errorf("GetServiceInfo() = %+v, want %+v", info, want)
}
}

View File

@@ -1,389 +0,0 @@
/*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
)
func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) {
type testCase struct {
name string
req *http.Request
wantErr string
modrw func(http.ResponseWriter) http.ResponseWriter
check func(*serverHandlerTransport, *testCase) error
}
tests := []testCase{
{
name: "http/1.1",
req: &http.Request{
ProtoMajor: 1,
ProtoMinor: 1,
},
wantErr: "gRPC requires HTTP/2",
},
{
name: "bad method",
req: &http.Request{
ProtoMajor: 2,
Method: "GET",
Header: http.Header{},
RequestURI: "/",
},
wantErr: "invalid gRPC request method",
},
{
name: "bad content type",
req: &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": {"application/foo"},
},
RequestURI: "/service/foo.bar",
},
wantErr: "invalid gRPC request content-type",
},
{
name: "not flusher",
req: &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": {"application/grpc"},
},
RequestURI: "/service/foo.bar",
},
modrw: func(w http.ResponseWriter) http.ResponseWriter {
// Return w without its Flush method
type onlyCloseNotifier interface {
http.ResponseWriter
http.CloseNotifier
}
return struct{ onlyCloseNotifier }{w.(onlyCloseNotifier)}
},
wantErr: "gRPC requires a ResponseWriter supporting http.Flusher",
},
{
name: "not closenotifier",
req: &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": {"application/grpc"},
},
RequestURI: "/service/foo.bar",
},
modrw: func(w http.ResponseWriter) http.ResponseWriter {
// Return w without its CloseNotify method
type onlyFlusher interface {
http.ResponseWriter
http.Flusher
}
return struct{ onlyFlusher }{w.(onlyFlusher)}
},
wantErr: "gRPC requires a ResponseWriter supporting http.CloseNotifier",
},
{
name: "valid",
req: &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": {"application/grpc"},
},
URL: &url.URL{
Path: "/service/foo.bar",
},
RequestURI: "/service/foo.bar",
},
check: func(t *serverHandlerTransport, tt *testCase) error {
if t.req != tt.req {
return fmt.Errorf("t.req = %p; want %p", t.req, tt.req)
}
if t.rw == nil {
return errors.New("t.rw = nil; want non-nil")
}
return nil
},
},
{
name: "with timeout",
req: &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": []string{"application/grpc"},
"Grpc-Timeout": {"200m"},
},
URL: &url.URL{
Path: "/service/foo.bar",
},
RequestURI: "/service/foo.bar",
},
check: func(t *serverHandlerTransport, tt *testCase) error {
if !t.timeoutSet {
return errors.New("timeout not set")
}
if want := 200 * time.Millisecond; t.timeout != want {
return fmt.Errorf("timeout = %v; want %v", t.timeout, want)
}
return nil
},
},
{
name: "with bad timeout",
req: &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": []string{"application/grpc"},
"Grpc-Timeout": {"tomorrow"},
},
URL: &url.URL{
Path: "/service/foo.bar",
},
RequestURI: "/service/foo.bar",
},
wantErr: `stream error: code = 13 desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`,
},
{
name: "with metadata",
req: &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": []string{"application/grpc"},
"meta-foo": {"foo-val"},
"meta-bar": {"bar-val1", "bar-val2"},
"user-agent": {"x/y a/b"},
},
URL: &url.URL{
Path: "/service/foo.bar",
},
RequestURI: "/service/foo.bar",
},
check: func(ht *serverHandlerTransport, tt *testCase) error {
want := metadata.MD{
"meta-bar": {"bar-val1", "bar-val2"},
"user-agent": {"x/y"},
"meta-foo": {"foo-val"},
}
if !reflect.DeepEqual(ht.headerMD, want) {
return fmt.Errorf("metdata = %#v; want %#v", ht.headerMD, want)
}
return nil
},
},
}
for _, tt := range tests {
rw := newTestHandlerResponseWriter()
if tt.modrw != nil {
rw = tt.modrw(rw)
}
got, gotErr := NewServerHandlerTransport(rw, tt.req)
if (gotErr != nil) != (tt.wantErr != "") || (gotErr != nil && gotErr.Error() != tt.wantErr) {
t.Errorf("%s: error = %v; want %q", tt.name, gotErr, tt.wantErr)
continue
}
if gotErr != nil {
continue
}
if tt.check != nil {
if err := tt.check(got.(*serverHandlerTransport), &tt); err != nil {
t.Errorf("%s: %v", tt.name, err)
}
}
}
}
type testHandlerResponseWriter struct {
*httptest.ResponseRecorder
closeNotify chan bool
}
func (w testHandlerResponseWriter) CloseNotify() <-chan bool { return w.closeNotify }
func (w testHandlerResponseWriter) Flush() {}
func newTestHandlerResponseWriter() http.ResponseWriter {
return testHandlerResponseWriter{
ResponseRecorder: httptest.NewRecorder(),
closeNotify: make(chan bool, 1),
}
}
type handleStreamTest struct {
t *testing.T
bodyw *io.PipeWriter
req *http.Request
rw testHandlerResponseWriter
ht *serverHandlerTransport
}
func newHandleStreamTest(t *testing.T) *handleStreamTest {
bodyr, bodyw := io.Pipe()
req := &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": {"application/grpc"},
},
URL: &url.URL{
Path: "/service/foo.bar",
},
RequestURI: "/service/foo.bar",
Body: bodyr,
}
rw := newTestHandlerResponseWriter().(testHandlerResponseWriter)
ht, err := NewServerHandlerTransport(rw, req)
if err != nil {
t.Fatal(err)
}
return &handleStreamTest{
t: t,
bodyw: bodyw,
ht: ht.(*serverHandlerTransport),
rw: rw,
}
}
func TestHandlerTransport_HandleStreams(t *testing.T) {
st := newHandleStreamTest(t)
handleStream := func(s *Stream) {
if want := "/service/foo.bar"; s.method != want {
t.Errorf("stream method = %q; want %q", s.method, want)
}
st.bodyw.Close() // no body
st.ht.WriteStatus(s, codes.OK, "")
}
st.ht.HandleStreams(func(s *Stream) { go handleStream(s) })
wantHeader := http.Header{
"Date": nil,
"Content-Type": {"application/grpc"},
"Trailer": {"Grpc-Status", "Grpc-Message"},
"Grpc-Status": {"0"},
}
if !reflect.DeepEqual(st.rw.HeaderMap, wantHeader) {
t.Errorf("Header+Trailer Map: %#v; want %#v", st.rw.HeaderMap, wantHeader)
}
}
// Tests that codes.Unimplemented will close the body, per comment in handler_server.go.
func TestHandlerTransport_HandleStreams_Unimplemented(t *testing.T) {
handleStreamCloseBodyTest(t, codes.Unimplemented, "thingy is unimplemented")
}
// Tests that codes.InvalidArgument will close the body, per comment in handler_server.go.
func TestHandlerTransport_HandleStreams_InvalidArgument(t *testing.T) {
handleStreamCloseBodyTest(t, codes.InvalidArgument, "bad arg")
}
func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string) {
st := newHandleStreamTest(t)
handleStream := func(s *Stream) {
st.ht.WriteStatus(s, statusCode, msg)
}
st.ht.HandleStreams(func(s *Stream) { go handleStream(s) })
wantHeader := http.Header{
"Date": nil,
"Content-Type": {"application/grpc"},
"Trailer": {"Grpc-Status", "Grpc-Message"},
"Grpc-Status": {fmt.Sprint(uint32(statusCode))},
"Grpc-Message": {encodeGrpcMessage(msg)},
}
if !reflect.DeepEqual(st.rw.HeaderMap, wantHeader) {
t.Errorf("Header+Trailer mismatch.\n got: %#v\nwant: %#v", st.rw.HeaderMap, wantHeader)
}
}
func TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
bodyr, bodyw := io.Pipe()
req := &http.Request{
ProtoMajor: 2,
Method: "POST",
Header: http.Header{
"Content-Type": {"application/grpc"},
"Grpc-Timeout": {"200m"},
},
URL: &url.URL{
Path: "/service/foo.bar",
},
RequestURI: "/service/foo.bar",
Body: bodyr,
}
rw := newTestHandlerResponseWriter().(testHandlerResponseWriter)
ht, err := NewServerHandlerTransport(rw, req)
if err != nil {
t.Fatal(err)
}
runStream := func(s *Stream) {
defer bodyw.Close()
select {
case <-s.ctx.Done():
case <-time.After(5 * time.Second):
t.Errorf("timeout waiting for ctx.Done")
return
}
err := s.ctx.Err()
if err != context.DeadlineExceeded {
t.Errorf("ctx.Err = %v; want %v", err, context.DeadlineExceeded)
return
}
ht.WriteStatus(s, codes.DeadlineExceeded, "too slow")
}
ht.HandleStreams(func(s *Stream) { go runStream(s) })
wantHeader := http.Header{
"Date": nil,
"Content-Type": {"application/grpc"},
"Trailer": {"Grpc-Status", "Grpc-Message"},
"Grpc-Status": {"4"},
"Grpc-Message": {encodeGrpcMessage("too slow")},
}
if !reflect.DeepEqual(rw.HeaderMap, wantHeader) {
t.Errorf("Header+Trailer Map mismatch.\n got: %#v\nwant: %#v", rw.HeaderMap, wantHeader)
}
}

View File

@@ -1,145 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"fmt"
"testing"
"time"
)
func TestTimeoutEncode(t *testing.T) {
for _, test := range []struct {
in string
out string
}{
{"12345678ns", "12345678n"},
{"123456789ns", "123457u"},
{"12345678us", "12345678u"},
{"123456789us", "123457m"},
{"12345678ms", "12345678m"},
{"123456789ms", "123457S"},
{"12345678s", "12345678S"},
{"123456789s", "2057614M"},
{"12345678m", "12345678M"},
{"123456789m", "2057614H"},
} {
d, err := time.ParseDuration(test.in)
if err != nil {
t.Fatalf("failed to parse duration string %s: %v", test.in, err)
}
out := encodeTimeout(d)
if out != test.out {
t.Fatalf("timeoutEncode(%s) = %s, want %s", test.in, out, test.out)
}
}
}
func TestTimeoutDecode(t *testing.T) {
for _, test := range []struct {
// input
s string
// output
d time.Duration
err error
}{
{"1234S", time.Second * 1234, nil},
{"1234x", 0, fmt.Errorf("transport: timeout unit is not recognized: %q", "1234x")},
{"1", 0, fmt.Errorf("transport: timeout string is too short: %q", "1")},
{"", 0, fmt.Errorf("transport: timeout string is too short: %q", "")},
} {
d, err := decodeTimeout(test.s)
if d != test.d || fmt.Sprint(err) != fmt.Sprint(test.err) {
t.Fatalf("timeoutDecode(%q) = %d, %v, want %d, %v", test.s, int64(d), err, int64(test.d), test.err)
}
}
}
func TestValidContentType(t *testing.T) {
tests := []struct {
h string
want bool
}{
{"application/grpc", true},
{"application/grpc+", true},
{"application/grpc+blah", true},
{"application/grpc;", true},
{"application/grpc;blah", true},
{"application/grpcd", false},
{"application/grpd", false},
{"application/grp", false},
}
for _, tt := range tests {
got := validContentType(tt.h)
if got != tt.want {
t.Errorf("validContentType(%q) = %v; want %v", tt.h, got, tt.want)
}
}
}
func TestEncodeGrpcMessage(t *testing.T) {
for _, tt := range []struct {
input string
expected string
}{
{"", ""},
{"Hello", "Hello"},
{"my favorite character is \u0000", "my favorite character is %00"},
{"my favorite character is %", "my favorite character is %25"},
} {
actual := encodeGrpcMessage(tt.input)
if tt.expected != actual {
t.Errorf("encodeGrpcMessage(%v) = %v, want %v", tt.input, actual, tt.expected)
}
}
}
func TestDecodeGrpcMessage(t *testing.T) {
for _, tt := range []struct {
input string
expected string
}{
{"", ""},
{"Hello", "Hello"},
{"H%61o", "Hao"},
{"H%6", "H%6"},
{"%G0", "%G0"},
{"%E7%B3%BB%E7%BB%9F", "系统"},
} {
actual := decodeGrpcMessage(tt.input)
if tt.expected != actual {
t.Errorf("dncodeGrpcMessage(%v) = %v, want %v", tt.input, actual, tt.expected)
}
}
}

View File

@@ -1,829 +0,0 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"bytes"
"fmt"
"io"
"math"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
)
type server struct {
lis net.Listener
port string
startedErr chan error // error (or nil) with server start value
mu sync.Mutex
conns map[ServerTransport]bool
}
var (
expectedRequest = []byte("ping")
expectedResponse = []byte("pong")
expectedRequestLarge = make([]byte, initialWindowSize*2)
expectedResponseLarge = make([]byte, initialWindowSize*2)
expectedInvalidHeaderField = "invalid/content-type"
)
type testStreamHandler struct {
t *http2Server
}
type hType int
const (
normal hType = iota
suspended
misbehaved
encodingRequiredStatus
invalidHeaderField
)
func (h *testStreamHandler) handleStream(t *testing.T, s *Stream) {
req := expectedRequest
resp := expectedResponse
if s.Method() == "foo.Large" {
req = expectedRequestLarge
resp = expectedResponseLarge
}
p := make([]byte, len(req))
_, err := io.ReadFull(s, p)
if err != nil {
return
}
if !bytes.Equal(p, req) {
t.Fatalf("handleStream got %v, want %v", p, req)
}
// send a response back to the client.
h.t.Write(s, resp, &Options{})
// send the trailer to end the stream.
h.t.WriteStatus(s, codes.OK, "")
}
// handleStreamSuspension blocks until s.ctx is canceled.
func (h *testStreamHandler) handleStreamSuspension(s *Stream) {
go func() {
<-s.ctx.Done()
}()
}
func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *Stream) {
conn, ok := s.ServerTransport().(*http2Server)
if !ok {
t.Fatalf("Failed to convert %v to *http2Server", s.ServerTransport())
}
var sent int
p := make([]byte, http2MaxFrameLen)
for sent < initialWindowSize {
<-conn.writableChan
n := initialWindowSize - sent
// The last message may be smaller than http2MaxFrameLen
if n <= http2MaxFrameLen {
if s.Method() == "foo.Connection" {
// Violate connection level flow control window of client but do not
// violate any stream level windows.
p = make([]byte, n)
} else {
// Violate stream level flow control window of client.
p = make([]byte, n+1)
}
}
if err := conn.framer.writeData(true, s.id, false, p); err != nil {
conn.writableChan <- 0
break
}
conn.writableChan <- 0
sent += len(p)
}
}
func (h *testStreamHandler) handleStreamEncodingRequiredStatus(t *testing.T, s *Stream) {
// raw newline is not accepted by http2 framer so it must be encoded.
h.t.WriteStatus(s, encodingTestStatusCode, encodingTestStatusDesc)
}
func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stream) {
<-h.t.writableChan
h.t.hBuf.Reset()
h.t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField})
if err := h.t.writeHeaders(s, h.t.hBuf, false); err != nil {
t.Fatalf("Failed to write headers: %v", err)
}
h.t.writableChan <- 0
}
// start starts server. Other goroutines should block on s.readyChan for further operations.
func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
var err error
if port == 0 {
s.lis, err = net.Listen("tcp", "localhost:0")
} else {
s.lis, err = net.Listen("tcp", "localhost:"+strconv.Itoa(port))
}
if err != nil {
s.startedErr <- fmt.Errorf("failed to listen: %v", err)
return
}
_, p, err := net.SplitHostPort(s.lis.Addr().String())
if err != nil {
s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
return
}
s.port = p
s.conns = make(map[ServerTransport]bool)
s.startedErr <- nil
for {
conn, err := s.lis.Accept()
if err != nil {
return
}
transport, err := NewServerTransport("http2", conn, maxStreams, nil)
if err != nil {
return
}
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
transport.Close()
return
}
s.conns[transport] = true
s.mu.Unlock()
h := &testStreamHandler{transport.(*http2Server)}
switch ht {
case suspended:
go transport.HandleStreams(h.handleStreamSuspension)
case misbehaved:
go transport.HandleStreams(func(s *Stream) {
go h.handleStreamMisbehave(t, s)
})
case encodingRequiredStatus:
go transport.HandleStreams(func(s *Stream) {
go h.handleStreamEncodingRequiredStatus(t, s)
})
case invalidHeaderField:
go transport.HandleStreams(func(s *Stream) {
go h.handleStreamInvalidHeaderField(t, s)
})
default:
go transport.HandleStreams(func(s *Stream) {
go h.handleStream(t, s)
})
}
}
}
func (s *server) wait(t *testing.T, timeout time.Duration) {
select {
case err := <-s.startedErr:
if err != nil {
t.Fatal(err)
}
case <-time.After(timeout):
t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
}
}
func (s *server) stop() {
s.lis.Close()
s.mu.Lock()
for c := range s.conns {
c.Close()
}
s.conns = nil
s.mu.Unlock()
}
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
server := &server{startedErr: make(chan error, 1)}
go server.start(t, port, maxStreams, ht)
server.wait(t, 2*time.Second)
addr := "localhost:" + server.port
var (
ct ClientTransport
connErr error
)
ct, connErr = NewClientTransport(context.Background(), addr, ConnectOptions{})
if connErr != nil {
t.Fatalf("failed to create transport: %v", connErr)
}
return server, ct
}
func TestClientSendAndReceive(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Small",
}
s1, err1 := ct.NewStream(context.Background(), callHdr)
if err1 != nil {
t.Fatalf("failed to open stream: %v", err1)
}
if s1.id != 1 {
t.Fatalf("wrong stream id: %d", s1.id)
}
s2, err2 := ct.NewStream(context.Background(), callHdr)
if err2 != nil {
t.Fatalf("failed to open stream: %v", err2)
}
if s2.id != 3 {
t.Fatalf("wrong stream id: %d", s2.id)
}
opts := Options{
Last: true,
Delay: false,
}
if err := ct.Write(s1, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("failed to send data: %v", err)
}
p := make([]byte, len(expectedResponse))
_, recvErr := io.ReadFull(s1, p)
if recvErr != nil || !bytes.Equal(p, expectedResponse) {
t.Fatalf("Error: %v, want <nil>; Result: %v, want %v", recvErr, p, expectedResponse)
}
_, recvErr = io.ReadFull(s1, p)
if recvErr != io.EOF {
t.Fatalf("Error: %v; want <EOF>", recvErr)
}
ct.Close()
server.stop()
}
func TestClientErrorNotify(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
go server.stop()
// ct.reader should detect the error and activate ct.Error().
<-ct.Error()
ct.Close()
}
func performOneRPC(ct ClientTransport) {
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Small",
}
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
return
}
opts := Options{
Last: true,
Delay: false,
}
if err := ct.Write(s, expectedRequest, &opts); err == nil || err == io.EOF {
time.Sleep(5 * time.Millisecond)
// The following s.Recv()'s could error out because the
// underlying transport is gone.
//
// Read response
p := make([]byte, len(expectedResponse))
io.ReadFull(s, p)
// Read io.EOF
io.ReadFull(s, p)
}
}
func TestClientMix(t *testing.T) {
s, ct := setUp(t, 0, math.MaxUint32, normal)
go func(s *server) {
time.Sleep(5 * time.Second)
s.stop()
}(s)
go func(ct ClientTransport) {
<-ct.Error()
ct.Close()
}(ct)
for i := 0; i < 1000; i++ {
time.Sleep(10 * time.Millisecond)
go performOneRPC(ct)
}
}
func TestLargeMessage(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Large",
}
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
}
if err := ct.Write(s, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
}
p := make([]byte, len(expectedResponseLarge))
if _, err := io.ReadFull(s, p); err != nil || !bytes.Equal(p, expectedResponseLarge) {
t.Errorf("io.ReadFull(_, %v) = _, %v, want %v, <nil>", err, p, expectedResponse)
}
if _, err = io.ReadFull(s, p); err != io.EOF {
t.Errorf("Failed to complete the stream %v; want <EOF>", err)
}
}()
}
wg.Wait()
ct.Close()
server.stop()
}
func TestGracefulClose(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Small",
}
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
t.Fatalf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
}
if err = ct.GracefulClose(); err != nil {
t.Fatalf("%v.GracefulClose() = %v, want <nil>", ct, err)
}
var wg sync.WaitGroup
// Expect the failure for all the follow-up streams because ct has been closed gracefully.
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if _, err := ct.NewStream(context.Background(), callHdr); err != ErrStreamDrain {
t.Errorf("%v.NewStream(_, _) = _, %v, want _, %v", ct, err, ErrStreamDrain)
}
}()
}
opts := Options{
Last: true,
Delay: false,
}
// The stream which was created before graceful close can still proceed.
if err := ct.Write(s, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
}
p := make([]byte, len(expectedResponse))
if _, err := io.ReadFull(s, p); err != nil || !bytes.Equal(p, expectedResponse) {
t.Fatalf("io.ReadFull(_, %v) = _, %v, want %v, <nil>", err, p, expectedResponse)
}
if _, err = io.ReadFull(s, p); err != io.EOF {
t.Fatalf("Failed to complete the stream %v; want <EOF>", err)
}
wg.Wait()
ct.Close()
server.stop()
}
func TestLargeMessageSuspension(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Large",
}
// Set a long enough timeout for writing a large message out.
ctx, _ := context.WithTimeout(context.Background(), time.Second)
s, err := ct.NewStream(ctx, callHdr)
if err != nil {
t.Fatalf("failed to open stream: %v", err)
}
// Write should not be done successfully due to flow control.
err = ct.Write(s, expectedRequestLarge, &Options{Last: true, Delay: false})
expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
if err != expectedErr {
t.Fatalf("Write got %v, want %v", err, expectedErr)
}
ct.Close()
server.stop()
}
func TestMaxStreams(t *testing.T) {
server, ct := setUp(t, 0, 1, suspended)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Large",
}
// Have a pending stream which takes all streams quota.
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
t.Fatalf("Failed to open stream: %v", err)
}
cc, ok := ct.(*http2Client)
if !ok {
t.Fatalf("Failed to convert %v to *http2Client", ct)
}
done := make(chan struct{})
ch := make(chan int)
ready := make(chan struct{})
go func() {
for {
select {
case <-time.After(5 * time.Millisecond):
select {
case ch <- 0:
case <-ready:
return
}
case <-time.After(5 * time.Second):
close(done)
return
case <-ready:
return
}
}
}()
for {
select {
case <-ch:
case <-done:
t.Fatalf("Client has not received the max stream setting in 5 seconds.")
}
cc.mu.Lock()
// cc.streamsQuota should be initialized once receiving the 1st setting frame from
// the server.
if cc.streamsQuota != nil {
cc.mu.Unlock()
select {
case <-cc.streamsQuota.acquire():
t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
default:
if cc.streamsQuota.quota != 0 {
t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
}
}
break
}
cc.mu.Unlock()
}
close(ready)
// Close the pending stream so that the streams quota becomes available for the next new stream.
ct.CloseStream(s, nil)
select {
case i := <-cc.streamsQuota.acquire():
if i != 1 {
t.Fatalf("streamsQuota.acquire() got %d quota, want 1.", i)
}
cc.streamsQuota.add(i)
default:
t.Fatalf("streamsQuota.acquire() is not readable.")
}
if _, err := ct.NewStream(context.Background(), callHdr); err != nil {
t.Fatalf("Failed to open stream: %v", err)
}
ct.Close()
server.stop()
}
func TestServerContextCanceledOnClosedConnection(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo",
}
var sc *http2Server
// Wait until the server transport is setup.
for {
server.mu.Lock()
if len(server.conns) == 0 {
server.mu.Unlock()
time.Sleep(time.Millisecond)
continue
}
for k := range server.conns {
var ok bool
sc, ok = k.(*http2Server)
if !ok {
t.Fatalf("Failed to convert %v to *http2Server", k)
}
}
server.mu.Unlock()
break
}
cc, ok := ct.(*http2Client)
if !ok {
t.Fatalf("Failed to convert %v to *http2Client", ct)
}
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
t.Fatalf("Failed to open stream: %v", err)
}
// Make sure the headers frame is flushed out.
<-cc.writableChan
if err = cc.framer.writeData(true, s.id, false, make([]byte, http2MaxFrameLen)); err != nil {
t.Fatalf("Failed to write data: %v", err)
}
cc.writableChan <- 0
// Loop until the server side stream is created.
var ss *Stream
for {
time.Sleep(time.Second)
sc.mu.Lock()
if len(sc.activeStreams) == 0 {
sc.mu.Unlock()
continue
}
ss = sc.activeStreams[s.id]
sc.mu.Unlock()
break
}
cc.Close()
select {
case <-ss.Context().Done():
if ss.Context().Err() != context.Canceled {
t.Fatalf("ss.Context().Err() got %v, want %v", ss.Context().Err(), context.Canceled)
}
case <-time.After(5 * time.Second):
t.Fatalf("Failed to cancel the context of the sever side stream.")
}
server.stop()
}
func TestServerWithMisbehavedClient(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo",
}
var sc *http2Server
// Wait until the server transport is setup.
for {
server.mu.Lock()
if len(server.conns) == 0 {
server.mu.Unlock()
time.Sleep(time.Millisecond)
continue
}
for k := range server.conns {
var ok bool
sc, ok = k.(*http2Server)
if !ok {
t.Fatalf("Failed to convert %v to *http2Server", k)
}
}
server.mu.Unlock()
break
}
cc, ok := ct.(*http2Client)
if !ok {
t.Fatalf("Failed to convert %v to *http2Client", ct)
}
// Test server behavior for violation of stream flow control window size restriction.
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
t.Fatalf("Failed to open stream: %v", err)
}
var sent int
// Drain the stream flow control window
<-cc.writableChan
if err = cc.framer.writeData(true, s.id, false, make([]byte, http2MaxFrameLen)); err != nil {
t.Fatalf("Failed to write data: %v", err)
}
cc.writableChan <- 0
sent += http2MaxFrameLen
// Wait until the server creates the corresponding stream and receive some data.
var ss *Stream
for {
time.Sleep(time.Millisecond)
sc.mu.Lock()
if len(sc.activeStreams) == 0 {
sc.mu.Unlock()
continue
}
ss = sc.activeStreams[s.id]
sc.mu.Unlock()
ss.fc.mu.Lock()
if ss.fc.pendingData > 0 {
ss.fc.mu.Unlock()
break
}
ss.fc.mu.Unlock()
}
if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != http2MaxFrameLen || sc.fc.pendingUpdate != 0 {
t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0)
}
// Keep sending until the server inbound window is drained for that stream.
for sent <= initialWindowSize {
<-cc.writableChan
if err = cc.framer.writeData(true, s.id, false, make([]byte, 1)); err != nil {
t.Fatalf("Failed to write data: %v", err)
}
cc.writableChan <- 0
sent++
}
// Server sent a resetStream for s already.
code := http2ErrConvTab[http2.ErrCodeFlowControl]
if _, err := io.ReadFull(s, make([]byte, 1)); err != io.EOF || s.statusCode != code {
t.Fatalf("%v got err %v with statusCode %d, want err <EOF> with statusCode %d", s, err, s.statusCode, code)
}
if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
}
ct.CloseStream(s, nil)
// Test server behavior for violation of connection flow control window size restriction.
//
// Keep creating new streams until the connection window is drained on the server and
// the server tears down the connection.
for {
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
// The server tears down the connection.
break
}
<-cc.writableChan
cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen))
cc.writableChan <- 0
}
ct.Close()
server.stop()
}
func TestClientWithMisbehavedServer(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, misbehaved)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Stream",
}
conn, ok := ct.(*http2Client)
if !ok {
t.Fatalf("Failed to convert %v to *http2Client", ct)
}
// Test the logic for the violation of stream flow control window size restriction.
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
t.Fatalf("Failed to open stream: %v", err)
}
d := make([]byte, 1)
if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
t.Fatalf("Failed to write: %v", err)
}
// Read without window update.
for {
p := make([]byte, http2MaxFrameLen)
if _, err = s.dec.Read(p); err != nil {
break
}
}
if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData <= initialWindowSize || conn.fc.pendingUpdate != 0 {
t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0)
}
if err != io.EOF || s.statusCode != codes.Internal {
t.Fatalf("Got err %v and the status code %d, want <EOF> and the code %d", err, s.statusCode, codes.Internal)
}
conn.CloseStream(s, err)
if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
}
// Test the logic for the violation of the connection flow control window size restriction.
//
// Generate enough streams to drain the connection window. Make the server flood the traffic
// to violate flow control window size of the connection.
callHdr.Method = "foo.Connection"
for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ {
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
break
}
if err := ct.Write(s, d, &Options{Last: true, Delay: false}); err != nil {
break
}
}
// http2Client.errChan is closed due to connection flow control window size violation.
<-conn.Error()
ct.Close()
server.stop()
}
var (
encodingTestStatusCode = codes.Internal
encodingTestStatusDesc = "\n"
)
func TestEncodingRequiredStatus(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, encodingRequiredStatus)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo",
}
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
return
}
opts := Options{
Last: true,
Delay: false,
}
if err := ct.Write(s, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("Failed to write the request: %v", err)
}
p := make([]byte, http2MaxFrameLen)
if _, err := s.dec.Read(p); err != io.EOF {
t.Fatalf("Read got error %v, want %v", err, io.EOF)
}
if s.StatusCode() != encodingTestStatusCode || s.StatusDesc() != encodingTestStatusDesc {
t.Fatalf("stream with status code %d, status desc %v, want %d, %v", s.StatusCode(), s.StatusDesc(), encodingTestStatusCode, encodingTestStatusDesc)
}
ct.Close()
server.stop()
}
func TestInvalidHeaderField(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, invalidHeaderField)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo",
}
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
return
}
opts := Options{
Last: true,
Delay: false,
}
if err := ct.Write(s, expectedRequest, &opts); err != nil && err != io.EOF {
t.Fatalf("Failed to write the request: %v", err)
}
p := make([]byte, http2MaxFrameLen)
_, err = s.dec.Read(p)
if se, ok := err.(StreamError); !ok || se.Code != codes.FailedPrecondition || !strings.Contains(err.Error(), expectedInvalidHeaderField) {
t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.FailedPrecondition, expectedInvalidHeaderField)
}
ct.Close()
server.stop()
}
func TestStreamContext(t *testing.T) {
expectedStream := &Stream{}
ctx := newContextWithStream(context.Background(), expectedStream)
s, ok := StreamFromContext(ctx)
if !ok || expectedStream != s {
t.Fatalf("GetStreamFromContext(%v) = %v, %t, want: %v, true", ctx, s, ok, expectedStream)
}
}
func TestIsReservedHeader(t *testing.T) {
tests := []struct {
h string
want bool
}{
{"", false}, // but should be rejected earlier
{"foo", false},
{"content-type", true},
{"grpc-message-type", true},
{"grpc-encoding", true},
{"grpc-message", true},
{"grpc-status", true},
{"grpc-timeout", true},
{"te", true},
}
for _, tt := range tests {
got := isReservedHeader(tt.h)
if got != tt.want {
t.Errorf("isReservedHeader(%q) = %v; want %v", tt.h, got, tt.want)
}
}
}