133 lines
3.6 KiB
Go
133 lines
3.6 KiB
Go
// Copyright 2019 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tracker
|
|
|
|
// Inflights limits the number of MsgApp (represented by the largest index
|
|
// contained within) sent to followers but not yet acknowledged by them. Callers
|
|
// use Full() to check whether more messages can be sent, call Add() whenever
|
|
// they are sending a new append, and release "quota" via FreeLE() whenever an
|
|
// ack is received.
|
|
type Inflights struct {
|
|
// the starting index in the buffer
|
|
start int
|
|
// number of inflights in the buffer
|
|
count int
|
|
|
|
// the size of the buffer
|
|
size int
|
|
|
|
// buffer contains the index of the last entry
|
|
// inside one message.
|
|
buffer []uint64
|
|
}
|
|
|
|
// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
|
|
func NewInflights(size int) *Inflights {
|
|
return &Inflights{
|
|
size: size,
|
|
}
|
|
}
|
|
|
|
// Clone returns an *Inflights that is identical to but shares no memory with
|
|
// the receiver.
|
|
func (in *Inflights) Clone() *Inflights {
|
|
ins := *in
|
|
ins.buffer = append([]uint64(nil), in.buffer...)
|
|
return &ins
|
|
}
|
|
|
|
// Add notifies the Inflights that a new message with the given index is being
|
|
// dispatched. Full() must be called prior to Add() to verify that there is room
|
|
// for one more message, and consecutive calls to add Add() must provide a
|
|
// monotonic sequence of indexes.
|
|
func (in *Inflights) Add(inflight uint64) {
|
|
if in.Full() {
|
|
panic("cannot add into a Full inflights")
|
|
}
|
|
next := in.start + in.count
|
|
size := in.size
|
|
if next >= size {
|
|
next -= size
|
|
}
|
|
if next >= len(in.buffer) {
|
|
in.grow()
|
|
}
|
|
in.buffer[next] = inflight
|
|
in.count++
|
|
}
|
|
|
|
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
|
|
// instead of preallocating to inflights.size to handle systems which have
|
|
// thousands of Raft groups per process.
|
|
func (in *Inflights) grow() {
|
|
newSize := len(in.buffer) * 2
|
|
if newSize == 0 {
|
|
newSize = 1
|
|
} else if newSize > in.size {
|
|
newSize = in.size
|
|
}
|
|
newBuffer := make([]uint64, newSize)
|
|
copy(newBuffer, in.buffer)
|
|
in.buffer = newBuffer
|
|
}
|
|
|
|
// FreeLE frees the inflights smaller or equal to the given `to` flight.
|
|
func (in *Inflights) FreeLE(to uint64) {
|
|
if in.count == 0 || to < in.buffer[in.start] {
|
|
// out of the left side of the window
|
|
return
|
|
}
|
|
|
|
idx := in.start
|
|
var i int
|
|
for i = 0; i < in.count; i++ {
|
|
if to < in.buffer[idx] { // found the first large inflight
|
|
break
|
|
}
|
|
|
|
// increase index and maybe rotate
|
|
size := in.size
|
|
if idx++; idx >= size {
|
|
idx -= size
|
|
}
|
|
}
|
|
// free i inflights and set new start index
|
|
in.count -= i
|
|
in.start = idx
|
|
if in.count == 0 {
|
|
// inflights is empty, reset the start index so that we don't grow the
|
|
// buffer unnecessarily.
|
|
in.start = 0
|
|
}
|
|
}
|
|
|
|
// FreeFirstOne releases the first inflight. This is a no-op if nothing is
|
|
// inflight.
|
|
func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
|
|
|
|
// Full returns true if no more messages can be sent at the moment.
|
|
func (in *Inflights) Full() bool {
|
|
return in.count == in.size
|
|
}
|
|
|
|
// Count returns the number of inflight messages.
|
|
func (in *Inflights) Count() int { return in.count }
|
|
|
|
// reset frees all inflights.
|
|
func (in *Inflights) reset() {
|
|
in.count = 0
|
|
in.start = 0
|
|
}
|