logmower-shipper/cmd/logmower.go

package logmower

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"path"
"runtime"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/urfave/cli/v2"
"go.elastic.co/ecszap"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
)
const (
MACHINEID = "/etc/machine-id"
MONGO_TIMEOUT = 10 * time.Second
SendQueueLimit = 1024
MaxBatchTime = time.Second
)
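// SendQueueLimit is the buffer size of the submitter's send channel (see the
// make(chan mLog, ...) below); MaxBatchTime presumably bounds how long the
// sender waits before flushing a partial batch (sender is defined elsewhere in this package).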
// defaultBackoff returns a fresh wait.Backoff on every call, forcing callers to take their own copy (the struct is mutated as it is stepped)
func defaultBackoff() wait.Backoff {
return wait.Backoff{
Duration: 2 * time.Second,
Factor: 1.5,
Jitter: 0.1,
Cap: 30 * time.Second,
}
}
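// A minimal sketch of the intended copy-then-step pattern (tryOnce is a
// hypothetical retry condition; the real retry loops live elsewhere in this package):
//
//	backoff := defaultBackoff() // private copy for this retry loop
//	for !tryOnce() {
//		time.Sleep(backoff.Step())
//	}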
func mongoTimeoutCtx(ctx context.Context) context.Context {
ctx, _ = context.WithTimeout(ctx, MONGO_TIMEOUT) //nolint:lostcancel // cancelled by the mongo driver; arguably a driver bug (TODO)
return ctx
}
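// Sketch of the intended call pattern (coll and doc are hypothetical): every
// driver call gets its own bounded deadline derived from the caller's context:
//
//	_, err := coll.InsertOne(mongoTimeoutCtx(ctx), doc)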
var App = &cli.App{
Name: "logmower-shipper",
Version: "1.0.0",
Authors: []*cli.Author{{Name: "jtagcat"}},
Description: "Collect and ship Kubernetes logs",
// Usage: "rubykana <input>",
// TODO: #2: yaml
Flags: []cli.Flag{
&cli.BoolFlag{Name: "dry-run", Usage: "Do not write to database"}, // TODO:
&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"},
&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"}, // TODO:
&cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO:
// &cli.BoolFlag{Name: "parse-json"}, //TODO:
&cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO:
&cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true},
&cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017", Required: true},
},
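// Example invocation (values are illustrative):
//
//	MONGO_URI='mongodb://logmower:secret@mongo:27017/logmower' \
//	KUBE_NODE_NAME=worker-1 \
//	logmower-shipper --log-directory /var/log/containers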
Action: func(ctx *cli.Context) error {
ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test
var wg sync.WaitGroup
// init logger, ECS schema
core := ecszap.NewCore(ecszap.NewDefaultEncoderConfig(),
os.Stderr, zap.WarnLevel)
l := zap.New(core, zap.AddCaller())
l.Info("logmower starting", zap.String("version", ctx.App.Version))
var (
promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{
Subsystem: "watcher",
Name: "online",
Help: "1 if initialized, and directory watcher has been engaged successfully",
})
promErrWatching = promauto.NewCounter(prom.CounterOpts{
Subsystem: "watcher",
Name: "errors_count",
Help: "Error in logmower watching log files",
})
promFilesRead = promauto.NewCounter(prom.CounterOpts{
Subsystem: "watcher",
Name: "file_count",
Help: "Number of tracked log files",
})
)
go func() {
l.Info("/metrics starting", zap.Int("port", 2112))
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(":2112", nil); !errors.Is(err, http.ErrServerClosed) {
l.Fatal("failed to serve /metrics", zap.Error(err))
}
}()
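// With no Namespace set, the metrics above are exported as watcher_online,
// watcher_errors_count and watcher_file_count; `curl localhost:2112/metrics`
// (port hard-coded above) should list them once the server is up.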
state := submitter{
ctx: ctx.Context,
l: l,
sendQueue: make(chan mLog, SendQueueLimit),
}
dbOpt := mongoMonitoredClientOptions(l).ApplyURI(ctx.String("mongo-uri"))
dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx.Context), dbOpt)
if err != nil {
l.Fatal("connecting to mongo", zap.String("uri", dbOpt.GetURI()), zap.Error(err))
}
state.db = dbClient.Database(dbOpt.Auth.AuthSource).Collection("logs")
wg.Add(1)
go func() {
state.sender()
wg.Done()
}()
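// state.sender is the single consumer of sendQueue; the shipFile goroutines
// spawned from the fsnotify event loop below presumably feed it.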
state.hostInfo, err = getHostInfo(ctx.String("node-name"))
if err != nil {
l.Fatal("gathering host info", zap.Error(err))
}
watcher, err := fsnotify.NewWatcher()
if err != nil {
l.Fatal("setting up watcher", zap.Error(err))
}
logDir := ctx.String("log-directory")
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Context.Done():
return
case event, ok := <-watcher.Events:
if !ok {
return
}
// TODO: #1: || if not in filterset
if event.Op != fsnotify.Create {
continue
}
absPath := event.Name // fsnotify already joins the watched directory into event.Name
promFilesRead.Add(1)
l.Debug("digesting new file", zap.String("name", absPath))
wg.Add(1)
go func() {
state.shipFile(absPath)
wg.Done()
}()
case err, ok := <-watcher.Errors:
if !ok {
return
}
promErrWatching.Add(1)
l.Error("while watching log dir events", zap.Error(err))
}
}
}()
err = watcher.Add(logDir)
if err != nil {
promErrWatching.Add(1)
l.Fatal("watching log directory", zap.String("name", logDir), zap.Error(err))
}
promWatcherOnline.Set(1)
// block until interrupted, then wait for the watcher and file processors to clean up
wg.Wait()
return errAppend(watcher.Close(), state.ctx.Err())
},
}
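// App is only declared here; a minimal entry point (assuming this package is
// imported as logmower) would be:
//
//	func main() {
//		if err := logmower.App.Run(os.Args); err != nil {
//			log.Fatal(err)
//		}
//	}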
type HostInfo struct {
id string
name string
arch string
}
func getHostInfo(nodeName string) (h HostInfo, err error) {
if nodeName == "" {
nodeName, err = os.Hostname()
if err != nil {
err = fmt.Errorf("name: hostname: %w", err) // don't exit early
}
}
h.name = strings.TrimSpace(nodeName)
id, errL := os.ReadFile(MACHINEID)
if errL != nil {
err = errAppend(err, fmt.Errorf("id: %w", errL))
}
h.id = strings.TrimSpace(string(id))
h.arch = runtime.GOARCH
return h, err
}
func errAppend(a, b error) error {
if a == nil {
return b
}
if b == nil {
return a
}
return fmt.Errorf("%v; %v", a, b)
}
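// On Go 1.20+ the same aggregation could be expressed with the standard
// library, which additionally keeps errors.Is/errors.As working on both parts:
//
//	func errAppend(a, b error) error { return errors.Join(a, b) }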