From 965b73754887513f7a4e8db7e95e058a2c14983d Mon Sep 17 00:00:00 2001
From: Lauri Võsandi
Date: Wed, 9 Nov 2022 15:50:21 +0200
Subject: [PATCH] Rename arguments

---
 log_shipper.py | 47 +++++++++++++++++++++++++++++------------------
 1 file changed, 29 insertions(+), 18 deletions(-)

diff --git a/log_shipper.py b/log_shipper.py
index 7387f1c..401af4b 100755
--- a/log_shipper.py
+++ b/log_shipper.py
@@ -21,26 +21,38 @@
 parser = argparse.ArgumentParser(description="Log shipper",
                                  formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 parser.add_argument("--dry-run", action="store_true",
                     help="Do not insert anything into database")
+
+# Target selectors
 parser.add_argument("--namespace", type=str,
-                    help="Namespace to monitor, all by default")
+                    help="Namespace to watch, all by default")
 parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"],
-                    help="Pod prefixes to exclude")
+                    help="Pod prefixes to exclude in any of the watched namespaces")
+
+# Tunables
 parser.add_argument("--max-upload-queue-size", type=int, default=10000,
                     help="Max upload queue size in records")
 parser.add_argument("--max-connection-pool-size", type=int, default=1,
                     help="Max MongoDB connection pool size")
 parser.add_argument("--max-record-size", type=int, default=128 * 1024,
                     help="Max record size in bytes, 128k by default")
+parser.add_argument("--bulk-insertion-size", type=int, default=1000,
+                    help="MongoDB bulk insertion size in records")
+
+# Retention
 parser.add_argument("--max-record-retention", type=int,
                     help="Record retention in seconds, never by default")
 parser.add_argument("--max-collection-size", type=int,
                     help="MongoDB collection size limit in bytes, by default disabled")
-parser.add_argument("--bulk-insertion-size", type=int, default=1000,
-                    help="MongoDB bulk insertion size in records")
-parser.add_argument("--heuristic-parse-json", action="store_true",
-                    help="Attempt automatically unwrapping JSON records")
-parser.add_argument("--heuristic-normalize-log-level", action="store_true",
+
+# Optional heuristics
+parser.add_argument("--parse-json", action="store_true",
+                    help="Parse log records that look like JSON")
+parser.add_argument("--merge-top-level", action="store_true",
+                    help="Merge decoded JSON records on top level if '@timestamp' and 'message' fields are present (looks like ECS schema)")
+parser.add_argument("--normalize-log-level", action="store_true",
                     help="Normalize log.level values to Syslog defined keywords")
+parser.add_argument("--stream-to-log-level", action="store_true",
+                    help="Upon missing log.level map stderr to 'error' and stdout to 'info'")
 
 args = parser.parse_args()
@@ -364,7 +376,7 @@ class LogFile(object):
                     continue
                 stream = line[36:42].strip()
 
-                if args.heuristic_parse_json and o["message"].startswith("{\""):
+                if args.parse_json and o["message"].startswith("{\""):
                     # TODO: Follow Filebeat hints
                     try:
                         j = ujson.loads(message)
@@ -373,7 +385,7 @@ class LogFile(object):
                     else:
                         # Merge only if parsed JSON message looks like it's
                         # conforming to ECS schema
-                        if "@timestamp" in j and "message" in j:
+                        if args.merge_top_level and "@timestamp" in j and "message" in j:
                             o.update(j)
                         else:
                             o["json"] = j
@@ -397,15 +409,14 @@ class LogFile(object):
                     "created": event_created
                 }
 
-                if args.heuristic_normalize_log_level:
-                    if "level" in o["log"]:
-                        level = o["log"]["level"].strip().lower()
-                        try:
-                            o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
-                        except KeyError:
-                            counter_heuristic_failures.labels("invalid-log-level").inc()
-                    else:
-                        o["log"]["level"] = "error" if stream == "stderr" else "info"
+                if args.normalize_log_level and "level" in o["log"]:
+                    level = o["log"]["level"].strip().lower()
+                    try:
+                        o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
+                    except KeyError:
+                        counter_heuristic_failures.labels("invalid-log-level").inc()
+                if args.stream_to_log_level and "level" not in o["log"]:
+                    o["log"]["level"] = "error" if stream == "stderr" else "info"
 
                 if "@timestamp" not in o:
                     o["@timestamp"] = o["event"]["created"]