logmower-shipper/pkg/watcher/watcher.go

188 lines
4.7 KiB
Go

package watcher
import (
"fmt"
"log"
"os"
"path/filepath"
"strings"
"sync"
"git.k-space.ee/k-space/logmower-shipper/pkg/file"
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
"git.k-space.ee/k-space/logmower-shipper/pkg/util"
"github.com/fsnotify/fsnotify"
"github.com/urfave/cli/v2"
)
var App = &cli.App{
Name: globals.AppName,
Version: "1.0.0",
Authors: []*cli.Author{{Name: "jtagcat"}, {Name: "codemowers.io"}},
Description: "Collect and ship kubernetes logs",
// Usage: "rubykana <input>",
// TODO: #2: yaml
Flags: []cli.Flag{
&cli.BoolFlag{Name: "simulate", Aliases: []string{"dry-run"}, Usage: "Do not write to database"},
&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"},
&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"},
//
//TODO: &cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"},
//TODO: &cli.BoolFlag{Name: "parse-json"},
//
&cli.StringSliceFlag{Category: "selectors", Name: "namespace", EnvVars: []string{"KUBE_NAMESPACE"}, Usage: "whitelist filter for filenames"},
&cli.StringSliceFlag{Category: "selectors", Name: "pod-prefix", EnvVars: []string{"KUBE_NODE_NAME"}, Usage: "blacklist filter for filenames"},
//
&cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGODB_URI"}, Usage: "mongodb://foo:bar@host:27017/database", Required: true},
},
Before: func(ctx *cli.Context) error {
globals.BufferLimitBytes = ctx.Int("max-record-size")
if globals.BufferLimitBytes < 1 {
return fmt.Errorf("max-record-size must be positive")
}
globals.Simulate = ctx.Bool("simulate")
return nil
},
Action: func(ctx *cli.Context) error {
whitelistNamespaces, blacklistPodPrefixes := sliceToMap(ctx.StringSlice("namespace")), ctx.StringSlice("pod-prefix")
var wg sync.WaitGroup
log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version)
db, err := m.Initialize(ctx.Context, ctx.String("mongo-uri"))
if err != nil {
return fmt.Errorf("initializing database connection: %w", err)
}
hostinfo, err := util.Hostinfo(ctx.String("node-name"))
if err != nil {
return fmt.Errorf("populating host info: %w", err)
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("initializing log directory watcher: %w", err)
}
defer watcher.Close()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Context.Done():
return
case event, ok := <-watcher.Events:
if !ok {
return
}
promWatcherEvents.Add(1)
if event.Op != fsnotify.Create {
continue
}
// TODO: #1: || if not in filterset
kubeInfo, ok := util.ParseLogFilename(event.Name)
if !ok {
promWatcherFilesSkipped.Add(1)
log.Printf("skipped %q: filename not parsable in kubernetes log format", filepath.Base(event.Name))
continue
}
if _, ok := whitelistNamespaces[kubeInfo.Namespace]; !ok {
continue
}
if ok := hasSlicePrefix(kubeInfo.Pod, blacklistPodPrefixes); ok {
continue
}
promWatcherFilesStarted.Add(1)
wg.Add(1)
go func() {
file := file.File{
File: &m.File{
Host: &hostinfo,
KubeInfo: kubeInfo,
Path: event.Name,
},
MetricsName: filepath.Base(event.Name),
}
file.Process(ctx.Context, db)
wg.Done()
}()
case err, ok := <-watcher.Errors:
if !ok {
return
}
promWatcherErr.Add(1)
log.Printf("watching for new logs: %e", err)
}
}
}()
logDir := ctx.String("log-directory")
// simulate create events to pick up files already created
if err := simulateInitialCreates(logDir, watcher.Events); err != nil {
return fmt.Errorf("listing log directory %q: %w", logDir, err)
}
if err := watcher.Add(logDir); err != nil {
return fmt.Errorf("watching for new logs in %q: %w", logDir, err)
}
promWatcherOnline.Set(1)
// waiting indefinitely for interrupt
wg.Wait() // wait for watch and file processors to cleanup
return ctx.Err()
},
}
func simulateInitialCreates(dirName string, eventChan chan<- fsnotify.Event) error {
dir, err := os.ReadDir(dirName)
if err != nil {
return err
}
for _, file := range dir {
eventChan <- fsnotify.Event{
Name: filepath.Join(dirName, file.Name()),
Op: fsnotify.Create,
}
}
return nil
}
func sliceToMap[T comparable](sl []T) map[T]interface{} {
m := make(map[T]interface{})
for _, k := range sl {
m[k] = nil
}
return m
}
func hasSlicePrefix(s string, sl []string) bool {
for _, prefix := range sl {
if strings.HasPrefix(s, prefix) {
return true
}
}
return false
}