From 7e59c24c133a30a8818d550493d4e119838c5760 Mon Sep 17 00:00:00 2001 From: rasmus Date: Wed, 9 Nov 2022 18:07:28 +0200 Subject: [PATCH] restructure project --- logmower/sender.go | 108 ----------- logmower/submitter.go | 168 ------------------ main.go | 6 +- pkg/file/file.go | 143 +++++++++++++++ pkg/file/metrics.go | 35 ++++ pkg/globals/globals.go | 31 ++++ {logmower => pkg/lines}/lines.go | 37 ++-- {logmower => pkg/lines}/lines_single.go | 23 +-- pkg/lines/metrics.go | 23 +++ pkg/{mongo_struct => mongo}/bson_lookup.go | 2 +- logmower/mongo.go => pkg/mongo/metrics.go | 63 ++----- pkg/mongo/mongo.go | 37 ++++ pkg/{mongo_struct => mongo}/mongo_struct.go | 8 +- pkg/sender/metrics.go | 41 +++++ pkg/sender/sender.go | 73 ++++++++ .../populate.go => util/util.go} | 26 +-- pkg/watcher/metrics.go | 40 +++++ {logmower => pkg/watcher}/watcher.go | 84 ++------- 18 files changed, 498 insertions(+), 450 deletions(-) delete mode 100644 logmower/sender.go delete mode 100644 logmower/submitter.go create mode 100644 pkg/file/file.go create mode 100644 pkg/file/metrics.go create mode 100644 pkg/globals/globals.go rename {logmower => pkg/lines}/lines.go (64%) rename {logmower => pkg/lines}/lines_single.go (62%) create mode 100644 pkg/lines/metrics.go rename pkg/{mongo_struct => mongo}/bson_lookup.go (96%) rename logmower/mongo.go => pkg/mongo/metrics.go (50%) create mode 100644 pkg/mongo/mongo.go rename pkg/{mongo_struct => mongo}/mongo_struct.go (98%) create mode 100644 pkg/sender/metrics.go create mode 100644 pkg/sender/sender.go rename pkg/{mongo_struct/populate.go => util/util.go} (56%) create mode 100644 pkg/watcher/metrics.go rename {logmower => pkg/watcher}/watcher.go (60%) diff --git a/logmower/sender.go b/logmower/sender.go deleted file mode 100644 index 990ea39..0000000 --- a/logmower/sender.go +++ /dev/null @@ -1,108 +0,0 @@ -package logmower - -import ( - "context" - "log" - "time" - - ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" - "github.com/jtagcat/util" - prom "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "go.mongodb.org/mongo-driver/mongo" -) - -var ( - promShipperQueued = promauto.NewGaugeVec(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "shipper", - Name: "shipper_record", // "queued", - Help: "Log records in queue to be batched and sent to database", - }, []string{"filename"}) - promShipperDbSent = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "shipper", - Name: "record", // "sent", - Help: "Log records successfully committed to database", - }, []string{"filename"}) - promShipperBatchSizeResult = promauto.NewHistogram(prom.HistogramOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "shipper", - Name: "bulk_submission_message", // "items_in_batch" - Help: "Batch size for database submissions", - Buckets: []float64{1, 5, 10, 50, 100, 500, 1000, 5000, 10000}, - }) - promShipperDbSendError = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "shipper", - Name: "insertion_error", // "errors", - Help: "Errors while submitting to database", // TODO: - }, []string{"filename"}) - promShipperSynced = promauto.NewGaugeVec(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "batches_synced", - Help: "All batches available have been committed database (0 or 1)", - }, []string{"filename"}) -) - -const ( - MaxBatchItems = 10000 - MaxBatchTime = 5 * time.Second -) - -type queueT <-chan ms.Record - -func (queue queueT) sender(db *mongo.Collection, metricsFilename string) { - batched := make(chan []ms.Record) - - // batcher and queue metrics - go func() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go func() { - for { - promShipperQueued.WithLabelValues(metricsFilename).Set(float64( - len(queue))) - - timer := time.NewTimer(time.Second) - select { - case <-ctx.Done(): - return - case <-timer.C: - } - } - }() - - util.Batch(MaxBatchItems, MaxBatchTime, queue, batched) - // returns when sendQueue is closed - }() - - for { - promShipperSynced.WithLabelValues(metricsFilename).Set(1) - - batch, ok := <-batched - if !ok { - return - } - promShipperBatchSizeResult.Observe(float64(len(batch))) - - promShipperSynced.WithLabelValues(metricsFilename).Set(0) - - var batchBson []interface{} // mongo does not like typing - for _, b := range batch { - batchBson = append(batchBson, b.ToBson()) - } - - result, err := db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil) - if err != nil { - promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) - log.Printf("failure in batch submit to database: %e", err) // TODO: add some selective retry here or something, better error handling - continue - } - - promShipperDbSent.WithLabelValues(metricsFilename).Add(float64( - len(result.InsertedIDs))) - } -} diff --git a/logmower/submitter.go b/logmower/submitter.go deleted file mode 100644 index 3847cfb..0000000 --- a/logmower/submitter.go +++ /dev/null @@ -1,168 +0,0 @@ -package logmower - -import ( - "context" - "errors" - "fmt" - "io" - "log" - "os" - "time" - - ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" - "github.com/jtagcat/util" - prom "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - mongoOpt "go.mongodb.org/mongo-driver/mongo/options" - "k8s.io/apimachinery/pkg/util/wait" -) - -var ( - promFileInitialSeekSkipped = promauto.NewGaugeVec(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "file", - Name: "skipped_bytes", - Help: "Bytes skipped in file after discovering", - }, []string{"filename"}) - promFileCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - Subsystem: "file", - Name: "catchupped", - Help: "(0 or) 1 if initial backlog has been sent; (total <= watcher_file_count)", - }, []string{"filename"}) // TODO: rm filename? - promFileErr = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - Subsystem: "file", - Name: "errors_count", - Help: "Errors while reading file", - }, []string{"filename"}) - promFileLineSize = promauto.NewHistogramVec(prom.HistogramOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "file", - Name: "line_size_bytes", - Help: "Log line size in bytes", - Buckets: []float64{80, 160, 320, 640, 1280}, - }, []string{"filename"}) -) - -const SendQueueLimit = 1024 - -type file struct { - ms.File - metricsName string // filepath.Base() -} - -// TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly -func (f file) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) { - lineChan := make(chan RawLine) - defer close(lineChan) - - dbQueue := make(chan ms.Record, SendQueueLimit) - go RawLines(lineChan).Process(recordLimitBytes, dbQueue) - - waitGo := util.GoWg(func() { - queueT(dbQueue).sender(db, f.metricsName) - }) - defer waitGo() - - // TODO: better way to kill or wait for sendQueue before retrying (or duplicates?) - _ = wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { - err := f.trySubmit(ctx, db, lineChan) - if err == nil { - return true, nil - } - - promFileErr.WithLabelValues(f.metricsName).Add(1) - log.Printf("processing file %q: %e", f.metricsName, err) - - // nil: loop and keep retrying indefinitely - return false, nil - }) -} - -// use submitter(), don't use directly -func (f file) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- RawLine) error { - // TODO: better way for respecting ?killing sender for retry - for { - if len(sendQueue) == 0 { - break - } - time.Sleep(time.Second) - } - - // get files with offset - offsetResult, _ := mongoWithErr(db.FindOne(mongoTimeoutCtx(ctx), - bson.D{{Key: ms.RecordKeyHostId, Value: f.Host.Id}, {Key: ms.RecordKeyFilePath, Value: f.Path}}, - &mongoOpt.FindOneOptions{Sort: bson.D{{Key: ms.RecordKeyOffset, Value: -1}}}, // sort descending (get largest) - )) - - offsetResultBytes, err := offsetResult.DecodeBytes() - if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { - return fmt.Errorf("retrieving offset from database: %w", err) - } - - dbOffset := ms.RecordOffsetFromBson(&offsetResultBytes) - - fi, err := os.Stat(f.Path) - if err != nil { - return fmt.Errorf("getting original file size: %w", err) - } - startSize := fi.Size() - - sctx, cancel := context.WithCancel(ctx) - defer cancel() - - promFileInitialSeekSkipped.WithLabelValues(f.metricsName).Set(float64(dbOffset)) - - lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart) - if err != nil { - return fmt.Errorf("tailing file: %w", err) - } - - var catchUpped bool - promFileCatchupDone.WithLabelValues(f.metricsName).Set(0) - - for { - select { - case err := <-errChan: - return fmt.Errorf("tailing file: %w", err) - - case line, ok := <-lineChan: - if !ok { - return nil - } - - promFileLineSize.WithLabelValues(f.metricsName).Observe(float64(len(line.Bytes))) - - if !catchUpped { - catchUpped = line.EndOffset >= startSize - - if catchUpped { - promFileCatchupDone.WithLabelValues(f.metricsName).Set(1) - } - } - - if len(line.Bytes) == 0 { - continue - } - - sendQueue <- RawLine{ - file: &f, - - Offset: line.EndOffset, - B: line.Bytes, - } - } - } -} - -func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { - return mongoWrap, mongoWrap.Err() -} - -// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) { -// ctx, cancel := context.WithCancel(pctx) -// wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done()) -// } diff --git a/main.go b/main.go index e9409fd..5dcd91c 100644 --- a/main.go +++ b/main.go @@ -7,12 +7,10 @@ import ( "net/http" "os" - "git.k-space.ee/k-space/logmower-shipper/logmower" + "git.k-space.ee/k-space/logmower-shipper/pkg/watcher" "github.com/prometheus/client_golang/prometheus/promhttp" ) -const PrometheusPrefix = "logmower" // TODO: - func main() { go func() { metricsPort := 2112 @@ -26,7 +24,7 @@ func main() { } }() - if err := logmower.App.Run(os.Args); err != nil { + if err := watcher.App.Run(os.Args); err != nil { log.Fatal(err) } } diff --git a/pkg/file/file.go b/pkg/file/file.go new file mode 100644 index 0000000..53fc257 --- /dev/null +++ b/pkg/file/file.go @@ -0,0 +1,143 @@ +package file + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "os" + "time" + + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + "git.k-space.ee/k-space/logmower-shipper/pkg/lines" + m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" + "git.k-space.ee/k-space/logmower-shipper/pkg/sender" + "github.com/jtagcat/util" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + mongoOpt "go.mongodb.org/mongo-driver/mongo/options" + "k8s.io/apimachinery/pkg/util/wait" +) + +const SendQueueLimit = 1024 + +type File struct { + *m.File + MetricsName string // filepath.Base() +} + +// TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly +func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) { + lineChan := make(chan lines.Raw) + defer close(lineChan) + + dbQueue := make(chan m.Record, SendQueueLimit) + go lines.RawC(lineChan).Process(recordLimitBytes, dbQueue) + + waitGo := util.GoWg(func() { + sender.Queue(dbQueue).Sender(db, f.MetricsName) + }) + defer waitGo() + + // TODO: better way to kill or wait for sendQueue before retrying (or duplicates?) + _ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) { + err := f.trySubmit(ctx, db, lineChan) + if err == nil { + return true, nil + } + + promFileErr.WithLabelValues(f.MetricsName).Add(1) + log.Printf("processing file %q: %e", f.MetricsName, err) + + // nil: loop and keep retrying indefinitely + return false, nil + }) +} + +// use submitter(), don't use directly +func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- lines.Raw) error { + lFile := lines.File(f) // file.File, but avoiding import cycle + + // TODO: better way for respecting ?killing sender for retry + for { + if len(sendQueue) == 0 { + break + } + time.Sleep(time.Second) + } + + // get files with offset + offsetResult, _ := mongoWithErr(db.FindOne(globals.MongoTimeout(ctx), + bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}}, + &mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest) + )) + + offsetResultBytes, err := offsetResult.DecodeBytes() + if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { + return fmt.Errorf("retrieving offset from database: %w", err) + } + + dbOffset := m.RecordOffsetFromBson(&offsetResultBytes) + + fi, err := os.Stat(f.Path) + if err != nil { + return fmt.Errorf("getting original file size: %w", err) + } + startSize := fi.Size() + + sctx, cancel := context.WithCancel(ctx) + defer cancel() + + promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset)) + + lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart) + if err != nil { + return fmt.Errorf("tailing file: %w", err) + } + + var catchUpped bool + promFileCatchupDone.WithLabelValues(f.MetricsName).Set(0) + + for { + select { + case err := <-errChan: + return fmt.Errorf("tailing file: %w", err) + + case line, ok := <-lineChan: + if !ok { + return nil + } + + promFileLineSize.WithLabelValues(f.MetricsName).Observe(float64(len(line.Bytes))) + + if !catchUpped { + catchUpped = line.EndOffset >= startSize + + if catchUpped { + promFileCatchupDone.WithLabelValues(f.MetricsName).Set(1) + } + } + + if len(line.Bytes) == 0 { + continue + } + + sendQueue <- lines.Raw{ + File: &lFile, + + Offset: line.EndOffset, + B: line.Bytes, + } + } + } +} + +func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { + return mongoWrap, mongoWrap.Err() +} + +// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) { +// ctx, cancel := context.WithCancel(pctx) +// wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done()) +// } diff --git a/pkg/file/metrics.go b/pkg/file/metrics.go new file mode 100644 index 0000000..33fc9df --- /dev/null +++ b/pkg/file/metrics.go @@ -0,0 +1,35 @@ +package file + +import ( + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + promFileInitialSeekSkipped = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "file", + Name: "skipped_bytes", + Help: "Bytes skipped in file after discovering", + }, []string{"filename"}) + promFileCatchupDone = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: globals.PrometheusPrefix, + Subsystem: "file", + Name: "catchupped", + Help: "(0 or) 1 if initial backlog has been sent; (total <= watcher_file_count)", + }, []string{"filename"}) // TODO: rm filename? + promFileErr = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: globals.PrometheusPrefix, + Subsystem: "file", + Name: "errors_count", + Help: "Errors while reading file", + }, []string{"filename"}) + promFileLineSize = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "file", + Name: "line_size_bytes", + Help: "Log line size in bytes", + Buckets: []float64{80, 160, 320, 640, 1280}, + }, []string{"filename"}) +) diff --git a/pkg/globals/globals.go b/pkg/globals/globals.go new file mode 100644 index 0000000..c31b3a7 --- /dev/null +++ b/pkg/globals/globals.go @@ -0,0 +1,31 @@ +package globals + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/util/wait" +) + +// Did not find any better way for multipackage prefixing + +const ( + PrometheusPrefix = "logmower" + AppName = PrometheusPrefix + "shipper" + DatabaseCommandTimeout = 10 * time.Second +) + +func MongoTimeout(ctx context.Context) context.Context { + ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) + return ctx +} + +// wrapper to force copying before use +func Backoff() wait.Backoff { + return wait.Backoff{ + Duration: 2 * time.Second, + Factor: 1.5, + Jitter: 0.1, + Cap: 30 * time.Second, + } +} diff --git a/logmower/lines.go b/pkg/lines/lines.go similarity index 64% rename from logmower/lines.go rename to pkg/lines/lines.go index e9779f9..53f70b7 100644 --- a/logmower/lines.go +++ b/pkg/lines/lines.go @@ -1,32 +1,29 @@ -package logmower +package lines import ( "log" "sync" - ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" - prom "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" ) -var promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "record", - Name: "dropped_lines", // "dropped", - Help: "Records dropped due to being too large", -}, []string{"filename"}) - type ( - RawLines <-chan RawLine - RawLine struct { - *file + RawC <-chan Raw + Raw struct { + *File Offset int64 B []byte } + + // file.File, but avoiding import cycle + File struct { + *m.File + MetricsName string // filepath.Base() + } ) // assumes all lines are from same file -func (unparsed RawLines) Process(bufferLimitBytes int, parsed chan<- ms.Record) { +func (unparsed RawC) Process(bufferLimitBytes int, parsed chan<- m.Record) { lines := make(chan singleLine) go unparsed.parse(lines) @@ -62,8 +59,8 @@ func (unparsed RawLines) Process(bufferLimitBytes int, parsed chan<- ms.Record) } } -func (lines singleLines) process(bufferLimitBytes int, parsed chan<- ms.Record) { - var firstMetadata *ms.ParsedMetadata +func (lines singleLines) process(bufferLimitBytes int, parsed chan<- m.Record) { + var firstMetadata *m.ParsedMetadata var buffer []byte for { @@ -81,7 +78,7 @@ func (lines singleLines) process(bufferLimitBytes int, parsed chan<- ms.Record) buffer = append(buffer, line.B...) if len(buffer) > bufferLimitBytes { - promRecordDroppedTooLarge.WithLabelValues(line.metricsName).Add(1) + promRecordDroppedTooLarge.WithLabelValues(line.MetricsName).Add(1) log.Printf("dropped record: size in bytes exceeds limit of %d", bufferLimitBytes) buffer = nil @@ -89,8 +86,8 @@ func (lines singleLines) process(bufferLimitBytes int, parsed chan<- ms.Record) } if !line.partial { - parsed <- ms.Record{ - File: line.file.File, + parsed <- m.Record{ + File: line.File.File, Offset: line.Offset, String: string(buffer), diff --git a/logmower/lines_single.go b/pkg/lines/lines_single.go similarity index 62% rename from logmower/lines_single.go rename to pkg/lines/lines_single.go index d9e574a..a573e12 100644 --- a/logmower/lines_single.go +++ b/pkg/lines/lines_single.go @@ -1,4 +1,4 @@ -package logmower +package lines import ( "bytes" @@ -6,19 +6,10 @@ import ( "log" "time" - ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" - prom "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" ) -var promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - Subsystem: "record", - Name: "parsing_errors", - Help: "Errors while parsing log line prefixes", -}, []string{"filename"}) - -func (unparsed RawLines) parse(parsed chan<- singleLine) { +func (unparsed RawC) parse(parsed chan<- singleLine) { for { raw, ok := <-unparsed if !ok { @@ -26,10 +17,10 @@ func (unparsed RawLines) parse(parsed chan<- singleLine) { return } - line := singleLine{RawLine: raw} + line := singleLine{Raw: raw} if err := line.parse(); err != nil { - promRecordPrefixParsingErr.WithLabelValues(raw.metricsName).Add(1) + promRecordPrefixParsingErr.WithLabelValues(raw.MetricsName).Add(1) log.Printf("parsing kubernetes log line in %q: %e", raw.File.Path, err) } @@ -41,10 +32,10 @@ func (unparsed RawLines) parse(parsed chan<- singleLine) { type ( singleLines <-chan singleLine singleLine struct { - RawLine + Raw // populated by parse() - ms.ParsedMetadata + m.ParsedMetadata partial bool // P or F } ) diff --git a/pkg/lines/metrics.go b/pkg/lines/metrics.go new file mode 100644 index 0000000..c5b7bde --- /dev/null +++ b/pkg/lines/metrics.go @@ -0,0 +1,23 @@ +package lines + +import ( + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "record", + Name: "dropped_lines", // "dropped", + Help: "Records dropped due to being too large", + }, []string{"filename"}) + + promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{ + Namespace: globals.PrometheusPrefix, + Subsystem: "record", + Name: "parsing_errors", + Help: "Errors while parsing log line prefixes", + }, []string{"filename"}) +) diff --git a/pkg/mongo_struct/bson_lookup.go b/pkg/mongo/bson_lookup.go similarity index 96% rename from pkg/mongo_struct/bson_lookup.go rename to pkg/mongo/bson_lookup.go index 03f12d1..cefd1d6 100644 --- a/pkg/mongo_struct/bson_lookup.go +++ b/pkg/mongo/bson_lookup.go @@ -1,4 +1,4 @@ -package mongo_struct +package mongo import ( "time" diff --git a/logmower/mongo.go b/pkg/mongo/metrics.go similarity index 50% rename from logmower/mongo.go rename to pkg/mongo/metrics.go index 76b0746..6316611 100644 --- a/logmower/mongo.go +++ b/pkg/mongo/metrics.go @@ -1,46 +1,42 @@ -package logmower +package mongo import ( "context" - "fmt" "log" - "net/url" "time" - ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" mongoEvent "go.mongodb.org/mongo-driver/event" - "go.mongodb.org/mongo-driver/mongo" mongoOpt "go.mongodb.org/mongo-driver/mongo/options" ) +const promSubsystem = "database" + var ( promDbHeartbeat = promauto.NewHistogramVec(prom.HistogramOpts{ - Namespace: PrometheusPrefix, - Subsystem: "database", - Name: "heartbeat_time", - Help: "Time in seconds for succeeded heartbeat, or 0 on failure", - Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50}, + Namespace: globals.PrometheusPrefix, Subsystem: promSubsystem, + Name: "heartbeat_time", + Help: "Time in seconds for succeeded heartbeat, or 0 on failure", + Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50}, }, []string{"connection_id"}) promDbCmd = promauto.NewHistogramVec(prom.HistogramOpts{ - Namespace: PrometheusPrefix, - Subsystem: "database", - Name: "operation_latency", // "command_time", - Help: "Time in seconds of commands", - Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50}, + Namespace: globals.PrometheusPrefix, Subsystem: promSubsystem, + Name: "operation_latency", // "command_time", + Help: "Time in seconds of commands", + Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50}, }, []string{"connection_id", "command_name"}) promDbCmdErr = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - Subsystem: "database", - Name: "errors", - Help: "Failed commands (also reflected elsewhere)", + Namespace: globals.PrometheusPrefix, Subsystem: promSubsystem, + Name: "errors", + Help: "Failed commands (also reflected elsewhere)", }, []string{"connection_id", "command_name"}) ) -func mongoMonitoredClientOptions() *mongoOpt.ClientOptions { +func monitoredClientOptions() *mongoOpt.ClientOptions { return mongoOpt.Client(). SetServerMonitor(&mongoEvent.ServerMonitor{ ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) { @@ -62,30 +58,3 @@ func mongoMonitoredClientOptions() *mongoOpt.ClientOptions { }, }) } - -func initDatabase(ctx context.Context, uri string) (*mongo.Collection, error) { - uriParsed, err := url.ParseRequestURI(uri) - if err != nil { - return nil, fmt.Errorf("parsing URI for database name: %w", err) - } - - uriParsed.Path = uriParsed.Path[1:] // remove leading slash - if uriParsed.Path == "" { - return nil, fmt.Errorf("URI must include database name (as database to authenticate against)") - } - - dbOpt := mongoMonitoredClientOptions().ApplyURI(uri) - - dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx)) - if err != nil { - return nil, fmt.Errorf("connecting to %q: %w", dbOpt.GetURI(), err) - } - - col := dbClient.Database(uriParsed.Path).Collection("logs") - - if err := ms.InitializeIndexes(mongoTimeoutCtx(ctx), col); err != nil { - return nil, fmt.Errorf("initializing indexes: %w", err) - } - - return col, nil -} diff --git a/pkg/mongo/mongo.go b/pkg/mongo/mongo.go new file mode 100644 index 0000000..1739373 --- /dev/null +++ b/pkg/mongo/mongo.go @@ -0,0 +1,37 @@ +package mongo + +import ( + "context" + "fmt" + "net/url" + + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + "go.mongodb.org/mongo-driver/mongo" +) + +func Initialize(ctx context.Context, uri string) (*mongo.Collection, error) { + uriParsed, err := url.ParseRequestURI(uri) + if err != nil { + return nil, fmt.Errorf("parsing URI for database name: %w", err) + } + + uriParsed.Path = uriParsed.Path[1:] // remove leading slash + if uriParsed.Path == "" { + return nil, fmt.Errorf("URI must include database name (as database to authenticate against)") + } + + dbOpt := monitoredClientOptions().ApplyURI(uri) + + dbClient, err := mongo.Connect(globals.MongoTimeout(ctx)) + if err != nil { + return nil, fmt.Errorf("connecting to %q: %w", dbOpt.GetURI(), err) + } + + col := dbClient.Database(uriParsed.Path).Collection("logs") + + if err := InitializeIndexes(globals.MongoTimeout(ctx), col); err != nil { + return nil, fmt.Errorf("initializing indexes: %w", err) + } + + return col, nil +} diff --git a/pkg/mongo_struct/mongo_struct.go b/pkg/mongo/mongo_struct.go similarity index 98% rename from pkg/mongo_struct/mongo_struct.go rename to pkg/mongo/mongo_struct.go index 8d5ecd6..14c5623 100644 --- a/pkg/mongo_struct/mongo_struct.go +++ b/pkg/mongo/mongo_struct.go @@ -1,4 +1,4 @@ -package mongo_struct +package mongo import ( "context" @@ -23,7 +23,7 @@ func InitializeIndexes(ctx context.Context, col *mongo.Collection) error { // when editing, also edit everything in this file! type ( Record struct { - File + *File Offset int64 // end, of last line String string @@ -42,13 +42,13 @@ type ( Content any TimeUpstream time.Time } - HostInfo struct { + Host struct { Id string Name string Arch string } File struct { - Host *HostInfo + Host *Host Path string // absolute KubeInfo } diff --git a/pkg/sender/metrics.go b/pkg/sender/metrics.go new file mode 100644 index 0000000..91db2a8 --- /dev/null +++ b/pkg/sender/metrics.go @@ -0,0 +1,41 @@ +package sender + +import ( + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + promShipperQueued = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "shipper", + Name: "shipper_record", // "queued", + Help: "Log records in queue to be batched and sent to database", + }, []string{"filename"}) + promShipperDbSent = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "shipper", + Name: "record", // "sent", + Help: "Log records successfully committed to database", + }, []string{"filename"}) + promShipperBatchSizeResult = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "shipper", + Name: "bulk_submission_message", // "items_in_batch" + Help: "Batch size for database submissions", + Buckets: []float64{1, 5, 10, 50, 100, 500, 1000, 5000, 10000}, + }) + promShipperDbSendError = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "shipper", + Name: "insertion_error", // "errors", + Help: "Errors while submitting to database", // TODO: + }, []string{"filename"}) + promShipperSynced = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: globals.PrometheusPrefix, + Subsystem: "shipper", + Name: "batches_synced", + Help: "All batches available have been committed database (0 or 1)", + }, []string{"filename"}) +) diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go new file mode 100644 index 0000000..a798eac --- /dev/null +++ b/pkg/sender/sender.go @@ -0,0 +1,73 @@ +package sender + +import ( + "context" + "log" + "time" + + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" + "github.com/jtagcat/util" + "go.mongodb.org/mongo-driver/mongo" +) + +const ( + MaxBatchItems = 10000 + MaxBatchTime = 5 * time.Second +) + +type Queue <-chan m.Record + +func (queue Queue) Sender(db *mongo.Collection, metricsFilename string) { + batched := make(chan []m.Record) + + // metrics for batcher and queue + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + promShipperQueued.WithLabelValues(metricsFilename).Set(float64( + len(queue))) + + timer := time.NewTimer(time.Second) + select { + case <-ctx.Done(): + return + case <-timer.C: + } + } + }() + + util.Batch(MaxBatchItems, MaxBatchTime, queue, batched) + // returns when sendQueue is closed + }() + + for { + promShipperSynced.WithLabelValues(metricsFilename).Set(1) + + batch, ok := <-batched + if !ok { + return + } + promShipperBatchSizeResult.Observe(float64(len(batch))) + + promShipperSynced.WithLabelValues(metricsFilename).Set(0) + + var batchBson []interface{} // mongo does not like typing + for _, b := range batch { + batchBson = append(batchBson, b.ToBson()) + } + + result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, nil) + if err != nil { + promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) + log.Printf("failure in batch submit to database: %e", err) // TODO: add some selective retry here or something, better error handling + continue + } + + promShipperDbSent.WithLabelValues(metricsFilename).Add(float64( + len(result.InsertedIDs))) + } +} diff --git a/pkg/mongo_struct/populate.go b/pkg/util/util.go similarity index 56% rename from pkg/mongo_struct/populate.go rename to pkg/util/util.go index 2fa5dd8..3375c3b 100644 --- a/pkg/mongo_struct/populate.go +++ b/pkg/util/util.go @@ -1,6 +1,4 @@ -package mongo_struct - -// TODO: this is misc collection of stuff not really fitting in here +package util import ( "fmt" @@ -8,42 +6,44 @@ import ( "path/filepath" "runtime" "strings" + + m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" ) -func ParseLogName(name string) (m KubeInfo, ok bool) { +func ParseLogFilename(name string) (i m.KubeInfo, ok bool) { name = filepath.Base(name) // https://github.com/kubernetes/design-proposals-archive/blob/8da1442ea29adccea40693357d04727127e045ed/node/kubelet-cri-logging.md // __-.log` - m.Pod, name, ok = strings.Cut(name, "_") + i.Pod, name, ok = strings.Cut(name, "_") if !ok { return } - m.Namespace, name, ok = strings.Cut(name, "_") + i.Namespace, name, ok = strings.Cut(name, "_") if !ok { return } - m.ContainerName, name, ok = strings.Cut(name, "-") + i.ContainerName, name, ok = strings.Cut(name, "-") if !ok { return } - m.ContainerId = strings.TrimSuffix(name, ".log") + i.ContainerId = strings.TrimSuffix(name, ".log") if !strings.HasSuffix(name, ".log") { return } - return m, true + return i, true } -func (h *HostInfo) Populate(nodeName string) (err error) { +func Hostinfo(nodeName string) (h m.Host, err error) { if nodeName == "" { nodeName, err = os.Hostname() if err != nil { - return fmt.Errorf("getting hostname: %w", err) + return h, fmt.Errorf("getting hostname: %w", err) } } @@ -51,12 +51,12 @@ func (h *HostInfo) Populate(nodeName string) (err error) { id, err := os.ReadFile("/etc/machine-id") if err != nil { - return fmt.Errorf("getting machineId: %w", err) + return h, fmt.Errorf("getting machineId: %w", err) } h.Id = strings.TrimSpace(string(id)) h.Arch = runtime.GOARCH - return nil + return h, nil } diff --git a/pkg/watcher/metrics.go b/pkg/watcher/metrics.go new file mode 100644 index 0000000..58f7eb6 --- /dev/null +++ b/pkg/watcher/metrics.go @@ -0,0 +1,40 @@ +package watcher + +import ( + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + prom "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{ + Namespace: globals.PrometheusPrefix, + Subsystem: "watcher", + Name: "online", + Help: "1 if initialized, and directory watcher has been engaged successfully", + }) + promWatcherErr = promauto.NewCounter(prom.CounterOpts{ + Namespace: globals.PrometheusPrefix, + Subsystem: "watcher", + Name: "errors", + Help: "Error in logmower watching log files", + }) + promWatcherFilesStarted = promauto.NewCounter(prom.CounterOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "watcher", + Name: "log_file", // "discovered_logfiles", + Help: "Number of tracked log files", + }) + promWatcherFilesSkipped = promauto.NewCounter(prom.CounterOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "watcher", + Name: "invalid_filename", // "skipped_files", + Help: "Number of files in log directory skipped due to unexpected filename", + }) + promWatcherEvents = promauto.NewCounter(prom.CounterOpts{ + Namespace: globals.PrometheusPrefix, + // Subsystem: "watcher", + Name: "inotify_event", // "events", + Help: "Number of events while watchng (includes initial create events for existing file discovery)", + }) +) diff --git a/logmower/watcher.go b/pkg/watcher/watcher.go similarity index 60% rename from logmower/watcher.go rename to pkg/watcher/watcher.go index 5213b7a..0a4e729 100644 --- a/logmower/watcher.go +++ b/pkg/watcher/watcher.go @@ -1,46 +1,25 @@ -package logmower +package watcher import ( - "context" "fmt" "log" "os" "os/signal" "path/filepath" "sync" - "time" - ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" + "git.k-space.ee/k-space/logmower-shipper/pkg/file" + "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" + "git.k-space.ee/k-space/logmower-shipper/pkg/util" "github.com/fsnotify/fsnotify" - prom "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/urfave/cli/v2" - "k8s.io/apimachinery/pkg/util/wait" ) -const DatabaseCommandTimeout = 10 * time.Second - -const PrometheusPrefix = "logmower" // TODO: - -// wrapper to force copying before use -func defaultBackoff() wait.Backoff { - return wait.Backoff{ - Duration: 2 * time.Second, - Factor: 1.5, - Jitter: 0.1, - Cap: 30 * time.Second, - } -} - -func mongoTimeoutCtx(ctx context.Context) context.Context { - ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) - return ctx -} - var App = &cli.App{ - Name: "logmower-shipper", + Name: globals.AppName, Version: "1.0.0", - Authors: []*cli.Author{{Name: "jtagcat"}}, + Authors: []*cli.Author{{Name: "jtagcat"}, {Name: "codemowers.io"}}, Description: "Collect and ship kubernetes logs", // Usage: "rubykana ", @@ -64,51 +43,18 @@ var App = &cli.App{ }, Action: func(ctx *cli.Context) error { - var ( - promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - Subsystem: "watcher", - Name: "online", - Help: "1 if initialized, and directory watcher has been engaged successfully", - }) - promWatcherErr = promauto.NewCounter(prom.CounterOpts{ - Namespace: PrometheusPrefix, - Subsystem: "watcher", - Name: "errors", - Help: "Error in logmower watching log files", - }) - promWatcherFilesStarted = promauto.NewCounter(prom.CounterOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "watcher", - Name: "log_file", // "discovered_logfiles", - Help: "Number of tracked log files", - }) - promWatcherFilesSkipped = promauto.NewCounter(prom.CounterOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "watcher", - Name: "invalid_filename", // "skipped_files", - Help: "Number of files in log directory skipped due to unexpected filename", - }) - promWatcherEvents = promauto.NewCounter(prom.CounterOpts{ - Namespace: PrometheusPrefix, - // Subsystem: "watcher", - Name: "inotify_event", // "events", - Help: "Number of events while watchng (includes initial create events for existing file discovery)", - }) - ) - ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test var wg sync.WaitGroup log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version) - db, err := initDatabase(ctx.Context, ctx.String("mongo-uri")) + db, err := m.Initialize(ctx.Context, ctx.String("mongo-uri")) if err != nil { return fmt.Errorf("initializing database connection: %w", err) } - var hostInfo ms.HostInfo - if err := hostInfo.Populate(ctx.String("node-name")); err != nil { + hostinfo, err := util.Hostinfo(ctx.String("node-name")) + if err != nil { return fmt.Errorf("populating host info: %w", err) } @@ -137,7 +83,7 @@ var App = &cli.App{ } // TODO: #1: || if not in filterset - kubeInfo, ok := ms.ParseLogName(event.Name) + kubeInfo, ok := util.ParseLogFilename(event.Name) if !ok { promWatcherFilesSkipped.Add(1) log.Printf("skipped %q: filename not parsable in kubernetes log format", filepath.Base(event.Name)) @@ -148,13 +94,13 @@ var App = &cli.App{ wg.Add(1) go func() { - file := file{ - File: ms.File{ - Host: &hostInfo, + file := file.File{ + File: &m.File{ + Host: &hostinfo, KubeInfo: kubeInfo, Path: event.Name, }, - metricsName: filepath.Base(event.Name), + MetricsName: filepath.Base(event.Name), } file.Process(ctx.Context, db, ctx.Int("max-record-size"))