From 14240961766fcf1273d02cee865283190e63daf8 Mon Sep 17 00:00:00 2001
From: rasmus
Date: Sun, 6 Nov 2022 22:02:29 +0200
Subject: [PATCH] parse partial lines

---
 cmd/line.go         | 174 ++++++++++++++++++++++++++++++++++++++++++++
 cmd/mongo_struct.go |  35 +++++----
 cmd/submitter.go    |  66 +++++------
 cmd/watcher.go      |  11 ++-
 go.mod              |   2 +-
 go.sum              |   2 +
 6 files changed, 224 insertions(+), 66 deletions(-)
 create mode 100644 cmd/line.go

diff --git a/cmd/line.go b/cmd/line.go
new file mode 100644
index 0000000..22f880a
--- /dev/null
+++ b/cmd/line.go
@@ -0,0 +1,174 @@
+package logmower
+
+import (
+	"bytes"
+	"fmt"
+	"sync"
+	"time"
+
+	prom "github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/zap"
+)
+
+var (
+	promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
+		Namespace: PrometheusPrefix,
+		Subsystem: "record",
+		Name:      "parsing_errors",
+		Help:      "Errors while parsing log line prefixes",
+	}, []string{"filename"})
+	promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
+		Namespace: PrometheusPrefix,
+		// Subsystem: "record",
+		Name: "dropped_lines", // "dropped",
+		Help: "Records dropped due to being too large",
+	}, []string{"filename"})
+)
+
+type (
+	// rawLine is a line as read from the log file, before prefix parsing
+	rawLine struct {
+		recordMetadata
+		line []byte
+	}
+
+	// singleLine is a line with its container-runtime prefix parsed
+	singleLine struct {
+		mLog
+		line      []byte
+		isPartial bool // P or F
+	}
+)
+
+// parseSingleContainerLine parses a cri-o log line of the form
+// "<RFC3339Nano timestamp> <stdout|stderr> <P|F> <content>"
+func parseSingleContainerLine(line []byte) (u singleLine, err error) {
+	split := bytes.SplitN(line, []byte(" "), 4)
+	if len(split) != 4 {
+		u.line = line
+		return u, fmt.Errorf("expected at least 3 spaces in container log line, got %d", len(split)-1)
+	}
+
+	u.line = split[3]
+
+	u.StdErr = string(split[1]) == "stderr" // or stdout
+
+	switch string(split[2]) {
+	case "P":
+		u.isPartial = true
+	case "F":
+	default:
+		return u, fmt.Errorf("partial indicator must be 'P' or 'F', not %q", split[2])
+	}
+
+	u.ContainerTime, err = time.Parse(time.RFC3339Nano, string(split[0]))
+
+	return u, err
+}
+
+func (s *submitter) parseContainerLines(unparsed <-chan rawLine, parsed chan<- singleLine) {
+	for {
+		raw, ok := <-unparsed
+		if !ok {
+			close(parsed)
+			return
+		}
+
+		line, err := parseSingleContainerLine(raw.line)
+		if err != nil {
+			promRecordPrefixParsingErr.WithLabelValues(raw.File).Add(1)
+			s.l.Error("parsing single container line", zap.Error(err), zap.String("file", raw.File))
+		}
+
+		line.mLog.recordMetadata = raw.recordMetadata
+		parsed <- line
+	}
+}
+
+// assumes all lines are from the same file
+func (s *submitter) parseLines(bufferLimitBytes int, unparsed <-chan rawLine, parsed chan<- mLog) {
+	lines := make(chan singleLine)
+	go s.parseContainerLines(unparsed, lines)
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	stdOut, stdErr := make(chan singleLine), make(chan singleLine)
+	go func() {
+		s.parseStdChannel(bufferLimitBytes, stdOut, parsed)
+		wg.Done()
+	}()
+	go func() {
+		s.parseStdChannel(bufferLimitBytes, stdErr, parsed)
+		wg.Done()
+	}()
+
+	// split stdout and stderr
+	for {
+		line, ok := <-lines
+		if !ok {
+			close(stdOut)
+			close(stdErr)
+			wg.Wait()
+			close(parsed)
+			return
+		}
+
+		if line.StdErr {
+			stdErr <- line
+		} else {
+			stdOut <- line
+		}
+	}
+}
+
+// a sequence of partial (P) lines is terminated by a full (F) line
+func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLine, parsed chan<- mLog) {
+	var firstTime time.Time
+	var buffer []byte
+
+	flush := func(last *mLog) {
+		parsed <- mLog{
+			recordMetadata: last.recordMetadata,
+			StdErr:         last.StdErr,
+
+			ContainerTime: firstTime,
+			Content:       parseRecord(buffer),
+		}
+		buffer = nil
+	}
+
+	for {
+		line, ok := <-lines
+		if !ok {
+			// discard any partial lines without end delimiter (full line)
+			// TODO: flush the last record instead: time of the first line, metadata of the last
+			return
+		}
+
+		if len(buffer) == 0 {
+			firstTime = line.ContainerTime
+		}
+
+		buffer = append(buffer, line.line...)
+
+		if len(buffer) > bufferLimitBytes {
+			// drop the oversized record; any continuation lines restart the buffer
+			buffer = nil
+			promRecordDroppedTooLarge.WithLabelValues(line.File).Add(1)
+			continue
+		}
+
+		if !line.isPartial {
+			flush(&line.mLog)
+		}
+	}
+}
+
+func parseRecord(buffer []byte) any {
+	// TODO: json parser
+	return string(buffer)
+}
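
The prefix handled above is cri-o's `<RFC3339Nano timestamp> <stdout|stderr> <P|F> <content>` line format. As a quick illustration of the parser's contract, a test sketch that could sit next to cmd/line.go in package logmower (hypothetical file and sample inputs, not part of this patch):

// line_test.go (sketch)
package logmower

import "testing"

func TestParseSingleContainerLine(t *testing.T) {
	// a partial (P) stdout line: content is everything after the third space
	u, err := parseSingleContainerLine([]byte("2022-11-06T19:32:10.000000001Z stdout P hello"))
	if err != nil {
		t.Fatal(err)
	}
	if !u.isPartial || u.StdErr || string(u.line) != "hello" {
		t.Fatalf("unexpected parse: %+v", u)
	}

	// anything other than P or F in the third field is an error
	if _, err := parseSingleContainerLine([]byte("2022-11-06T19:32:10Z stdout X oops")); err == nil {
		t.Fatal("expected error for bad partial indicator")
	}
}
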
diff --git a/cmd/mongo_struct.go b/cmd/mongo_struct.go
index 7a3eeea..f8e58a9 100644
--- a/cmd/mongo_struct.go
+++ b/cmd/mongo_struct.go
@@ -20,21 +20,28 @@ func initializeIndexes(ctx context.Context, col *mongo.Collection) error {
 }
 
 // when editing, also edit everything in this file!
-type mLog struct {
-	HostInfo HostInfo
-	File     string
-	Offset   int64  // byte offset where log entry ends at
-	Content  string // TODO:
-	ShipTime time.Time
-	CollectTime time.Time
-	StdErr      bool
-	Format      string // F or P TODO: what does it mean? Is there a well-defined log format for cri-o?
-}
+type (
+	mLog struct {
+		recordMetadata
+
+		Content       any
+		ContainerTime time.Time
+		StdErr        bool
+
+		// added by toBson()
+		ShipTime time.Time
+	}
+	recordMetadata struct {
+		HostInfo HostInfo
+		File     string
+		Offset   int64 // byte offset where log entry ends at
+	}
+)
 
 const (
 	mongoKeyHostInfo     = "host_info"
 	mongoKeyId           = "id"
-	mongoKeyHostInfoId   = mongoKeyHostInfo + "." + mongoKeyId
+	mongoKeyHostId       = mongoKeyHostInfo + "." + mongoKeyId
 	mongoKeyFileBasename = "file"
 	mongoKeyOffset       = "offset"
 )
@@ -51,9 +58,9 @@ func (l *mLog) toBson() bson.M {
 		mongoKeyFileBasename: l.File,
 		mongoKeyOffset:       l.Offset,
 		"content":            l.Content,
-		"ship_time":      l.ShipTime,
-		"container_time": l.CollectTime,
+		"container_time": l.ContainerTime,
 		"stderr":         l.StdErr,
-		"format":         l.Format,
+
+		"ship_time": time.Now(),
 	}
 }
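
With recordMetadata split out and ship_time now stamped inside toBson(), a fully merged record lands in Mongo with roughly the shape below. A standalone sketch; the host_info sub-document shape and all values are assumptions for illustration:

package main

import (
	"fmt"
	"time"

	"go.mongodb.org/mongo-driver/bson"
)

func main() {
	// keys mirror toBson() above; values are hypothetical
	doc := bson.M{
		"host_info":      bson.M{"id": "node-1"}, // assumed HostInfo shape
		"file":           "app.log",              // basename of the watched file
		"offset":         int64(4096),            // byte offset where the record ends
		"content":        "partial lines merged into one record",
		"container_time": time.Now().Add(-time.Second), // time of the record's first line
		"stderr":         false,
		"ship_time":      time.Now(), // stamped at serialization time
	}
	fmt.Println(doc)
}
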
diff --git a/cmd/submitter.go b/cmd/submitter.go
index 7e3740d..e1a4168 100644
--- a/cmd/submitter.go
+++ b/cmd/submitter.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"os"
 	"path/filepath"
-	"strings"
 	"sync"
 	"time"
 
@@ -47,18 +46,6 @@ var (
 		Help:    "Log line size in bytes",
 		Buckets: []float64{80, 160, 320, 640, 1280},
 	}, []string{"filename"})
-	promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
-		Namespace: PrometheusPrefix,
-		Subsystem: "record",
-		Name:      "parsing_errors",
-		Help:      "Errors while parsing log line prefixes",
-	}, []string{"filename"})
-	promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
-		Namespace: PrometheusPrefix,
-		// Subsystem: "record",
-		Name: "dropped_lines", // "dropped",
-		Help: "Records dropped due to being too large",
-	}, []string{"filename"})
 )
 
 type (
@@ -75,18 +62,23 @@ const SendQueueLimit = 1024
 
 // TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly
-func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) {
+func (s *submitter) shipFile(ctx context.Context, name string, recordLimitBytes int) {
 	baseName := filepath.Base(name)
 
+	lineChan := make(chan rawLine)
+	defer close(lineChan)
+
 	sendChan := make(chan mLog, SendQueueLimit)
-	defer close(sendChan)
+	// sendChan is closed by parseLines once lineChan is closed and drained;
+	// closing it here as well would double-close
+
+	go s.parseLines(recordLimitBytes, lineChan, sendChan)
 
 	go s.sender(name, sendChan)
 
 	// TODO: better way to kill or wait for sendQueue before retrying (or duplicates?)
 	wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { //
-		err := s.shipFileRoutine(ctx, name, sendChan)
+		err := s.shipFileRoutine(ctx, name, lineChan)
 		if err == nil {
 			return true, nil
 		}
@@ -97,7 +89,7 @@
 	})
 }
 
-func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- mLog) error {
+func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- rawLine) error {
 	baseName := filepath.Base(name)
 
 	// TODO: better way for respecting ?killing sender for retry
@@ -110,7 +102,7 @@
 
 	// get files with offset
 	offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
-		bson.D{{Key: mongoKeyHostInfoId, Value: s.hostInfo.id}, {Key: mongoKeyFileBasename, Value: baseName}},
+		bson.D{{Key: mongoKeyHostId, Value: s.hostInfo.id}, {Key: mongoKeyFileBasename, Value: baseName}},
 		&mongoOpt.FindOneOptions{Sort: bson.D{{Key: mongoKeyOffset, Value: -1}}}, // sort descending (get largest)
 	))
 
@@ -155,7 +147,7 @@
 			return nil
 		}
 
-		promFileLineSize.WithLabelValues(baseName).Observe(float64(len(line.String)))
+		promFileLineSize.WithLabelValues(baseName).Observe(float64(len(line.Bytes)))
 
 		if !catchUpped {
 			catchUpped = line.EndOffset >= startSize
@@ -165,40 +157,18 @@
 			}
 		}
 
-		if line.String == "" {
+		if len(line.Bytes) == 0 {
 			continue
 		}
 
-		var collectTime time.Time
-		var stdErr, format, log string
-
-		split := strings.SplitN(line.String, " ", 4)
-		if len(split) != 4 {
-			log = line.String
-			promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1)
-			s.l.Error("parsing line", zap.Error(fmt.Errorf("expected at least 3 spaces in container log")), zap.Int("got", len(split)-1), zap.String("file", name))
-
-		} else {
-			stdErr, format, log = split[1], split[2], split[3]
-
-			collectTime, err = time.Parse(time.RFC3339Nano, split[0])
-			if err != nil {
-				promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1)
-				s.l.Error("parsing line time", zap.Error(err), zap.String("file", name))
-			}
-		}
-
-		sendQueue <- mLog{
-			HostInfo: s.hostInfo,
-			File:     baseName,
+		sendQueue <- rawLine{
+			recordMetadata: recordMetadata{
+				HostInfo: s.hostInfo,
+				File:     baseName,
 
-			Offset:   line.EndOffset,
-			ShipTime: time.Now(),
-
-			CollectTime: collectTime,
-			StdErr:      stdErr == "stderr", // or stdout
-			Format:      format,
-			Content:     log,
+				Offset: line.EndOffset,
+			},
+			line: line.Bytes,
 		}
 	}
 }
diff --git a/cmd/watcher.go b/cmd/watcher.go
index 72a1787..97e5298 100644
--- a/cmd/watcher.go
+++ b/cmd/watcher.go
@@ -59,12 +59,18 @@ var App = &cli.App{
 		&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"},
 		&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO:
 		&cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO:
-		&cli.BoolFlag{Name: "delete-after-read", Usage: "Delete log file when it is synced to database, and no new lines to read", Value: false},
 		// &cli.BoolFlag{Name: "parse-json"}, //TODO:
 		&cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO:
 		&cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true},
 		&cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true},
 	},
+	Before: func(ctx *cli.Context) error {
+		if ctx.Int("max-record-size") < 1 {
+			return fmt.Errorf("max-record-size must be positive")
+		}
+
+		return nil
+	},
 	Action: func(ctx *cli.Context) error {
 		ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test
 
@@ -84,7 +90,6 @@
 			Name: "online",
 			Help: "1 if initialized, and directory watcher has been engaged successfully",
 		})
-
 		promWatcherErr = promauto.NewCounter(prom.CounterOpts{
 			Namespace: PrometheusPrefix,
 			Subsystem: "watcher",
@@ -187,7 +192,7 @@
 
 			wg.Add(1)
 			go func() {
-				state.shipFile(ctx.Context, event.Name, ctx.Bool("delete-after-read"))
+				state.shipFile(ctx.Context, event.Name, ctx.Int("max-record-size"))
 				wg.Done()
 			}()
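
urfave/cli's Before hook runs after flag parsing and before Action, which makes it the natural place for the max-record-size sanity check added above. A minimal standalone sketch of the same pattern:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Flags: []cli.Flag{
			&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024},
		},
		// reject nonsensical flag values before Action runs
		Before: func(ctx *cli.Context) error {
			if ctx.Int("max-record-size") < 1 {
				return fmt.Errorf("max-record-size must be positive")
			}
			return nil
		},
		Action: func(ctx *cli.Context) error {
			fmt.Println("record limit:", ctx.Int("max-record-size"))
			return nil
		},
	}

	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}
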
[]string{"KUBE_NODE_NAME"}, Required: true}, &cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true}, }, + Before: func(ctx *cli.Context) error { + if ctx.Int("max-record-size") < 1 { + return fmt.Errorf("max-record-size must be postivie") + } + + return nil + }, Action: func(ctx *cli.Context) error { ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test @@ -84,7 +90,6 @@ var App = &cli.App{ Name: "online", Help: "1 if initialized, and directory watcher has been engaged successfully", }) - promWatcherErr = promauto.NewCounter(prom.CounterOpts{ Namespace: PrometheusPrefix, Subsystem: "watcher", @@ -187,7 +192,7 @@ var App = &cli.App{ wg.Add(1) go func() { - state.shipFile(ctx.Context, event.Name, ctx.Bool("delete-after-read")) + state.shipFile(ctx.Context, event.Name, ctx.Int("max-record-size")) wg.Done() }() diff --git a/go.mod b/go.mod index b7de305..6e6ad6c 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/fsnotify/fsnotify v1.6.0 - github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb + github.com/jtagcat/util v0.0.0-20221106194134-88ccac17ec03 github.com/prometheus/client_golang v1.13.1 github.com/urfave/cli/v2 v2.23.4 go.elastic.co/ecszap v1.0.1 diff --git a/go.sum b/go.sum index 4036306..e32eec2 100644 --- a/go.sum +++ b/go.sum @@ -157,6 +157,8 @@ github.com/jtagcat/util v0.0.0-20221103213637-071f312fb4b0 h1:XeIjmB047GgFXqDhJR github.com/jtagcat/util v0.0.0-20221103213637-071f312fb4b0/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY= github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb h1:D7X3joRJVj/X7LJekb8Rco0QD/RuD/sF/j0pcCWWJas= github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY= +github.com/jtagcat/util v0.0.0-20221106194134-88ccac17ec03 h1:Er8Rl1FzvxZ1//jWhR+4hRl4ZnMSk5qoDiKXGXRKDp4= +github.com/jtagcat/util v0.0.0-20221106194134-88ccac17ec03/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=