#!/usr/local/bin/python3 -OO import argparse import asyncio import collections import os import re import socket import ujson import pymongo from asyncinotify import Inotify, Mask from datetime import datetime from math import inf from motor.motor_asyncio import AsyncIOMotorClient from prometheus_client import Counter, Gauge, Histogram from prometheus_client.exposition import generate_latest from pymongo.errors import CollectionInvalid from sanic import Sanic, text from time import time """ To install dependencies: pip3 install ujson pymongo motor asyncinotify prometheus_client sanic """ parser = argparse.ArgumentParser(description="Log shipper") parser.add_argument("--dry-run", action="store_true", help="Do not insert anything into database") parser.add_argument("--namespace", type=str) parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"]) parser.add_argument("--max-record-size", type=int, default=128 * 1024) # 128kB parser.add_argument("--max-collection-size", type=int, default=2**30) # 1GiB parser.add_argument("--normalize-log-level", action="store_true", help="Normalize log.level values to Syslog defined keywords") parser.add_argument("--bulk-insertion-size", type=int, default=1000) parser.add_argument("--parse-json", action="store_true") args = parser.parse_args() ROOT = "/var/log/containers" app = Sanic("tail") fhs = dict() tasks = dict() tails = collections.Counter() with open("/etc/machine-id") as fh: machine_id = fh.read().strip() host_info = { "id": machine_id, "architecture": os.uname().machine, "name": os.environ.get("NODE_NAME", socket.getfqdn()) } log_files = dict() gauge_log_files = Gauge( "logmower_log_file_count", "Number of tracked log files", ["state"]) gauge_queue_entries = Gauge( "logmower_queue_record_count", "Records queued for submission") counter_unexpected_filenames = Counter( "logmower_invalid_filename_count", "Count of unexpected filenames in logs directory") counter_inotify_events = Counter( "logmower_inotify_event_count", "Count of inotify events", ["mask"]) counter_skipped_bytes = Counter( "logmower_skipped_bytes", "Bytes that were skipped during startup due to being already present in data store") counter_dropped_lines = Counter( "logmower_dropped_lines", "Lines dropped due to being part of too long record") counter_records = Counter( "logmower_record_count", "Record count", ["stage"]) counter_insertion_errors = Counter( "logmower_insertion_error_count", "Exceptions caught during insertion of single event", ["exception"]) counter_bulk_insertions = Counter( "logmower_bulk_insertion_count", "Count of bulk insertions to database", ["status"]) histogram_bulk_submission_size = Histogram( "logmower_bulk_submission_message_count", "Bulk submission message count", buckets=(1, 5, 10, 50, 100, 500, 1000, 5000, 10000)) histogram_database_operation_latency = Histogram( "logmower_database_operation_latency", "Database operation latency", ["operation"], buckets=(0.1, 0.2, 0.5, 1, 5, 10, 50)) histogram_bulk_submission_latency = Histogram( "logmower_bulk_submission_latency", "Bulk submission latency", buckets=(0.1, 0.2, 0.5, 1, 5, 10, 50)) histogram_line_size = Histogram( "logmower_line_size_bytes", "Log file line size in sizes", buckets=(80, 160, 320, 640, 1280, inf)) async def uploader(coll, queue): then = time() await coll.create_index([("@timestamp", 1)], expireAfterSeconds=3600 * 24 * 3) # Following index is used to look up where to resume submitting logs # after restart/crash await coll.create_index([("log.file.path", 1), ("log.offset", 1)]) # Indexes used for frequent searches await coll.create_index([("host.name", 1)]) await coll.create_index([("kubernetes.pod.name", 1)], sparse=True) await coll.create_index([("kubernetes.namespace", 1), ("kubernetes.pod.name", 1), ("kubernetes.container.name", 1)], sparse=True) histogram_database_operation_latency.labels("create-index").observe(time() - then) messages = [] while True: while len(messages) < args.bulk_insertion_size: try: o = await asyncio.wait_for(queue.get(), timeout=0.1) except asyncio.exceptions.TimeoutError: break gauge_queue_entries.set(queue.qsize()) o["event"]["ingested"] = datetime.utcnow() messages.append(o) if not messages: continue try: # TODO: Don't retry submitting messages commit by bulk insert above then = time() await coll.insert_many(messages) histogram_database_operation_latency.labels("insert-many").observe(time() - then) except pymongo.errors.ServerSelectionTimeoutError: continue except pymongo.errors.BulkWriteError: counter_bulk_insertions.labels("failed").inc() for o in messages: o.pop("_id", None) o["event"]["ingested"] = datetime.utcnow() try: then = time() await coll.insert_one(o) histogram_database_operation_latency.labels("insert-one").observe(time() - then) except Exception as e: j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__) counter_insertion_errors.labels(j).inc() counter_records.labels("dropped").inc() print("Failed to insert (%s): %s" % (j, o)) else: counter_records.labels("commited").inc() else: counter_bulk_insertions.labels("successful").inc() histogram_bulk_submission_size.observe(len(messages)) counter_records.labels("commited").inc(len(messages)) messages = [] class LogFile(object): def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, container_uid): self.path = path self.tail = 0 self.more_content = asyncio.Event() self.fh = open(path) self.queue = queue self.namespace_name = namespace_name self.pod_name = pod_name self.container_name = container_name self.container_uid = container_uid self.running = True self.coll = coll self.poke() self.state = "seeking" self.done = False loop.create_task(self.handler_loop()) def poke(self): self.tail = self.fh.seek(0, os.SEEK_END) self.more_content.set() def close(self): self.done = True self.poke() async def handler_loop(self): message = "" record_size = 0 self.head = 0 skip_next = False if not args.dry_run: then = time() last_record = await self.coll.find_one({ "host.id": host_info["id"], "log.file.path": self.path }, sort=[("log.offset", -1)]) histogram_database_operation_latency.labels("find-replay-offset").observe(time() - then) if last_record: self.head = last_record["log"]["offset"] counter_skipped_bytes.inc(self.head) skip_next = True self.state = "replaying" offset = self.head while self.running: while self.head >= self.tail: self.state = "watching" if self.done: break await self.more_content.wait() self.more_content.clear() assert self.head < self.tail self.fh.seek(self.head) buf = self.fh.readline() try: if len(buf) < 45: raise ValueError() if not buf[-1] == "\n": raise ValueError() if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", buf[:-1]): raise ValueError() event_created = datetime.strptime(buf[:23], "%Y-%m-%dT%H:%M:%S.%f") except ValueError: print("Failed to parse file %s at offset %d" % (self.path, self.head)) break histogram_line_size.observe(len(buf)) self.head += len(buf) record_size += len(buf) if record_size < args.max_record_size: # TODO: Support Docker runtime on EKS message += buf[45:-1] state = buf[43] if state == "P": # This is partial message continue assert state == "F", "Unknown line state" o = {} o["message"] = message o["log"] = {} message = "" record_size = 0 if record_size > args.max_record_size: counter_records.labels("too-large").inc() # TODO: Log portion of the message continue stream = buf[36:42].strip() if message.startswith("{\""): # TODO: Follow Filebeat hints try: j = ujson.loads(message) except ujson.JSONDecodeError: counter_records.labels("invalid-json").inc() else: counter_records.labels("json").inc() # Merge only if parsed JSON message looks like it's # conforming to ECS schema if "@timestamp" in j and "message" in j: o.update(j) else: o["json"] = j o["kubernetes"] = { "container": { "name": self.container_name, "id": self.container_uid }, "namespace": self.namespace_name, "pod": { "name": self.pod_name } } o["log"]["file"] = { "path": self.path } o["log"]["offset"] = offset o["host"] = host_info o["stream"] = stream o["event"] = { "created": event_created } if "@timestamp" not in o: o["@timestamp"] = o["event"]["created"] o.pop("_id", None) if not skip_next: await self.queue.put(o) gauge_queue_entries.set(self.queue.qsize()) skip_next = False offset = self.head self.state = "closing" self.fh.close() log_files.pop(self.path) async def watcher(loop, queue, coll): print("Starting watching") with Inotify() as inotify: async def add_file(path): print("Adding file: %s" % path) m = re.match("(.*)_(.*)_(.*)-([0-9abcdef]{64})\\.log$", os.path.basename(path)) if not m: counter_unexpected_filenames.inc() return pod_name, namespace_name, container_name, container_uid = m.groups() for prefix in args.exclude_pod_prefixes: if pod_name.startswith(prefix): return if args.namespace and namespace_name != args.namespace: return # Handle log rotation as /var/logs/containers path is a symlink to # actual file under /var/logs/pods path = os.readlink(path) assert path not in log_files log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name, container_uid) inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE | Mask.DELETE_SELF) inotify.add_watch(ROOT, Mask.CREATE | Mask.ONLYDIR) for filename in os.listdir(ROOT): await add_file(os.path.join(ROOT, filename)) async for event in inotify: if event.mask & Mask.CREATE: counter_inotify_events.labels("create").inc() await add_file(str(event.path)) elif event.mask & Mask.DELETE_SELF: print("File deleted: %s" % event.path) counter_inotify_events.labels("delete_self").inc() log_file = log_files.get(str(event.path)) if log_file: log_file.close() elif event.mask & Mask.CLOSE_WRITE: print("File closed: %s" % event.path) counter_inotify_events.labels("close_write").inc() # TODO: Close opened file already now instead waiting for deletion elif event.mask & Mask.MODIFY: counter_inotify_events.labels("modify").inc() log_file = log_files.get(str(event.path)) if log_file: log_file.poke() elif event.mask & Mask.IGNORED: counter_inotify_events.labels("ignored").inc() else: raise NotImplementedError("Unhandled inotify event: %s" % event) @app.route("/metrics") async def handler(request): c = collections.Counter([j.state for j in log_files.values()]) for key in ("seeking", "replaying", "watching", "closing"): gauge_log_files.labels(key).set(c[key]) return text(generate_latest().decode("utf-8")) async def dumper(queue): while True: try: o = await asyncio.wait_for(queue.get(), timeout=0.1) except asyncio.exceptions.TimeoutError: break else: gauge_queue_entries.set(queue.qsize()) print(o) @app.listener("before_server_start") async def init(sanic, loop): queue = asyncio.Queue(10000) if not args.dry_run: db = AsyncIOMotorClient(os.environ["MONGODB_HOST"]).get_default_database() try: await db.create_collection("log", capped=True, size=args.max_collection_size) except CollectionInvalid: pass coll = db["log"] loop.create_task(uploader(coll, queue)) else: coll = None loop.create_task(dumper(queue)) loop.create_task(watcher(loop, queue, coll)) app.run(host="0.0.0.0", single_process=True)