diff --git a/cmd/logmower.go b/cmd/logmower.go index 0e9dfb3..9512950 100644 --- a/cmd/logmower.go +++ b/cmd/logmower.go @@ -101,11 +101,7 @@ var App = &cli.App{ } }() - state := submitter{ - ctx: ctx.Context, - l: l, - sendQueue: make(chan mLog, SendQueueLimit), - } + state := submitter{l: l} dbOpt := mongoMonitoredClientOptions(l).ApplyURI(ctx.String("mongo-uri")) @@ -151,7 +147,7 @@ var App = &cli.App{ wg.Add(1) go func() { - state.shipFile(absPath) + state.shipFile(ctx.Context, absPath) wg.Done() }() @@ -176,7 +172,7 @@ var App = &cli.App{ // waiting indefinitely for interrupt wg.Wait() // wait for watch and file processors to cleanup - return errAppend(watcher.Close(), state.ctx.Err()) + return errAppend(watcher.Close(), ctx.Err()) }, } diff --git a/cmd/submit.go b/cmd/submit.go index d96ac67..2a033da 100644 --- a/cmd/submit.go +++ b/cmd/submit.go @@ -34,8 +34,7 @@ var ( type ( submitter struct { - ctx context.Context - l *zap.Logger + l *zap.Logger hostInfo HostInfo db *mongo.Collection @@ -44,7 +43,7 @@ type ( } ) -func (s *submitter) shipFile(name string) { +func (s *submitter) shipFile(ctx context.Context, name string) { baseName := filepath.Base(name) sigCatchupped := make(chan struct{}, 1) @@ -56,8 +55,8 @@ func (s *submitter) shipFile(name string) { }() // TODO: restarting before mongo sendQueue finishes will result in duplicates - wait.ManagedExponentialBackoffWithContext(s.ctx, defaultBackoff(), func() (done bool, _ error) { - if err := s.shipFileRoutine(name, sigCatchupped); err != nil { + wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { + if err := s.shipFileRoutine(ctx, name, sigCatchupped); err != nil { promFileErr.WithLabelValues(baseName).Add(1) s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err)) return false, nil // nil since we want it to loop and keep retrying indefinitely @@ -66,7 +65,7 @@ func (s *submitter) shipFile(name string) { }) } -func (s *submitter) shipFileRoutine(name string, sigCatchupped chan<- struct{}) error { +func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchupped chan<- struct{}) error { // Initialize in case of new file log := mLog{ HostInfo: s.hostInfo, @@ -74,7 +73,7 @@ func (s *submitter) shipFileRoutine(name string, sigCatchupped chan<- struct{}) } // get files with offset - offsetResult, err := mWithErr(s.db.FindOne(mongoTimeoutCtx(s.ctx), + offsetResult, err := mWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest) )) @@ -95,7 +94,7 @@ func (s *submitter) shipFileRoutine(name string, sigCatchupped chan<- struct{}) startSize := fi.Size() // TODO: use inotify for file, and end with file deletion or replacement - lineChan, errChan, err := util.TailFile(s.ctx, log.Filename, log.Offset, io.SeekStart) + lineChan, errChan, err := util.TailFile(ctx, log.Filename, log.Offset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) }