diff --git a/log_shipper.py b/log_shipper.py index 83ee9c5..04ef899 100755 --- a/log_shipper.py +++ b/log_shipper.py @@ -97,6 +97,10 @@ counter_insertion_errors = Counter( "logmower_insertion_error_count", "Exceptions caught during insertion of single event", ["exception"]) +counter_bulk_insertion_errors = Counter( + "logmower_bulk_insertion_error_count", + "Exceptions caught during bulk insertions", + ["exception"]) counter_bulk_insertions = Counter( "logmower_bulk_insertion_count", "Count of bulk insertions to database", @@ -191,9 +195,10 @@ async def uploader(coll, queue): o = await asyncio.wait_for(queue.get(), timeout=0.1) except asyncio.exceptions.TimeoutError: break - gauge_queue_entries.set(queue.qsize()) - o["event"]["ingested"] = datetime.utcnow() - messages.append(o) + else: + gauge_queue_entries.set(queue.qsize()) + o["event"]["ingested"] = datetime.utcnow() + messages.append(o) if not messages: continue try: @@ -202,10 +207,13 @@ async def uploader(coll, queue): await coll.insert_many(messages) histogram_database_operation_latency.labels("insert-many").observe(time() - then) except pymongo.errors.ServerSelectionTimeoutError: + counter_bulk_insertions.labels("timed-out").inc() continue except pymongo.errors.BulkWriteError as e: - print("Bulk insertion failed:", e) - counter_bulk_insertions.labels("failed").inc() + counter_bulk_insertions.labels("retried-as-singles").inc() + j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) + counter_bulk_insertion_errors.labels(j).inc() + print("Bulk insert failed: %s" % j) for o in messages: o.pop("_id", None) o["event"]["ingested"] = datetime.utcnow()