65 lines
2.2 KiB
Python
65 lines
2.2 KiB
Python
from datetime import datetime
|
|
from sanic import Sanic, response
|
|
from sanic.response import text, json, stream
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
import uvloop, asyncio
|
|
import pymongo
|
|
import os
|
|
import const
|
|
|
|
app = Sanic(__name__)
|
|
|
|
mongodb = AsyncIOMotorClient(const.MONGO_URI)
|
|
db = mongodb.get_default_database()
|
|
|
|
DOORBOY_SECRET = os.environ["DOORBOY_SECRET"]
|
|
|
|
assert len(DOORBOY_SECRET) > 10
|
|
|
|
@app.route("/allowed")
|
|
async def view_doorboy_uids(request):
|
|
if request.headers.get('KEY') != DOORBOY_SECRET:
|
|
return text("how about no")
|
|
allowed_names = []
|
|
async for obj in db.member.find({"enabled": True}):
|
|
allowed_names.append(obj["_id"])
|
|
allowed_uids = []
|
|
async for obj in db.inventory.find({"token.uid_hash": {"$exists":True}, "inventory.owner_id": {"$exists":True}, "token.enabled": {"$exists":True}}, {"inventory.owner_id": True, "token.uid_hash": True }):
|
|
if obj["inventory"].pop("owner_id") in allowed_names:
|
|
del obj["_id"]
|
|
del obj["inventory"]
|
|
allowed_uids.append(obj)
|
|
return json({"allowed_uids": allowed_uids})
|
|
|
|
|
|
@app.route("/longpoll")
|
|
async def view_longpoll(request):
|
|
if request.headers.get('KEY') != DOORBOY_SECRET:
|
|
return text("how about no")
|
|
|
|
async def g(response):
|
|
await response.write("data: response-generator-started\n\n")
|
|
pipeline = [
|
|
{
|
|
'$match': {
|
|
'operationType': "insert",
|
|
# 'component': 'doorboy',
|
|
# 'type': 'open-door'
|
|
}
|
|
}
|
|
]
|
|
try:
|
|
async with db.eventlog.watch(pipeline) as stream:
|
|
await response.write("data: watch-stream-opened\n\n")
|
|
async for event in stream:
|
|
if event["fullDocument"].get("type") == "open-door":
|
|
await response.write("data: %s\n\n" % event["fullDocument"]["door"])
|
|
except pymongo.errors.PyMongoError:
|
|
return
|
|
return stream(g, content_type="text/event-stream")
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # swap default event loop to uvloop
|
|
loop = asyncio.get_event_loop()
|
|
app.run(host='::', debug=False, loop=loop)
|