Make everything configurable
continuous-integration/drone Build is passing
Details
continuous-integration/drone Build is passing
Details
This commit is contained in:
parent
617b555fde
commit
bfe310b9b0
115
log_shipper.py
115
log_shipper.py
|
@ -22,17 +22,31 @@ To install dependencies:
|
||||||
pip3 install ujson pymongo motor asyncinotify prometheus_client sanic
|
pip3 install ujson pymongo motor asyncinotify prometheus_client sanic
|
||||||
"""
|
"""
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description="Log shipper")
|
parser = argparse.ArgumentParser(description="Log shipper",
|
||||||
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||||
parser.add_argument("--dry-run", action="store_true",
|
parser.add_argument("--dry-run", action="store_true",
|
||||||
help="Do not insert anything into database")
|
help="Do not insert anything into database")
|
||||||
parser.add_argument("--namespace", type=str)
|
parser.add_argument("--namespace", type=str,
|
||||||
parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"])
|
help="Namespace to monitor, all by default")
|
||||||
parser.add_argument("--max-record-size", type=int, default=128 * 1024) # 128kB
|
parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"],
|
||||||
parser.add_argument("--max-collection-size", type=int, default=2**30) # 1GiB
|
help="Pod prefixes to exclude")
|
||||||
parser.add_argument("--normalize-log-level", action="store_true",
|
parser.add_argument("--max-upload-queue-size", type=int, default=10000,
|
||||||
help="Normalize log.level values to Syslog defined keywords")
|
help="Max upload queue size in records")
|
||||||
parser.add_argument("--bulk-insertion-size", type=int, default=1000)
|
parser.add_argument("--max-connection-pool-size", type=int, default=1,
|
||||||
parser.add_argument("--parse-json", action="store_true")
|
help="Max MongoDB connection pool size")
|
||||||
|
parser.add_argument("--max-record-size", type=int, default=128 * 1024,
|
||||||
|
help="Max record size in bytes, 128k by default")
|
||||||
|
parser.add_argument("--max-record-retention", type=int,
|
||||||
|
help="Record retention in seconds, never by default")
|
||||||
|
parser.add_argument("--max-collection-size", type=int,
|
||||||
|
help="MongoDB collection size limit in bytes, by default disabled")
|
||||||
|
parser.add_argument("--bulk-insertion-size", type=int, default=1000,
|
||||||
|
help="MongoDB bulk insertion size in records")
|
||||||
|
parser.add_argument("--heuristic-parse-json", action="store_true",
|
||||||
|
help="Attempt automatically unwrapping JSON records")
|
||||||
|
parser.add_argument("--heuristic-normalize-log-level", action="store_true",
|
||||||
|
help="Normalize log.level values to Syslog defined keywords")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
ROOT = "/var/log/containers"
|
ROOT = "/var/log/containers"
|
||||||
|
@ -71,6 +85,10 @@ counter_skipped_bytes = Counter(
|
||||||
counter_dropped_lines = Counter(
|
counter_dropped_lines = Counter(
|
||||||
"logmower_dropped_lines",
|
"logmower_dropped_lines",
|
||||||
"Lines dropped due to being part of too long record")
|
"Lines dropped due to being part of too long record")
|
||||||
|
counter_heuristic_failures = Counter(
|
||||||
|
"logmower_heuristic_failed_record_count",
|
||||||
|
"Heuristic failures",
|
||||||
|
["mode"])
|
||||||
counter_records = Counter(
|
counter_records = Counter(
|
||||||
"logmower_record_count",
|
"logmower_record_count",
|
||||||
"Record count",
|
"Record count",
|
||||||
|
@ -102,15 +120,59 @@ histogram_line_size = Histogram(
|
||||||
buckets=(80, 160, 320, 640, 1280, inf))
|
buckets=(80, 160, 320, 640, 1280, inf))
|
||||||
|
|
||||||
|
|
||||||
|
NORMALIZED_LOG_LEVELS = {
|
||||||
|
# Syslog level emergency (0), should not be used by applications
|
||||||
|
"emerg": "emergency",
|
||||||
|
"panic": "emergency",
|
||||||
|
|
||||||
|
# Syslog level alert (1)
|
||||||
|
"a": "alert",
|
||||||
|
|
||||||
|
# Syslog level critical (2), likely results in program exit
|
||||||
|
"crit": "critical",
|
||||||
|
"fatal": "critical",
|
||||||
|
"f": "critical",
|
||||||
|
|
||||||
|
# Syslog level error (3)
|
||||||
|
"err": "error",
|
||||||
|
"e": "error",
|
||||||
|
|
||||||
|
# Syslog level warning (4)
|
||||||
|
"warn": "warning",
|
||||||
|
"w": "warning",
|
||||||
|
|
||||||
|
# Following log levels should not be enabled by default
|
||||||
|
|
||||||
|
# Syslog level notice (5)
|
||||||
|
"n": "notice",
|
||||||
|
|
||||||
|
# Syslog level informational (6)
|
||||||
|
"informational": "info",
|
||||||
|
"i": "info",
|
||||||
|
|
||||||
|
# Syslog level debug (7)
|
||||||
|
"d": "debug",
|
||||||
|
"d1": "debug",
|
||||||
|
"d2": "debug",
|
||||||
|
"d3": "debug",
|
||||||
|
"d4": "debug",
|
||||||
|
"d5": "debug",
|
||||||
|
"trace": "debug",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async def uploader(coll, queue):
|
async def uploader(coll, queue):
|
||||||
then = time()
|
then = time()
|
||||||
await coll.create_index([("@timestamp", 1)],
|
kwargs = {}
|
||||||
expireAfterSeconds=3600 * 24 * 3)
|
if args.max_record_retention:
|
||||||
|
kwargs["expireAfterSeconds"] = args.max_record_retention
|
||||||
|
await coll.create_index([("@timestamp", 1)], **kwargs)
|
||||||
|
|
||||||
# Following index is used to look up where to resume submitting logs
|
# Following index is used to look up where to resume submitting logs
|
||||||
# after restart/crash
|
# after restart/crash
|
||||||
await coll.create_index([("log.file.path", 1),
|
await coll.create_index([("log.file.path", 1),
|
||||||
("log.offset", 1)])
|
("log.offset", 1)],
|
||||||
|
unique=True)
|
||||||
|
|
||||||
# Indexes used for frequent searches
|
# Indexes used for frequent searches
|
||||||
await coll.create_index([("host.name", 1)])
|
await coll.create_index([("host.name", 1)])
|
||||||
|
@ -267,14 +329,13 @@ class LogFile(object):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
stream = line[36:42].strip()
|
stream = line[36:42].strip()
|
||||||
if message.startswith("{\""):
|
if args.heuristic_parse_json and o["message"].startswith("{\""):
|
||||||
# TODO: Follow Filebeat hints
|
# TODO: Follow Filebeat hints
|
||||||
try:
|
try:
|
||||||
j = ujson.loads(message)
|
j = ujson.loads(message)
|
||||||
except ujson.JSONDecodeError:
|
except ujson.JSONDecodeError:
|
||||||
counter_records.labels("invalid-json").inc()
|
counter_heuristic_failures.labels("invalid-json").inc()
|
||||||
else:
|
else:
|
||||||
counter_records.labels("json").inc()
|
|
||||||
# Merge only if parsed JSON message looks like it's
|
# Merge only if parsed JSON message looks like it's
|
||||||
# conforming to ECS schema
|
# conforming to ECS schema
|
||||||
if "@timestamp" in j and "message" in j:
|
if "@timestamp" in j and "message" in j:
|
||||||
|
@ -301,6 +362,16 @@ class LogFile(object):
|
||||||
"created": event_created
|
"created": event_created
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if args.heuristic_normalize_log_level:
|
||||||
|
if "level" in o["log"]:
|
||||||
|
level = o["log"]["level"].strip().lower()
|
||||||
|
try:
|
||||||
|
o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
|
||||||
|
except KeyError:
|
||||||
|
counter_heuristic_failures.labels("invalid-log-level").inc()
|
||||||
|
else:
|
||||||
|
o["log"]["level"] = "error" if stream == "stderr" else "info"
|
||||||
|
|
||||||
if "@timestamp" not in o:
|
if "@timestamp" not in o:
|
||||||
o["@timestamp"] = o["event"]["created"]
|
o["@timestamp"] = o["event"]["created"]
|
||||||
o.pop("_id", None)
|
o.pop("_id", None)
|
||||||
|
@ -328,10 +399,11 @@ async def watcher(loop, queue, coll):
|
||||||
|
|
||||||
if not m:
|
if not m:
|
||||||
print("Unexpected filename:", path)
|
print("Unexpected filename:", path)
|
||||||
raise
|
|
||||||
counter_unexpected_filenames.inc()
|
counter_unexpected_filenames.inc()
|
||||||
return
|
return
|
||||||
namespace_name, pod_name, container_name = m.groups()
|
namespace_name, pod_name, container_name = m.groups()
|
||||||
|
if args.namespace and args.namespace != namespace_name:
|
||||||
|
return
|
||||||
for prefix in args.exclude_pod_prefixes:
|
for prefix in args.exclude_pod_prefixes:
|
||||||
if pod_name.startswith(prefix):
|
if pod_name.startswith(prefix):
|
||||||
return
|
return
|
||||||
|
@ -417,11 +489,14 @@ async def dumper(queue):
|
||||||
|
|
||||||
@app.listener("before_server_start")
|
@app.listener("before_server_start")
|
||||||
async def init(sanic, loop):
|
async def init(sanic, loop):
|
||||||
queue = asyncio.Queue(10000)
|
queue = asyncio.Queue(args.max_upload_queue_size)
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
db = AsyncIOMotorClient(os.environ["MONGODB_HOST"]).get_default_database()
|
db = AsyncIOMotorClient(os.environ["MONGODB_HOST"],
|
||||||
|
maxPoolSize=args.max_connection_pool_size).get_default_database()
|
||||||
try:
|
try:
|
||||||
await db.create_collection("log", capped=True, size=args.max_collection_size)
|
await db.create_collection("log",
|
||||||
|
capped=bool(args.max_collection_size),
|
||||||
|
size=args.max_collection_size)
|
||||||
except CollectionInvalid:
|
except CollectionInvalid:
|
||||||
pass
|
pass
|
||||||
coll = db["log"]
|
coll = db["log"]
|
||||||
|
|
Loading…
Reference in New Issue