package logmower

import (
	"context"
	"path/filepath"
	"time"

	"github.com/jtagcat/util"
	prom "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.uber.org/zap"
)

// Per-file shipper metrics. Every vector is labelled with "filename", which
// sender fills with the base name of the file being shipped.
var (
	// promShipperMongoSent counts documents reported as inserted by mongo.
	promShipperMongoSent = promauto.NewCounterVec(prom.CounterOpts{
		Subsystem: "shipper",
		Name:      "sent",
		Help:      "Items successfully committed to mongo",
	}, []string{"filename"})
	// promShipperMongoSentError counts insert calls that returned an error
	// (one increment per failed batch, not per document).
	promShipperMongoSentError = promauto.NewCounterVec(prom.CounterOpts{
		Subsystem: "shipper",
		Name:      "mongo_errors",
		Help:      "Errors while submitting to mongo",
	}, []string{"filename"})
	// promLineParsingErr is not touched in this file; presumably it is
	// incremented by the line parser elsewhere in the package.
	promLineParsingErr = promauto.NewCounterVec(prom.CounterOpts{
		Subsystem: "shipper",
		Name:      "lines_parsing_errors",
		Help:      "Errors while parsing log line suffixes",
	}, []string{"filename"})
	// promShipperQueueItems receives one observation per second of the
	// number of items buffered in a sender's send queue.
	promShipperQueueItems = promauto.NewHistogramVec(prom.HistogramOpts{
		Subsystem: "shipper",
		Name:      "queued",
		Help:      "Items in queue to be batched and sent to mongo",
	}, []string{"filename"})
	// promShipperSynced is 1 while sender is waiting for the next batch and
	// 0 while a batch is being converted and inserted.
	promShipperSynced = promauto.NewGaugeVec(prom.GaugeOpts{
		Subsystem: "shipper",
		Name:      "batches_synced",
		Help:      "All batches available have been sent to mongo",
	}, []string{"filename"})
)

const (
	// MaxBatchItems is the batch size cap handed to util.Batch.
	MaxBatchItems = 100
	// MaxBatchTime is the batching delay cap handed to util.Batch.
	MaxBatchTime = time.Second
)

// init exports the static tuning limits as gauges so they can be read next to
// the runtime metrics.
func init() {
	promauto.NewGaugeFunc(prom.GaugeOpts{
		Subsystem: "shipper",
		Name:      "queue_size",
		Help:      "Submit queue size cap",
	}, func() float64 {
		return float64(SendQueueLimit)
	})

	promauto.NewGaugeFunc(prom.GaugeOpts{
		Subsystem: "shipper",
		Name:      "batch_size",
		Help:      "batching size cap",
	}, func() float64 {
		return float64(MaxBatchItems)
	})

	promauto.NewGaugeFunc(prom.GaugeOpts{
		Subsystem: "shipper",
		Name:      "batch_time",
		Help:      "batching delay cap",
	}, func() float64 {
		// NOTE(review): a time.Duration converts to its nanosecond count, so
		// this gauge reports nanoseconds, not seconds. Left as-is in case
		// dashboards already depend on it.
		return float64(MaxBatchTime)
	})
}

// sender drains sendQueue, groups the items into batches of at most
// MaxBatchItems / MaxBatchTime via util.Batch, and inserts every batch into
// mongo through s.db. It blocks until the batched channel is closed
// (presumably done by util.Batch once sendQueue is closed; verify against
// util) and registers itself on s via Add/Done for the duration.
//
// name is the path of the file being shipped; only its base name is used, as
// the "filename" label on all metrics.
//
// Insert failures are logged and counted in promShipperMongoSentError; the
// failed batch is not retried.
func (s *submitter) sender(name string, sendQueue <-chan mLog) {
	baseName := filepath.Base(name)

	batched := make(chan []mLog)

	go func() {
		// ctx only bounds the lifetime of the queue sampler below: it is
		// cancelled as soon as util.Batch returns.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		go func() {
			// One ticker for the whole loop instead of a fresh timer per
			// iteration; stopped when the sampler exits.
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()

			for {
				promShipperQueueItems.WithLabelValues(baseName).Observe(float64(
					len(sendQueue)))

				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
				}
			}
		}()

		util.Batch(MaxBatchItems, MaxBatchTime, sendQueue, batched)
		// returns when sendQueue is closed
	}()

	s.Add(1)
	defer s.Done()

	for {
		promShipperSynced.WithLabelValues(baseName).Set(1)

		batch, ok := <-batched
		if !ok {
			return
		}
		promShipperSynced.WithLabelValues(baseName).Set(0)

		// mongo does not like typing
		batchBson := make([]interface{}, 0, len(batch))
		for _, b := range batch {
			batchBson = append(batchBson, b.toBson())
		}

		result, err := s.db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil)
		// The result can be nil when the insert fails outright, so guard the
		// dereference. A non-nil result alongside an error still carries the
		// documents that made it in, so count those as well.
		if result != nil {
			promShipperMongoSent.WithLabelValues(baseName).Add(float64(
				len(result.InsertedIDs)))
		}

		if err != nil {
			promShipperMongoSentError.WithLabelValues(baseName).Add(1)
			s.l.Error("mongo send returned error; TODO: add some selective retry here or something", zap.Error(err)) // TODO:
		}
	}
}