diff --git a/Dockerfile b/Dockerfile
index 2ccace3..8dd5d5a 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,6 @@
 FROM harbor.k-space.ee/k-space/microservice-base
 RUN pip3 install asyncinotify ujson prometheus-async[aiohttp]
-ADD log_shipper.py /log_shipper.py
-ENTRYPOINT /log_shipper.py
+WORKDIR /app
+ADD heuristics.py /app/heuristics.py
+ADD log_shipper.py /app/log_shipper.py
+ENTRYPOINT /app/log_shipper.py
diff --git a/heuristics.py b/heuristics.py
new file mode 100644
index 0000000..3b8aea3
--- /dev/null
+++ b/heuristics.py
@@ -0,0 +1,196 @@
+import ujson
+import re
+
+
+def int64(value):
+    """
+    Validate that an integer fits MongoDB's signed 64-bit range
+    """
+    value = int(value)
+    if value < -9223372036854775808:
+        raise ValueError("Integer value %s too low" % value)
+    elif value > 9223372036854775807:
+        raise ValueError("Integer value %s too high" % value)
+    return value
+
+
+class DecodeStringJSON():
+    """
+    Match and extract JSON strings
+    """
+    @classmethod
+    def match(cls, record):
+        return record["message"].startswith("{\"")
+
+    @classmethod
+    def extract(cls, record, m):
+        try:
+            record["json"] = ujson.loads(record["message"])
+        except ValueError:
+            # Message merely looked like JSON, keep it as a plain string
+            pass
+
+
+class DecodeStringRegex():
+    """
+    Match and extract fields based on regular expressions
+    """
+    @classmethod
+    def match(cls, record):
+        return cls.PATTERN.match(record["message"])
+
+    @classmethod
+    def extract(cls, record, m):
+        for key, value in m.groupdict().items():
+            record[key] = value
+
+
+class DecodeStringGoLogger(DecodeStringRegex):
+    PATTERN = re.compile(
+        "time=\"([0-9][0-9][0-9][0-9]\\-[0-9][0-9]\\-[0-9][0-9]T[0-9][0-9]\\:[0-9][0-9]\\:[0-9][0-9]Z)\" "
+        "level=([a-z]+) "
+        "msg=\"(.*?)"
+        "( resource=(.*?))?\"$")
+
+    @classmethod
+    def extract(cls, record, m):
+        _, level, msg, _, _ = m.groups()
+        record["message"] = msg
+        record["log"]["level"] = level
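+
+
+# Example of a Common Log Format line the pattern below is expected to
+# match (illustrative values, not from real traffic):
+#   203.0.113.9 - frank [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 2326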
+class DecodeStringCLF(DecodeStringRegex):
+    PATTERN = re.compile(
+        "(\\S+) \\S+ (\\S+) \\[([^\\]]+)\\] "
+        "\"([A-Z]+) ([^ \"]+)? HTTP/[0-9.]+\" ([0-9]{3}) ([0-9]+|-)")
+
+    @classmethod
+    def extract(cls, record, m):
+        client, userid, _, method, request, status, size = m.groups()
+        record["message"] = "%s %s" % (method, request)
+        record["event"]["type"] = "access"
+        if userid != "-":
+            # CLF renders a missing user identifier as a dash
+            record["client"]["user"] = userid
+        record["client"]["address"] = client
+        record["http"]["request"]["method"] = method
+        if size != "-":
+            # Response size is a dash when no body was returned
+            record["http"]["response"]["bytes"] = int64(size)
+        record["http"]["response"]["status_code"] = int64(status)
+
+
+class ExtractSanic():
+    FIELDS = {"type", "response_time", "status_code", "path", "method",
+              "remote", "user_agent", "host", "logger", "level", "timestamp",
+              "worker", "req_id"}
+
+    @classmethod
+    def match(cls, record):
+        return "json" in record and cls.FIELDS.issubset(record["json"].keys())
+
+    @classmethod
+    def extract(cls, record, m):
+        j = record.pop("json")
+        if "traceback" in j:
+            record["error"]["stack_trace"] = j["traceback"]
+        record["log"]["level"] = j["level"]
+        record["message"] = "%s %s" % (j["method"], j["path"])
+        record["event"]["type"] = "access"
+        record["client"]["address"] = j["remote"]
+        record["http"]["request"]["path"] = j["path"]
+        record["http"]["request"]["method"] = j["method"]
+        record["http"]["response"]["status_code"] = int64(j["status_code"])
+        # TODO: timestamp, type, user_agent, response_time, req_id, logger, worker
+        # TODO: optional length
+
+
+class ExtractElasticCommonSchema():
+    @classmethod
+    def match(cls, record):
+        json_keys = record.get("json", {}).keys()
+        return "@timestamp" in json_keys and "message" in json_keys
+
+    @classmethod
+    def extract(cls, record, m):
+        record.update(record.pop("json"))
+
+
+class NormalizeLogLevel():
+    MAPPING = {
+        # Syslog level emergency (0), should not be used by applications
+        "emerg": "emergency",
+        "panic": "emergency",
+
+        # Syslog level alert (1)
+        "a": "alert",
+
+        # Syslog level critical (2), likely results in program exit
+        "crit": "critical",
+        "fatal": "critical",
+        "f": "critical",
+
+        # Syslog level error (3)
+        "err": "error",
+        "e": "error",
+
+        # Syslog level warning (4)
+        "warn": "warning",
+        "w": "warning",
+
+        # Following log levels should not be enabled by default
+
+        # Syslog level notice (5)
+        "n": "notice",
+
+        # Syslog level informational (6)
+        "informational": "info",
+        "i": "info",
+
+        # Syslog level debug (7)
+        "d": "debug",
+        "d1": "debug",
+        "d2": "debug",
+        "d3": "debug",
+        "d4": "debug",
+        "d5": "debug",
+        "trace": "debug",
+    }
+
+    @classmethod
+    def match(cls, record):
+        return "level" in record.get("log", {})
+
+    @classmethod
+    def extract(cls, record, m):
+        z = record["log"]["level"].strip().lower()
+        record["log"]["level"] = cls.MAPPING.get(z, z)
+
+
+class BestEffortTopLevelMerge():
+    @classmethod
+    def match(cls, record):
+        json_keys = record.get("json", {}).keys()
+        return "level" in json_keys and ("msg" in json_keys or "message" in json_keys)
+
+    @classmethod
+    def extract(cls, record, m):
+        j = record["json"]
+        record["message"] = j.get("message") or j.get("msg")
+        record["log"]["level"] = j["level"]
+
+
+string_decoders = (
+    DecodeStringJSON,
+    DecodeStringCLF,
+    DecodeStringGoLogger
+)
+
+record_manglers = (
+    ExtractElasticCommonSchema,
+    ExtractSanic,
+    BestEffortTopLevelMerge,
+    NormalizeLogLevel
+)
+
+
+def process(rec):
+    rec["heuristics"] = []
+    for decode_pass in string_decoders, record_manglers:
+        for heuristic in decode_pass:
+            m = heuristic.match(rec)
+            if m:
+                rec["heuristics"].append(heuristic.__name__)
+                heuristic.extract(rec, m)
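+
+
+if __name__ == "__main__":
+    # Minimal smoke-test sketch, not used by the shipper itself: build the
+    # same nested-defaultdict record shape that log_shipper.py constructs
+    # and run one made-up JSON log line through the pipeline. In this case
+    # DecodeStringJSON, BestEffortTopLevelMerge and NormalizeLogLevel
+    # should fire, in that order.
+    import collections
+
+    def _recursively_default_dict():
+        return collections.defaultdict(_recursively_default_dict)
+
+    rec = _recursively_default_dict()
+    rec["message"] = "{\"level\": \"warn\", \"msg\": \"disk almost full\"}"
+    process(rec)
+    assert rec["message"] == "disk almost full"
+    assert rec["log"]["level"] == "warning"
+    print(rec["heuristics"])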
diff --git a/log_shipper.py b/log_shipper.py
index 8014c0b..d24b4f2 100755
--- a/log_shipper.py
+++ b/log_shipper.py
@@ -2,10 +2,10 @@
 import argparse
 import asyncio
 import collections
+import heuristics
 import os
 import re
 import socket
-import ujson
 import prometheus_async
 import pymongo
 from aiofile import async_open
@@ -130,45 +130,8 @@ histogram_line_size = Histogram(
     buckets=(80, 160, 320, 640, 1280, inf))
 
 
-NORMALIZED_LOG_LEVELS = {
-    # Syslog level emergency (0), should not be used by applications
-    "emerg": "emergency",
-    "panic": "emergency",
-
-    # Syslog level alert (1)
-    "a": "alert",
-
-    # Syslog level critical (2), likely results in program exit
-    "crit": "critical",
-    "fatal": "critical",
-    "f": "critical",
-
-    # Syslog level error (3)
-    "err": "error",
-    "e": "error",
-
-    # Syslog level warning (4)
-    "warn": "warning",
-    "w": "warning",
-
-    # Following log levels should not be enabled by default
-
-    # Syslog level notice (5)
-    "n": "notice",
-
-    # Syslog level informational (6)
-    "informational": "info",
-    "i": "info",
-
-    # Syslog level debug (7)
-    "d": "debug",
-    "d1": "debug",
-    "d2": "debug",
-    "d3": "debug",
-    "d4": "debug",
-    "d5": "debug",
-    "trace": "debug",
-}
+def recursively_default_dict():
+    return collections.defaultdict(recursively_default_dict)
 
 
 async def uploader(coll, queue):
@@ -222,7 +185,7 @@ async def uploader(coll, queue):
         except pymongo.errors.NotPrimaryError:
             counter_bulk_insertions.labels("not-primary").inc()
             continue
-        except pymongo.errors.BulkWriteError as e:
+        except (pymongo.errors.BulkWriteError, OverflowError) as e:
             counter_bulk_insertions.labels("retried-as-singles").inc()
             j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
             counter_bulk_insertion_errors.labels(j).inc()
@@ -366,9 +329,9 @@ class LogFile(object):
                     # This is partial message
                     continue
                 assert state == "F", "Unknown line state"
-                o = {}
+                o = recursively_default_dict()
                 o["message"] = message
-                o["log"] = {}
+
                 message = ""
                 record_size = 0
 
@@ -378,19 +341,7 @@ class LogFile(object):
                     continue
                 stream = line[36:42].strip()
 
-                if args.parse_json and o["message"].startswith("{\""):
-                    # TODO: Follow Filebeat hints
-                    try:
-                        j = ujson.loads(o["message"])
-                    except ujson.JSONDecodeError:
-                        counter_heuristic_failures.labels("invalid-json").inc()
-                    else:
-                        # Merge only if parsed JSON message looks like it's
-                        # conforming to ECS schema
-                        if args.merge_top_level and "@timestamp" in j and "message" in j:
-                            o.update(j)
-                        else:
-                            o["json"] = j
+                heuristics.process(o)
 
                 o["kubernetes"] = {
                     "container": {
@@ -412,15 +363,6 @@ class LogFile(object):
                     "created": event_created
                 }
 
-                if args.normalize_log_level and "level" in o["log"]:
-                    level = o["log"]["level"].strip().lower()
-                    try:
-                        o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
-                    except KeyError:
-                        counter_heuristic_failures.labels("invalid-log-level").inc()
-                if args.stream_to_log_level and "level" not in o["log"]:
-                    o["log"]["level"] = "error" if stream == "stderr" else "info"
-
                 if "@timestamp" not in o:
                     o["@timestamp"] = o["event"]["created"]
                 o.pop("_id", None)