doorboy-proxy/app/doorboy-proxy.py

87 lines
2.8 KiB
Python
Executable File

#!/usr/bin/env python3
from sanic import Sanic
from sanic.response import text, json
from sanic_prometheus import monitor
from motor.motor_asyncio import AsyncIOMotorClient
import httpx
import pymongo
import os
app = Sanic(__name__)
# Attach sanic_prometheus monitoring to the app and expose its endpoint.
monitor(app).expose_endpoint()

# Required settings: a missing variable raises KeyError at import time, so a
# misconfigured deployment fails immediately instead of on the first request.
INVENTORY_API_KEY = os.environ["INVENTORY_API_KEY"]
DOORBOY_SECRET_FLOOR = os.environ["DOORBOY_SECRET_FLOOR"]
DOORBOY_SECRET_WORKSHOP = os.environ["DOORBOY_SECRET_WORKSHOP"]
CARD_URI = os.environ["CARD_URI"]
FLOOR_ACCESS_GROUP = os.environ["FLOOR_ACCESS_GROUP"]
WORKSHOP_ACCESS_GROUP = os.environ["WORKSHOP_ACCESS_GROUP"]

# Optional setting; falls back to a local replica-set URI.
MONGO_URI = os.getenv(
    "MONGO_URI",
    "mongodb://127.0.0.1:27017/default?replicaSet=rs0",
)

# The shared secrets authenticate door controllers, so refuse to start with
# short ones. Explicit checks are used instead of `assert` because assert
# statements are removed when Python runs with -O.
if len(DOORBOY_SECRET_FLOOR) < 10:
    raise ValueError("DOORBOY_SECRET_FLOOR must be at least 10 characters long")
if len(DOORBOY_SECRET_WORKSHOP) < 10:
    raise ValueError("DOORBOY_SECRET_WORKSHOP must be at least 10 characters long")
@app.listener("before_server_start")
async def setup_db(app, loop):
    """Create the MongoDB client and store its default database on ``app.ctx.db``.

    Registered as a ``before_server_start`` listener, so it runs before the
    server begins serving.
    """
    # TODO: find cleaner way to do this, for more see
    # https://github.com/sanic-org/sanic/issues/919
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    # The database name is taken from the path component of MONGO_URI.
    app.ctx.db = mongo_client.get_default_database()
@app.route("/allowed")
async def view_doorboy_uids(request):
    """Return the card tokens allowed for the door identified by the caller's key.

    The caller authenticates with a shared secret in the ``KEY`` request
    header. The secret selects the access group (floor or workshop), which is
    then sent in a POST to ``CARD_URI``; the ``token`` field of every object
    in the reply is relayed to the caller.

    Returns:
        200 JSON ``{"allowed_uids": [{"token": ...}, ...]}`` on success;
        403 text when the key is missing or unknown;
        500 text when the selected access group is configured as empty.

    Raises:
        httpx.HTTPStatusError: if the ``CARD_URI`` service replies with an
            error status.
    """
    key = request.headers.get("KEY")
    if not key or key not in (DOORBOY_SECRET_FLOOR, DOORBOY_SECRET_WORKSHOP):
        # Reject with an explicit 403 rather than the default 200 so that
        # clients and logs can tell a refusal from a successful reply.
        return text("how about no", status=403)

    # The key is known to be one of the two secrets at this point; the floor
    # secret is tested first, matching the original precedence.
    if key == DOORBOY_SECRET_FLOOR:
        group = FLOOR_ACCESS_GROUP
    else:
        group = WORKSHOP_ACCESS_GROUP
    if not group:
        # Sanic handlers must return a response object; a ("fail", 500)
        # tuple is not a valid response.
        return text("fail", status=500)

    async with httpx.AsyncClient() as client:
        r = await client.post(
            CARD_URI,
            json={"groups": [group]},
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Basic {INVENTORY_API_KEY}",
            },
        )
    # Fail loudly on an upstream error instead of trying to parse its body
    # as a list of cards.
    r.raise_for_status()

    allowed_uids = [{"token": obj["token"]} for obj in r.json()]
    return json({"allowed_uids": allowed_uids})
@app.route("/longpoll", stream=True)
async def view_longpoll(request):
    """Stream door-open events to an authenticated caller as server-sent events.

    The caller authenticates with a shared secret in the ``KEY`` request
    header. Once accepted, a ``text/event-stream`` response is opened and a
    change stream on the ``eventlog`` collection is watched for inserts. For
    every inserted document whose ``type`` is ``"open-door"``, the document's
    ``door`` value is sent as one ``data:`` event.

    Returns:
        403 text when the key is missing or unknown; otherwise ``None`` after
        the stream ends (the response has already been sent incrementally).
    """
    # Authenticate BEFORE opening the stream: once request.respond() has been
    # called the status line is committed and a rejection can no longer be
    # delivered as a normal response.
    key = request.headers.get("KEY")
    if not key or key not in (DOORBOY_SECRET_FLOOR, DOORBOY_SECRET_WORKSHOP):
        return text("Invalid token", status=403)

    response = await request.respond(content_type="text/event-stream")
    await response.send("data: response-generator-started\n\n")

    # Only newly inserted documents are of interest.
    pipeline = [
        {
            "$match": {
                "operationType": "insert",
            }
        }
    ]
    try:
        async with app.ctx.db.eventlog.watch(pipeline) as stream:
            await response.send("data: watch-stream-opened\n\n")
            async for event in stream:
                document = event["fullDocument"]
                if document.get("type") == "open-door":
                    await response.send(f"data: {document['door']}\n\n")
    except pymongo.errors.PyMongoError:
        # Best effort: on a database error simply end the stream.
        return
if __name__ == "__main__":
    # Script entry point: listen on all interfaces on port 5000 in a single
    # process, with access logging enabled and debug mode off.
    run_options = {
        "host": "0.0.0.0",
        "port": 5000,
        "debug": False,
        "single_process": True,
        "access_log": True,
    }
    app.run(**run_options)