294 lines
9.1 KiB
Go
294 lines
9.1 KiB
Go
|
// Copyright 2018 The etcd Authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
// Package balancer implements client balancer.
|
||
|
package balancer
|
||
|
|
||
|
import (
|
||
|
"strconv"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"go.etcd.io/etcd/clientv3/balancer/connectivity"
|
||
|
"go.etcd.io/etcd/clientv3/balancer/picker"
|
||
|
|
||
|
"go.uber.org/zap"
|
||
|
"google.golang.org/grpc/balancer"
|
||
|
grpcconnectivity "google.golang.org/grpc/connectivity"
|
||
|
"google.golang.org/grpc/resolver"
|
||
|
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
|
||
|
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
|
||
|
)
|
||
|
|
||
|
// Config defines balancer configurations.
|
||
|
type Config struct {
|
||
|
// Policy configures balancer policy.
|
||
|
Policy picker.Policy
|
||
|
|
||
|
// Picker implements gRPC picker.
|
||
|
// Leave empty if "Policy" field is not custom.
|
||
|
// TODO: currently custom policy is not supported.
|
||
|
// Picker picker.Picker
|
||
|
|
||
|
// Name defines an additional name for balancer.
|
||
|
// Useful for balancer testing to avoid register conflicts.
|
||
|
// If empty, defaults to policy name.
|
||
|
Name string
|
||
|
|
||
|
// Logger configures balancer logging.
|
||
|
// If nil, logs are discarded.
|
||
|
Logger *zap.Logger
|
||
|
}
|
||
|
|
||
|
// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
|
||
|
// must be invoked at initialization time.
|
||
|
func RegisterBuilder(cfg Config) {
|
||
|
bb := &builder{cfg}
|
||
|
balancer.Register(bb)
|
||
|
|
||
|
bb.cfg.Logger.Debug(
|
||
|
"registered balancer",
|
||
|
zap.String("policy", bb.cfg.Policy.String()),
|
||
|
zap.String("name", bb.cfg.Name),
|
||
|
)
|
||
|
}
|
||
|
|
||
|
type builder struct {
|
||
|
cfg Config
|
||
|
}
|
||
|
|
||
|
// Build is called initially when creating "ccBalancerWrapper".
|
||
|
// "grpc.Dial" is called to this client connection.
|
||
|
// Then, resolved addresses will be handled via "HandleResolvedAddrs".
|
||
|
func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
|
||
|
bb := &baseBalancer{
|
||
|
id: strconv.FormatInt(time.Now().UnixNano(), 36),
|
||
|
policy: b.cfg.Policy,
|
||
|
name: b.cfg.Name,
|
||
|
lg: b.cfg.Logger,
|
||
|
|
||
|
addrToSc: make(map[resolver.Address]balancer.SubConn),
|
||
|
scToAddr: make(map[balancer.SubConn]resolver.Address),
|
||
|
scToSt: make(map[balancer.SubConn]grpcconnectivity.State),
|
||
|
|
||
|
currentConn: nil,
|
||
|
connectivityRecorder: connectivity.New(b.cfg.Logger),
|
||
|
|
||
|
// initialize picker always returns "ErrNoSubConnAvailable"
|
||
|
picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
|
||
|
}
|
||
|
|
||
|
// TODO: support multiple connections
|
||
|
bb.mu.Lock()
|
||
|
bb.currentConn = cc
|
||
|
bb.mu.Unlock()
|
||
|
|
||
|
bb.lg.Info(
|
||
|
"built balancer",
|
||
|
zap.String("balancer-id", bb.id),
|
||
|
zap.String("policy", bb.policy.String()),
|
||
|
zap.String("resolver-target", cc.Target()),
|
||
|
)
|
||
|
return bb
|
||
|
}
|
||
|
|
||
|
// Name implements "grpc/balancer.Builder" interface.
|
||
|
func (b *builder) Name() string { return b.cfg.Name }
|
||
|
|
||
|
// Balancer defines client balancer interface.
|
||
|
type Balancer interface {
|
||
|
// Balancer is called on specified client connection. Client initiates gRPC
|
||
|
// connection with "grpc.Dial(addr, grpc.WithBalancerName)", and then those resolved
|
||
|
// addresses are passed to "grpc/balancer.Balancer.HandleResolvedAddrs".
|
||
|
// For each resolved address, balancer calls "balancer.ClientConn.NewSubConn".
|
||
|
// "grpc/balancer.Balancer.HandleSubConnStateChange" is called when connectivity state
|
||
|
// changes, thus requires failover logic in this method.
|
||
|
balancer.Balancer
|
||
|
|
||
|
// Picker calls "Pick" for every client request.
|
||
|
picker.Picker
|
||
|
}
|
||
|
|
||
|
type baseBalancer struct {
|
||
|
id string
|
||
|
policy picker.Policy
|
||
|
name string
|
||
|
lg *zap.Logger
|
||
|
|
||
|
mu sync.RWMutex
|
||
|
|
||
|
addrToSc map[resolver.Address]balancer.SubConn
|
||
|
scToAddr map[balancer.SubConn]resolver.Address
|
||
|
scToSt map[balancer.SubConn]grpcconnectivity.State
|
||
|
|
||
|
currentConn balancer.ClientConn
|
||
|
connectivityRecorder connectivity.Recorder
|
||
|
|
||
|
picker picker.Picker
|
||
|
}
|
||
|
|
||
|
// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
|
||
|
// gRPC sends initial or updated resolved addresses from "Build".
|
||
|
func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
|
||
|
if err != nil {
|
||
|
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
|
||
|
return
|
||
|
}
|
||
|
bb.lg.Info("resolved",
|
||
|
zap.String("picker", bb.picker.String()),
|
||
|
zap.String("balancer-id", bb.id),
|
||
|
zap.Strings("addresses", addrsToStrings(addrs)),
|
||
|
)
|
||
|
|
||
|
bb.mu.Lock()
|
||
|
defer bb.mu.Unlock()
|
||
|
|
||
|
resolved := make(map[resolver.Address]struct{})
|
||
|
for _, addr := range addrs {
|
||
|
resolved[addr] = struct{}{}
|
||
|
if _, ok := bb.addrToSc[addr]; !ok {
|
||
|
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
|
||
|
if err != nil {
|
||
|
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
|
||
|
continue
|
||
|
}
|
||
|
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
|
||
|
bb.addrToSc[addr] = sc
|
||
|
bb.scToAddr[sc] = addr
|
||
|
bb.scToSt[sc] = grpcconnectivity.Idle
|
||
|
sc.Connect()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for addr, sc := range bb.addrToSc {
|
||
|
if _, ok := resolved[addr]; !ok {
|
||
|
// was removed by resolver or failed to create subconn
|
||
|
bb.currentConn.RemoveSubConn(sc)
|
||
|
delete(bb.addrToSc, addr)
|
||
|
|
||
|
bb.lg.Info(
|
||
|
"removed subconn",
|
||
|
zap.String("picker", bb.picker.String()),
|
||
|
zap.String("balancer-id", bb.id),
|
||
|
zap.String("address", addr.Addr),
|
||
|
zap.String("subconn", scToString(sc)),
|
||
|
)
|
||
|
|
||
|
// Keep the state of this sc in bb.scToSt until sc's state becomes Shutdown.
|
||
|
// The entry will be deleted in HandleSubConnStateChange.
|
||
|
// (DO NOT) delete(bb.scToAddr, sc)
|
||
|
// (DO NOT) delete(bb.scToSt, sc)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
|
||
|
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
|
||
|
bb.mu.Lock()
|
||
|
defer bb.mu.Unlock()
|
||
|
|
||
|
old, ok := bb.scToSt[sc]
|
||
|
if !ok {
|
||
|
bb.lg.Warn(
|
||
|
"state change for an unknown subconn",
|
||
|
zap.String("picker", bb.picker.String()),
|
||
|
zap.String("balancer-id", bb.id),
|
||
|
zap.String("subconn", scToString(sc)),
|
||
|
zap.Int("subconn-size", len(bb.scToAddr)),
|
||
|
zap.String("state", s.String()),
|
||
|
)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
bb.lg.Info(
|
||
|
"state changed",
|
||
|
zap.String("picker", bb.picker.String()),
|
||
|
zap.String("balancer-id", bb.id),
|
||
|
zap.Bool("connected", s == grpcconnectivity.Ready),
|
||
|
zap.String("subconn", scToString(sc)),
|
||
|
zap.Int("subconn-size", len(bb.scToAddr)),
|
||
|
zap.String("address", bb.scToAddr[sc].Addr),
|
||
|
zap.String("old-state", old.String()),
|
||
|
zap.String("new-state", s.String()),
|
||
|
)
|
||
|
|
||
|
bb.scToSt[sc] = s
|
||
|
switch s {
|
||
|
case grpcconnectivity.Idle:
|
||
|
sc.Connect()
|
||
|
case grpcconnectivity.Shutdown:
|
||
|
// When an address was removed by resolver, b called RemoveSubConn but
|
||
|
// kept the sc's state in scToSt. Remove state for this sc here.
|
||
|
delete(bb.scToAddr, sc)
|
||
|
delete(bb.scToSt, sc)
|
||
|
}
|
||
|
|
||
|
oldAggrState := bb.connectivityRecorder.GetCurrentState()
|
||
|
bb.connectivityRecorder.RecordTransition(old, s)
|
||
|
|
||
|
// Update balancer picker when one of the following happens:
|
||
|
// - this sc became ready from not-ready
|
||
|
// - this sc became not-ready from ready
|
||
|
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
|
||
|
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
|
||
|
if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
|
||
|
(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
|
||
|
bb.updatePicker()
|
||
|
}
|
||
|
|
||
|
bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
|
||
|
}
|
||
|
|
||
|
func (bb *baseBalancer) updatePicker() {
|
||
|
if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
|
||
|
bb.picker = picker.NewErr(balancer.ErrTransientFailure)
|
||
|
bb.lg.Info(
|
||
|
"updated picker to transient error picker",
|
||
|
zap.String("picker", bb.picker.String()),
|
||
|
zap.String("balancer-id", bb.id),
|
||
|
zap.String("policy", bb.policy.String()),
|
||
|
)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// only pass ready subconns to picker
|
||
|
scToAddr := make(map[balancer.SubConn]resolver.Address)
|
||
|
for addr, sc := range bb.addrToSc {
|
||
|
if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
|
||
|
scToAddr[sc] = addr
|
||
|
}
|
||
|
}
|
||
|
|
||
|
bb.picker = picker.New(picker.Config{
|
||
|
Policy: bb.policy,
|
||
|
Logger: bb.lg,
|
||
|
SubConnToResolverAddress: scToAddr,
|
||
|
})
|
||
|
bb.lg.Info(
|
||
|
"updated picker",
|
||
|
zap.String("picker", bb.picker.String()),
|
||
|
zap.String("balancer-id", bb.id),
|
||
|
zap.String("policy", bb.policy.String()),
|
||
|
zap.Strings("subconn-ready", scsToStrings(scToAddr)),
|
||
|
zap.Int("subconn-size", len(scToAddr)),
|
||
|
)
|
||
|
}
|
||
|
|
||
|
// Close implements "grpc/balancer.Balancer" interface.
|
||
|
// Close is a nop because base balancer doesn't have internal state to clean up,
|
||
|
// and it doesn't need to call RemoveSubConn for the SubConns.
|
||
|
func (bb *baseBalancer) Close() {
|
||
|
// TODO
|
||
|
}
|