package watcher import ( "fmt" "log" "os" "path/filepath" "strings" "sync" "git.k-space.ee/k-space/logmower-shipper/pkg/file" "git.k-space.ee/k-space/logmower-shipper/pkg/globals" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo" "git.k-space.ee/k-space/logmower-shipper/pkg/util" "github.com/fsnotify/fsnotify" "github.com/urfave/cli/v2" ) var App = &cli.App{ Name: globals.AppName, Version: "1.0.0", Authors: []*cli.Author{{Name: "jtagcat"}, {Name: "codemowers.io"}}, Description: "Collect and ship kubernetes logs", // Usage: "rubykana ", // TODO: #2: yaml Flags: []cli.Flag{ &cli.BoolFlag{Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Do not write to database"}, &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // //TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, //TODO: &cli.BoolFlag{Name: "parse-json"}, // &cli.StringSliceFlag{Category: "selectors", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}, Usage: "whitelist filter for filenames"}, &cli.StringSliceFlag{Category: "selectors", Name: "pod-prefix", EnvVars: []string{"KUBE_NODE_NAME"}, Usage: "blacklist filter for filenames"}, // &cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true}, }, Before: func(ctx *cli.Context) error { globals.BufferLimitBytes = ctx.Int("max-record-size") if globals.BufferLimitBytes < 1 { return fmt.Errorf("max-record-size must be positive") } globals.Simulate = ctx.Bool("simulate") return nil }, Action: func(ctx *cli.Context) error { whitelistNamespaces, blacklistPodPrefixes := sliceToMap(ctx.StringSlice("pod-namespace")), ctx.StringSlice("pod-prefix") var wg sync.WaitGroup log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version) db, err := m.Initialize(ctx.Context, ctx.String("mongo-uri")) if err != nil { return fmt.Errorf("initializing database connection: %w", err) } hostinfo, err := util.Hostinfo(ctx.String("node-name")) if err != nil { return fmt.Errorf("populating host info: %w", err) } watcher, err := fsnotify.NewWatcher() if err != nil { return fmt.Errorf("initializing log directory watcher: %w", err) } defer watcher.Close() wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Context.Done(): return case event, ok := <-watcher.Events: if !ok { return } promWatcherEvents.Add(1) if event.Op != fsnotify.Create { continue } // TODO: #1: || if not in filterset kubeInfo, ok := util.ParseLogFilename(event.Name) if !ok { promWatcherFilesSkipped.Add(1) log.Printf("skipped %q: filename not parsable in kubernetes log format", filepath.Base(event.Name)) continue } if _, ok := whitelistNamespaces[kubeInfo.Namespace]; !ok { continue } if ok := hasSlicePrefix(kubeInfo.Pod, blacklistPodPrefixes); ok { continue } promWatcherFilesStarted.Add(1) wg.Add(1) go func() { file := file.File{ File: &m.File{ Host: &hostinfo, KubeInfo: kubeInfo, Path: event.Name, }, MetricsName: filepath.Base(event.Name), } file.Process(ctx.Context, db) wg.Done() }() case err, ok := <-watcher.Errors: if !ok { return } promWatcherErr.Add(1) log.Printf("watching for new logs: %e", err) } } }() logDir := ctx.String("log-directory") // simulate create events to pick up files already created if err := simulateInitialCreates(logDir, watcher.Events); err != nil { return fmt.Errorf("listing log directory %q: %w", logDir, err) } if err := watcher.Add(logDir); err != nil { return fmt.Errorf("watching for new logs in %q: %w", logDir, err) } promWatcherOnline.Set(1) // waiting indefinitely for interrupt wg.Wait() // wait for watch and file processors to cleanup return ctx.Err() }, } func simulateInitialCreates(dirName string, eventChan chan<- fsnotify.Event) error { dir, err := os.ReadDir(dirName) if err != nil { return err } for _, file := range dir { eventChan <- fsnotify.Event{ Name: filepath.Join(dirName, file.Name()), Op: fsnotify.Create, } } return nil } func sliceToMap[T comparable](sl []T) map[T]interface{} { m := make(map[T]interface{}) for _, k := range sl { m[k] = nil } return m } func hasSlicePrefix(s string, sl []string) bool { for _, prefix := range sl { if strings.HasPrefix(s, prefix) { return true } } return false }