"""Doorboy views: keycard/keyfob token management and door access events."""
from datetime import datetime, timedelta

from bson.errors import InvalidId
from bson.objectid import ObjectId
from dateutil.parser import parse, ParserError
from flask import Blueprint, g, redirect, render_template, request, abort
from flask_wtf import FlaskForm
from pymongo import MongoClient
from wtforms import StringField, IntegerField, SelectField, BooleanField, DateTimeField, validators
from wtforms.validators import DataRequired
import pytz

import const
from api import check_api_key
from common import spam, User
from oidc import login_required, read_user

page_doorboy = Blueprint("doorboy", __name__)
db = MongoClient(const.MONGO_URI).get_default_database()


def _object_id_or_404(raw_id):
    """Convert a URL segment to an ObjectId, aborting with 404 if malformed."""
    try:
        return ObjectId(raw_id)
    except (InvalidId, TypeError):
        abort(404)


@page_doorboy.route("/m/doorboy/<event_id>/claim")
@login_required
def view_doorboy_claim(event_id):
    """Associate an unowned token with the logged-in user.

    The card UID hash is read from the inventory document identified by
    ``event_id``; the first token with that hash and no owner is claimed.
    Responds with 404 when the document or its UID hash is missing.
    """
    user = read_user()

    # Find swipe event OR token object by id to get card UID
    event = db.inventory.find_one({"_id": _object_id_or_404(event_id)})
    uid_hash = ((event or {}).get("token") or {}).get("uid_hash")
    if not uid_hash:
        # Without a hash the filter below would not identify a specific card.
        abort(404)

    # Find token object to associate with user
    db.inventory.update_one({
        "type": "token",
        "token.uid_hash": uid_hash,
        "inventory.owner.username": {"$exists": False},
    }, {
        "$set": {
            "token.enabled": datetime.utcnow(),
            "inventory.owner.display_name": user["name"],
            "inventory.owner.username": user["username"],
        },
    })
    return redirect("/m/doorboy")


@page_doorboy.route("/m/doorboy/<token_id>/disable")
@login_required
def view_doorboy_disable(token_id):
    """Mark one of the logged-in user's tokens as disabled."""
    user = read_user()
    db.inventory.update_one({
        "component": "doorboy",
        "type": "token",
        "_id": _object_id_or_404(token_id),
        "inventory.owner.username": user["username"],
    }, {
        "$set": {"token.disabled": datetime.utcnow()},
        "$unset": {"token.enabled": ""},
    })
    return redirect("/m/doorboy")


@page_doorboy.route("/m/doorboy/<token_id>/enable")
@login_required
def view_doorboy_enable(token_id):
    """Mark one of the logged-in user's tokens as enabled."""
    user = read_user()
    db.inventory.update_one({
        "component": "doorboy",
        "type": "token",
        "_id": _object_id_or_404(token_id),
        "inventory.owner.username": user["username"],
    }, {
        "$set": {"token.enabled": datetime.utcnow()},
        "$unset": {"token.disabled": ""},
    })
    return redirect("/m/doorboy")


class TokenEditForm(FlaskForm):
    """Form for editing a token's comment and enabled flag."""

    comment = StringField("Comment")
    enabled = BooleanField("Enabled")


@page_doorboy.route("/m/doorboy/<token_id>/edit", methods=["GET"])
@login_required
def view_doorboy_edit(token_id):
    """Render the edit form for one of the logged-in user's tokens.

    Responds with 404 when the token does not exist or belongs to
    somebody else.
    """
    user = read_user()
    token = db.inventory.find_one({
        "component": "doorboy",
        "type": "token",
        "_id": _object_id_or_404(token_id),
        "inventory.owner.username": user["username"],
    })
    if token is None:
        abort(404)

    form = TokenEditForm()
    token_fields = token.get("token", {})
    form.comment.data = token_fields.get("comment", "")
    if token_fields.get("enabled"):
        form.enabled.render_kw = {"checked": "checked"}
        form.enabled.data = "y"
    return render_template("doorboy_token_edit.html", form=form, token=token)


@page_doorboy.route("/m/doorboy/<token_id>/edit", methods=["POST"])
@login_required
def save_doorboy_edit(token_id):
    """Persist the submitted comment/enabled values for the user's token.

    Always redirects to the user's card list; invalid submissions are
    silently ignored.
    """
    user = read_user()
    form = TokenEditForm(request.form)
    if form.validate_on_submit():
        db.inventory.update_one({
            "component": "doorboy",
            "type": "token",
            "_id": _object_id_or_404(token_id),
            "inventory.owner.username": user["username"],
        }, {
            "$set": {
                "token.comment": form.comment.data,
                # NOTE(review): this stores a boolean, whereas the enable and
                # disable views store a datetime in token.enabled and
                # token.disabled -- confirm what the consumers expect.
                "token.enabled": form.enabled.data,
            },
        })
    return redirect("/m/doorboy/me")


class HoldDoorForm(FlaskForm):
    """Form for requesting that a door be held open for a while."""

    door_name = SelectField(
        "Door name",
        choices=[(j, j) for j in ["grounddoor", "frontdoor", "backdoor"]],
        validators=[DataRequired()],
    )
    duration = IntegerField(
        'Duration in seconds',
        validators=[DataRequired(), validators.NumberRange(min=5, max=21600)],
    )


@page_doorboy.route("/m/doorboy/hold", methods=["POST"])
@login_required
def view_doorboy_hold():
    """Record a "hold" event for the selected door and duration."""
    user = read_user()
    form = HoldDoorForm(request.form)
    if form.validate_on_submit():
        now = datetime.utcnow()
        db.eventlog.insert_one({
            "component": "doorboy",
            "type": "hold",
            "requester": user["name"],
            "door": form.door_name.data,
            "expires": now + timedelta(seconds=form.duration.data),
        })
    return redirect("/m/doorboy")


@page_doorboy.route("/m/doorboy/<door>/open")
@login_required
def view_doorboy_open(door):
    """Log a web "open-door" request and announce whether it was approved.

    Returns 400 for an unknown door, 401 when the user lacks the access
    group for the door, and a redirect to the doorboy page otherwise.
    """
    user = read_user()
    if door not in ("grounddoor", "frontdoor", "backdoor", "workshopdoor"):
        return "", 400

    access_group = "k-space:workshop" if door == "workshopdoor" else "k-space:floor"
    approved = access_group in g.users_lookup.get(user["username"], User()).groups

    # The attempt is logged whether or not it was approved.
    db.eventlog.insert_one({
        "method": "web",
        "approved": approved,
        "duration": 5,
        "component": "doorboy",
        "type": "open-door",
        "door": door,
        "member_id": user["username"],
        "member": user["name"],
        "timestamp": datetime.utcnow(),
    })

    status = "Permitted" if approved else "Denied"
    subject = user["name"]
    msg = f"{status} {door} door access for {subject} via https://inventory.k-space.ee/m/doorboy"
    spam(msg)

    if approved:
        return redirect("/m/doorboy")
    return "", 401


@page_doorboy.route("/m/doorboy/slam", methods=["POST"])
@login_required
def view_doorboy_slam():
    """Record a "hold" event from the submitted door/duration form."""
    user = read_user()
    # NOTE(review): the previous body read an undefined `form` and a
    # non-existent `duration_min` field, so every request raised NameError.
    # The request is now parsed with HoldDoorForm (duration in seconds, as
    # that form declares) -- confirm the intended "slam" semantics.
    form = HoldDoorForm(request.form)
    if form.validate_on_submit():
        db.eventlog.insert_one({
            "component": "doorboy",
            "type": "hold",
            "requester": user["name"],
            "door": form.door_name.data,
            "expires": datetime.utcnow() + timedelta(seconds=form.duration.data),
        })
    return redirect("/m/doorboy")


@page_doorboy.route("/m/doorboy")
@login_required
def view_doorboy():
    """Render the doorboy overview with the latest door events and swipes."""
    # Local names are passed to the template via locals(); keep them stable.
    user = read_user()
    workshop_access = "k-space:workshop" in g.users_lookup.get(user["username"], User()).groups
    latest_events = db.eventlog.find(
        {"component": "doorboy", "type": "open-door"}
    ).sort([("timestamp", -1)]).limit(10)
    latest_swipes = db.inventory.find(
        {"component": "doorboy", "type": "token"}
    ).sort([("last_seen", -1)]).limit(10)
    return render_template("doorboy.html", **locals())


@page_doorboy.route("/m/doorboy/user/<username>/cards")
@login_required(groups=["k-space:board", "k-space:kubernetes:admins"])
def view_user_cards(username):
    """Show the cards of an arbitrary user (restricted to listed groups)."""
    return view_user_cards_inner(username)


@page_doorboy.route("/m/doorboy/me")
@login_required
def view_own_cards():
    """Show the cards of the logged-in user."""
    user = read_user()
    return view_user_cards_inner(user["username"])


def view_user_cards_inner(username):
    """Render the token list owned by ``username``; 404 for unknown users."""
    # Local names are passed to the template via locals(); keep them stable.
    user = read_user()
    subject_user = g.users_lookup.get(username)
    if not subject_user:
        return abort(404)
    # subject_user was looked up by `username`, so compare against the key
    # itself instead of subscripting the user object.
    is_self = user["username"] == username
    cards = db.inventory.find({
        "component": "doorboy",
        "type": "token",
        "inventory.owner.username": username,
    }).sort([("last_seen", -1)])
    return render_template("doorboy_user.html", **locals())


@page_doorboy.route("/m/doorboy/admin")
@login_required(groups=["k-space:board", "k-space:kubernetes:admins"])
def view_doorboy_admin():
    """Render the admin overview of tokens grouped by owner.

    Passes to the template: tokens per user, tokens without an owner,
    "person" users without any token, and the most recent ``last_seen``
    per user (newest first).
    """
    # Local names are passed to the template via locals(); keep them stable.
    results = db.inventory.aggregate([
        {"$match": {"component": "doorboy", "type": "token"}},
        {"$group": {
            "_id": "$inventory.owner.username",
            "cards": {"$push": {"$mergeObjects": [
                "$token",
                {"last_seen": "$last_seen"},
                {"_id": "$_id"},
            ]}},
        }},
        {"$sort": {"_id": 1}},
    ])
    user_keyfobs = {r["_id"]: r["cards"] for r in results}
    # Tokens without an owner are grouped under None; there may be none.
    orphaned_keyfobs = user_keyfobs.pop(None, [])
    no_keyfobs = [
        u for u in g.users
        if u.user_type == "person" and not user_keyfobs.get(u.username)
    ]
    last_seen = {
        key: max(datetime_handle(card.get("last_seen")) for card in value)
        for key, value in user_keyfobs.items()
    }

    # Commented tokens first, then alphabetically by comment.
    orphaned_keyfobs = sorted(
        orphaned_keyfobs,
        key=lambda o: (not bool(o.get("comment")), o.get("comment") or ""),
    )
    no_keyfobs = sorted(no_keyfobs, key=lambda u: u.display_name or u.username)
    last_seen = dict(sorted(last_seen.items(), key=lambda i: i[1], reverse=True))
    return render_template("doorboy_admin.html", **locals())


def datetime_handle(item):
    """Coerce a stored ``last_seen`` value into a comparable datetime.

    Accepts a datetime, a date string, or anything else. Unparseable,
    empty or unsupported values map to ``datetime.min``. Naive results
    are localized to UTC; values that already carry tzinfo are returned
    unchanged.
    """
    dt = datetime.min
    if isinstance(item, datetime):
        dt = item
    elif isinstance(item, str) and item:
        try:
            dt = parse(item)
        except (ParserError, OverflowError) as e:
            print(e)
    if dt.tzinfo is None:
        dt = pytz.UTC.localize(dt)
    return dt


@page_doorboy.route("/m/doorboy/swipes")
@login_required
def view_doorboy_events():
    """Render the 500 most recent card-swipe events."""
    # Local names are passed to the template via locals(); keep them stable.
    user = read_user()
    latest_events = db.eventlog.find(
        {"component": "doorboy", "event": "card-swiped"}
    ).sort([("timestamp", -1)]).limit(500)
    return render_template("doorboy.html", **locals())


@page_doorboy.route("/m/doorboy/<token_id>/events")
@login_required
def view_doorboy_token_events(token_id):
    """Render all card-swipe events recorded for one token; 404 if unknown."""
    # Local names are passed to the template via locals(); keep them stable.
    user = read_user()
    token = db.inventory.find_one({"_id": _object_id_or_404(token_id)})
    if token is None:
        abort(404)
    latest_events = db.eventlog.find({
        "component": "doorboy",
        "event": "card-swiped",
        "token.uid_hash": (token.get("token") or {}).get("uid_hash"),
    }).sort([("timestamp", -1)])
    return render_template("doorboy.html", **locals())


class FormSwipe(FlaskForm):
    """Shape of a swipe report posted by a door controller."""

    class Meta:
        csrf = False

    uid = StringField('uid', validators=[])
    uid_hash = StringField('uid_hash', validators=[])
    door = StringField('door', validators=[DataRequired()])
    success = BooleanField('success', validators=[])
    timestamp = DateTimeField('timestamp')


@page_doorboy.route("/m/doorboy/swipe", methods=["POST"])
@check_api_key
def view_swipe():
    """Record a card swipe reported as JSON by a door controller.

    Expects the keys ``uid_hash``, ``door`` and ``success`` and an
    optional ``timestamp`` string. Ensures a token document exists for
    the card, logs the swipe event and announces the result. Returns
    "ok" on success and 400 for a malformed payload.
    """
    form = request.get_json(silent=True)
    if not isinstance(form, dict):
        return "expected a JSON object", 400
    missing = [key for key in ("uid_hash", "door", "success") if key not in form]
    if missing:
        return "missing field(s): %s" % ", ".join(missing), 400

    try:
        timestamp = parse(form["timestamp"]) if form.get("timestamp") else None
    except (ParserError, OverflowError, TypeError):
        return "invalid timestamp", 400
    now = datetime.utcnow()
    # Fall back to the server clock when the reader sent no timestamp, so
    # the event and the token's last_seen agree.
    seen_at = timestamp or now

    token_filter = {
        "type": "token",
        "component": "doorboy",
        "token.uid_hash": form["uid_hash"],
    }

    # Make sure token exists
    db.inventory.update_one(token_filter, {
        "$set": {"last_seen": seen_at},
        "$setOnInsert": {
            "component": "doorboy",
            "type": "token",
            "first_seen": now,
            "inventory": {
                "claimable": True,
            },
        },
    }, upsert=True)

    # Fetch token to read owner
    token = db.inventory.find_one(token_filter) or {}
    username = token.get("inventory", {}).get("owner", {}).get("username", None)

    event_swipe = {
        "component": "doorboy",
        "timestamp": seen_at,
        "door": form["door"],
        "event": "card-swiped",
        "success": form["success"],
        "token": {
            "uid_hash": form["uid_hash"],
        },
        "inventory": {},
    }
    if username:
        event_swipe["inventory"]["owner_id"] = username
    db.eventlog.insert_one(event_swipe)

    status = "Permitted" if form["success"] else "Denied"
    if username and username in g.users_lookup:
        subject = g.users_lookup[username].display_name or username
    else:
        subject = "Unknown"
    msg = "%s %s door access for %s identified by keycard/keyfob" % (status, form["door"], subject)
    spam(msg)
    return "ok"