Files
inventory-app/inventory-app/inventory.py
rasmus 9c6902c5a2 alias add-slug to add-by-slug
Nobody was really picking. Slug can be assigned to existing item by editing it (possibly button to scan-in-web on the item's page).

This skips a page load + button press, and speeds up inventorying. Aliasing done on goredirect side.
2025-06-24 18:45:23 +03:00

601 lines
20 KiB
Python

import re
import boto3
import pymongo
import urllib
from datetime import datetime, date, timedelta
from botocore.exceptions import ClientError
from bson.objectid import ObjectId
from flask import Blueprint, abort, g, redirect, render_template, request, url_for
from jpegtran import JPEGImage
from pymongo import MongoClient
from werkzeug.utils import secure_filename
from wtforms import BooleanField, SelectField, StringField
from wtforms.fields import FormField
from wtforms.form import Form
from wtforms.validators import Length
import const
from common import CustomForm, build_query, flatten
from oidc import login_required, read_user, do_login
# Flask blueprint on which every inventory route and template filter below is registered.
page_inventory = Blueprint("inventory", __name__)
# Default database of const.MONGO_URI; the client is created when this module is imported.
db = MongoClient(const.MONGO_URI).get_default_database()
# NOTE(review): not referenced in the visible part of this module — confirm it is still needed.
channel = "inventory"
# route() must be the outermost decorator: it registers whatever function it
# receives, so with login_required listed above it the unprotected function was
# the one being served.
@page_inventory.route("/m/inventory/by-mac/<mac>", methods=['GET'])
@login_required
def view_inventory_by_mac(mac):
    """Redirect a MAC address to the inventory item that carries it.

    Responds with 400 when ``mac`` is not a colon-separated MAC address.
    When no item has that (lower-cased) MAC, redirects to the assign-mac
    pick page so the address can be attached to an existing item.
    """
    if not check_mac_address_valid(mac):
        return "Invalid mac address", 400
    mac = mac.lower()
    item = db.inventory.find_one({"mac": mac}, projection={"_id": 1})
    if not item or not item.get("_id", False):
        return redirect("/m/inventory/assign-mac/%s" % mac)
    return redirect(url_for("inventory.view_inventory_view", item_id=item["_id"]))
@page_inventory.route("/m/inventory/<item_id>/view")
def view_inventory_view(item_id):
    """Render the detail page of one inventory item.

    Anonymous visitors get the public template, and only for items whose
    visibility is "Featured" or "Normal"; for anything else they are sent to
    login. Logged-in users get the full template together with their
    audit/edit/usage flags.

    Responds with 404 when ``item_id`` is malformed or matches no item.

    Every local variable is handed to the template through ``**locals()``,
    so the local names here are part of the template contract.
    """
    # A malformed id cannot match any document; answer 404 instead of
    # letting ObjectId() raise.
    if not ObjectId.is_valid(item_id):
        return abort(404)
    user = read_user()
    template = "inventory_view.html"
    item = db.inventory.find_one({"_id": ObjectId(item_id)})
    if not item:
        return abort(404)
    item_user = item.get("inventory", {}).get("user", {}).get("username", None)
    if not user:
        # Use .get like the lookup above: an item without an "inventory"
        # sub-document must be treated as not public, not crash the view.
        if item.get("inventory", {}).get("visibility") not in ["Featured", "Normal"]:
            return do_login()
        template = "inventory_view_public.html"
        redirect_url = urllib.parse.quote_plus(request.full_path)
    else:
        can_audit = "k-space:inventory:audit" in user.get("groups", [])
        can_edit = check_edit_permission(item_id)
        is_using = item_user and item_user == user["username"]
    photo_url = get_image_url(item_id)
    # Not used in this function; it reaches the template via locals().
    constants = {"MACADDRESS_OUTLINK_BASEURL": const.MACADDRESS_OUTLINK_BASEURL}
    return render_template(template, **locals())
def get_image_url(item_id):
    """Return a presigned GET URL for the object stored under key ``item_id``.

    The URL is requested with a lifetime of 3600 seconds. ``None`` is
    returned when the S3 client raises ``ClientError``.
    """
    s3_client = get_bucket()
    try:
        presigned = s3_client.generate_presigned_url(
            ClientMethod='get_object',
            Params={'Bucket': const.BUCKET_NAME, 'Key': item_id},
            ExpiresIn=3600,
        )
    except ClientError:
        return None
    return presigned
def fetch_members_select():
    """Build the ``(username, label)`` choices for member drop-downs.

    Labels are the member's display name, falling back to the username.
    Entries are de-duplicated and sorted by label; the blank entry and the
    'k-space' account are then moved to the front, keeping their sorted order.
    """
    pinned_usernames = ['k-space', None]

    entries = [(None, None)]
    entries.extend(
        (username, member.display_name or username)
        for username, member in g.users_lookup.items()
    )

    def by_label(entry):
        # The blank entry has a None label; sort it as an empty string.
        return entry[1] or ""

    pinned, others = [], []
    for entry in sorted(set(entries), key=by_label):
        if entry[0] in pinned_usernames:
            pinned.append(entry)
        else:
            others.append(entry)
    return pinned + others
def fetch_type_select():
    """Return the set of distinct item types in the database, plus an empty string."""
    return {"", *db.inventory.distinct("type")}
def setup_user_select(select_field, name_fields):
    """Load member choices into ``select_field`` and mirror the selected label.

    When the select already holds a value (anything other than empty or the
    string 'None'), the label of that choice is written into ``.data`` of
    every field in ``name_fields``; the label is ``None`` if the value is not
    among the choices.
    """
    current = select_field.data
    has_selection = bool(current) and current != 'None'

    select_field.choices = fetch_members_select()
    if not has_selection:
        return

    label = dict(select_field.choices).get(current)
    for field in name_fields:
        field.data = label
def member_select_coerce(x):
    """Coerce a select value to its string form, mapping blank and 'None' to ``None``."""
    text = str(x)
    return None if text in ("", "None") else text
class MemberForm(Form):
    """Sub-form pairing a member username drop-down with that member's display name."""
    # Class-level default; replaced per instance in __init__.
    label = None
    # Choices start empty and are filled in by setup_user_select() on construction.
    username = SelectField(choices=[], coerce=member_select_coerce)
    display_name = StringField()

    def __init__(self, label=None, *args, **kwargs):
        """Initialise the form, then populate the member choices.

        ``label`` is stored on the instance; all other arguments are passed
        through to ``Form.__init__`` unchanged.
        """
        super(MemberForm, self).__init__(*args, **kwargs)
        self.label = label
        # Runs after the base initialiser so the select's current data is
        # available; it also copies the selected member's label into
        # display_name.
        setup_user_select(self.username, [self.display_name])
class InventoryForm(Form):
    """Sub-form for an item's owner, current user, usability flag and visibility."""
    owner = FormField(MemberForm, label="Owner")
    user = FormField(MemberForm, label="Current User")
    usable = BooleanField("Usable")
    # Defaults to 'Normal'; the list and detail views in this module filter on this value.
    visibility = SelectField("Visibility", choices=['Normal', 'Featured', 'Protected', 'Archived'], default='Normal')
class HardwareForm(Form):
    """Sub-form for the hardware details of an item: serial number, product and vendor."""
    serial = StringField("Serial Number")
    product = StringField("Product")
    vendor = StringField("Vendor")
class ShortenerForm(Form):
    """Sub-form for an item's short-link slug and target URL."""
    # Second positional argument: validators (slug is at most 4 characters).
    # Third positional argument: filters; the lambda turns an empty value into None.
    slug = StringField("Slug", [Length(max=4)], [lambda x: x or None])
    # No validators; the same empty-to-None filter as the slug.
    url = StringField("URL", [], [lambda x: x or None])
class InventoryItemForm(CustomForm):
    """Top-level form for creating and editing an inventory item."""
    # NOTE(review): fetch_type_select() is called once, when this class body
    # executes, so the choices reflect the types in the database at that
    # moment — confirm that types added later are meant to need a restart.
    type = SelectField("Type", choices=fetch_type_select())
    name = StringField('Name')
    comment = StringField('Comment')
    # Nested sub-forms, one per sub-document of the stored item.
    inventory = FormField(InventoryForm)
    hardware = FormField(HardwareForm)
    shortener = FormField(ShortenerForm)
    location = StringField('Location')
def check_edit_permission(item_id):
    """Return whether the logged-in user may edit the item ``item_id``.

    The answer is ``False`` when the id is empty or malformed, nobody is
    logged in, or the item does not exist. Items of type "key" additionally
    require the "k-space:inventory:keys" group. Otherwise members of
    "k-space:inventory:edit" may edit anything, and everyone else may edit
    only items they own.
    """
    # ObjectId() raises on a malformed id; such an id can never name an
    # editable item, so report "no permission" instead.
    if not item_id or not ObjectId.is_valid(item_id):
        return False
    user = read_user()
    if not user:
        return False
    item = db.inventory.find_one(
        filter={"_id": ObjectId(item_id)},
        projection={"inventory.owner": 1, "type": 1},
    )
    if not item:
        return False
    user_groups = user.get("groups", [])
    # The key restriction is checked first so that it also binds editors.
    if item.get("type") == "key" and "k-space:inventory:keys" not in user_groups:
        return False
    if "k-space:inventory:edit" in user_groups:
        return True
    item_username = item.get("inventory", {}).get("owner", {}).get("username", False)
    user_username = user.get("username", False)
    if not item_username or not user_username:
        return False
    return item_username == user_username
@page_inventory.route("/m/inventory/<item_id>/edit", methods=['GET'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/add", methods=['GET'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['GET'])
@login_required
def view_inventory_edit(item_id=None, slug=None, clone_item_id=None):
    """Render the item form for editing, cloning or adding an item.

    - ``item_id``: edit an existing item; 403 without edit permission.
    - ``clone_item_id``: start a new item from a copy of another one, minus
      its id, shortener, name and hardware serial.
    - neither: start from an empty form.

    A ``slug`` from the URL is prefilled into the shortener field in all
    three modes. Responds with 404 when the referenced item id is malformed
    or matches no item.

    Every local variable is handed to the template through ``**locals()``,
    so the local names here are part of the template contract.
    """
    user = read_user()
    item = None
    if item_id:
        if not ObjectId.is_valid(item_id):
            return abort(404)
        if not check_edit_permission(item_id):
            return abort(403)
        item = db.inventory.find_one({"_id": ObjectId(item_id)})
        if not item:
            # The item vanished between the permission check and this read.
            return abort(404)
        form = InventoryItemForm(**item)
    elif clone_item_id:
        if not ObjectId.is_valid(clone_item_id):
            return abort(404)
        item = db.inventory.find_one({"_id": ObjectId(clone_item_id)})
        if not item:
            # Previously this fell through to item.pop() on None.
            return abort(404)
        # Drop everything that must stay unique to the source item.
        item.pop("_id", None)
        item.pop("shortener", None)
        item.pop("name", None)
        if "hardware" in item:
            item["hardware"].pop("serial", None)
        form = InventoryItemForm(**item)
    else:
        form = InventoryItemForm()
    # prefill
    if slug:
        form.shortener.slug.data = slug
    all_tags = db.inventory.distinct("tags")
    item_tags = []
    if item and item.get("tags"):
        item_tags = item["tags"]
    has_edit_all = user and "k-space:inventory:edit" in user.get("groups", [])
    return render_template("inventory_edit.html", **locals())
@page_inventory.route("/m/inventory/<item_id>/edit", methods=['POST'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/add", methods=['POST'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['POST'])
@login_required
def save_inventory_item(item_id=None, **_):
    """Persist the submitted item form, updating ``item_id`` or inserting a new item.

    With ``item_id`` the existing document is updated (403 without edit
    permission); otherwise, including on the clone routes, a new document is
    inserted. The other URL parameters are accepted and ignored.

    On validation failure or a duplicate-key error the edit template is
    re-rendered with ``has_errors`` set; on success the response redirects to
    the item's view page.

    The error paths pass every local variable to the template through
    ``**locals()``, so the local names here are part of the template contract.
    """
    if item_id and not check_edit_permission(item_id):
        return abort(403)
    form = InventoryItemForm(request.form)
    if not form.validate_on_submit():
        has_errors = True
        return render_template("inventory_edit.html", **locals())
    d = {}
    form.populate_dict(d)
    d['tags'] = list(set(request.form.getlist('tags[]')))
    custom_errors = {}
    try:
        if item_id:
            df = flatten(d)
            r = {}
            # Move empty values from the $set document to the $unset document.
            # Iterate over a copy because df is modified inside the loop.
            # NOTE(review): an empty user/owner sub-field unsets the whole
            # sub-document while non-empty siblings stay in $set — confirm the
            # form can never submit such a mix.
            for k, v in dict(df).items():
                if k.startswith("shortener.") and not v:
                    df.pop(k)
                    r[k] = v
                if k.startswith("inventory.user.") and not v:
                    df.pop(k)
                    r["inventory.user"] = ""
                if k.startswith("inventory.owner.") and not v:
                    df.pop(k)
                    r["inventory.owner"] = ""
            # Send only non-empty operators: MongoDB servers before 5.0
            # reject an update whose $set or $unset operand is empty.
            update_ops = {}
            if df:
                update_ops["$set"] = df
            if r:
                update_ops["$unset"] = r
            if update_ops:
                db.inventory.update_one({"_id": ObjectId(item_id)}, update_ops)
        else:
            d["inventory"]["managed"] = True
            # Do not store empty sub-documents on a new item.
            if "shortener" in d:
                if not d["shortener"]["slug"]:
                    d.pop("shortener")
            if "user" in d["inventory"]:
                if not d["inventory"]["user"]["username"]:
                    d["inventory"].pop("user")
            if "owner" in d["inventory"]:
                if not d["inventory"]["owner"]["username"]:
                    d["inventory"].pop("owner")
            item_id = db.inventory.insert_one(d).inserted_id
    except pymongo.errors.DuplicateKeyError:
        has_errors = True
        # "shortener" may have been popped above, so look the slug up defensively.
        slug_error = "Slug %s already is assigned" % (d.get("shortener") or {}).get("slug")
        custom_errors["Shortener"] = slug_error
        return render_template("inventory_edit.html", **locals())
    return redirect("/m/inventory/%s/view" % item_id)
def is_image_ext(filename):
    """Return True when ``filename`` ends in a ``.jpg`` or ``.jpeg`` extension (any case)."""
    _stem, dot, extension = filename.rpartition('.')
    # rpartition yields an empty separator when the name contains no dot.
    return bool(dot) and extension.lower() in ("jpg", "jpeg")
def get_bucket():
    """Create an S3 client for the endpoint in ``const.AWS_S3_ENDPOINT_URL``.

    Despite the name, this returns a boto3 client rather than a bucket;
    callers pass the bucket name with every request.
    """
    client_options = {
        "endpoint_url": const.AWS_S3_ENDPOINT_URL,
        "config": boto3.session.Config(signature_version='s3v4'),
        "region_name": 'us-east-1',
    }
    return boto3.client('s3', **client_options)
@page_inventory.route("/inventory/<item_id>/upload-photo", methods=["POST"])
@login_required
def upload_photo(item_id):
    """Store an uploaded JPEG as the photo of item ``item_id``.

    The upload must be a multipart field named "file" with a .jpg/.jpeg
    name, must parse as a JPEG, and its smaller side must be at least 576px.
    On success the image is uploaded under the item id as key, the item is
    flagged ``has_photo``, existing thumbnails are deleted, and the response
    redirects to the item's view page.

    Responds with 404 for an unknown item, 403 for "key" items when the user
    lacks the keys group, and 400 for an invalid upload.
    """
    user = read_user()
    if not ObjectId.is_valid(item_id):
        return "Item not found", 404
    # "type" has to be projected as well: without it the key check below
    # always saw None and never denied anything.
    item = db.inventory.find_one(
        filter={"_id": ObjectId(item_id)},
        projection={"thumbs": 1, "type": 1},
    )
    if not item:
        return "Item not found", 404
    if item.get("type") == "key" and "k-space:inventory:keys" not in user.get("groups", []):
        return abort(403)
    if "file" not in request.files:
        return "No file part", 400
    file = request.files["file"]
    if not file.filename:
        return "No selected file", 400
    if not (file and is_image_ext(secure_filename(file.filename))):
        return "File is not valid", 400
    try:
        file.seek(0)
        img = JPEGImage(blob=file.read())
    except Exception:
        # Any parse failure means the upload is unusable; a bare except
        # would also have trapped KeyboardInterrupt and SystemExit.
        return "Not a valid JPEG", 400
    if min(img.width, img.height) < 576:
        return "Image must have smallest dimension of at least 576px", 400
    bucket = get_bucket()
    # Rewind: the stream was consumed by the validation read above.
    file.seek(0)
    bucket.upload_fileobj(file, const.BUCKET_NAME, item_id)
    db.inventory.update_one({"_id": ObjectId(item_id)}, {"$set": {"has_photo": True}})
    # Thumbnails derived from the previous photo are now stale.
    delete_thumbs(item)
    return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.app_template_filter('thumbnail')
def thumbnail_filter(item_id, dimension):
    """Template filter ``thumbnail``: delegate to ``get_scaled_photo`` for this item and size."""
    return get_scaled_photo(item_id=item_id, dimension=dimension)
@page_inventory.route("/m/photo/<item_id>/<dimension>")
def get_scaled_photo(item_id=None, dimension=0, retry=2):
    """Return a presigned URL for the thumbnail of an item at ``dimension``.

    Only the sizes 240, 480, 576 and 600 are served. A missing thumbnail is
    generated on demand; a recorded thumbnail for which no URL can be
    produced is deleted. In both cases the lookup is repeated with
    ``retry - 1``.

    Responds with 404 for a non-numeric or unsupported dimension, a
    malformed id or an unknown item, and with an empty 500 response once
    the retries are used up.
    """
    # The route supplies the dimension as a string; anything non-numeric is
    # simply an unknown thumbnail size, not a server error.
    try:
        dimension = int(dimension)
    except (TypeError, ValueError):
        return abort(404)
    if retry <= 0:
        return "", 500
    if dimension not in [240, 480, 576, 600]:
        return abort(404)
    if not ObjectId.is_valid(item_id):
        return abort(404)
    item = db.inventory.find_one(
        filter={"_id": ObjectId(item_id)},
        projection={"thumbs": 1},
    )
    if not item:
        return abort(404)
    thumb = item.get("thumbs", {}).get(str(dimension))
    if not thumb:
        # No thumbnail recorded for this size yet: create it and look again.
        make_thumb(item, dimension)
        return get_scaled_photo(item_id, dimension, retry - 1)
    img_url = get_image_url(thumb["name"])
    if not img_url:
        # The recorded thumbnail is unusable: forget it and start over.
        delete_thumbs(item)
        return get_scaled_photo(item_id, dimension, retry - 1)
    return img_url
def get_item(key):
    """Return the bytes stored under ``key`` in the bucket, or ``None`` on any failure."""
    try:
        bucket = get_bucket()
        r = bucket.get_object(Bucket=const.BUCKET_NAME, Key=key)
        return r['Body'].read()
    except Exception:
        # Still best-effort, but a bare except would also have swallowed
        # KeyboardInterrupt and SystemExit.
        return None
def make_thumb(item, dimension):
    """Create, upload and register the thumbnail of ``item`` at ``dimension``.

    The original photo is read from the key equal to the item id, scaled,
    uploaded as ``thumb_<id>_<dimension>``, and that name is recorded on the
    item under ``thumbs.<dimension>.name``. Returns the scaled image.
    """
    item_id = item["_id"]
    original_blob = get_item(str(item_id))
    scaled = scale_image(original_blob, dimension)

    s3_client = get_bucket()
    thumb_name = "thumb_%s_%d" % (item_id, dimension)
    s3_client.put_object(Body=scaled.as_blob(), Bucket=const.BUCKET_NAME, Key=thumb_name)

    thumb_field = "thumbs.%d.name" % dimension
    db.inventory.update_one(
        {"_id": ObjectId(item_id)},
        {"$set": {thumb_field: thumb_name}},
    )
    return scaled
def scale_image(img, dimension):
    """Scale a JPEG blob so its shorter side equals ``dimension``, then crop it to 4:3.

    ``img`` is the raw JPEG data. The image is first rotated according to
    its EXIF orientation, if it has one. The crop is centred, with its x and
    y offsets rounded down to a multiple of 8. Returns the cropped image.
    """
    jpeg = JPEGImage(blob=img)
    if jpeg.exif_orientation:
        jpeg = jpeg.exif_autotransform()

    # Scale the shorter side to `dimension`, keeping the aspect ratio.
    if jpeg.width < jpeg.height:
        target_width = dimension
        target_height = jpeg.height * dimension // jpeg.width
    else:
        target_width = jpeg.width * dimension // jpeg.height
        target_height = dimension
    jpeg = jpeg.downscale(target_width, target_height, 100)

    aspect = 4 / 3
    box_height = min(jpeg.width / aspect, jpeg.height)
    box_width = box_height * aspect
    # "& ~7" clears the low three bits, i.e. rounds down to a multiple of 8.
    left = int((jpeg.width - box_width) / 2) & ~7
    top = int((jpeg.height - box_height) / 2) & ~7
    return jpeg.crop(left, top, int(box_width), int(box_height))
def delete_thumbs(item):
    """Delete every stored thumbnail of ``item`` and clear its ``thumbs`` field."""
    s3_client = get_bucket()
    for thumb in item.get("thumbs", {}).values():
        key = thumb.get("name")
        if key:
            s3_client.delete_object(Bucket=const.BUCKET_NAME, Key=key)
    db.inventory.update_one(
        {"_id": ObjectId(item["_id"])},
        {"$unset": {"thumbs": ""}},
    )
@page_inventory.route("/m/inventory/assign-mac/<slug>")
@page_inventory.route("/m/inventory/assign-slug/<slug>")
@page_inventory.route("/m/inventory")
def view_inventory(slug=None):
    """Render the inventory list, or a pick list for assigning a slug or MAC.

    ``slug`` carries the value to assign: a short-link slug on the
    assign-slug route, a MAC address on the assign-mac route.

    Anonymous visitors see only "Featured" items in the public template.
    Logged-in users see "Featured" and "Normal" items plus their own, and
    "Protected" ones too if they are in the edit group.

    Every local variable is handed to the template through ``**locals()``,
    so the local names here are part of the template contract.
    """
    # Optional filters and layout switch from the query string.
    user = request.args.get("user", type=ObjectId)
    owner = request.args.get("owner", type=ObjectId)
    grid = request.args.get("grid", type=str) == "true"
    # Sort keys mapped to their display labels; passed to build_query().
    sort_fields = {
        "last_seen": "Last seen",
        "inventory_audit_timestamp": "Last audited",
        "name": "Name",
        "type": "Inventory type",
    }
    # Filterable fields as (field, label, type) triples; passed to build_query().
    fields = [
        ("type", "Exclude type", list),
        ("tags", "Tags", list),
        ("location", "Location", list),
    ]
    # Items of type "token" are never listed.
    q = {"type": {"$ne": "token"}}
    template = "inventory.html"
    public_view = False
    login_user = read_user()
    if not login_user:
        q.update({"inventory.visibility": {"$eq": "Featured"}})
        template = "inventory_public.html"
        public_view = True
    else:
        # Owner and user filters are offered to logged-in users only.
        fields.append(("inventory.owner.username", "Owner", str))
        fields.append(("inventory.user.username", "User", str))
        can_audit = "k-space:inventory:audit" in login_user.get("groups", [])
        can_edit_all = "k-space:inventory:edit" in login_user.get("groups", [])
        v = ["Featured", "Normal"]
        if can_edit_all:
            v.append("Protected")
        # Visible when the visibility is allowed or the user owns the item.
        q.update({
            "$or": [
                {"inventory.visibility": {"$in": v}},
                {"inventory.owner.username": login_user.get('username', False)}
            ]
        })
    if slug and not public_view:
        # Pick mode: list only items that do not yet have the thing being assigned.
        template = "inventory_pick.html"
        if request.path.startswith("/m/inventory/assign-slug"):
            q.update({"shortener.slug": None})
            action_method = "get"
            action_path = "edit-by-slug"
            action_label = "Assign slug"
            action_help = "Assigning slug"
            pick = True
        elif request.path.startswith("/m/inventory/assign-mac"):
            q.update({"mac": None})
            action_method = "post"
            action_path = "set-mac"
            action_label = "Assign MAC"
            action_help = "Assigning MAC"
            pick = True
    if grid:
        template = "inventory_grid.html"
    if user:
        q["inventory.user.username"] = user
    if owner:
        q["inventory.owner.username"] = owner
    q, selectors, sort_field, sort_field_final, sort_direction = build_query(dict(q), fields, sort_fields)
    items = db.inventory.aggregate([
        { "$match": q },
        # NOTE(review): repeats the token exclusion set on q above — presumably
        # because build_query() may replace the "type" condition; confirm.
        { "$match": {"type": {"$ne": "token"}} },
        { "$sort": { sort_field_final : 1 if sort_direction == "asc" else -1 } }
    ])
    return render_template(template, **locals())
@page_inventory.route("/m/inventory/<item_id>/set-mac/<mac>", methods=['POST'])
@login_required
def view_set_mac(item_id, mac):
    """Attach the MAC address ``mac`` to an item that does not have one yet.

    Responds with 400 for a malformed MAC address and with 404 when the id
    is malformed, the item does not exist, or it already has a MAC. On
    success the response redirects to the item's view page.
    """
    if not check_mac_address_valid(mac):
        return abort(400)
    if not ObjectId.is_valid(item_id):
        return abort(404)
    result = db.inventory.update_one(
        {
            "_id": ObjectId(item_id),
            # Same condition as the assign-mac pick list (matches a missing
            # or null field), so every item offered there can be assigned.
            "mac": None,
        },
        {"$set": {"mac": mac.lower()}},
    )
    if result.matched_count > 0:
        return redirect("/m/inventory/%s/view" % item_id)
    return abort(404)
# Six colon-separated pairs of hex digits. Compiled once at import time.
_MAC_ADDRESS_RE = re.compile(r'([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}')


def check_mac_address_valid(mac):
    """Return True when ``mac`` is exactly a colon-separated MAC address.

    Upper- and lower-case hex digits are accepted. Non-string input is
    reported as invalid rather than raising.
    """
    if not isinstance(mac, str):
        return False
    # fullmatch, not match with "$": "$" also matches just before a trailing
    # newline, which let "aa:bb:cc:dd:ee:ff\n" through.
    return _MAC_ADDRESS_RE.fullmatch(mac) is not None
@page_inventory.route("/m/inventory/<item_id>/audit", methods=["POST"])
@login_required(groups=["k-space:inventory:audit"])
def view_inventory_audit(item_id):
    """Record that the logged-in user audited item ``item_id`` now.

    The previous audit entry, if there is one, is appended to
    ``inventory.audit.log`` before being overwritten. Responds with 404 when
    the id is malformed or matches no item. Returns an empty JSON object
    when the form carries a truthy ``noRedirect``; otherwise redirects to
    the item's view page.
    """
    user = read_user()
    if not ObjectId.is_valid(item_id):
        return abort(404)
    item = db.inventory.find_one(
        filter={"_id": ObjectId(item_id)},
        projection={"inventory.audit": 1},
    )
    if not item:
        # Previously this fell through to subscripting None.
        return abort(404)
    q = {
        "$set": {
            "inventory.audit.username": user["username"],
            "inventory.audit.timestamp": datetime.utcnow(),
        },
    }
    # Items without an "inventory" sub-document simply have no previous audit.
    previous_audit = item.get("inventory", {}).get("audit")
    if previous_audit:
        q["$push"] = {
            "inventory.audit.log": {
                "username": previous_audit["username"],
                "timestamp": previous_audit["timestamp"],
            }
        }
    db.inventory.update_one({"_id": ObjectId(item_id)}, q)
    if request.form and request.form.get("noRedirect", False):
        return {}
    return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.app_template_filter('audit_text')
def render_audit_text(item):
    """Template filter ``audit_text``: the date of the last audit, or 'Never'.

    ``item`` is the mapping holding the ``audit`` sub-document.
    """
    audited_at = item.get('audit', {}).get('timestamp', False)
    return audited_at.date() if audited_at else 'Never'
@page_inventory.app_template_filter('audit_color')
def render_audit_color(item):
    """Template filter ``audit_color``: a CSS class reflecting the age of the last audit.

    "red" when never audited or audited more than 365 days ago, "orange"
    when more than 7 days ago, otherwise "green unclickable".
    """
    audited_at = item.get('audit', {}).get('timestamp', False)
    if not audited_at:
        return "red"
    age = date.today() - audited_at.date()
    if age > timedelta(days=365):
        return "red"
    if age > timedelta(days=7):
        return "orange"
    return "green unclickable"
@page_inventory.route("/m/inventory/<item_id>/claim", methods=["POST"])
@login_required
def view_inventory_claim(item_id):
    """Make the logged-in user the owner of an item that has no owner.

    Responds with 404 when the item does not exist or already has an owner,
    and with 403 for "key" items when the user lacks the keys group. On
    success the response redirects to the item's view page.
    """
    user = read_user()
    unowned_item = {
        "_id": ObjectId(item_id),
        "inventory.owner.username": None,
    }
    item = db.inventory.find_one(unowned_item)
    if not item:
        return abort(404)

    is_key = item.get("type") == "key"
    if is_key and "k-space:inventory:keys" not in user.get("groups", []):
        return abort(403)

    new_owner = {
        "inventory.owner.username": user["username"],
        "inventory.owner.display_name": user["name"],
    }
    db.inventory.update_one({"_id": ObjectId(item["_id"])}, {"$set": new_owner})
    return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/<item_id>/use", methods=["POST"])
@login_required
def view_inventory_use(item_id):
    """Mark the logged-in user as the current user of a usable, unused item.

    Responds with 404 when the item does not exist, is not usable, or is
    already in use, and with 403 for "key" items when the user lacks the
    keys group. On success the response redirects to the item's view page.
    """
    user = read_user()
    free_usable_item = {
        "_id": ObjectId(item_id),
        "inventory.usable": True,
        "inventory.user.username": None,
    }
    item = db.inventory.find_one(free_usable_item)
    if not item:
        return abort(404)

    is_key = item.get("type") == "key"
    if is_key and "k-space:inventory:keys" not in user.get("groups", []):
        return abort(403)

    new_user = {
        "inventory.user.username": user["username"],
        "inventory.user.display_name": user["name"],
    }
    db.inventory.update_one({"_id": ObjectId(item["_id"])}, {"$set": new_user})
    return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/<item_id>/vacate", methods=["POST"])
@login_required
def view_inventory_vacate(item_id):
    """Release an item that the logged-in user is currently using.

    Responds with 404 unless the item exists, is usable and is in use by
    this user, and with 403 for "key" items when the user lacks the keys
    group. On success the response redirects to the item's view page.
    """
    user = read_user()
    item_used_by_me = {
        "_id": ObjectId(item_id),
        "inventory.usable": True,
        "inventory.user.username": user["username"],
    }
    item = db.inventory.find_one(item_used_by_me)
    if not item:
        return abort(404)

    is_key = item.get("type") == "key"
    if is_key and "k-space:inventory:keys" not in user.get("groups", []):
        return abort(403)

    db.inventory.update_one(
        {"_id": ObjectId(item["_id"])},
        {"$unset": {"inventory.user": ""}},
    )
    return redirect("/m/inventory/%s/view" % item_id)