remove synced() for shipper

+ shipFile cleanup after itself
This commit is contained in:
rasmus 2022-11-06 15:46:07 +02:00
parent 60b893d960
commit a56e6d1ffc
2 changed files with 13 additions and 33 deletions

View File

@ -74,7 +74,9 @@ func init() {
}) })
} }
func (s *submitter) sender(name string, sendQueue <-chan mLog) (synced func() bool) { func (s *submitter) sender(name string, sendQueue <-chan mLog) {
baseName := filepath.Base(name)
batched := make(chan []mLog) batched := make(chan []mLog)
go func() { go func() {
@ -97,24 +99,13 @@ func (s *submitter) sender(name string, sendQueue <-chan mLog) (synced func() bo
}() }()
util.Batch(MaxBatchItems, MaxBatchTime, sendQueue, batched) util.Batch(MaxBatchItems, MaxBatchTime, sendQueue, batched)
// returns when sendQueue is closed
}() }()
var batchSynced bool
s.Add(1) s.Add(1)
go s.senderRoutine(name, batched, &batchSynced)
return func() bool {
return batchSynced && len(sendQueue) == 0
}
}
func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bool) {
defer s.Done() defer s.Done()
baseName := filepath.Base(name)
for { for {
*synced = true
promShipperSynced.WithLabelValues(baseName).Set(1) promShipperSynced.WithLabelValues(baseName).Set(1)
batch, ok := <-batched batch, ok := <-batched
@ -122,7 +113,6 @@ func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bo
return return
} }
*synced = false
promShipperSynced.WithLabelValues(baseName).Set(0) promShipperSynced.WithLabelValues(baseName).Set(0)
var batchBson []interface{} // mongo does not like typing var batchBson []interface{} // mongo does not like typing

View File

@ -60,19 +60,14 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b
}() }()
sendChan := make(chan mLog, SendQueueLimit) sendChan := make(chan mLog, SendQueueLimit)
synced := s.sender(name, sendChan) defer close(sendChan)
deleteOk := func() bool { go s.sender(name, sendChan)
if deleteAfterRead && synced() {
return true
}
return false
}
// TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?) // TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?)
wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
// //
err := s.shipFileRoutine(ctx, name, deleteOk, sendChan, sigCatchupped) err := s.shipFileRoutine(ctx, name, sendChan, sigCatchupped)
if err == nil { if err == nil {
return true, nil return true, nil
} }
@ -83,7 +78,7 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b
}) })
} }
func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk func() bool, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error { func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error {
baseName := filepath.Base(name) baseName := filepath.Base(name)
// TODO: better way for respecting ?killing sender for retry // TODO: better way for respecting ?killing sender for retry
@ -104,8 +99,6 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f
return fmt.Errorf("retrieving mongo offset: %w", err) return fmt.Errorf("retrieving mongo offset: %w", err)
} }
// offsetResult.DecodeBytes() //TODO: check for extra fields
var log mLog var log mLog
if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("decoding mongo offset: %w", err) return fmt.Errorf("decoding mongo offset: %w", err)
@ -130,7 +123,11 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f
case err := <-errChan: case err := <-errChan:
return fmt.Errorf("tailing file: %w", err) return fmt.Errorf("tailing file: %w", err)
case line := <-lineChan: case line, ok := <-lineChan:
if !ok {
return nil
}
if line.EndOffset > startSize { if line.EndOffset > startSize {
select { select {
case sigCatchupped <- struct{}{}: case sigCatchupped <- struct{}{}:
@ -172,13 +169,6 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f
default: default:
promShipperDropped.WithLabelValues(baseName).Add(1) promShipperDropped.WithLabelValues(baseName).Add(1)
} }
// no new lines
// TODO: ensure we don't instantly jump here
// default:
// if deleteOk() {
// return os.Remove(name)
// }
} }
} }
} }