9
0
Fork 0
logmower-shipper-prototype/log_shipper.py

548 lines
20 KiB
Python
Executable File

#!/usr/local/bin/python3 -OO
import argparse
import asyncio
import collections
import os
import re
import socket
import ujson
import pymongo
from asyncinotify import Inotify, Mask
from datetime import datetime
from math import inf
from motor.motor_asyncio import AsyncIOMotorClient
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.exposition import generate_latest
from pymongo.errors import CollectionInvalid
from sanic import Sanic, text
from time import time
"""
To install dependencies:
pip3 install ujson pymongo motor asyncinotify prometheus_client sanic
"""
parser = argparse.ArgumentParser(description="Log shipper",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--dry-run", action="store_true",
help="Do not insert anything into database")
parser.add_argument("--namespace", type=str,
help="Namespace to monitor, all by default")
parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"],
help="Pod prefixes to exclude")
parser.add_argument("--max-upload-queue-size", type=int, default=10000,
help="Max upload queue size in records")
parser.add_argument("--max-connection-pool-size", type=int, default=1,
help="Max MongoDB connection pool size")
parser.add_argument("--max-record-size", type=int, default=128 * 1024,
help="Max record size in bytes, 128k by default")
parser.add_argument("--max-record-retention", type=int,
help="Record retention in seconds, never by default")
parser.add_argument("--max-collection-size", type=int,
help="MongoDB collection size limit in bytes, by default disabled")
parser.add_argument("--bulk-insertion-size", type=int, default=1000,
help="MongoDB bulk insertion size in records")
parser.add_argument("--heuristic-parse-json", action="store_true",
help="Attempt automatically unwrapping JSON records")
parser.add_argument("--heuristic-normalize-log-level", action="store_true",
help="Normalize log.level values to Syslog defined keywords")
args = parser.parse_args()
ROOT = "/var/log/containers"
app = Sanic("tail")
tasks = dict()
with open("/etc/machine-id") as fh:
machine_id = fh.read().strip()
host_info = {
"id": machine_id,
"architecture": os.uname().machine,
"name": os.environ.get("NODE_NAME", socket.getfqdn())
}
log_files = dict()
gauge_buffer_size = Gauge(
"logmower_buffer_size_bytes",
"Log files buffered in memory")
gauge_backlog_size = Gauge(
"logmower_backlog_size_bytes",
"Content that is yet to be submitted")
gauge_log_files = Gauge(
"logmower_log_file_count",
"Number of tracked log files",
["state"])
gauge_queue_entries = Gauge(
"logmower_queue_record_count",
"Records queued for submission")
counter_unexpected_filenames = Counter(
"logmower_invalid_filename_count",
"Count of unexpected filenames in logs directory")
counter_inotify_events = Counter(
"logmower_inotify_event_count",
"Count of inotify events",
["mask"])
counter_skipped_bytes = Counter(
"logmower_skipped_bytes",
"Bytes that were skipped during startup due to being already present in data store")
counter_dropped_lines = Counter(
"logmower_dropped_lines",
"Lines dropped due to being part of too long record")
counter_heuristic_failures = Counter(
"logmower_heuristic_failed_record_count",
"Heuristic failures",
["mode"])
counter_records = Counter(
"logmower_record_count",
"Record count",
["stage"])
counter_insertion_errors = Counter(
"logmower_insertion_error_count",
"Exceptions caught during insertion of single event",
["exception"])
counter_bulk_insertion_errors = Counter(
"logmower_bulk_insertion_error_count",
"Exceptions caught during bulk insertions",
["exception"])
counter_bulk_insertions = Counter(
"logmower_bulk_insertion_count",
"Count of bulk insertions to database",
["status"])
histogram_bulk_submission_size = Histogram(
"logmower_bulk_submission_message_count",
"Bulk submission message count",
buckets=(1, 5, 10, 50, 100, 500, 1000, 5000, 10000))
histogram_database_operation_latency = Histogram(
"logmower_database_operation_latency",
"Database operation latency",
["operation"],
buckets=(0.1, 0.2, 0.5, 1, 5, 10, 50))
histogram_bulk_submission_latency = Histogram(
"logmower_bulk_submission_latency",
"Bulk submission latency",
buckets=(0.1, 0.2, 0.5, 1, 5, 10, 50))
histogram_line_size = Histogram(
"logmower_line_size_bytes",
"Log file line size in sizes",
buckets=(80, 160, 320, 640, 1280, inf))
NORMALIZED_LOG_LEVELS = {
# Syslog level emergency (0), should not be used by applications
"emerg": "emergency",
"panic": "emergency",
# Syslog level alert (1)
"a": "alert",
# Syslog level critical (2), likely results in program exit
"crit": "critical",
"fatal": "critical",
"f": "critical",
# Syslog level error (3)
"err": "error",
"e": "error",
# Syslog level warning (4)
"warn": "warning",
"w": "warning",
# Following log levels should not be enabled by default
# Syslog level notice (5)
"n": "notice",
# Syslog level informational (6)
"informational": "info",
"i": "info",
# Syslog level debug (7)
"d": "debug",
"d1": "debug",
"d2": "debug",
"d3": "debug",
"d4": "debug",
"d5": "debug",
"trace": "debug",
}
async def uploader(coll, queue):
then = time()
kwargs = {}
if args.max_record_retention:
kwargs["expireAfterSeconds"] = args.max_record_retention
await coll.create_index([("@timestamp", 1)], **kwargs)
# Following index is used to look up where to resume submitting logs
# after restart/crash
await coll.create_index([("log.file.path", 1),
("log.offset", 1)],
unique=True)
# Indexes used for frequent searches
await coll.create_index([("host.name", 1)])
await coll.create_index([("kubernetes.pod.name", 1)],
sparse=True)
await coll.create_index([("kubernetes.namespace", 1),
("kubernetes.pod.name", 1),
("kubernetes.container.name", 1)],
sparse=True)
histogram_database_operation_latency.labels("create-index").observe(time() - then)
messages = []
while True:
while len(messages) < args.bulk_insertion_size:
try:
o = await asyncio.wait_for(queue.get(), timeout=0.1)
except asyncio.exceptions.TimeoutError:
break
else:
gauge_queue_entries.set(queue.qsize())
o["event"]["ingested"] = datetime.utcnow()
messages.append(o)
if not messages:
continue
try:
then = time()
await coll.insert_many(messages)
histogram_database_operation_latency.labels("insert-many").observe(time() - then)
except pymongo.errors.ServerSelectionTimeoutError:
counter_bulk_insertions.labels("timed-out").inc()
continue
except pymongo.errors.NotPrimaryError:
counter_bulk_insertions.labels("not-primary").inc()
continue
except pymongo.errors.BulkWriteError as e:
counter_bulk_insertions.labels("retried-as-singles").inc()
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
counter_bulk_insertion_errors.labels(j).inc()
print("Bulk insert failed: %s" % j)
for o in messages:
o.pop("_id", None)
o["event"]["ingested"] = datetime.utcnow()
try:
then = time()
await coll.insert_one(o)
histogram_database_operation_latency.labels("insert-one").observe(time() - then)
except Exception as e:
j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
counter_insertion_errors.labels(j).inc()
counter_records.labels("dropped").inc()
print("Failed to insert (%s): %s" % (j, o))
else:
counter_records.labels("commited").inc()
else:
counter_bulk_insertions.labels("successful").inc()
histogram_bulk_submission_size.observe(len(messages))
counter_records.labels("commited").inc(len(messages))
messages = []
class FileTailer(object):
def __init__(self, path, offset=0, finished=False):
self.head = offset
self.tail = offset + 1
self.offset = offset
self.path = path
self.buf = b""
self.finished = finished
self.caughtup = False
self.more_content = asyncio.Event()
async def __aiter__(self):
with open(self.path, "rb") as fh:
while True:
if not self.finished and self.head >= self.tail:
self.caughtup = True
await self.more_content.wait()
self.more_content.clear()
self.tail = fh.seek(0, os.SEEK_END)
if self.head >= self.tail:
if self.finished:
# TODO: if there is still something in buf?
break
continue
fh.seek(self.head)
chunk = fh.read(min(self.tail - self.head, 4096))
self.buf += chunk
self.head += len(chunk)
while True:
step = self.buf.find(b"\n")
if step == -1:
break
buf = self.buf[:step + 1]
self.buf = self.buf[step + 1:]
await asyncio.sleep(0)
yield self.offset, len(buf), buf[:-1].decode("utf-8")
self.offset += step + 1
break
class LogFile(FileTailer):
def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, start=False):
FileTailer.__init__(self, path)
self.queue = queue
self.namespace_name = namespace_name
self.pod_name = pod_name
self.container_name = container_name
self.coll = coll
self.state = "seeking"
self.loop = loop
if start:
self.start()
def start(self):
self.loop.create_task(self.handler_loop())
def poke(self):
self.more_content.set()
def close(self):
self.done = True
self.poke()
async def handler_loop(self):
message = ""
record_size = 0
skip_next = False
if not args.dry_run:
then = time()
last_record = await self.coll.find_one({
"host.id": host_info["id"],
"log.file.path": self.path
}, sort=[("log.offset", -1)])
histogram_database_operation_latency.labels("find-replay-offset").observe(time() - then)
if last_record:
self.head = self.offset = last_record["log"]["offset"]
counter_skipped_bytes.inc(self.head)
skip_next = True
self.state = "replaying"
record_offset = self.offset
async for line_offset, line_size, line in self:
self.state = "watching" if self.caughtup else "replaying"
assert "\n" not in line
try:
reason = "unicode-encoding"
if len(line) < 45:
reason = "line-short"
raise ValueError()
if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
reason = "no-regex-match"
raise ValueError()
reason = "invalid-timestamp"
event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
break
histogram_line_size.observe(line_size)
record_size += line_size
if record_size < args.max_record_size:
# TODO: Support Docker runtime on EKS
message += line[45:]
state = line[43]
if state == "P":
# This is partial message
continue
assert state == "F", "Unknown line state"
o = {}
o["message"] = message
o["log"] = {}
message = ""
record_size = 0
if record_size > args.max_record_size:
counter_records.labels("too-large").inc()
# TODO: Log portion of the message
continue
stream = line[36:42].strip()
if args.heuristic_parse_json and o["message"].startswith("{\""):
# TODO: Follow Filebeat hints
try:
j = ujson.loads(message)
except ujson.JSONDecodeError:
counter_heuristic_failures.labels("invalid-json").inc()
else:
# Merge only if parsed JSON message looks like it's
# conforming to ECS schema
if "@timestamp" in j and "message" in j:
o.update(j)
else:
o["json"] = j
o["kubernetes"] = {
"container": {
"name": self.container_name,
},
"namespace": self.namespace_name,
"pod": {
"name": self.pod_name
}
}
o["log"]["file"] = {
"path": self.path
}
o["log"]["offset"] = record_offset
o["host"] = host_info
o["stream"] = stream
o["event"] = {
"created": event_created
}
if args.heuristic_normalize_log_level:
if "level" in o["log"]:
level = o["log"]["level"].strip().lower()
try:
o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
except KeyError:
counter_heuristic_failures.labels("invalid-log-level").inc()
else:
o["log"]["level"] = "error" if stream == "stderr" else "info"
if "@timestamp" not in o:
o["@timestamp"] = o["event"]["created"]
o.pop("_id", None)
if not skip_next:
await self.queue.put(o)
gauge_queue_entries.set(self.queue.qsize())
skip_next = False
record_offset = line_offset
self.state = "closing"
log_files.pop(self.path)
async def watcher(loop, queue, coll):
print("Starting watching")
with Inotify() as inotify:
def add_file(path, done=False, start=False):
if path in log_files:
log_files[path].done = done
return log_files[path]
print("Adding file: %s" % path)
m = re.match("/var/log/pods/(.*)_(.*)_.*/(.*)/[0-9]+\\.log$", path)
if not m:
print("Unexpected filename:", path)
counter_unexpected_filenames.inc()
return
namespace_name, pod_name, container_name = m.groups()
if args.namespace and args.namespace == namespace_name:
return
for prefix in args.exclude_pod_prefixes:
if pod_name.startswith(prefix):
return
if args.namespace and namespace_name != args.namespace:
return
lf = log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name)
lf.done = done
lf.start()
inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE)
return lf
inotify.add_watch(ROOT, Mask.CREATE | Mask.ONLYDIR)
# Register all existing log files
for pod_dir in os.listdir("/var/log/pods"):
m = re.match("(.*)_(.*)_(.*)$", pod_dir)
if not m:
print("Unexpected directory", pod_dir)
continue
namespace_name, pod_name, pod_id = m.groups()
for container_name in os.listdir(os.path.join("/var/log/pods", pod_dir)):
if not re.match("^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$", container_name):
print("Unexpected directory:", container_name)
continue
for filename in os.listdir(os.path.join("/var/log/pods", pod_dir, container_name)):
m = re.match("[0-9]+\\.log$", filename)
if not m:
print("Unexpected filename:", filename)
continue
path = os.path.join("/var/log/pods", pod_dir, container_name, filename)
add_file(path, done=True)
# Inspect currently running containers
for filename in os.listdir("/var/log/containers"):
path = os.path.realpath(os.path.join(os.path.join("/var/log/containers", filename)))
add_file(path, done=False)
for log_file in log_files.values():
log_file.start()
async for event in inotify:
# Events for /var/log/pods
if event.mask & Mask.CREATE:
counter_inotify_events.labels("create").inc()
add_file(os.path.realpath(event.path), start=True)
# Events for /var/log/pods
elif event.mask & Mask.CLOSE_WRITE:
print("File closed: %s" % event.path)
counter_inotify_events.labels("close_write").inc()
log_file = log_files.get(str(event.path))
log_file.close()
elif event.mask & Mask.MODIFY:
counter_inotify_events.labels("modify").inc()
log_file = log_files.get(str(event.path))
if log_file:
# TODO: Count cases where log_file is None
log_file.poke()
elif event.mask & Mask.IGNORED:
counter_inotify_events.labels("ignored").inc()
else:
raise NotImplementedError("Unhandled inotify event: %s" % event)
@app.route("/metrics")
async def handler(request):
gauge_buffer_size.set(sum([len(j.buf) for j in log_files.values()]))
gauge_backlog_size.set(sum([j.tail - j.head for j in log_files.values()]))
c = collections.Counter([j.state for j in log_files.values()])
for key in ("seeking", "replaying", "watching", "closing"):
gauge_log_files.labels(key).set(c[key])
return text(generate_latest().decode("utf-8"))
async def dumper(queue):
while True:
try:
o = await asyncio.wait_for(queue.get(), timeout=0.1)
except asyncio.exceptions.TimeoutError:
break
else:
gauge_queue_entries.set(queue.qsize())
print(o)
@app.listener("before_server_start")
async def init(sanic, loop):
queue = asyncio.Queue(args.max_upload_queue_size)
if not args.dry_run:
db = AsyncIOMotorClient(os.environ["MONGODB_HOST"],
maxPoolSize=args.max_connection_pool_size).get_default_database()
try:
await db.create_collection("log",
capped=bool(args.max_collection_size),
size=args.max_collection_size)
except CollectionInvalid:
pass
coll = db["log"]
loop.create_task(uploader(coll, queue))
else:
coll = None
loop.create_task(dumper(queue))
loop.create_task(watcher(loop, queue, coll))
app.run(host="0.0.0.0", single_process=True)