#!/usr/bin/env python3
"""HTTP API used by godoor door controllers.

Exposes endpoints for fetching the card tokens allowed on a door
(``/allowed``), long-polling for door events (``/longpoll``) and logging card
swipes (``/swipe``). Controllers authenticate with a shared secret sent in the
``KEY`` request header; which secret is used decides which access group or
door names the request may touch.
"""
import os
from datetime import date, datetime, timezone
from functools import wraps
from typing import List

from dateutil.parser import parse
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import PyMongoError
from sanic import Sanic
from sanic.response import json, text
from sanic_prometheus import monitor

import kube
import slack

app = Sanic(__name__)
slack.add_routes(app)
monitor(app).expose_endpoint()

# API key for godoor controllers authenticating to k-space:floor
DOORBOY_SECRET_FLOOR = os.environ["DOORBOY_SECRET_FLOOR"]
# API key for godoor controllers authenticating to k-space:workshop
DOORBOY_SECRET_WORKSHOP = os.environ["DOORBOY_SECRET_WORKSHOP"]
FLOOR_ACCESS_GROUP = os.getenv("FLOOR_ACCESS_GROUP", "k-space:floor")
WORKSHOP_ACCESS_GROUP = os.getenv("WORKSHOP_ACCESS_GROUP", "k-space:workshop")
MONGO_URI = os.environ["MONGO_URI"]

# Shortest secret accepted at start-up.
MIN_SECRET_LENGTH = 10


def _check_secret_length(name: str, value: str) -> None:
    """Raise ValueError when the secret *value* is shorter than the minimum.

    An explicit raise is used instead of ``assert`` because assertions are
    stripped when Python runs with ``-O``.
    """
    if len(value) < MIN_SECRET_LENGTH:
        raise ValueError(
            f"{name} must be at least {MIN_SECRET_LENGTH} characters long"
        )


_check_secret_length("DOORBOY_SECRET_FLOOR", DOORBOY_SECRET_FLOOR)
_check_secret_length("DOORBOY_SECRET_WORKSHOP", DOORBOY_SECRET_WORKSHOP)


@app.listener("before_server_start")
async def setup_db(app, loop):
    """Attach the default MongoDB database handle to ``app.ctx.db``."""
    # TODO: find cleaner way to do this, for more see
    # https://github.com/sanic-org/sanic/issues/919
    app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()


def authenticate_door(wrapped):
    """Decorate a request handler so it requires a valid doorboy secret.

    The secret is read from the ``KEY`` request header. Requests whose secret
    matches neither configured door secret get a 401 response and the wrapped
    handler is not called.
    """

    @wraps(wrapped)
    async def decorated_function(request, *args, **kwargs):
        doorboy_secret = request.headers.get("KEY")
        if doorboy_secret not in (DOORBOY_SECRET_FLOOR, DOORBOY_SECRET_WORKSHOP):
            return text("Invalid doorboy secret token", status=401)
        return await wrapped(request, *args, **kwargs)

    return decorated_function


@app.route("/allowed")
@authenticate_door
async def view_doorboy_uids(request):
    """Return the card tokens allowed for the door group of the caller.

    The group is chosen by which secret was sent in the ``KEY`` header. The
    response is ``{"allowed_uids": [...]}`` where every item is an inventory
    document projected down to ``token.uid_hash``.
    """
    users: List[str]

    # authorize
    key = request.headers.get("KEY")
    if key == DOORBOY_SECRET_FLOOR:
        users = kube.users_with_group(FLOOR_ACCESS_GROUP)
    elif key == DOORBOY_SECRET_WORKSHOP:
        users = kube.users_with_group(WORKSHOP_ACCESS_GROUP)
    else:
        print("WARN: unknown door token in /allowed")
        # Sanic handlers must return a response object, not a (body, status)
        # tuple.
        return text("unknown doorboy secret token", status=403)

    flt = {
        "token.uid_hash": {"$exists": True},
        "inventory.owner.username": {"$in": users},
    }
    # `_id` is excluded because an ObjectId is not JSON serialisable.
    prj = {"_id": False, "token.uid_hash": True}
    # find() returns a cursor that cannot be awaited directly; to_list()
    # materialises the matching documents.
    tokens = await app.ctx.db.inventory.find(flt, prj).to_list(length=None)

    print(f"delegating {len(tokens)} doorkey tokens")
    return json({"allowed_uids": tokens})


def datetime_to_json_formatting(o):
    """JSON ``default`` hook: render date/datetime values as ISO 8601 strings.

    Any other type yields ``None``.
    """
    if isinstance(o, (date, datetime)):
        return o.isoformat()
    return None


# Only identified and successful events. **Endpoint not in use.**
@app.route("/logs")
async def view_open_door_events(request):
    """Disabled endpoint: always answers 403.

    The query and transformation below the early return are unreachable and
    are kept only for reference.
    """
    # The status must be passed to text(); returning a (response, status)
    # tuple is not a valid Sanic handler result.
    return text("not an admin", status=403)

    results = (
        await app.ctx.db.eventlog.find(
            {
                "component": "doorboy",
                "type": "open-door",
                "approved": True,
                "$or": [
                    {"type": "open-door"},
                    {"event": "card-swiped"},
                ],
                "door": {"$exists": True},
                "timestamp": {"$exists": True},
            }
        )
        .sort("timestamp", -1)
        .to_list(length=None)
    )
    transformed = []
    for r in results:
        if r.get("type") == "open-door" and r.get("approved") and r.get("method"):
            transformed.append(
                {
                    "method": r.get("method"),
                    "door": r["door"],
                    "timestamp": r.get("timestamp"),
                    "member": r.get("member"),
                }
            )
        if r.get("event") == "card-swiped" and r.get("success"):
            transformed.append(
                {
                    "method": "card-swiped",
                    "door": r["door"],
                    "timestamp": r.get("timestamp"),
                    "member": r.get("inventory", {}).get("owner"),
                }
            )
    return json(transformed, default=datetime_to_json_formatting)


@app.route("/longpoll", stream=True)
@authenticate_door
async def view_longpoll(request):
    """Stream door names as server-sent events while the event log is watched.

    Watches inserts into the ``eventlog`` collection and sends the ``door``
    value of every inserted event whose ``approved`` field equals the string
    ``"true"`` and whose ``type`` is not ``"card"``.
    """
    response = await request.respond(content_type="text/event-stream")
    await response.send("data: response-generator-started\n\n")
    pipeline = [
        {
            "$match": {
                "operationType": "insert",
            }
        }
    ]
    try:
        async with app.ctx.db.eventlog.watch(pipeline) as stream:
            await response.send("data: watch-stream-opened\n\n")
            async for event in stream:
                ev = event["fullDocument"]
                # .get() keeps one incomplete document from ending the stream
                # with an uncaught KeyError.
                # NOTE(review): /swipe stores `approved` exactly as received
                # in the request JSON, while this compares against the string
                # "true" - confirm which producers this filter is meant for.
                if ev.get("approved") != "true":
                    continue
                if ev.get("type") == "card":
                    continue
                door = ev.get("door")
                if door is None:
                    continue
                # send() is a coroutine; without `await` nothing is written.
                await response.send("data: %s\n\n" % door)
    except PyMongoError as e:
        print(e)
    await response.send("data: response-generator-ended\n\n")
    return


# Called by the door to log a card swipe. Does not decide whether the door
# should be opened.
@app.post("/swipe")
@authenticate_door
async def swipe(request):
    """Record a card swipe reported by a door controller.

    Expects a JSON object with ``door``, ``uid_hash`` and ``approved`` and an
    optional ``timestamp``. Updates ``last_seen`` on the matching token
    (creating the token when it is unknown) and inserts an event into the
    event log.

    Responds 400 for a malformed body, 403 when the door is not covered by
    the caller's secret, and ``ok`` on success.
    """
    # authorize
    key = request.headers.get("KEY")
    data = request.json
    if not isinstance(data, dict):
        return text("Invalid JSON body", status=400)

    doors = set()
    if key == DOORBOY_SECRET_FLOOR:
        doors.update(["backdoor", "frontdoor", "grounddoor"])
    if key == DOORBOY_SECRET_WORKSHOP:
        doors.add("workshopdoor")

    if data.get("door") not in doors:
        print("WARN: Door", repr(data.get("door")), "not in", doors)
        return text("Not allowed", status=403)

    missing = [field for field in ("uid_hash", "approved") if field not in data]
    if missing:
        return text("Missing fields: " + ", ".join(missing), status=400)

    # `datetime` here is the class, so the UTC tzinfo has to come from the
    # separately imported `timezone`.
    timestamp = (
        parse(data["timestamp"])
        if data.get("timestamp")
        else datetime.now(timezone.utc)
    )

    token_filter = {
        "component": "doorboy",
        "type": "token",
        "token.uid_hash": data["uid_hash"],
    }

    # Update token, create if unknown
    await app.ctx.db.inventory.update_one(
        token_filter,
        {
            "$set": {"last_seen": timestamp},
            "$setOnInsert": {
                "first_seen": timestamp,
                "inventory": {
                    "claimable": True,
                },
            },
        },
        upsert=True,
    )

    # Fall back to an empty document if the token is gone again by the time
    # it is read back.
    token = await app.ctx.db.inventory.find_one(token_filter) or {}
    owner = token.get("inventory", {}).get("owner", {})

    event_swipe = {
        "component": "doorboy",
        "method": "card",
        "timestamp": timestamp,
        "door": data["door"],
        "approved": data["approved"],
        "user": {
            "id": owner.get("username", ""),
            "name": owner.get("display_name", "Unclaimed Token"),
        },
        "uid_hash": data["uid_hash"],
    }
    await app.ctx.db.eventlog.insert_one(event_swipe)

    return text("ok")


if __name__ == "__main__":
    app.run(
        debug=False, host="0.0.0.0", port=5000, single_process=True, access_log=True
    )