implement context cancellation on db error
This commit is contained in:
		| @@ -7,7 +7,6 @@ import ( | |||||||
| 	"io" | 	"io" | ||||||
| 	"log" | 	"log" | ||||||
| 	"os" | 	"os" | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"git.k-space.ee/k-space/logmower-shipper/pkg/globals" | 	"git.k-space.ee/k-space/logmower-shipper/pkg/globals" | ||||||
| 	"git.k-space.ee/k-space/logmower-shipper/pkg/lines" | 	"git.k-space.ee/k-space/logmower-shipper/pkg/lines" | ||||||
| @@ -29,20 +28,8 @@ type File struct { | |||||||
|  |  | ||||||
| // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly | // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly | ||||||
| func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) { | func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) { | ||||||
| 	lineChan := make(chan lines.Raw) |  | ||||||
| 	defer close(lineChan) |  | ||||||
|  |  | ||||||
| 	dbQueue := make(chan m.Record, SendQueueLimit) |  | ||||||
| 	go lines.RawC(lineChan).Process(recordLimitBytes, dbQueue) |  | ||||||
|  |  | ||||||
| 	waitGo := util.GoWg(func() { |  | ||||||
| 		sender.Queue(dbQueue).Sender(db, f.MetricsName) |  | ||||||
| 	}) |  | ||||||
| 	defer waitGo() |  | ||||||
|  |  | ||||||
| 	// TODO: better way to kill or wait for sendQueue before retrying (or duplicates?) |  | ||||||
| 	_ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) { | 	_ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) { | ||||||
| 		err := f.trySubmit(ctx, db, lineChan) | 		err := f.process(ctx, db, recordLimitBytes) | ||||||
| 		if err == nil { | 		if err == nil { | ||||||
| 			return true, nil | 			return true, nil | ||||||
| 		} | 		} | ||||||
| @@ -55,20 +42,39 @@ func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitByte | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // use submitter(), don't use directly | func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitBytes int) (_ chan<- lines.Raw, deferFn func()) { | ||||||
| func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- lines.Raw) error { | 	lineOut := make(chan lines.Raw) | ||||||
| 	lFile := lines.File(f) // file.File, but avoiding import cycle |  | ||||||
|  |  | ||||||
| 	// TODO: better way for respecting ?killing sender for retry | 	sctx, scancel := context.WithCancel(context.Background()) | ||||||
| 	for { | 	cancelAll := func() { | ||||||
| 		if len(sendQueue) == 0 { | 		cancel() | ||||||
| 			break | 		scancel() | ||||||
| 		} |  | ||||||
| 		time.Sleep(time.Second) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	dbQueue := make(chan m.Record, SendQueueLimit) | ||||||
|  | 	go lines.RawC(lineOut).Process(sctx, recordLimitBytes, dbQueue) | ||||||
|  |  | ||||||
|  | 	waitBatchSend := util.GoWg(func() { | ||||||
|  | 		sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll) | ||||||
|  | 	}) | ||||||
|  |  | ||||||
|  | 	return lineOut, func() { | ||||||
|  | 		close(lineOut) | ||||||
|  | 		waitBatchSend() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // use submitter(), don't use directly | ||||||
|  | func (f File) process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) error { | ||||||
|  | 	lFile := lines.File(f) // file.File, but avoiding import cycle | ||||||
|  |  | ||||||
|  | 	sctx, cancel := context.WithCancel(ctx) | ||||||
|  |  | ||||||
|  | 	lineOut, dfn := f.launchChannels(cancel, db, recordLimitBytes) | ||||||
|  | 	defer dfn() | ||||||
|  |  | ||||||
| 	// get files with offset | 	// get files with offset | ||||||
| 	offsetResult, _ := mongoWithErr(db.FindOne(globals.MongoTimeout(ctx), | 	offsetResult, _ := mongoWithErr(db.FindOne(globals.MongoTimeout(sctx), | ||||||
| 		bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}}, | 		bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}}, | ||||||
| 		&mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest) | 		&mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest) | ||||||
| 	)) | 	)) | ||||||
| @@ -77,24 +83,20 @@ func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue cha | |||||||
| 	if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { | 	if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { | ||||||
| 		return fmt.Errorf("retrieving offset from database: %w", err) | 		return fmt.Errorf("retrieving offset from database: %w", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	dbOffset := m.RecordOffsetFromBson(&offsetResultBytes) | 	dbOffset := m.RecordOffsetFromBson(&offsetResultBytes) | ||||||
|  |  | ||||||
|  | 	// for promFileCatchupDone | ||||||
| 	fi, err := os.Stat(f.Path) | 	fi, err := os.Stat(f.Path) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("getting original file size: %w", err) | 		return fmt.Errorf("getting original file size: %w", err) | ||||||
| 	} | 	} | ||||||
| 	startSize := fi.Size() | 	startSize := fi.Size() | ||||||
|  |  | ||||||
| 	sctx, cancel := context.WithCancel(ctx) | 	lineIn, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart) | ||||||
| 	defer cancel() |  | ||||||
|  |  | ||||||
| 	promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset)) |  | ||||||
|  |  | ||||||
| 	lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("tailing file: %w", err) | 		return fmt.Errorf("tailing file: %w", err) | ||||||
| 	} | 	} | ||||||
|  | 	promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset)) | ||||||
|  |  | ||||||
| 	var catchUpped bool | 	var catchUpped bool | ||||||
| 	promFileCatchupDone.WithLabelValues(f.MetricsName).Set(0) | 	promFileCatchupDone.WithLabelValues(f.MetricsName).Set(0) | ||||||
| @@ -104,7 +106,7 @@ func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue cha | |||||||
| 		case err := <-errChan: | 		case err := <-errChan: | ||||||
| 			return fmt.Errorf("tailing file: %w", err) | 			return fmt.Errorf("tailing file: %w", err) | ||||||
|  |  | ||||||
| 		case line, ok := <-lineChan: | 		case line, ok := <-lineIn: | ||||||
| 			if !ok { | 			if !ok { | ||||||
| 				return nil | 				return nil | ||||||
| 			} | 			} | ||||||
| @@ -123,11 +125,15 @@ func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue cha | |||||||
| 				continue | 				continue | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			sendQueue <- lines.Raw{ | 			select { | ||||||
|  | 			case <-sctx.Done(): | ||||||
|  |  | ||||||
|  | 			case lineOut <- lines.Raw{ | ||||||
| 				File: &lFile, | 				File: &lFile, | ||||||
|  |  | ||||||
| 				Offset: line.EndOffset, | 				Offset: line.EndOffset, | ||||||
| 				B:      line.Bytes, | 				B:      line.Bytes, | ||||||
|  | 			}: | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -1,6 +1,7 @@ | |||||||
| package lines | package lines | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"log" | 	"log" | ||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
| @@ -23,7 +24,7 @@ type ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| // assumes all lines are from same file | // assumes all lines are from same file | ||||||
| func (unparsed RawC) Process(bufferLimitBytes int, parsed chan<- m.Record) { | func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) { | ||||||
| 	lines := make(chan singleLine) | 	lines := make(chan singleLine) | ||||||
| 	go unparsed.parse(lines) | 	go unparsed.parse(lines) | ||||||
|  |  | ||||||
| @@ -32,34 +33,42 @@ func (unparsed RawC) Process(bufferLimitBytes int, parsed chan<- m.Record) { | |||||||
|  |  | ||||||
| 	stdOut, stdErr := make(chan singleLine), make(chan singleLine) | 	stdOut, stdErr := make(chan singleLine), make(chan singleLine) | ||||||
| 	go func() { | 	go func() { | ||||||
| 		singleLines(stdOut).process(bufferLimitBytes, parsed) | 		singleLines(stdOut).process(ctx, bufferLimitBytes, parsed) | ||||||
| 		wg.Done() | 		wg.Done() | ||||||
| 	}() | 	}() | ||||||
| 	go func() { | 	go func() { | ||||||
| 		singleLines(stdErr).process(bufferLimitBytes, parsed) | 		singleLines(stdErr).process(ctx, bufferLimitBytes, parsed) | ||||||
| 		wg.Done() | 		wg.Done() | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
|  | 	defer func() { | ||||||
|  | 		close(stdOut) | ||||||
|  | 		close(stdErr) | ||||||
|  | 		wg.Wait() | ||||||
|  | 		close(parsed) | ||||||
|  | 	}() | ||||||
|  |  | ||||||
| 	// split stdout and stderr | 	// split stdout and stderr | ||||||
| 	for { | 	for { | ||||||
| 		line, ok := <-lines | 		select { | ||||||
| 		if !ok { | 		case <-ctx.Done(): | ||||||
| 			close(stdOut) |  | ||||||
| 			close(stdErr) |  | ||||||
| 			wg.Wait() |  | ||||||
| 			close(parsed) |  | ||||||
| 			return | 			return | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		if line.StdErr { | 		case line, ok := <-lines: | ||||||
| 			stdErr <- line | 			if !ok { | ||||||
| 		} else { | 				return | ||||||
| 			stdOut <- line | 			} | ||||||
|  |  | ||||||
|  | 			if line.StdErr { | ||||||
|  | 				stdErr <- line | ||||||
|  | 			} else { | ||||||
|  | 				stdOut <- line | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (lines singleLines) process(bufferLimitBytes int, parsed chan<- m.Record) { | func (lines singleLines) process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) { | ||||||
| 	var firstMetadata *m.ParsedMetadata | 	var firstMetadata *m.ParsedMetadata | ||||||
| 	var buffer []byte | 	var buffer []byte | ||||||
|  |  | ||||||
| @@ -86,12 +95,17 @@ func (lines singleLines) process(bufferLimitBytes int, parsed chan<- m.Record) { | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if !line.partial { | 		if !line.partial { | ||||||
| 			parsed <- m.Record{ | 			select { | ||||||
|  | 			case <-ctx.Done(): | ||||||
|  | 				return | ||||||
|  |  | ||||||
|  | 			case parsed <- m.Record{ | ||||||
| 				File:   line.File.File, | 				File:   line.File.File, | ||||||
| 				Offset: line.Offset, | 				Offset: line.Offset, | ||||||
|  |  | ||||||
| 				String:         string(buffer), | 				String:         string(buffer), | ||||||
| 				ParsedMetadata: *firstMetadata, | 				ParsedMetadata: *firstMetadata, | ||||||
|  | 			}: | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			buffer = nil | 			buffer = nil | ||||||
|   | |||||||
| @@ -9,6 +9,7 @@ import ( | |||||||
| 	m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" | 	m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" | ||||||
| 	"github.com/jtagcat/util" | 	"github.com/jtagcat/util" | ||||||
| 	"go.mongodb.org/mongo-driver/mongo" | 	"go.mongodb.org/mongo-driver/mongo" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo/options" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| @@ -18,7 +19,7 @@ const ( | |||||||
|  |  | ||||||
| type Queue <-chan m.Record | type Queue <-chan m.Record | ||||||
|  |  | ||||||
| func (queue Queue) Sender(db *mongo.Collection, metricsFilename string) { | func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOnError func()) { | ||||||
| 	batched := make(chan []m.Record) | 	batched := make(chan []m.Record) | ||||||
|  |  | ||||||
| 	// metrics for batcher and queue | 	// metrics for batcher and queue | ||||||
| @@ -60,14 +61,31 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string) { | |||||||
| 			batchBson = append(batchBson, b.ToBson()) | 			batchBson = append(batchBson, b.ToBson()) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, nil) | 		tru := true | ||||||
|  | 		result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru}) | ||||||
|  |  | ||||||
|  | 		var succeedCount int | ||||||
|  | 		if result != nil { | ||||||
|  | 			succeedCount = len(result.InsertedIDs) | ||||||
|  | 		} | ||||||
|  | 		promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(succeedCount)) | ||||||
|  |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) | 			promShipperDbSendError.WithLabelValues(metricsFilename).Add(1) | ||||||
| 			log.Printf("failure in batch submit to database: %e", err) // TODO: add some selective retry here or something, better error handling |  | ||||||
| 			continue |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		promShipperDbSent.WithLabelValues(metricsFilename).Add(float64( | 			if succeedCount == len(batch) { | ||||||
| 			len(result.InsertedIDs))) | 				log.Printf("all insertions in batch were successful, yet failure in database: %e", err) | ||||||
|  |  | ||||||
|  | 				cancelOnError() | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			firstFailed := &batch[succeedCount] // (len-1)+1 | ||||||
|  | 			log.Printf("failure in inserting %q record with offset %d to database: %e", | ||||||
|  | 				firstFailed.Path, firstFailed.Offset, err) | ||||||
|  |  | ||||||
|  | 			cancelOnError() | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user