logmower-shipper/cmd/submit.go

215 lines
6.0 KiB
Go
Raw Normal View History

package logmower
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
2022-11-06 01:43:18 +00:00
"strings"
2022-11-04 08:47:45 +00:00
"sync"
2022-11-05 23:45:19 +00:00
"time"
"github.com/jtagcat/util"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
)
var (
promFileInitialSeekSkipped = promauto.NewGaugeVec(prom.GaugeOpts{
Namespace: PrometheusPrefix,
// Subsystem: "file",
Name: "skipped_bytes",
Help: "Bytes skipped in file after discovering",
}, []string{"filename"})
promFileCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{
2022-11-06 15:02:49 +00:00
Namespace: PrometheusPrefix,
Subsystem: "file",
Name: "catchupped",
Help: "(0 or) 1 if initial backlog has been sent; (total <= watcher_file_count)",
}, []string{"filename"}) // TODO: rm filename?
promFileErr = promauto.NewCounterVec(prom.CounterOpts{
2022-11-06 15:02:49 +00:00
Namespace: PrometheusPrefix,
Subsystem: "file",
Name: "errors_count",
Help: "Errors while reading file",
}, []string{"filename"})
promFileLineSize = promauto.NewHistogramVec(prom.HistogramOpts{
Namespace: PrometheusPrefix,
// Subsystem: "file",
Name: "line_size_bytes",
Help: "Log line size in bytes",
Buckets: []float64{80, 160, 320, 640, 1280},
}, []string{"filename"})
promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
Subsystem: "record",
Name: "parsing_errors",
Help: "Errors while parsing log line prefixes",
}, []string{"filename"})
promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "record",
Name: "dropped_lines", // "dropped",
Help: "Records dropped due to being too large",
}, []string{"filename"})
)
type (
submitter struct {
2022-11-05 23:21:30 +00:00
l *zap.Logger
hostInfo HostInfo
db *mongo.Collection
2022-11-04 08:47:45 +00:00
sync.WaitGroup
}
)
2022-11-05 23:45:19 +00:00
// SendQueueLimit is the buffer size of the channel that shipFile creates to
// hand records over to the sender goroutine.
const SendQueueLimit = 1024
// TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly
func (s *submitter) shipFile(ctx context.Context, name string, deleteAfterRead bool) {
baseName := filepath.Base(name)
2022-11-05 23:45:19 +00:00
sendChan := make(chan mLog, SendQueueLimit)
defer close(sendChan)
2022-11-05 23:45:19 +00:00
go s.sender(name, sendChan)
2022-11-05 23:45:19 +00:00
// TODO: better way to kill or wait for sendQueue before retrying (or duplicates?)
2022-11-05 23:21:30 +00:00
wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
2022-11-05 23:45:19 +00:00
//
err := s.shipFileRoutine(ctx, name, sendChan)
2022-11-05 23:45:19 +00:00
if err == nil {
return true, nil
}
2022-11-05 23:45:19 +00:00
promFileErr.WithLabelValues(baseName).Add(1)
s.l.Error("shipping file", zap.String("filename", name), zap.Error(err))
2022-11-05 23:45:19 +00:00
return false, nil // nil since we want to loop and keep retrying indefinitely
})
}
func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- mLog) error {
2022-11-06 01:43:18 +00:00
baseName := filepath.Base(name)
2022-11-05 23:45:19 +00:00
2022-11-06 01:43:18 +00:00
// TODO: better way for respecting ?killing sender for retry
2022-11-05 23:45:19 +00:00
for {
if len(sendQueue) == 0 {
break
}
time.Sleep(time.Second)
}
// get files with offset
2022-11-05 23:45:19 +00:00
offsetResult, err := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
bson.D{{Key: mongoKeyHostInfoId, Value: s.hostInfo.id}, {Key: mongoKeyFileBasename, Value: baseName}},
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: mongoKeyOffset, Value: -1}}}, // sort descending (get largest)
))
2022-11-06 01:43:18 +00:00
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("retrieving offset from database: %w", err)
}
2022-11-06 01:43:18 +00:00
var log mLog
if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("decoding offset from database: %w", err)
}
2022-11-06 01:43:18 +00:00
fi, err := os.Stat(name)
if err != nil {
2022-11-06 02:04:32 +00:00
return fmt.Errorf("getting original file size: %w", err)
}
startSize := fi.Size()
sctx, cancel := context.WithCancel(ctx)
defer cancel()
promFileInitialSeekSkipped.WithLabelValues(baseName).Set(float64(log.Offset))
lineChan, errChan, err := util.TailFile(sctx, name, log.Offset, io.SeekStart)
if err != nil {
return fmt.Errorf("tailing file: %w", err)
}
2022-11-05 23:45:19 +00:00
2022-11-06 14:09:28 +00:00
var catchUpped bool // cache
promFileCatchupDone.WithLabelValues(baseName).Set(0)
// TODO: partial line combining
// TODO: promRecordDroppedTooLarge
for {
select {
case err := <-errChan:
return fmt.Errorf("tailing file: %w", err)
2022-11-05 23:45:19 +00:00
case line, ok := <-lineChan:
if !ok {
return nil
}
promFileLineSize.WithLabelValues(baseName).Observe(float64(len(line.String)))
2022-11-06 14:09:28 +00:00
if !catchUpped {
2022-11-06 14:35:49 +00:00
catchUpped = line.EndOffset >= startSize
2022-11-06 14:09:28 +00:00
if catchUpped {
promFileCatchupDone.WithLabelValues(baseName).Set(1)
}
}
if line.String == "" {
continue
}
2022-11-06 01:43:18 +00:00
var collectTime time.Time
var stdErr, format, log string
split := strings.SplitN(line.String, " ", 4)
if len(split) != 4 {
log = line.String
promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1)
s.l.Error("parsing line", zap.Error(fmt.Errorf("expected at least 3 spaces in container log")), zap.Int("got", len(split)-1), zap.String("file", name))
2022-11-06 01:43:18 +00:00
} else {
stdErr, format, log = split[1], split[2], split[3]
collectTime, err = time.Parse(time.RFC3339Nano, split[0])
if err != nil {
promRecordPrefixParsingErr.WithLabelValues(baseName).Add(1)
s.l.Error("parsing line time", zap.Error(err), zap.String("file", name))
2022-11-06 01:43:18 +00:00
}
}
2022-11-06 14:08:11 +00:00
sendQueue <- mLog{
HostInfo: s.hostInfo,
2022-11-06 01:43:18 +00:00
File: baseName,
Offset: line.EndOffset,
2022-11-06 01:43:18 +00:00
ShipTime: time.Now(),
CollectTime: collectTime,
StdErr: stdErr == "stderr", // or stdout
Format: format,
Content: log,
}
}
}
}
2022-11-05 23:45:19 +00:00
// mongoWithErr returns mongoWrap unchanged together with the error reported by
// its Err method, so a result and its error can be obtained in one call.
func mongoWithErr[T interface{ Err() error }](mongoWrap T) (T, error) {
	if err := mongoWrap.Err(); err != nil {
		return mongoWrap, err
	}

	return mongoWrap, nil
}
// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) {
// ctx, cancel := context.WithCancel(pctx)
// wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done())
// }