package file import ( "context" "errors" "fmt" "io" "log" "os" "time" "git.k-space.ee/k-space/logmower-shipper/pkg/lines" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" "git.k-space.ee/k-space/logmower-shipper/pkg/sender" "github.com/jtagcat/util" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" mongoOpt "go.mongodb.org/mongo-driver/mongo/options" "k8s.io/apimachinery/pkg/util/wait" ) const SendQueueLimit = 1024 // wrapper to force copying before use func backoff() wait.Backoff { return wait.Backoff{ Duration: 2 * time.Second, Factor: 1.5, Jitter: 0.1, Cap: 30 * time.Second, } } type File struct { *m.File MetricsName string // filepath.Base() } // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly func (f File) Process(ctx context.Context, db *mongo.Collection) { _ = wait.ManagedExponentialBackoffWithContext(ctx, backoff(), func() (done bool, _ error) { err := f.process(ctx, db) if err == nil { return true, nil } promFileErr.WithLabelValues(f.MetricsName).Add(1) log.Printf("processing file %q: %e", f.MetricsName, err) // nil: loop and keep retrying indefinitely return false, nil }) } func (f File) launchChannels(cancel func(), db *mongo.Collection) (_ chan<- lines.Raw, deferFn func()) { lineOut := make(chan lines.Raw) sctx, scancel := context.WithCancel(context.Background()) cancelAll := func() { cancel() scancel() } dbQueue := make(chan m.Record, SendQueueLimit) go lines.RawC(lineOut).Process(sctx, dbQueue) waitBatchSend := util.GoWg(func() { sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll) }) return lineOut, func() { close(lineOut) waitBatchSend() } } // use submitter(), don't use directly func (f File) process(ctx context.Context, db *mongo.Collection) error { lFile := lines.File(f) // file.File, but avoiding import cycle sctx, cancel := context.WithCancel(ctx) lineOut, dfn := f.launchChannels(cancel, db) defer dfn() // get files with offset offsetResult, _ := mongoWithErr(db.FindOne(m.GlobalTimeout(sctx), bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest) )) offsetResultBytes, err := offsetResult.DecodeBytes() if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("retrieving offset from database: %w", err) } dbOffset := m.RecordOffsetFromBson(&offsetResultBytes) // for promFileChatcupDone fi, err := os.Stat(f.Path) if err != nil { return fmt.Errorf("getting original file size: %w", err) } startSize := fi.Size() lineIn, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) } promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset)) var catchUpped bool promFileCatchupDone.WithLabelValues(f.MetricsName).Set(0) for { select { case err := <-errChan: return fmt.Errorf("tailing file: %w", err) case line, ok := <-lineIn: if !ok { return nil } promFileLineSize.WithLabelValues(f.MetricsName).Observe(float64(len(line.Bytes))) if !catchUpped { catchUpped = line.EndOffset >= startSize if catchUpped { promFileCatchupDone.WithLabelValues(f.MetricsName).Set(1) } } if len(line.Bytes) == 0 { continue } select { case <-sctx.Done(): case lineOut <- lines.Raw{ File: &lFile, Offset: line.EndOffset, B: line.Bytes, }: } } } } func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { return mongoWrap, mongoWrap.Err() } // func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) { // ctx, cancel := context.WithCancel(pctx) // wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done()) // }