Files
doorboy-proxy/app/doorboy-proxy.py
2025-08-08 03:46:37 +03:00

228 lines
6.8 KiB
Python
Executable File

#!/usr/bin/env python3
import os
from datetime import date, datetime
from functools import wraps
from typing import List
from dateutil.parser import parse
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import PyMongoError
from sanic import Sanic
from sanic.response import json, text
from sanic_prometheus import monitor
import kube
import slack
app = Sanic(__name__)
slack.add_routes(app)
monitor(app).expose_endpoint()
# API key for godoor controllers authenticating to k-space:floor
DOORBOY_SECRET_FLOOR = os.environ["DOORBOY_SECRET_FLOOR"]
# API key for godoor controllers authenticating to k-space:workshop
DOORBOY_SECRET_WORKSHOP = os.environ["DOORBOY_SECRET_WORKSHOP"]
FLOOR_ACCESS_GROUP = os.getenv("FLOOR_ACCESS_GROUP", "k-space:floor")
WORKSHOP_ACCESS_GROUP = os.getenv("WORKSHOP_ACCESS_GROUP", "k-space:workshop")
MONGO_URI = os.environ["MONGO_URI"]
assert len(DOORBOY_SECRET_FLOOR) >= 10
assert len(DOORBOY_SECRET_WORKSHOP) >= 10
@app.listener("before_server_start")
async def setup_db(app, loop):
# TODO: find cleaner way to do this, for more see
# https://github.com/sanic-org/sanic/issues/919
app.ctx.db = AsyncIOMotorClient(MONGO_URI).get_default_database()
def authenticate_door(wrapped):
def decorator(f):
@wraps(f)
async def decorated_function(request, *args, **kwargs):
doorboy_secret = request.headers.get("KEY")
if doorboy_secret not in [DOORBOY_SECRET_FLOOR, DOORBOY_SECRET_WORKSHOP]:
return text("Invalid doorboy secret token", status=401)
return await f(request, *args, **kwargs)
return decorated_function
return decorator(wrapped)
@app.route("/allowed")
@authenticate_door
async def view_doorboy_uids(request):
users = List[str]
# authorize
key = request.headers.get("KEY")
if key == DOORBOY_SECRET_FLOOR:
users = kube.users_with_group(FLOOR_ACCESS_GROUP)
elif key == DOORBOY_SECRET_WORKSHOP:
users = kube.users_with_group(WORKSHOP_ACCESS_GROUP)
else:
print("WARN: unknown door token in /allowed")
return "unknown doorboy secret token", 403
flt = {
"token.uid_hash": {"$exists": True},
"inventory.owner.username": {"$in": users},
}
prj = {"token.uid_hash": True}
tokens = await app.ctx.db.inventory.find(flt, prj)
print(f"delegating {len(tokens)} doorkey tokens")
return json({"allowed_uids": tokens})
def datetime_to_json_formatting(o):
if isinstance(o, (date, datetime)):
return o.isoformat()
# Only identified and successful events. **Endpoint not in use.**
@app.route("/logs")
async def view_open_door_events(request):
return text("not an admin"), 403
results = (
await app.ctx.db.eventlog.find(
{
"component": "doorboy",
"type": "open-door",
"approved": True,
"$or": [
{"type": "open-door"},
{"event": "card-swiped"},
],
"door": {"$exists": True},
"timestamp": {"$exists": True},
}
)
.sort("timestamp", -1)
.to_list(length=None)
)
transformed = []
for r in results:
if r.get("type") == "open-door" and r.get("approved") and r.get("method"):
transformed.append(
{
"method": r.get("method"),
"door": r["door"],
"timestamp": r.get("timestamp"),
"member": r.get("member"),
}
)
if r.get("event") == "card-swiped" and r.get("success"):
transformed.append(
{
"method": "card-swiped",
"door": r["door"],
"timestamp": r.get("timestamp"),
"member": r.get("inventory", {}).get("owner"),
}
)
return json(transformed, default=datetime_to_json_formatting)
@app.route("/longpoll", stream=True)
@authenticate_door
async def view_longpoll(request):
response = await request.respond(content_type="text/event-stream")
await response.send("data: response-generator-started\n\n")
pipeline = [
{
"$match": {
"operationType": "insert",
}
}
]
try:
async with app.ctx.db.eventlog.watch(pipeline) as stream:
await response.send("data: watch-stream-opened\n\n")
async for event in stream:
ev = event["fullDocument"]
if ev["approved"] != "true":
continue
if ev["type"] == "token":
continue
response.send("data: %s\n\n" % ev["door"])
except PyMongoError as e:
print(e)
await response.send("data: response-generator-ended\n\n")
return
# Called by the door to log a card swipe. Does not decide whether the door should be opened.
@app.post("/swipe")
@authenticate_door
async def swipe(request):
# authorize
key = request.headers.get("KEY")
data = request.json
doors = set()
if key == DOORBOY_SECRET_FLOOR:
doors.update(["backdoor", "frontdoor", "grounddoor"])
if key == DOORBOY_SECRET_WORKSHOP:
doors.add("workshopdoor")
if data.get("door") not in doors:
print("WARN: Door", repr(data.get("door")), "not in", doors)
return text("Not allowed", 403)
timestamp = (
parse(data["timestamp"])
if data.get("timestamp")
else datetime.now(datetime.timezone.utc)
)
# Update token, create if unknown
await app.ctx.db.inventory.update_one(
{"component": "doorboy", "type": "token", "token.uid_hash": data["uid_hash"]},
{
"$set": {"last_seen": timestamp},
"$setOnInsert": {
"first_seen": timestamp,
"inventory": {
"claimable": True,
},
},
},
upsert=True,
)
token = await app.ctx.db.inventory.find_one(
{"component": "doorboy", "type": "token", "token.uid_hash": data["uid_hash"]}
)
event_swipe = {
"component": "doorboy",
"method": "token",
"timestamp": timestamp,
"door": data["door"],
"event": "card-swiped",
"approved": data["approved"],
"uid_hash": data["uid_hash"],
"user": {
"id": token.get("inventory", {}).get("owner", {}).get("username", ""),
"name": token.get("inventory", {})
.get("owner", {})
.get("display_name", "Unclaimed Token"),
},
}
await app.ctx.db.eventlog.insert_one(event_swipe)
return text("ok")
if __name__ == "__main__":
app.run(
debug=False, host="0.0.0.0", port=5000, single_process=True, access_log=True
)