From ce066129b324f648660e7eec33e4bf7a656d16c6 Mon Sep 17 00:00:00 2001 From: rasmus Date: Sun, 6 Nov 2022 18:41:10 +0200 Subject: [PATCH] prom: homogenize metrics with python prototype --- cmd/mongo.go | 35 +++++++++++----------- cmd/mower.go | 55 ++++++++++++++++++++++++----------- cmd/sender.go | 80 +++++++++++++++++---------------------------------- cmd/submit.go | 52 ++++++++++++++++++++++++++------- 4 files changed, 127 insertions(+), 95 deletions(-) diff --git a/cmd/mongo.go b/cmd/mongo.go index 2efd6ad..7a70c60 100644 --- a/cmd/mongo.go +++ b/cmd/mongo.go @@ -12,45 +12,48 @@ import ( ) func mongoMonitoredClientOptions(l *zap.Logger) *mongoOpt.ClientOptions { - promMongoHeartbeat := promauto.NewHistogramVec(prom.HistogramOpts{ - Subsystem: "mongo", + promDbHeartbeat := promauto.NewHistogramVec(prom.HistogramOpts{ + Namespace: PrometheusPrefix, + Subsystem: "database", Name: "heartbeat_time", Help: "Time in ns for succeeded heartbeat, or 0 on failure", Buckets: []float64{1}, }, []string{"connection_id"}) - promMongoCmd := promauto.NewHistogramVec(prom.HistogramOpts{ - Subsystem: "mongo", - Name: "command_time", + promDbCmd := promauto.NewHistogramVec(prom.HistogramOpts{ + Namespace: PrometheusPrefix, + Subsystem: "database", + Name: "operation_latency", // "command_time", Help: "Time in ns of commands", - Buckets: prom.DefBuckets, + Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50}, }, []string{"connection_id", "command_name"}) - promMongoCmdErr := promauto.NewCounterVec(prom.CounterOpts{ - Subsystem: "mongo", + promDbCmdErr := promauto.NewCounterVec(prom.CounterOpts{ + Namespace: PrometheusPrefix, + Subsystem: "database", Name: "errors", - Help: "Count of failed commands", + Help: "Failed commands (also reflected elsewhere)", }, []string{"connection_id", "command_name"}) return mongoOpt.Client(). 
SetServerMonitor(&mongoEvent.ServerMonitor{ ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) { - promMongoHeartbeat.WithLabelValues(ev.ConnectionID).Observe(float64(ev.DurationNanos)) + promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(float64(ev.DurationNanos)) }, ServerHeartbeatFailed: func(ev *mongoEvent.ServerHeartbeatFailedEvent) { - promMongoHeartbeat.WithLabelValues(ev.ConnectionID).Observe(0) - l.Error("mongo heartbeat", zap.Error(ev.Failure), zap.String("connection_id", ev.ConnectionID)) + promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(0) + l.Error("database heartbeat", zap.Error(ev.Failure), zap.String("connection_id", ev.ConnectionID)) }, }). SetMonitor(&mongoEvent.CommandMonitor{ Succeeded: func(_ context.Context, ev *mongoEvent.CommandSucceededEvent) { - promMongoCmd.WithLabelValues(ev.ConnectionID, ev.CommandName).Observe(float64(ev.DurationNanos)) + promDbCmd.WithLabelValues(ev.ConnectionID, ev.CommandName).Observe(float64(ev.DurationNanos)) }, Failed: func(_ context.Context, ev *mongoEvent.CommandFailedEvent) { - promMongoCmd.WithLabelValues(ev.ConnectionID, ev.CommandName).Observe(float64(ev.DurationNanos)) + promDbCmd.WithLabelValues(ev.ConnectionID, ev.CommandName).Observe(float64(ev.DurationNanos)) - promMongoCmdErr.WithLabelValues(ev.ConnectionID, ev.CommandName).Add(1) - l.Error("mongo command", zap.Error(fmt.Errorf("%s", ev.Failure)), zap.String("connection_id", ev.ConnectionID), zap.String("command_name", ev.CommandName)) // TODO: https://github.com/mongodb/mongo-go-driver/pull/1105 + promDbCmdErr.WithLabelValues(ev.ConnectionID, ev.CommandName).Add(1) + l.Error("database command", zap.Error(fmt.Errorf("%s", ev.Failure)), zap.String("connection_id", ev.ConnectionID), zap.String("command_name", ev.CommandName)) // TODO: https://github.com/mongodb/mongo-go-driver/pull/1105 }, }) } diff --git a/cmd/mower.go b/cmd/mower.go index 86b4dfd..72a1787 100644 --- a/cmd/mower.go +++ b/cmd/mower.go @@ -26,9 
+26,9 @@ import ( ) const ( - MachineId = "/etc/machine-id" - MongoTimeout = 10 * time.Second - PrometheusPrefix = "logmower-shipper" + MachineId = "/etc/machine-id" + DatabaseCommandTimeout = 10 * time.Second + PrometheusPrefix = "logmower" ) // wrapper to force copying before use @@ -42,7 +42,7 @@ func defaultBackoff() wait.Backoff { } func mongoTimeoutCtx(ctx context.Context) context.Context { - ctx, _ = context.WithTimeout(ctx, MongoTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) + ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) return ctx } @@ -59,7 +59,7 @@ var App = &cli.App{ &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO: - &cli.BoolFlag{Name: "delete-after-read", Usage: "Delete log file when it is synced to mongo, and no new lines to read", Value: false}, + &cli.BoolFlag{Name: "delete-after-read", Usage: "Delete log file when it is synced to database, and no new lines to read", Value: false}, // &cli.BoolFlag{Name: "parse-json"}, //TODO: &cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO: &cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true}, @@ -85,17 +85,29 @@ var App = &cli.App{ Help: "1 if initialized, and directory watcher has been engaged successfully", }) - promErrWatching = promauto.NewCounter(prom.CounterOpts{ + promWatcherErr = promauto.NewCounter(prom.CounterOpts{ Namespace: PrometheusPrefix, Subsystem: "watcher", Name: "errors", Help: "Error in logmower watching log files", }) - promFilesRead = promauto.NewCounter(prom.CounterOpts{ + 
promWatcherFilesStarted = promauto.NewCounter(prom.CounterOpts{ Namespace: PrometheusPrefix, - Subsystem: "watcher", - Name: "seen_files", - Help: "Number of tracked log files", + // Subsystem: "watcher", + Name: "log_file", // "discovered_logfiles", + Help: "Number of tracked log files", + }) + promWatcherFilesSkipped = promauto.NewCounter(prom.CounterOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "watcher", + Name: "invalid_filename", // "skipped_files", + Help: "Number of files in log directory skipped due to unexpected filename", + }) + promWatcherEvents = promauto.NewCounter(prom.CounterOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "watcher", + Name: "inotify_event", // "events", + Help: "Number of events while watching (includes initial create events for existing file discovery)", }) ) go func() { @@ -113,7 +125,7 @@ var App = &cli.App{ dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx.Context)) if err != nil { - l.Fatal("connecting to mongo", zap.String("uri", dbOpt.GetURI()), zap.Error(err)) + l.Fatal("connecting to database", zap.String("uri", dbOpt.GetURI()), zap.Error(err)) } uriParsed, err := url.ParseRequestURI(ctx.String("mongo-uri")) @@ -157,12 +169,20 @@ var App = &cli.App{ if !ok { return } - // TODO: #1: || if not in filterset + promWatcherEvents.Add(1) + if event.Op != fsnotify.Create { continue } - promFilesRead.Add(1) + // TODO: #1: || if not in filterset + _, ok = parseLogName(event.Name) + if !ok { + promWatcherFilesSkipped.Add(1) + continue + } + + promWatcherFilesStarted.Add(1) l.Debug("digesting new file", zap.String("name", event.Name)) wg.Add(1) @@ -175,7 +195,7 @@ var App = &cli.App{ if !ok { return } - promErrWatching.Add(1) + promWatcherErr.Add(1) l.Error("while watching log dir events", zap.Error(err)) } } @@ -184,13 +204,13 @@ var App = &cli.App{ // simulate create events to pick up files already created err = simulateInitialCreate(logDir, watcher.Events) if err != nil { - promErrWatching.Add(1) + promWatcherErr.Add(1) 
l.Fatal("listing initial log directory", zap.String("name", logDir), zap.Error(err)) } err = watcher.Add(logDir) if err != nil { - promErrWatching.Add(1) + promWatcherErr.Add(1) l.Fatal("watching log directory", zap.String("name", logDir), zap.Error(err)) } @@ -269,6 +289,9 @@ func parseLogName(name string) (m logMeta, ok bool) { } m.containerId = strings.TrimSuffix(name, ".log") + if !strings.HasSuffix(name, ".log") { + return + } return m, true } diff --git a/cmd/sender.go b/cmd/sender.go index f80f7f6..b78cd9c 100644 --- a/cmd/sender.go +++ b/cmd/sender.go @@ -12,70 +12,44 @@ import ( ) var ( - promShipperMongoSent = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "sent", - Help: "Log lines successfully committed to mongo", - }, []string{"filename"}) - promShipperMongoSentError = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "mongo_errors", - Help: "Errors while submitting to mongo", // TODO: - }, []string{"filename"}) - promLineParsingErr = promauto.NewCounterVec(prom.CounterOpts{ - Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "lines_parsing_errors", - Help: "Errors while parsing log line suffixes", - }, []string{"filename"}) promShipperQueued = promauto.NewGaugeVec(prom.GaugeOpts{ Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "queued", - Help: "Log lines in queue to be batched and sent to mongo", + // Subsystem: "shipper", + Name: "shipper_record", // "queued", + Help: "Log records in queue to be batched and sent to database", + }, []string{"filename"}) + promShipperDbSent = promauto.NewCounterVec(prom.CounterOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "shipper", + Name: "record", // "sent", + Help: "Log records successfully committed to database", + }, []string{"filename"}) + promShipperBatchSizeResult = promauto.NewHistogram(prom.HistogramOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "shipper", + 
Name: "bulk_submission_message", // "items_in_batch" + Help: "Batch size for database submissions", + Buckets: []float64{1, 5, 10, 50, 100, 500, 1000, 5000, 10000}, + }) + promShipperMongoSentError = promauto.NewCounterVec(prom.CounterOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "shipper", + Name: "insertion_error", // "errors", + Help: "Errors while submitting to database", // TODO: }, []string{"filename"}) promShipperSynced = promauto.NewGaugeVec(prom.GaugeOpts{ Namespace: PrometheusPrefix, Subsystem: "shipper", Name: "batches_synced", - Help: "All batches available have been sent to mongo", + Help: "All batches available have been committed to database (0 or 1)", }, []string{"filename"}) ) const ( - MaxBatchItems = 100 - MaxBatchTime = time.Second + MaxBatchItems = 10000 + MaxBatchTime = 5 * time.Second ) -func init() { - promauto.NewGaugeFunc(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "queue_size", - Help: "Submit queue size cap", - }, func() float64 { - return float64(SendQueueLimit) - }) - promauto.NewGaugeFunc(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "batch_size", - Help: "batching size cap", - }, func() float64 { - return float64(MaxBatchItems) - }) - promauto.NewGaugeFunc(prom.GaugeOpts{ - Namespace: PrometheusPrefix, - Subsystem: "shipper", - Name: "batch_time", - Help: "batching delay cap", - }, func() float64 { - return float64(MaxBatchTime) - }) -} - func (s *submitter) sender(name string, sendQueue <-chan mLog) { baseName := filepath.Base(name) @@ -123,11 +97,11 @@ func (s *submitter) sender(name string, sendQueue <-chan mLog) { } result, err := s.db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil) - promShipperMongoSent.WithLabelValues(baseName).Add(float64( + promShipperDbSent.WithLabelValues(baseName).Add(float64( len(result.InsertedIDs))) if err != nil { - s.l.Error("mongo send returned error; TODO: add some selective retry here or something", 
zap.Error(err)) // TODO: + s.l.Error("submission to database", zap.Error(err)) // TODO: add some selective retry here or something } } } diff --git a/cmd/submit.go b/cmd/submit.go index ad9e7e4..7e3740d 100644 --- a/cmd/submit.go +++ b/cmd/submit.go @@ -22,17 +22,42 @@ import ( ) var ( - promCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{ + promFileInitialSeekSkipped = promauto.NewGaugeVec(prom.GaugeOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "file", + Name: "skipped_bytes", + Help: "Bytes skipped in file after discovering", + }, []string{"filename"}) + promFileCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{ Namespace: PrometheusPrefix, Subsystem: "file", Name: "catchupped", - Help: "Files where initial backlog has been sent; (total <= watcher_file_count)", + Help: "(0 or) 1 if initial backlog has been sent; (total <= watcher_file_count)", }, []string{"filename"}) // TODO: rm filename? promFileErr = promauto.NewCounterVec(prom.CounterOpts{ Namespace: PrometheusPrefix, Subsystem: "file", Name: "errors_count", - Help: "Error count for reading files", + Help: "Errors while reading file", + }, []string{"filename"}) + promFileLineSize = promauto.NewHistogramVec(prom.HistogramOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "file", + Name: "line_size_bytes", + Help: "Log line size in bytes", + Buckets: []float64{80, 160, 320, 640, 1280}, + }, []string{"filename"}) + promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{ + Namespace: PrometheusPrefix, + Subsystem: "record", + Name: "parsing_errors", + Help: "Errors while parsing log line prefixes", + }, []string{"filename"}) + promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{ + Namespace: PrometheusPrefix, + // Subsystem: "record", + Name: "dropped_lines", // "dropped", + Help: "Records dropped due to being too large", }, []string{"filename"}) ) @@ -58,7 +83,7 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b go s.sender(name, sendChan) 
- // TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?) + // TODO: better way to kill or wait for sendQueue before retrying (or duplicates?) wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { // err := s.shipFileRoutine(ctx, name, sendChan) @@ -90,12 +115,12 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue )) if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { - return fmt.Errorf("retrieving mongo offset: %w", err) + return fmt.Errorf("retrieving offset from database: %w", err) } var log mLog if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { - return fmt.Errorf("decoding mongo offset: %w", err) + return fmt.Errorf("decoding offset from database: %w", err) } fi, err := os.Stat(name) @@ -107,13 +132,18 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue sctx, cancel := context.WithCancel(ctx) defer cancel() + promFileInitialSeekSkipped.WithLabelValues(baseName).Set(float64(log.Offset)) + lineChan, errChan, err := util.TailFile(sctx, name, log.Offset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) } var catchUpped bool // cache - promCatchupDone.WithLabelValues(baseName).Set(0) + promFileCatchupDone.WithLabelValues(baseName).Set(0) + + // TODO: partial line combining + // TODO: promRecordDroppedTooLarge for { select { @@ -125,11 +155,13 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue return nil } + promFileLineSize.WithLabelValues(baseName).Observe(float64(len(line.String))) + if !catchUpped { catchUpped = line.EndOffset >= startSize if catchUpped { - promCatchupDone.WithLabelValues(baseName).Set(1) + promFileCatchupDone.WithLabelValues(baseName).Set(1) } } @@ -143,7 +175,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue split := strings.SplitN(line.String, " ", 4) if len(split) != 4 { 
log = line.String - promLineParsingErr.WithLabelValues(baseName).Add(1) + promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1) s.l.Error("parsing line", zap.Error(fmt.Errorf("expected at least 3 spaces in container log")), zap.Int("got", len(split)-1), zap.String("file", name)) } else { @@ -151,7 +183,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue collectTime, err = time.Parse(time.RFC3339Nano, split[0]) if err != nil { - promLineParsingErr.WithLabelValues(baseName).Add(1) + promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1) s.l.Error("parsing line time", zap.Error(err), zap.String("file", name)) } }