restructure project

This commit is contained in:
rasmus 2022-11-09 18:07:28 +02:00
parent 86609d9347
commit 7e59c24c13
18 changed files with 498 additions and 450 deletions

View File

@ -1,108 +0,0 @@
package logmower
import (
"context"
"log"
"time"
ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct"
"github.com/jtagcat/util"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.mongodb.org/mongo-driver/mongo"
)
var (
promShipperQueued = promauto.NewGaugeVec(prom.GaugeOpts{
Namespace: PrometheusPrefix,
// Subsystem: "shipper",
Name: "shipper_record", // "queued",
Help: "Log records in queue to be batched and sent to database",
}, []string{"filename"})
promShipperDbSent = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "shipper",
Name: "record", // "sent",
Help: "Log records successfully committed to database",
}, []string{"filename"})
promShipperBatchSizeResult = promauto.NewHistogram(prom.HistogramOpts{
Namespace: PrometheusPrefix,
// Subsystem: "shipper",
Name: "bulk_submission_message", // "items_in_batch"
Help: "Batch size for database submissions",
Buckets: []float64{1, 5, 10, 50, 100, 500, 1000, 5000, 10000},
})
promShipperDbSendError = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "shipper",
Name: "insertion_error", // "errors",
Help: "Errors while submitting to database", // TODO:
}, []string{"filename"})
promShipperSynced = promauto.NewGaugeVec(prom.GaugeOpts{
Namespace: PrometheusPrefix,
Subsystem: "shipper",
Name: "batches_synced",
Help: "All batches available have been committed database (0 or 1)",
}, []string{"filename"})
)
const (
MaxBatchItems = 10000
MaxBatchTime = 5 * time.Second
)
type queueT <-chan ms.Record
func (queue queueT) sender(db *mongo.Collection, metricsFilename string) {
batched := make(chan []ms.Record)
// batcher and queue metrics
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
promShipperQueued.WithLabelValues(metricsFilename).Set(float64(
len(queue)))
timer := time.NewTimer(time.Second)
select {
case <-ctx.Done():
return
case <-timer.C:
}
}
}()
util.Batch(MaxBatchItems, MaxBatchTime, queue, batched)
// returns when sendQueue is closed
}()
for {
promShipperSynced.WithLabelValues(metricsFilename).Set(1)
batch, ok := <-batched
if !ok {
return
}
promShipperBatchSizeResult.Observe(float64(len(batch)))
promShipperSynced.WithLabelValues(metricsFilename).Set(0)
var batchBson []interface{} // mongo does not like typing
for _, b := range batch {
batchBson = append(batchBson, b.ToBson())
}
result, err := db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil)
if err != nil {
promShipperDbSendError.WithLabelValues(metricsFilename).Add(1)
log.Printf("failure in batch submit to database: %e", err) // TODO: add some selective retry here or something, better error handling
continue
}
promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(
len(result.InsertedIDs)))
}
}

View File

@ -1,168 +0,0 @@
package logmower
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"time"
ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct"
"github.com/jtagcat/util"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
"k8s.io/apimachinery/pkg/util/wait"
)
var (
promFileInitialSeekSkipped = promauto.NewGaugeVec(prom.GaugeOpts{
Namespace: PrometheusPrefix,
// Subsystem: "file",
Name: "skipped_bytes",
Help: "Bytes skipped in file after discovering",
}, []string{"filename"})
promFileCatchupDone = promauto.NewGaugeVec(prom.GaugeOpts{
Namespace: PrometheusPrefix,
Subsystem: "file",
Name: "catchupped",
Help: "(0 or) 1 if initial backlog has been sent; (total <= watcher_file_count)",
}, []string{"filename"}) // TODO: rm filename?
promFileErr = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
Subsystem: "file",
Name: "errors_count",
Help: "Errors while reading file",
}, []string{"filename"})
promFileLineSize = promauto.NewHistogramVec(prom.HistogramOpts{
Namespace: PrometheusPrefix,
// Subsystem: "file",
Name: "line_size_bytes",
Help: "Log line size in bytes",
Buckets: []float64{80, 160, 320, 640, 1280},
}, []string{"filename"})
)
const SendQueueLimit = 1024
type file struct {
ms.File
metricsName string // filepath.Base()
}
// TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly
func (f file) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) {
lineChan := make(chan RawLine)
defer close(lineChan)
dbQueue := make(chan ms.Record, SendQueueLimit)
go RawLines(lineChan).Process(recordLimitBytes, dbQueue)
waitGo := util.GoWg(func() {
queueT(dbQueue).sender(db, f.metricsName)
})
defer waitGo()
// TODO: better way to kill or wait for sendQueue before retrying (or duplicates?)
_ = wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
err := f.trySubmit(ctx, db, lineChan)
if err == nil {
return true, nil
}
promFileErr.WithLabelValues(f.metricsName).Add(1)
log.Printf("processing file %q: %e", f.metricsName, err)
// nil: loop and keep retrying indefinitely
return false, nil
})
}
// use submitter(), don't use directly
func (f file) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- RawLine) error {
// TODO: better way for respecting ?killing sender for retry
for {
if len(sendQueue) == 0 {
break
}
time.Sleep(time.Second)
}
// get files with offset
offsetResult, _ := mongoWithErr(db.FindOne(mongoTimeoutCtx(ctx),
bson.D{{Key: ms.RecordKeyHostId, Value: f.Host.Id}, {Key: ms.RecordKeyFilePath, Value: f.Path}},
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: ms.RecordKeyOffset, Value: -1}}}, // sort descending (get largest)
))
offsetResultBytes, err := offsetResult.DecodeBytes()
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("retrieving offset from database: %w", err)
}
dbOffset := ms.RecordOffsetFromBson(&offsetResultBytes)
fi, err := os.Stat(f.Path)
if err != nil {
return fmt.Errorf("getting original file size: %w", err)
}
startSize := fi.Size()
sctx, cancel := context.WithCancel(ctx)
defer cancel()
promFileInitialSeekSkipped.WithLabelValues(f.metricsName).Set(float64(dbOffset))
lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart)
if err != nil {
return fmt.Errorf("tailing file: %w", err)
}
var catchUpped bool
promFileCatchupDone.WithLabelValues(f.metricsName).Set(0)
for {
select {
case err := <-errChan:
return fmt.Errorf("tailing file: %w", err)
case line, ok := <-lineChan:
if !ok {
return nil
}
promFileLineSize.WithLabelValues(f.metricsName).Observe(float64(len(line.Bytes)))
if !catchUpped {
catchUpped = line.EndOffset >= startSize
if catchUpped {
promFileCatchupDone.WithLabelValues(f.metricsName).Set(1)
}
}
if len(line.Bytes) == 0 {
continue
}
sendQueue <- RawLine{
file: &f,
Offset: line.EndOffset,
B: line.Bytes,
}
}
}
}
func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) {
return mongoWrap, mongoWrap.Err()
}
// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) {
// ctx, cancel := context.WithCancel(pctx)
// wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done())
// }

View File

@ -7,12 +7,10 @@ import (
"net/http" "net/http"
"os" "os"
"git.k-space.ee/k-space/logmower-shipper/logmower" "git.k-space.ee/k-space/logmower-shipper/pkg/watcher"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
) )
const PrometheusPrefix = "logmower" // TODO:
func main() { func main() {
go func() { go func() {
metricsPort := 2112 metricsPort := 2112
@ -26,7 +24,7 @@ func main() {
} }
}() }()
if err := logmower.App.Run(os.Args); err != nil { if err := watcher.App.Run(os.Args); err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }

143
pkg/file/file.go Normal file
View File

@ -0,0 +1,143 @@
package file
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"time"
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
"git.k-space.ee/k-space/logmower-shipper/pkg/lines"
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
"git.k-space.ee/k-space/logmower-shipper/pkg/sender"
"github.com/jtagcat/util"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
"k8s.io/apimachinery/pkg/util/wait"
)
const SendQueueLimit = 1024
type File struct {
*m.File
MetricsName string // filepath.Base()
}
// TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly
func (f File) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) {
lineChan := make(chan lines.Raw)
defer close(lineChan)
dbQueue := make(chan m.Record, SendQueueLimit)
go lines.RawC(lineChan).Process(recordLimitBytes, dbQueue)
waitGo := util.GoWg(func() {
sender.Queue(dbQueue).Sender(db, f.MetricsName)
})
defer waitGo()
// TODO: better way to kill or wait for sendQueue before retrying (or duplicates?)
_ = wait.ManagedExponentialBackoffWithContext(ctx, globals.Backoff(), func() (done bool, _ error) {
err := f.trySubmit(ctx, db, lineChan)
if err == nil {
return true, nil
}
promFileErr.WithLabelValues(f.MetricsName).Add(1)
log.Printf("processing file %q: %e", f.MetricsName, err)
// nil: loop and keep retrying indefinitely
return false, nil
})
}
// use submitter(), don't use directly
func (f File) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- lines.Raw) error {
lFile := lines.File(f) // file.File, but avoiding import cycle
// TODO: better way for respecting ?killing sender for retry
for {
if len(sendQueue) == 0 {
break
}
time.Sleep(time.Second)
}
// get files with offset
offsetResult, _ := mongoWithErr(db.FindOne(globals.MongoTimeout(ctx),
bson.D{{Key: m.RecordKeyHostId, Value: f.Host.Id}, {Key: m.RecordKeyFilePath, Value: f.Path}},
&mongoOpt.FindOneOptions{Sort: bson.D{{Key: m.RecordKeyOffset, Value: -1}}}, // sort descending (get largest)
))
offsetResultBytes, err := offsetResult.DecodeBytes()
if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
return fmt.Errorf("retrieving offset from database: %w", err)
}
dbOffset := m.RecordOffsetFromBson(&offsetResultBytes)
fi, err := os.Stat(f.Path)
if err != nil {
return fmt.Errorf("getting original file size: %w", err)
}
startSize := fi.Size()
sctx, cancel := context.WithCancel(ctx)
defer cancel()
promFileInitialSeekSkipped.WithLabelValues(f.MetricsName).Set(float64(dbOffset))
lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart)
if err != nil {
return fmt.Errorf("tailing file: %w", err)
}
var catchUpped bool
promFileCatchupDone.WithLabelValues(f.MetricsName).Set(0)
for {
select {
case err := <-errChan:
return fmt.Errorf("tailing file: %w", err)
case line, ok := <-lineChan:
if !ok {
return nil
}
promFileLineSize.WithLabelValues(f.MetricsName).Observe(float64(len(line.Bytes)))
if !catchUpped {
catchUpped = line.EndOffset >= startSize
if catchUpped {
promFileCatchupDone.WithLabelValues(f.MetricsName).Set(1)
}
}
if len(line.Bytes) == 0 {
continue
}
sendQueue <- lines.Raw{
File: &lFile,
Offset: line.EndOffset,
B: line.Bytes,
}
}
}
}
func mongoWithErr[t interface{ Err() error }](mongoWrap t) (t, error) {
return mongoWrap, mongoWrap.Err()
}
// func JitterUntilCancelWithContext(pctx context.Context, f func(context.Context, context.CancelFunc), period time.Duration, jitterFactor float64, sliding bool) {
// ctx, cancel := context.WithCancel(pctx)
// wait.JitterUntil(func() { f(ctx, cancel) }, period, jitterFactor, sliding, ctx.Done())
// }

35
pkg/file/metrics.go Normal file
View File

@ -0,0 +1,35 @@
package file
import (
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
promFileInitialSeekSkipped = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "file",
Name: "skipped_bytes",
Help: "Bytes skipped in file after discovering",
}, []string{"filename"})
promFileCatchupDone = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: globals.PrometheusPrefix,
Subsystem: "file",
Name: "catchupped",
Help: "(0 or) 1 if initial backlog has been sent; (total <= watcher_file_count)",
}, []string{"filename"}) // TODO: rm filename?
promFileErr = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: globals.PrometheusPrefix,
Subsystem: "file",
Name: "errors_count",
Help: "Errors while reading file",
}, []string{"filename"})
promFileLineSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "file",
Name: "line_size_bytes",
Help: "Log line size in bytes",
Buckets: []float64{80, 160, 320, 640, 1280},
}, []string{"filename"})
)

31
pkg/globals/globals.go Normal file
View File

@ -0,0 +1,31 @@
package globals
import (
"context"
"time"
"k8s.io/apimachinery/pkg/util/wait"
)
// Did not find any better way for multipackage prefixing
const (
PrometheusPrefix = "logmower"
AppName = PrometheusPrefix + "shipper"
DatabaseCommandTimeout = 10 * time.Second
)
func MongoTimeout(ctx context.Context) context.Context {
ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO)
return ctx
}
// wrapper to force copying before use
func Backoff() wait.Backoff {
return wait.Backoff{
Duration: 2 * time.Second,
Factor: 1.5,
Jitter: 0.1,
Cap: 30 * time.Second,
}
}

View File

@ -1,32 +1,29 @@
package logmower package lines
import ( import (
"log" "log"
"sync" "sync"
ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
) )
var promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "record",
Name: "dropped_lines", // "dropped",
Help: "Records dropped due to being too large",
}, []string{"filename"})
type ( type (
RawLines <-chan RawLine RawC <-chan Raw
RawLine struct { Raw struct {
*file *File
Offset int64 Offset int64
B []byte B []byte
} }
// file.File, but avoiding import cycle
File struct {
*m.File
MetricsName string // filepath.Base()
}
) )
// assumes all lines are from same file // assumes all lines are from same file
func (unparsed RawLines) Process(bufferLimitBytes int, parsed chan<- ms.Record) { func (unparsed RawC) Process(bufferLimitBytes int, parsed chan<- m.Record) {
lines := make(chan singleLine) lines := make(chan singleLine)
go unparsed.parse(lines) go unparsed.parse(lines)
@ -62,8 +59,8 @@ func (unparsed RawLines) Process(bufferLimitBytes int, parsed chan<- ms.Record)
} }
} }
func (lines singleLines) process(bufferLimitBytes int, parsed chan<- ms.Record) { func (lines singleLines) process(bufferLimitBytes int, parsed chan<- m.Record) {
var firstMetadata *ms.ParsedMetadata var firstMetadata *m.ParsedMetadata
var buffer []byte var buffer []byte
for { for {
@ -81,7 +78,7 @@ func (lines singleLines) process(bufferLimitBytes int, parsed chan<- ms.Record)
buffer = append(buffer, line.B...) buffer = append(buffer, line.B...)
if len(buffer) > bufferLimitBytes { if len(buffer) > bufferLimitBytes {
promRecordDroppedTooLarge.WithLabelValues(line.metricsName).Add(1) promRecordDroppedTooLarge.WithLabelValues(line.MetricsName).Add(1)
log.Printf("dropped record: size in bytes exceeds limit of %d", bufferLimitBytes) log.Printf("dropped record: size in bytes exceeds limit of %d", bufferLimitBytes)
buffer = nil buffer = nil
@ -89,8 +86,8 @@ func (lines singleLines) process(bufferLimitBytes int, parsed chan<- ms.Record)
} }
if !line.partial { if !line.partial {
parsed <- ms.Record{ parsed <- m.Record{
File: line.file.File, File: line.File.File,
Offset: line.Offset, Offset: line.Offset,
String: string(buffer), String: string(buffer),

View File

@ -1,4 +1,4 @@
package logmower package lines
import ( import (
"bytes" "bytes"
@ -6,19 +6,10 @@ import (
"log" "log"
"time" "time"
ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
) )
var promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{ func (unparsed RawC) parse(parsed chan<- singleLine) {
Namespace: PrometheusPrefix,
Subsystem: "record",
Name: "parsing_errors",
Help: "Errors while parsing log line prefixes",
}, []string{"filename"})
func (unparsed RawLines) parse(parsed chan<- singleLine) {
for { for {
raw, ok := <-unparsed raw, ok := <-unparsed
if !ok { if !ok {
@ -26,10 +17,10 @@ func (unparsed RawLines) parse(parsed chan<- singleLine) {
return return
} }
line := singleLine{RawLine: raw} line := singleLine{Raw: raw}
if err := line.parse(); err != nil { if err := line.parse(); err != nil {
promRecordPrefixParsingErr.WithLabelValues(raw.metricsName).Add(1) promRecordPrefixParsingErr.WithLabelValues(raw.MetricsName).Add(1)
log.Printf("parsing kubernetes log line in %q: %e", raw.File.Path, err) log.Printf("parsing kubernetes log line in %q: %e", raw.File.Path, err)
} }
@ -41,10 +32,10 @@ func (unparsed RawLines) parse(parsed chan<- singleLine) {
type ( type (
singleLines <-chan singleLine singleLines <-chan singleLine
singleLine struct { singleLine struct {
RawLine Raw
// populated by parse() // populated by parse()
ms.ParsedMetadata m.ParsedMetadata
partial bool // P or F partial bool // P or F
} }
) )

23
pkg/lines/metrics.go Normal file
View File

@ -0,0 +1,23 @@
package lines
import (
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "record",
Name: "dropped_lines", // "dropped",
Help: "Records dropped due to being too large",
}, []string{"filename"})
promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
Namespace: globals.PrometheusPrefix,
Subsystem: "record",
Name: "parsing_errors",
Help: "Errors while parsing log line prefixes",
}, []string{"filename"})
)

View File

@ -1,4 +1,4 @@
package mongo_struct package mongo
import ( import (
"time" "time"

View File

@ -1,46 +1,42 @@
package logmower package mongo
import ( import (
"context" "context"
"fmt"
"log" "log"
"net/url"
"time" "time"
ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" "git.k-space.ee/k-space/logmower-shipper/pkg/globals"
prom "github.com/prometheus/client_golang/prometheus" prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
mongoEvent "go.mongodb.org/mongo-driver/event" mongoEvent "go.mongodb.org/mongo-driver/event"
"go.mongodb.org/mongo-driver/mongo"
mongoOpt "go.mongodb.org/mongo-driver/mongo/options" mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
) )
const promSubsystem = "database"
var ( var (
promDbHeartbeat = promauto.NewHistogramVec(prom.HistogramOpts{ promDbHeartbeat = promauto.NewHistogramVec(prom.HistogramOpts{
Namespace: PrometheusPrefix, Namespace: globals.PrometheusPrefix, Subsystem: promSubsystem,
Subsystem: "database", Name: "heartbeat_time",
Name: "heartbeat_time", Help: "Time in seconds for succeeded heartbeat, or 0 on failure",
Help: "Time in seconds for succeeded heartbeat, or 0 on failure", Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50},
Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50},
}, []string{"connection_id"}) }, []string{"connection_id"})
promDbCmd = promauto.NewHistogramVec(prom.HistogramOpts{ promDbCmd = promauto.NewHistogramVec(prom.HistogramOpts{
Namespace: PrometheusPrefix, Namespace: globals.PrometheusPrefix, Subsystem: promSubsystem,
Subsystem: "database", Name: "operation_latency", // "command_time",
Name: "operation_latency", // "command_time", Help: "Time in seconds of commands",
Help: "Time in seconds of commands", Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50},
Buckets: []float64{0.1, 0.2, 0.5, 1, 5, 10, 50},
}, []string{"connection_id", "command_name"}) }, []string{"connection_id", "command_name"})
promDbCmdErr = promauto.NewCounterVec(prom.CounterOpts{ promDbCmdErr = promauto.NewCounterVec(prom.CounterOpts{
Namespace: PrometheusPrefix, Namespace: globals.PrometheusPrefix, Subsystem: promSubsystem,
Subsystem: "database", Name: "errors",
Name: "errors", Help: "Failed commands (also reflected elsewhere)",
Help: "Failed commands (also reflected elsewhere)",
}, []string{"connection_id", "command_name"}) }, []string{"connection_id", "command_name"})
) )
func mongoMonitoredClientOptions() *mongoOpt.ClientOptions { func monitoredClientOptions() *mongoOpt.ClientOptions {
return mongoOpt.Client(). return mongoOpt.Client().
SetServerMonitor(&mongoEvent.ServerMonitor{ SetServerMonitor(&mongoEvent.ServerMonitor{
ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) { ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) {
@ -62,30 +58,3 @@ func mongoMonitoredClientOptions() *mongoOpt.ClientOptions {
}, },
}) })
} }
func initDatabase(ctx context.Context, uri string) (*mongo.Collection, error) {
uriParsed, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("parsing URI for database name: %w", err)
}
uriParsed.Path = uriParsed.Path[1:] // remove leading slash
if uriParsed.Path == "" {
return nil, fmt.Errorf("URI must include database name (as database to authenticate against)")
}
dbOpt := mongoMonitoredClientOptions().ApplyURI(uri)
dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx))
if err != nil {
return nil, fmt.Errorf("connecting to %q: %w", dbOpt.GetURI(), err)
}
col := dbClient.Database(uriParsed.Path).Collection("logs")
if err := ms.InitializeIndexes(mongoTimeoutCtx(ctx), col); err != nil {
return nil, fmt.Errorf("initializing indexes: %w", err)
}
return col, nil
}

37
pkg/mongo/mongo.go Normal file
View File

@ -0,0 +1,37 @@
package mongo
import (
"context"
"fmt"
"net/url"
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
"go.mongodb.org/mongo-driver/mongo"
)
func Initialize(ctx context.Context, uri string) (*mongo.Collection, error) {
uriParsed, err := url.ParseRequestURI(uri)
if err != nil {
return nil, fmt.Errorf("parsing URI for database name: %w", err)
}
uriParsed.Path = uriParsed.Path[1:] // remove leading slash
if uriParsed.Path == "" {
return nil, fmt.Errorf("URI must include database name (as database to authenticate against)")
}
dbOpt := monitoredClientOptions().ApplyURI(uri)
dbClient, err := mongo.Connect(globals.MongoTimeout(ctx))
if err != nil {
return nil, fmt.Errorf("connecting to %q: %w", dbOpt.GetURI(), err)
}
col := dbClient.Database(uriParsed.Path).Collection("logs")
if err := InitializeIndexes(globals.MongoTimeout(ctx), col); err != nil {
return nil, fmt.Errorf("initializing indexes: %w", err)
}
return col, nil
}

View File

@ -1,4 +1,4 @@
package mongo_struct package mongo
import ( import (
"context" "context"
@ -23,7 +23,7 @@ func InitializeIndexes(ctx context.Context, col *mongo.Collection) error {
// when editing, also edit everything in this file! // when editing, also edit everything in this file!
type ( type (
Record struct { Record struct {
File *File
Offset int64 // end, of last line Offset int64 // end, of last line
String string String string
@ -42,13 +42,13 @@ type (
Content any Content any
TimeUpstream time.Time TimeUpstream time.Time
} }
HostInfo struct { Host struct {
Id string Id string
Name string Name string
Arch string Arch string
} }
File struct { File struct {
Host *HostInfo Host *Host
Path string // absolute Path string // absolute
KubeInfo KubeInfo
} }

41
pkg/sender/metrics.go Normal file
View File

@ -0,0 +1,41 @@
package sender
import (
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
promShipperQueued = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "shipper",
Name: "shipper_record", // "queued",
Help: "Log records in queue to be batched and sent to database",
}, []string{"filename"})
promShipperDbSent = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "shipper",
Name: "record", // "sent",
Help: "Log records successfully committed to database",
}, []string{"filename"})
promShipperBatchSizeResult = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "shipper",
Name: "bulk_submission_message", // "items_in_batch"
Help: "Batch size for database submissions",
Buckets: []float64{1, 5, 10, 50, 100, 500, 1000, 5000, 10000},
})
promShipperDbSendError = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "shipper",
Name: "insertion_error", // "errors",
Help: "Errors while submitting to database", // TODO:
}, []string{"filename"})
promShipperSynced = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: globals.PrometheusPrefix,
Subsystem: "shipper",
Name: "batches_synced",
Help: "All batches available have been committed database (0 or 1)",
}, []string{"filename"})
)

73
pkg/sender/sender.go Normal file
View File

@ -0,0 +1,73 @@
package sender
import (
"context"
"log"
"time"
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
"github.com/jtagcat/util"
"go.mongodb.org/mongo-driver/mongo"
)
const (
MaxBatchItems = 10000
MaxBatchTime = 5 * time.Second
)
type Queue <-chan m.Record
func (queue Queue) Sender(db *mongo.Collection, metricsFilename string) {
batched := make(chan []m.Record)
// metrics for batcher and queue
go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for {
promShipperQueued.WithLabelValues(metricsFilename).Set(float64(
len(queue)))
timer := time.NewTimer(time.Second)
select {
case <-ctx.Done():
return
case <-timer.C:
}
}
}()
util.Batch(MaxBatchItems, MaxBatchTime, queue, batched)
// returns when sendQueue is closed
}()
for {
promShipperSynced.WithLabelValues(metricsFilename).Set(1)
batch, ok := <-batched
if !ok {
return
}
promShipperBatchSizeResult.Observe(float64(len(batch)))
promShipperSynced.WithLabelValues(metricsFilename).Set(0)
var batchBson []interface{} // mongo does not like typing
for _, b := range batch {
batchBson = append(batchBson, b.ToBson())
}
result, err := db.InsertMany(globals.MongoTimeout(context.Background()), batchBson, nil)
if err != nil {
promShipperDbSendError.WithLabelValues(metricsFilename).Add(1)
log.Printf("failure in batch submit to database: %e", err) // TODO: add some selective retry here or something, better error handling
continue
}
promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(
len(result.InsertedIDs)))
}
}

View File

@ -1,6 +1,4 @@
package mongo_struct package util
// TODO: this is misc collection of stuff not really fitting in here
import ( import (
"fmt" "fmt"
@ -8,42 +6,44 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings" "strings"
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
) )
func ParseLogName(name string) (m KubeInfo, ok bool) { func ParseLogFilename(name string) (i m.KubeInfo, ok bool) {
name = filepath.Base(name) name = filepath.Base(name)
// https://github.com/kubernetes/design-proposals-archive/blob/8da1442ea29adccea40693357d04727127e045ed/node/kubelet-cri-logging.md // https://github.com/kubernetes/design-proposals-archive/blob/8da1442ea29adccea40693357d04727127e045ed/node/kubelet-cri-logging.md
// <pod_name>_<pod_namespace>_<container_name>-<container_id>.log` // <pod_name>_<pod_namespace>_<container_name>-<container_id>.log`
m.Pod, name, ok = strings.Cut(name, "_") i.Pod, name, ok = strings.Cut(name, "_")
if !ok { if !ok {
return return
} }
m.Namespace, name, ok = strings.Cut(name, "_") i.Namespace, name, ok = strings.Cut(name, "_")
if !ok { if !ok {
return return
} }
m.ContainerName, name, ok = strings.Cut(name, "-") i.ContainerName, name, ok = strings.Cut(name, "-")
if !ok { if !ok {
return return
} }
m.ContainerId = strings.TrimSuffix(name, ".log") i.ContainerId = strings.TrimSuffix(name, ".log")
if !strings.HasSuffix(name, ".log") { if !strings.HasSuffix(name, ".log") {
return return
} }
return m, true return i, true
} }
func (h *HostInfo) Populate(nodeName string) (err error) { func Hostinfo(nodeName string) (h m.Host, err error) {
if nodeName == "" { if nodeName == "" {
nodeName, err = os.Hostname() nodeName, err = os.Hostname()
if err != nil { if err != nil {
return fmt.Errorf("getting hostname: %w", err) return h, fmt.Errorf("getting hostname: %w", err)
} }
} }
@ -51,12 +51,12 @@ func (h *HostInfo) Populate(nodeName string) (err error) {
id, err := os.ReadFile("/etc/machine-id") id, err := os.ReadFile("/etc/machine-id")
if err != nil { if err != nil {
return fmt.Errorf("getting machineId: %w", err) return h, fmt.Errorf("getting machineId: %w", err)
} }
h.Id = strings.TrimSpace(string(id)) h.Id = strings.TrimSpace(string(id))
h.Arch = runtime.GOARCH h.Arch = runtime.GOARCH
return nil return h, nil
} }

40
pkg/watcher/metrics.go Normal file
View File

@ -0,0 +1,40 @@
package watcher
import (
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{
Namespace: globals.PrometheusPrefix,
Subsystem: "watcher",
Name: "online",
Help: "1 if initialized, and directory watcher has been engaged successfully",
})
promWatcherErr = promauto.NewCounter(prom.CounterOpts{
Namespace: globals.PrometheusPrefix,
Subsystem: "watcher",
Name: "errors",
Help: "Error in logmower watching log files",
})
promWatcherFilesStarted = promauto.NewCounter(prom.CounterOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "watcher",
Name: "log_file", // "discovered_logfiles",
Help: "Number of tracked log files",
})
promWatcherFilesSkipped = promauto.NewCounter(prom.CounterOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "watcher",
Name: "invalid_filename", // "skipped_files",
Help: "Number of files in log directory skipped due to unexpected filename",
})
promWatcherEvents = promauto.NewCounter(prom.CounterOpts{
Namespace: globals.PrometheusPrefix,
// Subsystem: "watcher",
Name: "inotify_event", // "events",
Help: "Number of events while watchng (includes initial create events for existing file discovery)",
})
)

View File

@ -1,46 +1,25 @@
package logmower package watcher
import ( import (
"context"
"fmt" "fmt"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongo_struct" "git.k-space.ee/k-space/logmower-shipper/pkg/file"
"git.k-space.ee/k-space/logmower-shipper/pkg/globals"
m "git.k-space.ee/k-space/logmower-shipper/pkg/mongo"
"git.k-space.ee/k-space/logmower-shipper/pkg/util"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"k8s.io/apimachinery/pkg/util/wait"
) )
const DatabaseCommandTimeout = 10 * time.Second
const PrometheusPrefix = "logmower" // TODO:
// wrapper to force copying before use
func defaultBackoff() wait.Backoff {
return wait.Backoff{
Duration: 2 * time.Second,
Factor: 1.5,
Jitter: 0.1,
Cap: 30 * time.Second,
}
}
func mongoTimeoutCtx(ctx context.Context) context.Context {
ctx, _ = context.WithTimeout(ctx, DatabaseCommandTimeout) //nolint:lostcancel (cancelled by mongo, should be bug on them //TODO)
return ctx
}
var App = &cli.App{ var App = &cli.App{
Name: "logmower-shipper", Name: globals.AppName,
Version: "1.0.0", Version: "1.0.0",
Authors: []*cli.Author{{Name: "jtagcat"}}, Authors: []*cli.Author{{Name: "jtagcat"}, {Name: "codemowers.io"}},
Description: "Collect and ship kubernetes logs", Description: "Collect and ship kubernetes logs",
// Usage: "rubykana <input>", // Usage: "rubykana <input>",
@ -64,51 +43,18 @@ var App = &cli.App{
}, },
Action: func(ctx *cli.Context) error { Action: func(ctx *cli.Context) error {
var (
promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{
Namespace: PrometheusPrefix,
Subsystem: "watcher",
Name: "online",
Help: "1 if initialized, and directory watcher has been engaged successfully",
})
promWatcherErr = promauto.NewCounter(prom.CounterOpts{
Namespace: PrometheusPrefix,
Subsystem: "watcher",
Name: "errors",
Help: "Error in logmower watching log files",
})
promWatcherFilesStarted = promauto.NewCounter(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "watcher",
Name: "log_file", // "discovered_logfiles",
Help: "Number of tracked log files",
})
promWatcherFilesSkipped = promauto.NewCounter(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "watcher",
Name: "invalid_filename", // "skipped_files",
Help: "Number of files in log directory skipped due to unexpected filename",
})
promWatcherEvents = promauto.NewCounter(prom.CounterOpts{
Namespace: PrometheusPrefix,
// Subsystem: "watcher",
Name: "inotify_event", // "events",
Help: "Number of events while watchng (includes initial create events for existing file discovery)",
})
)
ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test
var wg sync.WaitGroup var wg sync.WaitGroup
log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version) log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version)
db, err := initDatabase(ctx.Context, ctx.String("mongo-uri")) db, err := m.Initialize(ctx.Context, ctx.String("mongo-uri"))
if err != nil { if err != nil {
return fmt.Errorf("initializing database connection: %w", err) return fmt.Errorf("initializing database connection: %w", err)
} }
var hostInfo ms.HostInfo hostinfo, err := util.Hostinfo(ctx.String("node-name"))
if err := hostInfo.Populate(ctx.String("node-name")); err != nil { if err != nil {
return fmt.Errorf("populating host info: %w", err) return fmt.Errorf("populating host info: %w", err)
} }
@ -137,7 +83,7 @@ var App = &cli.App{
} }
// TODO: #1: || if not in filterset // TODO: #1: || if not in filterset
kubeInfo, ok := ms.ParseLogName(event.Name) kubeInfo, ok := util.ParseLogFilename(event.Name)
if !ok { if !ok {
promWatcherFilesSkipped.Add(1) promWatcherFilesSkipped.Add(1)
log.Printf("skipped %q: filename not parsable in kubernetes log format", filepath.Base(event.Name)) log.Printf("skipped %q: filename not parsable in kubernetes log format", filepath.Base(event.Name))
@ -148,13 +94,13 @@ var App = &cli.App{
wg.Add(1) wg.Add(1)
go func() { go func() {
file := file{ file := file.File{
File: ms.File{ File: &m.File{
Host: &hostInfo, Host: &hostinfo,
KubeInfo: kubeInfo, KubeInfo: kubeInfo,
Path: event.Name, Path: event.Name,
}, },
metricsName: filepath.Base(event.Name), MetricsName: filepath.Base(event.Name),
} }
file.Process(ctx.Context, db, ctx.Int("max-record-size")) file.Process(ctx.Context, db, ctx.Int("max-record-size"))