import argparse import asyncio import re import os import sys from motor.motor_asyncio import AsyncIOMotorClient from termcolor import colored MONGO_URI = os.getenv("MONGO_URI") if not MONGO_URI: print("MONGO_URI not specified") sys.exit(1) db = AsyncIOMotorClient(MONGO_URI).get_default_database() parser = argparse.ArgumentParser() parser.add_argument("--include-container", action="append", default=[]) parser.add_argument("--exclude-container", action="append", default=[]) parser.add_argument("--exclude-host", action="append", default=[]) args = parser.parse_args() async def main(): async with db.log.watch([{"$match": {"operationType": "insert"}}]) as stream: print("Connected") async for event in stream: doc = event["fullDocument"] host = doc.get("host", "-") if host in args.exclude_host: continue container_name = doc.get("container_name", "/-")[1:] if args.include_container and container_name not in args.include_container: continue if container_name in args.exclude_container: continue msg = doc.get("message") if msg: container_name = "system:%s" % doc.get("ident") else: msg = doc.get("log") # TODO: Do this mangling in fluentd.conf m = re.match("time=\".*?\" level=[a-z]+ msg=\"(.*?)\"", msg) if m: msg, = m.groups() m = re.match(r"\[\d+.*? \d+\] (.*?)\"", msg) if m: msg, = m.groups() print(doc["time"], colored(doc.get("host", "-"), "blue"), "\t", "% 30s" % colored(container_name, "yellow"), msg) loop = asyncio.get_event_loop() loop.run_until_complete(main())