diff --git a/cmd/line.go b/cmd/line.go index 22f880a..6937843 100644 --- a/cmd/line.go +++ b/cmd/line.go @@ -62,7 +62,7 @@ func parseSingleContainerLine(line []byte) (u singleLine, err error) { return u, err } -func (s *submitter) parseContainerLines(unparsed <-chan rawLine, parsed chan<- singleLine) { +func (s *submitter) parseContainerLogLines(unparsed <-chan rawLine, parsed chan<- singleLine) { for { raw, ok := <-unparsed if !ok { @@ -73,7 +73,7 @@ func (s *submitter) parseContainerLines(unparsed <-chan rawLine, parsed chan<- s line, err := parseSingleContainerLine(raw.line) if err != nil { promRecordPrefixParsingErr.WithLabelValues(raw.File).Add(1) - s.l.Error("parsing single container line", zap.Error(err), zap.String("file", raw.File)) + s.l.Error("parsing container log line", zap.Error(err), zap.String("file", raw.File)) } line.mLog.recordMetadata = raw.recordMetadata @@ -84,7 +84,7 @@ func (s *submitter) parseContainerLines(unparsed <-chan rawLine, parsed chan<- s // assumes all lines are from same file func (s *submitter) parseLines(bufferLimitBytes int, unparsed <-chan rawLine, parsed chan<- mLog) { lines := make(chan singleLine) - go s.parseContainerLines(unparsed, lines) + go s.parseContainerLogLines(unparsed, lines) var wg sync.WaitGroup wg.Add(2) @@ -151,6 +151,7 @@ func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLin if len(buffer) > bufferLimitBytes { buffer = nil promRecordDroppedTooLarge.WithLabelValues(line.File).Add(1) + s.l.Warn("dropped record: too large", zap.Int("cap_bytes", bufferLimitBytes)) continue } diff --git a/cmd/mongo.go b/cmd/mongo.go index a17eecc..854f635 100644 --- a/cmd/mongo.go +++ b/cmd/mongo.go @@ -2,7 +2,6 @@ package logmower import ( "context" - "fmt" "time" prom "github.com/prometheus/client_golang/prometheus" @@ -45,7 +44,7 @@ func mongoMonitoredClientOptions(l *zap.Logger) *mongoOpt.ClientOptions { }, ServerHeartbeatFailed: func(ev *mongoEvent.ServerHeartbeatFailedEvent) { promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(0) - l.Error("database heartbeat", zap.Error(ev.Failure), zap.String("connection_id", ev.ConnectionID)) + l.Warn("database heartbeat", zap.Error(ev.Failure), zap.String("connection_id", ev.ConnectionID)) }, }). SetMonitor(&mongoEvent.CommandMonitor{ @@ -56,7 +55,6 @@ func mongoMonitoredClientOptions(l *zap.Logger) *mongoOpt.ClientOptions { promDbCmd.WithLabelValues(ev.ConnectionID, ev.CommandName).Observe(time.Duration(ev.DurationNanos).Seconds()) promDbCmdErr.WithLabelValues(ev.ConnectionID, ev.CommandName).Add(1) - l.Error("database command", zap.Error(fmt.Errorf("%s", ev.Failure)), zap.String("connection_id", ev.ConnectionID), zap.String("command_name", ev.CommandName)) // TODO: https://github.com/mongodb/mongo-go-driver/pull/1105 }, }) } diff --git a/cmd/sender.go b/cmd/sender.go index b78cd9c..900341a 100644 --- a/cmd/sender.go +++ b/cmd/sender.go @@ -97,11 +97,12 @@ func (s *submitter) sender(name string, sendQueue <-chan mLog) { } result, err := s.db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil) - promShipperDbSent.WithLabelValues(baseName).Add(float64( - len(result.InsertedIDs))) - if err != nil { s.l.Error("submission to database", zap.Error(err)) // TODO: add some selective retry here or something + continue } + + promShipperDbSent.WithLabelValues(baseName).Add(float64( + len(result.InsertedIDs))) } } diff --git a/cmd/watcher.go b/cmd/watcher.go index 97e5298..7f30960 100644 --- a/cmd/watcher.go +++ b/cmd/watcher.go @@ -184,11 +184,11 @@ var App = &cli.App{ _, ok = parseLogName(event.Name) if !ok { promWatcherFilesSkipped.Add(1) + l.Warn("skipped file with unparsable name", zap.String("name", event.Name)) continue } promWatcherFilesStarted.Add(1) - l.Debug("digesting new file", zap.String("name", event.Name)) wg.Add(1) go func() {