"""Doorboy HTTP API.

Exposes two endpoints, both guarded by a shared secret that the client
sends in the ``KEY`` request header:

* ``/allowed``  -- JSON list of token documents whose owner is an enabled member.
* ``/longpoll`` -- ``text/event-stream`` feed that emits the ``door`` value of
  every newly inserted ``open-door`` document in the ``eventlog`` collection.
"""
from datetime import datetime
import asyncio
import hmac
import logging
import os

from motor.motor_asyncio import AsyncIOMotorClient
from sanic import Sanic, response
from sanic.response import text, json, stream
import pymongo

import const

logger = logging.getLogger(__name__)

app = Sanic(__name__)

DOORBOY_SECRET = os.environ["DOORBOY_SECRET"]
# Explicit raise instead of ``assert``: asserts are stripped under ``python -O``,
# which would silently disable this sanity check.
if len(DOORBOY_SECRET) <= 10:
    raise RuntimeError("DOORBOY_SECRET must be longer than 10 characters")


def _is_authorized(request):
    """Return True when the request's ``KEY`` header matches DOORBOY_SECRET.

    The comparison is done on bytes with ``hmac.compare_digest`` so that it
    runs in constant time and also accepts non-ASCII header values; a missing
    header is treated as an empty string.
    """
    provided = request.headers.get('KEY') or ""
    return hmac.compare_digest(
        provided.encode("utf-8"),
        DOORBOY_SECRET.encode("utf-8"),
    )


@app.listener("before_server_start")
async def setup_db(app, loop):
    """Attach the default MongoDB database (from ``const.MONGO_URI``) to the app."""
    # TODO: find cleaner way to do this, for more see
    # https://github.com/sanic-org/sanic/issues/919
    app.db = AsyncIOMotorClient(const.MONGO_URI).get_default_database()


@app.route("/allowed")
async def view_doorboy_uids(request):
    """Return the token documents that belong to enabled members.

    Response body: ``{"allowed_uids": [...]}`` where every entry is the
    projected inventory document with ``_id`` and ``inventory`` removed
    (i.e. what is left of the ``token.uid_hash`` projection).
    """
    if not _is_authorized(request):
        # NOTE(review): this answers with HTTP 200; a 403 would be more
        # conventional, but the status is kept so existing clients see no change.
        return text("how about no")

    allowed_names = []
    async for member in app.db.member.find({"enabled": True}):
        allowed_names.append(member["_id"])

    # NOTE(review): "token.enabled" is only tested for existence, so a token
    # with ``enabled: False`` would still match -- confirm this is intended.
    query = {
        "token.uid_hash": {"$exists": True},
        "inventory.owner": {"$exists": True},
        "token.enabled": {"$exists": True},
    }
    projection = {"inventory.owner": True, "token.uid_hash": True}

    allowed_uids = []
    async for obj in app.db.inventory.find(query, projection):
        owner = obj["inventory"].pop("owner")
        if owner.get("foreign_id") in allowed_names:
            # Strip everything the client does not need before returning it.
            del obj["_id"]
            del obj["inventory"]
            allowed_uids.append(obj)
    return json({"allowed_uids": allowed_uids})


@app.route("/longpoll")
async def view_longpoll(request):
    """Stream door-open events to the client as server-sent events.

    Watches the ``eventlog`` collection for inserts and writes one
    ``data: <door>`` event per inserted document whose ``type`` is
    ``"open-door"``. The stream ends when the change stream raises a
    ``PyMongoError``.
    """
    if not _is_authorized(request):
        # NOTE(review): HTTP 200 on rejection is kept for compatibility.
        return text("how about no")

    async def g(resp):
        await resp.write("data: response-generator-started\n\n")
        pipeline = [
            {
                '$match': {
                    'operationType': "insert",
                    # 'component': 'doorboy',
                    # 'type': 'open-door'
                }
            }
        ]
        try:
            async with app.db.eventlog.watch(pipeline) as change_stream:
                await resp.write("data: watch-stream-opened\n\n")
                async for event in change_stream:
                    document = event["fullDocument"]
                    if document.get("type") != "open-door":
                        continue
                    door = document.get("door")
                    if door is None:
                        # A malformed event must not tear down the whole stream.
                        logger.warning("open-door event without a door field, skipping")
                        continue
                    await resp.write("data: %s\n\n" % (door,))
        except pymongo.errors.PyMongoError:
            # End the stream as before, but leave a trace of why it ended.
            logger.exception("eventlog change stream failed, closing longpoll")
            return

    return stream(g, content_type="text/event-stream")


if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0', port=5000)