Separate heuristics module
continuous-integration/drone Build is passing
Details
continuous-integration/drone Build is passing
Details
This commit is contained in:
parent
ca854f60e5
commit
c84f85421c
|
@ -1,4 +1,6 @@
|
|||
FROM harbor.k-space.ee/k-space/microservice-base
|
||||
RUN pip3 install asyncinotify ujson prometheus-async[aiohttp]
|
||||
ADD log_shipper.py /log_shipper.py
|
||||
ENTRYPOINT /log_shipper.py
|
||||
WORKDIR /app
|
||||
ADD heuristics.py /app/heuristics.py
|
||||
ADD log_shipper.py /app/log_shipper.py
|
||||
ENTRYPOINT /app/log_shipper.py
|
||||
|
|
|
@ -0,0 +1,196 @@
|
|||
import ujson
|
||||
import re
|
||||
|
||||
|
||||
def int64(value):
    """
    Coerce value to int and reject anything outside the signed 64-bit
    range so it can be stored losslessly by Mongo.
    """
    coerced = int(value)
    if coerced > 9223372036854775807:
        raise ValueError("Integer value %s too high" % coerced)
    if coerced < -9223372036854775808:
        raise ValueError("Integer value %s too low" % coerced)
    return coerced
|
||||
|
||||
|
||||
class DecodeStringJSON():
    """
    Match and parse log lines that carry a serialized JSON object.
    """
    @classmethod
    def match(cls, record):
        # A JSON object emitted by a logger practically always starts
        # with a quoted key, so a cheap prefix test avoids needless
        # parse attempts on plain-text lines.
        message = record["message"]
        return message.startswith("{\"")

    @classmethod
    def extract(cls, record, m):
        # Keep the parsed document under a dedicated key; later
        # heuristics decide whether it gets merged to the top level.
        record["json"] = ujson.loads(record["message"])
|
||||
|
||||
|
||||
class DecodeStringRegex():
    """
    Base class for heuristics that recognize a log line with a regular
    expression; subclasses provide PATTERN and may override extract().
    """
    @classmethod
    def match(cls, record):
        # The match object (or None) is what process() passes back to
        # extract() as the m argument.
        message = record["message"]
        return cls.PATTERN.match(message)

    @classmethod
    def extract(cls, record, m):
        # Default behavior: copy every named capture group into the
        # record verbatim.
        record.update(m.groupdict())
|
||||
|
||||
|
||||
class DecodeStringGoLogger(DecodeStringRegex):
    """
    Decode logrus-style Go logger lines of the form:
    time="2021-01-02T03:04:05Z" level=info msg="..." [resource=...]
    """
    # Capture groups: 1 = timestamp, 2 = level, 3 = message,
    # 4/5 = optional trailing resource attribute.
    PATTERN = re.compile(
        "time=\"([0-9][0-9][0-9][0-9]\\-[0-9][0-9]\\-[0-9][0-9]T[0-9][0-9]\\:[0-9][0-9]\\:[0-9][0-9]Z)\" "
        "level=([a-z]+) "
        "msg=\"(.*?)"
        "( resource=(.*?))?\"$")

    @classmethod
    def extract(cls, record, m):
        # Replace the raw line with just the quoted message and stash
        # the reported level for later normalization.
        record["message"] = m.group(3)
        record["log"]["level"] = m.group(2)
|
||||
|
||||
|
||||
class DecodeStringCLF(DecodeStringRegex):
    """
    Decode NCSA Common Log Format access log lines, e.g.
    127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /x HTTP/1.0" 200 2326
    """
    PATTERN = re.compile("(\\S+) \\S+ (\\S+) \\[([^\\]]+)\\] "
        "\"([A-Z]+) ([^ \"]+)? HTTP/[0-9.]+\" ([0-9]{3}) ([0-9]+|-)")

    @classmethod
    def extract(cls, record, m):
        # Groups: client address, userid, timestamp (unused here),
        # method, request target, status code, response size.
        client, userid, _, method, request, status, size = m.groups()
        record["message"] = "%s %s" % (method, request)
        record["event"]["type"] = "access"
        # In CLF a bare "-" is the placeholder for "no data"; the
        # (\S+) group always matches it, so filter it out instead of
        # recording "-" as a real user name.
        if userid and userid != "-":
            record["client"]["user"] = userid
        record["client"]["address"] = client
        record["http"]["request"]["method"] = method
        # Size is "-" when the response carried no body (e.g. 304 or
        # HEAD); int64("-") would raise ValueError and abort processing
        # of the whole line, so only record it when it is numeric.
        if size != "-":
            record["http"]["response"]["bytes"] = int64(size)
        record["http"]["response"]["status_code"] = int64(status)
|
||||
|
||||
|
||||
class ExtractSanic():
    """
    Pull structured fields out of Sanic's JSON access log records.
    """
    # All of these keys must be present for a JSON document to be
    # recognized as a Sanic access log record.
    FIELDS = {"type", "response_time", "status_code", "path", "method",
        "remote", "user_agent", "host", "logger", "level", "timestamp",
        "worker", "req_id"}

    @classmethod
    def match(cls, record):
        if "json" not in record:
            return False
        return cls.FIELDS.issubset(record["json"].keys())

    @classmethod
    def extract(cls, record, m):
        # Consume the JSON document so later heuristics don't reprocess it.
        j = record.pop("json")
        if "traceback" in j:
            record["error"]["stack_trace"] = j["traceback"]
        record["log"]["level"] = j["level"]
        record["message"] = "%s %s" % (j["method"], j["path"])
        record["event"]["type"] = "access"
        record["client"]["address"] = j["remote"]
        request = record["http"]["request"]
        request["path"] = j["path"]
        request["method"] = j["method"]
        record["http"]["response"]["status_code"] = int64(j["status_code"])
        # TODO: timestamp, type, user_agent, response_time, req_id, logger, worker
        # TODO: optional length
|
||||
|
||||
|
||||
class ExtractElasticCommonSchema():
    """
    Merge JSON documents that already follow the Elastic Common Schema
    straight into the top level of the record.
    """
    @classmethod
    def match(cls, record):
        # Treat a document as ECS-conforming when it carries both of
        # the mandatory ECS fields.
        doc = record.get("json", {})
        return "@timestamp" in doc and "message" in doc

    @classmethod
    def extract(cls, record, m):
        # The parsed document's keys override any top-level keys.
        doc = record.pop("json")
        record.update(doc)
|
||||
|
||||
|
||||
class NormalizeLogLevel():
    """
    Map the various log level spellings seen in the wild onto the
    canonical syslog-derived level names; unknown levels pass through.
    """
    MAPPING = {
        # Syslog level emergency (0), should not be used by applications
        "emerg": "emergency",
        "panic": "emergency",

        # Syslog level alert (1)
        "a": "alert",

        # Syslog level critical (2), likely results in program exit
        "crit": "critical",
        "fatal": "critical",
        "f": "critical",

        # Syslog level error (3)
        "err": "error",
        "e": "error",

        # Syslog level warning (4)
        "warn": "warning",
        "w": "warning",

        # Following log levels should not be enabled by default

        # Syslog level notice (5)
        "n": "notice",

        # Syslog level informational (6)
        "informational": "info",
        "i": "info",

        # Syslog level debug (7)
        "d": "debug",
        "d1": "debug",
        "d2": "debug",
        "d3": "debug",
        "d4": "debug",
        "d5": "debug",
        "trace": "debug",
    }

    @classmethod
    def match(cls, record):
        return "level" in record.get("log", "")

    @classmethod
    def extract(cls, record, m):
        # Lowercase and trim before the lookup; levels without a
        # canonical mapping are kept as-is.
        level = record["log"]["level"].strip().lower()
        record["log"]["level"] = cls.MAPPING.get(level, level)
|
||||
|
||||
|
||||
class BestEffortTopLevelMerge():
    """
    Pick up the human-readable message and log level from otherwise
    unrecognized JSON documents.
    """
    @classmethod
    def match(cls, record):
        doc = record.get("json", {})
        if "level" not in doc:
            return False
        return "msg" in doc or "message" in doc

    @classmethod
    def extract(cls, record, m):
        doc = record["json"]
        # Prefer "message" over "msg"; a falsy "message" value also
        # falls through to "msg".
        record["message"] = doc.get("message") or doc.get("msg")
        record["log"]["level"] = doc["level"]
|
||||
|
||||
|
||||
# String decoders run first: they turn the raw message string into
# structured fields on the record.
string_decoders = (
    DecodeStringJSON,
    DecodeStringCLF,
    DecodeStringGoLogger
)

# Record manglers run second: they post-process the decoded record.
record_manglers = (
    ExtractElasticCommonSchema,
    ExtractSanic,
    BestEffortTopLevelMerge,
    NormalizeLogLevel
)


def process(rec):
    """
    Run both heuristic passes over rec in place, recording under the
    "heuristics" key the class names of every heuristic that matched.
    """
    applied = rec["heuristics"] = []
    for heuristic_pass in (string_decoders, record_manglers):
        for heuristic in heuristic_pass:
            m = heuristic.match(rec)
            if not m:
                continue
            applied.append(heuristic.__name__)
            heuristic.extract(rec, m)
|
|
@ -2,10 +2,10 @@
|
|||
import argparse
|
||||
import asyncio
|
||||
import collections
|
||||
import heuristics
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import ujson
|
||||
import prometheus_async
|
||||
import pymongo
|
||||
from aiofile import async_open
|
||||
|
@ -130,45 +130,8 @@ histogram_line_size = Histogram(
|
|||
buckets=(80, 160, 320, 640, 1280, inf))
|
||||
|
||||
|
||||
NORMALIZED_LOG_LEVELS = {
|
||||
# Syslog level emergency (0), should not be used by applications
|
||||
"emerg": "emergency",
|
||||
"panic": "emergency",
|
||||
|
||||
# Syslog level alert (1)
|
||||
"a": "alert",
|
||||
|
||||
# Syslog level critical (2), likely results in program exit
|
||||
"crit": "critical",
|
||||
"fatal": "critical",
|
||||
"f": "critical",
|
||||
|
||||
# Syslog level error (3)
|
||||
"err": "error",
|
||||
"e": "error",
|
||||
|
||||
# Syslog level warning (4)
|
||||
"warn": "warning",
|
||||
"w": "warning",
|
||||
|
||||
# Following log levels should not be enabled by default
|
||||
|
||||
# Syslog level notice (5)
|
||||
"n": "notice",
|
||||
|
||||
# Syslog level informational (6)
|
||||
"informational": "info",
|
||||
"i": "info",
|
||||
|
||||
# Syslog level debug (7)
|
||||
"d": "debug",
|
||||
"d1": "debug",
|
||||
"d2": "debug",
|
||||
"d3": "debug",
|
||||
"d4": "debug",
|
||||
"d5": "debug",
|
||||
"trace": "debug",
|
||||
}
|
||||
def recursively_default_dict():
|
||||
return collections.defaultdict(recursively_default_dict)
|
||||
|
||||
|
||||
async def uploader(coll, queue):
|
||||
|
@ -222,7 +185,7 @@ async def uploader(coll, queue):
|
|||
except pymongo.errors.NotPrimaryError:
|
||||
counter_bulk_insertions.labels("not-primary").inc()
|
||||
continue
|
||||
except pymongo.errors.BulkWriteError as e:
|
||||
except (pymongo.errors.BulkWriteError, OverflowError) as e:
|
||||
counter_bulk_insertions.labels("retried-as-singles").inc()
|
||||
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
|
||||
counter_bulk_insertion_errors.labels(j).inc()
|
||||
|
@ -366,9 +329,9 @@ class LogFile(object):
|
|||
# This is partial message
|
||||
continue
|
||||
assert state == "F", "Unknown line state"
|
||||
o = {}
|
||||
o = recursively_default_dict()
|
||||
o["message"] = message
|
||||
o["log"] = {}
|
||||
|
||||
message = ""
|
||||
record_size = 0
|
||||
|
||||
|
@ -378,19 +341,7 @@ class LogFile(object):
|
|||
continue
|
||||
|
||||
stream = line[36:42].strip()
|
||||
if args.parse_json and o["message"].startswith("{\""):
|
||||
# TODO: Follow Filebeat hints
|
||||
try:
|
||||
j = ujson.loads(o["message"])
|
||||
except ujson.JSONDecodeError:
|
||||
counter_heuristic_failures.labels("invalid-json").inc()
|
||||
else:
|
||||
# Merge only if parsed JSON message looks like it's
|
||||
# conforming to ECS schema
|
||||
if args.merge_top_level and "@timestamp" in j and "message" in j:
|
||||
o.update(j)
|
||||
else:
|
||||
o["json"] = j
|
||||
heuristics.process(o)
|
||||
|
||||
o["kubernetes"] = {
|
||||
"container": {
|
||||
|
@ -412,15 +363,6 @@ class LogFile(object):
|
|||
"created": event_created
|
||||
}
|
||||
|
||||
if args.normalize_log_level and "level" in o["log"]:
|
||||
level = o["log"]["level"].strip().lower()
|
||||
try:
|
||||
o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
|
||||
except KeyError:
|
||||
counter_heuristic_failures.labels("invalid-log-level").inc()
|
||||
if args.stream_to_log_level and "level" not in o["log"]:
|
||||
o["log"]["level"] = "error" if stream == "stderr" else "info"
|
||||
|
||||
if "@timestamp" not in o:
|
||||
o["@timestamp"] = o["event"]["created"]
|
||||
o.pop("_id", None)
|
||||
|
|
Loading…
Reference in New Issue