implement Simulate and globalize BufferLimit
This commit is contained in:
		| @@ -27,9 +27,9 @@ type File struct { | ||||
| } | ||||
|  | ||||
| // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly | ||||
| func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) { | ||||
| func (f File) Process(ctx context.Context, db *mongo.Collection) { | ||||
| 	_ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) { | ||||
| 		err := f.process(ctx, db, recordLimitBytes) | ||||
| 		err := f.process(ctx, db) | ||||
| 		if err == nil { | ||||
| 			return true, nil | ||||
| 		} | ||||
| @@ -42,7 +42,7 @@ func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitByte | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitBytes int) (_ chan<- lines.Raw, deferFn func()) { | ||||
| func (f File) launchChannels(cancel func(), db *mongo.Collection) (_ chan<- lines.Raw, deferFn func()) { | ||||
| 	lineOut := make(chan lines.Raw) | ||||
|  | ||||
| 	sctx, scancel := context.WithCancel(context.Background()) | ||||
| @@ -52,7 +52,7 @@ func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitByt | ||||
| 	} | ||||
|  | ||||
| 	dbQueue := make(chan m.Record, SendQueueLimit) | ||||
| 	go lines.RawC(lineOut).Process(sctx, recordLimitBytes, dbQueue) | ||||
| 	go lines.RawC(lineOut).Process(sctx, dbQueue) | ||||
|  | ||||
| 	waitBatchSend := util.GoWg(func() { | ||||
| 		sender.Queue(dbQueue).Sender(db, f.MetricsName, cancelAll) | ||||
| @@ -65,12 +65,12 @@ func (f File) launchChannels(cancel func(), db *mongo.Collection, recordLimitByt | ||||
| } | ||||
|  | ||||
| // use submitter(), don't use directly | ||||
| func (f File) process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) error { | ||||
| func (f File) process(ctx context.Context, db *mongo.Collection) error { | ||||
| 	lFile := lines.File(f) // file.File, but avoiding import cycle | ||||
|  | ||||
| 	sctx, cancel := context.WithCancel(ctx) | ||||
|  | ||||
| 	lineOut, dfn := f.launchChannels(cancel, db, recordLimitBytes) | ||||
| 	lineOut, dfn := f.launchChannels(cancel, db) | ||||
| 	defer dfn() | ||||
|  | ||||
| 	// get files with offset | ||||
|   | ||||
| @@ -15,6 +15,11 @@ const ( | ||||
| 	DatabaseCommandTimeout = 10 * time.Second | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	BufferLimitBytes int | ||||
| 	Simulate         bool | ||||
| ) | ||||
|  | ||||
| func MongoTimeout(ctx context.Context) context.Context { | ||||
| 	ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) | ||||
| 	return ctx | ||||
|   | ||||
| @@ -5,6 +5,7 @@ import ( | ||||
| 	"log" | ||||
| 	"sync" | ||||
|  | ||||
| 	"git.k-space.ee/k-space/logmower-shipper/pkg/globals" | ||||
| 	m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" | ||||
| ) | ||||
|  | ||||
| @@ -24,7 +25,7 @@ type ( | ||||
| ) | ||||
|  | ||||
| // assumes all lines are from same file | ||||
| func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) { | ||||
| func (unparsed RawC) Process(ctx context.Context, parsed chan<- m.Record) { | ||||
| 	lines := make(chan singleLine) | ||||
| 	go unparsed.parse(lines) | ||||
|  | ||||
| @@ -33,11 +34,11 @@ func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed c | ||||
|  | ||||
| 	stdOut, stdErr := make(chan singleLine), make(chan singleLine) | ||||
| 	go func() { | ||||
| 		singleLines(stdOut).process(ctx, bufferLimitBytes, parsed) | ||||
| 		singleLines(stdOut).process(ctx, parsed) | ||||
| 		wg.Done() | ||||
| 	}() | ||||
| 	go func() { | ||||
| 		singleLines(stdErr).process(ctx, bufferLimitBytes, parsed) | ||||
| 		singleLines(stdErr).process(ctx, parsed) | ||||
| 		wg.Done() | ||||
| 	}() | ||||
|  | ||||
| @@ -68,7 +69,7 @@ func (unparsed RawC) Process(ctx context.Context, bufferLimitBytes int, parsed c | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (lines singleLines) process(ctx context.Context, bufferLimitBytes int, parsed chan<- m.Record) { | ||||
| func (lines singleLines) process(ctx context.Context, parsed chan<- m.Record) { | ||||
| 	var firstMetadata *m.ParsedMetadata | ||||
| 	var buffer []byte | ||||
|  | ||||
| @@ -86,9 +87,9 @@ func (lines singleLines) process(ctx context.Context, bufferLimitBytes int, pars | ||||
|  | ||||
| 		buffer = append(buffer, line.B...) | ||||
|  | ||||
| 		if len(buffer) > bufferLimitBytes { | ||||
| 		if len(buffer) > globals.BufferLimitBytes { | ||||
| 			promRecordDroppedTooLarge.WithLabelValues(line.MetricsName).Add(1) | ||||
| 			log.Printf("dropped record: size in bytes exceeds limit of %d", bufferLimitBytes) | ||||
| 			log.Printf("dropped record: size in bytes exceeds limit of %d", globals.BufferLimitBytes) | ||||
|  | ||||
| 			buffer = nil | ||||
| 			continue | ||||
|   | ||||
| @@ -2,6 +2,7 @@ package sender | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"time" | ||||
|  | ||||
| @@ -56,13 +57,7 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn | ||||
|  | ||||
| 		promShipperSynced.WithLabelValues(metricsFilename).Set(0) | ||||
|  | ||||
| 		var batchBson []interface{} // mongo does not like typing | ||||
| 		for _, b := range batch { | ||||
| 			batchBson = append(batchBson, b.ToBson()) | ||||
| 		} | ||||
|  | ||||
| 		tru := true | ||||
| 		result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru}) | ||||
| 		result, err := insertManyWithSimulate(db, batch) | ||||
|  | ||||
| 		var succeedCount int | ||||
| 		if result != nil { | ||||
| @@ -89,3 +84,24 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func insertManyWithSimulate(db *mongo.Collection, batch []m.Record) (*mongo.InsertManyResult, error) { | ||||
| 	if !globals.Simulate { | ||||
| 		var batchBson []interface{} // mongo does not like typing | ||||
| 		for _, b := range batch { | ||||
| 			batchBson = append(batchBson, b.ToBson()) | ||||
| 		} | ||||
|  | ||||
| 		tru := true | ||||
| 		return db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru}) | ||||
| 	} | ||||
|  | ||||
| 	fmt.Printf("simulating successful database bulk write: %v", batch) | ||||
|  | ||||
| 	var res mongo.InsertManyResult | ||||
| 	for range batch { | ||||
| 		res.InsertedIDs = append(res.InsertedIDs, nil) | ||||
| 	} | ||||
|  | ||||
| 	return &res, nil | ||||
| } | ||||
|   | ||||
| @@ -24,21 +24,26 @@ var App = &cli.App{ | ||||
| 	// Usage:       "rubykana <input>", | ||||
| 	// TODO: #2: yaml | ||||
| 	Flags: []cli.Flag{ | ||||
| 		&cli.BoolFlag{Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Do not write to database"}, // TODO: | ||||
| 		&cli.BoolFlag{Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Do not write to database"}, | ||||
| 		&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, | ||||
| 		&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"},            // TODO: | ||||
| 		&cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO: | ||||
| 		// &cli.BoolFlag{Name: "parse-json"}, //TODO: | ||||
| 		&cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO: | ||||
| 		&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, | ||||
| 		// | ||||
| 		//TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, | ||||
| 		//TODO: &cli.BoolFlag{Name: "parse-json"}, | ||||
| 		// TODO: | ||||
| 		&cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, | ||||
| 		&cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true}, | ||||
| 		// | ||||
| 		&cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true}, | ||||
| 	}, | ||||
| 	Before: func(ctx *cli.Context) error { | ||||
| 		if ctx.Int("max-record-size") < 1 { | ||||
| 			return fmt.Errorf("max-record-size must be postivie") | ||||
| 		globals.BufferLimitBytes = ctx.Int("max-record-size") | ||||
| 		if globals.BufferLimitBytes < 1 { | ||||
| 			return fmt.Errorf("max-record-size must be positive") | ||||
| 		} | ||||
|  | ||||
| 		globals.Simulate = ctx.Bool("simulate") | ||||
|  | ||||
| 		return nil | ||||
| 	}, | ||||
|  | ||||
| @@ -102,7 +107,7 @@ var App = &cli.App{ | ||||
| 							MetricsName: filepath.Base(event.Name), | ||||
| 						} | ||||
|  | ||||
| 						file.Process(ctx.Context, db, ctx.Int("max-record-size")) | ||||
| 						file.Process(ctx.Context, db) | ||||
| 						wg.Done() | ||||
| 					}() | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user