597 lines
15 KiB
Go
597 lines
15 KiB
Go
// Copyright 2016 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package clientv3
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
|
|
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
type (
|
|
LeaseRevokeResponse pb.LeaseRevokeResponse
|
|
LeaseID int64
|
|
)
|
|
|
|
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
|
|
type LeaseGrantResponse struct {
|
|
*pb.ResponseHeader
|
|
ID LeaseID
|
|
TTL int64
|
|
Error string
|
|
}
|
|
|
|
// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
|
|
type LeaseKeepAliveResponse struct {
|
|
*pb.ResponseHeader
|
|
ID LeaseID
|
|
TTL int64
|
|
}
|
|
|
|
// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
|
|
type LeaseTimeToLiveResponse struct {
|
|
*pb.ResponseHeader
|
|
ID LeaseID `json:"id"`
|
|
|
|
// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. Expired lease will return -1.
|
|
TTL int64 `json:"ttl"`
|
|
|
|
// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
|
|
GrantedTTL int64 `json:"granted-ttl"`
|
|
|
|
// Keys is the list of keys attached to this lease.
|
|
Keys [][]byte `json:"keys"`
|
|
}
|
|
|
|
// LeaseStatus represents a lease status.
|
|
type LeaseStatus struct {
|
|
ID LeaseID `json:"id"`
|
|
// TODO: TTL int64
|
|
}
|
|
|
|
// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
|
|
type LeaseLeasesResponse struct {
|
|
*pb.ResponseHeader
|
|
Leases []LeaseStatus `json:"leases"`
|
|
}
|
|
|
|
const (
|
|
// defaultTTL is the assumed lease TTL used for the first keepalive
|
|
// deadline before the actual TTL is known to the client.
|
|
defaultTTL = 5 * time.Second
|
|
// NoLease is a lease ID for the absence of a lease.
|
|
NoLease LeaseID = 0
|
|
|
|
// retryConnWait is how long to wait before retrying request due to an error
|
|
retryConnWait = 500 * time.Millisecond
|
|
)
|
|
|
|
// LeaseResponseChSize is the buffer capacity of each response channel handed
// out by KeepAlive, i.e. how many unsent lease responses can be stored.
// WARNING: DO NOT UPDATE.
// It is exported only so that tests can adjust it.
var LeaseResponseChSize = 16
|
|
|
|
// ErrKeepAliveHalted is returned if the client keep alive loop halts with an
// unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken,
// but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
	// Reason is the error that stopped the keep alive loop; it may be nil.
	Reason error
}

// Error implements the error interface. The message is a fixed prefix,
// followed by ": " and the reason's message when Reason is non-nil.
func (e ErrKeepAliveHalted) Error() string {
	const prefix = "etcdclient: leases keep alive halted"
	if e.Reason == nil {
		return prefix
	}
	return prefix + ": " + e.Reason.Error()
}
|
|
|
|
type Lease interface {
|
|
// Grant creates a new lease.
|
|
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
|
|
|
|
// Revoke revokes the given lease.
|
|
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
|
|
|
|
// TimeToLive retrieves the lease information of the given lease ID.
|
|
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
|
|
|
|
// Leases retrieves all leases.
|
|
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
|
|
|
|
// KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
|
|
// to the channel are not consumed promptly the channel may become full. When full, the lease
|
|
// client will continue sending keep alive requests to the etcd server, but will drop responses
|
|
// until there is capacity on the channel to send more responses.
|
|
//
|
|
// If client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
|
|
// canceled by the caller (e.g. context.Canceled), KeepAlive returns a ErrKeepAliveHalted error
|
|
// containing the error reason.
|
|
//
|
|
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
|
|
// alive stream is interrupted in some way the client cannot handle itself;
|
|
// given context "ctx" is canceled or timed out.
|
|
//
|
|
// TODO(v4.0): post errors to last keep alive message before closing
|
|
// (see https://github.com/etcd-io/etcd/pull/7866)
|
|
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
|
|
|
|
// KeepAliveOnce renews the lease once. The response corresponds to the
|
|
// first message from calling KeepAlive. If the response has a recoverable
|
|
// error, KeepAliveOnce will retry the RPC with a new keep alive message.
|
|
//
|
|
// In most of the cases, Keepalive should be used instead of KeepAliveOnce.
|
|
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
|
|
|
|
// Close releases all resources Lease keeps for efficient communication
|
|
// with the etcd server.
|
|
Close() error
|
|
}
|
|
|
|
type lessor struct {
|
|
mu sync.Mutex // guards all fields
|
|
|
|
// donec is closed and loopErr is set when recvKeepAliveLoop stops
|
|
donec chan struct{}
|
|
loopErr error
|
|
|
|
remote pb.LeaseClient
|
|
|
|
stream pb.Lease_LeaseKeepAliveClient
|
|
streamCancel context.CancelFunc
|
|
|
|
stopCtx context.Context
|
|
stopCancel context.CancelFunc
|
|
|
|
keepAlives map[LeaseID]*keepAlive
|
|
|
|
// firstKeepAliveTimeout is the timeout for the first keepalive request
|
|
// before the actual TTL is known to the lease client
|
|
firstKeepAliveTimeout time.Duration
|
|
|
|
// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
|
|
firstKeepAliveOnce sync.Once
|
|
|
|
callOpts []grpc.CallOption
|
|
|
|
lg *zap.Logger
|
|
}
|
|
|
|
// keepAlive multiplexes a keepalive for a lease over multiple channels
|
|
type keepAlive struct {
|
|
chs []chan<- *LeaseKeepAliveResponse
|
|
ctxs []context.Context
|
|
// deadline is the time the keep alive channels close if no response
|
|
deadline time.Time
|
|
// nextKeepAlive is when to send the next keep alive message
|
|
nextKeepAlive time.Time
|
|
// donec is closed on lease revoke, expiration, or cancel.
|
|
donec chan struct{}
|
|
}
|
|
|
|
func NewLease(c *Client) Lease {
|
|
return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
|
|
}
|
|
|
|
func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
|
|
l := &lessor{
|
|
donec: make(chan struct{}),
|
|
keepAlives: make(map[LeaseID]*keepAlive),
|
|
remote: remote,
|
|
firstKeepAliveTimeout: keepAliveTimeout,
|
|
lg: c.lg,
|
|
}
|
|
if l.firstKeepAliveTimeout == time.Second {
|
|
l.firstKeepAliveTimeout = defaultTTL
|
|
}
|
|
if c != nil {
|
|
l.callOpts = c.callOpts
|
|
}
|
|
reqLeaderCtx := WithRequireLeader(context.Background())
|
|
l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
|
|
return l
|
|
}
|
|
|
|
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
|
|
r := &pb.LeaseGrantRequest{TTL: ttl}
|
|
resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
|
|
if err == nil {
|
|
gresp := &LeaseGrantResponse{
|
|
ResponseHeader: resp.GetHeader(),
|
|
ID: LeaseID(resp.ID),
|
|
TTL: resp.TTL,
|
|
Error: resp.Error,
|
|
}
|
|
return gresp, nil
|
|
}
|
|
return nil, toErr(ctx, err)
|
|
}
|
|
|
|
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
|
|
r := &pb.LeaseRevokeRequest{ID: int64(id)}
|
|
resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
|
|
if err == nil {
|
|
return (*LeaseRevokeResponse)(resp), nil
|
|
}
|
|
return nil, toErr(ctx, err)
|
|
}
|
|
|
|
func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
|
|
r := toLeaseTimeToLiveRequest(id, opts...)
|
|
resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
|
|
if err == nil {
|
|
gresp := &LeaseTimeToLiveResponse{
|
|
ResponseHeader: resp.GetHeader(),
|
|
ID: LeaseID(resp.ID),
|
|
TTL: resp.TTL,
|
|
GrantedTTL: resp.GrantedTTL,
|
|
Keys: resp.Keys,
|
|
}
|
|
return gresp, nil
|
|
}
|
|
return nil, toErr(ctx, err)
|
|
}
|
|
|
|
func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
|
|
resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
|
|
if err == nil {
|
|
leases := make([]LeaseStatus, len(resp.Leases))
|
|
for i := range resp.Leases {
|
|
leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
|
|
}
|
|
return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
|
|
}
|
|
return nil, toErr(ctx, err)
|
|
}
|
|
|
|
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
|
|
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
|
|
|
|
l.mu.Lock()
|
|
// ensure that recvKeepAliveLoop is still running
|
|
select {
|
|
case <-l.donec:
|
|
err := l.loopErr
|
|
l.mu.Unlock()
|
|
close(ch)
|
|
return ch, ErrKeepAliveHalted{Reason: err}
|
|
default:
|
|
}
|
|
ka, ok := l.keepAlives[id]
|
|
if !ok {
|
|
// create fresh keep alive
|
|
ka = &keepAlive{
|
|
chs: []chan<- *LeaseKeepAliveResponse{ch},
|
|
ctxs: []context.Context{ctx},
|
|
deadline: time.Now().Add(l.firstKeepAliveTimeout),
|
|
nextKeepAlive: time.Now(),
|
|
donec: make(chan struct{}),
|
|
}
|
|
l.keepAlives[id] = ka
|
|
} else {
|
|
// add channel and context to existing keep alive
|
|
ka.ctxs = append(ka.ctxs, ctx)
|
|
ka.chs = append(ka.chs, ch)
|
|
}
|
|
l.mu.Unlock()
|
|
|
|
go l.keepAliveCtxCloser(ctx, id, ka.donec)
|
|
l.firstKeepAliveOnce.Do(func() {
|
|
go l.recvKeepAliveLoop()
|
|
go l.deadlineLoop()
|
|
})
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
for {
|
|
resp, err := l.keepAliveOnce(ctx, id)
|
|
if err == nil {
|
|
if resp.TTL <= 0 {
|
|
err = rpctypes.ErrLeaseNotFound
|
|
}
|
|
return resp, err
|
|
}
|
|
if isHaltErr(ctx, err) {
|
|
return nil, toErr(ctx, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *lessor) Close() error {
|
|
l.stopCancel()
|
|
// close for synchronous teardown if stream goroutines never launched
|
|
l.firstKeepAliveOnce.Do(func() { close(l.donec) })
|
|
<-l.donec
|
|
return nil
|
|
}
|
|
|
|
func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
|
|
select {
|
|
case <-donec:
|
|
return
|
|
case <-l.donec:
|
|
return
|
|
case <-ctx.Done():
|
|
}
|
|
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
ka, ok := l.keepAlives[id]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// close channel and remove context if still associated with keep alive
|
|
for i, c := range ka.ctxs {
|
|
if c == ctx {
|
|
close(ka.chs[i])
|
|
ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
|
|
ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
// remove if no one more listeners
|
|
if len(ka.chs) == 0 {
|
|
delete(l.keepAlives, id)
|
|
}
|
|
}
|
|
|
|
// closeRequireLeader scans keepAlives for ctxs that have require leader
|
|
// and closes the associated channels.
|
|
func (l *lessor) closeRequireLeader() {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
for _, ka := range l.keepAlives {
|
|
reqIdxs := 0
|
|
// find all required leader channels, close, mark as nil
|
|
for i, ctx := range ka.ctxs {
|
|
md, ok := metadata.FromOutgoingContext(ctx)
|
|
if !ok {
|
|
continue
|
|
}
|
|
ks := md[rpctypes.MetadataRequireLeaderKey]
|
|
if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
|
|
continue
|
|
}
|
|
close(ka.chs[i])
|
|
ka.chs[i] = nil
|
|
reqIdxs++
|
|
}
|
|
if reqIdxs == 0 {
|
|
continue
|
|
}
|
|
// remove all channels that required a leader from keepalive
|
|
newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
|
|
newCtxs := make([]context.Context, len(newChs))
|
|
newIdx := 0
|
|
for i := range ka.chs {
|
|
if ka.chs[i] == nil {
|
|
continue
|
|
}
|
|
newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
|
|
newIdx++
|
|
}
|
|
ka.chs, ka.ctxs = newChs, newCtxs
|
|
}
|
|
}
|
|
|
|
func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
|
|
cctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
|
|
if err != nil {
|
|
return nil, toErr(ctx, err)
|
|
}
|
|
|
|
err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
|
|
if err != nil {
|
|
return nil, toErr(ctx, err)
|
|
}
|
|
|
|
resp, rerr := stream.Recv()
|
|
if rerr != nil {
|
|
return nil, toErr(ctx, rerr)
|
|
}
|
|
|
|
karesp := &LeaseKeepAliveResponse{
|
|
ResponseHeader: resp.GetHeader(),
|
|
ID: LeaseID(resp.ID),
|
|
TTL: resp.TTL,
|
|
}
|
|
return karesp, nil
|
|
}
|
|
|
|
func (l *lessor) recvKeepAliveLoop() (gerr error) {
|
|
defer func() {
|
|
l.mu.Lock()
|
|
close(l.donec)
|
|
l.loopErr = gerr
|
|
for _, ka := range l.keepAlives {
|
|
ka.close()
|
|
}
|
|
l.keepAlives = make(map[LeaseID]*keepAlive)
|
|
l.mu.Unlock()
|
|
}()
|
|
|
|
for {
|
|
stream, err := l.resetRecv()
|
|
if err != nil {
|
|
if canceledByCaller(l.stopCtx, err) {
|
|
return err
|
|
}
|
|
} else {
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
if canceledByCaller(l.stopCtx, err) {
|
|
return err
|
|
}
|
|
|
|
if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
|
|
l.closeRequireLeader()
|
|
}
|
|
break
|
|
}
|
|
|
|
l.recvKeepAlive(resp)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-time.After(retryConnWait):
|
|
case <-l.stopCtx.Done():
|
|
return l.stopCtx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// resetRecv opens a new lease stream and starts sending keep alive requests.
|
|
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
|
|
sctx, cancel := context.WithCancel(l.stopCtx)
|
|
stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
if l.stream != nil && l.streamCancel != nil {
|
|
l.streamCancel()
|
|
}
|
|
|
|
l.streamCancel = cancel
|
|
l.stream = stream
|
|
|
|
go l.sendKeepAliveLoop(stream)
|
|
return stream, nil
|
|
}
|
|
|
|
// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
|
|
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
|
|
karesp := &LeaseKeepAliveResponse{
|
|
ResponseHeader: resp.GetHeader(),
|
|
ID: LeaseID(resp.ID),
|
|
TTL: resp.TTL,
|
|
}
|
|
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
ka, ok := l.keepAlives[karesp.ID]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if karesp.TTL <= 0 {
|
|
// lease expired; close all keep alive channels
|
|
delete(l.keepAlives, karesp.ID)
|
|
ka.close()
|
|
return
|
|
}
|
|
|
|
// send update to all channels
|
|
nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
|
|
ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
|
|
for _, ch := range ka.chs {
|
|
select {
|
|
case ch <- karesp:
|
|
default:
|
|
if l.lg != nil {
|
|
l.lg.Warn("lease keepalive response queue is full; dropping response send",
|
|
zap.Int("queue-size", len(ch)),
|
|
zap.Int("queue-capacity", cap(ch)),
|
|
)
|
|
}
|
|
}
|
|
// still advance in order to rate-limit keep-alive sends
|
|
ka.nextKeepAlive = nextKeepAlive
|
|
}
|
|
}
|
|
|
|
// deadlineLoop reaps any keep alive channels that have not received a response
|
|
// within the lease TTL
|
|
func (l *lessor) deadlineLoop() {
|
|
for {
|
|
select {
|
|
case <-time.After(time.Second):
|
|
case <-l.donec:
|
|
return
|
|
}
|
|
now := time.Now()
|
|
l.mu.Lock()
|
|
for id, ka := range l.keepAlives {
|
|
if ka.deadline.Before(now) {
|
|
// waited too long for response; lease may be expired
|
|
ka.close()
|
|
delete(l.keepAlives, id)
|
|
}
|
|
}
|
|
l.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
|
|
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
|
|
for {
|
|
var tosend []LeaseID
|
|
|
|
now := time.Now()
|
|
l.mu.Lock()
|
|
for id, ka := range l.keepAlives {
|
|
if ka.nextKeepAlive.Before(now) {
|
|
tosend = append(tosend, id)
|
|
}
|
|
}
|
|
l.mu.Unlock()
|
|
|
|
for _, id := range tosend {
|
|
r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
|
|
if err := stream.Send(r); err != nil {
|
|
// TODO do something with this error?
|
|
return
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-time.After(retryConnWait):
|
|
case <-stream.Context().Done():
|
|
return
|
|
case <-l.donec:
|
|
return
|
|
case <-l.stopCtx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ka *keepAlive) close() {
|
|
close(ka.donec)
|
|
for _, ch := range ka.chs {
|
|
close(ch)
|
|
}
|
|
}
|