remove sigCatchupped; it has no use after refactor
(refactor: used to not have WithLabelValues)
This commit is contained in:
parent
d142fa9295
commit
2fa1c6cd7b
@ -51,14 +51,6 @@ const SendQueueLimit = 1024
|
|||||||
func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) {
|
func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) {
|
||||||
baseName := filepath.Base(name)
|
baseName := filepath.Base(name)
|
||||||
|
|
||||||
sigCatchupped := make(chan struct{}, 1)
|
|
||||||
go func() {
|
|
||||||
<-sigCatchupped
|
|
||||||
close(sigCatchupped) // once
|
|
||||||
|
|
||||||
promCatchupDone.WithLabelValues(baseName).Add(1)
|
|
||||||
}()
|
|
||||||
|
|
||||||
sendChan := make(chan mLog, SendQueueLimit)
|
sendChan := make(chan mLog, SendQueueLimit)
|
||||||
defer close(sendChan)
|
defer close(sendChan)
|
||||||
|
|
||||||
@ -67,7 +59,7 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b
|
|||||||
// TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?)
|
// TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?)
|
||||||
wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
|
wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
|
||||||
//
|
//
|
||||||
err := s.shipFileRoutine(ctx, name, sendChan, sigCatchupped)
|
err := s.shipFileRoutine(ctx, name, sendChan)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
@ -78,7 +70,7 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error {
|
func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- mLog) error {
|
||||||
baseName := filepath.Base(name)
|
baseName := filepath.Base(name)
|
||||||
|
|
||||||
// TODO: better way for respecting ?killing sender for retry
|
// TODO: better way for respecting ?killing sender for retry
|
||||||
@ -119,6 +111,8 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
|
|||||||
}
|
}
|
||||||
|
|
||||||
var catchUpped bool // cache
|
var catchUpped bool // cache
|
||||||
|
promCatchupDone.WithLabelValues(baseName).Set(0)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-errChan:
|
case err := <-errChan:
|
||||||
@ -133,10 +127,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
|
|||||||
catchUpped = line.EndOffset > startSize
|
catchUpped = line.EndOffset > startSize
|
||||||
|
|
||||||
if catchUpped {
|
if catchUpped {
|
||||||
select {
|
promCatchupDone.WithLabelValues(baseName).Set(1)
|
||||||
case sigCatchupped <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user