inventory-app/inventory-app/inventory.py
Madis Mägi a28189a306
All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
Change to oidc and new foreign id format
2023-07-30 03:17:03 +03:00

484 lines
17 KiB
Python

import boto3
import pymongo
from bson.objectid import ObjectId
from flask import Blueprint, abort, g, make_response, redirect, render_template, request
from jpegtran import JPEGImage
from pymongo import MongoClient
from werkzeug.utils import secure_filename
from wtforms import BooleanField, SelectField, StringField, validators
from wtforms.fields import FormField, TextAreaField
from wtforms.form import Form
from wtforms.validators import Length
import const
from common import CustomForm, build_query, flatten, format_name, spam
from oidc import page_oidc, login_required, read_user
page_inventory = Blueprint("inventory", __name__)
db = MongoClient(const.MONGO_URI).get_default_database()
@page_inventory.route("/m/inventory/<item_id>/view")
def view_inventory_view(item_id):
template = "inventory_view.html"
item = db.inventory.find_one({ "_id": ObjectId(item_id) })
if not read_user():
if not item["inventory"].get("public"):
return do_login()
template = "inventory_view_public.html"
base_url = const.INVENTORY_ASSETS_BASE_URL
photo_url = "%s/kspace-inventory/%s" % (base_url, item_id)
return render_template(template , **locals())
def fetch_members_select(existing_id=None):
q = [{"enabled": True}]
if existing_id:
q.append({"_id": ObjectId(existing_id)})
objs = db.member.find(filter = { "$or": q }, projection = {
"full_name": 1
}).sort("full_name", 1)
choices = [(None, None)]
for obj in objs:
choices.append((obj["_id"], obj["full_name"]))
choices = list(set(choices))
return choices
def fetch_type_select():
objs = db.inventory.distinct("type")
choices = set()
choices.add("")
for obj in objs:
choices.add(obj)
return choices
def setup_user_select(select_field, name_fields):
existing_id = None
if select_field.data and select_field.data != 'None':
existing_id = select_field.data
select_field.choices = fetch_members_select(existing_id)
if existing_id:
values = dict(select_field.choices)
value = values.get(ObjectId(existing_id))
for name_field in name_fields:
name_field.data = value
def setup_default_owner(select_field):
c = select_field.choices
d = [i for i in c if i[1] == "K-SPACE MTÜ"]
if len(d) < 1:
return
dv = d[0] or None
select_field.default = dv[0]
select_field.process([])
def member_select_coerce(x):
x = str(x)
if x and x != "None":
return ObjectId(x)
else:
return None
class MemberForm(Form):
label = None
foreign_id = SelectField(choices=[], coerce=member_select_coerce)
display_name = StringField()
def __init__(self, label=None, *args, **kwargs):
super(MemberForm, self).__init__(*args, **kwargs)
self.label = label
setup_user_select(self.foreign_id, [self.display_name])
class InventoryForm(Form):
owner = FormField(MemberForm, label="Owner")
user = FormField(MemberForm, label="Current User")
usable = BooleanField("Usable")
public = BooleanField("Public")
class HardwareForm(Form):
serial = StringField("Serial Number")
product = StringField("Product")
vendor = StringField("Vendor")
class ShortenerForm(Form):
slug = StringField("Slug", [Length(max=4)], [lambda x: x or None])
url = StringField("URL", [], [lambda x: x or None])
class InventoryItemForm(CustomForm):
type = SelectField("Type", choices=fetch_type_select())
name = StringField('Name')
issue_tracker = StringField('Issue Tracker')
comment = StringField('Comment')
inventory = FormField(InventoryForm)
hardware = FormField(HardwareForm)
shortener = FormField(ShortenerForm)
_description_placeholder = "Insert Markdown here."
description = TextAreaField('Description', [validators.optional(), validators.length(max=3000)], render_kw={"placeholder": _description_placeholder})
def setup_defaults_add(self):
setup_default_owner(self.inventory.form.owner.form.foreign_id)
self.inventory.form.public.render_kw = {"checked": "checked"}
self.inventory.form.public.data = "y"
@page_inventory.route("/m/inventory/<item_id>/edit", methods=['GET'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/add", methods=['GET'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['GET'])
@login_required
def view_inventory_edit(item_id=None, slug=None, clone_item_id=None):
item = None
if item_id:
item = db.inventory.find_one({ "_id": ObjectId(item_id) })
form = InventoryItemForm(**item)
elif clone_item_id:
item = db.inventory.find_one({ "_id": ObjectId(clone_item_id) })
item.pop("_id", None)
item.pop("shortener", None)
item.pop("name", None)
if "hardware" in item:
item["hardware"].pop("serial", None)
form = InventoryItemForm(**item)
else:
form = InventoryItemForm()
form.setup_defaults_add()
if slug:
form.shortener.slug.data = slug
all_tags = db.inventory.distinct("tags")
item_tags = []
if item and item.get("tags"):
item_tags = item["tags"]
return render_template("inventory_edit.html", **locals())
@page_inventory.route("/m/inventory/<item_id>/edit", methods=['POST'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/add", methods=['POST'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['POST'])
@login_required
def save_inventory_item(item_id=None, **_):
form = InventoryItemForm(request.form)
if not form.validate_on_submit():
has_errors = True
return render_template("inventory_edit.html", **locals())
d = {}
form.populate_dict(d)
d['tags'] = list(set(request.form.getlist('tags[]')))
custom_errors = {}
try:
if item_id:
df = flatten(d)
r = {}
for k, v in dict(df).items():
if k.startswith("shortener.") and not v:
df.pop(k)
r[k] = v
if k.startswith("inventory.user.") and not v:
df.pop(k)
r["inventory.user"] = ""
if k.startswith("inventory.owner.") and not v:
df.pop(k)
r["inventory.owner"] = ""
db.inventory.update_one({
"_id": ObjectId(item_id)
}, {
"$set": df,
"$unset": r
})
else:
d["inventory"]["managed"] = True
if "shortener" in d:
if not d["shortener"]["slug"]:
d.pop("shortener")
if "user" in d["inventory"]:
if not d["inventory"]["user"]["foreign_id"]:
d["inventory"].pop("user")
if "owner" in d["inventory"]:
if not d["inventory"]["owner"]["foreign_id"]:
d["inventory"].pop("owner")
item_id = db.inventory.insert_one(d).inserted_id
except pymongo.errors.DuplicateKeyError:
has_errors = True
slug_error = "Slug %s already is assigned" % d["shortener"]["slug"]
custom_errors["Shortener"] = slug_error
return render_template("inventory_edit.html", **locals())
return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/add-slug/<slug>", methods=['GET'])
@login_required
def add_inventory_slug(slug):
slug_item = db.inventory.find_one({ "shortener.slug": slug })
if slug_item:
return redirect("/m/inventory/%s/view" % slug_item["_id"])
return render_template("inventory_add_slug.html", **locals())
def is_image_ext(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ["jpg", "jpeg"]
def get_bucket():
return boto3.client('s3',
endpoint_url=const.AWS_ENDPOINT_URL,
config=boto3.session.Config(signature_version='s3v4'),
region_name='us-east-1')
@page_inventory.route("/inventory/<item_id>/upload-photo", methods=["POST"])
@login_required
def upload_photo(item_id):
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return "Item not found", 404
if "file" not in request.files:
return "No file part", 400
file = request.files["file"]
if not file.filename:
return "No selected file", 400
if file and is_image_ext(secure_filename(file.filename)):
try:
file.seek(0)
img = JPEGImage(blob=file.read())
except:
return "Not a valid JPEG", 400
if min(img.width, img.height) < 576:
return "Image must have smallest dimension of at least 576px", 400
bucket = get_bucket()
file.seek(0)
bucket.upload_fileobj(file, 'kspace-inventory', item_id)
db.inventory.update_one({ "_id": ObjectId(item_id) }, {"$set": {"has_photo": True}})
delete_thumbs(item)
return redirect("/m/inventory/%s/view" % item_id)
else:
return "File is not valid", 400
@page_inventory.route("/m/photo/<item_id>/<dimension>")
def get_scaled_photo(item_id=None, dimension=0, retry=2):
dimension = int(dimension) #why
if retry <= 0:
return "", 500
if dimension not in [240, 480, 576, 600]:
return abort(404)
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return abort(404)
if item.get("thumbs", {}).get(str(dimension)):
thumb = item["thumbs"][str(dimension)]
img = get_item(thumb["name"])
if not img:
delete_thumbs(item)
return get_scaled_photo(item_id, dimension, retry - 1)
img = JPEGImage(blob=img)
else:
make_thumb(item, dimension)
return get_scaled_photo(item_id, dimension, retry - 1)
response = make_response(img.as_blob())
response.headers.set('Content-Type', 'image/jpeg')
response.cache_control.public = True
response.cache_control.immutable = True
response.cache_control.max_age = 31536000
return response
def get_item(key):
try:
bucket = get_bucket()
r = bucket.get_object(Bucket='kspace-inventory', Key=key)
return r['Body'].read()
except:
return None
def make_thumb(item, dimension):
img = get_item(str(item["_id"]))
img = scale_image(img, dimension)
bucket = get_bucket()
thumb_name = "thumb_%s_%d" % (item["_id"], dimension)
bucket.put_object(Body=img.as_blob(), Bucket='kspace-inventory', Key=thumb_name)
db.inventory.update_one({
"_id": ObjectId(item["_id"]),
}, {
"$set": {
"thumbs.%d.name" % dimension : thumb_name,
},
})
return img
def scale_image(img, dimension):
img = JPEGImage(blob=img)
if img.exif_orientation:
img = img.exif_autotransform()
if img.width < img.height:
img = img.downscale(dimension, img.height * dimension // img.width, 100)
else:
img = img.downscale(img.width * dimension // img.height, dimension, 100)
ratio = 4 / 3
crop_height = min(img.width / ratio, img.height)
crop_width = crop_height * ratio
crop_x = int((img.width - crop_width) / 2) & ~7
crop_y = int((img.height - crop_height) / 2) & ~7
return img.crop(crop_x, crop_y, int(crop_width), int(crop_height))
def delete_thumbs(item):
bucket = get_bucket()
for _, thumb in item.get("thumbs", {}).items():
if thumb.get("name"):
bucket.delete_object(Bucket='kspace-inventory', Key=thumb["name"])
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$unset": {
"thumbs": ""
},
})
@page_inventory.route("/m/inventory/assign-slug/<slug>")
@page_inventory.route("/m/inventory/clone-with-slug/<slug>")
@page_inventory.route("/m/inventory")
def view_inventory(slug=None):
user = request.args.get("user", type=ObjectId)
owner = request.args.get("owner", type=ObjectId)
grid = request.args.get("grid", type=str) == "true"
owner_disabled = request.args.get("owner_disabled", type=str) == "true"
user_disabled = request.args.get("user_disabled", type=str) == "true"
sort_fields = {
"last_seen": "Last seen",
"name": "Name",
"type": "Inventory type",
}
fields = [
("type", "Exclude type", list),
("tags", "Tags", list),
]
q = {"type": {"$ne": "token"}}
template = "inventory.html"
public_view = False
if not read_user():
q.update({"inventory.public": True})
template = "inventory_public.html"
public_view = True
else:
fields.append(("inventory.owner", "Owner", ObjectId))
fields.append(("inventory.user", "User", ObjectId))
if slug and not public_view:
template = "inventory_pick.html"
if request.path.startswith("/m/inventory/clone-with-slug"):
action_path = "clone-by-slug"
action_label = "Clone"
action_help = "Cloning item to new slug"
pick = True
elif request.path.startswith("/m/inventory/assign-slug"):
q.update({"shortener.slug": None})
action_path = "edit-by-slug"
action_label = "Assign slug"
action_help = "Assigning slug"
pick = True
if grid:
template = "inventory_grid.html"
if user:
q["inventory.user.foreign_id"] = user
if owner:
q["inventory.owner.foreign_id"] = owner
q2 = {"type": {"$ne": "token"}}
if not public_view and owner_disabled:
q2["Owner.enabled"] = False
if not public_view and user_disabled:
q2["User.enabled"] = False
q, selectors, sort_field, sort_field_final, sort_direction = build_query(dict(q), fields, sort_fields)
items = db.inventory.aggregate([
{ "$match": q },
{
"$lookup": {
"from": 'member',
"localField": 'inventory.owner.foreign_id',
"foreignField": '_id',
"as": 'Owner'
}
},
{
"$lookup": {
"from": 'member',
"localField": 'inventory.user.foreign_id',
"foreignField": '_id',
"as": 'User'
}
},
{ "$match": q2 },
{ "$sort": { sort_field_final : 1 if sort_direction == "asc" else -1 } }
])
return render_template(template, **locals())
@page_inventory.route("/m/inventory/<item_id>/claim", methods=["POST"])
@login_required
def view_inventory_claim(item_id):
user = read_user()
db.inventory.update_one({
"_id": ObjectId(item_id),
"inventory.owner.foreign_id": None
}, {
"$set": {
"inventory.owner.foreign_id": user["username"],
"inventory.owner.display_name": user["name"],
},
})
return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/<item_id>/use", methods=["POST"])
@login_required
def view_inventory_use(item_id):
user = read_user()
item = db.inventory.find_one({
"_id": ObjectId(item_id),
"inventory.usable": True,
"inventory.user.foreign_id": None
})
if not item:
return abort(404)
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$set": {
"inventory.user.foreign_id": ObjectId(user["username"]),
"inventory.user.display_name": user["name"],
},
})
item_name = format_name(item)
msg = "%s has started using %s" % (user["name"], item_name)
if item.get("shortener") and item["shortener"].get("slug"):
msg += ("\nk6.ee/%s" % item["shortener"]["slug"])
spam(msg)
return redirect("/m/inventory/%s/view" % item_id)
@page_inventory.route("/m/inventory/<item_id>/vacate", methods=["POST"])
@login_required
def view_inventory_vacate(item_id):
user = read_user()
item = db.inventory.find_one({
"_id": ObjectId(item_id),
"inventory.usable": True,
"inventory.user.foreign_id": ObjectId(user["username"])
})
if not item:
return abort(404)
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$unset": {
"inventory.user": ""
},
})
item_name = format_name(item)
msg = "%s has stopped using %s" % (user["name"], item_name)
if item.get("shortener") and item["shortener"].get("slug"):
msg += ("\nk6.ee/%s" % item["shortener"]["slug"])
spam(msg)
return redirect("/m/inventory/%s/view" % item_id)