From f5240274ec7bc91425fbe348206f6830c8d3b75f Mon Sep 17 00:00:00 2001 From: rasmus Date: Sun, 6 Nov 2022 03:43:18 +0200 Subject: [PATCH] it is shipping and parsing --- .gitignore | 1 + cmd/logmower.go | 44 +++++++++++++++++++++++++++++++++---- cmd/sender.go | 35 ++++++++++++++++++++--------- cmd/submit.go | 58 ++++++++++++++++++++++++++++++++++--------------- 4 files changed, 107 insertions(+), 31 deletions(-) diff --git a/.gitignore b/.gitignore index bd958bd..b4bc95b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *.swp .vscode logs/ +__debug_bin diff --git a/cmd/logmower.go b/cmd/logmower.go index 7f5b914..670d445 100644 --- a/cmd/logmower.go +++ b/cmd/logmower.go @@ -8,7 +8,7 @@ import ( "net/url" "os" "os/signal" - "path" + "path/filepath" "runtime" "strings" "sync" @@ -116,6 +116,8 @@ var App = &cli.App{ if err != nil { l.Fatal("parsing URI for mongo database name", zap.Error(err)) } + + uriParsed.Path = uriParsed.Path[1:] // remove leading slash if uriParsed.Path == "" { l.Fatal("mongo database name must be set in mongo URI") } @@ -151,13 +153,12 @@ var App = &cli.App{ continue } - absPath := path.Join(logDir, event.Name) promFilesRead.Add(1) - l.Debug("digesting new file", zap.String("name", absPath)) + l.Debug("digesting new file", zap.String("name", event.Name)) wg.Add(1) go func() { - state.shipFile(ctx.Context, absPath, ctx.Bool("delete-after-read")) + state.shipFile(ctx.Context, event.Name, ctx.Bool("delete-after-read")) wg.Done() }() @@ -171,6 +172,8 @@ var App = &cli.App{ } }() + // TODO: simulate create events files for current files + err = watcher.Add(logDir) if err != nil { promErrWatching.Add(1) @@ -222,3 +225,36 @@ func errAppend(a, b error) error { } return fmt.Errorf("%e; %e", a, b) } + +type logMeta struct { + podName string + podNamespace string + containerName string + containerId string +} + +func parseLogName(name string) (m logMeta, ok bool) { + name = filepath.Base(name) + + // 
https://github.com/kubernetes/design-proposals-archive/blob/8da1442ea29adccea40693357d04727127e045ed/node/kubelet-cri-logging.md + // `<pod_name>_<pod_namespace>_<container_name>-<container_id>.log` + + m.podName, name, ok = strings.Cut(name, "_") + if !ok { + return + } + + m.podNamespace, name, ok = strings.Cut(name, "_") + if !ok { + return + } + + m.containerName, name, ok = strings.Cut(name, "-") + if !ok { + return + } + + m.containerId = strings.TrimSuffix(name, ".log") + + return m, true +} diff --git a/cmd/sender.go b/cmd/sender.go index d74bc4e..c0526fd 100644 --- a/cmd/sender.go +++ b/cmd/sender.go @@ -28,6 +28,11 @@ var ( Name: "queue_dropped_count", Help: "Items ready to be batched and sent to mongo, but dropped due to full queue", }, []string{"filename"}) + promLineParsingErr = promauto.NewCounterVec(prom.CounterOpts{ + Subsystem: "shipper", + Name: "lines_parsing_err_count", + Help: "Errors while parsing log line suffixes", + }, []string{"filename"}) promShipperQueueItems = promauto.NewHistogramVec(prom.HistogramOpts{ Subsystem: "shipper", Name: "queue_items", @@ -137,20 +142,30 @@ func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bo // when editing, also edit toBson(); all bson.D (and bson.M) uses type mLog struct { - HostInfo HostInfo - Filename string - Offset int64 // byte offset where log entry ends at - Content string // TODO: - Time time.Time + HostInfo HostInfo + File string + Offset int64 // byte offset where log entry ends at + Content string // TODO: + ShipTime time.Time + CollectTime time.Time + StdErr bool + Format string // F or P TODO: what does it mean? Is there a well-defined log format for cri-o? 
} // not using marshal, since it is <0.1x performance func (l *mLog) toBson() bson.M { return bson.M{ - "host_info": l.HostInfo, - "filename": l.Filename, - "offset": l.Offset, - "content": l.Content, - "time": l.Time, + "host_info": bson.M{ + "id": l.HostInfo.id, + "name": l.HostInfo.name, + "arch": l.HostInfo.arch, + }, + "filename": l.File, + "offset": l.Offset, + "content": l.Content, + "ship_time": l.ShipTime, + "container_time": l.CollectTime, + "stderr": l.StdErr, + "format": l.Format, } } diff --git a/cmd/submit.go b/cmd/submit.go index cca8d97..ef3492e 100644 --- a/cmd/submit.go +++ b/cmd/submit.go @@ -7,6 +7,7 @@ import ( "io" "os" "path/filepath" + "strings" "sync" "time" @@ -83,8 +84,9 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b } func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk func() bool, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error { - // TODO: better way for respecting ?killing sender for retry + baseName := filepath.Base(name) + // TODO: better way for respecting ?killing sender for retry for { if len(sendQueue) == 0 { break @@ -92,34 +94,31 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f time.Sleep(time.Second) } - // Initialize in case of new file - log := mLog{ - HostInfo: s.hostInfo, - Filename: name, - } - // get files with offset offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), - bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}}, + bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "file", Value: baseName}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest) )) + if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { return fmt.Errorf("retrieving mongo offset: %w", err) } // offsetResult.DecodeBytes() //TODO: check for extra fields + + var log mLog if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, 
mongo.ErrNoDocuments) { return fmt.Errorf("decoding mongo offset: %w", err) } - fi, err := os.Stat(log.Filename) + fi, err := os.Stat(name) if err != nil { return fmt.Errorf("getting original file size") } startSize := fi.Size() // TODO: use inotify for file, and end with file deletion or replacement - lineChan, errChan, err := util.TailFile(ctx, log.Filename, log.Offset, io.SeekStart) + lineChan, errChan, err := util.TailFile(ctx, name, log.Offset, io.SeekStart) if err != nil { return fmt.Errorf("tailing file: %w", err) } @@ -137,22 +136,47 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f } } + var collectTime time.Time + var stdErr, format, log string + + split := strings.SplitN(line.String, " ", 4) + if len(split) != 4 { + log = line.String + promLineParsingErr.WithLabelValues(baseName).Add(1) + + } else { + stdErr, format, log = split[1], split[2], split[3] + + collectTime, err = time.Parse(time.RFC3339Nano, split[0]) + if err != nil { + promLineParsingErr.WithLabelValues(baseName).Add(1) + } + } + select { case sendQueue <- mLog{ HostInfo: s.hostInfo, - Filename: *line.Filename, + File: baseName, + Offset: line.EndOffset, - Content: line.String, + ShipTime: time.Now(), + + CollectTime: collectTime, + StdErr: stdErr == "stderr", // or stdout + Format: format, + Content: log, }: + default: - promShipperDropped.WithLabelValues(*line.Filename).Add(1) + promShipperDropped.WithLabelValues(baseName).Add(1) } // no new lines - default: - if deleteOk() { - return os.Remove(name) - } + // TODO: ensure we don't instantly jump here + // default: + // if deleteOk() { + // return os.Remove(name) + // } } } }