From b65f624df1dba42b4211895a6e22133d81c82ccb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lauri=20V=C3=B5sandi?=
Date: Tue, 8 Nov 2022 23:25:15 +0200
Subject: [PATCH] Refactor to be pypy compatible

---
 Dockerfile     |   2 +-
 log_shipper.py | 352 ++++++++++++++++++++++++-------------------------
 2 files changed, 173 insertions(+), 181 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 6903597..2ccace3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
 FROM harbor.k-space.ee/k-space/microservice-base
-RUN pip3 install asyncinotify ujson
+RUN pip3 install asyncinotify ujson prometheus-async[aiohttp]
 ADD log_shipper.py /log_shipper.py
 ENTRYPOINT /log_shipper.py
diff --git a/log_shipper.py b/log_shipper.py
index 8375953..7387f1c 100755
--- a/log_shipper.py
+++ b/log_shipper.py
@@ -1,4 +1,4 @@
-#!/usr/local/bin/python3 -OO
+#!/usr/local/bin/python3
 import argparse
 import asyncio
 import collections
@@ -6,22 +6,17 @@ import os
 import re
 import socket
 import ujson
+import prometheus_async
 import pymongo
+from aiofile import async_open
 from asyncinotify import Inotify, Mask
 from datetime import datetime
 from math import inf
 from motor.motor_asyncio import AsyncIOMotorClient
 from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.exposition import generate_latest
 from pymongo.errors import CollectionInvalid
-from sanic import Sanic, text
 from time import time
 
-"""
-To install dependencies:
-pip3 install ujson pymongo motor asyncinotify prometheus_client sanic
-"""
-
 parser = argparse.ArgumentParser(description="Log shipper",
                                  formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 parser.add_argument("--dry-run", action="store_true",
@@ -50,7 +45,6 @@ parser.add_argument("--heuristic-normalize-log-level", action="store_true",
 args = parser.parse_args()
 
 ROOT = "/var/log/containers"
-app = Sanic("tail")
 tasks = dict()
 
 with open("/etc/machine-id") as fh:
@@ -65,12 +59,6 @@ host_info = {
 
 log_files = dict()
 
-gauge_buffer_size = Gauge(
-    "logmower_buffer_size_bytes",
-    "Log files buffered in memory")
-gauge_backlog_size = Gauge(
-    "logmower_backlog_size_bytes",
-    "Content that is yet to be submitted")
 gauge_log_files = Gauge(
     "logmower_log_file_count",
     "Number of tracked log files",
@@ -203,10 +191,15 @@ async def uploader(coll, queue):
                 break
             else:
                 gauge_queue_entries.set(queue.qsize())
-                o["event"]["ingested"] = datetime.utcnow()
                 messages.append(o)
         if not messages:
             continue
+
+        # Set ingestion timestamp
+        now = datetime.utcnow()
+        for o in messages:
+            o["event"]["ingested"] = now
+
         try:
             then = time()
             await coll.insert_many(messages)
@@ -223,7 +216,12 @@ async def uploader(coll, queue):
             counter_bulk_insertion_errors.labels(j).inc()
             print("Bulk insert failed: %s" % j)
             for o in messages:
+                # Remove ObjectID set during insert_many,
+                # as we want duplicate errors to be caused only by
+                # combination of log.file and log.offset collisions
                 o.pop("_id", None)
+
+                # Reset ingestion timestamp
                 o["event"]["ingested"] = datetime.utcnow()
                 try:
                     then = time()
@@ -243,70 +241,48 @@ async def uploader(coll, queue):
         messages = []
 
 
-class FileTailer(object):
-    def __init__(self, path, offset=0, finished=False):
-        self.head = offset
-        self.tail = offset + 1
-        self.offset = offset
+class LogFile(object):
+    def __init__(self, coll, queue, path, namespace_name, pod_name, container_name, start=False):
+        self.offset = 0
         self.path = path
         self.buf = b""
-        self.finished = finished
-        self.caughtup = False
+        self.finished = False
         self.more_content = asyncio.Event()
-
-    async def __aiter__(self):
-        with open(self.path, "rb") as fh:
-            while True:
-                if not self.finished and self.head >= self.tail:
-                    self.caughtup = True
-                    await self.more_content.wait()
-                    self.more_content.clear()
-                self.tail = fh.seek(0, os.SEEK_END)
-                if self.head >= self.tail:
-                    if self.finished:
-                        # TODO: if there is still something in buf?
-                        break
-                    continue
-                fh.seek(self.head)
-                chunk = fh.read(min(self.tail - self.head, 4096))
-                self.buf += chunk
-                self.head += len(chunk)
-                while True:
-                    step = self.buf.find(b"\n")
-                    if step == -1:
-                        break
-                    buf = self.buf[:step + 1]
-                    self.buf = self.buf[step + 1:]
-                    await asyncio.sleep(0)
-                    yield self.offset, len(buf), buf[:-1].decode("utf-8")
-                    self.offset += step + 1
-            break
-
-
-class LogFile(FileTailer):
-    def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, start=False):
-        FileTailer.__init__(self, path)
         self.queue = queue
         self.namespace_name = namespace_name
         self.pod_name = pod_name
         self.container_name = container_name
         self.coll = coll
-        self.state = "seeking"
-        self.loop = loop
+        self._state = None
+        self.state = "init"
         if start:
             self.start()
 
-    def start(self):
-        self.loop.create_task(self.handler_loop())
+    @property
+    def state(self):
+        return self._state
 
-    def poke(self):
+    @state.setter
+    def state(self, value):
+        self._state = value
+        c = collections.Counter([j.state for j in log_files.values()])
+        for key in ("seeking", "replaying", "watching", "closing"):
+            gauge_log_files.labels(key).set(c[key])
+
+    def done(self):
+        # Do not expect more content in this file
+        self.finished = True
+        self.notify()
+
+    def notify(self):
+        # Signal that there is more content in this file
         self.more_content.set()
 
-    def close(self):
-        self.done = True
-        self.poke()
+    def start(self):
+        asyncio.create_task(self.handler_loop())
 
     async def handler_loop(self):
+        self.state = "seeking"
         message = ""
         record_size = 0
         skip_next = False
@@ -318,115 +294,138 @@ class LogFile(FileTailer):
         }, sort=[("log.offset", -1)])
         histogram_database_operation_latency.labels("find-replay-offset").observe(time() - then)
         if last_record:
-            self.head = self.offset = last_record["log"]["offset"]
-            counter_skipped_bytes.inc(self.head)
+            self.offset = last_record["log"]["offset"]
+            counter_skipped_bytes.inc(self.offset)
+            print("Skipping", self.offset, "bytes for", self.path)
             skip_next = True
 
         self.state = "replaying"
         record_offset = self.offset
-        async for line_offset, line_size, line in self:
-            self.state = "watching" if self.caughtup else "replaying"
-            assert "\n" not in line
-            try:
-                reason = "unicode-encoding"
-                if len(line) < 45:
-                    reason = "line-short"
-                    raise ValueError()
-                if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
-                    reason = "no-regex-match"
-                    raise ValueError()
-                reason = "invalid-timestamp"
-                event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
-            except ValueError:
-                print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
-                break
+        line_offset = self.offset
 
-            histogram_line_size.observe(line_size)
-            record_size += line_size
+        async with async_open(self.path, "rb") as fp:
+            fp.seek(self.offset)
+            while True:
+                buf = await fp.readline()
+                self.offset += len(buf)
+                if not buf and self.finished:
+                    break
+                if not buf and self.state != "watching":
+                    print("Finished replaying:", self.path)
+                    self.state = "watching"
+                self.buf += buf
+                if not buf or not buf.endswith(b"\n"):
+                    await self.more_content.wait()
+                    self.more_content.clear()
+                    continue
 
-            if record_size < args.max_record_size:
-                # TODO: Support Docker runtime on EKS
-                message += line[45:]
+                line_size = len(self.buf)
+                line = self.buf[:-1].decode("utf-8")
 
-            state = line[43]
-            if state == "P":
-                # This is partial message
-                continue
-            assert state == "F", "Unknown line state"
-            o = {}
-            o["message"] = message
-            o["log"] = {}
-            message = ""
-            record_size = 0
+                record_offset = line_offset
+                line_offset = self.offset
+                self.buf = b""
 
-            if record_size > args.max_record_size:
-                counter_records.labels("too-large").inc()
-                # TODO: Log portion of the message
-                continue
-
-            stream = line[36:42].strip()
-            if args.heuristic_parse_json and o["message"].startswith("{\""):
-                # TODO: Follow Filebeat hints
                 try:
-                    j = ujson.loads(message)
-                except ujson.JSONDecodeError:
-                    counter_heuristic_failures.labels("invalid-json").inc()
-                else:
-                    # Merge only if parsed JSON message looks like it's
-                    # conforming to ECS schema
-                    if "@timestamp" in j and "message" in j:
-                        o.update(j)
-                    else:
-                        o["json"] = j
+                    reason = "unicode-encoding"
+                    if len(line) < 45:
+                        reason = "line-short"
+                        raise ValueError()
+                    if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
+                        reason = "no-regex-match"
+                        raise ValueError()
+                    reason = "invalid-timestamp"
+                    event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
+                except ValueError:
+                    print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
+                    break
 
-            o["kubernetes"] = {
-                "container": {
-                    "name": self.container_name,
-                },
-                "namespace": self.namespace_name,
-                "pod": {
-                    "name": self.pod_name
-                }
-            }
-            o["log"]["file"] = {
-                "path": self.path
-            }
-            o["log"]["offset"] = record_offset
-            o["host"] = host_info
-            o["stream"] = stream
-            o["event"] = {
-                "created": event_created
-            }
+                histogram_line_size.observe(line_size)
+                record_size += line_size
 
-            if args.heuristic_normalize_log_level:
-                if "level" in o["log"]:
-                    level = o["log"]["level"].strip().lower()
+                if record_size < args.max_record_size:
+                    # TODO: Support Docker runtime on EKS
+                    message += line[45:]
+
+                state = line[43]
+                if state == "P":
+                    # This is partial message
+                    continue
+                assert state == "F", "Unknown line state"
+                o = {}
+                o["message"] = message
+                o["log"] = {}
+                message = ""
+                record_size = 0
+
+                if record_size > args.max_record_size:
+                    counter_records.labels("too-large").inc()
+                    # TODO: Log portion of the message
+                    continue
+
+                stream = line[36:42].strip()
+                if args.heuristic_parse_json and o["message"].startswith("{\""):
+                    # TODO: Follow Filebeat hints
                     try:
-                        o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
-                    except KeyError:
-                        counter_heuristic_failures.labels("invalid-log-level").inc()
-                else:
-                    o["log"]["level"] = "error" if stream == "stderr" else "info"
+                        j = ujson.loads(message)
+                    except ujson.JSONDecodeError:
+                        counter_heuristic_failures.labels("invalid-json").inc()
+                    else:
+                        # Merge only if parsed JSON message looks like it's
+                        # conforming to ECS schema
+                        if "@timestamp" in j and "message" in j:
+                            o.update(j)
+                        else:
+                            o["json"] = j
 
-            if "@timestamp" not in o:
-                o["@timestamp"] = o["event"]["created"]
-            o.pop("_id", None)
+                o["kubernetes"] = {
+                    "container": {
+                        "name": self.container_name,
+                    },
+                    "namespace": self.namespace_name,
+                    "pod": {
+                        "name": self.pod_name
+                    }
+                }
+                o["log"]["file"] = {
+                    "path": self.path
+                }
+                o["log"]["offset"] = record_offset
+                o["host"] = host_info
+                o["stream"] = stream
+                o["event"] = {
+                    "created": event_created
+                }
 
-            if not skip_next:
-                await self.queue.put(o)
-                gauge_queue_entries.set(self.queue.qsize())
-            skip_next = False
-            record_offset = line_offset
+                if args.heuristic_normalize_log_level:
+                    if "level" in o["log"]:
+                        level = o["log"]["level"].strip().lower()
+                        try:
+                            o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
+                        except KeyError:
+                            counter_heuristic_failures.labels("invalid-log-level").inc()
+                    else:
+                        o["log"]["level"] = "error" if stream == "stderr" else "info"
+
+                if "@timestamp" not in o:
+                    o["@timestamp"] = o["event"]["created"]
+                o.pop("_id", None)
+
+                if not skip_next:
+                    await self.queue.put(o)
+                    gauge_queue_entries.set(self.queue.qsize())
+                skip_next = False
+                record_offset = line_offset
 
         self.state = "closing"
         log_files.pop(self.path)
 
 
-async def watcher(loop, queue, coll):
+async def watcher(queue, coll):
     print("Starting watching")
     with Inotify() as inotify:
-        def add_file(path, done=False, start=False):
+        def add_file(path, finished=False, start=False):
             if path in log_files:
-                log_files[path].done = done
+                log_files[path].finished = finished
                 return log_files[path]
             print("Adding file: %s" % path)
@@ -444,9 +443,8 @@ async def watcher(loop, queue, coll):
                 return
             if args.namespace and namespace_name != args.namespace:
                 return
-            lf = log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name)
-            lf.done = done
-            lf.start()
+            lf = log_files[path] = LogFile(coll, queue, path, namespace_name, pod_name, container_name, start=start)
+            lf.finished = finished
             inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE)
             return lf
 
@@ -469,18 +467,19 @@ async def watcher(loop, queue, coll):
                     print("Unexpected filename:", filename)
                     continue
                 path = os.path.join("/var/log/pods", pod_dir, container_name, filename)
-                add_file(path, done=True)
+                add_file(path, finished=True)
 
-        # Inspect currently running containers
+        # Add currently running containers as not finished
        for filename in os.listdir("/var/log/containers"):
             path = os.path.realpath(os.path.join(os.path.join("/var/log/containers", filename)))
-            add_file(path, done=False)
+            add_file(path, finished=False)
 
+        # Start coroutines after we know for sure which ones have finished
         for log_file in log_files.values():
             log_file.start()
 
         async for event in inotify:
-            # Events for /var/log/pods
+            # Events for /var/log/containers
             if event.mask & Mask.CREATE:
                 counter_inotify_events.labels("create").inc()
                 add_file(os.path.realpath(event.path), start=True)
@@ -490,29 +489,21 @@ async def watcher(loop, queue, coll):
                 print("File closed: %s" % event.path)
                 counter_inotify_events.labels("close_write").inc()
                 log_file = log_files.get(str(event.path))
-                log_file.close()
+                if log_file:
+                    # TODO: Why does this happen?
+                    log_file.done()
             elif event.mask & Mask.MODIFY:
                 counter_inotify_events.labels("modify").inc()
                 log_file = log_files.get(str(event.path))
                 if log_file:
-                    # TODO: Count cases where log_file is None
-                    log_file.poke()
+                    # In some cases MODIFY events are triggered after CLOSE_WRITE
+                    log_file.notify()
             elif event.mask & Mask.IGNORED:
                 counter_inotify_events.labels("ignored").inc()
             else:
                 raise NotImplementedError("Unhandled inotify event: %s" % event)
 
 
-@app.route("/metrics")
-async def handler(request):
-    gauge_buffer_size.set(sum([len(j.buf) for j in log_files.values()]))
-    gauge_backlog_size.set(sum([j.tail - j.head for j in log_files.values()]))
-    c = collections.Counter([j.state for j in log_files.values()])
-    for key in ("seeking", "replaying", "watching", "closing"):
-        gauge_log_files.labels(key).set(c[key])
-    return text(generate_latest().decode("utf-8"))
-
-
 async def dumper(queue):
     while True:
         try:
@@ -524,11 +515,11 @@ async def dumper(queue):
         print(o)
 
 
-@app.listener("before_server_start")
-async def init(sanic, loop):
+async def main():
     queue = asyncio.Queue(args.max_upload_queue_size)
+    tasks = []
     if not args.dry_run:
-        db = AsyncIOMotorClient(os.environ["MONGODB_HOST"],
+        db = AsyncIOMotorClient(os.environ["MONGO_URI"],
                                 maxPoolSize=args.max_connection_pool_size).get_default_database()
         try:
             await db.create_collection("log",
@@ -537,11 +528,12 @@ async def init(sanic, loop):
         except CollectionInvalid:
             pass
         coll = db["log"]
-        loop.create_task(uploader(coll, queue))
+        tasks.append(uploader(coll, queue))
     else:
         coll = None
-    loop.create_task(dumper(queue))
-    loop.create_task(watcher(loop, queue, coll))
+    tasks.append(dumper(queue))
+    tasks.append(prometheus_async.aio.web.start_http_server(addr="0.0.0.0", port=8000))
+    tasks.append(watcher(queue, coll))
+    await asyncio.gather(*tasks)
 
-
-app.run(host="0.0.0.0", single_process=True)
+asyncio.run(main())