diff --git a/README.md b/README.md index 5bda326..54a643a 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,11 @@ Watches log directory for logs, and ships them to mongo. - Building: `go build .` ## Files -1. `mower.go` acts as the main routine: initializes (including `mongo.go`) and watches for new files in log directory. -2. `submit.go` watches file contents and tails them, streaming to `sender.go`. -3. `sender.go` batches lines and ships them to mongo. +0. `pkg/globals` defines globals. This is used for Prometheus namespace, and for forwarding static global CLI configuration with less clutter. +1. `main.go` serves `/metrics` and runs `watcher.go`. +2. `pkg/watcher` main routine; initializes (including `pkg/mongo`) and watches for new log files. + - `pkg/mongo` provides statically safe and central tools to interact with the database. + - `pkg/util` provides additional utility functions. +3. `pkg/file` handles file lifecycle; watches files and tails them, streaming lines to `pkg/lines`. +4. `pkg/lines` processes lines and streams them to `pkg/sender`. +5. `pkg/sender` batches lines and ships them to mongo. 
diff --git a/pkg/file/file.go b/pkg/file/file.go index 936e6d6..8396f8f 100644 --- a/pkg/file/file.go +++ b/pkg/file/file.go @@ -27,9 +27,9 @@ type File struct { } // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly -func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) { +func (f File) Process(ctx context.Context, db *mongo.Collection) { _ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) { - err := f.process(ctx, db, recordLimitBytes) + err := f.process(ctx, db) if err == nil { return true, nil } @@ -42,7 +42,7 @@ func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitByte }) } -func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitBytes int) (_ chan<- lines.Raw, deferFn func()) { +func (f File) launchChannels(cancel func(), db *mongo.Collection) (_ chan<- lines.Raw, deferFn func()) { lineOut := make(chan lines.Raw) sctx, scancel := context.WithCancel(context.Background()) @@ -52,7 +52,7 @@ func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitByt } dbQueue := make(chan m.Record, SendQueueLimit) - go lines.RawC(lineOut).Process(sctx, recordLimitBytes, dbQueue) + go lines.RawC(lineOut).Process(sctx, dbQueue) waitBatchSend := util.GoWg(func() { sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll) @@ -65,12 +65,12 @@ func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitByt } // use submitter(), don't use directly -func (f File) process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) error { +func (f File) process(ctx context.Context, db *mongo.Collection) error { lFile := lines.File(f) // file.File, but avoiding import cycle sctx, cancel := context.WithCancel(ctx) - lineOut, dfn := f.launchChannels(cancel, db, recordLimitBytes) + lineOut, dfn := f.launchChannels(cancel, db) defer dfn() // get files with offset diff --git 
a/pkg/globals/globals.go b/pkg/globals/globals.go index c31b3a7..e9e5f80 100644 --- a/pkg/globals/globals.go +++ b/pkg/globals/globals.go @@ -15,6 +15,11 @@ const ( DatabaseCommandTimeout = 10 * time.Second ) +var ( + BufferLimitBytes int + Simulate bool +) + func MongoTimeout(ctx context.Context) context.Context { ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) return ctx diff --git a/pkg/lines/lines.go b/pkg/lines/lines.go index cfac720..3ac50c2 100644 --- a/pkg/lines/lines.go +++ b/pkg/lines/lines.go @@ -5,6 +5,7 @@ import ( "log" "sync" + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" ) @@ -24,7 +25,7 @@ type ( ) // assumes all lines are from same file -func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) { +func (unparsed RawC) Process(ctx context.Context, parsed chan<- m.Record) { lines := make(chan singleLine) go unparsed.parse(lines) @@ -33,11 +34,11 @@ func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed c stdOut, stdErr := make(chan singleLine), make(chan singleLine) go func() { - singleLines(stdOut).process(ctx, bufferLimitBytes, parsed) + singleLines(stdOut).process(ctx, parsed) wg.Done() }() go func() { - singleLines(stdErr).process(ctx, bufferLimitBytes, parsed) + singleLines(stdErr).process(ctx, parsed) wg.Done() }() @@ -68,7 +69,7 @@ func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed c } } -func (lines singleLines) process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) { +func (lines singleLines) process(ctx context.Context, parsed chan<- m.Record) { var firstMetadata *m.ParsedMetadata var buffer []byte @@ -86,9 +87,9 @@ func (lines singleLines) process(ctx context.Context, bufferLimitBytes int, pars buffer = append(buffer, line.B...) 
- if len(buffer) > bufferLimitBytes { + if len(buffer) > globals.BufferLimitBytes { promRecordDroppedTooLarge.WithLabelValues(line.MetricsName).Add(1) - log.Printf("dropped record: size in bytes exceeds limit of %d", bufferLimitBytes) + log.Printf("dropped record: size in bytes exceeds limit of %d", globals.BufferLimitBytes) buffer = nil continue diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index 98e5fd5..e90ac70 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -2,6 +2,7 @@ package sender import ( "context" + "fmt" "log" "time" @@ -56,13 +57,7 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn promShipperSynced.WithLabelValues(metricsFilename).Set(0) - var batchBson []interface{} // mongo does not like typing - for _, b := range batch { - batchBson = append(batchBson, b.ToBson()) - } - - tru := true - result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru}) + result, err := insertManyWithSimulate(db, batch) var succeedCount int if result != nil { @@ -89,3 +84,24 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn } } } + +func insertManyWithSimulate(db *mongo.Collection, batch []m.Record) (*mongo.InsertManyResult, error) { + if !globals.Simulate { + var batchBson []interface{} // mongo does not like typing + for _, b := range batch { + batchBson = append(batchBson, b.ToBson()) + } + + tru := true + return db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru}) + } + + fmt.Printf("simulating successful database bulk write: %v\n", batch) + + var res mongo.InsertManyResult + for range batch { + res.InsertedIDs = append(res.InsertedIDs, nil) + } + + return &res, nil +} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index f3643a3..acaa32a 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -24,21 +24,26 @@ var App = &cli.App{ // 
Usage: "rubykana ", // TODO: #2: yaml Flags: []cli.Flag{ - &cli.BoolFlag{Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Do not write to database"}, // TODO: + &cli.BoolFlag{Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Do not write to database"}, &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, - &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO: - &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO: - // &cli.BoolFlag{Name: "parse-json"}, //TODO: - &cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO: + &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, + // + //TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, + //TODO: &cli.BoolFlag{Name: "parse-json"}, + // TODO: + &cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, &cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true}, // &cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true}, }, Before: func(ctx *cli.Context) error { - if ctx.Int("max-record-size") < 1 { - return fmt.Errorf("max-record-size must be postivie") + globals.BufferLimitBytes = ctx.Int("max-record-size") + if globals.BufferLimitBytes < 1 { + return fmt.Errorf("max-record-size must be positive") } + globals.Simulate = ctx.Bool("simulate") + return nil }, @@ -102,7 +107,7 @@ var App = &cli.App{ MetricsName: filepath.Base(event.Name), } - file.Process(ctx.Context, db, ctx.Int("max-record-size")) + file.Process(ctx.Context, db) wg.Done() }()