logmower-shipper-prototype/log_shipper.py
#!/usr/local/bin/python3
import argparse
import asyncio
import collections
import heuristics
import os
import re
import socket
import prometheus_async
import pymongo
from aiofile import async_open
from asyncinotify import Inotify, Mask
from datetime import datetime
from math import inf
from motor.motor_asyncio import AsyncIOMotorClient
from prometheus_client import Counter, Gauge, Histogram
from pymongo.errors import CollectionInvalid
from time import time
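
# `heuristics` is this repository's own helper module (heuristics.py next to this
# file); it is assumed to implement the optional record post-processing toggled by
# the --parse-json/--merge-top-level/--normalize-log-level/--stream-to-log-level
# flags below and applied to each record via heuristics.process().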

parser = argparse.ArgumentParser(description="Log shipper",
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--dry-run", action="store_true",
                    help="Do not insert anything into database")

# Target selectors
parser.add_argument("--namespace", type=str,
                    help="Namespace to watch, all by default")
parser.add_argument("--exclude-pod-prefixes", nargs="*", type=str, default=["logmower-"],
                    help="Pod prefixes to exclude in any of the watched namespaces")

# Tunables
parser.add_argument("--max-upload-queue-size", type=int, default=10000,
                    help="Max upload queue size in records")
parser.add_argument("--max-connection-pool-size", type=int, default=1,
                    help="Max MongoDB connection pool size")
parser.add_argument("--max-record-size", type=int, default=128 * 1024,
                    help="Max record size in bytes, 128k by default")
parser.add_argument("--bulk-insertion-size", type=int, default=1000,
                    help="MongoDB bulk insertion size in records")

# Retention
parser.add_argument("--max-record-retention", type=int,
                    help="Record retention in seconds, never by default")
parser.add_argument("--max-collection-size", type=int,
                    help="MongoDB collection size limit in bytes, by default disabled")

# Optional heuristics
parser.add_argument("--parse-json", action="store_true",
                    help="Parse log records that look like JSON")
parser.add_argument("--merge-top-level", action="store_true",
                    help="Merge decoded JSON records on top level if '@timestamp' and 'message' "
                         "fields are present (looks like ECS schema)")
parser.add_argument("--normalize-log-level", action="store_true",
                    help="Normalize log.level values to Syslog defined keywords")
parser.add_argument("--stream-to-log-level", action="store_true",
                    help="Upon missing log.level map stderr to 'error' and stdout to 'info'")

args = parser.parse_args()
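
# Example invocation (illustrative only; the URI and flag values are assumptions):
#   MONGO_URI=mongodb://mongodb/logs ./log_shipper.py --namespace default \
#       --parse-json --normalize-log-level --max-record-retention 604800
# MONGO_URI must name a default database and is only required without --dry-run
# (see main() at the bottom of this file).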

ROOT = "/var/log/containers"
tasks = dict()

with open("/etc/machine-id") as fh:
    machine_id = fh.read().strip()
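
# NODE_NAME is normally injected via the Kubernetes downward API (spec.nodeName);
# if it is absent the machine's FQDN is used instead.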
host_info = {
    "id": machine_id,
    "architecture": os.uname().machine,
    "name": os.environ.get("NODE_NAME", socket.getfqdn())
}

log_files = dict()

gauge_log_files = Gauge(
    "logmower_log_file_count",
    "Number of tracked log files",
    ["state"])
gauge_queue_entries = Gauge(
    "logmower_queue_record_count",
    "Records queued for submission")
counter_unexpected_filenames = Counter(
    "logmower_invalid_filename_count",
    "Count of unexpected filenames in logs directory")
counter_inotify_events = Counter(
    "logmower_inotify_event_count",
    "Count of inotify events",
    ["mask"])
counter_skipped_bytes = Counter(
    "logmower_skipped_bytes",
    "Bytes that were skipped during startup due to being already present in the data store")
counter_dropped_lines = Counter(
    "logmower_dropped_lines",
    "Lines dropped due to being part of a record that was too long")
counter_heuristic_failures = Counter(
    "logmower_heuristic_failed_record_count",
    "Heuristic failures",
    ["mode"])
counter_records = Counter(
    "logmower_record_count",
    "Record count",
    ["stage"])
counter_insertion_errors = Counter(
    "logmower_insertion_error_count",
    "Exceptions caught during insertion of a single event",
    ["exception"])
counter_bulk_insertion_errors = Counter(
    "logmower_bulk_insertion_error_count",
    "Exceptions caught during bulk insertions",
    ["exception"])
counter_bulk_insertions = Counter(
    "logmower_bulk_insertion_count",
    "Count of bulk insertions to database",
    ["status"])
histogram_bulk_submission_size = Histogram(
    "logmower_bulk_submission_message_count",
    "Bulk submission message count",
    buckets=(1, 5, 10, 50, 100, 500, 1000, 5000, 10000))
histogram_database_operation_latency = Histogram(
    "logmower_database_operation_latency",
    "Database operation latency",
    ["operation"],
    buckets=(0.1, 0.2, 0.5, 1, 5, 10, 50))
histogram_bulk_submission_latency = Histogram(
    "logmower_bulk_submission_latency",
    "Bulk submission latency",
    buckets=(0.1, 0.2, 0.5, 1, 5, 10, 50))
histogram_line_size = Histogram(
    "logmower_line_size_bytes",
    "Log file line size in bytes",
    buckets=(80, 160, 320, 640, 1280, inf))
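
# Helper for building nested records: lets the parser assign fields such as
# o["log"]["offset"] or o["log"]["file"] without creating the intermediate
# dictionaries first.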
def recursively_default_dict():
    return collections.defaultdict(recursively_default_dict)
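

# uploader() drains the shared queue in batches of up to --bulk-insertion-size
# records and writes them with insert_many(); if a bulk write fails it retries
# the batch record by record so a single bad record cannot sink the whole batch.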
async def uploader(coll, queue):
    then = time()
    kwargs = {}
    if args.max_record_retention:
        kwargs["expireAfterSeconds"] = args.max_record_retention
    await coll.create_index([("@timestamp", 1)], **kwargs)

    # The following index is used to look up where to resume submitting logs
    # after a restart/crash
    await coll.create_index([("log.file.path", 1),
                             ("log.offset", 1)],
                            unique=True)

    # Indexes used for frequent searches
    await coll.create_index([("host.name", 1)])
    await coll.create_index([("kubernetes.pod.name", 1)],
                            sparse=True)
    await coll.create_index([("kubernetes.namespace", 1),
                             ("kubernetes.pod.name", 1),
                             ("kubernetes.container.name", 1)],
                            sparse=True)
    histogram_database_operation_latency.labels("create-index").observe(time() - then)

    messages = []
    while True:
        while len(messages) < args.bulk_insertion_size:
            try:
                o = await asyncio.wait_for(queue.get(), timeout=0.1)
            except asyncio.exceptions.TimeoutError:
                break
            else:
                gauge_queue_entries.set(queue.qsize())
                messages.append(o)
        if not messages:
            continue

        # Set ingestion timestamp
        now = datetime.utcnow()
        for o in messages:
            o["event"]["ingested"] = now

        try:
            then = time()
            await coll.insert_many(messages)
            histogram_database_operation_latency.labels("insert-many").observe(time() - then)
        except pymongo.errors.ServerSelectionTimeoutError:
            counter_bulk_insertions.labels("timed-out").inc()
            continue
        except pymongo.errors.NotPrimaryError:
            counter_bulk_insertions.labels("not-primary").inc()
            continue
        except (pymongo.errors.BulkWriteError, OverflowError) as e:
            counter_bulk_insertions.labels("retried-as-singles").inc()
            j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
            counter_bulk_insertion_errors.labels(j).inc()
            print("Bulk insert failed: %s" % j)
            for o in messages:
                # Remove the ObjectID set during insert_many, as we want
                # duplicate errors to be caused only by the combination of
                # log.file and log.offset collisions
                o.pop("_id", None)

                # Reset ingestion timestamp
                o["event"]["ingested"] = datetime.utcnow()
                try:
                    then = time()
                    await coll.insert_one(o)
                    histogram_database_operation_latency.labels("insert-one").observe(time() - then)
                except Exception as e:
                    j = "%s.%s" % (e.__class__.__module__, e.__class__.__name__)
                    counter_insertion_errors.labels(j).inc()
                    counter_records.labels("dropped").inc()
                    print("Failed to insert (%s): %s" % (j, o))
                else:
                    counter_records.labels("committed").inc()
        else:
            counter_bulk_insertions.labels("successful").inc()
            histogram_bulk_submission_size.observe(len(messages))
            counter_records.labels("committed").inc(len(messages))
        messages = []
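

# LogFile tracks a single container log file. Its state property doubles as a
# small state machine (init -> seeking -> replaying -> watching -> closing) and
# republishes the per-state counts to the logmower_log_file_count gauge.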
class LogFile(object):
    def __init__(self, coll, queue, path, namespace_name, pod_name, container_name, pod_id, start=False, lookup_offset=True):
        self.offset = 0
        self.path = path
        self.buf = b""
        self.finished = False
        self.more_content = asyncio.Event()
        self.queue = queue
        self.namespace_name = namespace_name
        self.pod_name = pod_name
        self.container_name = container_name
        self.pod_id = pod_id
        self.coll = coll
        self._state = None
        self.state = "init"
        self.lookup_offset = lookup_offset
        if start:
            self.start()

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state = value
        c = collections.Counter([j.state for j in log_files.values()])
        for key in ("seeking", "replaying", "watching", "closing"):
            gauge_log_files.labels(key).set(c[key])

    def done(self):
        # Do not expect more content in this file
        self.finished = True
        self.notify()

    def notify(self):
        # Signal that there is more content in this file
        self.more_content.set()

    def start(self):
        asyncio.create_task(self.handler_loop())
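
    # handler_loop() resumes from the highest log.offset recorded in MongoDB for
    # this file (unless running with --dry-run), replays the existing content and
    # then keeps tailing the file, waiting for notify()/done() calls driven by
    # inotify events in watcher() below.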
    async def handler_loop(self):
        self.state = "seeking"
        message = ""
        record_size = 0
        skip_next = False
        if not args.dry_run:
            then = time()
            last_record = await self.coll.find_one({
                "host.id": host_info["id"],
                "log.file.path": self.path
            }, sort=[("log.offset", -1)])
            histogram_database_operation_latency.labels("offset-lookup").observe(time() - then)
            if last_record:
                self.offset = last_record["log"]["offset"]
                counter_skipped_bytes.inc(self.offset)
                print("Skipping", self.offset, "bytes for", self.path)
                skip_next = True
        self.state = "replaying"
        record_offset = self.offset
        line_offset = self.offset

        async with async_open(self.path, "rb") as fp:
            fp.seek(self.offset)
            while True:
                buf = await fp.readline()
                self.offset += len(buf)
                if not buf and self.finished:
                    break
                if not buf and self.state != "watching":
                    print("Finished replaying:", self.path)
                    self.state = "watching"
                self.buf += buf
                if not buf or not buf.endswith(b"\n"):
                    await self.more_content.wait()
                    self.more_content.clear()
                    continue

                line_size = len(self.buf)
                line = self.buf[:-1].decode("utf-8")

                record_offset = line_offset
                line_offset = self.offset
                self.buf = b""
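
                # Parsing below assumes CRI-style container log lines: a timestamp
                # with nanosecond precision and a numeric UTC offset (35 characters,
                # e.g. "2022-11-08T21:25:15.123456789+00:00"), the stream name, a
                # partial/full flag and the payload, for example:
                #   2022-11-08T21:25:15.123456789+00:00 stdout F hello world
                # Hence the fixed slices used here: line[:23] (timestamp truncated
                # to millisecond precision), line[36:42] (stream), line[43] (P/F
                # flag) and line[45:] (message).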
                try:
                    reason = "unicode-encoding"
                    if len(line) < 45:
                        reason = "line-short"
                        raise ValueError()
                    if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
                        reason = "no-regex-match"
                        raise ValueError()
                    reason = "invalid-timestamp"
                    event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
                except ValueError:
                    print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
                    break

                histogram_line_size.observe(line_size)
                record_size += line_size
                if record_size < args.max_record_size:
                    # TODO: Support Docker runtime on EKS
                    message += line[45:]

                state = line[43]
                if state == "P":
                    # This is a partial message
                    continue
                assert state == "F", "Unknown line state"

                o = recursively_default_dict()
                o["message"] = message
                message = ""
                if record_size > args.max_record_size:
                    counter_records.labels("too-large").inc()
                    # TODO: Log portion of the message
                    record_size = 0
                    continue
                record_size = 0
                stream = line[36:42].strip()

                heuristics.process(o)

                o["kubernetes"] = {
                    "container": {
                        "name": self.container_name,
                    },
                    "namespace": self.namespace_name,
                    "pod": {
                        "id": self.pod_id,
                        "name": self.pod_name
                    }
                }
                o["log"]["file"] = {
                    "path": self.path
                }
                o["log"]["offset"] = record_offset
                o["host"] = host_info
                o["stream"] = stream
                o["event"] = {
                    "created": event_created
                }
                if "@timestamp" not in o:
                    o["@timestamp"] = o["event"]["created"]
                o.pop("_id", None)

                if not skip_next:
                    await self.queue.put(o)
                    gauge_queue_entries.set(self.queue.qsize())
                skip_next = False
                record_offset = line_offset

        self.state = "closing"
        log_files.pop(self.path)
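

# watcher() keeps an inotify watch on /var/log/containers to spot new container
# log symlinks, resolves them to the real files under /var/log/pods and follows
# those for MODIFY/CLOSE_WRITE events, feeding the LogFile handlers above.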
async def watcher(queue, coll):
    print("Starting watching")
    with Inotify() as inotify:
        def add_file(path, finished=False, start=False, lookup_offset=True):
            if path in log_files:
                log_files[path].finished = finished
                return log_files[path]
            m = re.match("/var/log/pods/(.*)_(.*)_(.*)/(.*)/[0-9]+\\.log$", path)
            if not m:
                print("Unexpected filename:", path)
                counter_unexpected_filenames.inc()
                return
            namespace_name, pod_name, pod_id, container_name = m.groups()
            if args.namespace and args.namespace != namespace_name:
                print("Skipping due to namespace mismatch:", path)
                return
            for prefix in args.exclude_pod_prefixes:
                if pod_name.startswith(prefix):
                    print("Skipping due to pod prefix mismatch:", path)
                    return
            print("Adding file: %s" % path)
            lf = log_files[path] = LogFile(coll, queue, path, namespace_name,
                                           pod_name, container_name, pod_id, start, lookup_offset)
            lf.finished = finished
            inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE)
            return lf

        inotify.add_watch(ROOT, Mask.CREATE | Mask.ONLYDIR)

        # Register all existing log files
        for pod_dir in os.listdir("/var/log/pods"):
            m = re.match("(.*)_(.*)_(.*)$", pod_dir)
            if not m:
                print("Unexpected directory", pod_dir)
                continue
            namespace_name, pod_name, pod_id = m.groups()
            for container_name in os.listdir(os.path.join("/var/log/pods", pod_dir)):
                if not re.match("^(?![0-9]+$)(?!-)[a-zA-Z0-9-]{,63}(?<!-)$", container_name):
                    print("Unexpected directory:", container_name)
                    continue
                for filename in os.listdir(os.path.join("/var/log/pods", pod_dir, container_name)):
                    m = re.match("[0-9]+\\.log$", filename)
                    if not m:
                        print("Unexpected filename:", filename)
                        continue
                    path = os.path.join("/var/log/pods", pod_dir, container_name, filename)
                    add_file(path, finished=True)

        # Add currently running containers as not finished
        for filename in os.listdir("/var/log/containers"):
            path = os.path.realpath(os.path.join("/var/log/containers", filename))
            add_file(path, finished=False)

        # Start coroutines after we know for sure which ones have finished
        for log_file in log_files.values():
            log_file.start()

        async for event in inotify:
            # Events for /var/log/containers
            if event.mask & Mask.CREATE:
                counter_inotify_events.labels("create").inc()
                add_file(os.path.realpath(event.path), start=True, lookup_offset=False)

            # Events for /var/log/pods
            elif event.mask & Mask.CLOSE_WRITE:
                print("File closed: %s" % event.path)
                counter_inotify_events.labels("close_write").inc()
                log_file = log_files.get(str(event.path))
                if log_file:
                    # TODO: Why does this happen?
                    log_file.done()
            elif event.mask & Mask.MODIFY:
                counter_inotify_events.labels("modify").inc()
                log_file = log_files.get(str(event.path))
                if log_file:
                    # In some cases MODIFY events are triggered after CLOSE_WRITE
                    log_file.notify()
            elif event.mask & Mask.IGNORED:
                counter_inotify_events.labels("ignored").inc()
            else:
                raise NotImplementedError("Unhandled inotify event: %s" % event)


async def dumper(queue):
    while True:
        try:
            o = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.exceptions.TimeoutError:
            break
        else:
            gauge_queue_entries.set(queue.qsize())
            print(o)
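

# main() wires everything together: a bounded queue between the file watcher and
# the uploader (or the plain dumper when --dry-run is given), plus a Prometheus
# metrics endpoint served on port 8000.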
async def main():
    queue = asyncio.Queue(args.max_upload_queue_size)
    tasks = []
    if not args.dry_run:
        db = AsyncIOMotorClient(os.environ["MONGO_URI"],
                                maxPoolSize=args.max_connection_pool_size).get_default_database()
        try:
            await db.create_collection("log",
                                       capped=bool(args.max_collection_size),
                                       size=args.max_collection_size)
        except CollectionInvalid:
            pass
        coll = db["log"]
        tasks.append(uploader(coll, queue))
    else:
        coll = None
        tasks.append(dumper(queue))
    tasks.append(prometheus_async.aio.web.start_http_server(addr="0.0.0.0", port=8000))
    tasks.append(watcher(queue, coll))
    await asyncio.gather(*tasks)


asyncio.run(main())