176 lines
3.6 KiB
Go
176 lines
3.6 KiB
Go
package logmower
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
prom "github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var (
|
|
promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
|
|
Namespace: PrometheusPrefix,
|
|
Subsystem: "record",
|
|
Name: "parsing_errors",
|
|
Help: "Errors while parsing log line prefixes",
|
|
}, []string{"filename"})
|
|
promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
|
|
Namespace: PrometheusPrefix,
|
|
// Subsystem: "record",
|
|
Name: "dropped_lines", // "dropped",
|
|
Help: "Records dropped due to being too large",
|
|
}, []string{"filename"})
|
|
)
|
|
|
|
type (
|
|
rawLine struct {
|
|
RecordMetadata
|
|
line []byte
|
|
}
|
|
|
|
singleLine struct {
|
|
mLog
|
|
line []byte
|
|
isPartial bool // P or F
|
|
}
|
|
)
|
|
|
|
func parseSingleContainerLine(line []byte) (u singleLine, err error) {
|
|
split := bytes.SplitN(line, []byte(" "), 4)
|
|
if len(split) != 4 {
|
|
u.line = line
|
|
return u, fmt.Errorf("expected at least 3 spaces in container log line, got %d", len(split)-1)
|
|
}
|
|
|
|
u.line = split[3]
|
|
|
|
u.StdErr = string(split[1]) == "stderr" // or stdout
|
|
switch string(split[2]) {
|
|
case "P":
|
|
u.isPartial = true
|
|
case "F":
|
|
default:
|
|
return u, fmt.Errorf("partial indicator must be 'P' or 'F', not %q", split[2])
|
|
}
|
|
|
|
u.ContainerTime, err = time.Parse(time.RFC3339Nano, string(split[0]))
|
|
|
|
return u, err
|
|
}
|
|
|
|
func (s *submitter) parseContainerLogLines(unparsed <-chan rawLine, parsed chan<- singleLine) {
|
|
for {
|
|
raw, ok := <-unparsed
|
|
if !ok {
|
|
close(parsed)
|
|
return
|
|
}
|
|
|
|
line, err := parseSingleContainerLine(raw.line)
|
|
if err != nil {
|
|
promRecordPrefixParsingErr.WithLabelValues(raw.File).Add(1)
|
|
s.l.Error("parsing container log line", zap.Error(err), zap.String("file", raw.File))
|
|
}
|
|
|
|
line.mLog.RecordMetadata = raw.RecordMetadata
|
|
parsed <- line
|
|
}
|
|
}
|
|
|
|
// assumes all lines are from same file
|
|
func (s *submitter) parseLines(bufferLimitBytes int, unparsed <-chan rawLine, parsed chan<- mLog) {
|
|
lines := make(chan singleLine)
|
|
go s.parseContainerLogLines(unparsed, lines)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
|
|
stdOut, stdErr := make(chan singleLine), make(chan singleLine)
|
|
go func() {
|
|
s.parseStdChannel(bufferLimitBytes, stdOut, parsed)
|
|
wg.Done()
|
|
}()
|
|
go func() {
|
|
s.parseStdChannel(bufferLimitBytes, stdErr, parsed)
|
|
wg.Done()
|
|
}()
|
|
|
|
// split stdout and stderr
|
|
for {
|
|
line, ok := <-lines
|
|
if !ok {
|
|
close(stdOut)
|
|
close(stdErr)
|
|
wg.Wait()
|
|
close(parsed)
|
|
return
|
|
}
|
|
|
|
if line.StdErr {
|
|
stdErr <- line
|
|
} else {
|
|
stdOut <- line
|
|
}
|
|
}
|
|
}
|
|
|
|
// partial is ended with full
|
|
|
|
func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLine, parsed chan<- mLog) {
|
|
var firstTime time.Time
|
|
var buffer []byte
|
|
|
|
flush := func(last *mLog) {
|
|
parsed <- mLog{
|
|
RecordMetadata: last.RecordMetadata,
|
|
StdErr: last.StdErr,
|
|
|
|
ContainerTime: firstTime,
|
|
Content: parseRecord(buffer),
|
|
}
|
|
buffer = nil
|
|
}
|
|
|
|
for {
|
|
line, ok := <-lines
|
|
if !ok {
|
|
// discard any partial lines without end delimiter (full line)
|
|
return
|
|
}
|
|
|
|
if len(buffer) == 0 {
|
|
firstTime = line.ContainerTime
|
|
}
|
|
|
|
buffer = append(buffer, line.line...)
|
|
|
|
if len(buffer) > bufferLimitBytes {
|
|
buffer = nil
|
|
promRecordDroppedTooLarge.WithLabelValues(line.File).Add(1)
|
|
s.l.Warn("dropped record: too large", zap.Int("cap_bytes", bufferLimitBytes))
|
|
continue
|
|
}
|
|
|
|
if !line.isPartial {
|
|
flush(&line.mLog)
|
|
}
|
|
}
|
|
// TODO: flush last
|
|
// use time of first
|
|
// metadata of last
|
|
// //
|
|
// for {
|
|
// }
|
|
|
|
// promRecordDroppedTooLarge
|
|
}
|
|
|
|
func parseRecord(buffer []byte) any {
|
|
// TODO: json parser
|
|
return string(buffer)
|
|
}
|