10
0
This repository has been archived on 2023-08-14. You can view files and clone it, but cannot push or open issues or pull requests.
mongo-logger/logger.py

62 lines
1.9 KiB
Python
Raw Normal View History

2021-06-19 18:43:37 +00:00
#!/usr/bin/env python
import asyncio
import os
import socket
import struct
from datetime import datetime, timezone

import msgpack
from motor.motor_asyncio import AsyncIOMotorClient
# MongoDB connection string; overridable through the MONGO_URI environment variable.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://127.0.0.1:27017/default")

# Fully qualified name of this machine, stamped onto every stored record as "host".
FQDN = socket.getfqdn()

# Lifetime of the TTL index on "time", in seconds (the default 1209600 is 14 days).
EXPIRE_AFTER_SECONDS = int(os.environ.get("EXPIRE_AFTER_SECONDS", "1209600"))

# Database handle shared by the whole module.
_client = AsyncIOMotorClient(MONGO_URI)
db = _client.get_default_database()

# NOTE(review): the object returned by create_index() is neither awaited nor
# inspected here, so a failure to build the index would go unnoticed - confirm
# that this is acceptable.
db.log.create_index("time", expireAfterSeconds=EXPIRE_AFTER_SECONDS)
2021-07-05 12:44:44 +00:00
2021-06-19 18:43:37 +00:00
class Server(object):
    """Connection handler that decodes msgpack messages and stores them in ``db.log``.

    An instance is passed to ``asyncio.start_server`` as the
    client-connected callback, so ``__call__`` runs once per accepted
    connection with that connection's stream reader and writer.

    NOTE(review): the ``(tag, time, record, options)`` message shape and the
    8-byte timestamp extension look like the Fluentd forward protocol -
    confirm against the sending side.
    """

    def __init__(self):
        self._loop = asyncio.get_event_loop()
        # Strong references to in-flight insert tasks. The event loop itself
        # only keeps weak references, so an otherwise unreferenced task may be
        # garbage-collected before it has finished.
        self._tasks = set()

    async def __call__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Read msgpack objects from one client until it disconnects.

        Every decoded object is handed to ``_handle_request`` in its own task,
        so a slow insert does not stall reading from the socket.

        Args:
            reader: Stream the client's bytes are read from.
            writer: Stream of the same connection; closed when reading ends.
        """
        unpacker = msgpack.Unpacker(raw=False)
        try:
            while True:
                data = await reader.read(n=4096)
                if not data:
                    # An empty read means the peer closed the connection.
                    break
                unpacker.feed(data)
                for obj in unpacker:
                    task = self._loop.create_task(self._handle_request(obj, writer))
                    self._tasks.add(task)
                    task.add_done_callback(self._tasks.discard)
        except ConnectionError:
            # A dropped connection is an ordinary way for a client to leave.
            pass
        finally:
            # Release the socket; nothing ever writes back on this connection,
            # so tasks that are still running are unaffected.
            writer.close()

    @staticmethod
    def _to_datetime(time) -> datetime:
        """Convert a message timestamp into a timezone-aware UTC ``datetime``.

        Args:
            time: Either an object exposing ``.data`` with two big-endian
                unsigned 32-bit integers (seconds, nanoseconds), or a plain
                number of seconds since the epoch.

        Returns:
            The corresponding instant, tagged as UTC so that it is not
            mistaken for UTC while actually holding local wall-clock time.

        Raises:
            struct.error: If ``time.data`` is not exactly eight bytes long.
        """
        try:
            payload = time.data
        except AttributeError:
            # No ``.data`` attribute: treat the value as epoch seconds.
            return datetime.fromtimestamp(time, tz=timezone.utc)
        seconds, nanoseconds = struct.unpack(">II", payload)
        moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
        # Integer division keeps the arithmetic exact (no float round trip).
        return moment.replace(microsecond=nanoseconds // 1000)

    async def _handle_request(self, obj, writer: asyncio.StreamWriter) -> None:
        """Store one decoded message as a document in ``db.log``.

        The record gets a ``time`` field derived from the message timestamp
        and a ``host`` field set to ``FQDN``; its ``container_id`` key, if
        any, is dropped before the insert.

        Args:
            obj: Decoded message, a 4-item sequence ``(tag, time, record, options)``.
            writer: Stream of the originating connection (not used here).

        Raises:
            ValueError: If ``obj`` does not have exactly four items, or if it
                carries non-empty options, which are not supported.
        """
        _tag, time, record, options = obj
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if options:
            raise ValueError("Can't handle options")

        record["time"] = self._to_datetime(time)
        record.pop("container_id", None)
        record["host"] = FQDN
        await db.log.insert_one(record)
2021-06-19 18:43:37 +00:00
async def main():
    """Listen on 127.0.0.1:24224 and keep serving until the coroutine is stopped.

    Raises:
        OSError: If the listening socket cannot be created (for example when
            the port is already in use).
    """
    # Start the server before entering the try block: when start_server()
    # fails there is nothing to close, and the original error must propagate
    # unchanged instead of being replaced by an UnboundLocalError raised from
    # the cleanup code.
    server = await asyncio.start_server(Server(), host="127.0.0.1", port=24224)
    try:
        while True:
            # Short sleeps keep this coroutine alive while the server runs.
            # NOTE(review): presumably the periodic wake-up also lets Ctrl+C
            # be noticed promptly on every platform - confirm before replacing
            # this loop with a single long wait.
            await asyncio.sleep(0.1)
    finally:
        server.close()
# Drive main() on the default event loop; Ctrl+C ends the program quietly.
try:
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
except KeyboardInterrupt:
    pass