#!/usr/bin/env python
"""Receive msgpack-encoded log events over TCP and store them in MongoDB.

The server listens on 127.0.0.1:24224, decodes a stream of msgpack
messages of the form ``[tag, time, record, options]`` and inserts each
``record`` into the ``log`` collection of the database named in
``MONGO_URI``.  Records expire through a TTL index on their ``time`` field.

NOTE(review): the message layout and port look like the Fluentd forward
protocol (Message mode) -- confirm against the actual senders.
"""
import asyncio
import os
import socket
import struct
from datetime import datetime, timezone

import msgpack
from motor.motor_asyncio import AsyncIOMotorClient

MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
FQDN = socket.getfqdn()
EXPIRE_AFTER_SECONDS = int(os.getenv("EXPIRE_AFTER_SECONDS", "1209600"))

db = AsyncIOMotorClient(MONGO_URI).get_default_database()
# TTL index: documents are removed EXPIRE_AFTER_SECONDS after their "time".
# NOTE(review): the value returned by create_index is not awaited here --
# confirm that Motor schedules the operation without an explicit await.
db.log.create_index("time", expireAfterSeconds=EXPIRE_AFTER_SECONDS)


def _to_datetime(timestamp) -> datetime:
    """Convert a decoded event time into a UTC-aware ``datetime``.

    ``timestamp`` is either an object exposing a ``data`` attribute holding
    two big-endian unsigned 32-bit integers (seconds, nanoseconds), or a
    plain number of seconds since the epoch.

    The result is timezone-aware UTC: PyMongo treats naive datetimes as
    UTC, so a naive *local* time would be stored shifted by the host's UTC
    offset (and would expire from the TTL index at the wrong moment).
    """
    payload = getattr(timestamp, "data", None)
    if payload is None:
        return datetime.fromtimestamp(timestamp, tz=timezone.utc)
    seconds, nanoseconds = struct.unpack(">II", payload)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.replace(microsecond=nanoseconds // 1000)


class Server:
    """Connection handler passed to ``asyncio.start_server``.

    An instance is called once per client connection with the stream
    reader/writer pair for that connection.
    """

    def __init__(self):
        self._loop = asyncio.get_event_loop()
        # The event loop only keeps weak references to tasks; holding them
        # here prevents a pending insert from being garbage-collected.
        self._tasks = set()

    async def __call__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Read msgpack messages from one client until it disconnects.

        Every decoded message is handed to ``_handle_request`` in its own
        task so that slow inserts do not stall reading.  The writer is
        closed when the client disconnects or the connection fails.
        """
        unpacker = msgpack.Unpacker(raw=False)
        try:
            while True:
                data = await reader.read(n=4096)
                if not data:
                    # Empty read means EOF: the client closed the connection.
                    break
                unpacker.feed(data)
                for obj in unpacker:
                    task = self._loop.create_task(self._handle_request(obj, writer))
                    self._tasks.add(task)
                    task.add_done_callback(self._tasks.discard)
        except ConnectionError:
            # A dropped connection is treated the same as a normal disconnect.
            pass
        finally:
            writer.close()

    async def _handle_request(self, obj, writer: asyncio.StreamWriter) -> None:
        """Store one decoded ``[tag, time, record, options]`` message.

        The record gets a ``time`` field derived from the message time and
        a ``host`` field set to this machine's FQDN; its ``container_id``
        key, if any, is dropped before the insert.

        Raises:
            ValueError: if the message carries non-empty options, which
                this server does not implement.
        """
        _tag, timestamp, record, options = obj
        record["time"] = _to_datetime(timestamp)
        if options:
            # Explicit raise instead of ``assert`` so the check survives -O.
            raise ValueError("Can't handle options")
        record.pop("container_id", None)
        record["host"] = FQDN
        await db.log.insert_one(record)


async def main():
    """Start the TCP server and keep it running until interrupted."""
    # Start outside the ``try`` so a failed bind surfaces as itself rather
    # than as an UnboundLocalError raised from the ``finally`` clause.
    server = await asyncio.start_server(Server(), host="127.0.0.1", port=24224)
    try:
        while True:
            # NOTE(review): presumably a short sleep is used so the loop wakes
            # up regularly and Ctrl-C is noticed promptly -- confirm.
            await asyncio.sleep(0.1)
    finally:
        server.close()


# get_event_loop() is used on purpose instead of asyncio.run():
# NOTE(review): the Motor client above is created at import time and may be
# bound to this default loop -- confirm before switching to asyncio.run().
try:
    asyncio.get_event_loop().run_until_complete(main())
except KeyboardInterrupt:
    pass