diff --git a/cmd/logmower.go b/cmd/logmower.go index d27c342..0e9dfb3 100644 --- a/cmd/logmower.go +++ b/cmd/logmower.go @@ -25,10 +25,8 @@ import ( ) const ( - MACHINEID = "/etc/machine-id" - MONGO_TIMEOUT = 10 * time.Second - SendQueueLimit = 1024 - MaxBatchTime = time.Second + MACHINEID = "/etc/machine-id" + MONGO_TIMEOUT = 10 * time.Second ) // wrapper to force copying before use @@ -117,11 +115,6 @@ var App = &cli.App{ } state.db = dbClient.Database(dbOpt.Auth.AuthSource).Collection("logs") - wg.Add(1) - go func() { - state.sender() - wg.Done() - }() state.hostInfo, err = getHostInfo(ctx.String("node-name")) if err != nil { diff --git a/cmd/sender.go b/cmd/sender.go index 010c71d..9897bb6 100644 --- a/cmd/sender.go +++ b/cmd/sender.go @@ -1,6 +1,7 @@ package logmower import ( + "context" "time" "github.com/jtagcat/util" @@ -11,61 +12,118 @@ import ( ) var ( - promShipperMongoSent = promauto.NewCounter(prom.CounterOpts{ + promShipperMongoSent = promauto.NewCounterVec(prom.CounterOpts{ Subsystem: "shipper", Name: "sent_count", - Help: "Log items successfully committed to mongo", - }) - promShipperMongoSentError = promauto.NewCounter(prom.CounterOpts{ + Help: "Items successfully committed to mongo", + }, []string{"filename"}) + promShipperMongoSentError = promauto.NewCounterVec(prom.CounterOpts{ Subsystem: "shipper", - Name: "mongo_errors", + Name: "mongo_errors_count", Help: "Errors while submitting to mongo", // TODO: - }) + }, []string{"filename"}) promShipperDropped = promauto.NewCounterVec(prom.CounterOpts{ Subsystem: "shipper", - Name: "queue_dropped", - Help: "Items ready to be sent to mongo, but dropped due to full queue", + Name: "queue_dropped_count", + Help: "Items ready to be batched and sent to mongo, but dropped due to full queue", }, []string{"filename"}) + promShipperQueueItems = promauto.NewHistogramVec(prom.HistogramOpts{ + Subsystem: "shipper", + Name: "queue_items", + Help: "Items in queue to be batched and sent to mongo", + }, []string{"filename"}) + promShipperSynced = promauto.NewGaugeVec(prom.GaugeOpts{ + Subsystem: "shipper", + Name: "batches_synced", + Help: "All batches available have been sent to mongo", + }, []string{"filename"}) +) + +const ( + SendQueueLimit = 1024 + MaxBatchItems = 100 + MaxBatchTime = time.Second ) func init() { promauto.NewGaugeFunc(prom.GaugeOpts{ Subsystem: "shipper", Name: "queue_size", - Help: "Submit queue size", + Help: "Submit queue size cap", }, func() float64 { return float64(SendQueueLimit) }) -} - -func (s *submitter) sender() { promauto.NewGaugeFunc(prom.GaugeOpts{ Subsystem: "shipper", - Name: "queue_items", - Help: "Items in queue to be submitted in batch to mongo", + Name: "batch_size", + Help: "batching size cap", }, func() float64 { - return float64(len(s.sendQueue)) + return float64(MaxBatchItems) }) + promauto.NewGaugeFunc(prom.GaugeOpts{ + Subsystem: "shipper", + Name: "batch_time", + Help: "batching delay cap", + }, func() float64 { + return float64(MaxBatchTime) + }) +} +func (s *submitter) sender(name string, sendQueue <-chan mLog) (synced func() bool) { batched := make(chan []mLog) go func() { - util.Batch(4, MaxBatchTime, s.sendQueue, batched) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + promShipperQueueItems.WithLabelValues(name).Observe(float64( + len(sendQueue))) + + timer := time.NewTimer(time.Second) + select { + case <-ctx.Done(): + return + case <-timer.C: + } + } + }() + + util.Batch(MaxBatchItems, MaxBatchTime, sendQueue, batched) }() + var batchSynced bool + s.Add(1) + go s.senderRoutine(name, batched, &batchSynced) + + return func() bool { + return batchSynced && len(sendQueue) == 0 + } +} + +func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bool) { + defer s.Done() + for { + *synced = true + promShipperSynced.WithLabelValues(name).Set(1) + batch, ok := <-batched if !ok { return } + *synced = false + promShipperSynced.WithLabelValues(name).Set(0) + var batchBson []interface{} // mongo does not like typing for _, b := range batch { batchBson = append(batchBson, b.toBson()) } - result, err := s.db.InsertMany(mongoTimeoutCtx(s.ctx), batchBson, nil) - promShipperMongoSent.Add(float64( + result, err := s.db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil) + promShipperMongoSent.WithLabelValues(name).Add(float64( len(result.InsertedIDs))) if err != nil { diff --git a/cmd/submit.go b/cmd/submit.go index 7396a8c..d96ac67 100644 --- a/cmd/submit.go +++ b/cmd/submit.go @@ -7,6 +7,7 @@ import ( "io" "os" "path/filepath" + "sync" "github.com/jtagcat/util" prom "github.com/prometheus/client_golang/prometheus" @@ -39,7 +40,7 @@ type ( hostInfo HostInfo db *mongo.Collection - sendQueue chan mLog + sync.WaitGroup } )