#!/usr/bin/env python3
"""Sanic service exposing allowed card tokens and a door-open event stream.

Endpoints:
    /allowed   JSON list of card tokens fetched from the inventory service.
    /longpoll  Server-sent event stream of "open-door" events read from the
               MongoDB ``eventlog`` collection.

Both endpoints require the shared secret ``DOORBOY_SECRET`` in the ``KEY``
request header.
"""
import hmac
import logging
import os

import httpx
import pymongo
from motor.motor_asyncio import AsyncIOMotorClient
from sanic import Sanic
from sanic.response import json, text
from sanic_prometheus import monitor

logger = logging.getLogger(__name__)

app = Sanic(__name__)
monitor(app).expose_endpoint()

DOORBOY_SECRET = os.environ["DOORBOY_SECRET"]
CARD_KUBE_GROUP = os.environ["CARD_KUBE_GROUP"]
MONGO_URI = os.getenv(
    "MONGO_URI",
    "mongodb://127.0.0.1:27017/default?replicaSet=rs0",
)
INVENTORY_CARDS_URL = "https://inventory-app-72zn4.codemowers.ee/cards"

# Explicit check instead of ``assert`` so it is not stripped under ``python -O``.
if len(DOORBOY_SECRET) < 10:
    raise RuntimeError("DOORBOY_SECRET must be at least 10 characters long")


def _is_authorized(request):
    """Return True when the request's ``KEY`` header matches DOORBOY_SECRET.

    A missing header is treated as an empty string. The comparison uses
    ``hmac.compare_digest`` on bytes so that it does not short-circuit on the
    first differing character.
    """
    provided = request.headers.get("KEY") or ""
    return hmac.compare_digest(provided.encode(), DOORBOY_SECRET.encode())


@app.listener("before_server_start")
async def setup_db(app, loop):
    """Attach the default MongoDB database handle to ``app.ctx.db``."""
    # TODO: find cleaner way to do this, for more see
    # https://github.com/sanic-org/sanic/issues/919
    app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()


@app.route("/allowed")
async def view_doorboy_uids(request):
    """Return the card tokens of CARD_KUBE_GROUP as ``{"allowed_uids": [...]}``.

    Responds with 403 and the text "how about no" when the ``KEY`` header does
    not match. A non-2xx answer from the inventory service raises
    ``httpx.HTTPStatusError`` rather than being parsed as if it were valid.
    """
    if not _is_authorized(request):
        return text("how about no", status=403)

    async with httpx.AsyncClient() as client:
        r = await client.get(
            INVENTORY_CARDS_URL,
            params={"group": CARD_KUBE_GROUP},
        )
    # Fail loudly on upstream errors instead of iterating an error payload.
    r.raise_for_status()

    allowed_uids = [{"token": obj["token"]} for obj in r.json()]
    return json({"allowed_uids": allowed_uids})


@app.route("/longpoll", stream=True)
async def view_longpoll(request):
    """Stream "open-door" events to the client as server-sent events.

    Watches inserts into the ``eventlog`` collection and, for every inserted
    document whose ``type`` is "open-door", sends its ``door`` value as one
    ``data:`` line. Responds with 403 and the text "Invalid token" when the
    ``KEY`` header does not match. The stream ends when the MongoDB watch
    fails.
    """
    # Authenticate before starting the stream: once request.respond() has been
    # called, a second response can no longer be returned from the handler.
    if not _is_authorized(request):
        return text("Invalid token", status=403)

    response = await request.respond(content_type="text/event-stream")
    await response.send("data: response-generator-started\n\n")

    pipeline = [{"$match": {"operationType": "insert"}}]
    try:
        async with app.ctx.db.eventlog.watch(pipeline) as stream:
            await response.send("data: watch-stream-opened\n\n")
            async for event in stream:
                document = event["fullDocument"]
                if document.get("type") == "open-door":
                    await response.send(f"data: {document['door']}\n\n")
    except pymongo.errors.PyMongoError:
        # Ending the stream is deliberate best-effort behaviour, but leave a
        # trace so that database failures are not invisible.
        logger.exception("eventlog watch failed; closing longpoll stream")
        return


if __name__ == "__main__":
    app.run(
        debug=False,
        host="0.0.0.0",
        port=5000,
        single_process=True,
        access_log=True,
    )