logmower-shipper/pkg/file/file.go

159 lines
4.0 KiB
Go
Raw Normal View History

2022-11-09 16:07:28 +00:00
package file
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
2022-11-11 15:09:12 +00:00
"time"
2022-11-09 16:07:28 +00:00
"git.k-space.ee/k-space/logmower-shipper/pkg/lines"
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
"git.k-space.ee/k-space/logmower-shipper/pkg/sender"
"github.com/jtagcat/util/retry"
2022-11-12 22:19:19 +00:00
"github.com/jtagcat/util/std"
"github.com/jtagcat/util/tail"
2022-11-09 16:07:28 +00:00
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
"k8s.io/apimachinery/pkg/util/wait"
)
2022-11-11 19:12:31 +00:00
// SendQueueLimit is the buffer capacity of the per-file channel of m.Record
// values that launchChannels creates for the database sender.
var SendQueueLimit = 1024
2022-11-09 16:07:28 +00:00
2022-11-11 15:09:12 +00:00
// backoff builds the retry schedule used by Process. It is a function rather
// than a shared variable so that every caller gets its own copy to consume.
func backoff() wait.Backoff {
	var b wait.Backoff
	b.Duration = 2 * time.Second
	b.Factor = 1.5
	b.Jitter = 0.1
	b.Cap = 30 * time.Second
	return b
}
2022-11-09 16:07:28 +00:00
// File is a log file to be shipped: the embedded *m.File (which supplies the
// Host and Path fields read by process) plus the name used for its metrics.
type File struct {
*m.File
// MetricsName is the label value handed to the prom* collectors and the
// name printed in log messages for this file.
MetricsName string // filepath.Base()
}
// TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly
func (f File) Process(ctx context.Context, db *mongo.Collection) {
_ = retry.OnErrorManagedBackoff(ctx, backoff(), func() (retryable bool, _ error) {
err := f.process(ctx, db)
if err != nil {
promFileErr.WithLabelValues(f.MetricsName).Add(1)
log.Printf("processing file %q: %e", f.MetricsName, err)
2022-11-09 16:07:28 +00:00
}
return true, err
2022-11-09 16:07:28 +00:00
})
}
func (f File) launchChannels(cancel func(), db *mongo.Collection) (_ chan<- lines.Raw, deferFn func()) {
lineOut := make(chan lines.Raw)
sctx, scancel := context.WithCancel(context.Background())
cancelAll := func() {
cancel()
scancel()
}
dbQueue := make(chan m.Record, SendQueueLimit)
go lines.RawC(lineOut).Process(sctx, dbQueue)
2022-11-12 22:19:19 +00:00
waitBatchSend := std.GoWg(func() {
sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll)
})
return lineOut, func() {
close(lineOut)
waitBatchSend()
}
}
2022-11-09 16:07:28 +00:00
// use submitter(), don't use directly
func (f File) process(ctx context.Context, db *mongo.Collection) error {
2022-11-09 16:07:28 +00:00
lFile := lines.File(f) // file.File, but avoiding import cycle
sctx, cancel := context.WithCancel(ctx)
lineOut, dfn := f.launchChannels(cancel, db)
defer dfn()
2022-11-09 16:07:28 +00:00
// get files with offset
2022-11-11 15:09:12 +00:00
offsetResult, _ := mongoWithErr(db.FindOne(m.GlobalTimeout(sctx),
2022-11-09 16:07:28 +00:00
bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}},
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest)
))
offsetResultBytes, err := offsetResult.DecodeBytes()
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("retrieving offset from database: %w", err)
}
dbOffset := m.RecordOffsetFromBson(&offsetResultBytes)
// for promFileChatcupDone
2022-11-09 16:07:28 +00:00
fi, err := os.Stat(f.Path)
if err != nil {
return fmt.Errorf("getting original file size: %w", err)
}
startSize := fi.Size()
2022-11-12 22:19:19 +00:00
lineIn, errChan, err := tail.New(sctx, f.Path, dbOffset, io.SeekStart)
2022-11-09 16:07:28 +00:00
if err != nil {
return fmt.Errorf("tailing file: %w", err)
}
promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset))
2022-11-09 16:07:28 +00:00
var catchUpped bool
promFileCatchupDone.WithLabelValues(f.MetricsName).Set(0)
for {
select {
case err := <-errChan:
return fmt.Errorf("tailing file: %w", err)
case line, ok := <-lineIn:
2022-11-09 16:07:28 +00:00
if !ok {
return nil
}
promFileLineSize.WithLabelValues(f.MetricsName).Observe(float64(len(line.Bytes)))
if !catchUpped {
catchUpped = line.EndOffset >= startSize
if catchUpped {
promFileCatchupDone.WithLabelValues(f.MetricsName).Set(1)
}
}
if len(line.Bytes) == 0 {
continue
}
select {
case <-sctx.Done():
case lineOut <- lines.Raw{
2022-11-09 16:07:28 +00:00
File: &lFile,
Offset: line.EndOffset,
B: line.Bytes,
}:
2022-11-09 16:07:28 +00:00
}
}
}
}
// mongoWithErr returns mongoWrap unchanged together with the result of its
// Err method, turning any value that exposes Err() into a (value, error) pair.
func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) {
	err := mongoWrap.Err()
	return mongoWrap, err
}
// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) {
// ctx, cancel := context.WithCancel(pctx)
// wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done())
// }