parse partial lines

This commit is contained in:
rasmus 2022-11-06 22:02:29 +02:00
parent 532575a87b
commit 1424096176
6 changed files with 224 additions and 66 deletions

174
cmd/line.go Normal file
View File

@ -0,0 +1,174 @@
package logmower
import (
"bytes"
"fmt"
"sync"
"time"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
)
var (
promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
Subsystem: "record",
Name: "parsing_errors",
Help: "Errors while parsing log line prefixes",
}, []string{"filename"})
promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "record",
Name: "dropped_lines", // "dropped",
Help: "Records dropped due to being too large",
}, []string{"filename"})
)
type (
rawLine struct {
recordMetadata
line []byte
}
singleLine struct {
mLog
line []byte
isPartial bool // P or F
}
)
func parseSingleContainerLine(line []byte) (u singleLine, err error) {
split := bytes.SplitN(line, []byte(" "), 4)
if len(split) != 4 {
u.line = line
return u, fmt.Errorf("expected at least 3 spaces in container log line, got %d", len(split)-1)
}
u.line = split[3]
u.StdErr = string(split[1]) == "stderr" // or stdout
switch string(split[2]) {
case "P":
u.isPartial = true
case "F":
default:
return u, fmt.Errorf("partial indicator must be 'P' or 'F', not %q", split[2])
}
u.ContainerTime, err = time.Parse(time.RFC3339Nano, string(split[0]))
return u, err
}
func (s *submitter) parseContainerLines(unparsed <-chan rawLine, parsed chan<- singleLine) {
for {
raw, ok := <-unparsed
if !ok {
close(parsed)
return
}
line, err := parseSingleContainerLine(raw.line)
if err != nil {
promRecordPrefixParsingErr.WithLabelValues(raw.File).Add(1)
s.l.Error("parsing single container line", zap.Error(err), zap.String("file", raw.File))
}
line.mLog.recordMetadata = raw.recordMetadata
parsed <- line
}
}
// assumes all lines are from same file
func (s *submitter) parseLines(bufferLimitBytes int, unparsed <-chan rawLine, parsed chan<- mLog) {
lines := make(chan singleLine)
go s.parseContainerLines(unparsed, lines)
var wg sync.WaitGroup
wg.Add(2)
stdOut, stdErr := make(chan singleLine), make(chan singleLine)
go func() {
s.parseStdChannel(bufferLimitBytes, stdOut, parsed)
wg.Done()
}()
go func() {
s.parseStdChannel(bufferLimitBytes, stdErr, parsed)
wg.Done()
}()
// split stdout and stderr
for {
line, ok := <-lines
if !ok {
close(stdOut)
close(stdErr)
wg.Wait()
close(parsed)
return
}
if line.StdErr {
stdErr <- line
} else {
stdOut <- line
}
}
}
// partial is ended with full
func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLine, parsed chan<- mLog) {
var firstTime time.Time
var buffer []byte
flush := func(last *mLog) {
parsed <- mLog{
recordMetadata: last.recordMetadata,
StdErr: last.StdErr,
ContainerTime: firstTime,
Content: parseRecord(buffer),
}
buffer = nil
}
for {
line, ok := <-lines
if !ok {
// discard any partial lines without end delimiter (full line)
return
}
if len(buffer) == 0 {
firstTime = line.ContainerTime
}
buffer = append(buffer, line.line...)
if len(buffer) > bufferLimitBytes {
buffer = nil
promRecordDroppedTooLarge.WithLabelValues(line.File).Add(1)
continue
}
if !line.isPartial {
flush(&line.mLog)
}
}
// TODO: flush last
// use time of first
// metadata of last
// //
// for {
// }
// promRecordDroppedTooLarge
}
func parseRecord(buffer []byte) any {
// TODO: json parser
return string(buffer)
}

View File

@ -20,21 +20,28 @@ func initializeIndexes(ctx context.Context, col *mongo.Collection) error {
} }
// when editing, also edit everything in this file! // when editing, also edit everything in this file!
type mLog struct { type (
mLog struct {
recordMetadata
Content any
ContainerTime time.Time
StdErr bool
// added by toBson()
ShipTime time.Time
}
recordMetadata struct {
HostInfo HostInfo HostInfo HostInfo
File string File string
Offset int64 // byte offset where log entry ends at Offset int64 // byte offset where log entry ends at
Content string // TODO: }
ShipTime time.Time )
CollectTime time.Time
StdErr bool
Format string // F or P TODO: what does it mean? Is there a well-defined log format for cri-o?
}
const ( const (
mongoKeyHostInfo = "host_info" mongoKeyHostInfo = "host_info"
mongoKeyId = "id" mongoKeyId = "id"
mongoKeyHostInfoId = mongoKeyHostInfo + "." + mongoKeyId mongoKeyHostId = mongoKeyHostInfo + "." + mongoKeyId
mongoKeyFileBasename = "file" mongoKeyFileBasename = "file"
mongoKeyOffset = "offset" mongoKeyOffset = "offset"
) )
@ -51,9 +58,9 @@ func (l *mLog) toBson() bson.M {
mongoKeyFileBasename: l.File, mongoKeyFileBasename: l.File,
mongoKeyOffset: l.Offset, mongoKeyOffset: l.Offset,
"content": l.Content, "content": l.Content,
"ship_time": l.ShipTime, "container_time": l.ContainerTime,
"container_time": l.CollectTime,
"stderr": l.StdErr, "stderr": l.StdErr,
"format": l.Format,
"ship_time": time.Now(),
} }
} }

View File

@ -7,7 +7,6 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
@ -47,18 +46,6 @@ var (
Help: "Log line size in bytes", Help: "Log line size in bytes",
Buckets: []float64{80, 160, 320, 640, 1280}, Buckets: []float64{80, 160, 320, 640, 1280},
}, []string{"filename"}) }, []string{"filename"})
promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
Subsystem: "record",
Name: "parsing_errors",
Help: "Errors while parsing log line prefixes",
}, []string{"filename"})
promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "record",
Name: "dropped_lines", // "dropped",
Help: "Records dropped due to being too large",
}, []string{"filename"})
) )
type ( type (
@ -75,18 +62,23 @@ type (
const SendQueueLimit = 1024 const SendQueueLimit = 1024
// TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly // TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly
func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) { func (s *submitter) shipFile(ctx context.Context, name string, recordLimitBytes int) {
baseName := filepath.Base(name) baseName := filepath.Base(name)
lineChan := make(chan rawLine)
defer close(lineChan)
sendChan := make(chan mLog, SendQueueLimit) sendChan := make(chan mLog, SendQueueLimit)
defer close(sendChan) defer close(sendChan)
go s.parseLines(recordLimitBytes, lineChan, sendChan)
go s.sender(name, sendChan) go s.sender(name, sendChan)
// TODO: better way to kill or wait for sendQueue before retrying (or duplicates?) // TODO: better way to kill or wait for sendQueue before retrying (or duplicates?)
wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
// //
err := s.shipFileRoutine(ctx, name, sendChan) err := s.shipFileRoutine(ctx, name, lineChan)
if err == nil { if err == nil {
return true, nil return true, nil
} }
@ -97,7 +89,7 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b
}) })
} }
func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- mLog) error { func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- rawLine) error {
baseName := filepath.Base(name) baseName := filepath.Base(name)
// TODO: better way for respecting ?killing sender for retry // TODO: better way for respecting ?killing sender for retry
@ -110,7 +102,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
// get files with offset // get files with offset
offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
bson.D{{Key: mongoKeyHostInfoId, Value: s.hostInfo.id}, {Key: mongoKeyFileBasename, Value: baseName}}, bson.D{{Key: mongoKeyHostId, Value: s.hostInfo.id}, {Key: mongoKeyFileBasename, Value: baseName}},
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: mongoKeyOffset, Value: -1}}}, // sort descending (get largest) &mongoOpt.FindOneOptions{Sort: bson.D{{Key: mongoKeyOffset, Value: -1}}}, // sort descending (get largest)
)) ))
@ -155,7 +147,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
return nil return nil
} }
promFileLineSize.WithLabelValues(baseName).Observe(float64(len(line.String))) promFileLineSize.WithLabelValues(baseName).Observe(float64(len(line.Bytes)))
if !catchUpped { if !catchUpped {
catchUpped = line.EndOffset >= startSize catchUpped = line.EndOffset >= startSize
@ -165,40 +157,18 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
} }
} }
if line.String == "" { if len(line.Bytes) == 0 {
continue continue
} }
var collectTime time.Time sendQueue <- rawLine{
var stdErr, format, log string recordMetadata: recordMetadata{
split := strings.SplitN(line.String, " ", 4)
if len(split) != 4 {
log = line.String
promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1)
s.l.Error("parsing line", zap.Error(fmt.Errorf("expected at least 3 spaces in container log")), zap.Int("got", len(split)-1), zap.String("file", name))
} else {
stdErr, format, log = split[1], split[2], split[3]
collectTime, err = time.Parse(time.RFC3339Nano, split[0])
if err != nil {
promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1)
s.l.Error("parsing line time", zap.Error(err), zap.String("file", name))
}
}
sendQueue <- mLog{
HostInfo: s.hostInfo, HostInfo: s.hostInfo,
File: baseName, File: baseName,
Offset: line.EndOffset, Offset: line.EndOffset,
ShipTime: time.Now(), },
line: line.Bytes,
CollectTime: collectTime,
StdErr: stdErr == "stderr", // or stdout
Format: format,
Content: log,
} }
} }
} }

View File

@ -59,12 +59,18 @@ var App = &cli.App{
&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"},
&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO: &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO:
&cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO:
&cli.BoolFlag{Name: "delete-after-read", Usage: "Delete log file when it is synced to database, and no new lines to read", Value: false},
// &cli.BoolFlag{Name: "parse-json"}, //TODO: // &cli.BoolFlag{Name: "parse-json"}, //TODO:
&cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO: &cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO:
&cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true}, &cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true},
&cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true}, &cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true},
}, },
Before: func(ctx *cli.Context) error {
if ctx.Int("max-record-size") < 1 {
return fmt.Errorf("max-record-size must be postivie")
}
return nil
},
Action: func(ctx *cli.Context) error { Action: func(ctx *cli.Context) error {
ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test
@ -84,7 +90,6 @@ var App = &cli.App{
Name: "online", Name: "online",
Help: "1 if initialized, and directory watcher has been engaged successfully", Help: "1 if initialized, and directory watcher has been engaged successfully",
}) })
promWatcherErr = promauto.NewCounter(prom.CounterOpts{ promWatcherErr = promauto.NewCounter(prom.CounterOpts{
Namespace: PrometheusPrefix, Namespace: PrometheusPrefix,
Subsystem: "watcher", Subsystem: "watcher",
@ -187,7 +192,7 @@ var App = &cli.App{
wg.Add(1) wg.Add(1)
go func() { go func() {
state.shipFile(ctx.Context, event.Name, ctx.Bool("delete-after-read")) state.shipFile(ctx.Context, event.Name, ctx.Int("max-record-size"))
wg.Done() wg.Done()
}() }()

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.19
require ( require (
github.com/fsnotify/fsnotify v1.6.0 github.com/fsnotify/fsnotify v1.6.0
github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb github.com/jtagcat/util v0.0.0-20221106194134-88ccac17ec03
github.com/prometheus/client_golang v1.13.1 github.com/prometheus/client_golang v1.13.1
github.com/urfave/cli/v2 v2.23.4 github.com/urfave/cli/v2 v2.23.4
go.elastic.co/ecszap v1.0.1 go.elastic.co/ecszap v1.0.1

2
go.sum
View File

@ -157,6 +157,8 @@ github.com/jtagcat/util v0.0.0-20221103213637-071f312fb4b0 h1:XeIjmB047GgFXqDhJR
github.com/jtagcat/util v0.0.0-20221103213637-071f312fb4b0/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY= github.com/jtagcat/util v0.0.0-20221103213637-071f312fb4b0/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY=
github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb h1:D7X3joRJVj/X7LJekb8Rco0QD/RuD/sF/j0pcCWWJas= github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb h1:D7X3joRJVj/X7LJekb8Rco0QD/RuD/sF/j0pcCWWJas=
github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY= github.com/jtagcat/util v0.0.0-20221106123855-f4d34033addb/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY=
github.com/jtagcat/util v0.0.0-20221106194134-88ccac17ec03 h1:Er8Rl1FzvxZ1//jWhR+4hRl4ZnMSk5qoDiKXGXRKDp4=
github.com/jtagcat/util v0.0.0-20221106194134-88ccac17ec03/go.mod h1:VIg6NAm5vU1HwDCL8p/iILmCwvgVCP3/U4QhlS6hftY=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=