From d08f314cf128bd4c5173a9e1b916318b3df3d5d2 Mon Sep 17 00:00:00 2001 From: rasmus Date: Fri, 11 Nov 2022 17:52:30 +0200 Subject: [PATCH] retry within mongo sender --- go.mod | 2 +- pkg/sender/sender.go | 48 +++++++++++++++++++++++++++++--------------- vendor/modules.txt | 2 +- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 20d6ba5..8a3114b 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/prometheus/client_golang v1.14.0 github.com/urfave/cli/v2 v2.23.5 go.mongodb.org/mongo-driver v1.11.0 - k8s.io/apimachinery v0.25.3 + k8s.io/apimachinery v0.25.4 ) require ( diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index 7adc6c9..91c0023 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -10,6 +10,7 @@ import ( "github.com/jtagcat/util" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "k8s.io/apimachinery/pkg/util/wait" ) var ( @@ -21,6 +22,16 @@ const ( MaxBatchTime = 5 * time.Second ) +// wrapper to force copying before use +func backoff() wait.Backoff { + return wait.Backoff{ + Duration: 2 * time.Second, + Factor: 4, + Jitter: 0.2, + Cap: 2 * time.Minute, + } +} + type Queue <-chan m.Record func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOnError func()) { @@ -60,31 +71,36 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn promShipperSynced.WithLabelValues(metricsFilename).Set(0) - result, err := insertManyWithSimulate(db, batch) + err := util.RetryOnError(backoff(), func() (_ bool, _ error) { + result, err := insertManyWithSimulate(db, batch) - var succeedCount int - if result != nil { - succeedCount = len(result.InsertedIDs) - } - promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(succeedCount)) + var succeedCount int + if result != nil { + succeedCount = len(result.InsertedIDs) + } + promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(succeedCount)) - if err != nil { - promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) - - if succeedCount == len(batch) { - log.Printf("all insertions in batch were successful, yet failure in database: %e", err) - - cancelOnError() + if err == nil { return } + promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) - firstFailed := &batch[succeedCount] // (len-1)+1 - log.Printf("failure in inserting %q record with offset %d to database: %e", - firstFailed.Path, firstFailed.Offset, err) + batch = batch[succeedCount:] + + if succeedCount == len(batch) { + return true, fmt.Errorf("all insertions in batch were successful, yet failure in database: %w", err) + } + + return true, fmt.Errorf("insert record with offset %d to database: %w", + batch[0].Offset, err) + }) + if err != nil { + log.Printf("batch insert %q to mongo: %e", metricsFilename, err) cancelOnError() return } + } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 4e09d09..80eda09 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -167,7 +167,7 @@ google.golang.org/protobuf/runtime/protoiface google.golang.org/protobuf/runtime/protoimpl google.golang.org/protobuf/types/descriptorpb google.golang.org/protobuf/types/known/timestamppb -# k8s.io/apimachinery v0.25.3 => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff +# k8s.io/apimachinery v0.25.4 => github.com/jtagcat/kubernetes/staging/src/k8s.io/apimachinery v0.0.0-20221027124836-581f57977fff ## explicit; go 1.19 k8s.io/apimachinery/pkg/util/runtime k8s.io/apimachinery/pkg/util/wait