From 2bbcb1a51b914877b3c19033bd8a9e75d3e23d08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lauri=20V=C3=B5sandi?= Date: Wed, 9 Nov 2022 15:53:49 +0200 Subject: [PATCH] Avoid offset lookup for newly created files --- log_shipper.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/log_shipper.py b/log_shipper.py index 401af4b..ce74f5c 100755 --- a/log_shipper.py +++ b/log_shipper.py @@ -254,7 +254,7 @@ async def uploader(coll, queue): class LogFile(object): - def __init__(self, coll, queue, path, namespace_name, pod_name, container_name, start=False): + def __init__(self, coll, queue, path, namespace_name, pod_name, container_name, start=False, lookup_offset=True): self.offset = 0 self.path = path self.buf = b"" @@ -267,6 +267,7 @@ class LogFile(object): self.coll = coll self._state = None self.state = "init" + self.lookup_offset = lookup_offset if start: self.start() @@ -304,7 +305,7 @@ class LogFile(object): "host.id": host_info["id"], "log.file.path": self.path }, sort=[("log.offset", -1)]) - histogram_database_operation_latency.labels("find-replay-offset").observe(time() - then) + histogram_database_operation_latency.labels("offset-lookup").observe(time() - then) if last_record: self.offset = last_record["log"]["offset"] counter_skipped_bytes.inc(self.offset) @@ -434,7 +435,7 @@ class LogFile(object): async def watcher(queue, coll): print("Starting watching") with Inotify() as inotify: - def add_file(path, finished=False, start=False): + def add_file(path, finished=False, start=False, lookup_offset=True): if path in log_files: log_files[path].finished = finished return log_files[path] @@ -454,7 +455,8 @@ async def watcher(queue, coll): return if args.namespace and namespace_name != args.namespace: return - lf = log_files[path] = LogFile(coll, queue, path, namespace_name, pod_name, container_name, start=start) + lf = log_files[path] = LogFile(coll, queue, path, namespace_name, + pod_name, container_name, start, lookup_offset) lf.finished = finished inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE) return lf @@ -493,7 +495,7 @@ async def watcher(queue, coll): # Events for /var/log/containers if event.mask & Mask.CREATE: counter_inotify_events.labels("create").inc() - add_file(os.path.realpath(event.path), start=True) + add_file(os.path.realpath(event.path), start=True, lookup_offset=False) # Events for /var/log/pods elif event.mask & Mask.CLOSE_WRITE: