From 0b3d382742ee1027938e8f3b471164876295326b Mon Sep 17 00:00:00 2001 From: rasmus Date: Fri, 11 Nov 2022 17:09:12 +0200 Subject: [PATCH] work on flags --- pkg/file/file.go | 16 +++++++++++++--- pkg/globals/globals.go | 32 ++------------------------------ pkg/lines/lines.go | 7 ++++--- pkg/mongo/metrics.go | 22 +++++++++++----------- pkg/mongo/mongo.go | 20 ++++++++++++++------ pkg/sender/sender.go | 13 ++++++++----- pkg/watcher/app.go | 24 +++++++++++++++--------- 7 files changed, 67 insertions(+), 67 deletions(-) diff --git a/pkg/file/file.go b/pkg/file/file.go index 8396f8f..5641059 100644 --- a/pkg/file/file.go +++ b/pkg/file/file.go @@ -7,8 +7,8 @@ import ( "io" "log" "os" + "time" - "git.k-space.ee/k-space/logmower-shipper/pkg/globals" "git.k-space.ee/k-space/logmower-shipper/pkg/lines" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" "git.k-space.ee/k-space/logmower-shipper/pkg/sender" @@ -21,6 +21,16 @@ import ( const SendQueueLimit = 1024 +// wrapper to force copying before use +func backoff() wait.Backoff { + return wait.Backoff{ + Duration: 2 * time.Second, + Factor: 1.5, + Jitter: 0.1, + Cap: 30 * time.Second, + } +} + type File struct { *m.File MetricsName string // filepath.Base() @@ -28,7 +38,7 @@ type File struct { // TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly func (f File) Process(ctx context.Context, db *mongo.Collection) { - _ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) { + _ = wait.ManagedExponentialBackoffWithContext(ctx, backoff(), func() (done bool, _ error) { err := f.process(ctx, db) if err == nil { return true, nil @@ -74,7 +84,7 @@ func (f File) process(ctx context.Context, db *mongo.Collection) error { defer dfn() // get files with offset - offsetResult, _ := mongoWithErr(db.FindOne(globals.MongoTimeout(sctx), + offsetResult, _ := mongoWithErr(db.FindOne(m.GlobalTimeout(sctx), bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}}, &mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest) )) diff --git a/pkg/globals/globals.go b/pkg/globals/globals.go index e9e5f80..21591a0 100644 --- a/pkg/globals/globals.go +++ b/pkg/globals/globals.go @@ -1,36 +1,8 @@ package globals -import ( - "context" - "time" - - "k8s.io/apimachinery/pkg/util/wait" -) - // Did not find any better way for multipackage prefixing const ( - PrometheusPrefix = "logmower" - AppName = PrometheusPrefix + "shipper" - DatabaseCommandTimeout = 10 * time.Second + PrometheusPrefix = "logmower" + AppName = PrometheusPrefix + "shipper" ) - -var ( - BufferLimitBytes int - Simulate bool -) - -func MongoTimeout(ctx context.Context) context.Context { - ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) - return ctx -} - -// wrapper to force copying before use -func Backoff() wait.Backoff { - return wait.Backoff{ - Duration: 2 * time.Second, - Factor: 1.5, - Jitter: 0.1, - Cap: 30 * time.Second, - } -} diff --git a/pkg/lines/lines.go b/pkg/lines/lines.go index 3ac50c2..dd4ffde 100644 --- a/pkg/lines/lines.go +++ b/pkg/lines/lines.go @@ -5,10 +5,11 @@ import ( "log" "sync" - "git.k-space.ee/k-space/logmower-shipper/pkg/globals" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" ) +var BufferLimitBytes int + type ( RawC <-chan Raw Raw struct { @@ -87,9 +88,9 @@ func (lines singleLines) process(ctx context.Context, parsed chan<- m.Record) { buffer = append(buffer, line.B...) - if len(buffer) > globals.BufferLimitBytes { + if len(buffer) > BufferLimitBytes && BufferLimitBytes != 0 { promRecordDroppedTooLarge.WithLabelValues(line.MetricsName).Add(1) - log.Printf("dropped record: size in bytes exceeds limit of %d", globals.BufferLimitBytes) + log.Printf("dropped record: size in bytes exceeds limit of %d", BufferLimitBytes) buffer = nil continue diff --git a/pkg/mongo/metrics.go b/pkg/mongo/metrics.go index 5312285..6189112 100644 --- a/pkg/mongo/metrics.go +++ b/pkg/mongo/metrics.go @@ -20,15 +20,15 @@ var promDbHeartbeat = promauto.NewHistogramVec(prom.HistogramOpts{ Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50}, }, []string{"connection_id"}) -func monitoredClientOptions() *mongoOpt.ClientOptions { - return mongoOpt.Client(). - SetServerMonitor(&mongoEvent.ServerMonitor{ - ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) { - promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(time.Duration(ev.DurationNanos).Seconds()) - }, - ServerHeartbeatFailed: func(ev *mongoEvent.ServerHeartbeatFailedEvent) { - promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(0) - log.Printf("database heartbeat failed on connection %q: %e", ev.ConnectionID, ev.Failure) - }, - }) +func attachMetrics(opts *mongoOpt.ClientOptions) *mongoOpt.ClientOptions { + opts.SetServerMonitor(&mongoEvent.ServerMonitor{ + ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) { + promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(time.Duration(ev.DurationNanos).Seconds()) + }, + ServerHeartbeatFailed: func(ev *mongoEvent.ServerHeartbeatFailedEvent) { + promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(0) + log.Printf("database heartbeat failed on connection %q: %e", ev.ConnectionID, ev.Failure) + }, + }) + return opts } diff --git a/pkg/mongo/mongo.go b/pkg/mongo/mongo.go index fe737b2..0d0af80 100644 --- a/pkg/mongo/mongo.go +++ b/pkg/mongo/mongo.go @@ -4,12 +4,20 @@ import ( "context" "fmt" "net/url" + "time" - "git.k-space.ee/k-space/logmower-shipper/pkg/globals" "go.mongodb.org/mongo-driver/mongo" + mongoOpt "go.mongodb.org/mongo-driver/mongo/options" ) -func Initialize(ctx context.Context, uri string) (*mongo.Collection, error) { +const CommandTimeout = 10 * time.Second + +func GlobalTimeout(ctx context.Context) context.Context { + ctx, _ = context.WithTimeout(ctx, CommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) + return ctx +} + +func Initialize(ctx context.Context, uri string, opts *mongoOpt.ClientOptions) (*mongo.Collection, error) { uriParsed, err := url.ParseRequestURI(uri) if err != nil { return nil, fmt.Errorf("parsing URI for database name: %w", err) @@ -20,20 +28,20 @@ func Initialize(ctx context.Context, uri string) (*mongo.Collection, error) { return nil, fmt.Errorf("URI must include database name (as database to authenticate against)") } - dbOpt := monitoredClientOptions().ApplyURI(uri) + dbOpt := attachMetrics(opts).ApplyURI(uri) - dbClient, err := mongo.Connect(globals.MongoTimeout(ctx), dbOpt) + dbClient, err := mongo.Connect(GlobalTimeout(ctx), dbOpt) if err != nil { return nil, fmt.Errorf("connecting to %q: %w", dbOpt.GetURI(), err) } - if err := dbClient.Ping(globals.MongoTimeout(ctx), nil); err != nil { + if err := dbClient.Ping(GlobalTimeout(ctx), nil); err != nil { return nil, fmt.Errorf("first ping to database: %w", err) } col := dbClient.Database(uriParsed.Path).Collection("logs") - if err := InitializeIndexes(globals.MongoTimeout(ctx), col); err != nil { + if err := InitializeIndexes(GlobalTimeout(ctx), col); err != nil { return nil, fmt.Errorf("initializing indexes: %w", err) } diff --git a/pkg/sender/sender.go b/pkg/sender/sender.go index e90ac70..7adc6c9 100644 --- a/pkg/sender/sender.go +++ b/pkg/sender/sender.go @@ -6,16 +6,19 @@ import ( "log" "time" - "git.k-space.ee/k-space/logmower-shipper/pkg/globals" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" "github.com/jtagcat/util" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) +var ( + Simulate = false + MaxBatchItems = 1000 +) + const ( - MaxBatchItems = 10000 - MaxBatchTime = 5 * time.Second + MaxBatchTime = 5 * time.Second ) type Queue <-chan m.Record @@ -86,14 +89,14 @@ func (queue Queue) Sender(db *mongo.Collection, metricsFilename string, cancelOn } func insertManyWithSimulate(db *mongo.Collection, batch []m.Record) (*mongo.InsertManyResult, error) { - if !globals.Simulate { + if !Simulate { var batchBson []interface{} // mongo does not like typing for _, b := range batch { batchBson = append(batchBson, b.ToBson()) } tru := true - return db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru}) + return db.InsertMany(m.GlobalTimeout(context.Background()), batchBson, &options.InsertManyOptions{Ordered: &tru}) } fmt.Printf("simulating successful database bulk write: %v", batch) diff --git a/pkg/watcher/app.go b/pkg/watcher/app.go index e5ae737..737e23a 100644 --- a/pkg/watcher/app.go +++ b/pkg/watcher/app.go @@ -8,10 +8,13 @@ import ( "git.k-space.ee/k-space/logmower-shipper/pkg/file" "git.k-space.ee/k-space/logmower-shipper/pkg/globals" + "git.k-space.ee/k-space/logmower-shipper/pkg/lines" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" + "git.k-space.ee/k-space/logmower-shipper/pkg/sender" "git.k-space.ee/k-space/logmower-shipper/pkg/util" "github.com/fsnotify/fsnotify" "github.com/urfave/cli/v2" + mongoOpt "go.mongodb.org/mongo-driver/mongo/options" ) var App = &cli.App{ @@ -20,39 +23,42 @@ var App = &cli.App{ Authors: []*cli.Author{{Name: "jtagcat"}, {Name: "codemowers.io"}}, Description: "Collect and ship kubernetes logs", - // Usage: "rubykana ", // TODO: #2: yaml Flags: []cli.Flag{ &cli.BoolFlag{Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Do not write to database"}, &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, - &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, + &cli.IntFlag{Name: "max-record-size", EnvVars: []string{"MAX_RECORD_SIZE"}, Value: 128 * 1024, Usage: "Maximum record size in bytes"}, + &cli.IntFlag{Name: "bulk-insertion-size", EnvVars: []string{"BULK_INSERTION_SIZE"}, Value: 1000, Usage: "MongoDB bulk insertion size in records"}, + &cli.Uint64Flag{Name: "max-connection-pool-size", EnvVars: []string{"MAX_CONNECTION_POOL_SIZE"}, Value: 1, Usage: "Max MongoDB connection pool size"}, // //TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, //TODO: &cli.BoolFlag{Name: "parse-json"}, // - &cli.StringSliceFlag{Category: "selectors", Name: "namespace", EnvVars: []string{"KUBE_NAMESPACE"}, Usage: "whitelist filter for filenames"}, - &cli.StringSliceFlag{Category: "selectors", Name: "pod-prefix", EnvVars: []string{"KUBE_NODE_NAME"}, Usage: "blacklist filter for filenames"}, + &cli.StringSliceFlag{Category: "selectors", Name: "namespace", EnvVars: []string{"NAMESPACE"}, Usage: "whitelist filter for filenames"}, + &cli.StringSliceFlag{Category: "selectors", Name: "exclude-pod-prefixes", EnvVars: []string{"EXCLUDE_POD_PREFIXES"}, Usage: "blacklist filter for filenames", Value: cli.NewStringSlice("logmower-")}, // &cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGODB_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true}, }, Before: func(ctx *cli.Context) error { - globals.BufferLimitBytes = ctx.Int("max-record-size") - if globals.BufferLimitBytes < 1 { + lines.BufferLimitBytes = ctx.Int("max-record-size") + if lines.BufferLimitBytes < 1 { return fmt.Errorf("max-record-size must be positive") } - globals.Simulate = ctx.Bool("simulate") + sender.Simulate = ctx.Bool("simulate") + sender.MaxBatchItems = ctx.Int("bulk-insertion-size") return nil }, Action: func(ctx *cli.Context) error { - whitelistNamespaces, blacklistPodPrefixes := sliceToMap(ctx.StringSlice("namespace")), ctx.StringSlice("pod-prefix") + whitelistNamespaces, blacklistPodPrefixes := sliceToMap(ctx.StringSlice("namespace")), ctx.StringSlice("exclude-pod-prefixes") var wg sync.WaitGroup log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version) - db, err := m.Initialize(ctx.Context, ctx.String("mongo-uri")) + db, err := m.Initialize(ctx.Context, ctx.String("mongo-uri"), mongoOpt.Client(). + SetMaxPoolSize(ctx.Uint64("max-connection-pool-size"))) if err != nil { return fmt.Errorf("initializing database connection: %w", err) }