logmower-shipper/cmd/line.go

176 lines
3.6 KiB
Go

package logmower
import (
"bytes"
"fmt"
"sync"
"time"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
)
// Per-file counters for problems encountered while turning raw log lines into
// records. Both are labelled with the name of the file the line came from.
var (
	// promRecordPrefixParsingErr is incremented for every line whose
	// container log prefix could not be parsed.
	promRecordPrefixParsingErr = promauto.NewCounterVec(
		prom.CounterOpts{
			Namespace: PrometheusPrefix,
			Subsystem: "record",
			Name:      "parsing_errors",
			Help:      "Errors while parsing log line prefixes",
		},
		[]string{"filename"},
	)

	// promRecordDroppedTooLarge is incremented when a record is dropped
	// because it grew past the configured buffer limit.
	//
	// NOTE(review): unlike the counter above, no Subsystem is set and the
	// name is "dropped_lines" rather than "dropped" — confirm this is the
	// intended metric name before changing it.
	promRecordDroppedTooLarge = promauto.NewCounterVec(
		prom.CounterOpts{
			Namespace: PrometheusPrefix,
			Name:      "dropped_lines",
			Help:      "Records dropped due to being too large",
		},
		[]string{"filename"},
	)
)
// rawLine is one unparsed log line together with the metadata of the record
// it belongs to.
type rawLine struct {
	recordMetadata
	line []byte
}

// singleLine is one container log line after its prefix has been parsed.
type singleLine struct {
	mLog
	// line is the content following the prefix, or the whole raw line when
	// the prefix could not be split.
	line []byte
	// isPartial is true for lines tagged "P" (partial) and false for lines
	// tagged "F" (full).
	isPartial bool
}
// parseSingleContainerLine parses one container log line of the form
//
//	<RFC3339Nano timestamp> <stream> <P|F> <content>
//
// The line is split on its first three spaces only, so content may itself
// contain spaces. The stream is treated as stderr only when it is exactly
// "stderr"; the tag marks the line as partial ("P") or full ("F").
//
// On error the returned singleLine is still filled in as far as possible,
// because the caller forwards the line even when parsing failed:
//   - fewer than three spaces: line holds the whole input, nothing else is set;
//   - invalid tag: line, StdErr and (when valid) ContainerTime are set;
//   - invalid timestamp: everything but ContainerTime is set.
//
// When both the tag and the timestamp are invalid, the tag error is returned.
func parseSingleContainerLine(line []byte) (u singleLine, err error) {
	split := bytes.SplitN(line, []byte(" "), 4)
	if len(split) != 4 {
		u.line = line
		return u, fmt.Errorf("expected at least 3 spaces in container log line, got %d", len(split)-1)
	}

	u.line = split[3]
	u.StdErr = string(split[1]) == "stderr" // or stdout

	// Parse the timestamp before validating the tag so that a line with a bad
	// tag still carries its time when it is forwarded. time.Parse yields the
	// zero time on failure, so ContainerTime stays zero in that case.
	var timeErr error
	u.ContainerTime, timeErr = time.Parse(time.RFC3339Nano, string(split[0]))
	if timeErr != nil {
		timeErr = fmt.Errorf("parsing container log timestamp %q: %w", split[0], timeErr)
	}

	switch string(split[2]) {
	case "P":
		u.isPartial = true
	case "F":
	default:
		return u, fmt.Errorf("partial indicator must be 'P' or 'F', not %q", split[2])
	}

	return u, timeErr
}
// parseContainerLogLines parses the prefix of every line received on unparsed
// and forwards the result on parsed. When unparsed is closed and drained,
// parsed is closed as well.
//
// A line whose prefix fails to parse is counted and logged, but it is still
// forwarded with whatever fields the parser managed to fill in. The record
// metadata of the raw line is always copied onto the forwarded line.
func (s *submitter) parseContainerLogLines(unparsed <-chan rawLine, parsed chan<- singleLine) {
	for raw := range unparsed {
		entry, parseErr := parseSingleContainerLine(raw.line)
		if parseErr != nil {
			promRecordPrefixParsingErr.WithLabelValues(raw.File).Add(1)
			s.l.Error("parsing container log line", zap.Error(parseErr), zap.String("file", raw.File))
		}

		entry.mLog.recordMetadata = raw.recordMetadata
		parsed <- entry
	}

	close(parsed)
}
// parseLines turns the raw lines received on unparsed into records sent on
// parsed. It assumes all lines come from the same file.
//
// Lines first have their prefix parsed, then are routed by stream: stdout and
// stderr lines are each assembled into records by their own parseStdChannel
// goroutine, so partial lines of one stream never mix with the other.
//
// When unparsed is closed, both assemblers are shut down and waited for, and
// only then is parsed closed.
func (s *submitter) parseLines(bufferLimitBytes int, unparsed <-chan rawLine, parsed chan<- mLog) {
	prefixed := make(chan singleLine)
	go s.parseContainerLogLines(unparsed, prefixed)

	outLines := make(chan singleLine)
	errLines := make(chan singleLine)

	var assemblers sync.WaitGroup
	assemble := func(in <-chan singleLine) {
		s.parseStdChannel(bufferLimitBytes, in, parsed)
		assemblers.Done()
	}

	assemblers.Add(2)
	go assemble(outLines)
	go assemble(errLines)

	// Route each line to the assembler of its stream.
	for entry := range prefixed {
		if entry.StdErr {
			errLines <- entry
			continue
		}
		outLines <- entry
	}

	// Input exhausted: stop the assemblers and wait for them before closing
	// the output, since they are the ones sending on it.
	close(outLines)
	close(errLines)
	assemblers.Wait()
	close(parsed)
}
// parseStdChannel assembles the lines received on lines into records and
// sends them on parsed. Partial lines are accumulated until a full
// (non-partial) line ends the record. lines is expected to carry a single
// stream; parseLines feeds stdout and stderr through separate calls.
//
// An emitted record takes its ContainerTime from the first line of the record
// and its metadata and StdErr flag from the last one.
//
// A record whose accumulated size exceeds bufferLimitBytes is dropped as a
// whole: it is counted and logged once, and every remaining line of that
// record, up to and including its terminating full line, is skipped, so the
// tail of an oversized record is never emitted as a record of its own.
//
// When lines is closed, partial lines still buffered without a terminating
// full line are discarded. parsed is not closed by this function.
func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLine, parsed chan<- mLog) {
	var (
		firstTime time.Time
		buffer    []byte
		// dropping is true while the remainder of an oversized record is
		// being skipped.
		dropping bool
	)

	flush := func(last *mLog) {
		parsed <- mLog{
			recordMetadata: last.recordMetadata,
			StdErr:         last.StdErr,
			ContainerTime:  firstTime,
			Content:        parseRecord(buffer),
		}
		buffer = nil
	}

	for line := range lines {
		if dropping {
			// Still inside the oversized record; its full line ends it.
			if !line.isPartial {
				dropping = false
			}
			continue
		}

		if len(buffer) == 0 {
			firstTime = line.ContainerTime
		}
		buffer = append(buffer, line.line...)

		if len(buffer) > bufferLimitBytes {
			buffer = nil
			// If this line was partial, the rest of the record is still to
			// come and must be skipped too.
			dropping = line.isPartial
			promRecordDroppedTooLarge.WithLabelValues(line.File).Add(1)
			s.l.Warn("dropped record: too large", zap.Int("cap_bytes", bufferLimitBytes), zap.String("file", line.File))
			continue
		}

		if !line.isPartial {
			flush(&line.mLog)
		}
	}

	// Input closed: any partial lines without an end delimiter (full line)
	// are discarded.
	// TODO: flush the last record instead, using the time of its first line
	// and the metadata of its last.
}
// parseRecord converts the assembled bytes of a record into the value stored
// as the record's content. For now the content is simply the bytes copied
// into a string.
func parseRecord(buffer []byte) any {
	// TODO: json parser
	var content any = string(buffer)
	return content
}