From f26438f32a8ee09b7605d740b55f9503d9c3c344 Mon Sep 17 00:00:00 2001
From: rasmus
Date: Wed, 9 Nov 2022 19:56:44 +0200
Subject: [PATCH] implement context cancellation on db error

---
 pkg/file/file.go     | 72 ++++++++++++++++++++++++--------------------
 pkg/lines/lines.go   | 46 ++++++++++++++++++----------
 pkg/sender/sender.go | 32 +++++++++++++++-----
 3 files changed, 94 insertions(+), 56 deletions(-)

diff --git a/pkg/file/file.go b/pkg/file/file.go
index 53fc257..936e6d6 100644
--- a/pkg/file/file.go
+++ b/pkg/file/file.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"log"
 	"os"
-	"time"
 
 	"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
 	"git.k-space.ee/k-space/logmower-shipper/pkg/lines"
@@ -29,20 +28,8 @@ type File struct {
 
 // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly
 func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) {
-	lineChan := make(chan lines.Raw)
-	defer close(lineChan)
-
-	dbQueue := make(chan m.Record, SendQueueLimit)
-	go lines.RawC(lineChan).Process(recordLimitBytes, dbQueue)
-
-	waitGo := util.GoWg(func() {
-		sender.Queue(dbQueue).Sender(db, f.MetricsName)
-	})
-	defer waitGo()
-
-	// TODO: better way to kill or wait for sendQueue before retrying (or duplicates?)
 	_ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) {
-		err := f.trySubmit(ctx, db, lineChan)
+		err := f.process(ctx, db, recordLimitBytes)
 		if err == nil {
 			return true, nil
 		}
@@ -55,20 +42,39 @@ func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitByte
 	})
 }
 
-// use submitter(), don't use directly
-func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- lines.Raw) error {
-	lFile := lines.File(f) // file.File, but avoiding import cycle
+func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitBytes int) (_ chan<- lines.Raw, deferFn func()) {
+	lineOut := make(chan lines.Raw)
 
-	// TODO: better way for respecting ?killing sender for retry
-	for {
-		if len(sendQueue) == 0 {
-			break
-		}
-		time.Sleep(time.Second)
+	sctx, scancel := context.WithCancel(context.Background()) // not derived from the caller's ctx; cancelled only through cancelAll
+	cancelAll := func() {
+		cancel()
+		scancel()
 	}
 
+	dbQueue := make(chan m.Record, SendQueueLimit)
+	go lines.RawC(lineOut).Process(sctx, recordLimitBytes, dbQueue)
+
+	waitBatchSend := util.GoWg(func() {
+		sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll)
+	})
+	// deferFn: close the input channel, then wait on the sender's GoWg handle
+	return lineOut, func() {
+		close(lineOut)
+		waitBatchSend()
+	}
+}
+
+// process runs a single attempt; it is invoked by Process inside its backoff loop, don't call it directly
+func (f File) process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) error {
+	lFile := lines.File(f) // file.File, but avoiding import cycle
+
+	sctx, cancel := context.WithCancel(ctx)
+	defer cancel() // release sctx on every return path; deferred first, so it runs after dfn (LIFO)
+	lineOut, dfn := f.launchChannels(cancel, db, recordLimitBytes)
+	defer dfn()
+
 	// get files with offset
-	offsetResult, _ := mongoWithErr(db.FindOne(globals.MongoTimeout(ctx),
+	offsetResult, _ := mongoWithErr(db.FindOne(globals.MongoTimeout(sctx),
 		bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}},
 		&mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest)
 	))
@@ -77,24 +83,20 @@ func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue cha
 	if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
 		return fmt.Errorf("retrieving offset from database: %w", err)
 	}
-
 	dbOffset := m.RecordOffsetFromBson(&offsetResultBytes)
 
+	// for promFileCatchupDone
 	fi, err := os.Stat(f.Path)
 	if err != nil {
 		return fmt.Errorf("getting original file size: %w", err)
 	}
 	startSize := fi.Size()
 
-	sctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset))
-
-	lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart)
+	lineIn, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart)
 	if err != nil {
 		return fmt.Errorf("tailing file: %w", err)
 	}
+	promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset))
 
 	var catchUpped bool
 	promFileCatchupDone.WithLabelValues(f.MetricsName).Set(0)
@@ -104,7 +106,7 @@ func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue cha
 		case err := <-errChan:
 			return fmt.Errorf("tailing file: %w", err)
 
-		case line, ok := <-lineChan:
+		case line, ok := <-lineIn:
 			if !ok {
 				return nil
 			}
@@ -123,11 +125,15 @@ func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue cha
 				continue
 			}
 
-			sendQueue <- lines.Raw{
+			select {
+			case <-sctx.Done(): // don't block sending once cancelled; NOTE(review): the loop continues, confirm the outer select observes cancellation
+
+			case lineOut <- lines.Raw{
 				File:   &lFile,
 				Offset: line.EndOffset,
 				B:      line.Bytes,
+			}:
 			}
 		}
 	}
 }
diff --git a/pkg/lines/lines.go b/pkg/lines/lines.go
index 53f70b7..cfac720 100644
--- a/pkg/lines/lines.go
+++ b/pkg/lines/lines.go
@@ -1,6 +1,7 @@
 package lines
 
 import (
+	"context"
 	"log"
 	"sync"
 
@@ -23,7 +24,7 @@ type (
 )
 
 // assumes all lines are from same file
-func (unparsed RawC) Process(bufferLimitBytes int, parsed chan<- m.Record) {
+func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) {
 	lines := make(chan singleLine)
 	go unparsed.parse(lines)
 
@@ -32,34 +33,42 @@ func (unparsed RawC) Process(bufferLimitBytes int, parsed chan<- m.Record) {
 
 	stdOut, stdErr := make(chan singleLine), make(chan singleLine)
 	go func() {
-		singleLines(stdOut).process(bufferLimitBytes, parsed)
+		singleLines(stdOut).process(ctx, bufferLimitBytes, parsed)
 		wg.Done()
 	}()
 	go func() {
-		singleLines(stdErr).process(bufferLimitBytes, parsed)
+		singleLines(stdErr).process(ctx, bufferLimitBytes, parsed)
 		wg.Done()
 	}()
 
+	defer func() { // runs on both exits below: cancellation and input channel close
+		close(stdOut)
+		close(stdErr)
+		wg.Wait()
+		close(parsed)
+	}()
+
 	// split stdout and stderr
 	for {
-		line, ok := <-lines
-		if !ok {
-			close(stdOut)
-			close(stdErr)
-			wg.Wait()
-			close(parsed)
+		select {
+		case <-ctx.Done():
 			return
-		}
 
-		if line.StdErr {
-			stdErr <- line
-		} else {
-			stdOut <- line
+		case line, ok := <-lines:
+			if !ok {
+				return
+			}
+
+			if line.StdErr {
+				stdErr <- line
+			} else {
+				stdOut <- line
+			}
 		}
 	}
 }
 
-func (lines singleLines) process(bufferLimitBytes int, parsed chan<- m.Record) {
+func (lines singleLines) process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) {
 	var firstMetadata *m.ParsedMetadata
 	var buffer []byte
 
@@ -86,12 +95,17 @@ func (lines singleLines) process(bufferLimitBytes int, parsed chan<- m.Record) {
 		}
 
 		if !line.partial {
-			parsed <- m.Record{
+			select {
+			case <-ctx.Done():
+				return
+
+			case parsed <- m.Record{
 				File:   line.File.File,
 				Offset: line.Offset,
 
 				String:         string(buffer),
 				ParsedMetadata: *firstMetadata,
+			}:
 			}
 
 			buffer = nil
diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go
index a798eac..98e5fd5 100644
--- a/pkg/sender/sender.go
+++ b/pkg/sender/sender.go
@@ -9,6 +9,7 @@ import (
 	m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
 	"github.com/jtagcat/util"
 	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
 const (
@@ -18,7 +19,7 @@ const (
 
 type Queue <-chan m.Record
 
-func (queue Queue) Sender(db *mongo.Collection, metricsFilename string) {
+func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOnError func()) {
 	batched := make(chan []m.Record)
 
 	// metrics for batcher and queue
@@ -60,14 +61,31 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string) {
 			batchBson = append(batchBson, b.ToBson())
 		}
 
-		result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, nil)
+		tru := true // NOTE(review): the index math below assumes an ordered insert stops at the first failure; confirm against driver docs
+		result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru})
+
+		var succeedCount int
+		if result != nil {
+			succeedCount = len(result.InsertedIDs)
+		}
+		promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(succeedCount))
+
 		if err != nil {
 			promShipperDbSendError.WithLabelValues(metricsFilename).Add(1)
-			log.Printf("failure in batch submit to database: %e", err) // TODO: add some selective retry here or something, better error handling
-			continue
-		}
 
-		promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(
-			len(result.InsertedIDs)))
+			if succeedCount == len(batch) {
+				log.Printf("all insertions in batch were successful, yet failure in database: %v", err)
+
+				cancelOnError()
+				return
+			}
+
+			firstFailed := &batch[succeedCount] // (len-1)+1
+			log.Printf("failure in inserting %q record with offset %d to database: %v",
+				firstFailed.Path, firstFailed.Offset, err)
+
+			cancelOnError()
+			return
+		}
 	}
 }