retry within mongo sender
This commit is contained in:
		| @@ -10,6 +10,7 @@ import ( | ||||
| 	"github.com/jtagcat/util" | ||||
| 	"go.mongodb.org/mongo-driver/mongo" | ||||
| 	"go.mongodb.org/mongo-driver/mongo/options" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| @@ -21,6 +22,16 @@ const ( | ||||
| 	MaxBatchTime = 5 * time.Second | ||||
| ) | ||||
|  | ||||
| // wrapper to force copying before use | ||||
| func backoff() wait.Backoff { | ||||
| 	return wait.Backoff{ | ||||
| 		Duration: 2 * time.Second, | ||||
| 		Factor:   4, | ||||
| 		Jitter:   0.2, | ||||
| 		Cap:      2 * time.Minute, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type Queue <-chan m.Record | ||||
|  | ||||
| func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOnError func()) { | ||||
| @@ -60,31 +71,36 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn | ||||
|  | ||||
| 		promShipperSynced.WithLabelValues(metricsFilename).Set(0) | ||||
|  | ||||
| 		result, err := insertManyWithSimulate(db, batch) | ||||
| 		err := util.RetryOnError(backoff(), func() (_ bool, _ error) { | ||||
| 			result, err := insertManyWithSimulate(db, batch) | ||||
|  | ||||
| 		var succeedCount int | ||||
| 		if result != nil { | ||||
| 			succeedCount = len(result.InsertedIDs) | ||||
| 		} | ||||
| 		promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(succeedCount)) | ||||
| 			var succeedCount int | ||||
| 			if result != nil { | ||||
| 				succeedCount = len(result.InsertedIDs) | ||||
| 			} | ||||
| 			promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(succeedCount)) | ||||
|  | ||||
| 		if err != nil { | ||||
| 			promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) | ||||
|  | ||||
| 			if succeedCount == len(batch) { | ||||
| 				log.Printf("all insertions in batch were successful, yet failure in database: %e", err) | ||||
|  | ||||
| 				cancelOnError() | ||||
| 			if err == nil { | ||||
| 				return | ||||
| 			} | ||||
| 			promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) | ||||
|  | ||||
| 			firstFailed := &batch[succeedCount] // (len-1)+1 | ||||
| 			log.Printf("failure in inserting %q record with offset %d to database: %e", | ||||
| 				firstFailed.Path, firstFailed.Offset, err) | ||||
| 			batch = batch[succeedCount:] | ||||
|  | ||||
| 			if succeedCount == len(batch) { | ||||
| 				return true, fmt.Errorf("all insertions in batch were successful, yet failure in database: %w", err) | ||||
| 			} | ||||
|  | ||||
| 			return true, fmt.Errorf("insert record with offset %d to database: %w", | ||||
| 				batch[0].Offset, err) | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			log.Printf("batch insert %q to mongo: %e", metricsFilename, err) | ||||
|  | ||||
| 			cancelOnError() | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user