
Refactor to be PyPy compatible
continuous-integration/drone: Build is passing

Lauri Võsandi 2022-11-08 23:25:15 +02:00
parent dce38567a3
commit b65f624df1
2 changed files with 173 additions and 181 deletions

Dockerfile

@@ -1,4 +1,4 @@
FROM harbor.k-space.ee/k-space/microservice-base
RUN pip3 install asyncinotify ujson
RUN pip3 install asyncinotify ujson prometheus-async[aiohttp]
ADD log_shipper.py /log_shipper.py
ENTRYPOINT /log_shipper.py
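
The added prometheus-async[aiohttp] extra is what lets log_shipper.py expose its Prometheus metrics without Sanic: the script now awaits prometheus_async.aio.web.start_http_server() alongside its other coroutines (see the end of log_shipper.py below). A minimal stand-alone sketch of that pattern, with a hypothetical example_heartbeats_total metric that is not part of the shipper:

import asyncio
import prometheus_async
from prometheus_client import Counter

# Hypothetical metric, for illustration only
heartbeat = Counter("example_heartbeats_total", "Heartbeats emitted by the demo loop")

async def work():
    while True:
        heartbeat.inc()
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(
        # Serves the default registry at http://0.0.0.0:8000/metrics
        prometheus_async.aio.web.start_http_server(addr="0.0.0.0", port=8000),
        work())

asyncio.run(main())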

log_shipper.py

@@ -1,4 +1,4 @@
#!/usr/local/bin/python3 -OO
#!/usr/local/bin/python3
import argparse
import asyncio
import collections
@@ -6,22 +6,17 @@ import os
import re
import socket
import ujson
import prometheus_async
import pymongo
from aiofile import async_open
from asyncinotify import Inotify, Mask
from datetime import datetime
from math import inf
from motor.motor_asyncio import AsyncIOMotorClient
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.exposition import generate_latest
from pymongo.errors import CollectionInvalid
from sanic import Sanic, text
from time import time
"""
To install dependencies:
pip3 install ujson pymongo motor asyncinotify prometheus_client sanic
"""
parser = argparse.ArgumentParser(description="Log shipper",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--dry-run", action="store_true",
@@ -50,7 +45,6 @@ parser.add_argument("--heuristic-normalize-log-level", action="store_true",
args = parser.parse_args()
ROOT = "/var/log/containers"
app = Sanic("tail")
tasks = dict()
with open("/etc/machine-id") as fh:
@@ -65,12 +59,6 @@ host_info = {
log_files = dict()
gauge_buffer_size = Gauge(
"logmower_buffer_size_bytes",
"Log files buffered in memory")
gauge_backlog_size = Gauge(
"logmower_backlog_size_bytes",
"Content that is yet to be submitted")
gauge_log_files = Gauge(
"logmower_log_file_count",
"Number of tracked log files",
@@ -203,10 +191,15 @@ async def uploader(coll, queue):
break
else:
gauge_queue_entries.set(queue.qsize())
o["event"]["ingested"] = datetime.utcnow()
messages.append(o)
if not messages:
continue
# Set ingestion timestamp
now = datetime.utcnow()
for o in messages:
o["event"]["ingested"] = now
try:
then = time()
await coll.insert_many(messages)
@@ -223,7 +216,12 @@ async def uploader(coll, queue):
counter_bulk_insertion_errors.labels(j).inc()
print("Bulk insert failed: %s" % j)
for o in messages:
# Remove ObjectID set during insert_many,
# as we want duplicate errors to be caused only by
# combination of log.file and log.offset collisions
o.pop("_id", None)
# Reset ingestion timestamp
o["event"]["ingested"] = datetime.utcnow()
try:
then = time()
@@ -243,70 +241,48 @@ async def uploader(coll, queue):
messages = []
class FileTailer(object):
def __init__(self, path, offset=0, finished=False):
self.head = offset
self.tail = offset + 1
self.offset = offset
class LogFile(object):
def __init__(self, coll, queue, path, namespace_name, pod_name, container_name, start=False):
self.offset = 0
self.path = path
self.buf = b""
self.finished = finished
self.caughtup = False
self.finished = False
self.more_content = asyncio.Event()
async def __aiter__(self):
with open(self.path, "rb") as fh:
while True:
if not self.finished and self.head >= self.tail:
self.caughtup = True
await self.more_content.wait()
self.more_content.clear()
self.tail = fh.seek(0, os.SEEK_END)
if self.head >= self.tail:
if self.finished:
# TODO: if there is still something in buf?
break
continue
fh.seek(self.head)
chunk = fh.read(min(self.tail - self.head, 4096))
self.buf += chunk
self.head += len(chunk)
while True:
step = self.buf.find(b"\n")
if step == -1:
break
buf = self.buf[:step + 1]
self.buf = self.buf[step + 1:]
await asyncio.sleep(0)
yield self.offset, len(buf), buf[:-1].decode("utf-8")
self.offset += step + 1
break
class LogFile(FileTailer):
def __init__(self, loop, coll, queue, path, namespace_name, pod_name, container_name, start=False):
FileTailer.__init__(self, path)
self.queue = queue
self.namespace_name = namespace_name
self.pod_name = pod_name
self.container_name = container_name
self.coll = coll
self.state = "seeking"
self.loop = loop
self._state = None
self.state = "init"
if start:
self.start()
def start(self):
self.loop.create_task(self.handler_loop())
@property
def state(self):
return self._state
def poke(self):
@state.setter
def state(self, value):
self._state = value
c = collections.Counter([j.state for j in log_files.values()])
for key in ("seeking", "replaying", "watching", "closing"):
gauge_log_files.labels(key).set(c[key])
def done(self):
# Do not expect more content in this file
self.finished = True
self.notify()
def notify(self):
# Signal that there is more content in this file
self.more_content.set()
def close(self):
self.done = True
self.poke()
def start(self):
asyncio.create_task(self.handler_loop())
async def handler_loop(self):
self.state = "seeking"
message = ""
record_size = 0
skip_next = False
@@ -318,115 +294,138 @@ class LogFile(FileTailer):
}, sort=[("log.offset", -1)])
histogram_database_operation_latency.labels("find-replay-offset").observe(time() - then)
if last_record:
self.head = self.offset = last_record["log"]["offset"]
counter_skipped_bytes.inc(self.head)
self.offset = last_record["log"]["offset"]
counter_skipped_bytes.inc(self.offset)
print("Skipping", self.offset, "bytes for", self.path)
skip_next = True
self.state = "replaying"
record_offset = self.offset
async for line_offset, line_size, line in self:
self.state = "watching" if self.caughtup else "replaying"
assert "\n" not in line
try:
reason = "unicode-encoding"
if len(line) < 45:
reason = "line-short"
raise ValueError()
if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
reason = "no-regex-match"
raise ValueError()
reason = "invalid-timestamp"
event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
break
line_offset = self.offset
histogram_line_size.observe(line_size)
record_size += line_size
async with async_open(self.path, "rb") as fp:
fp.seek(self.offset)
while True:
buf = await fp.readline()
self.offset += len(buf)
if not buf and self.finished:
break
if not buf and self.state != "watching":
print("Finished replaying:", self.path)
self.state = "watching"
self.buf += buf
if not buf or not buf.endswith(b"\n"):
await self.more_content.wait()
self.more_content.clear()
continue
if record_size < args.max_record_size:
# TODO: Support Docker runtime on EKS
message += line[45:]
line_size = len(self.buf)
line = self.buf[:-1].decode("utf-8")
state = line[43]
if state == "P":
# This is partial message
continue
assert state == "F", "Unknown line state"
o = {}
o["message"] = message
o["log"] = {}
message = ""
record_size = 0
record_offset = line_offset
line_offset = self.offset
self.buf = b""
if record_size > args.max_record_size:
counter_records.labels("too-large").inc()
# TODO: Log portion of the message
continue
stream = line[36:42].strip()
if args.heuristic_parse_json and o["message"].startswith("{\""):
# TODO: Follow Filebeat hints
try:
j = ujson.loads(message)
except ujson.JSONDecodeError:
counter_heuristic_failures.labels("invalid-json").inc()
else:
# Merge only if parsed JSON message looks like it's
# conforming to ECS schema
if "@timestamp" in j and "message" in j:
o.update(j)
else:
o["json"] = j
reason = "unicode-encoding"
if len(line) < 45:
reason = "line-short"
raise ValueError()
if not re.match("^(.+) (stdout|stderr)( (.))? (.*)$", line):
reason = "no-regex-match"
raise ValueError()
reason = "invalid-timestamp"
event_created = datetime.strptime(line[:23], "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
print("Failed to parse file %s at offset %d, reason %s: %s" % (self.path, line_offset, reason, repr(line)))
break
o["kubernetes"] = {
"container": {
"name": self.container_name,
},
"namespace": self.namespace_name,
"pod": {
"name": self.pod_name
}
}
o["log"]["file"] = {
"path": self.path
}
o["log"]["offset"] = record_offset
o["host"] = host_info
o["stream"] = stream
o["event"] = {
"created": event_created
}
histogram_line_size.observe(line_size)
record_size += line_size
if args.heuristic_normalize_log_level:
if "level" in o["log"]:
level = o["log"]["level"].strip().lower()
if record_size < args.max_record_size:
# TODO: Support Docker runtime on EKS
message += line[45:]
state = line[43]
if state == "P":
# This is partial message
continue
assert state == "F", "Unknown line state"
o = {}
o["message"] = message
o["log"] = {}
message = ""
record_size = 0
if record_size > args.max_record_size:
counter_records.labels("too-large").inc()
# TODO: Log portion of the message
continue
stream = line[36:42].strip()
if args.heuristic_parse_json and o["message"].startswith("{\""):
# TODO: Follow Filebeat hints
try:
o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
except KeyError:
counter_heuristic_failures.labels("invalid-log-level").inc()
else:
o["log"]["level"] = "error" if stream == "stderr" else "info"
j = ujson.loads(message)
except ujson.JSONDecodeError:
counter_heuristic_failures.labels("invalid-json").inc()
else:
# Merge only if parsed JSON message looks like it's
# conforming to ECS schema
if "@timestamp" in j and "message" in j:
o.update(j)
else:
o["json"] = j
if "@timestamp" not in o:
o["@timestamp"] = o["event"]["created"]
o.pop("_id", None)
o["kubernetes"] = {
"container": {
"name": self.container_name,
},
"namespace": self.namespace_name,
"pod": {
"name": self.pod_name
}
}
o["log"]["file"] = {
"path": self.path
}
o["log"]["offset"] = record_offset
o["host"] = host_info
o["stream"] = stream
o["event"] = {
"created": event_created
}
if not skip_next:
await self.queue.put(o)
gauge_queue_entries.set(self.queue.qsize())
skip_next = False
record_offset = line_offset
if args.heuristic_normalize_log_level:
if "level" in o["log"]:
level = o["log"]["level"].strip().lower()
try:
o["log"]["level"] = NORMALIZED_LOG_LEVELS[level]
except KeyError:
counter_heuristic_failures.labels("invalid-log-level").inc()
else:
o["log"]["level"] = "error" if stream == "stderr" else "info"
if "@timestamp" not in o:
o["@timestamp"] = o["event"]["created"]
o.pop("_id", None)
if not skip_next:
await self.queue.put(o)
gauge_queue_entries.set(self.queue.qsize())
skip_next = False
record_offset = line_offset
self.state = "closing"
log_files.pop(self.path)
async def watcher(loop, queue, coll):
async def watcher(queue, coll):
print("Starting watching")
with Inotify() as inotify:
def add_file(path, done=False, start=False):
def add_file(path, finished=False, start=False):
if path in log_files:
log_files[path].done = done
log_files[path].finished = finished
return log_files[path]
print("Adding file: %s" % path)
@@ -444,9 +443,8 @@ async def watcher(loop, queue, coll):
return
if args.namespace and namespace_name != args.namespace:
return
lf = log_files[path] = LogFile(loop, coll, queue, path, namespace_name, pod_name, container_name)
lf.done = done
lf.start()
lf = log_files[path] = LogFile(coll, queue, path, namespace_name, pod_name, container_name, start=start)
lf.finished = finished
inotify.add_watch(path, Mask.MODIFY | Mask.CLOSE_WRITE)
return lf
@@ -469,18 +467,19 @@ async def watcher(loop, queue, coll):
print("Unexpected filename:", filename)
continue
path = os.path.join("/var/log/pods", pod_dir, container_name, filename)
add_file(path, done=True)
add_file(path, finished=True)
# Inspect currently running containers
# Add currently running containers as not finished
for filename in os.listdir("/var/log/containers"):
path = os.path.realpath(os.path.join(os.path.join("/var/log/containers", filename)))
add_file(path, done=False)
add_file(path, finished=False)
# Start coroutines after we know for sure which ones have finished
for log_file in log_files.values():
log_file.start()
async for event in inotify:
# Events for /var/log/pods
# Events for /var/log/containers
if event.mask & Mask.CREATE:
counter_inotify_events.labels("create").inc()
add_file(os.path.realpath(event.path), start=True)
@@ -490,29 +489,21 @@ async def watcher(loop, queue, coll):
print("File closed: %s" % event.path)
counter_inotify_events.labels("close_write").inc()
log_file = log_files.get(str(event.path))
log_file.close()
if log_file:
# TODO: Why does this happen?
log_file.done()
elif event.mask & Mask.MODIFY:
counter_inotify_events.labels("modify").inc()
log_file = log_files.get(str(event.path))
if log_file:
# TODO: Count cases where log_file is None
log_file.poke()
# In some cases MODIFY events are triggered after CLOSE_WRITE
log_file.notify()
elif event.mask & Mask.IGNORED:
counter_inotify_events.labels("ignored").inc()
else:
raise NotImplementedError("Unhandled inotify event: %s" % event)
@app.route("/metrics")
async def handler(request):
gauge_buffer_size.set(sum([len(j.buf) for j in log_files.values()]))
gauge_backlog_size.set(sum([j.tail - j.head for j in log_files.values()]))
c = collections.Counter([j.state for j in log_files.values()])
for key in ("seeking", "replaying", "watching", "closing"):
gauge_log_files.labels(key).set(c[key])
return text(generate_latest().decode("utf-8"))
async def dumper(queue):
while True:
try:
@@ -524,11 +515,11 @@ async def dumper(queue):
print(o)
@app.listener("before_server_start")
async def init(sanic, loop):
async def main():
queue = asyncio.Queue(args.max_upload_queue_size)
tasks = []
if not args.dry_run:
db = AsyncIOMotorClient(os.environ["MONGODB_HOST"],
db = AsyncIOMotorClient(os.environ["MONGO_URI"],
maxPoolSize=args.max_connection_pool_size).get_default_database()
try:
await db.create_collection("log",
@@ -537,11 +528,12 @@ async def init(sanic, loop):
except CollectionInvalid:
pass
coll = db["log"]
loop.create_task(uploader(coll, queue))
tasks.append(uploader(coll, queue))
else:
coll = None
loop.create_task(dumper(queue))
loop.create_task(watcher(loop, queue, coll))
tasks.append(dumper(queue))
tasks.append(prometheus_async.aio.web.start_http_server(addr="0.0.0.0", port=8000))
tasks.append(watcher(queue, coll))
await asyncio.gather(*tasks)
app.run(host="0.0.0.0", single_process=True)
asyncio.run(main())
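
With Sanic gone, the entrypoint is plain asyncio: main() collects the uploader (or the dumper in --dry-run mode), the prometheus-async metrics server and the inotify watcher into a task list and awaits them with asyncio.gather() under asyncio.run(). File reading likewise moves from the old FileTailer's manual seek()/read() chunking to aiofile's awaitable readline(), with an asyncio.Event parking the coroutine until inotify signals new data. A condensed sketch of that tailing pattern; tail(), more_content and finished are illustrative names rather than the shipper's exact API:

import asyncio
from aiofile import async_open

async def tail(path, more_content: asyncio.Event, finished: asyncio.Event):
    # Yield complete lines from `path`, waiting on `more_content` whenever the
    # writer has not yet produced a full newline-terminated line. Offset
    # bookkeeping, record parsing and MongoDB submission are left out.
    buf = b""
    async with async_open(path, "rb") as fp:
        while True:
            chunk = await fp.readline()
            if not chunk and finished.is_set():
                break  # writer closed the file and everything has been read
            buf += chunk
            if not chunk or not chunk.endswith(b"\n"):
                await more_content.wait()  # set by the inotify MODIFY handler
                more_content.clear()
                continue
            yield buf[:-1].decode("utf-8")
            buf = b""

The watcher would call more_content.set() on MODIFY events and finished.set() on CLOSE_WRITE, mirroring LogFile.notify() and LogFile.done() in the diff above.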