package logmower

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"

	"github.com/jtagcat/util"
	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/util/wait"
)

var (
	promCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{
		Subsystem: "file",
		Name:      "catchupped",
		Help:      "File count where backlog has been sent; <= watcher_file_count",
	}, []string{"filename"}) // TODO: rm filename?
	promFileErr = promauto.NewCounterVec(prom.CounterOpts{
		Subsystem: "file",
		Name:      "errors_count",
		Help:      "Error count for reading files",
	}, []string{"filename"})
)

type (
	// submitter tails log files on this host and queues their lines for
	// shipping to a MongoDB collection.
	submitter struct {
		ctx context.Context
		l   *zap.Logger

		hostInfo HostInfo
		db       *mongo.Collection

		sendQueue chan mLog
	}
)
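
// mLog and HostInfo are defined elsewhere in this package. Judging only from
// the fields this file uses, mLog carries at least the following (a sketch of
// the assumed shape, not the authoritative definition; field types are guesses):
//
//	type mLog struct {
//		HostInfo HostInfo // host identity; its id is queried as "hostinfo.id"
//		Filename string   // path of the tailed file
//		Offset   int64    // byte offset at the end of the shipped line
//		Content  string   // the log line itself
//	}
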
// shipFile keeps retrying shipment of a single file until it succeeds or the
// context is cancelled, counting failed attempts in promFileErr.
func (s *submitter) shipFile(name string) {
	baseName := filepath.Base(name)

	// sigCatchupped is signalled (at most once) when the tail has read past
	// the size the file had when shipping started.
	sigCatchupped := make(chan struct{}, 1)
	go func() {
		<-sigCatchupped
		close(sigCatchupped) // once

		promCatchupDone.WithLabelValues(baseName).Add(1)
	}()

	// TODO: restarting before mongo sendQueue finishes will result in duplicates
	wait.ManagedExponentialBackoffWithContext(s.ctx, defaultBackoff(), func() (done bool, _ error) {
		if err := s.shipFileRoutine(name, sigCatchupped); err != nil {
			promFileErr.WithLabelValues(baseName).Add(1)
			s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err))
			return false, nil // nil since we want it to loop and keep retrying indefinitely
		}
		return true, nil
	})
}
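
// defaultBackoff is defined elsewhere in the package. Going by its use with
// wait.ManagedExponentialBackoffWithContext above, it presumably returns a
// wait.Backoff; a minimal sketch with placeholder values (an assumption, not
// the project's actual tuning):
//
//	func defaultBackoff() wait.Backoff {
//		return wait.Backoff{Duration: time.Second, Factor: 2, Jitter: 0.1, Steps: 10, Cap: 30 * time.Second}
//	}
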
// shipFileRoutine looks up the last shipped offset for the file in MongoDB,
// then tails the file from that offset onward, queueing each line for shipping.
func (s *submitter) shipFileRoutine(name string, sigCatchupped chan<- struct{}) error {
	// Initialize in case of new file
	log := mLog{
		HostInfo: s.hostInfo,
		Filename: name,
	}

	// get the last shipped offset for this host and file
	offsetResult, err := mWithErr(s.db.FindOne(mongoTimeoutCtx(s.ctx),
		bson.D{{Key: "hostinfo.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}},
		&mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest)
	))
	if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
		return fmt.Errorf("retrieving mongo offset: %w", err)
	}

	// offsetResult.DecodeBytes() //TODO: check for extra fields
	if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
		return fmt.Errorf("decoding mongo offset: %w", err)
	}

	fi, err := os.Stat(log.Filename)
	if err != nil {
		return fmt.Errorf("getting original file size: %w", err)
	}
	startSize := fi.Size()

	// TODO: use inotify for file, and end with file deletion or replacement
	lineChan, errChan, err := util.TailFile(s.ctx, log.Filename, log.Offset, io.SeekStart)
	if err != nil {
		return fmt.Errorf("tailing file: %w", err)
	}
	for {
		select {
		case err := <-errChan:
			return fmt.Errorf("tailing file: %w", err)
		case line := <-lineChan:
			// Signal catch-up (non-blocking) once we have read past the size
			// the file had when shipping started.
			if line.EndOffset > startSize {
				select {
				case sigCatchupped <- struct{}{}:
				default:
				}
			}

			// Queue the line for shipping; if the queue is full, drop the line
			// and count the drop instead of blocking the tailer.
			select {
			case s.sendQueue <- mLog{
				HostInfo: s.hostInfo,
				Filename: *line.Filename,
				Offset:   line.EndOffset,
				Content:  line.String,
			}:
			default:
				promShipperDropped.WithLabelValues(*line.Filename).Add(1)
			}
			// TODO:
			// default:
			// 	return nil
		}
	}
}
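
// mongoTimeoutCtx is defined elsewhere in the package; given how it wraps the
// parent context for single MongoDB calls above, it presumably attaches a
// per-operation deadline, roughly like this sketch (the timeout value is a
// placeholder):
//
//	func mongoTimeoutCtx(ctx context.Context) context.Context {
//		ctx, _ = context.WithTimeout(ctx, 10*time.Second) // resources released when the deadline fires
//		return ctx
//	}
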
// mWithErr adapts a mongo result wrapper (e.g. FindOne's *mongo.SingleResult)
// to the usual (value, error) form.
func mWithErr[t interface{ Err() error }](mongoWrap t) (t, error) {
	return mongoWrap, mongoWrap.Err()
}

// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) {
// 	ctx, cancel := context.WithCancel(pctx)
// 	wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done())
// }