refactor sender to be per-file
This commit is contained in:
		| @@ -25,10 +25,8 @@ import ( | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	MACHINEID      = "/etc/machine-id" | ||||
| 	MONGO_TIMEOUT  = 10 * time.Second | ||||
| 	SendQueueLimit = 1024 | ||||
| 	MaxBatchTime   = time.Second | ||||
| 	MACHINEID     = "/etc/machine-id" | ||||
| 	MONGO_TIMEOUT = 10 * time.Second | ||||
| ) | ||||
|  | ||||
| // wrapper to force copying before use | ||||
| @@ -117,11 +115,6 @@ var App = &cli.App{ | ||||
| 		} | ||||
|  | ||||
| 		state.db = dbClient.Database(dbOpt.Auth.AuthSource).Collection("logs") | ||||
| 		wg.Add(1) | ||||
| 		go func() { | ||||
| 			state.sender() | ||||
| 			wg.Done() | ||||
| 		}() | ||||
|  | ||||
| 		state.hostInfo, err = getHostInfo(ctx.String("node-name")) | ||||
| 		if err != nil { | ||||
|   | ||||
| @@ -1,6 +1,7 @@ | ||||
| package logmower | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/jtagcat/util" | ||||
| @@ -11,61 +12,118 @@ import ( | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	promShipperMongoSent = promauto.NewCounter(prom.CounterOpts{ | ||||
| 	promShipperMongoSent = promauto.NewCounterVec(prom.CounterOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "sent_count", | ||||
| 		Help:      "Log items successfully committed to mongo", | ||||
| 	}) | ||||
| 	promShipperMongoSentError = promauto.NewCounter(prom.CounterOpts{ | ||||
| 		Help:      "Items successfully committed to mongo", | ||||
| 	}, []string{"filename"}) | ||||
| 	promShipperMongoSentError = promauto.NewCounterVec(prom.CounterOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "mongo_errors", | ||||
| 		Name:      "mongo_errors_count", | ||||
| 		Help:      "Errors while submitting to mongo", // TODO: | ||||
| 	}) | ||||
| 	}, []string{"filename"}) | ||||
| 	promShipperDropped = promauto.NewCounterVec(prom.CounterOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "queue_dropped", | ||||
| 		Help:      "Items ready to be sent to mongo, but dropped due to full queue", | ||||
| 		Name:      "queue_dropped_count", | ||||
| 		Help:      "Items ready to be batched and sent to mongo, but dropped due to full queue", | ||||
| 	}, []string{"filename"}) | ||||
| 	promShipperQueueItems = promauto.NewHistogramVec(prom.HistogramOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "queue_items", | ||||
| 		Help:      "Items in queue to be batched and sent to mongo", | ||||
| 	}, []string{"filename"}) | ||||
| 	promShipperSynced = promauto.NewGaugeVec(prom.GaugeOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "batches_synced", | ||||
| 		Help:      "All batches available have been sent to mongo", | ||||
| 	}, []string{"filename"}) | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	SendQueueLimit = 1024 | ||||
| 	MaxBatchItems  = 100 | ||||
| 	MaxBatchTime   = time.Second | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| 	promauto.NewGaugeFunc(prom.GaugeOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "queue_size", | ||||
| 		Help:      "Submit queue size", | ||||
| 		Help:      "Submit queue size cap", | ||||
| 	}, func() float64 { | ||||
| 		return float64(SendQueueLimit) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (s *submitter) sender() { | ||||
| 	promauto.NewGaugeFunc(prom.GaugeOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "queue_items", | ||||
| 		Help:      "Items in queue to be submitted in batch to mongo", | ||||
| 		Name:      "batch_size", | ||||
| 		Help:      "batching size cap", | ||||
| 	}, func() float64 { | ||||
| 		return float64(len(s.sendQueue)) | ||||
| 		return float64(MaxBatchItems) | ||||
| 	}) | ||||
| 	promauto.NewGaugeFunc(prom.GaugeOpts{ | ||||
| 		Subsystem: "shipper", | ||||
| 		Name:      "batch_time", | ||||
| 		Help:      "batching delay cap", | ||||
| 	}, func() float64 { | ||||
| 		return float64(MaxBatchTime) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func (s *submitter) sender(name string, sendQueue <-chan mLog) (synced func() bool) { | ||||
| 	batched := make(chan []mLog) | ||||
|  | ||||
| 	go func() { | ||||
| 		util.Batch(4, MaxBatchTime, s.sendQueue, batched) | ||||
| 		ctx, cancel := context.WithCancel(context.Background()) | ||||
| 		defer cancel() | ||||
|  | ||||
| 		go func() { | ||||
| 			for { | ||||
| 				promShipperQueueItems.WithLabelValues(name).Observe(float64( | ||||
| 					len(sendQueue))) | ||||
|  | ||||
| 				timer := time.NewTimer(time.Second) | ||||
| 				select { | ||||
| 				case <-ctx.Done(): | ||||
| 					return | ||||
| 				case <-timer.C: | ||||
| 				} | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		util.Batch(MaxBatchItems, MaxBatchTime, sendQueue, batched) | ||||
| 	}() | ||||
|  | ||||
| 	var batchSynced bool | ||||
| 	s.Add(1) | ||||
| 	go s.senderRoutine(name, batched, &batchSynced) | ||||
|  | ||||
| 	return func() bool { | ||||
| 		return batchSynced && len(sendQueue) == 0 | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *submitter) senderRoutine(name string, batched <-chan []mLog, synced *bool) { | ||||
| 	defer s.Done() | ||||
|  | ||||
| 	for { | ||||
| 		*synced = true | ||||
| 		promShipperSynced.WithLabelValues(name).Set(1) | ||||
|  | ||||
| 		batch, ok := <-batched | ||||
| 		if !ok { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		*synced = false | ||||
| 		promShipperSynced.WithLabelValues(name).Set(0) | ||||
|  | ||||
| 		var batchBson []interface{} // mongo does not like typing | ||||
| 		for _, b := range batch { | ||||
| 			batchBson = append(batchBson, b.toBson()) | ||||
| 		} | ||||
|  | ||||
| 		result, err := s.db.InsertMany(mongoTimeoutCtx(s.ctx), batchBson, nil) | ||||
| 		promShipperMongoSent.Add(float64( | ||||
| 		result, err := s.db.InsertMany(mongoTimeoutCtx(context.Background()), batchBson, nil) | ||||
| 		promShipperMongoSent.WithLabelValues(name).Add(float64( | ||||
| 			len(result.InsertedIDs))) | ||||
|  | ||||
| 		if err != nil { | ||||
|   | ||||
| @@ -7,6 +7,7 @@ import ( | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/jtagcat/util" | ||||
| 	prom "github.com/prometheus/client_golang/prometheus" | ||||
| @@ -39,7 +40,7 @@ type ( | ||||
| 		hostInfo HostInfo | ||||
| 		db       *mongo.Collection | ||||
|  | ||||
| 		sendQueue chan mLog | ||||
| 		sync.WaitGroup | ||||
| 	} | ||||
| ) | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user