All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
113 lines
3.4 KiB
Python
113 lines
3.4 KiB
Python
import os
|
|
import re
|
|
import const
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from pymongo import MongoClient
|
|
from flask import Blueprint, abort, g, make_response, redirect, render_template, request, jsonify
|
|
from common import CustomForm, build_query, flatten, format_name, spam, users, User
|
|
|
|
# Flask blueprint named "api"; the view functions in this module register
# their routes on it via @page_api.route(...).
page_api = Blueprint("api", __name__)
|
|
# MongoDB handle shared by the views in this module. The client is created at
# import time from const.MONGO_URI, and the database used is the default one
# for that URI (presumably the one named in the URI itself — confirm).
db = MongoClient(const.MONGO_URI).get_default_database()
|
|
# Shared secret that check_api_key compares the Authorization header against.
# Read once at import time; None when INVENTORY_API_KEY is not set.
api_key = os.getenv("INVENTORY_API_KEY")
|
|
|
|
def check_api_key(f):
    """Decorate a view so it only runs when the request carries the API key.

    The wrapped view is called only when the ``Authorization`` header
    contains ``Basic <key>`` and ``<key>`` equals the module-level
    ``api_key``. Every other request is answered with ``("nope", 403)``.

    Args:
        f: The view function to protect.

    Returns:
        The wrapping function, carrying ``f``'s metadata via ``functools.wraps``.
    """
    import hmac  # local import: only this decorator needs it

    @wraps(f)
    def decorated_function(*args, **kwargs):
        request_key = request.headers.get('Authorization', False)
        if not request_key:
            return "nope", 403

        # A header that does not contain "Basic <key>" yields no match; treat
        # it as a failed check instead of crashing on ``None.group(1)``.
        match = re.search(r"Basic (.*)", request_key)
        found_key = match.group(1) if match else None

        # An unset INVENTORY_API_KEY (api_key is None) can never be matched.
        if not found_key or api_key is None:
            return "nope", 403

        # Constant-time comparison so response timing does not leak how much
        # of the key was correct. Compared as bytes because compare_digest
        # rejects non-ASCII str arguments.
        if not hmac.compare_digest(found_key.encode("utf-8"), api_key.encode("utf-8")):
            return "nope", 403

        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
@page_api.route("/users")
@check_api_key
def view_users():
    """Return the shared ``users`` collection as a JSON response.

    The collection is echoed to stdout before it is serialised.
    """
    print(users)
    return jsonify(users)
|
|
|
|
@page_api.route("/cards", methods=["POST"])
@check_api_key
def get_group_cards():
    """Return the card tokens owned by members of the requested groups.

    Expects a JSON object body with a non-empty ``groups`` value. Every entry
    of ``users`` that has at least one group contained in ``groups`` is
    selected by username. ``db.inventory`` is then queried for documents that
    have ``token.uid_hash`` set and whose ``inventory.owner.username`` is one
    of those usernames.

    Returns:
        A JSON list of ``{"token": ...}`` objects, or the tuple
        ``("must specify groups in parameter", 400)`` when the body is
        missing, is not a JSON object, or has no usable ``groups`` value.
    """
    # silent=True makes get_json return None for a missing or malformed JSON
    # body instead of raising; a non-object body (list, string, ...) is
    # rejected as well so ``.get`` is never called on the wrong type.
    body = request.get_json(silent=True)
    request_groups = body.get("groups", False) if isinstance(body, dict) else False
    if not request_groups:
        return "must specify groups in parameter", 400

    print(f"groups requested are: {request_groups}")
    print(f"found users: {users}")

    # One username per user that belongs to at least one requested group.
    keys = [
        u.username
        for u in users
        if any(group in request_groups for group in u.groups)
    ]
    print(f"keys are {keys}")

    flt = {
        "token.uid_hash": {"$exists": True},
        "inventory.owner.username": {"$in": keys},
    }
    prj = {
        "inventory.owner": True,
        "token.uid_hash": True,
    }
    fl = [{"token": obj["token"]} for obj in db.inventory.find(flt, prj)]
    print(f"found tokens are: {fl}")
    return jsonify(fl)
|
|
|
|
@page_api.route("/api/slack/doorboy", methods=['POST'])
def view_slack_doorboy():
    """Handle a Slack slash command that asks to open a door.

    The form fields ``token``, ``channel_id``, ``command`` and ``user_id`` are
    read from the request. After the token, channel and command are
    validated, the caller is looked up in ``users`` by ``slack_id`` and
    checked for membership in the group that guards the requested door. The
    attempt is written to ``db.eventlog`` and announced through ``spam``.

    Returns:
        A plain-text reply: an "Invalid ..." message for a bad token, channel
        or command, "Opening <door> for <name>" when access is approved, and
        "Permission denied" otherwise (including when the Slack user is
        unknown).
    """
    print(request.data)

    # NOTE(review): this shared secret is hard-coded in source. It should be
    # loaded from configuration and rotated; left unchanged here so existing
    # deployments keep working.
    if request.form.get("token") != "FSh3r8UE1vFHP4GrAn8SgZUY":
        return "Invalid token was supplied"
    if request.form.get("channel_id") not in ("C01CWPF5H8W", "CDL9H8Q9W"):
        return "Invalid channel was supplied"

    # Slash command -> door identifier.
    doors_by_command = {
        "/open-new-door": "backdoor",
        "/open-back-door": "backdoor",
        "/open-front-door": "frontdoor",
        "/open-ground-door": "grounddoor",
        "/open-workshop-door": "workshopdoor",
    }
    door = doors_by_command.get(request.form.get("command"))
    if door is None:
        return "Invalid command was supplied"

    print(users)

    # Require an actual user_id: comparing a missing (None) id against
    # user.slack_id could otherwise match a user that has no Slack id set.
    slack_user_id = request.form.get("user_id")
    member = None
    if slack_user_id:
        for user in users:
            # No break on purpose: keeps the original last-match-wins result.
            if user.slack_id == slack_user_id:
                member = user

    # An unknown Slack user previously crashed on ``member.groups``; deny.
    if member is None:
        return "Permission denied"

    access_group = "k-space:workshop" if door == "workshopdoor" else "k-space:floor"
    approved = access_group in member.groups

    db.eventlog.insert_one({
        "method": "slack",
        "approved": approved,
        "duration": 5,
        "component": "doorboy",
        "type": "open-door",
        "door": door,
        "member_id": member.username,
        "member": member.display_name,
        "timestamp": datetime.utcnow(),
    })

    status = "Permitted" if approved else "Denied"
    subject = member.display_name
    spam(f"{status} {door} door access for {subject} via Slack bot")

    if approved:
        return f"Opening {door} for {subject}"
    return "Permission denied"
|