commit 51880829f5d68b25457e01221a8c36af8869efee Author: Lauri Võsandi Date: Sat Jun 19 18:43:37 2021 +0000 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c49bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8dc0473 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,4 @@ +FROM python:3 +RUN pip install motor msgpack +COPY logger.py / +ENTRYPOINT /logger.py diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..28eb343 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,12 @@ +version: "3.7" + +services: + app: + restart: always + image: kspaceee/mongo-logger + build: + context: . + network_mode: host + env_file: .env + logging: + driver: json-file diff --git a/logger.py b/logger.py new file mode 100755 index 0000000..1811850 --- /dev/null +++ b/logger.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +import asyncio +import msgpack +import os +import struct +import socket +from motor.motor_asyncio import AsyncIOMotorClient +from datetime import datetime + +MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default") +FQDN = socket.getfqdn() +EXPIRE_AFTER_SECONDS = int(os.getenv("EXPIRE_AFTER_SECONDS", "1209600")) + +db = AsyncIOMotorClient(MONGO_URI).get_default_database() +db.log.create_index("time", expireAfterSeconds=EXPIRE_AFTER_SECONDS) + +class Server(object): + def __init__(self): + self._loop = asyncio.get_event_loop() + + async def __call__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + unpacker = msgpack.Unpacker(raw=False) + try: + while True: + data = await reader.read(n=4096) + if not data: + raise ConnectionError("Client connection closed") + unpacker.feed(data) + for obj in unpacker: + self._loop.create_task(self._handle_request(obj, writer)) + except ConnectionError: + pass + + async def _handle_request(self, obj, writer: asyncio.StreamWriter) -> None: + tag, time, record, options = obj + seconds, nanoseconds = struct.unpack(">II", time.data) + assert not options, "Can't handle options" + record["time"] = datetime.fromtimestamp(seconds).replace(microsecond=int(nanoseconds/1000)) + record.pop("container_id", None) + record["host"] = FQDN + result = await db.log.insert_one(record) + + +async def main(): + try: + server = await asyncio.start_server(Server(), host="127.0.0.1", port=24224) + while True: + await asyncio.sleep(0.1) + finally: + server.close() + + +try: + asyncio.get_event_loop().run_until_complete(main()) +except KeyboardInterrupt: + pass