diff --git a/log_shipper.py b/log_shipper.py
index e4b2657..699d9a1 100755
--- a/log_shipper.py
+++ b/log_shipper.py
@@ -22,17 +22,31 @@ To install dependencies:
 pip3 install ujson pymongo motor asyncinotify prometheus_client sanic
 """
 
-parser = argparse.ArgumentParser(description="Log shipper")
+parser = argparse.ArgumentParser(description="Log shipper",
+                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 parser.add_argument("--dry-run", action="store_true",
-                    help="Do not insert anything into database")
-parser.add_argument("--namespace", type=str)
-parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"])
-parser.add_argument("--max-record-size", type=int, default=128 * 1024)  # 128kB
-parser.add_argument("--max-collection-size", type=int, default=2**30)  # 1GiB
-parser.add_argument("--normalize-log-level", action="store_true",
-                    help="Normalize log.level values to Syslog defined keywords")
-parser.add_argument("--bulk-insertion-size", type=int, default=1000)
-parser.add_argument("--parse-json", action="store_true")
+                    help="Do not insert anything into database")
+parser.add_argument("--namespace", type=str,
+                    help="Namespace to monitor, all by default")
+parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"],
+                    help="Pod prefixes to exclude")
+parser.add_argument("--max-upload-queue-size", type=int, default=10000,
+                    help="Max upload queue size in records")
+parser.add_argument("--max-connection-pool-size", type=int, default=1,
+                    help="Max MongoDB connection pool size")
+parser.add_argument("--max-record-size", type=int, default=128 * 1024,
+                    help="Max record size in bytes, 128k by default")
+parser.add_argument("--max-record-retention", type=int,
+                    help="Record retention in seconds, never by default")
+parser.add_argument("--max-collection-size", type=int,
+                    help="MongoDB collection size limit in bytes, by default disabled")
+parser.add_argument("--bulk-insertion-size", type=int, default=1000,
+                    help="MongoDB bulk insertion size in records")
+parser.add_argument("--heuristic-parse-json", action="store_true",
+                    help="Attempt automatically unwrapping JSON records")
+parser.add_argument("--heuristic-normalize-log-level", action="store_true",
+                    help="Normalize log.level values to Syslog defined keywords")
+
 args = parser.parse_args()
 
 ROOT = "/var/log/containers"
@@ -71,6 +85,10 @@ counter_skipped_bytes = Counter(
 counter_dropped_lines = Counter(
     "logmower_dropped_lines",
     "Lines dropped due to being part of too long record")
+counter_heuristic_failures = Counter(
+    "logmower_heuristic_failed_record_count",
+    "Heuristic failures",
+    ["mode"])
 counter_records = Counter(
     "logmower_record_count",
     "Record count",
@@ -102,15 +120,59 @@ histogram_line_size = Histogram(
     buckets=(80, 160, 320, 640, 1280, inf))
 
 
+NORMALIZED_LOG_LEVELS = {
+    # Syslog level emergency (0), should not be used by applications
+    "emerg": "emergency",
+    "panic": "emergency",
+
+    # Syslog level alert (1)
+    "a": "alert",
+
+    # Syslog level critical (2), likely results in program exit
+    "crit": "critical",
+    "fatal": "critical",
+    "f": "critical",
+
+    # Syslog level error (3)
+    "err": "error",
+    "e": "error",
+
+    # Syslog level warning (4)
+    "warn": "warning",
+    "w": "warning",
+
+    # Following log levels should not be enabled by default
+
+    # Syslog level notice (5)
+    "n": "notice",
+
+    # Syslog level informational (6)
+    "informational": "info",
+    "i": "info",
+
+    # Syslog level debug (7)
+    "d": "debug",
+    "d1": "debug",
+    "d2": "debug",
+    "d3": "debug",
"d4": "debug", + "d5": "debug", + "trace": "debug", +} + + async def uploader(coll, queue): then = time() - await coll.create_index([("@timestamp", 1)], - expireAfterSeconds=3600 * 24 * 3) + kwargs = {} + if args.max_record_retention: + kwargs["expireAfterSeconds"] = args.max_record_retention + await coll.create_index([("@timestamp", 1)], **kwargs) # Following index is used to look up where to resume submitting logs # after restart/crash await coll.create_index([("log.file.path", 1), - ("log.offset", 1)]) + ("log.offset", 1)], + unique=True) # Indexes used for frequent searches await coll.create_index([("host.name", 1)]) @@ -267,14 +329,13 @@ class LogFile(object): continue stream = line[36:42].strip() - if message.startswith("{\""): + if args.heuristic_parse_json and o["message"].startswith("{\""): # TODO: Follow Filebeat hints try: j = ujson.loads(message) except ujson.JSONDecodeError: - counter_records.labels("invalid-json").inc() + counter_heuristic_failures.labels("invalid-json").inc() else: - counter_records.labels("json").inc() # Merge only if parsed JSON message looks like it's # conforming to ECS schema if "@timestamp" in j and "message" in j: @@ -301,6 +362,16 @@ class LogFile(object): "created": event_created } + if args.heuristic_normalize_log_level: + if "level" in o["log"]: + level = o["log"]["level"].strip().lower() + try: + o["log"]["level"] = NORMALIZED_LOG_LEVELS[level] + except KeyError: + counter_heuristic_failures.labels("invalid-log-level").inc() + else: + o["log"]["level"] = "error" if stream == "stderr" else "info" + if "@timestamp" not in o: o["@timestamp"] = o["event"]["created"] o.pop("_id", None) @@ -328,10 +399,11 @@ async def watcher(loop, queue, coll): if not m: print("Unexpected filename:", path) - raise counter_unexpected_filenames.inc() return namespace_name, pod_name, container_name = m.groups() + if args.namespace and args.namespace == namespace_name: + return for prefix in args.exclude_pod_prefixes: if pod_name.startswith(prefix): return @@ -417,11 +489,14 @@ async def dumper(queue): @app.listener("before_server_start") async def init(sanic, loop): - queue = asyncio.Queue(10000) + queue = asyncio.Queue(args.max_upload_queue_size) if not args.dry_run: - db = AsyncIOMotorClient(os.environ["MONGODB_HOST"]).get_default_database() + db = AsyncIOMotorClient(os.environ["MONGODB_HOST"], + maxPoolSize=args.max_connection_pool_size).get_default_database() try: - await db.create_collection("log", capped=True, size=args.max_collection_size) + await db.create_collection("log", + capped=bool(args.max_collection_size), + size=args.max_collection_size) except CollectionInvalid: pass coll = db["log"]