refactor: use pkg/mongoStruct + stdlib log
+ restructure project

144  cmd/line.go
@@ -1,101 +1,45 @@
package logmower

import (
    "bytes"
    "fmt"
    "log"
    "sync"
    "time"

    ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongoStruct"
    prom "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "go.uber.org/zap"
)

var (
    promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
        Namespace: PrometheusPrefix,
        Subsystem: "record",
        Name:      "parsing_errors",
        Help:      "Errors while parsing log line prefixes",
    }, []string{"filename"})
    promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
        Namespace: PrometheusPrefix,
        // Subsystem: "record",
        Name: "dropped_lines", // "dropped",
        Help: "Records dropped due to being too large",
    }, []string{"filename"})
)
var promRecordDroppedTooLarge = promauto.NewCounterVec(prom.CounterOpts{
    Namespace: PrometheusPrefix,
    // Subsystem: "record",
    Name: "dropped_lines", // "dropped",
    Help: "Records dropped due to being too large",
}, []string{"filename"})

type (
    rawLine struct {
        RecordMetadata
        line []byte
    }

    singleLine struct {
        mLog
        line      []byte
        isPartial bool // P or F
    RawLines <-chan RawLine
    RawLine  struct {
        *file
        Offset int64
        B      []byte
    }
)

func parseSingleContainerLine(line []byte) (u singleLine, err error) {
    split := bytes.SplitN(line, []byte(" "), 4)
    if len(split) != 4 {
        u.line = line
        return u, fmt.Errorf("expected at least 3 spaces in container log line, got %d", len(split)-1)
    }

    u.line = split[3]

    u.StdErr = string(split[1]) == "stderr" // or stdout
    switch string(split[2]) {
    case "P":
        u.isPartial = true
    case "F":
    default:
        return u, fmt.Errorf("partial indicator must be 'P' or 'F', not %q", split[2])
    }

    u.ContainerTime, err = time.Parse(time.RFC3339Nano, string(split[0]))

    return u, err
}

func (s *submitter) parseContainerLogLines(unparsed <-chan rawLine, parsed chan<- singleLine) {
    for {
        raw, ok := <-unparsed
        if !ok {
            close(parsed)
            return
        }

        line, err := parseSingleContainerLine(raw.line)
        if err != nil {
            promRecordPrefixParsingErr.WithLabelValues(raw.File).Add(1)
            s.l.Error("parsing container log line", zap.Error(err), zap.String("file", raw.File))
        }

        line.mLog.RecordMetadata = raw.RecordMetadata
        parsed <- line
    }
}

// assumes all lines are from same file
func (s *submitter) parseLines(bufferLimitBytes int, unparsed <-chan rawLine, parsed chan<- mLog) {
func (unparsed RawLines) Process(bufferLimitBytes int, parsed chan<- ms.Record) {
    lines := make(chan singleLine)
    go s.parseContainerLogLines(unparsed, lines)
    go unparsed.parse(lines)

    var wg sync.WaitGroup
    wg.Add(2)

    stdOut, stdErr := make(chan singleLine), make(chan singleLine)
    go func() {
        s.parseStdChannel(bufferLimitBytes, stdOut, parsed)
        singleLines(stdOut).process(bufferLimitBytes, parsed)
        wg.Done()
    }()
    go func() {
        s.parseStdChannel(bufferLimitBytes, stdErr, parsed)
        singleLines(stdErr).process(bufferLimitBytes, parsed)
        wg.Done()
    }()

@@ -118,58 +62,42 @@ func (s *submitter) parseLines(bufferLimitBytes int, unparsed <-chan rawLine, pa
    }
}

// partial is ended with full

func (s *submitter) parseStdChannel(bufferLimitBytes int, lines <-chan singleLine, parsed chan<- mLog) {
    var firstTime time.Time
func (lines singleLines) process(bufferLimitBytes int, parsed chan<- ms.Record) {
    var firstMetadata *ms.ParsedMetadata
    var buffer []byte

    flush := func(last *mLog) {
        parsed <- mLog{
            RecordMetadata: last.RecordMetadata,
            StdErr:         last.StdErr,

            ContainerTime: firstTime,
            Content:       parseRecord(buffer),
        }
        buffer = nil
    }

    for {
        line, ok := <-lines
        if !ok {
            // discard any partial lines without end delimiter (full line)
            // partial line should always be finished with full line
            // discard any partial lines without end (full line)
            return
        }

        if len(buffer) == 0 {
            firstTime = line.ContainerTime
            firstMetadata = &line.ParsedMetadata
        }

        buffer = append(buffer, line.line...)
        buffer = append(buffer, line.B...)

        if len(buffer) > bufferLimitBytes {
            promRecordDroppedTooLarge.WithLabelValues(line.metricsName).Add(1)
            log.Printf("dropped record: size in bytes exceeds limit of %d", bufferLimitBytes)

            buffer = nil
            promRecordDroppedTooLarge.WithLabelValues(line.File).Add(1)
            s.l.Warn("dropped record: too large", zap.Int("cap_bytes", bufferLimitBytes))
            continue
        }

        if !line.isPartial {
            flush(&line.mLog)
        if !line.partial {
            parsed <- ms.Record{
                File:   line.file.File,
                Offset: line.Offset,

                String:         string(buffer),
                ParsedMetadata: *firstMetadata,
            }

            buffer = nil
        }
    }
    // TODO: flush last
    // use time of first
    // metadata of last
    // //
    // for {
    // }

    // promRecordDroppedTooLarge
}

func parseRecord(buffer []byte) any {
    // TODO: json parser
    return string(buffer)
}
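The loop above joins CRI "P" (partial) fragments until an "F" (full) line arrives, dropping the accumulated record when it grows past the limit. A minimal standalone sketch of that join logic, using a hypothetical plain-string type instead of the project's channel and metric types:

package main

import "fmt"

// fragment mirrors the P/F flag carried by each parsed line.
type fragment struct {
    text    string
    partial bool // true for "P", false for "F"
}

// joinFragments buffers partial fragments until a full one arrives and emits
// the joined record; oversized buffers are dropped, like the metric-counted
// path above.
func joinFragments(in []fragment, limit int) (out []string) {
    var buf []byte
    for _, f := range in {
        buf = append(buf, f.text...)
        if len(buf) > limit {
            buf = nil // dropped
            continue
        }
        if !f.partial {
            out = append(out, string(buf))
            buf = nil
        }
    }
    return out
}

func main() {
    recs := joinFragments([]fragment{
        {"long line part 1 ", true},
        {"part 2 ", true},
        {"end", false},
        {"short line", false},
    }, 1024)
    fmt.Println(recs) // [long line part 1 part 2 end short line]
}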
75  cmd/line_single.go  Normal file
@@ -0,0 +1,75 @@
package logmower

import (
    "bytes"
    "fmt"
    "log"
    "time"

    ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongoStruct"
    prom "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var promRecordPrefixParsingErr = promauto.NewCounterVec(prom.CounterOpts{
    Namespace: PrometheusPrefix,
    Subsystem: "record",
    Name:      "parsing_errors",
    Help:      "Errors while parsing log line prefixes",
}, []string{"filename"})

func (unparsed RawLines) parse(parsed chan<- singleLine) {
    for {
        raw, ok := <-unparsed
        if !ok {
            close(parsed)
            return
        }

        line := singleLine{RawLine: raw}

        if err := line.parse(); err != nil {
            promRecordPrefixParsingErr.WithLabelValues(raw.metricsName).Add(1)
            log.Printf("parsing kubernetes log line in %q: %e", raw.File.Path, err)
        }

        // TODO: should this only be on success?
        parsed <- line
    }
}

type (
    singleLines <-chan singleLine
    singleLine  struct {
        RawLine

        // populated by parse()
        ms.ParsedMetadata
        partial bool // P or F
    }
)

func (line *singleLine) parse() (err error) {
    split := bytes.SplitN(line.B, []byte(" "), 4)
    if len(split) != 4 {
        return fmt.Errorf("expected at least 3 spaces in log line, got %d", len(split)-1)
    }

    line.TimeKubernetes, err = time.Parse(time.RFC3339Nano, string(split[0]))
    if err != nil {
        return fmt.Errorf("invalid time: %w", err)
    }

    line.StdErr = string(split[1]) == "stderr" // or stdout

    switch string(split[2]) {
    case "P":
        line.partial = true
    case "F":
    default:
        return fmt.Errorf("partial indicator must be 'P' or 'F', not %q", split[2])
    }

    line.B = split[3]
    return nil
}
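For reference, kubelet CRI log files prefix every line with a timestamp, the stream name, and a P/F partial flag before the payload, which is exactly what parse() splits on. A small demonstration of that split on a made-up line:

package main

import (
    "bytes"
    "fmt"
    "time"
)

func main() {
    // Example kubelet CRI log line: <RFC3339Nano time> <stream> <P|F> <content>
    line := []byte("2022-11-28T13:37:00.123456789Z stderr F oom-killed, restarting")

    split := bytes.SplitN(line, []byte(" "), 4)
    if len(split) != 4 {
        panic("expected 4 fields")
    }

    t, err := time.Parse(time.RFC3339Nano, string(split[0]))
    if err != nil {
        panic(err)
    }

    fmt.Println(t.UTC())                      // 2022-11-28 13:37:00.123456789 +0000 UTC
    fmt.Println(string(split[1]) == "stderr") // true
    fmt.Println(string(split[2]) == "F")      // true (full line, not partial)
    fmt.Println(string(split[3]))             // oom-killed, restarting
}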
32  cmd/main.go  Normal file
@@ -0,0 +1,32 @@
package logmower

import (
    "errors"
    "fmt"
    "log"
    "net/http"
    "os"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

const PrometheusPrefix = "logmower"

// TODO:
func main() {
    go func() {
        metricsPort := 2112

        log.Printf("serving /metrics on port %d", metricsPort)

        http.Handle("/metrics", promhttp.Handler())

        if err := http.ListenAndServe(fmt.Sprintf(":%d", metricsPort), nil); !errors.Is(err, http.ErrServerClosed) {
            log.Fatalf("serving /metrics: %e", err)
        }
    }()

    if err := App.Run(os.Args); err != nil {
        log.Fatal(err)
    }
}
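main() only wires up the /metrics endpoint and then hands control to App.Run. A quick sanity check of the endpoint from another process, assuming the default port 2112 shown above:

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Assumes a logmower instance is serving metrics locally on :2112.
    resp, err := http.Get("http://localhost:2112/metrics")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Printf("%s", body) // Prometheus text exposition, e.g. logmower_record_parsing_errors{...}
}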
37  cmd/mongo.go
@@ -2,13 +2,17 @@ package logmower

import (
    "context"
    "fmt"
    "log"
    "net/url"
    "time"

    ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongoStruct"
    prom "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    mongoEvent "go.mongodb.org/mongo-driver/event"
    "go.mongodb.org/mongo-driver/mongo"
    mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
    "go.uber.org/zap"
)

var (
@@ -36,7 +40,7 @@ var (
    }, []string{"connection_id", "command_name"})
)

func mongoMonitoredClientOptions(l *zap.Logger) *mongoOpt.ClientOptions {
func mongoMonitoredClientOptions() *mongoOpt.ClientOptions {
    return mongoOpt.Client().
        SetServerMonitor(&mongoEvent.ServerMonitor{
            ServerHeartbeatSucceeded: func(ev *mongoEvent.ServerHeartbeatSucceededEvent) {
@@ -44,7 +48,7 @@ func mongoMonitoredClientOptions(l *zap.Logger) *mongoOpt.ClientOptions {
            },
            ServerHeartbeatFailed: func(ev *mongoEvent.ServerHeartbeatFailedEvent) {
                promDbHeartbeat.WithLabelValues(ev.ConnectionID).Observe(0)
                l.Warn("database heartbeat", zap.Error(ev.Failure), zap.String("connection_id", ev.ConnectionID))
                log.Printf("database heartbeat failed on connection %q: %e", ev.ConnectionID, ev.Failure)
            },
        }).
        SetMonitor(&mongoEvent.CommandMonitor{
@@ -58,3 +62,30 @@ func mongoMonitoredClientOptions(l *zap.Logger) *mongoOpt.ClientOptions {
            },
        })
}

func initDatabase(ctx context.Context, uri string) (*mongo.Collection, error) {
    uriParsed, err := url.ParseRequestURI(uri)
    if err != nil {
        return nil, fmt.Errorf("parsing URI for database name: %w", err)
    }

    uriParsed.Path = uriParsed.Path[1:] // remove leading slash
    if uriParsed.Path == "" {
        return nil, fmt.Errorf("URI must include database name (as database to authenticate against)")
    }

    dbOpt := mongoMonitoredClientOptions().ApplyURI(uri)

    dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx))
    if err != nil {
        return nil, fmt.Errorf("connecting to %q: %w", dbOpt.GetURI(), err)
    }

    col := dbClient.Database(uriParsed.Path).Collection("logs")

    if err := ms.InitializeIndexes(mongoTimeoutCtx(ctx), col); err != nil {
        return nil, fmt.Errorf("initializing indexes: %w", err)
    }

    return col, nil
}
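initDatabase derives the database name from the URI path, so the connection string must carry one. A small illustration of how url.ParseRequestURI exposes that path, with a hypothetical URI:

package main

import (
    "fmt"
    "net/url"
)

func main() {
    uri := "mongodb://user:pass@mongo.example.com:27017/logs?replicaSet=rs0"

    u, err := url.ParseRequestURI(uri)
    if err != nil {
        panic(err)
    }

    fmt.Println(u.Path)     // "/logs"
    fmt.Println(u.Path[1:]) // "logs" -> used as the database name
}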
@@ -1,120 +0,0 @@
package logmower

import (
    "context"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
)

func initializeIndexes(ctx context.Context, col *mongo.Collection) error {
    ind := col.Indexes()

    // (does not create duplicates)
    _, err := ind.CreateOne(mongoTimeoutCtx(ctx), mongo.IndexModel{
        Keys: bson.D{{Key: mLogKeyFileBasename, Value: 1}, {Key: mLogKeyOffset, Value: -1}},
    })

    return err
}

// when editing, also edit everything in this file!
type (
    mLog struct {
        RecordMetadata

        Content       any
        ContainerTime time.Time
        StdErr        bool

        // added by toBson()
        ShipTime time.Time
    }
    RecordMetadata struct {
        HostInfo HostInfo
        File     string
        Offset   int64 // byte offset where log entry ends at
    }
)

const (
    // used outside fromBson and toBson
    mLogKeyHostId = mLogKeyHostInfo + "." + mLogKeyId
)

// Don't use direct strings in bson types. Use the constants as keys.
// This ensures keys (and subkeys) are consistent, at least within the package.

const (
    mLogKeyHostInfo = "host_info"
    mLogKeyId       = "id"
    mLogKeyName     = "name"
    mLogKeyArch     = "arch"

    mLogKeyFileBasename  = "file"
    mLogKeyOffset        = "offset"
    mLogKeyContent       = "content"
    mLogKeyContainerTime = "container_time"
    mLogKeyStderr        = "stderr"
    mLogKeyShipTime      = "ship_time"
)

// not using marshal, since it is <0.1x performance
func (l *mLog) toBson() bson.M {
    return bson.M{
        mLogKeyHostInfo: bson.M{
            mLogKeyId:   l.HostInfo.Id,
            mLogKeyName: l.HostInfo.Name,
            mLogKeyArch: l.HostInfo.Arch,
        },
        mLogKeyFileBasename:  l.File,
        mLogKeyOffset:        l.Offset,
        mLogKeyContent:       l.Content,
        mLogKeyContainerTime: l.ContainerTime,
        mLogKeyStderr:        l.StdErr,

        mLogKeyShipTime: time.Now(),
    }
}

// really, mongo should support tagged structs
func mLogfromBson(b *bson.Raw) mLog {
    return mLog{
        RecordMetadata: RecordMetadata{
            HostInfo: HostInfo{
                Id:   bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyId),
                Name: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyName),
                Arch: bsonLookupStringValue(b, mLogKeyHostInfo, mLogKeyArch),
            },
            File:   bsonLookupStringValue(b, mLogKeyFileBasename),
            Offset: bsonLookupInt64(b, mLogKeyOffset),
        },
        Content:       bsonLookupStringValue(b, mLogKeyContent),
        ContainerTime: bsonLookupTime(b, mLogKeyContainerTime),
        StdErr:        bsonLookupBoolean(b, mLogKeyStderr),
        ShipTime:      bsonLookupTime(b, mLogKeyShipTime),
    }
}

// default values without ok

func bsonLookupBoolean(b *bson.Raw, key ...string) bool {
    v, _ := b.Lookup(key...).BooleanOK()
    return v
}

func bsonLookupStringValue(b *bson.Raw, key ...string) string {
    v, _ := b.Lookup(key...).StringValueOK()
    return v
}

func bsonLookupInt64(b *bson.Raw, key ...string) int64 {
    v, _ := b.Lookup(key...).Int64OK()
    return v
}

func bsonLookupTime(b *bson.Raw, key ...string) time.Time {
    v, _ := b.Lookup(key...).TimeOK()
    return v
}
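The deleted file marshals records by hand instead of relying on struct tags, which is what the "<0.1x performance" comment refers to. For comparison, a hedged sketch of the tagged-struct equivalent, with field names assumed from the key constants above; it trades the reflection cost of bson.Marshal for less boilerplate:

package main

import (
    "fmt"
    "time"

    "go.mongodb.org/mongo-driver/bson"
)

// taggedLog is a tag-based stand-in for the manual mLog.toBson() above.
type taggedLog struct {
    File          string    `bson:"file"`
    Offset        int64     `bson:"offset"`
    Content       string    `bson:"content"`
    ContainerTime time.Time `bson:"container_time"`
    StdErr        bool      `bson:"stderr"`
    ShipTime      time.Time `bson:"ship_time"`
}

func main() {
    doc, err := bson.Marshal(taggedLog{File: "app.log", Offset: 42, Content: "hello"})
    if err != nil {
        panic(err)
    }

    var m bson.M
    _ = bson.Unmarshal(doc, &m)
    fmt.Println(m["file"], m["offset"]) // app.log 42
}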
@@ -2,13 +2,14 @@ package logmower

import (
    "context"
    "path/filepath"
    "log"
    "time"

    ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongoStruct"
    "github.com/jtagcat/util"
    prom "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "go.uber.org/zap"
    "go.mongodb.org/mongo-driver/mongo"
)

var (
@@ -50,20 +51,20 @@ const (
    MaxBatchTime = 5 * time.Second
)

func (s *submitter) sender(name string, sendQueue <-chan mLog) {
    baseName := filepath.Base(name)
type queueT <-chan ms.Record

    batched := make(chan []mLog)
func (queue queueT) sender(db *mongo.Collection, metricsFilename string) {
    batched := make(chan []ms.Record)

    // batcher and queue metrics
    go func() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        go func() {
            baseName := filepath.Base(name)
            for {
                promShipperQueued.WithLabelValues(baseName).Set(float64(
                    len(sendQueue)))
                promShipperQueued.WithLabelValues(metricsFilename).Set(float64(
                    len(queue)))

                timer := time.NewTimer(time.Second)
                select {
@@ -74,15 +75,12 @@ func (s *submitter) sender(name string, sendQueue <-chan mLog) {
            }
        }()

        util.Batch(MaxBatchItems, MaxBatchTime, sendQueue, batched)
        util.Batch(MaxBatchItems, MaxBatchTime, queue, batched)
        // returns when sendQueue is closed
    }()

    s.Add(1)
    defer s.Done()

    for {
        promShipperSynced.WithLabelValues(baseName).Set(1)
        promShipperSynced.WithLabelValues(metricsFilename).Set(1)

        batch, ok := <-batched
        if !ok {
@@ -90,21 +88,21 @@
        }
        promShipperBatchSizeResult.Observe(float64(len(batch)))

        promShipperSynced.WithLabelValues(baseName).Set(0)
        promShipperSynced.WithLabelValues(metricsFilename).Set(0)

        var batchBson []interface{} // mongo does not like typing
        for _, b := range batch {
            batchBson = append(batchBson, b.toBson())
            batchBson = append(batchBson, b.ToBson())
        }

        result, err := s.db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil)
        result, err := db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil)
        if err != nil {
            promShipperDbSendError.WithLabelValues(baseName).Add(1)
            s.l.Error("submission to database", zap.Error(err)) // TODO: add some selective retry here or something
            promShipperDbSendError.WithLabelValues(metricsFilename).Add(1)
            log.Printf("failure in batch submit to database: %e", err) // TODO: add some selective retry here or something, better error handling
            continue
        }

        promShipperDbSent.WithLabelValues(baseName).Add(float64(
        promShipperDbSent.WithLabelValues(metricsFilename).Add(float64(
            len(result.InsertedIDs)))
    }
}
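util.Batch groups queued records before they reach InsertMany; the assumed semantics are "flush when the batch is full or when the time window expires, whichever comes first". A minimal hand-rolled sketch of that pattern, not the jtagcat/util implementation itself:

package main

import (
    "fmt"
    "time"
)

// batch drains in, emitting slices of at most maxItems, or whatever has
// accumulated when maxWait elapses. Returns (and closes out) when in closes.
func batch(maxItems int, maxWait time.Duration, in <-chan int, out chan<- []int) {
    defer close(out)

    var buf []int
    timer := time.NewTimer(maxWait)

    flush := func() {
        if len(buf) > 0 {
            out <- buf
            buf = nil
        }
        timer.Reset(maxWait)
    }

    for {
        select {
        case v, ok := <-in:
            if !ok {
                flush()
                return
            }
            buf = append(buf, v)
            if len(buf) >= maxItems {
                flush()
            }
        case <-timer.C:
            flush()
        }
    }
}

func main() {
    in, out := make(chan int), make(chan []int)
    go batch(3, 100*time.Millisecond, in, out)
    go func() {
        for i := 0; i < 7; i++ {
            in <- i
        }
        close(in)
    }()
    for b := range out {
        fmt.Println(b) // e.g. [0 1 2] [3 4 5] [6]
    }
}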
@@ -5,18 +5,17 @@ import (
    "errors"
    "fmt"
    "io"
    "log"
    "os"
    "path/filepath"
    "sync"
    "time"

    ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongoStruct"
    "github.com/jtagcat/util"
    prom "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    mongoOpt "go.mongodb.org/mongo-driver/mongo/options"
    "go.uber.org/zap"
    "k8s.io/apimachinery/pkg/util/wait"
)

@@ -48,49 +47,43 @@ var (
    }, []string{"filename"})
)

type (
    submitter struct {
        l *zap.Logger

        hostInfo HostInfo
        db       *mongo.Collection

        sync.WaitGroup
    }
)

const SendQueueLimit = 1024

// TODO: caller may call duplicate shipFile of same name on file replace; sends might not work properly
func (s *submitter) shipFile(ctx context.Context, name string, recordLimitBytes int) {
    baseName := filepath.Base(name)
type file struct {
    ms.File
    metricsName string // filepath.Base()
}

    lineChan := make(chan rawLine)
// TODO: caller could call duplicate shipFile of same name on file replace: sends might not work properly
func (f file) Process(ctx context.Context, db *mongo.Collection, recordLimitBytes int) {
    lineChan := make(chan RawLine)
    defer close(lineChan)

    sendChan := make(chan mLog, SendQueueLimit)
    dbQueue := make(chan ms.Record, SendQueueLimit)
    go RawLines(lineChan).Process(recordLimitBytes, dbQueue)

    go s.parseLines(recordLimitBytes, lineChan, sendChan)

    go s.sender(name, sendChan)
    waitGo := util.GoWg(func() {
        queueT(dbQueue).sender(db, f.metricsName)
    })
    defer waitGo()

    // TODO: better way to kill or wait for sendQueue before retrying (or duplicates?)
    _ = wait.ManagedExponentialBackoffWithContext(ctx, defaultBackoff(), func() (done bool, _ error) {
        //
        err := s.shipFileRoutine(ctx, name, lineChan)
        err := f.trySubmit(ctx, db, lineChan)
        if err == nil {
            return true, nil
        }

        promFileErr.WithLabelValues(baseName).Add(1)
        s.l.Error("shipping file", zap.String("filename", name), zap.Error(err))
        return false, nil // nil since we want to loop and keep retrying indefinitely
        promFileErr.WithLabelValues(f.metricsName).Add(1)
        log.Printf("processing file %q: %e", f.metricsName, err)

        // nil: loop and keep retrying indefinitely
        return false, nil
    })
}

func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue chan<- rawLine) error {
    baseName := filepath.Base(name)

// use submitter(), don't use directly
func (f file) trySubmit(ctx context.Context, db *mongo.Collection, sendQueue chan<- RawLine) error {
    // TODO: better way for respecting ?killing sender for retry
    for {
        if len(sendQueue) == 0 {
@@ -100,9 +93,9 @@ func (s *submitter) shipFileRoutine(ctx context.Context, name string, sendQueue
        }

    // get files with offset
    offsetResult, _ := mongoWithErr(s.db.FindOne(mongoTimeoutCtx(ctx),
        bson.D{{Key: mLogKeyHostId, Value: s.hostInfo.Id}, {Key: mLogKeyFileBasename, Value: baseName}},
        &mongoOpt.FindOneOptions{Sort: bson.D{{Key: mLogKeyOffset, Value: -1}}}, // sort descending (get largest)
    offsetResult, _ := mongoWithErr(db.FindOne(mongoTimeoutCtx(ctx),
        bson.D{{Key: ms.RecordKeyHostId, Value: f.Host.Id}, {Key: ms.RecordKeyFilePath, Value: f.Path}},
        &mongoOpt.FindOneOptions{Sort: bson.D{{Key: ms.RecordKeyOffset, Value: -1}}}, // sort descending (get largest)
    ))

    offsetResultBytes, err := offsetResult.DecodeBytes()
@@ -110,9 +103,9 @@
        return fmt.Errorf("retrieving offset from database: %w", err)
    }

    log := mLogfromBson(&offsetResultBytes)
    dbOffset := ms.RecordOffsetFromBson(&offsetResultBytes)

    fi, err := os.Stat(name)
    fi, err := os.Stat(f.Path)
    if err != nil {
        return fmt.Errorf("getting original file size: %w", err)
    }
@@ -121,18 +114,15 @@
    sctx, cancel := context.WithCancel(ctx)
    defer cancel()

    promFileInitialSeekSkipped.WithLabelValues(baseName).Set(float64(log.Offset))
    promFileInitialSeekSkipped.WithLabelValues(f.metricsName).Set(float64(dbOffset))

    lineChan, errChan, err := util.TailFile(sctx, name, log.Offset, io.SeekStart)
    lineChan, errChan, err := util.TailFile(sctx, f.Path, dbOffset, io.SeekStart)
    if err != nil {
        return fmt.Errorf("tailing file: %w", err)
    }

    var catchUpped bool // cache
    promFileCatchupDone.WithLabelValues(baseName).Set(0)

    // TODO: partial line combining
    // TODO: promRecordDroppedTooLarge
    var catchUpped bool
    promFileCatchupDone.WithLabelValues(f.metricsName).Set(0)

    for {
        select {
@@ -144,13 +134,13 @@
            return nil
        }

        promFileLineSize.WithLabelValues(baseName).Observe(float64(len(line.Bytes)))
        promFileLineSize.WithLabelValues(f.metricsName).Observe(float64(len(line.Bytes)))

        if !catchUpped {
            catchUpped = line.EndOffset >= startSize

            if catchUpped {
                promFileCatchupDone.WithLabelValues(baseName).Set(1)
                promFileCatchupDone.WithLabelValues(f.metricsName).Set(1)
            }
        }

@@ -158,14 +148,11 @@
            continue
        }

        sendQueue <- rawLine{
            RecordMetadata: RecordMetadata{
                HostInfo: s.hostInfo,
                File:     baseName,
        sendQueue <- RawLine{
            file: &f,

            Offset: line.EndOffset,
            },
            line: line.Bytes,
            Offset: line.EndOffset,
            B:      line.Bytes,
        }
    }
}
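trySubmit resumes from the largest offset already recorded in the database, so a restart does not re-ship lines it has already sent. The idea behind the stored offset, sketched with plain os and bufio instead of util.TailFile, on a hypothetical file path:

package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
)

// readFrom prints lines starting at a previously stored byte offset and
// returns the offset to persist next, mimicking the resume behaviour.
func readFrom(path string, offset int64) (int64, error) {
    f, err := os.Open(path)
    if err != nil {
        return offset, err
    }
    defer f.Close()

    if _, err := f.Seek(offset, io.SeekStart); err != nil {
        return offset, err
    }

    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := scanner.Text()
        offset += int64(len(line)) + 1 // +1 for the trailing newline
        fmt.Println(line)
    }
    return offset, scanner.Err()
}

func main() {
    // Hypothetical path, starting fresh at offset 0.
    next, err := readFrom("/var/log/containers/app.log", 0)
    if err != nil {
        panic(err)
    }
    fmt.Println("next offset to store:", next)
}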
180  cmd/watcher.go
@@ -2,34 +2,23 @@ package logmower

import (
    "context"
    "errors"
    "fmt"
    "net/http"
    "net/url"
    "log"
    "os"
    "os/signal"
    "path/filepath"
    "runtime"
    "strings"
    "sync"
    "time"

    ms "git.k-space.ee/k-space/logmower-shipper/pkg/mongoStruct"
    "github.com/fsnotify/fsnotify"
    prom "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/client_golang/prometheus/promhttp"
    "github.com/urfave/cli/v2"
    "go.elastic.co/ecszap"
    "go.mongodb.org/mongo-driver/mongo"
    "go.uber.org/zap"
    "k8s.io/apimachinery/pkg/util/wait"
)

const (
    MachineId              = "/etc/machine-id"
    DatabaseCommandTimeout = 10 * time.Second
    PrometheusPrefix       = "logmower"
)
const DatabaseCommandTimeout = 10 * time.Second

// wrapper to force copying before use
func defaultBackoff() wait.Backoff {
@@ -73,16 +62,6 @@ var App = &cli.App{
    },

    Action: func(ctx *cli.Context) error {
        ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test
        var wg sync.WaitGroup

        // init logger, ECS schema
        core := ecszap.NewCore(ecszap.NewDefaultEncoderConfig(),
            os.Stderr, zap.WarnLevel)
        l := zap.New(core, zap.AddCaller())

        l.Info("logmower starting", zap.String("version", ctx.App.Version))

        var (
            promWatcherOnline = promauto.NewGauge(prom.GaugeOpts{
                Namespace: PrometheusPrefix,
@@ -115,52 +94,27 @@ var App = &cli.App{
                Help: "Number of events while watching (includes initial create events for existing file discovery)",
            })
        )
        go func() {
            l.Info("/metrics starting", zap.Int("port", 2112))
            http.Handle("/metrics", promhttp.Handler())

            if err := http.ListenAndServe(":2112", nil); !errors.Is(err, http.ErrServerClosed) {
                l.Fatal("failed to serve /metrics", zap.Error(err))
            }
        }()
        ctx.Context, _ = signal.NotifyContext(ctx.Context, os.Interrupt) // TODO: test
        var wg sync.WaitGroup

        state := submitter{l: l}
        log.Printf("%s %s starting", ctx.App.Name, ctx.App.Version)

        dbOpt := mongoMonitoredClientOptions(l).ApplyURI(ctx.String("mongo-uri"))

        dbClient, err := mongo.Connect(mongoTimeoutCtx(ctx.Context))
        db, err := initDatabase(ctx.Context, ctx.String("mongo-uri"))
        if err != nil {
            l.Fatal("connecting to database", zap.String("uri", dbOpt.GetURI()), zap.Error(err))
            return fmt.Errorf("initializing database connection: %w", err)
        }

        uriParsed, err := url.ParseRequestURI(ctx.String("mongo-uri"))
        if err != nil {
            l.Fatal("parsing URI for mongo database name", zap.Error(err))
        }

        uriParsed.Path = uriParsed.Path[1:] // remove leading slash
        if uriParsed.Path == "" {
            l.Fatal("mongo database name must be set in mongo URI")
        }

        state.db = dbClient.Database(uriParsed.Path).Collection("logs")

        err = initializeIndexes(ctx.Context, state.db)
        if err != nil {
            l.Fatal("initializing indexes", zap.Error(err))
        }

        state.hostInfo, err = getHostInfo(ctx.String("node-name"))
        if err != nil {
            l.Fatal("gathering host info", zap.Error(err))
        var hostInfo ms.HostInfo
        if err := hostInfo.Populate(ctx.String("node-name")); err != nil {
            return fmt.Errorf("populating host info: %w", err)
        }

        watcher, err := fsnotify.NewWatcher()
        if err != nil {
            l.Fatal("setting up watcher", zap.Error(err))
            return fmt.Errorf("initializing log directory watcher: %w", err)
        }

        logDir := ctx.String("log-directory")
        defer watcher.Close()

        wg.Add(1)
        go func() {
@@ -181,10 +135,10 @@ var App = &cli.App{
                }

                // TODO: #1: || if not in filterset
                _, ok = parseLogName(event.Name)
                kubeInfo, ok := ms.ParseLogName(event.Name)
                if !ok {
                    promWatcherFilesSkipped.Add(1)
                    l.Warn("skipped file with unparsable name", zap.String("name", event.Name))
                    log.Printf("skipped %q: filename not parsable in kubernetes log format", filepath.Base(event.Name))
                    continue
                }

@@ -192,7 +146,16 @@ var App = &cli.App{

                wg.Add(1)
                go func() {
                    state.shipFile(ctx.Context, event.Name, ctx.Int("max-record-size"))
                    file := file{
                        File: ms.File{
                            Host:     &hostInfo,
                            KubeInfo: kubeInfo,
                            Path:     event.Name,
                        },
                        metricsName: filepath.Base(event.Name),
                    }

                    file.Process(ctx.Context, db, ctx.Int("max-record-size"))
                    wg.Done()
                }()

@@ -201,22 +164,20 @@
                    return
                }
                promWatcherErr.Add(1)
                l.Error("while watching log dir events", zap.Error(err))
                log.Printf("watching for new logs: %e", err)
            }
        }
    }()

        logDir := ctx.String("log-directory")

        // simulate create events to pick up files already created
        err = simulateInitialCreate(logDir, watcher.Events)
        if err != nil {
            promWatcherErr.Add(1)
            l.Fatal("listing initial log directory", zap.String("name", logDir), zap.Error(err))
        if err := simulateInitialCreates(logDir, watcher.Events); err != nil {
            return fmt.Errorf("listing log directory %q: %w", logDir, err)
        }

        err = watcher.Add(logDir)
        if err != nil {
            promWatcherErr.Add(1)
            l.Fatal("watching log directory", zap.String("name", logDir), zap.Error(err))
        if err := watcher.Add(logDir); err != nil {
            return fmt.Errorf("watching for new logs in %q: %w", logDir, err)
        }

        promWatcherOnline.Set(1)
@@ -224,84 +185,11 @@
        // waiting indefinitely for interrupt
        wg.Wait() // wait for watch and file processors to cleanup

        return errAppend(watcher.Close(), ctx.Err())
        return ctx.Err()
    },
}

type HostInfo struct {
    Id   string
    Name string
    Arch string
}

func getHostInfo(nodeName string) (h HostInfo, err error) {
    if nodeName == "" {
        nodeName, err = os.Hostname()
        if err != nil {
            err = fmt.Errorf("name: hostname: %w", err) // don't exit early
        }
    }
    h.Name = strings.TrimSpace(nodeName)

    id, errL := os.ReadFile(MachineId)
    if errL != nil {
        err = errAppend(err, fmt.Errorf("id: %w", errL))
    }

    h.Id = strings.TrimSpace(string(id))

    h.Arch = runtime.GOARCH

    return h, err
}

func errAppend(a, b error) error {
    if a == nil {
        return b
    }
    if b == nil {
        return a
    }
    return fmt.Errorf("%e; %e", a, b)
}

type logMeta struct {
    podName       string
    podNamespace  string
    containerName string
    containerId   string
}

func parseLogName(name string) (m logMeta, ok bool) {
    name = filepath.Base(name)

    // https://github.com/kubernetes/design-proposals-archive/blob/8da1442ea29adccea40693357d04727127e045ed/node/kubelet-cri-logging.md
    // <pod_name>_<pod_namespace>_<container_name>-<container_id>.log`

    m.podName, name, ok = strings.Cut(name, "_")
    if !ok {
        return
    }

    m.podNamespace, name, ok = strings.Cut(name, "_")
    if !ok {
        return
    }

    m.containerName, name, ok = strings.Cut(name, "-")
    if !ok {
        return
    }

    m.containerId = strings.TrimSuffix(name, ".log")
    if !strings.HasSuffix(name, ".log") {
        return
    }

    return m, true
}
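The kubelet naming convention that parseLogName (and its replacement ms.ParseLogName) relies on is <pod_name>_<pod_namespace>_<container_name>-<container_id>.log. A quick illustration of the strings.Cut splitting on a made-up filename:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Hypothetical kubelet log filename.
    name := "logmower-abc12_logging_shipper-0123456789abcdef.log"

    pod, rest, _ := strings.Cut(name, "_")
    namespace, rest, _ := strings.Cut(rest, "_")
    container, rest, _ := strings.Cut(rest, "-")
    id := strings.TrimSuffix(rest, ".log")

    fmt.Println(pod, namespace, container, id)
    // logmower-abc12 logging shipper 0123456789abcdef
}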

func simulateInitialCreate(dirName string, eventChan chan<- fsnotify.Event) error {
func simulateInitialCreates(dirName string, eventChan chan<- fsnotify.Event) error {
    dir, err := os.ReadDir(dirName)
    if err != nil {
        return err