it is shipping and parsing

This commit is contained in:
rasmus 2022-11-06 03:43:18 +02:00
parent 36b971a930
commit f5240274ec
4 changed files with 107 additions and 31 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
*.swp
.vscode
logs/
__debug_bin

View File

@ -8,7 +8,7 @@ import (
"net/url"
"os"
"os/signal"
"path"
"path/filepath"
"runtime"
"strings"
"sync"
@ -116,6 +116,8 @@ var App = &cli.App{
if err != nil {
l.Fatal("parsing URI for mongo database name", zap.Error(err))
}
uriParsed.Path = uriParsed.Path[1:] // remove leading slash
if uriParsed.Path == "" {
l.Fatal("mongo database name must be set in mongo URI")
}
@ -151,13 +153,12 @@ var App = &cli.App{
continue
}
absPath := path.Join(logDir, event.Name)
promFilesRead.Add(1)
l.Debug("digesting new file", zap.String("name", absPath))
l.Debug("digesting new file", zap.String("name", event.Name))
wg.Add(1)
go func() {
state.shipFile(ctx.Context, absPath, ctx.Bool("delete-after-read"))
state.shipFile(ctx.Context, event.Name, ctx.Bool("delete-after-read"))
wg.Done()
}()
@ -171,6 +172,8 @@ var App = &cli.App{
}
}()
// TODO: simulate create events files for current files
err = watcher.Add(logDir)
if err != nil {
promErrWatching.Add(1)
@ -222,3 +225,36 @@ func errAppend(a, b error) error {
}
return fmt.Errorf("%e; %e", a, b)
}
type logMeta struct {
podName string
podNamespace string
containerName string
containerId string
}
func parseLogName(name string) (m logMeta, ok bool) {
name = filepath.Base(name)
// https://github.com/kubernetes/design-proposals-archive/blob/8da1442ea29adccea40693357d04727127e045ed/node/kubelet-cri-logging.md
// <pod_name>_<pod_namespace>_<container_name>-<container_id>.log`
m.podName, name, ok = strings.Cut(name, "_")
if !ok {
return
}
m.podNamespace, name, ok = strings.Cut(name, "_")
if !ok {
return
}
m.containerName, name, ok = strings.Cut(name, "-")
if !ok {
return
}
m.containerId = strings.TrimSuffix(name, ".log")
return m, true
}

View File

@ -28,6 +28,11 @@ var (
Name: "queue_dropped_count",
Help: "Items ready to be batched and sent to mongo, but dropped due to full queue",
}, []string{"filename"})
promLineParsingErr = promauto.NewCounterVec(prom.CounterOpts{
Subsystem: "shipper",
Name: "lines_parsing_err_count",
Help: "Errors while parsing log line suffixes",
}, []string{"filename"})
promShipperQueueItems = promauto.NewHistogramVec(prom.HistogramOpts{
Subsystem: "shipper",
Name: "queue_items",
@ -137,20 +142,30 @@ func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bo
// when editing, also edit toBson(); all bson.D (and bson.M) uses
type mLog struct {
HostInfo HostInfo
Filename string
Offset int64 // byte offset where log entry ends at
Content string // TODO:
Time time.Time
HostInfo HostInfo
File string
Offset int64 // byte offset where log entry ends at
Content string // TODO:
ShipTime time.Time
CollectTime time.Time
StdErr bool
Format string // F or P TODO: what does it mean? Is there a well-defined log format for cri-o?
}
// not using marshal, since it is <0.1x performance
func (l *mLog) toBson() bson.M {
return bson.M{
"host_info": l.HostInfo,
"filename": l.Filename,
"offset": l.Offset,
"content": l.Content,
"time": l.Time,
"host_info": bson.M{
"id": l.HostInfo.id,
"name": l.HostInfo.name,
"arch": l.HostInfo.arch,
},
"filename": l.File,
"offset": l.Offset,
"content": l.Content,
"ship_time": l.ShipTime,
"container_time": l.CollectTime,
"stderr": l.StdErr,
"format": l.Format,
}
}

View File

@ -7,6 +7,7 @@ import (
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
@ -83,8 +84,9 @@ func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead b
}
func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk func() bool, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error {
// TODO: better way for respecting ?killing sender for retry
baseName := filepath.Base(name)
// TODO: better way for respecting ?killing sender for retry
for {
if len(sendQueue) == 0 {
break
@ -92,34 +94,31 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f
time.Sleep(time.Second)
}
// Initialize in case of new file
log := mLog{
HostInfo: s.hostInfo,
Filename: name,
}
// get files with offset
offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}},
bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "file", Value: baseName}},
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest)
))
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("retrieving mongo offset: %w", err)
}
// offsetResult.DecodeBytes() //TODO: check for extra fields
var log mLog
if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("decoding mongo offset: %w", err)
}
fi, err := os.Stat(log.Filename)
fi, err := os.Stat(name)
if err != nil {
return fmt.Errorf("getting original file size")
}
startSize := fi.Size()
// TODO: use inotify for file, and end with file deletion or replacement
lineChan, errChan, err := util.TailFile(ctx, log.Filename, log.Offset, io.SeekStart)
lineChan, errChan, err := util.TailFile(ctx, name, log.Offset, io.SeekStart)
if err != nil {
return fmt.Errorf("tailing file: %w", err)
}
@ -137,22 +136,47 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk f
}
}
var collectTime time.Time
var stdErr, format, log string
split := strings.SplitN(line.String, " ", 4)
if len(split) != 4 {
log = line.String
promLineParsingErr.WithLabelValues(baseName).Add(1)
} else {
stdErr, format, log = split[1], split[2], split[3]
collectTime, err = time.Parse(time.RFC3339Nano, split[0])
if err != nil {
promLineParsingErr.WithLabelValues(baseName).Add(1)
}
}
select {
case sendQueue <- mLog{
HostInfo: s.hostInfo,
Filename: *line.Filename,
File: baseName,
Offset: line.EndOffset,
Content: line.String,
ShipTime: time.Now(),
CollectTime: collectTime,
StdErr: stdErr == "stderr", // or stdout
Format: format,
Content: log,
}:
default:
promShipperDropped.WithLabelValues(*line.Filename).Add(1)
promShipperDropped.WithLabelValues(baseName).Add(1)
}
// no new lines
default:
if deleteOk() {
return os.Remove(name)
}
// TODO: ensure we don't instantly jump here
// default:
// if deleteOk() {
// return os.Remove(name)
// }
}
}
}