Madis Mägi
8b7e220ec4
All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
161 lines
5.4 KiB
Python
Executable File
161 lines
5.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from datetime import date, datetime
|
|
from sanic import Sanic
|
|
from sanic.response import text, json
|
|
from sanic_prometheus import monitor
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
import httpx
|
|
import pymongo
|
|
import os
|
|
|
|
# Sanic application object; route handlers and listeners below attach to it.
app = Sanic(__name__)
# Register sanic_prometheus monitoring on the app and expose its endpoint.
monitor(app).expose_endpoint()

# All configuration is read from the environment at import time. A missing
# variable raises KeyError, so a misconfigured deployment fails at startup.
INVENTORY_API_KEY = os.environ["INVENTORY_API_KEY"]
DOORBOY_SECRET_FLOOR = os.environ["DOORBOY_SECRET_FLOOR"]
DOORBOY_SECRET_WORKSHOP = os.environ["DOORBOY_SECRET_WORKSHOP"]
DOORBOY_SECRET_OPEN_EVENTS = os.environ["DOORBOY_SECRET_OPEN_EVENTS"]
CARD_URI = os.environ["CARD_URI"]
FLOOR_ACCESS_GROUP = os.environ["FLOOR_ACCESS_GROUP"]
WORKSHOP_ACCESS_GROUP = os.environ["WORKSHOP_ACCESS_GROUP"]
MONGO_URI = os.environ["MONGO_URI"]
SWIPE_URI = os.environ["SWIPE_URI"]

# The door secrets are the only credential the handlers check, so refuse to
# start with short ones. Explicit raises are used rather than `assert`
# because assertions are removed when Python runs with -O.
# NOTE(review): DOORBOY_SECRET_OPEN_EVENTS has no length check here; confirm
# whether that is intentional before adding one.
if len(DOORBOY_SECRET_FLOOR) < 10:
    raise ValueError("DOORBOY_SECRET_FLOOR must be at least 10 characters long")
if len(DOORBOY_SECRET_WORKSHOP) < 10:
    raise ValueError("DOORBOY_SECRET_WORKSHOP must be at least 10 characters long")
|
|
|
|
|
|
@app.listener("before_server_start")
async def setup_db(app, loop):
    """Create the MongoDB handle and store it on ``app.ctx.db``.

    Registered as a ``before_server_start`` listener. The handle is the
    default database of a Motor client built from ``MONGO_URI``.
    """
    # TODO: find cleaner way to do this, for more see
    # https://github.com/sanic-org/sanic/issues/919
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    app.ctx.db = mongo_client.get_default_database()
|
|
|
|
@app.route("/allowed")
async def view_doorboy_uids(request):
    """Return the card tokens allowed for the access group(s) of the caller's key.

    The caller authenticates with the ``KEY`` request header, which must equal
    ``DOORBOY_SECRET_FLOOR`` or ``DOORBOY_SECRET_WORKSHOP``. The matching
    access group(s) are posted to ``CARD_URI`` and the ``token`` field of each
    returned object is passed on.

    Responses:
        200 with ``{"allowed_uids": [{"token": ...}, ...]}`` on success,
        401 when the key is missing or unknown,
        500 when no group could be selected or the upstream call failed.
    """
    key = request.headers.get("KEY")
    if not key or key not in (DOORBOY_SECRET_FLOOR, DOORBOY_SECRET_WORKSHOP):
        # Use a real error status (as /swipe does) so a rejected caller can
        # tell this apart from a successful response.
        return text("how about no", status=401)

    # Two separate checks: if both secrets were configured identically the
    # caller is granted both groups.
    groups = []
    if key == DOORBOY_SECRET_FLOOR:
        groups.append(FLOOR_ACCESS_GROUP)
    if key == DOORBOY_SECRET_WORKSHOP:
        groups.append(WORKSHOP_ACCESS_GROUP)
    if not groups:
        # Defensive only: the key check above should make this unreachable.
        return text("fail", status=500)

    async with httpx.AsyncClient() as client:
        r = await client.post(
            CARD_URI,
            json={"groups": groups},
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Basic {INVENTORY_API_KEY}",
            },
        )

    if r.status_code >= 400:
        # Never interpret an upstream error body as an (empty) allow-list.
        return text("Failed", status=500)

    allowed_uids = [{"token": obj["token"]} for obj in r.json()]
    return json({"allowed_uids": allowed_uids})
|
|
|
|
def datetime_to_json_formatting(o):
    """Return the ISO 8601 string for a date or datetime, else ``None``.

    Intended for use as the ``default`` hook when serialising to JSON.
    """
    # datetime is a subclass of date, so one isinstance check covers both.
    return o.isoformat() if isinstance(o, date) else None
|
|
|
|
@app.route("/open-door-events")
async def view_open_door_events(request):
    """Return logged door-opening events, newest first, as a JSON list.

    The caller must send ``DOORBOY_SECRET_OPEN_EVENTS`` in the ``KEY`` header;
    otherwise a 401 "Invalid token" response is returned.

    Each item in the result has the keys ``method``, ``door``, ``timestamp``
    and ``member``. Date and datetime values are serialised through
    ``datetime_to_json_formatting``.
    """
    key = request.headers.get("KEY")
    if not key or key != DOORBOY_SECRET_OPEN_EVENTS:
        return text("Invalid token", status=401)

    query = {
        "component": "doorboy",
        # NOTE(review): this top-level "type" filter means card-swiped events
        # only match when they are also typed "open-door"; confirm intent.
        "type": "open-door",
        # The two alternatives must sit under "$and": a dict literal cannot
        # hold two "$or" keys, the later one silently replaces the earlier.
        "$and": [
            {"$or": [{"approved": True}, {"success": True}]},
            {"$or": [{"type": "open-door"}, {"event": "card-swiped"}]},
        ],
        "door": {"$exists": True},
        "timestamp": {"$exists": True},
    }
    cursor = app.ctx.db.eventlog.find(query).sort("timestamp", -1)
    results = await cursor.to_list(length=None)

    transformed = []
    for r in results:
        # The two checks are independent: a record satisfying both yields
        # two entries.
        if r.get("type") == "open-door" and r.get("approved") and r.get("method"):
            transformed.append({
                "method": r.get("method"),
                "door": r["door"],
                "timestamp": r.get("timestamp"),
                "member": r.get("member"),
            })
        if r.get("event") == "card-swiped" and r.get("success"):
            transformed.append({
                "method": "card-swiped",
                "door": r["door"],
                "timestamp": r.get("timestamp"),
                "member": r.get("inventory", {}).get("owner"),
            })

    return json(transformed, default=datetime_to_json_formatting)
|
|
|
|
@app.route("/longpoll", stream=True)
async def view_longpoll(request):
    """Stream approved door-open events to the caller as ``text/event-stream``.

    The caller authenticates with the ``KEY`` header (floor or workshop
    secret); otherwise a 401 "Invalid token" response is returned.

    After two status messages, every document inserted into the ``eventlog``
    collection with ``type == "open-door"`` and a truthy ``approved`` field
    produces one ``data: <door>`` message. If the change stream raises a
    PyMongo error, the error is printed and a final
    ``response-generator-ended`` message is sent.
    """
    key = request.headers.get("KEY")
    if not key or key not in (DOORBOY_SECRET_FLOOR, DOORBOY_SECRET_WORKSHOP):
        return text("Invalid token", status=401)

    response = await request.respond(content_type="text/event-stream")
    await response.send("data: response-generator-started\n\n")

    # Only newly inserted documents are of interest.
    pipeline = [{"$match": {"operationType": "insert"}}]
    try:
        async with app.ctx.db.eventlog.watch(pipeline) as stream:
            await response.send("data: watch-stream-opened\n\n")
            async for event in stream:
                doc = event["fullDocument"]
                if doc.get("type") != "open-door" or not doc.get("approved", False):
                    continue
                door = doc.get("door")
                if door is None:
                    # A malformed event must not tear down the whole stream.
                    continue
                await response.send(f"data: {door}\n\n")
    except pymongo.errors.PyMongoError as e:
        print(e)
        await response.send("data: response-generator-ended\n\n")
        return
|
|
|
|
@app.post("/swipe")
async def forward_swipe(request):
    """Forward a card-swipe report to ``SWIPE_URI`` after checking the door.

    The caller authenticates with the ``KEY`` header. The floor secret may
    report swipes for ``backdoor``, ``frontdoor`` and ``grounddoor``; the
    workshop secret for ``workshopdoor``. The JSON body is forwarded
    unchanged.

    Responses:
        200 "ok" when the upstream service answers 200,
        400 when the body is not a JSON object,
        401 when the key is missing or unknown,
        403 when the reported door is not permitted for this key,
        500 "Failed" when the upstream service answers anything else.
    """
    key = request.headers.get("KEY")
    if not key or key not in (DOORBOY_SECRET_FLOOR, DOORBOY_SECRET_WORKSHOP):
        return text("Invalid token", status=401)

    data = request.json
    if not isinstance(data, dict):
        # An empty body yields None and a JSON array yields a list; neither
        # has a "door" field to check.
        return text("Invalid request body", status=400)

    doors = set()
    if key == DOORBOY_SECRET_FLOOR:
        doors.update(["backdoor", "frontdoor", "grounddoor"])
    if key == DOORBOY_SECRET_WORKSHOP:
        doors.add("workshopdoor")

    door = data.get("door")
    # The isinstance check keeps unhashable values (lists, objects) from
    # raising inside the set lookup.
    if not isinstance(door, str) or door not in doors:
        print("Door", repr(door), "not in", doors)
        return text("Not allowed", status=403)

    async with httpx.AsyncClient() as client:
        r = await client.post(
            SWIPE_URI,
            json=data,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Basic {INVENTORY_API_KEY}",
            },
        )

    if r.status_code == 200:
        return text("ok")
    return text("Failed", status=500)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run as a single process on all interfaces, port 5000, with access
    # logging on and debug mode off.
    app.run(
        host="0.0.0.0",
        port=5000,
        debug=False,
        access_log=True,
        single_process=True,
    )
|