package logmower import ( "context" "errors" "fmt" "net/http" "net/url" "os" "os/signal" "path/filepath" "runtime" "strings" "sync" "time" "github.com/fsnotify/fsnotify" prom "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/urfave/cli/v2" "go.elastic.co/ecszap" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" ) const ( MachineId = "/etc/machine-id" MongoTimeout = 10 * time.Second PrometheusPrefix = "logmower-shipper" ) // wrapper to force copying before use func defaultBackoff() wait.Backoff { return wait.Backoff{ Duration: 2 * time.Second, Factor: 1.5, Jitter: 0.1, Cap: 30 * time.Second, } } func mongoTimeoutCtx(ctx context.Context) context.Context { ctx, _ = context.WithTimeout(ctx, MongoTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO) return ctx } var App = &cli.App{ Name: "logmower-shipper", Version: "1.0.0", Authors: []*cli.Author{{Name: "jtagcat"}}, Description: "Collect and ship kubernetes logs", // Usage: "rubykana ", // TODO: #2: yaml Flags: []cli.Flag{ &cli.BoolFlag{Name: "dry-run", Usage: "Do not write to database"}, // TODO: &cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"}, &cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO: &cli.BoolFlag{Name: "delete-after-read", Usage: "Delete log file when it is synced to mongo, and no new lines to read", Value: false}, // &cli.BoolFlag{Name: "parse-json"}, //TODO: &cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO: &cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true}, &cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true}, }, Action: func(ctx *cli.Context) error { ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test var wg sync.WaitGroup // init logger, ECS schema core := ecszap.NewCore(ecszap.NewDefaultEncoderConfig(), os.Stderr, zap.WarnLevel) l := zap.New(core, zap.AddCaller()) l.Info("logmower starting", zap.String("version", ctx.App.Version)) var ( promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{ Namespace: PrometheusPrefix, Subsystem: "watcher", Name: "online", Help: "1 if initialized, and directory watcher has been engaged successfully", }) promErrWatching = promauto.NewCounter(prom.CounterOpts{ Namespace: PrometheusPrefix, Subsystem: "watcher", Name: "errors", Help: "Error in logmower watching log files", }) promFilesRead = promauto.NewCounter(prom.CounterOpts{ Namespace: PrometheusPrefix, Subsystem: "watcher", Name: "seen_files", Help: "Number of tracked log files", }) ) go func() { l.Info("/metrics starting", zap.Int("port", 2112)) http.Handle("/metrics", promhttp.Handler()) if err := http.ListenAndServe(":2112", nil); !errors.Is(err, http.ErrServerClosed) { l.Fatal("failed to serve /metrics", zap.Error(err)) } }() state := submitter{l: l} dbOpt := mongoMonitoredClientOptions(l).ApplyURI(ctx.String("mongo-uri")) dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx.Context)) if err != nil { l.Fatal("connecting to mongo", zap.String("uri", dbOpt.GetURI()), zap.Error(err)) } uriParsed, err := url.ParseRequestURI(ctx.String("mongo-uri")) if err != nil { l.Fatal("parsing URI for mongo database name", zap.Error(err)) } uriParsed.Path = uriParsed.Path[1:] // remove leading slash if uriParsed.Path == "" { l.Fatal("mongo database name must be set in mongo URI") } state.db = dbClient.Database(uriParsed.Path).Collection("logs") err = initializeIndexes(ctx.Context, state.db) if err != nil { l.Fatal("initializing indexes", zap.Error(err)) } state.hostInfo, err = getHostInfo(ctx.String("node-name")) if err != nil { l.Fatal("gathering host info", zap.Error(err)) } watcher, err := fsnotify.NewWatcher() if err != nil { l.Fatal("setting up watcher", zap.Error(err)) } logDir := ctx.String("log-directory") wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Context.Done(): return case event, ok := <-watcher.Events: if !ok { return } // TODO: #1: || if not in filterset if event.Op != fsnotify.Create { continue } promFilesRead.Add(1) l.Debug("digesting new file", zap.String("name", event.Name)) wg.Add(1) go func() { state.shipFile(ctx.Context, event.Name, ctx.Bool("delete-after-read")) wg.Done() }() case err, ok := <-watcher.Errors: if !ok { return } promErrWatching.Add(1) l.Error("while watching log dir events", zap.Error(err)) } } }() // simulate create events to pick up files already created err = simulateInitialCreate(logDir, watcher.Events) if err != nil { promErrWatching.Add(1) l.Fatal("listing initial log directory", zap.String("name", logDir), zap.Error(err)) } err = watcher.Add(logDir) if err != nil { promErrWatching.Add(1) l.Fatal("watching log directory", zap.String("name", logDir), zap.Error(err)) } promWatcherOnline.Set(1) // waiting indefinitely for interrupt wg.Wait() // wait for watch and file processors to cleanup return errAppend(watcher.Close(), ctx.Err()) }, } type HostInfo struct { id string name string arch string } func getHostInfo(nodeName string) (h HostInfo, err error) { if nodeName == "" { nodeName, err = os.Hostname() if err != nil { err = fmt.Errorf("name: hostname: %w", err) // don't exit early } } h.name = strings.TrimSpace(nodeName) id, errL := os.ReadFile(MachineId) if errL != nil { err = errAppend(err, fmt.Errorf("id: %w", errL)) } h.id = strings.TrimSpace(string(id)) h.arch = runtime.GOARCH return h, err } func errAppend(a, b error) error { if a == nil { return b } if b == nil { return a } return fmt.Errorf("%e; %e", a, b) } type logMeta struct { podName string podNamespace string containerName string containerId string } func parseLogName(name string) (m logMeta, ok bool) { name = filepath.Base(name) // https://github.com/kubernetes/design-proposals-archive/blob/8da1442ea29adccea40693357d04727127e045ed/node/kubelet-cri-logging.md // __-.log` m.podName, name, ok = strings.Cut(name, "_") if !ok { return } m.podNamespace, name, ok = strings.Cut(name, "_") if !ok { return } m.containerName, name, ok = strings.Cut(name, "-") if !ok { return } m.containerId = strings.TrimSuffix(name, ".log") return m, true } func simulateInitialCreate(dirName string, eventChan chan<- fsnotify.Event) error { dir, err := os.ReadDir(dirName) if err != nil { return err } for _, file := range dir { eventChan <- fsnotify.Event{ Name: filepath.Join(dirName, file.Name()), Op: fsnotify.Create, } } return nil }