#!/usr/bin/env python3
"""Sanic service for door controllers: /allowed, /logs, /longpoll and /swipe."""
import hmac
import os
from datetime import datetime, timezone
from functools import wraps
from typing import List

import kube
from dateutil.parser import parse
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import PyMongoError
from sanic import Sanic
from sanic.response import json, text
from sanic_prometheus import monitor
from slack import slack_app

app = Sanic(__name__)
app.blueprint(slack_app)
monitor(app).expose_endpoint()

# API key for godoor controllers authenticating to k-space:floor
DOORBOY_SECRET_FLOOR = os.environ["DOORBOY_SECRET_FLOOR"]
# API key for godoor controllers authenticating to k-space:workshop
DOORBOY_SECRET_WORKSHOP = os.environ["DOORBOY_SECRET_WORKSHOP"]
MONGO_URI = os.environ["MONGO_URI"]

# Refuse to start with trivially short secrets.
if len(DOORBOY_SECRET_FLOOR) < 10:
    raise ValueError("DOORBOY_SECRET_FLOOR must be at least 10 characters")
if len(DOORBOY_SECRET_WORKSHOP) < 10:
    raise ValueError("DOORBOY_SECRET_WORKSHOP must be at least 10 characters")


@app.listener("before_server_start")
async def setup_db(app):
    """Attach the default database of MONGO_URI to ``app.ctx.db``."""
    # TODO: find cleaner way to do this, for more see
    # https://github.com/sanic-org/sanic/issues/919
    app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()


def _is_known_secret(candidate):
    """Return True if *candidate* equals one of the two doorboy secrets."""
    # compare_digest only accepts ASCII-only str, so compare bytes instead.
    # surrogateescape round-trips undecodable bytes from os.environ.
    candidate_bytes = candidate.encode("utf-8", "surrogateescape")
    # Evaluate both comparisons before combining them so the amount of work
    # does not depend on which secret (if any) matched.
    floor_match = hmac.compare_digest(
        candidate_bytes, DOORBOY_SECRET_FLOOR.encode("utf-8", "surrogateescape")
    )
    workshop_match = hmac.compare_digest(
        candidate_bytes, DOORBOY_SECRET_WORKSHOP.encode("utf-8", "surrogateescape")
    )
    return floor_match or workshop_match


def authenticate_door(f):
    """Decorate a handler so it only runs for requests with a valid KEY header.

    Requests whose ``KEY`` header is missing or matches neither doorboy
    secret receive a 401 text response; otherwise the wrapped handler is
    awaited and its result returned.
    """

    @wraps(f)
    async def decorated_function(request, *args, **kwargs):
        doorboy_secret = request.headers.get("KEY")
        if doorboy_secret is None or not _is_known_secret(doorboy_secret):
            return text("Invalid doorboy secret token", status=401)
        return await f(request, *args, **kwargs)

    return decorated_function


# Door controllers query uid_hashes for offline use. There is no online uid_hash authn/z, only logging.
@app.route("/allowed")
@authenticate_door
async def view_doorboy_uids(request):
    """Return the uid hashes a door controller may accept offline.

    The ``KEY`` header selects the user group and the set of doors the
    caller controls. The JSON response has two keys:

    * ``allowed_hashes`` - uid hashes of enabled tokens owned by users in
      the selected group.
    * ``keep_open_until`` - ISO-8601 UTC timestamp of an unexpired approved
      hold for the door named in the ``DOOR_NAME`` header, or ``None``.

    An unrecognised key yields a 403 text response.
    """
    users: List[str] = []

    # authorize: one key maps to one user group and one set of doors
    key = request.headers.get("KEY")
    if key == DOORBOY_SECRET_FLOOR:
        users = kube.users_with_group("k-space:floor")
        door_set = {"backdoor", "frontdoor", "grounddoor"}
    elif key == DOORBOY_SECRET_WORKSHOP:
        users = kube.users_with_group("k-space:workshop")
        door_set = {"workshopdoor"}
    else:
        print("WARN: unknown door token in /allowed")
        return text("unknown doorboy secret token", status=403)

    flt = {
        "token.uid_hash": {"$exists": True},
        "token.enabled": True,
        "inventory.owner.username": {"$in": users},
    }
    prj = {"token.uid_hash": True}
    # Pass length explicitly: Motor releases that require the argument raise
    # TypeError on a bare to_list(); None means "no limit".
    tokens_raw = await request.app.ctx.db.inventory.find(flt, prj).to_list(length=None)
    tokens = [item["token"]["uid_hash"] for item in tokens_raw]

    doorname = request.headers.get("DOOR_NAME")
    print(f"delegating {len(tokens)} doorkey tokens to {doorname}")

    # Compute keep_open_until for the requesting controller's door.
    # NOTE(review): only the newest approved hold is inspected, so an older
    # hold that is still active is ignored once a newer one has expired -
    # confirm this is intended (e.g. as a way to cancel a hold).
    keep_open_until = None
    if doorname in door_set:
        hold = await request.app.ctx.db.doorlog.find_one(
            {"method": "hold", "door": doorname, "approved": True},
            sort=[("timestamp", -1)],
        )
        # DB datetimes are naive UTC; attach tzinfo only for the serialized
        # output per integration contract.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        if hold and hold.get("expires") and hold["expires"] > now:
            keep_open_until = hold["expires"].replace(tzinfo=timezone.utc).isoformat()

    return json({"allowed_hashes": tokens, "keep_open_until": keep_open_until})


# Only identified and successful events. **Endpoint not in use.**
# The handler below is a stub; the original query logic is not kept in this
# file - see git history for the original implementation.
@app.route("/logs")
async def view_open_door_events(request):
    """Reject every request with 403; the endpoint is currently disabled."""
    # Sanic handlers must return a response object. A Flask-style
    # ``(response, status)`` tuple is not a valid return value, so the status
    # is set on the response itself.
    return text("not an admin", status=403)


# Door controllers listen for open events (web, slack).
@app.route("/longpoll", stream=True)
@authenticate_door
async def view_longpoll(request):
    """Stream door-open events to a controller as server-sent events.

    Watches inserts into the ``doorlog`` collection and sends one
    ``data: <door>`` message per approved event whose method is neither
    ``card`` nor ``hold``. Marker messages are sent when the generator
    starts, when the watch stream opens, and when streaming ends.
    """
    response = await request.respond(content_type="text/event-stream")
    await response.send("data: response-generator-started\n\n")

    # Only newly inserted doorlog documents are of interest.
    pipeline = [{"$match": {"operationType": "insert"}}]
    try:
        async with request.app.ctx.db.doorlog.watch(pipeline) as stream:
            await response.send("data: watch-stream-opened\n\n")
            async for event in stream:
                ev = event["fullDocument"]
                # Skip unapproved events, card swipes and holds; none of
                # these are forwarded as realtime open requests.
                if not ev["approved"] or ev["method"] in ("card", "hold"):
                    continue
                print(f"realtime opening {ev['door']}")
                await response.send(f"data: {ev['door']}\n\n")
    except PyMongoError as e:
        # Best effort: log the database error and fall through so the
        # client still receives the end marker.
        print(e)
    await response.send("data: response-generator-ended\n\n")
    return


# Door controller reporting a card swipe. No authn/z about the event (done offline on door controller), only logging.
@app.post("/swipe")
@authenticate_door
async def swipe(request):
    """Record a card swipe reported by a door controller.

    Expects a JSON object with ``door``, ``uid_hash`` and ``approved``, plus
    an optional ``timestamp`` string (defaults to the current UTC time).
    The matching token document is upserted with ``last_seen`` and a
    ``card`` event is inserted into ``doorlog``.

    Responses: ``ok`` on success, 403 if the door is not controlled by the
    caller's key, 400 if the body is malformed.
    """
    # authorize
    key = request.headers.get("KEY")
    data = request.json
    # request.json is None for an empty body and may be any JSON type.
    if not isinstance(data, dict):
        return text("Expected a JSON object", status=400)

    doors = set()
    # this mapping also duplicated to slack.py
    if key == DOORBOY_SECRET_FLOOR:
        doors.update(["backdoor", "frontdoor", "grounddoor"])
    if key == DOORBOY_SECRET_WORKSHOP:
        doors.add("workshopdoor")

    door = data.get("door")
    # The isinstance check also keeps unhashable values (lists, objects)
    # from raising TypeError on the set membership test.
    if not isinstance(door, str) or door not in doors:
        print("WARN: Door", repr(door), "not in", doors)
        return text("Not allowed", status=403)

    uid_hash = data.get("uid_hash")
    # uid_hash goes straight into a MongoDB filter; requiring a string keeps
    # a JSON object such as {"$ne": null} from acting as a query operator.
    if not isinstance(uid_hash, str):
        return text("uid_hash must be a string", status=400)
    if "approved" not in data:
        return text("Missing field: approved", status=400)

    if data.get("timestamp"):
        try:
            timestamp = parse(data["timestamp"])
        except (ValueError, OverflowError, TypeError):
            return text("Invalid timestamp", status=400)
    else:
        timestamp = datetime.now(timezone.utc)

    token_filter = {"component": "doorboy", "type": "token", "token.uid_hash": uid_hash}

    # Update token, create if unknown
    await request.app.ctx.db.inventory.update_one(
        token_filter,
        {
            "$set": {"last_seen": timestamp},
            "$setOnInsert": {
                "first_seen": timestamp,
                "inventory": {
                    "claimable": True,
                },
            },
        },
        upsert=True,
    )

    # The document should exist after the upsert; fall back to an empty
    # mapping in case it was removed in between.
    token = await request.app.ctx.db.inventory.find_one(token_filter) or {}
    event_swipe = {
        "method": "card",
        "timestamp": timestamp,
        "door": door,
        "approved": data["approved"],
        "user": token.get("inventory", {}).get("owner", {}).get("username", ""),
        "userExtra": uid_hash,
    }
    await request.app.ctx.db.doorlog.insert_one(event_swipe)

    return text("ok")


if __name__ == "__main__":
    app.run(
        debug=False, host="0.0.0.0", port=5000, single_process=True, access_log=True
    )