diff --git a/cmd/sender.go b/cmd/sender.go index 22bd895..bd2715f 100644 --- a/cmd/sender.go +++ b/cmd/sender.go @@ -74,7 +74,9 @@ func init() { }) } -func (s *submitter) sender(name string, sendQueue <-chan mLog) (synced func() bool) { +func (s *submitter) sender(name string, sendQueue <-chan mLog) { + baseName := filepath.Base(name) + batched := make(chan []mLog) go func() { @@ -97,24 +99,13 @@ func (s *submitter) sender(name string, sendQueue <-chan mLog) (synced func() bo }() util.Batch(MaxBatchItems, MaxBatchTime, sendQueue, batched) + // returns when sendQueue is closed }() - var batchSynced bool s.Add(1) - go s.senderRoutine(name, batched, &batchSynced) - - return func() bool { - return batchSynced && len(sendQueue) == 0 - } -} - -func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bool) { defer s.Done() - baseName := filepath.Base(name) - for { - *synced = true promShipperSynced.WithLabelValues(baseName).Set(1) batch, ok := <-batched @@ -122,7 +113,6 @@ func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bo return } - *synced = false promShipperSynced.WithLabelValues(baseName).Set(0) var batchBson []interface{} // mongo does not like typing diff --git a/cmd/submit.go b/cmd/submit.go index 2f1dc31..69aa959 100644 --- a/cmd/submit.go +++ b/cmd/submit.go @@ -60,19 +60,14 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b }() sendChan := make(chan mLog, SendQueueLimit) - synced := s.sender(name, sendChan) + defer close(sendChan) - deleteOk := func() bool { - if deleteAfterRead && synced() { - return true - } - return false - } + go s.sender(name, sendChan) // TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?) wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { // - err := s.shipFileRoutine(ctx, name, deleteOk, sendChan, sigCatchupped) + err := s.shipFileRoutine(ctx, name, sendChan, sigCatchupped) if err == nil { return true, nil } @@ -83,7 +78,7 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b }) } -func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk func() bool, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error { +func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error { baseName := filepath.Base(name) // TODO: better way for respecting ?killing sender for retry @@ -104,8 +99,6 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f return fmt.Errorf("retrieving mongo offset: %w", err) } - // offsetResult.DecodeBytes() //TODO: check for extra fields - var log mLog if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("decoding mongo offset: %w", err) @@ -130,7 +123,11 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f case err := <-errChan: return fmt.Errorf("tailing file: %w", err) - case line := <-lineChan: + case line, ok := <-lineChan: + if !ok { + return nil + } + if line.EndOffset > startSize { select { case sigCatchupped <- struct{}{}: @@ -172,13 +169,6 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f default: promShipperDropped.WithLabelValues(baseName).Add(1) } - - // no new lines - // TODO: ensure we don't instantly jump here - // default: - // if deleteOk() { - // return os.Remove(name) - // } } } }