From 908ac5c52ac3422bd5a4ae789b1c3207c4006bc8 Mon Sep 17 00:00:00 2001 From: rasmus Date: Sun, 6 Nov 2022 01:45:19 +0200 Subject: [PATCH] submit: complete local sender refactor --- cmd/logmower.go | 3 ++- cmd/sender.go | 5 ++-- cmd/submit.go | 63 ++++++++++++++++++++++++++++++++++++------------- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/cmd/logmower.go b/cmd/logmower.go index 9512950..2998cfa 100644 --- a/cmd/logmower.go +++ b/cmd/logmower.go @@ -57,6 +57,7 @@ var App = &cli.App{ &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO: + &cli.BoolFlag{Name: "delete-after-read", Usage: "Delete log file when it is synced to mongo, and no new lines to read", Value: false}, // &cli.BoolFlag{Name: "parse-json"}, //TODO: &cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO: &cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true}, @@ -147,7 +148,7 @@ var App = &cli.App{ wg.Add(1) go func() { - state.shipFile(ctx.Context, absPath) + state.shipFile(ctx.Context, absPath, ctx.Bool("delete-after-read")) wg.Done() }() diff --git a/cmd/sender.go b/cmd/sender.go index 09f1481..d74bc4e 100644 --- a/cmd/sender.go +++ b/cmd/sender.go @@ -41,9 +41,8 @@ var ( ) const ( - SendQueueLimit = 1024 - MaxBatchItems = 100 - MaxBatchTime = time.Second + MaxBatchItems = 100 + MaxBatchTime = time.Second ) func init() { diff --git a/cmd/submit.go b/cmd/submit.go index 2a033da..cca8d97 100644 --- a/cmd/submit.go +++ b/cmd/submit.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/jtagcat/util" prom "github.com/prometheus/client_golang/prometheus" @@ -43,7 +44,10 @@ 
type ( } ) -func (s *submitter) shipFile(ctx context.Context, name string) { +const SendQueueLimit = 1024 + +// TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly +func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) { baseName := filepath.Base(name) sigCatchupped := make(chan struct{}, 1) @@ -54,18 +58,40 @@ func (s *submitter) shipFile(ctx context.Context, name string) { promCatchupDone.WithLabelValues(baseName).Add(1) }() - // TODO: restarting before mongo sendQueue finishes will result in duplicates - wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { - if err := s.shipFileRoutine(ctx, name, sigCatchupped); err != nil { - promFileErr.WithLabelValues(baseName).Add(1) - s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err)) - return false, nil // nil since we want it to loop and keep retrying indefinitely + sendChan := make(chan mLog, SendQueueLimit) + synced := s.sender(name, sendChan) + + deleteOk := func() bool { + if deleteAfterRead && synced() { + return true } - return true, nil + return false + } + + // TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?) 
+ wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { + // + err := s.shipFileRoutine(ctx, name, deleteOk, sendChan, sigCatchupped) + if err == nil { + return true, nil + } + + promFileErr.WithLabelValues(baseName).Add(1) + s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err)) + return false, nil // nil since we want to loop and keep retrying indefinitely }) } -func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchupped chan<- struct{}) error { +func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk func() bool, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error { + // TODO: find a better way to stop or drain the sender before retrying + + for { + if len(sendQueue) == 0 { + break + } + time.Sleep(time.Second) + } + // Initialize in case of new file log := mLog{ HostInfo: s.hostInfo, @@ -73,7 +99,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup } // get files with offset - offsetResult, err := mWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), + offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest) )) @@ -82,7 +108,6 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup } // offsetResult.DecodeBytes() //TODO: check for extra fields - if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("decoding mongo offset: %w", err) } @@ -98,10 +123,12 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup if err != nil { return fmt.Errorf("tailing file: %w", err) } + for { select { case err := <-errChan: return fmt.Errorf("tailing file: %w", err) + case line := <-lineChan: if line.EndOffset > startSize { select { @@ -111,8 +138,7 
@@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup } select { - // TODO: use per-file batch senders for file deletion mongo synced, and better error handling; #3 - case s.sendQueue <- mLog{ + case sendQueue <- mLog{ HostInfo: s.hostInfo, Filename: *line.Filename, Offset: line.EndOffset, @@ -121,14 +147,17 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup default: promShipperDropped.WithLabelValues(*line.Filename).Add(1) } - // TODO: - // default: - // return nil + + // no new lines + default: + if deleteOk() { + return os.Remove(name) + } } } } -func mWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { +func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { return mongoWrap, mongoWrap.Err() }