package logmower import ( "context" "errors" "fmt" "io" "os" "path/filepath" "strings" "sync" "time" "github.com/jtagcat/util" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" mongoOpt "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" ) var ( promCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{ Subsystem: "file", Name: "catchupped", Help: "File count where backlog has been sent; <= watcher_file_count", }, []string{"filename"}) // TODO: rm filename? promFileErr = promauto.NewCounterVec(prom.CounterOpts{ Subsystem: "file", Name: "errors_count", Help: "Error count for reading files", }, []string{"filename"}) ) type ( submitter struct { l *zap.Logger hostInfo HostInfo db *mongo.Collection sync.WaitGroup } ) const SendQueueLimit = 1024 // TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) { baseName := filepath.Base(name) sigCatchupped := make(chan struct{}, 1) go func() { <-sigCatchupped close(sigCatchupped) // once promCatchupDone.WithLabelValues(baseName).Add(1) }() sendChan := make(chan mLog, SendQueueLimit) synced := s.sender(name, sendChan) deleteOk := func() bool { if deleteAfterRead && synced() { return true } return false } // TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?) wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { // err := s.shipFileRoutine(ctx, name, deleteOk, sendChan, sigCatchupped) if err == nil { return true, nil } promFileErr.WithLabelValues(baseName).Add(1) s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err)) return false, nil // nil since we want to loop and keep retrying indefinitely }) } func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk func() bool, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error { baseName := filepath.Base(name) // TODO: better way for respecting ?killing sender for retry for { if len(sendQueue) == 0 { break } time.Sleep(time.Second) } // get files with offset offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "file", Value: baseName}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest) )) if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("retrieving mongo offset: %w", err) } // offsetResult.DecodeBytes() //TODO: check for extra fields var log mLog if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("decoding mongo offset: %w", err) } fi, err := os.Stat(name) if err != nil { return fmt.Errorf("getting original file size: %w", err) } startSize := fi.Size() sctx, cancel := context.WithCancel(ctx) defer cancel() lineChan, errChan, err := util.TailFile(sctx, name, log.Offset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) } for { select { case err := <-errChan: return fmt.Errorf("tailing file: %w", err) case line := <-lineChan: if line.EndOffset > startSize { select { case sigCatchupped <- struct{}{}: default: } } var collectTime time.Time var stdErr, format, log string split := strings.SplitN(line.String, " ", 4) if len(split) != 4 { log = line.String promLineParsingErr.WithLabelValues(baseName).Add(1) } else { stdErr, format, log = split[1], split[2], split[3] collectTime, err = time.Parse(time.RFC3339Nano, split[0]) if err != nil { promLineParsingErr.WithLabelValues(baseName).Add(1) } } select { case sendQueue <- mLog{ HostInfo: s.hostInfo, File: baseName, Offset: line.EndOffset, ShipTime: time.Now(), CollectTime: collectTime, StdErr: stdErr == "stderr", // or stdout Format: format, Content: log, }: default: promShipperDropped.WithLabelValues(baseName).Add(1) } // no new lines // TODO: ensure we don't instantly jump here // default: // if deleteOk() { // return os.Remove(name) // } } } } func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { return mongoWrap, mongoWrap.Err() } // func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) { // ctx, cancel := context.WithCancel(pctx) // wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done()) // }