Files
inventory-app/inventory-app/doorboy.py

368 lines
12 KiB
Python

from datetime import datetime, timedelta
from dateutil.parser import parse, ParserError
from bson.objectid import ObjectId
from flask import Blueprint, g, redirect, render_template, request, abort
from flask_wtf import FlaskForm
from pymongo import MongoClient
from wtforms import StringField, IntegerField, SelectField, BooleanField, DateTimeField, validators
from wtforms.validators import DataRequired
import pytz
import const
from api import check_api_key
from common import slack_post, User
from oidc import login_required, read_user
# Blueprint on which every route in this module is registered.
page_doorboy = Blueprint("doorboy", __name__)
# Database handle shared by all views below; the client is created once, at
# import time, from const.MONGO_URI.
db = MongoClient(const.MONGO_URI).get_default_database()
@page_doorboy.route("/m/doorboy/<event_id>/claim")
@login_required
def view_doorboy_claim(event_id):
    """Assign a still-unowned token to the logged-in user.

    ``event_id`` is the ``_id`` of a ``db.inventory`` document that carries
    the card's ``token.uid_hash``. The token with that hash is given to the
    current user and stamped as enabled, but only if it has no owner
    username yet.

    Responds with 404 when the id is malformed, matches no document, or the
    document carries no ``token.uid_hash``; otherwise redirects to
    ``/m/doorboy``.
    """
    user = read_user()
    # A malformed id would make ObjectId() raise and surface as a 500.
    if not ObjectId.is_valid(event_id):
        abort(404)
    # Find swipe event OR token object by id to get card UID
    event = db.inventory.find_one({"_id": ObjectId(event_id)})
    uid_hash = ((event or {}).get("token") or {}).get("uid_hash")
    if not uid_hash:
        # Without a hash the filter below would match on a null/missing
        # uid_hash, i.e. some unrelated ownerless token.
        abort(404)
    # Associate the token with the user; the $exists guard keeps an already
    # owned token from being taken over.
    db.inventory.update_one({
        "type": "token",
        "token.uid_hash": uid_hash,
        "inventory.owner.username": {"$exists": False},
    }, {
        "$set": {
            "token.enabled": datetime.utcnow(),
            "inventory.owner.display_name": user["name"],
            "inventory.owner.username": user["username"],
        },
    })
    return redirect("/m/doorboy")
@page_doorboy.route("/m/doorboy/<token_id>/disable")
@login_required
def view_doorboy_disable(token_id):
    """Disable one of the caller's own tokens and return to the overview.

    The update only matches a doorboy token with the given id whose owner
    username equals the logged-in user's. It stamps ``token.disabled`` with
    the current UTC time and removes ``token.enabled``.
    """
    current_user = read_user()
    owned_token = {
        "component": "doorboy",
        "type": "token",
        "_id": ObjectId(token_id),
        "inventory.owner.username": current_user["username"],
    }
    changes = {
        "$set": {"token.disabled": datetime.utcnow()},
        "$unset": {"token.enabled": ""},
    }
    db.inventory.update_one(owned_token, changes)
    return redirect("/m/doorboy")
@page_doorboy.route("/m/doorboy/<token_id>/enable")
@login_required
def view_doorboy_enable(token_id):
    """Re-enable one of the caller's own tokens and return to the overview.

    Mirror image of the disable route: stamps ``token.enabled`` with the
    current UTC time and removes ``token.disabled``, for a doorboy token
    with the given id owned by the logged-in user.
    """
    account = read_user()
    selector = {
        "component": "doorboy",
        "type": "token",
        "_id": ObjectId(token_id),
        "inventory.owner.username": account["username"],
    }
    enabled_at = datetime.utcnow()
    db.inventory.update_one(
        selector,
        {
            "$set": {"token.enabled": enabled_at},
            "$unset": {"token.disabled": ""},
        },
    )
    return redirect("/m/doorboy")
class TokenEditForm(FlaskForm):
    """Edit form for a token: a free-text comment and an enabled checkbox."""

    comment = StringField(label="Comment")
    enabled = BooleanField(label="Enabled")
@page_doorboy.route("/m/doorboy/<token_id>/edit", methods=["GET"])
@login_required
def view_doorboy_edit(token_id):
    """Render the edit form for one of the caller's own tokens.

    The form is pre-filled from the stored ``token.comment`` and
    ``token.enabled`` values. Responds with 404 when the id is malformed or
    no doorboy token with that id is owned by the logged-in user.
    """
    user = read_user()
    # A malformed id would make ObjectId() raise and surface as a 500.
    if not ObjectId.is_valid(token_id):
        abort(404)
    token = db.inventory.find_one({
        "component": "doorboy",
        "type": "token",
        "_id": ObjectId(token_id),
        "inventory.owner.username": user["username"],
    })
    if token is None:
        # Unknown id, or the token belongs to somebody else.
        abort(404)
    token_fields = token.get("token") or {}
    form = TokenEditForm()
    form.comment.data = token_fields.get("comment", "")
    if token_fields.get("enabled"):
        # Pre-tick the checkbox for a currently enabled token.
        form.enabled.render_kw = {"checked": "checked"}
        form.enabled.data = "y"
    return render_template("doorboy_token_edit.html", form=form, token=token)
@page_doorboy.route("/m/doorboy/<token_id>/edit", methods=["POST"])
@login_required
def save_doorboy_edit(token_id):
    """Save the comment and enabled state of one of the caller's own tokens.

    The enabled/disabled state is written the same way the dedicated
    enable and disable routes write it: the active flag gets a UTC
    timestamp and the opposite flag is removed. Storing the raw checkbox
    boolean would leave ``token.enabled`` present (as ``False``) on a
    disabled token and would never clear ``token.disabled`` on re-enable.

    Responds with 404 for a malformed id. Always redirects to
    ``/m/doorboy/me`` otherwise, whether or not the form validated.
    """
    user = read_user()
    # A malformed id would make ObjectId() raise and surface as a 500.
    if not ObjectId.is_valid(token_id):
        abort(404)
    form = TokenEditForm(request.form)
    if form.validate_on_submit():
        if form.enabled.data:
            active_flag, stale_flag = "token.enabled", "token.disabled"
        else:
            active_flag, stale_flag = "token.disabled", "token.enabled"
        db.inventory.update_one({
            "component": "doorboy",
            "type": "token",
            "_id": ObjectId(token_id),
            "inventory.owner.username": user["username"],
        }, {
            "$set": {
                "token.comment": form.comment.data,
                active_flag: datetime.utcnow(),
            },
            "$unset": {
                stale_flag: "",
            },
        })
    return redirect("/m/doorboy/me")
class HoldDoorForm(FlaskForm):
    """Form for a door-hold request: which door, and for how many seconds.

    The duration must lie between 5 and 21600 seconds.
    """

    door_name = SelectField(
        "Door name",
        choices=[
            ("grounddoor", "grounddoor"),
            ("frontdoor", "frontdoor"),
            ("backdoor", "backdoor"),
        ],
        validators=[DataRequired()],
    )
    duration = IntegerField(
        'Duration in seconds',
        validators=[
            DataRequired(),
            validators.NumberRange(min=5, max=21600),
        ],
    )
@page_doorboy.route("/m/doorboy/hold", methods=["POST"])
@login_required
def view_doorboy_hold():
    """Record a request to hold a door for a limited time.

    When the submitted ``HoldDoorForm`` validates, a ``hold`` event is
    inserted into ``db.eventlog`` with an ``expires`` time of now (UTC) plus
    the requested number of seconds. An invalid form is ignored. Either way
    the caller is redirected to ``/m/doorboy``.
    """
    # NOTE(review): unlike the open-door route, no access-group check is
    # made here. Confirm that any logged-in user may hold a door.
    user = read_user()
    form = HoldDoorForm(request.form)
    if form.validate_on_submit():
        db.eventlog.insert_one({
            "component": "doorboy",
            "type": "hold",
            "requester": user["name"],
            "door": form.door_name.data,
            "expires": datetime.utcnow() + timedelta(seconds=form.duration.data),
        })
    return redirect("/m/doorboy")
@page_doorboy.route("/m/doorboy/<door>/open")
@login_required
def view_doorboy_open(door):
    """Record a web-initiated door-open request and report the outcome.

    An unknown door name yields an empty 400 response. Otherwise the
    request is approved when the caller belongs to the access group the
    door requires. The attempt is logged to ``db.eventlog`` and announced
    through ``slack_post`` whether or not it was approved. Approved
    requests redirect to ``/m/doorboy``; denied ones get an empty 401.
    """
    member = read_user()
    # Access group each known door requires.
    required_group = {
        "grounddoor": "k-space:floor",
        "frontdoor": "k-space:floor",
        "backdoor": "k-space:floor",
        "workshopdoor": "k-space:workshop",
    }.get(door)
    if required_group is None:
        return "", 400

    # A user missing from the lookup falls back to a default User().
    profile = g.users_lookup.get(member["username"], User())
    approved = required_group in profile.groups

    open_event = {
        "method": "web",
        "approved": approved,
        "duration": 5,
        "component": "doorboy",
        "type": "open-door",
        "door": door,
        "member_id": member["username"],
        "member": member["name"],
        "timestamp": datetime.utcnow(),
    }
    db.eventlog.insert_one(open_event)

    if approved:
        status = "Permitted"
    else:
        status = "Denied"
    msg = "%s %s door access for %s via https://inventory.k-space.ee/m/doorboy" % (status, door, member["name"])
    slack_post(msg, "doorboy")

    if not approved:
        return "", 401
    return redirect("/m/doorboy")
@page_doorboy.route("/m/doorboy/slam", methods=["POST"])
@login_required
def view_doorboy_slam():
    """Insert a ``hold`` event from the posted ``door_name`` / ``duration_min``.

    The original body read both values from a ``form`` variable that was
    never defined, so every request failed with NameError. The values are
    now taken from the submitted form data and validated; bad input yields
    an empty 400 response. On success the caller is redirected to
    ``/m/doorboy``.
    """
    # NOTE(review): the intended semantics of "slam" are not visible here.
    # The stored document is kept exactly as the original wrote it (type
    # "hold", expiry in minutes). The accepted doors and the 1..360 minute
    # range mirror HoldDoorForm's choices and its 21600 s cap. Confirm.
    user = read_user()
    door = request.form.get("door_name")
    duration_min = request.form.get("duration_min", type=int)
    if door not in ("grounddoor", "frontdoor", "backdoor"):
        return "", 400
    if duration_min is None or not 1 <= duration_min <= 360:
        return "", 400
    db.eventlog.insert_one({
        "component": "doorboy",
        "type": "hold",
        "requester": user["name"],
        "door": door,
        "expires": datetime.utcnow() + timedelta(minutes=duration_min),
    })
    return redirect("/m/doorboy")
@page_doorboy.route("/m/doorboy")
@login_required
def view_doorboy():
    """Render the doorboy overview page.

    The template receives the current user, whether that user is in the
    ``k-space:workshop`` group, and the ten doorboy tokens with the newest
    ``last_seen`` value.
    """
    user = read_user()
    # A user missing from the lookup falls back to a default User().
    member = g.users_lookup.get(user["username"], User())
    workshop_access = "k-space:workshop" in member.groups

    token_filter = {"component": "doorboy", "type": "token"}
    newest_first = [("last_seen", -1)]
    latest_swipes = db.inventory.find(token_filter).sort(newest_first).limit(10)

    # Same template context the former **locals() call produced.
    return render_template(
        "doorboy.html",
        user=user,
        workshop_access=workshop_access,
        latest_swipes=latest_swipes,
    )
@page_doorboy.route("/m/doorboy/user/<username>/cards")
@login_required(groups=["k-space:board", "k-space:kubernetes:admins"])
def view_user_cards(username):
    """Show the cards of an arbitrary user.

    Access is limited by ``login_required`` to the groups listed in the
    decorator; the page itself is built by ``view_user_cards_inner``.
    """
    response = view_user_cards_inner(username)
    return response
@page_doorboy.route("/m/doorboy/me")
@login_required
def view_own_cards():
    """Show the logged-in user's own cards via ``view_user_cards_inner``."""
    own_username = read_user()["username"]
    return view_user_cards_inner(own_username)
def view_user_cards_inner(username):
    """Render the card list of ``username``.

    Aborts with 404 when ``username`` is not present in ``g.users_lookup``.
    The template receives ``username``, the logged-in ``user``, the looked-up
    ``subject_user``, an ``is_self`` flag and the user's doorboy tokens
    sorted by ``last_seen``, newest first.
    """
    user = read_user()
    subject_user = g.users_lookup.get(username)
    if not subject_user:
        return abort(404)
    # subject_user was fetched by this very key, so comparing against
    # `username` is equivalent to the old subject_user["username"]. It also
    # avoids subscripting an object that is read through attributes
    # (.groups, .display_name) everywhere else in this module.
    is_self = user["username"] == username
    cards = db.inventory.find({
        "component": "doorboy",
        "type": "token",
        "inventory.owner.username": username,
    }).sort([("last_seen", -1)])
    # Same template context the former **locals() call produced.
    return render_template(
        "doorboy_user.html",
        username=username,
        user=user,
        subject_user=subject_user,
        is_self=is_self,
        cards=cards,
    )
@page_doorboy.route("/m/doorboy/admin")
@login_required(groups=["k-space:board", "k-space:kubernetes:admins"])
def view_doorboy_admin():
    """Render the admin overview of all doorboy tokens.

    Tokens are grouped by owner username. The template receives:

    * ``user_keyfobs``: owner username -> list of that owner's cards
    * ``orphaned_keyfobs``: cards without an owner, commented ones first
    * ``no_keyfobs``: users of type ``person`` that own no card
    * ``last_seen``: owner username -> most recent ``last_seen`` across
      the owner's cards, ordered newest first
    * ``results``: the (already consumed) aggregation cursor
    """
    results = db.inventory.aggregate([
        {"$match": {"component": "doorboy", "type": "token"}},
        {
            "$group": {
                "_id": "$inventory.owner.username",
                "cards": {
                    "$push": {"$mergeObjects": [
                        "$token",
                        {"last_seen": "$last_seen"},
                        {"_id": "$_id"},
                        {"old_display_name": "$inventory.owner.display_name"},
                        {"old_foreign_id": "$inventory.owner.foreign_id"},
                    ]},
                },
            },
        },
        {"$sort": {"_id": 1}},
    ])
    user_keyfobs = {r["_id"]: r["cards"] for r in results}
    # Ownerless tokens are grouped under the None key. Default to an empty
    # list so the page still renders when every token has an owner.
    orphaned_keyfobs = user_keyfobs.pop(None, [])
    no_keyfobs = [
        u for u in g.users
        if not user_keyfobs.get(u.username) and u.user_type == "person"
    ]
    last_seen = {
        owner: max(datetime_handle(card.get("last_seen")) for card in cards)
        for owner, cards in user_keyfobs.items()
    }
    # `or ""` keeps an explicitly stored null comment from being compared
    # against a string comment, which would raise TypeError.
    orphaned_keyfobs = sorted(
        orphaned_keyfobs,
        key=lambda o: (not o.get("comment"), o.get("comment") or ""),
    )
    no_keyfobs = sorted(no_keyfobs, key=lambda u: u.display_name or u.username)
    # Values were already normalized by datetime_handle above.
    last_seen = dict(sorted(last_seen.items(), key=lambda i: i[1], reverse=True))
    # Same template context the former **locals() call produced.
    return render_template(
        "doorboy_admin.html",
        results=results,
        user_keyfobs=user_keyfobs,
        orphaned_keyfobs=orphaned_keyfobs,
        no_keyfobs=no_keyfobs,
        last_seen=last_seen,
    )
def datetime_handle(item):
    """Coerce ``item`` into a timezone-aware ``datetime``.

    * a ``datetime`` (or subclass) is used as is
    * a non-empty string is parsed with ``dateutil``; an unparseable or
      out-of-range string falls back to ``datetime.min``
    * anything else, including ``None`` and ``""``, becomes ``datetime.min``

    A naive result is localized to UTC. A value that already carries a
    timezone is returned unchanged.
    """
    dt = datetime.min
    if isinstance(item, datetime):
        dt = item
    elif isinstance(item, str) and item:
        try:
            dt = parse(item)
        except (ParserError, OverflowError) as e:
            # Best effort: a bad stored value must not break the whole
            # page, so report it and keep the datetime.min fallback.
            print(e)
    # pytz refuses to localize an already aware datetime, so only naive
    # values are touched.
    if dt.tzinfo is None:
        dt = pytz.UTC.localize(dt)
    return dt
@page_doorboy.route("/m/doorboy/swipes")
@login_required
def view_doorboy_events():
    """Render the 500 most recent ``card-swiped`` events, newest first."""
    user = read_user()
    swipe_filter = {"component": "doorboy", "event": "card-swiped"}
    newest_first = [("timestamp", -1)]
    latest_events = db.eventlog.find(swipe_filter).sort(newest_first).limit(500)
    # Same template context the former **locals() call produced.
    return render_template("doorboy.html", user=user, latest_events=latest_events)
@page_doorboy.route("/m/doorboy/<token_id>/events")
@login_required
def view_doorboy_token_events(token_id):
    """Render every ``card-swiped`` event recorded for one token.

    Events are matched on the token's ``token.uid_hash`` and listed newest
    first. Responds with 404 when the id is malformed, matches no inventory
    document, or the document carries no ``token.uid_hash``.
    """
    user = read_user()
    # A malformed id would make ObjectId() raise and surface as a 500.
    if not ObjectId.is_valid(token_id):
        abort(404)
    token = db.inventory.find_one({"_id": ObjectId(token_id)})
    uid_hash = ((token or {}).get("token") or {}).get("uid_hash")
    if not uid_hash:
        # Querying with a null hash would match events lacking the field.
        abort(404)
    latest_events = db.eventlog.find({
        "component": "doorboy",
        "event": "card-swiped",
        "token.uid_hash": uid_hash,
    }).sort([("timestamp", -1)])
    # Same template context the former **locals() call produced.
    return render_template(
        "doorboy.html",
        token_id=token_id,
        user=user,
        token=token,
        latest_events=latest_events,
    )
class FormSwipe(FlaskForm):
    """Form describing a swipe report, with CSRF protection switched off.

    Only ``door`` is required.
    """

    class Meta:
        csrf = False

    uid = StringField(label='uid', validators=[])
    # NOTE(review): label duplicates the one on `uid`; possibly a copy-paste
    # leftover. Kept as is.
    uid_hash = StringField(label='uid', validators=[])
    door = StringField(label='door', validators=[DataRequired()])
    success = BooleanField(label='success', validators=[])
    timestamp = DateTimeField(label='timestamp')
@page_doorboy.route("/m/doorboy/swipe", methods=["POST"])
@check_api_key
def view_swipe():
    """Record a card swipe posted as JSON.

    Expected JSON members: ``uid_hash``, ``door`` and ``approved``
    (required) plus an optional ``timestamp`` string. The matching token is
    created on first sight and its ``last_seen`` refreshed; a
    ``card-swiped`` event is written to ``db.eventlog`` and the outcome is
    announced through ``slack_post``.

    Returns ``"ok"`` on success and an empty 400 response when the body is
    not a JSON object, a required member is missing, or the timestamp
    cannot be parsed.
    """
    form = request.get_json(silent=True)
    print(form)
    if not isinstance(form, dict):
        return "", 400
    if not form.get("uid_hash") or not form.get("door") or "approved" not in form:
        return "", 400
    try:
        timestamp = parse(form["timestamp"]) if form.get("timestamp") else None
    except (ParserError, OverflowError, TypeError):
        return "", 400
    now = datetime.utcnow()
    # The event used to be stored with a null timestamp when the client sent
    # none, while last_seen fell back to `now`. Use one value for both.
    seen_at = timestamp or now

    token_query = {
        "type": "token",
        "component": "doorboy",
        "token.uid_hash": form["uid_hash"],
    }
    # Make sure token exists
    db.inventory.update_one(token_query, {
        "$set": {
            "last_seen": seen_at,
        },
        "$setOnInsert": {
            "component": "doorboy",
            "type": "token",
            "first_seen": now,
            "inventory": {
                "claimable": True,
            },
        },
    }, upsert=True)
    # Fetch token to read owner
    token = db.inventory.find_one(token_query) or {}
    owner = (token.get("inventory") or {}).get("owner") or {}
    username = owner.get("username")

    event_swipe = {
        "component": "doorboy",
        "timestamp": seen_at,
        "door": form["door"],
        "event": "card-swiped",
        "approved": form["approved"],
        "token": {
            "uid_hash": form["uid_hash"],
        },
        "inventory": {},
    }
    if username:
        event_swipe["inventory"]["owner_id"] = username
    db.eventlog.insert_one(event_swipe)

    status = "Permitted" if form["approved"] else "Denied"
    if username and username in g.users_lookup:
        subject = g.users_lookup[username].display_name or username
    else:
        subject = "Unknown"
    msg = "%s %s door access for %s identified by keycard/keyfob" % (status, form["door"], subject)
    slack_post(msg, "doorboy")
    return "ok"