Files
inventory-app/inventory-app/inventory.py
rasmus 04a7e18e55
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
make unlisted the default
The idea for public was to pick up highlights for the public listing.
2025-01-22 20:01:42 +02:00

649 lines
22 KiB
Python

import re
import boto3
import pymongo
import urllib
from datetime import datetime, date, timedelta
from botocore.exceptions import ClientError
from bson.objectid import ObjectId
from flask import Blueprint, abort, g, make_response, redirect, render_template, request, url_for
from jpegtran import JPEGImage
from pymongo import MongoClient
from werkzeug.utils import secure_filename
from wtforms import BooleanField, SelectField, StringField, validators
from wtforms.fields import FormField, TextAreaField
from wtforms.form import Form
from wtforms.validators import Length
import const
from common import CustomForm, build_query, flatten, format_name
from oidc import page_oidc, login_required, read_user, do_login
page_inventory = Blueprint("inventory", __name__)
db = MongoClient(const.MONGO_URI).get_default_database()
channel = "inventory"
@login_required
@page_inventory.route("/m/inventory/by-mac/<mac>", methods=['GET'])
def view_inventory_by_mac(mac):
if not check_mac_address_valid(mac):
return "Invalid mac address", 400
mac = mac.lower()
item = db.inventory.find_one({ "mac": mac }, projection = { "_id": 1})
if not item or not item.get("_id", False):
return redirect("/m/inventory/assign-mac/%s" % mac)
return redirect(url_for("inventory.view_inventory_view", item_id = item["_id"]))
@page_inventory.route("/m/inventory/<item_id>/view")
def view_inventory_view(item_id):
user = read_user()
template = "inventory_view.html"
item = db.inventory.find_one({ "_id": ObjectId(item_id) })
if not item:
return abort(404)
item_user = item.get("inventory", {}).get("user", {}).get("username", None)
item_owner = item.get("inventory", {}).get("owner", {}).get("username", None)
if not user:
if item["inventory"].get("visibility") not in ["public"]:
return do_login()
template = "inventory_view_public.html"
redirect_url = urllib.parse.quote_plus(request.full_path)
elif item["inventory"].get("visibility") == "private" and item_owner != user["username"] and "k-space:inventory:edit" not in user.get("groups", []):
return abort(403)
else:
can_audit = "k-space:inventory:audit" in user.get("groups", [])
can_edit = check_edit_permission(item_id)
is_using = item_user and item_user == user["username"]
photo_url = get_image_url(item_id)
return render_template(template , **locals())
def get_image_url(item_id):
bucket=get_bucket()
try:
return bucket.generate_presigned_url(
ClientMethod='get_object',
Params={
'Bucket': const.BUCKET_NAME,
'Key': item_id
},
ExpiresIn=3600
)
except ClientError:
return None
def fetch_members_select():
top_usernames= ['k-space', None]
choices = [(None, None)]
for username, user in g.users_lookup.items():
choices.append((username, user.display_name or username))
choices = list(set(choices))
choices = sorted(choices, key = lambda k: k[1] or "")
top = [k for k in choices if k[0] in top_usernames]
rest = [k for k in choices if k[0] not in top_usernames]
return top + rest
def fetch_type_select():
objs = db.inventory.distinct("type")
choices = set()
choices.add("")
for obj in objs:
choices.add(obj)
return choices
def setup_user_select(select_field, name_fields):
existing_id = None
if select_field.data and select_field.data != 'None':
existing_id = select_field.data
select_field.choices = fetch_members_select()
if existing_id:
values = dict(select_field.choices)
value = values.get(existing_id)
for name_field in name_fields:
name_field.data = value
def setup_default_owner(select_field):
user = read_user()
c = select_field.choices
d = [i for i in c if i[0] == (user and user.get("username") or "k-space")]
if len(d) < 1:
return
dv = d[0] or None
select_field.default = dv[0]
select_field.process([])
def member_select_coerce(x):
x = str(x)
if x and x != "None":
return x
else:
return None
class MemberForm(Form):
label = None
username = SelectField(choices=[], coerce=member_select_coerce)
display_name = StringField()
def __init__(self, label=None, *args, **kwargs):
super(MemberForm, self).__init__(*args, **kwargs)
self.label = label
setup_user_select(self.username, [self.display_name])
class InventoryForm(Form):
owner = FormField(MemberForm, label="Owner")
user = FormField(MemberForm, label="Current User")
usable = BooleanField("Usable")
visibility = SelectField("Visibility", choices=['unlisted', 'public', 'private'])
class HardwareForm(Form):
serial = StringField("Serial Number")
product = StringField("Product")
vendor = StringField("Vendor")
class ShortenerForm(Form):
slug = StringField("Slug", [Length(max=4)], [lambda x: x or None])
url = StringField("URL", [], [lambda x: x or None])
class InventoryItemForm(CustomForm):
type = SelectField("Type", choices=fetch_type_select())
name = StringField('Name')
external_link = StringField('External link')
comment = StringField('Comment')
inventory = FormField(InventoryForm)
hardware = FormField(HardwareForm)
shortener = FormField(ShortenerForm)
_description_placeholder = "Insert Markdown here."
description = TextAreaField('Description', [validators.optional(), validators.length(max=3000)], render_kw={"placeholder": _description_placeholder})
location = StringField('Location')
def setup_defaults_add(self):
setup_default_owner(self.inventory.form.owner.form.username)
def check_edit_permission(item_id):
if not item_id:
return False
user = read_user()
if not user:
return False
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "inventory.owner": 1 , "type": 1})
if not item:
return False
user_groups = user.get("groups", [])
if item.get("type") == "key" and "k-space:inventory:keys" not in user_groups:
return False
if "k-space:inventory:edit" in user_groups:
return True
item_username = item.get("inventory", {}).get("owner", {}).get("username", False)
user_username = user.get("username", False)
if not item_username or not user_username:
return False
return item_username == user_username
@page_inventory.route("/m/inventory/<item_id>/edit", methods=['GET'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/add", methods=['GET'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['GET'])
@login_required
def view_inventory_edit(item_id=None, slug=None, clone_item_id=None):
user = read_user()
has_edit_all = user and "k-space:inventory:edit" in user.get("groups", [])
item = None
if item_id:
if not check_edit_permission(item_id):
return abort(403)
item = db.inventory.find_one({ "_id": ObjectId(item_id) })
form = InventoryItemForm(**item)
elif clone_item_id:
item = db.inventory.find_one({ "_id": ObjectId(clone_item_id) })
item.pop("_id", None)
item.pop("shortener", None)
item.pop("name", None)
if "hardware" in item:
item["hardware"].pop("serial", None)
form = InventoryItemForm(**item)
else:
form = InventoryItemForm()
form.setup_defaults_add()
if slug:
form.shortener.slug.data = slug
all_tags = db.inventory.distinct("tags")
item_tags = []
if item and item.get("tags"):
item_tags = item["tags"]
return render_template("inventory_edit.html", **locals())
@page_inventory.route("/m/inventory/<item_id>/edit", methods=['POST'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/add", methods=['POST'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['POST'])
@login_required
def save_inventory_item(item_id=None, **_):
if item_id and not check_edit_permission(item_id):
return abort(403)
form = InventoryItemForm(request.form)
if not form.validate_on_submit():
has_errors = True
return render_template("inventory_edit.html", **locals())
d = {}
form.populate_dict(d)
d['tags'] = list(set(request.form.getlist('tags[]')))
custom_errors = {}
try:
if item_id:
df = flatten(d)
r = {}
for k, v in dict(df).items():
if k.startswith("shortener.") and not v:
df.pop(k)
r[k] = v
if k.startswith("inventory.user.") and not v:
df.pop(k)
r["inventory.user"] = ""
if k.startswith("inventory.owner.") and not v:
df.pop(k)
r["inventory.owner"] = ""
db.inventory.update_one({
"_id": ObjectId(item_id)
}, {
"$set": df,
"$unset": r
})
else:
d["inventory"]["managed"] = True
if "shortener" in d:
if not d["shortener"]["slug"]:
d.pop("shortener")
if "user" in d["inventory"]:
if not d["inventory"]["user"]["username"]:
d["inventory"].pop("user")
if "owner" in d["inventory"]:
if not d["inventory"]["owner"]["username"]:
d["inventory"].pop("owner")
item_id = db.inventory.insert_one(d).inserted_id
except pymongo.errors.DuplicateKeyError:
has_errors = True
slug_error = "Slug %s already is assigned" % d["shortener"]["slug"]
custom_errors["Shortener"] = slug_error
return render_template("inventory_edit.html", **locals())
return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/<item_id>/archive", methods=['POST'])
@login_required
def archive_inventory_item(item_id):
item = db.inventory.find_one({"_id": ObjectId(item_id)})
if not item:
return abort(404)
if not check_edit_permission(item_id):
return abort(403)
db.archive.insert_one(item);
db.inventory.delete_one({"_id": ObjectId(item_id)})
return redirect("/m/inventory")
@page_inventory.route("/m/inventory/add-slug/<slug>", methods=['GET'])
@login_required
def add_inventory_slug(slug):
slug_item = db.inventory.find_one({ "shortener.slug": slug })
if slug_item:
return redirect("/m/inventory/%s/view" % slug_item["_id"])
return render_template("inventory_add_slug.html", **locals())
def is_image_ext(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ["jpg", "jpeg"]
def get_bucket():
return boto3.client('s3',
endpoint_url=const.AWS_S3_ENDPOINT_URL,
config=boto3.session.Config(signature_version='s3v4'),
region_name='us-east-1')
@page_inventory.route("/inventory/<item_id>/upload-photo", methods=["POST"])
@login_required
def upload_photo(item_id):
user = read_user()
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return "Item not found", 404
if item.get("type") == "key" and "k-space:inventory:keys" not in user.get("groups", []):
return abort(403)
if "file" not in request.files:
return "No file part", 400
file = request.files["file"]
if not file.filename:
return "No selected file", 400
if file and is_image_ext(secure_filename(file.filename)):
try:
file.seek(0)
img = JPEGImage(blob=file.read())
except:
return "Not a valid JPEG", 400
if min(img.width, img.height) < 576:
return "Image must have smallest dimension of at least 576px", 400
bucket = get_bucket()
file.seek(0)
bucket.upload_fileobj(file, const.BUCKET_NAME, item_id)
db.inventory.update_one({ "_id": ObjectId(item_id) }, {"$set": {"has_photo": True}})
delete_thumbs(item)
return redirect("/m/inventory/%s/view" % item_id)
else:
return "File is not valid", 400
@page_inventory.app_template_filter('thumbnail')
def thumbnail_filter(item_id, dimension):
return get_scaled_photo(item_id, dimension)
@page_inventory.route("/m/photo/<item_id>/<dimension>")
def get_scaled_photo(item_id=None, dimension=0, retry=2):
dimension = int(dimension) #why
if retry <= 0:
return "", 500
if dimension not in [240, 480, 576, 600]:
return abort(404)
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return abort(404)
if item.get("thumbs", {}).get(str(dimension)):
thumb = item["thumbs"][str(dimension)]
img_url = get_image_url(thumb["name"])
if not img_url:
delete_thumbs(item)
return get_scaled_photo(item_id, dimension, retry - 1)
else:
make_thumb(item, dimension)
return get_scaled_photo(item_id, dimension, retry - 1)
return img_url
def get_item(key):
try:
bucket = get_bucket()
r = bucket.get_object(Bucket=const.BUCKET_NAME, Key=key)
return r['Body'].read()
except:
return None
def make_thumb(item, dimension):
img = get_item(str(item["_id"]))
img = scale_image(img, dimension)
bucket = get_bucket()
thumb_name = "thumb_%s_%d" % (item["_id"], dimension)
bucket.put_object(Body=img.as_blob(), Bucket=const.BUCKET_NAME, Key=thumb_name)
db.inventory.update_one({
"_id": ObjectId(item["_id"]),
}, {
"$set": {
"thumbs.%d.name" % dimension : thumb_name,
},
})
return img
def scale_image(img, dimension):
img = JPEGImage(blob=img)
if img.exif_orientation:
img = img.exif_autotransform()
if img.width < img.height:
img = img.downscale(dimension, img.height * dimension // img.width, 100)
else:
img = img.downscale(img.width * dimension // img.height, dimension, 100)
ratio = 4 / 3
crop_height = min(img.width / ratio, img.height)
crop_width = crop_height * ratio
crop_x = int((img.width - crop_width) / 2) & ~7
crop_y = int((img.height - crop_height) / 2) & ~7
return img.crop(crop_x, crop_y, int(crop_width), int(crop_height))
def delete_thumbs(item):
bucket = get_bucket()
for _, thumb in item.get("thumbs", {}).items():
if thumb.get("name"):
bucket.delete_object(Bucket=const.BUCKET_NAME, Key=thumb["name"])
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$unset": {
"thumbs": ""
},
})
@page_inventory.route("/m/inventory/assign-mac/<slug>")
@page_inventory.route("/m/inventory/assign-slug/<slug>")
@page_inventory.route("/m/inventory/clone-with-slug/<slug>")
@page_inventory.route("/m/inventory")
def view_inventory(slug=None):
user = request.args.get("user", type=ObjectId)
owner = request.args.get("owner", type=ObjectId)
grid = request.args.get("grid", type=str) == "true"
owner_disabled = request.args.get("owner_disabled", type=str) == "true"
user_disabled = request.args.get("user_disabled", type=str) == "true"
sort_fields = {
"last_seen": "Last seen",
"inventory_audit_timestamp": "Last audited",
"name": "Name",
"type": "Inventory type",
}
fields = [
("type", "Exclude type", list),
("tags", "Tags", list),
("location", "Location", list),
]
q = {"type": {"$ne": "token"}}
template = "inventory.html"
public_view = False
login_user = read_user()
if not login_user:
q.update({"inventory.visibility": {"$eq": "public"}})
template = "inventory_public.html"
public_view = True
else:
fields.append(("inventory.owner.username", "Owner", str))
fields.append(("inventory.user.username", "User", str))
can_audit = "k-space:inventory:audit" in login_user.get("groups", [])
can_edit_all = "k-space:inventory:edit" in login_user.get("groups", [])
v = ["public", "unlisted"]
if can_edit_all:
v.append("private")
q.update({
"$or": [
{"inventory.visibility": {"$in": v}},
{"inventory.owner.username": login_user.get('username', False)}
]
})
if slug and not public_view:
template = "inventory_pick.html"
if request.path.startswith("/m/inventory/clone-with-slug"):
action_method = "get"
action_path = "clone-by-slug"
action_label = "Clone"
action_help = "Cloning item to new slug"
pick = True
elif request.path.startswith("/m/inventory/assign-slug"):
q.update({"shortener.slug": None})
action_method = "get"
action_path = "edit-by-slug"
action_label = "Assign slug"
action_help = "Assigning slug"
pick = True
elif request.path.startswith("/m/inventory/assign-mac"):
q.update({"mac": None})
action_method = "post"
action_path = "set-mac"
action_label = "Assign MAC"
action_help = "Assigning MAC"
pick = True
if grid:
template = "inventory_grid.html"
if user:
q["inventory.user.username"] = user
if owner:
q["inventory.owner.username"] = owner
q2 = {"type": {"$ne": "token"}}
if not public_view and owner_disabled:
q2["Owner.enabled"] = False
if not public_view and user_disabled:
q2["User.enabled"] = False
q, selectors, sort_field, sort_field_final, sort_direction = build_query(dict(q), fields, sort_fields)
items = db.inventory.aggregate([
{ "$match": q },
{ "$match": q2 },
{ "$sort": { sort_field_final : 1 if sort_direction == "asc" else -1 } }
])
return render_template(template, **locals())
@page_inventory.route("/m/inventory/<item_id>/set-mac/<mac>", methods=['POST'])
@login_required
def view_set_mac(item_id, mac):
if not check_mac_address_valid(mac):
return abort(400)
result = db.inventory.update_one({
"_id": ObjectId(item_id),
"mac": {"$exists": False}
}, {
"$set": {
"mac" : mac.lower(),
},
})
if result.matched_count > 0:
return redirect("/m/inventory/%s/view" % item_id)
else:
return abort(404)
def check_mac_address_valid(mac):
mac_regex = re.compile(r'^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$')
return bool(mac_regex.match(mac))
@page_inventory.route("/m/inventory/<item_id>/audit", methods=["POST"])
@login_required(groups=["k-space:inventory:audit"])
def view_inventory_audit(item_id):
user = read_user()
item = db.inventory.find_one(filter = {
"_id": ObjectId(item_id)
}, projection = {
"inventory.audit": 1
})
q = {
"$set": {
"inventory.audit.username": user["username"],
"inventory.audit.timestamp": datetime.utcnow(),
},
}
if item["inventory"].get("audit"):
q["$push"] = {
"inventory.audit.log": {
"username": item["inventory"]["audit"]["username"],
"timestamp": item["inventory"]["audit"]["timestamp"],
}
}
db.inventory.update_one({
"_id": ObjectId(item_id)
}, q)
if request.form and request.form.get("noRedirect", False):
return {}
else:
return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.app_template_filter('audit_text')
def render_audit_text(item):
timestamp = item.get('audit', {}).get('timestamp', False)
if timestamp:
return timestamp.date()
else:
return 'Never'
@page_inventory.app_template_filter('audit_color')
def render_audit_color(item):
timestamp = item.get('audit', {}).get('timestamp', False)
today = date.today()
if not timestamp or timestamp.date() < today - timedelta(days=365):
return "red"
elif timestamp.date() < today - timedelta(days=7):
return "orange"
else:
return "green unclickable"
@page_inventory.route("/m/inventory/<item_id>/claim", methods=["POST"])
@login_required
def view_inventory_claim(item_id):
user = read_user()
item = db.inventory.find_one({
"_id": ObjectId(item_id),
"inventory.owner.username": None
})
if not item:
return abort(404)
if item.get("type") == "key" and "k-space:inventory:keys" not in user.get("groups", []):
return abort(403)
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$set": {
"inventory.owner.username": user["username"],
"inventory.owner.display_name": user["name"],
},
})
return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/<item_id>/use", methods=["POST"])
@login_required
def view_inventory_use(item_id):
user = read_user()
item = db.inventory.find_one({
"_id": ObjectId(item_id),
"inventory.usable": True,
"inventory.user.username": None
})
if not item:
return abort(404)
if item.get("type") == "key" and "k-space:inventory:keys" not in user.get("groups", []):
return abort(403)
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$set": {
"inventory.user.username": user["username"],
"inventory.user.display_name": user["name"],
},
})
return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/<item_id>/vacate", methods=["POST"])
@login_required
def view_inventory_vacate(item_id):
user = read_user()
item = db.inventory.find_one({
"_id": ObjectId(item_id),
"inventory.usable": True,
"inventory.user.username": user["username"]
})
if not item:
return abort(404)
if item.get("type") == "key" and "k-space:inventory:keys" not in user.get("groups", []):
return abort(403)
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$unset": {
"inventory.user": ""
},
})
return redirect("/m/inventory/%s/view" % item_id)