package logmower import ( "context" "errors" "fmt" "io" "os" "path/filepath" "sync" "github.com/jtagcat/util" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" mongoOpt "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" ) var ( promCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{ Subsystem: "file", Name: "catchupped", Help: "File count where backlog has been sent; <= watcher_file_count", }, []string{"filename"}) // TODO: rm filename? promFileErr = promauto.NewCounterVec(prom.CounterOpts{ Subsystem: "file", Name: "errors_count", Help: "Error count for reading files", }, []string{"filename"}) ) type ( submitter struct { l *zap.Logger hostInfo HostInfo db *mongo.Collection sync.WaitGroup } ) func (s *submitter) shipFile(ctx context.Context, name string) { baseName := filepath.Base(name) sigCatchupped := make(chan struct{}, 1) go func() { <-sigCatchupped close(sigCatchupped) // once promCatchupDone.WithLabelValues(baseName).Add(1) }() // TODO: restarting before mongo sendQueue finishes will result in duplicates wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { if err := s.shipFileRoutine(ctx, name, sigCatchupped); err != nil { promFileErr.WithLabelValues(baseName).Add(1) s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err)) return false, nil // nil since we want it to loop and keep retrying indefinitely } return true, nil }) } func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchupped chan<- struct{}) error { // Initialize in case of new file log := mLog{ HostInfo: s.hostInfo, Filename: name, } // get files with offset offsetResult, err := mWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest) )) if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("retrieving mongo offset: %w", err) } // offsetResult.DecodeBytes() //TODO: check for extra fields if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("decoding mongo offset: %w", err) } fi, err := os.Stat(log.Filename) if err != nil { return fmt.Errorf("getting original file size") } startSize := fi.Size() // TODO: use inotify for file, and end with file deletion or replacement lineChan, errChan, err := util.TailFile(ctx, log.Filename, log.Offset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) } for { select { case err := <-errChan: return fmt.Errorf("tailing file: %w", err) case line := <-lineChan: if line.EndOffset > startSize { select { case sigCatchupped <- struct{}{}: default: } } select { // TODO: use per-file batch senders for file deletion mongo synced, and better error handling; #3 case s.sendQueue <- mLog{ HostInfo: s.hostInfo, Filename: *line.Filename, Offset: line.EndOffset, Content: line.String, }: default: promShipperDropped.WithLabelValues(*line.Filename).Add(1) } // TODO: // default: // return nil } } } func mWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { return mongoWrap, mongoWrap.Err() } // func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) { // ctx, cancel := context.WithCancel(pctx) // wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done()) // }