package logmower import ( "context" "errors" "fmt" "io" "log" "os" "time" ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" "github.com/jtagcat/util" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" mongoOpt "go.mongodb.org/mongo-driver/mongo/options" "k8s.io/apimachinery/pkg/util/wait" ) var ( promFileInitialSeekSkipped = promauto.NewGaugeVec(prom.GaugeOpts{ Namespace: PrometheusPrefix, // Subsystem: "file", Name: "skipped_bytes", Help: "Bytes skipped in file after discovering", }, []string{"filename"}) promFileCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{ Namespace: PrometheusPrefix, Subsystem: "file", Name: "catchupped", Help: "(0 or) 1 if initial backlog has been sent; (total <= watcher_file_count)", }, []string{"filename"}) // TODO: rm filename? promFileErr = promauto.NewCounterVec(prom.CounterOpts{ Namespace: PrometheusPrefix, Subsystem: "file", Name: "errors_count", Help: "Errors while reading file", }, []string{"filename"}) promFileLineSize = promauto.NewHistogramVec(prom.HistogramOpts{ Namespace: PrometheusPrefix, // Subsystem: "file", Name: "line_size_bytes", Help: "Log line size in bytes", Buckets: []float64{80, 160, 320, 640, 1280}, }, []string{"filename"}) ) const SendQueueLimit = 1024 type file struct { ms.File metricsName string // filepath.Base() } // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly func (f file) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) { lineChan := make(chan RawLine) defer close(lineChan) dbQueue := make(chan ms.Record, SendQueueLimit) go RawLines(lineChan).Process(recordLimitBytes, dbQueue) waitGo := util.GoWg(func() { queueT(dbQueue).sender(db, f.metricsName) }) defer waitGo() // TODO: better way to kill or wait for sendQueue before retrying (or duplicates?) _ = wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { err := f.trySubmit(ctx, db, lineChan) if err == nil { return true, nil } promFileErr.WithLabelValues(f.metricsName).Add(1) log.Printf("processing file %q: %e", f.metricsName, err) // nil: loop and keep retrying indefinitely return false, nil }) } // use submitter(), don't use directly func (f file) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- RawLine) error { // TODO: better way for respecting ?killing sender for retry for { if len(sendQueue) == 0 { break } time.Sleep(time.Second) } // get files with offset offsetResult, _ := mongoWithErr(db.FindOne(mongoTimeoutCtx(ctx), bson.D{{Key: ms.RecordKeyHostId, Value: f.Host.Id}, {Key: ms.RecordKeyFilePath, Value: f.Path}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: ms.RecordKeyOffset, Value: -1}}}, // sort descending (get largest) )) offsetResultBytes, err := offsetResult.DecodeBytes() if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("retrieving offset from database: %w", err) } dbOffset := ms.RecordOffsetFromBson(&offsetResultBytes) fi, err := os.Stat(f.Path) if err != nil { return fmt.Errorf("getting original file size: %w", err) } startSize := fi.Size() sctx, cancel := context.WithCancel(ctx) defer cancel() promFileInitialSeekSkipped.WithLabelValues(f.metricsName).Set(float64(dbOffset)) lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) } var catchUpped bool promFileCatchupDone.WithLabelValues(f.metricsName).Set(0) for { select { case err := <-errChan: return fmt.Errorf("tailing file: %w", err) case line, ok := <-lineChan: if !ok { return nil } promFileLineSize.WithLabelValues(f.metricsName).Observe(float64(len(line.Bytes))) if !catchUpped { catchUpped = line.EndOffset >= startSize if catchUpped { promFileCatchupDone.WithLabelValues(f.metricsName).Set(1) } } if len(line.Bytes) == 0 { continue } sendQueue <- RawLine{ file: &f, Offset: line.EndOffset, B: line.Bytes, } } } } func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { return mongoWrap, mongoWrap.Err() } // func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) { // ctx, cancel := context.WithCancel(pctx) // wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done()) // }