Initial commit
This commit is contained in:
commit
51880829f5
|
@ -0,0 +1 @@
|
||||||
|
.env
|
|
@ -0,0 +1,4 @@
|
||||||
|
FROM python:3
|
||||||
|
RUN pip install motor msgpack
|
||||||
|
COPY logger.py /
|
||||||
|
ENTRYPOINT /logger.py
|
|
@ -0,0 +1,12 @@
|
||||||
|
version: "3.7"
|
||||||
|
|
||||||
|
services:
|
||||||
|
app:
|
||||||
|
restart: always
|
||||||
|
image: kspaceee/mongo-logger
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
network_mode: host
|
||||||
|
env_file: .env
|
||||||
|
logging:
|
||||||
|
driver: json-file
|
|
@ -0,0 +1,56 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
import asyncio
|
||||||
|
import msgpack
|
||||||
|
import os
|
||||||
|
import struct
|
||||||
|
import socket
|
||||||
|
from motor.motor_asyncio import AsyncIOMotorClient
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default")
|
||||||
|
FQDN = socket.getfqdn()
|
||||||
|
EXPIRE_AFTER_SECONDS = int(os.getenv("EXPIRE_AFTER_SECONDS", "1209600"))
|
||||||
|
|
||||||
|
db = AsyncIOMotorClient(MONGO_URI).get_default_database()
|
||||||
|
db.log.create_index("time", expireAfterSeconds=EXPIRE_AFTER_SECONDS)
|
||||||
|
|
||||||
|
class Server(object):
|
||||||
|
def __init__(self):
|
||||||
|
self._loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
async def __call__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
||||||
|
unpacker = msgpack.Unpacker(raw=False)
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = await reader.read(n=4096)
|
||||||
|
if not data:
|
||||||
|
raise ConnectionError("Client connection closed")
|
||||||
|
unpacker.feed(data)
|
||||||
|
for obj in unpacker:
|
||||||
|
self._loop.create_task(self._handle_request(obj, writer))
|
||||||
|
except ConnectionError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def _handle_request(self, obj, writer: asyncio.StreamWriter) -> None:
|
||||||
|
tag, time, record, options = obj
|
||||||
|
seconds, nanoseconds = struct.unpack(">II", time.data)
|
||||||
|
assert not options, "Can't handle options"
|
||||||
|
record["time"] = datetime.fromtimestamp(seconds).replace(microsecond=int(nanoseconds/1000))
|
||||||
|
record.pop("container_id", None)
|
||||||
|
record["host"] = FQDN
|
||||||
|
result = await db.log.insert_one(record)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
try:
|
||||||
|
server = await asyncio.start_server(Server(), host="127.0.0.1", port=24224)
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
finally:
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.get_event_loop().run_until_complete(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
Reference in New Issue