submit: complete local sender refactor

This commit is contained in:
rasmus 2022-11-06 01:45:19 +02:00
parent 270737516c
commit 908ac5c52a
3 changed files with 50 additions and 21 deletions

View File

@ -57,6 +57,7 @@ var App = &cli.App{
&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"},
&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO: &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO:
&cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO:
&cli.BoolFlag{Name: "delete-after-read", Usage: "Delete log file when it is synced to mongo, and no new lines to read", Value: false},
// &cli.BoolFlag{Name: "parse-json"}, //TODO: // &cli.BoolFlag{Name: "parse-json"}, //TODO:
&cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO: &cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO:
&cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true}, &cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true},
@ -147,7 +148,7 @@ var App = &cli.App{
wg.Add(1) wg.Add(1)
go func() { go func() {
state.shipFile(ctx.Context, absPath) state.shipFile(ctx.Context, absPath, ctx.Bool("delete-after-read"))
wg.Done() wg.Done()
}() }()

View File

@ -41,7 +41,6 @@ var (
) )
const ( const (
SendQueueLimit = 1024
MaxBatchItems = 100 MaxBatchItems = 100
MaxBatchTime = time.Second MaxBatchTime = time.Second
) )

View File

@ -8,6 +8,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/jtagcat/util" "github.com/jtagcat/util"
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
@ -43,7 +44,10 @@ type (
} }
) )
func (s *submitter) shipFile(ctx context.Context, name string) { const SendQueueLimit = 1024
// TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly
func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) {
baseName := filepath.Base(name) baseName := filepath.Base(name)
sigCatchupped := make(chan struct{}, 1) sigCatchupped := make(chan struct{}, 1)
@ -54,18 +58,40 @@ func (s *submitter) shipFile(ctx context.Context, name string) {
promCatchupDone.WithLabelValues(baseName).Add(1) promCatchupDone.WithLabelValues(baseName).Add(1)
}() }()
// TODO: restarting before mongo sendQueue finishes will result in duplicates sendChan := make(chan mLog, SendQueueLimit)
synced := s.sender(name, sendChan)
deleteOk := func() bool {
if deleteAfterRead && synced() {
return true
}
return false
}
// TODO: better way to kill or wait for mongo sendQueue before retrying (or duplicates?)
wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) { wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
if err := s.shipFileRoutine(ctx, name, sigCatchupped); err != nil { //
err := s.shipFileRoutine(ctx, name, deleteOk, sendChan, sigCatchupped)
if err == nil {
return true, nil
}
promFileErr.WithLabelValues(baseName).Add(1) promFileErr.WithLabelValues(baseName).Add(1)
s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err)) s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err))
return false, nil // nil since we want it to loop and keep retrying indefinitely return false, nil // nil since we want to loop and keep retrying indefinitely
}
return true, nil
}) })
} }
func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchupped chan<- struct{}) error { func (s *submitter) shipFileRoutine(ctx context.Context, name string, deleteOk func() bool, sendQueue chan<- mLog, sigCatchupped chan<- struct{}) error {
// TODO: better way for respecting ?killing sender for retry
for {
if len(sendQueue) == 0 {
break
}
time.Sleep(time.Second)
}
// Initialize in case of new file // Initialize in case of new file
log := mLog{ log := mLog{
HostInfo: s.hostInfo, HostInfo: s.hostInfo,
@ -73,7 +99,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup
} }
// get files with offset // get files with offset
offsetResult, err := mWithErr(s.db.FindOne(mongoTimeoutCtx(ctx), offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}}, bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}},
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest) &mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest)
)) ))
@ -82,7 +108,6 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup
} }
// offsetResult.DecodeBytes() //TODO: check for extra fields // offsetResult.DecodeBytes() //TODO: check for extra fields
if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) { if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("decoding mongo offset: %w", err) return fmt.Errorf("decoding mongo offset: %w", err)
} }
@ -98,10 +123,12 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup
if err != nil { if err != nil {
return fmt.Errorf("tailing file: %w", err) return fmt.Errorf("tailing file: %w", err)
} }
for { for {
select { select {
case err := <-errChan: case err := <-errChan:
return fmt.Errorf("tailing file: %w", err) return fmt.Errorf("tailing file: %w", err)
case line := <-lineChan: case line := <-lineChan:
if line.EndOffset > startSize { if line.EndOffset > startSize {
select { select {
@ -111,8 +138,7 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup
} }
select { select {
// TODO: use per-file batch senders for file deletion mongo synced, and better error handling; #3 case sendQueue <- mLog{
case s.sendQueue <- mLog{
HostInfo: s.hostInfo, HostInfo: s.hostInfo,
Filename: *line.Filename, Filename: *line.Filename,
Offset: line.EndOffset, Offset: line.EndOffset,
@ -121,14 +147,17 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sigCatchup
default: default:
promShipperDropped.WithLabelValues(*line.Filename).Add(1) promShipperDropped.WithLabelValues(*line.Filename).Add(1)
} }
// TODO:
// default: // no new lines
// return nil default:
if deleteOk() {
return os.Remove(name)
}
} }
} }
} }
func mWithErr[t interface{ Err() error }](mongoWrap t) (t, error) { func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) {
return mongoWrap, mongoWrap.Err() return mongoWrap, mongoWrap.Err()
} }