refactor and continue implementation of whole pkg
cmd/logmower.go (new file, 221 lines)
@@ -0,0 +1,221 @@
package logmower

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"path"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v2"
	"go.elastic.co/ecszap"
	"go.mongodb.org/mongo-driver/mongo"
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/util/wait"
)

const (
	MACHINEID      = "/etc/machine-id"
	MONGO_TIMEOUT  = 10 * time.Second
	SendQueueLimit = 1024
	MaxBatchTime   = time.Second
)

// wrapper to force copying before use
func defaultBackoff() wait.Backoff {
	return wait.Backoff{
		Duration: 2 * time.Second,
		Factor:   1.5,
		Jitter:   0.1,
		Cap:      30 * time.Second,
	}
}

func mongoTimeoutCtx(ctx context.Context) context.Context {
	ctx, _ = context.WithTimeout(ctx, MONGO_TIMEOUT) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO)
	return ctx
}

var App = &cli.App{
	Name:    "logmower",
	Version: "1.0.0",
	Authors: []*cli.Author{{Name: "jtagcat"}},

	Description: "Collect and ship kubernetes logs",
	// Usage: "rubykana <input>",
	Flags: []cli.Flag{
		&cli.BoolFlag{Name: "dry-run", Usage: "Do not write to database"}, // TODO:
		&cli.StringFlag{Name: "log-directory", Usage: "Directory to watch for logs", Value: "/var/log/containers"},
		&cli.IntFlag{Name: "max-record-size", Value: 128 * 1024, Usage: "Maximum record size in bytes"},            // TODO:
		&cli.BoolFlag{Name: "normalize-log-level", Usage: "Normalize log.level values to Syslog defined keywords"}, // TODO:
		// &cli.BoolFlag{Name: "parse-json"}, //TODO:
		&cli.StringFlag{Category: "k8s metadata", Name: "pod-namespace", EnvVars: []string{"KUBE_POD_NAMESPACE"}}, // TODO:
		&cli.StringFlag{Category: "k8s metadata", Name: "node-name", EnvVars: []string{"KUBE_NODE_NAME"}, Required: true},
		&cli.StringFlag{Category: "secrets", Name: "mongo-uri", EnvVars: []string{"MONGO_URI"}, Usage: "mongodb://foo:bar@host:27017", Required: true},
	},

	Action: func(ctx *cli.Context) error {
		ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test
		var wg sync.WaitGroup

		// init logger, ECS schema
		core := ecszap.NewCore(ecszap.NewDefaultEncoderConfig(),
			os.Stderr, zap.WarnLevel)
		l := zap.New(core, zap.AddCaller())

		l.Info("logmower starting", zap.String("version", ctx.App.Version))

		var (
			promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{
				Subsystem: "watcher",
				Name:      "online",
				Help:      "1 if initialized, and directory watcher has been engaged successfully",
			})

			promErrWatching = promauto.NewCounter(prom.CounterOpts{
				Subsystem: "watcher",
				Name:      "errors_count",
				Help:      "Error in logmower watching log files",
			})
			promFilesRead = promauto.NewCounter(prom.CounterOpts{
				Subsystem: "watcher",
				Name:      "file_count",
				Help:      "Number of tracked log files",
			})
		)
		go func() {
			l.Info("/metrics starting", zap.Int("port", 2112))
			http.Handle("/metrics", promhttp.Handler())

			if err := http.ListenAndServe(":2112", nil); !errors.Is(err, http.ErrServerClosed) {
				l.Fatal("failed to serve /metrics", zap.Error(err))
			}
		}()

		state := submitter{
			ctx:       ctx.Context,
			l:         l,
			sendQueue: make(chan mLog, SendQueueLimit),
		}

		dbOpt := mongoMonitoredClientOptions(l).ApplyURI(ctx.String("mongo-uri"))

		dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx.Context), dbOpt)
		if err != nil {
			l.Fatal("connecting to mongo", zap.String("uri", dbOpt.GetURI()), zap.Error(err))
		}

		state.db = dbClient.Database(dbOpt.Auth.AuthSource).Collection("logs")
		wg.Add(1)
		go func() {
			state.sender()
			wg.Done()
		}()

		state.hostInfo, err = getHostInfo(ctx.String("node-name"))
		if err != nil {
			l.Fatal("gathering host info", zap.Error(err))
		}

		watcher, err := fsnotify.NewWatcher()
		if err != nil {
			l.Fatal("setting up watcher", zap.Error(err))
		}

		logDir := ctx.String("log-directory")

		wg.Add(1)
		go func() {
			defer wg.Done()

			for {
				select {
				case <-ctx.Context.Done():
					return
				case event, ok := <-watcher.Events:
					if !ok {
						return
					}
					if event.Op != fsnotify.Create {
						continue
					}

					absPath := path.Join(logDir, event.Name)
					promFilesRead.Add(1)
					l.Debug("digesting new file", zap.String("name", absPath))

					wg.Add(1)
					go func() {
						state.shipFile(absPath)
						wg.Done()
					}()

				case err, ok := <-watcher.Errors:
					if !ok {
						return
					}
					promErrWatching.Add(1)
					l.Error("while watching log dir events", zap.Error(err))
				}
			}
		}()

		err = watcher.Add(logDir)
		if err != nil {
			promErrWatching.Add(1)
			l.Fatal("watching log directory", zap.String("name", logDir), zap.Error(err))
		}

		promWatcherOnline.Set(1)

		// waiting indefinitely for interrupt
		wg.Wait() // wait for watch and file processors to cleanup

		return errAppend(watcher.Close(), state.ctx.Err())
	},
}

type HostInfo struct {
	id   string
	name string
	arch string
}

func getHostInfo(nodeName string) (h HostInfo, err error) {
	if nodeName == "" {
		nodeName, err = os.Hostname()
		if err != nil {
			err = fmt.Errorf("name: hostname: %w", err) // don't exit early
		}
	}
	h.name = strings.TrimSpace(nodeName)

	id, errL := os.ReadFile(MACHINEID)
	if errL != nil {
		err = errAppend(err, fmt.Errorf("id: %w", errL))
	}

	h.id = strings.TrimSpace(string(id))

	h.arch = runtime.GOARCH

	return h, err
}

func errAppend(a, b error) error {
	if a == nil {
		return b
	}
	if b == nil {
		return a
	}
return fmt.Errorf("%e; %e", a, b)
|
||||
}
|
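Not part of this diff, but for orientation: a minimal main package wiring up the App above could look roughly like the sketch below. The import path is an assumption, not taken from this commit.

package main

import (
	"log"
	"os"

	logmower "example.com/logmower/cmd" // hypothetical module path for the package above
)

func main() {
	// App.Run parses flags and environment variables, then invokes the Action defined in logmower.go.
	if err := logmower.App.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}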
cmd/mongo.go (new file, 56 lines)
@@ -0,0 +1,56 @@
package logmower

import (
	"context"
	"fmt"

	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	mongoEvent "go.mongodb.org/mongo-driver/event"
	mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
)

func mongoMonitoredClientOptions(l *zap.Logger) *mongoOpt.ClientOptions {
	promMongoHeartbeat := promauto.NewHistogramVec(prom.HistogramOpts{
		Subsystem: "mongo",
		Name:      "heartbeat",
		Help:      "Time in ns for succeeded heartbeat, or 0 on failure",
		Buckets:   []float64{1},
	}, []string{"connection_id"})

	promMongoCmd := promauto.NewHistogramVec(prom.HistogramOpts{
		Subsystem: "mongo",
		Name:      "command",
		Help:      "Time in ns of commands",
		Buckets:   prom.DefBuckets,
	}, []string{"connection_id", "command_name"})

	promMongoCmdErr := promauto.NewCounterVec(prom.CounterOpts{
		Subsystem: "mongo",
		Name:      "errors_count",
		Help:      "Count of failed commands",
	}, []string{"connection_id", "command_name"})

	return mongoOpt.Client().
		SetServerMonitor(&mongoEvent.ServerMonitor{
			ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) {
				promMongoHeartbeat.WithLabelValues(ev.ConnectionID).Observe(float64(ev.DurationNanos))
			},
			ServerHeartbeatFailed: func(ev *mongoEvent.ServerHeartbeatFailedEvent) {
				promMongoHeartbeat.WithLabelValues(ev.ConnectionID).Observe(0)
				l.Error("mongo heartbeat", zap.Error(ev.Failure), zap.String("connection_id", ev.ConnectionID))
			},
		}).
		SetMonitor(&mongoEvent.CommandMonitor{
			Succeeded: func(_ context.Context, ev *mongoEvent.CommandSucceededEvent) {
				promMongoCmd.WithLabelValues(ev.ConnectionID, ev.CommandName).Observe(float64(ev.DurationNanos))
			},
			Failed: func(_ context.Context, ev *mongoEvent.CommandFailedEvent) {
				promMongoCmd.WithLabelValues(ev.ConnectionID, ev.CommandName).Observe(float64(ev.DurationNanos))

				promMongoCmdErr.WithLabelValues(ev.ConnectionID, ev.CommandName).Add(1)
				l.Error("mongo command", zap.Error(fmt.Errorf("%s", ev.Failure)), zap.String("connection_id", ev.ConnectionID), zap.String("command_name", ev.CommandName)) // TODO: https://github.com/mongodb/mongo-go-driver/pull/1105
			},
		})
}
cmd/sender.go (new file, 95 lines)
@@ -0,0 +1,95 @@
package logmower

import (
	"time"

	"github.com/jtagcat/util"
	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.mongodb.org/mongo-driver/bson"
	"go.uber.org/zap"
)

var (
	promShipperMongoSent = promauto.NewCounter(prom.CounterOpts{
		Subsystem: "shipper",
		Name:      "sent_count",
		Help:      "Log items successfully committed to mongo",
	})
	promShipperMongoSentError = promauto.NewCounter(prom.CounterOpts{
		Subsystem: "shipper",
		Name:      "mongo_errors",
		Help:      "Errors while submitting to mongo", // TODO:
	})
	promShipperDropped = promauto.NewCounterVec(prom.CounterOpts{
		Subsystem: "shipper",
		Name:      "queue_dropped",
		Help:      "Items ready to be sent to mongo, but dropped due to full queue",
	}, []string{"filename"})
)

func init() {
	promauto.NewGaugeFunc(prom.GaugeOpts{
		Subsystem: "shipper",
		Name:      "queue_size",
		Help:      "Submit queue size",
	}, func() float64 {
		return float64(SendQueueLimit)
	})
}

func (s *submitter) sender() {
	promauto.NewGaugeFunc(prom.GaugeOpts{
		Subsystem: "shipper",
		Name:      "queue_items",
		Help:      "Items in queue to be submitted in batch to mongo",
	}, func() float64 {
		return float64(len(s.sendQueue))
	})

	batched := make(chan []mLog)

	go func() {
		util.Batch(4, MaxBatchTime, s.sendQueue, batched)
	}()

	for {
		batch, ok := <-batched
		if !ok {
			return
		}

		var batchBson []interface{} // mongo does not like typing
		for _, b := range batch {
			batchBson = append(batchBson, b.toBson())
		}

		result, err := s.db.InsertMany(mongoTimeoutCtx(s.ctx), batchBson, nil)
		if err != nil {
			promShipperMongoSentError.Add(1)
			s.l.Error("mongo send returned error; TODO: add some selective retry here or something", zap.Error(err)) // TODO:
		}
		if result != nil {
			promShipperMongoSent.Add(float64(len(result.InsertedIDs)))
		}
	}
}

// when editing, also edit toBson(); all bson.D (and bson.M) uses
type mLog struct {
	HostInfo HostInfo
	Filename string
	Offset   int64  // byte offset where log entry ends at
	Content  string // TODO:
	Time     time.Time
}

// not using marshal, since it is <0.1x performance
func (l *mLog) toBson() bson.M {
	return bson.M{
		"host_info": l.HostInfo,
		"filename":  l.Filename,
		"offset":    l.Offset,
		"content":   l.Content,
		"time":      l.Time,
	}
}
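As an aside on the toBson() comment above: the reflection-based alternative it avoids would be to tag the struct and let the driver marshal it directly in InsertMany. A sketch of that variant, with assumed tag names mirroring toBson(), shown only for comparison:

// Hypothetical tagged variant, not used in this commit: the driver would
// marshal it via reflection, which is the overhead toBson() avoids.
type mLogTagged struct {
	HostInfo HostInfo  `bson:"host_info"`
	Filename string    `bson:"filename"`
	Offset   int64     `bson:"offset"`
	Content  string    `bson:"content"`
	Time     time.Time `bson:"time"`
}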
cmd/submit.go (new file, 137 lines)
@@ -0,0 +1,137 @@
package logmower

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"

	"github.com/jtagcat/util"
	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
	"k8s.io/apimachinery/pkg/util/wait"
)

var (
	promCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{
		Subsystem: "file",
		Name:      "catchupped",
		Help:      "File count where backlog has been sent; <= watcher_file_count",
	}, []string{"filename"}) // TODO: rm filename?
	promFileErr = promauto.NewCounterVec(prom.CounterOpts{
		Subsystem: "file",
		Name:      "errors_count",
		Help:      "Error count for reading files",
	}, []string{"filename"})
)

type (
	submitter struct {
		ctx context.Context
		l   *zap.Logger

		hostInfo HostInfo
		db       *mongo.Collection

		sendQueue chan mLog
	}
)

func (s *submitter) shipFile(name string) {
	baseName := filepath.Base(name)

	sigCatchupped := make(chan struct{}, 1)
	go func() {
		<-sigCatchupped
		close(sigCatchupped) // once

		promCatchupDone.WithLabelValues(baseName).Add(1)
	}()

	// TODO: restarting before mongo sendQueue finishes will result in duplicates
	wait.ManagedExponentialBackoffWithContext(s.ctx, defaultBackoff(), func() (done bool, _ error) {
		if err := s.shipFileRoutine(name, sigCatchupped); err != nil {
			promFileErr.WithLabelValues(baseName).Add(1)
			s.l.Error("shipping file", zap.String("filename", baseName), zap.Error(err))
			return false, nil // nil since we want it to loop and keep retrying indefinitely
		}
		return true, nil
	})
}

func (s *submitter) shipFileRoutine(name string, sigCatchupped chan<- struct{}) error {
	// Initialize in case of new file
	log := mLog{
		HostInfo: s.hostInfo,
		Filename: name,
	}

	// get files with offset
	offsetResult, err := mWithErr(s.db.FindOne(mongoTimeoutCtx(s.ctx),
		bson.D{{Key: "host_info.id", Value: s.hostInfo.id}, {Key: "filename", Value: name}}, // field name as written by toBson()
		&mongoOpt.FindOneOptions{Sort: bson.D{{Key: "offset", Value: -1}}}, // sort descending (get largest)
	))
	if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
		return fmt.Errorf("retrieving mongo offset: %w", err)
	}

	// offsetResult.DecodeBytes() //TODO: check for extra fields

	if err := offsetResult.Decode(&log); err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
		return fmt.Errorf("decoding mongo offset: %w", err)
	}

	fi, err := os.Stat(log.Filename)
	if err != nil {
		return fmt.Errorf("getting original file size: %w", err)
	}
	startSize := fi.Size()

	// TODO: use inotify for file, and end with file deletion or replacement
	lineChan, errChan, err := util.TailFile(s.ctx, log.Filename, log.Offset, io.SeekStart)
	if err != nil {
		return fmt.Errorf("tailing file: %w", err)
	}
	for {
		select {
		case err := <-errChan:
			return fmt.Errorf("tailing file: %w", err)
		case line := <-lineChan:
			if line.EndOffset > startSize {
				select {
				case sigCatchupped <- struct{}{}:
				default:
				}
			}

			select {
			case s.sendQueue <- mLog{
				HostInfo: s.hostInfo,
				Filename: *line.Filename,
				Offset:   line.EndOffset,
				Content:  line.String,
			}:
			default:
				promShipperDropped.WithLabelValues(*line.Filename).Add(1)
			}
			// TODO:
			// default:
			// 	return nil
		}
	}
}

func mWithErr[t interface{ Err() error }](mongoWrap t) (t, error) {
	return mongoWrap, mongoWrap.Err()
}

// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) {
// 	ctx, cancel := context.WithCancel(pctx)
// 	wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done())
// }
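One more sketch, not part of this commit: the FindOne in shipFileRoutine sorts by offset within (host_info.id, filename), so a compound index along those lines would back that query; the helper and its placement are assumptions.

package logmower

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// ensureOffsetIndex is a hypothetical helper creating a compound index to
// back the offset-resume FindOne in shipFileRoutine.
func ensureOffsetIndex(ctx context.Context, coll *mongo.Collection) error {
	_, err := coll.Indexes().CreateOne(ctx, mongo.IndexModel{
		Keys: bson.D{
			{Key: "host_info.id", Value: 1},
			{Key: "filename", Value: 1},
			{Key: "offset", Value: -1},
		},
	})
	return err
}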