package watcher

import (
	"fmt"
	"log"
	"path/filepath"
	"sync"

	"git.k-space.ee/k-space/logmower-shipper/pkg/file"
	"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
	"git.k-space.ee/k-space/logmower-shipper/pkg/lines"
	m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
	"git.k-space.ee/k-space/logmower-shipper/pkg/sender"
	"git.k-space.ee/k-space/logmower-shipper/pkg/util"
	"github.com/fsnotify/fsnotify"
	"github.com/urfave/cli/v2"
)

var App = &cli.App{
	Name:        globals.AppName,
	Version:     "1.0.0",
	Authors:     []*cli.Author{{Name: "jtagcat"}, {Name: "codemowers.io"}},
	Description: "Collect and ship Kubernetes logs",
	// TODO: #2: yaml

	Flags: []cli.Flag{
		// in Action
		&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"},
		// Read by util.Hostinfo in Action; without a definition here,
		// ctx.String("node-name") would silently return "". EnvVars and
		// Usage are assumptions.
		&cli.StringFlag{Name: "node-name", EnvVars: []string{"NODE_NAME"}, Usage: "Name of the node this shipper runs on"},
		&cli.Uint64Flag{Category: "mongo", Name: "max-connection-pool-size", EnvVars: []string{"MAX_CONNECTION_POOL_SIZE"}, Value: 1, Usage: "Max MongoDB connection pool size"},
		&cli.Int64Flag{Category: "mongo", Name: "max-collection-size", EnvVars: []string{"MAX_COLLECTION_SIZE"}, Usage: "MongoDB collection size limit in bytes"},
		&cli.Int64Flag{Category: "mongo", Name: "max-record-retention", EnvVars: []string{"MAX_RECORD_RETENTION"}, Usage: "Record retention in seconds"},
		&cli.StringFlag{Category: "mongo", Name: "mongo-uri", EnvVars: []string{"MONGODB_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true},

		// in Before
		&cli.BoolFlag{Category: "mongo", Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Writes to database are simulated as successful"},
		&cli.IntFlag{Name: "max-record-size", EnvVars: []string{"MAX_RECORD_SIZE"}, Value: 128 * 1024, Usage: "Maximum record size in bytes"},
		&cli.IntFlag{Name: "bulk-insertion-size", EnvVars: []string{"BULK_INSERTION_SIZE"}, Value: 1000, Usage: "MongoDB bulk insertion size in records"},
		&cli.IntFlag{Name: "max-upload-queue-size", EnvVars: []string{"MAX_UPLOAD_QUEUE_SIZE"}, Value: 1024, Usage: "Max upload queue size (before batching) in records"},

		// TODO: &cli.BoolFlag{Category: "parsing", Name: "heuristic-normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords", Value: false},
		// TODO: &cli.BoolFlag{Category: "parsing", Name: "heuristic-parse-json", Usage: "Attempt automatically unwrapping JSON records", Value: false},

		&cli.StringSliceFlag{Category: "selectors", Name: "namespace", EnvVars: []string{"NAMESPACE"}, Usage: "whitelist filter for filenames"},
		&cli.StringSliceFlag{Category: "selectors", Name: "exclude-pod-prefixes", EnvVars: []string{"EXCLUDE_POD_PREFIXES"}, Usage: "blacklist filter for filenames", Value: cli.NewStringSlice("logmower-")},
	},

	Before: func(ctx *cli.Context) error {
		sender.Simulate = ctx.Bool("simulate")
		lines.BufferLimitBytes = ctx.Int("max-record-size")

		sender.MaxBatchItems = ctx.Int("bulk-insertion-size")
		if sender.MaxBatchItems < 1 {
			return fmt.Errorf("bulk-insertion-size minimum is 1")
		}

		file.SendQueueLimit = ctx.Int("max-upload-queue-size")
		return nil
	},

	Action: func(ctx *cli.Context) error {
		whitelistNamespaces, blacklistPodPrefixes := sliceToMap(ctx.StringSlice("namespace")), ctx.StringSlice("exclude-pod-prefixes")

		var wg sync.WaitGroup
		log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version)

		db, err := m.Initialize(ctx.Context, ctx.String("mongo-uri"), &m.InitializeOptions{
			MaxPoolSize:        ctx.Uint64("max-connection-pool-size"),
			CapSizeBytes:       ctx.Int64("max-collection-size"),
			ExpireAfterSeconds: ctx.Int64("max-record-retention"),
		})
		if err != nil {
			return fmt.Errorf("initializing database connection: %w", err)
		}
		hostinfo, err := util.Hostinfo(ctx.String("node-name"))
		if err != nil {
			return fmt.Errorf("populating host info: %w", err)
		}

		watcher, err := fsnotify.NewWatcher()
		if err != nil {
			return fmt.Errorf("initializing log directory watcher: %w", err)
		}
		defer watcher.Close()

		wg.Add(1)
		go func() {
			defer wg.Done()

			for {
				select {
				case <-ctx.Context.Done():
					return

				case event, ok := <-watcher.Events:
					if !ok {
						return
					}

					promWatcherEvents.Add(1)

					// Op is a bitmask; test for the Create bit instead of
					// exact equality so events carrying combined ops are
					// not missed.
					if event.Op&fsnotify.Create == 0 {
						continue
					}

					kubeInfo, ok := util.ParseLogFilename(event.Name)
					if !ok {
						promWatcherFilesSkipped.Add(1)
						log.Printf("skipped %q: filename not parsable in kubernetes log format", filepath.Base(event.Name))
						continue
					}

					// An unset --namespace flag means no namespace
					// filtering; checking membership in an empty whitelist
					// would skip every file.
					if len(whitelistNamespaces) > 0 {
						if _, ok := whitelistNamespaces[kubeInfo.Namespace]; !ok {
							continue
						}
					}
					if hasSlicePrefix(kubeInfo.Pod, blacklistPodPrefixes) {
						continue
					}

					promWatcherFilesStarted.Add(1)

					wg.Add(1)
					go func() {
						defer wg.Done()

						// named f to avoid shadowing the file package
						f := file.File{
							File: &m.File{
								Host:     &hostinfo,
								KubeInfo: kubeInfo,
								Path:     event.Name,
							},
							MetricsName: filepath.Base(event.Name),
						}

						f.Process(ctx.Context, db)
					}()

				case err, ok := <-watcher.Errors:
					if !ok {
						return
					}

					promWatcherErr.Add(1)
					log.Printf("watching for new logs: %v", err) // %e is a floating-point verb; %v prints the error
				}
			}
		}()

		logDir := ctx.String("log-directory")

		// simulate create events to pick up files already created
		if err := simulateInitialCreates(logDir, watcher.Events); err != nil {
			return fmt.Errorf("listing log directory %q: %w", logDir, err)
		}

		if err := watcher.Add(logDir); err != nil {
			return fmt.Errorf("watching for new logs in %q: %w", logDir, err)
		}

		promWatcherOnline.Set(1)

		// waiting indefinitely for interrupt
		wg.Wait() // wait for watch and file processors to cleanup

		return ctx.Err()
	},
}
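
// The helpers sliceToMap and hasSlicePrefix are referenced in Action but not
// defined in this file; they presumably live elsewhere in this package, as do
// the prom* metrics. The sketches below are minimal implementations inferred
// from the call sites above, not necessarily the canonical ones.

// sliceToMap converts a slice into a set-like map for O(1) membership checks.
func sliceToMap(sl []string) map[string]struct{} {
	set := make(map[string]struct{}, len(sl))
	for _, s := range sl {
		set[s] = struct{}{}
	}
	return set
}

// hasSlicePrefix reports whether s starts with any of the given prefixes.
func hasSlicePrefix(s string, prefixes []string) bool {
	for _, prefix := range prefixes {
		if len(s) >= len(prefix) && s[:len(prefix)] == prefix {
			return true
		}
	}
	return false
}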
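
// simulateInitialCreates is likewise assumed to be defined elsewhere in the
// package. A minimal sketch consistent with its use above: list the log
// directory once and feed synthetic Create events into the watcher's channel,
// so files that existed before the watch was added are picked up too.
func simulateInitialCreates(dirName string, eventChan chan<- fsnotify.Event) error {
	// filepath.Glob only errors on a malformed pattern; an unreadable
	// directory yields no matches rather than an error.
	matches, err := filepath.Glob(filepath.Join(dirName, "*"))
	if err != nil {
		return err
	}
	for _, name := range matches {
		eventChan <- fsnotify.Event{Name: name, Op: fsnotify.Create}
	}
	return nil
}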