"""Flask blueprint for the inventory pages.

Covers viewing, adding, editing and cloning inventory items, slug
assignment, photo upload and thumbnail serving (S3-compatible bucket via
boto3), and the audit / claim / use / vacate actions.

NOTE(review): many views render with ``**locals()``, so the names of their
local variables are part of the template contract - do not rename them
without checking the templates.
"""
import boto3
import pymongo
from datetime import datetime
from bson.objectid import ObjectId
from flask import Blueprint, abort, g, make_response, redirect, render_template, request
from jpegtran import JPEGImage
from pymongo import MongoClient
from werkzeug.utils import secure_filename
from wtforms import BooleanField, SelectField, StringField, validators
from wtforms.fields import FormField, TextAreaField
from wtforms.form import Form
from wtforms.validators import Length

import const
from common import CustomForm, build_query, flatten, format_name, spam
from oidc import page_oidc, login_required, read_user, do_login

page_inventory = Blueprint("inventory", __name__)
db = MongoClient(const.MONGO_URI).get_default_database()


@page_inventory.route("/m/inventory/<item_id>/view")
def view_inventory_view(item_id):
    """Render a single inventory item.

    Anonymous visitors only get the public template, and only for items
    flagged ``inventory.public``; otherwise they are sent to login.
    Responds 404 when no item has the given id.
    """
    user = read_user()
    template = "inventory_view.html"
    item = db.inventory.find_one({"_id": ObjectId(item_id)})
    if item is None:
        # Previously fell through to item["inventory"] and raised TypeError.
        return abort(404)
    if not user:
        if not (item.get("inventory") or {}).get("public"):
            return do_login()
        template = "inventory_view_public.html"
    else:
        can_audit = "k-space:janitors" in user["groups"]
    # NOTE(review): original nesting was lost; the photo URL is generated for
    # both the public and the member view here - confirm against the templates.
    bucket = get_bucket()
    photo_url = bucket.generate_presigned_url(
        ClientMethod='get_object',
        Params={
            'Bucket': const.BUCKET_NAME,
            'Key': item_id
        },
        ExpiresIn=3600
    )
    return render_template(template, **locals())


def fetch_members_select():
    """Return de-duplicated ``(username, label)`` choices sorted by label.

    The list always contains a ``(None, None)`` entry for "nobody"; the label
    falls back to the username when the user has no display name.
    """
    choices = {(None, None)}
    for username, user in g.users_lookup.items():
        choices.add((username, user.display_name or username))
    # Labels may be None (the empty entry), so sort those as "".
    return sorted(choices, key=lambda choice: choice[1] or "")


def fetch_type_select():
    """Return the set of distinct item types in the database plus ``""``."""
    choices = {""}
    choices.update(db.inventory.distinct("type"))
    return choices


def setup_user_select(select_field, name_fields):
    """Populate a member select field and mirror the chosen label.

    Loads the member choices into ``select_field``; when the field already
    holds a selection, the matching label is copied into every field of
    ``name_fields``.
    """
    existing_id = None
    # The string 'None' is what an unselected option submits.
    if select_field.data and select_field.data != 'None':
        existing_id = select_field.data
    select_field.choices = fetch_members_select()
    if existing_id:
        label = dict(select_field.choices).get(existing_id)
        for name_field in name_fields:
            name_field.data = label


def setup_default_owner(select_field):
    """Preselect the choice labelled "K-SPACE MTÜ", if such a choice exists."""
    matches = [choice for choice in select_field.choices if choice[1] == "K-SPACE MTÜ"]
    if not matches:
        return
    select_field.default = matches[0][0]
    # Re-process with empty form data so the new default takes effect.
    select_field.process([])


def member_select_coerce(x):
    """Coerce a submitted select value to ``str``; ``""``/``"None"`` -> ``None``."""
    x = str(x)
    if x and x != "None":
        return x
    return None


class MemberForm(Form):
    """Sub-form holding a member reference: username plus display name."""

    label = None
    username = SelectField(choices=[], coerce=member_select_coerce)
    display_name = StringField()

    def __init__(self, label=None, *args, **kwargs):
        super(MemberForm, self).__init__(*args, **kwargs)
        self.label = label
        setup_user_select(self.username, [self.display_name])


class InventoryForm(Form):
    """Ownership and availability fields of an inventory item."""

    owner = FormField(MemberForm, label="Owner")
    user = FormField(MemberForm, label="Current User")
    usable = BooleanField("Usable")
    public = BooleanField("Public")


class HardwareForm(Form):
    """Hardware identification fields."""

    serial = StringField("Serial Number")
    product = StringField("Product")
    vendor = StringField("Vendor")


class ShortenerForm(Form):
    """Short-link fields; empty strings are filtered to ``None``."""

    slug = StringField("Slug", [Length(max=4)], [lambda x: x or None])
    url = StringField("URL", [], [lambda x: x or None])


class InventoryItemForm(CustomForm):
    """Top-level add/edit form for an inventory item."""

    # NOTE(review): fetch_type_select() runs once, when this class body is
    # executed, so types added later are not offered until restart - confirm
    # that this is intended.
    type = SelectField("Type", choices=fetch_type_select())
    name = StringField('Name')
    issue_tracker = StringField('Issue Tracker')
    comment = StringField('Comment')
    inventory = FormField(InventoryForm)
    hardware = FormField(HardwareForm)
    shortener = FormField(ShortenerForm)
    _description_placeholder = "Insert Markdown here."
    description = TextAreaField(
        'Description',
        [validators.optional(), validators.length(max=3000)],
        render_kw={"placeholder": _description_placeholder},
    )

    def setup_defaults_add(self):
        """Apply defaults for a brand-new item: default owner, public checked."""
        setup_default_owner(self.inventory.form.owner.form.username)
        self.inventory.form.public.render_kw = {"checked": "checked"}
        self.inventory.form.public.data = "y"


@page_inventory.route("/m/inventory/<item_id>/edit", methods=['GET'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/add", methods=['GET'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['GET'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['GET'])
@login_required
def view_inventory_edit(item_id=None, slug=None, clone_item_id=None):
    """Show the item form for editing, cloning or adding.

    ``item_id`` edits an existing item, ``clone_item_id`` pre-fills the form
    from an existing item (minus id, shortener, name and hardware serial),
    and neither yields an empty form with add defaults. A given ``slug`` is
    pre-filled into the shortener field. Responds 404 when the referenced
    item does not exist.
    """
    item = None
    if item_id:
        item = db.inventory.find_one({"_id": ObjectId(item_id)})
        if item is None:
            # Previously InventoryItemForm(**None) raised TypeError.
            return abort(404)
        form = InventoryItemForm(**item)
    elif clone_item_id:
        item = db.inventory.find_one({"_id": ObjectId(clone_item_id)})
        if item is None:
            return abort(404)
        # Strip everything that must be unique per item.
        item.pop("_id", None)
        item.pop("shortener", None)
        item.pop("name", None)
        if "hardware" in item:
            item["hardware"].pop("serial", None)
        form = InventoryItemForm(**item)
    else:
        form = InventoryItemForm()
        form.setup_defaults_add()
    if slug:
        form.shortener.slug.data = slug
    all_tags = db.inventory.distinct("tags")
    item_tags = []
    if item and item.get("tags"):
        item_tags = item["tags"]
    return render_template("inventory_edit.html", **locals())


@page_inventory.route("/m/inventory/<item_id>/edit", methods=['POST'])
@page_inventory.route("/m/inventory/<item_id>/edit-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/add", methods=['POST'])
@page_inventory.route("/m/inventory/add-by-slug/<slug>", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone", methods=['POST'])
@page_inventory.route("/m/inventory/<clone_item_id>/clone-by-slug/<slug>", methods=['POST'])
@login_required
def save_inventory_item(item_id=None, **_):
    """Persist the submitted item form.

    With ``item_id`` the existing document is updated; otherwise (add and
    clone routes) a new document is inserted. On validation failure or a
    duplicate-key error the edit template is re-rendered with error flags;
    on success the client is redirected to the item's view page.
    """
    form = InventoryItemForm(request.form)
    if not form.validate_on_submit():
        has_errors = True
        return render_template("inventory_edit.html", **locals())
    d = {}
    form.populate_dict(d)
    d['tags'] = list(set(request.form.getlist('tags[]')))
    custom_errors = {}
    try:
        if item_id:
            df = flatten(d)
            r = {}
            # Iterate over a copy because empty entries are popped from df.
            for k, v in dict(df).items():
                if k.startswith("shortener.") and not v:
                    df.pop(k)
                    r[k] = v
                if k.startswith("inventory.user.") and not v:
                    df.pop(k)
                    r["inventory.user"] = ""
                if k.startswith("inventory.owner.") and not v:
                    df.pop(k)
                    r["inventory.owner"] = ""
            # An empty "$unset" document is rejected by MongoDB releases
            # before 5.0, so only send the operator when there is work to do.
            db.inventory.update_one(
                {"_id": ObjectId(item_id)},
                {"$set": df, "$unset": r} if r else {"$set": df},
            )
        else:
            d["inventory"]["managed"] = True
            # Drop empty sub-documents instead of storing null placeholders.
            if "shortener" in d:
                if not d["shortener"]["slug"]:
                    d.pop("shortener")
            if "user" in d["inventory"]:
                if not d["inventory"]["user"]["username"]:
                    d["inventory"].pop("user")
            if "owner" in d["inventory"]:
                if not d["inventory"]["owner"]["username"]:
                    d["inventory"].pop("owner")
            item_id = db.inventory.insert_one(d).inserted_id
    except pymongo.errors.DuplicateKeyError:
        has_errors = True
        # "shortener" may have been popped above; do not raise KeyError here.
        slug_error = "Slug %s already is assigned" % (d.get("shortener") or {}).get("slug")
        custom_errors["Shortener"] = slug_error
        return render_template("inventory_edit.html", **locals())
    return redirect("/m/inventory/%s/view" % item_id)


@page_inventory.route("/m/inventory/add-slug/<slug>", methods=['GET'])
@login_required
def add_inventory_slug(slug):
    """Redirect to the item owning ``slug``, or offer to assign the slug."""
    slug_item = db.inventory.find_one({"shortener.slug": slug})
    if slug_item:
        return redirect("/m/inventory/%s/view" % slug_item["_id"])
    return render_template("inventory_add_slug.html", **locals())


def is_image_ext(filename):
    """Return True when ``filename`` ends in ``.jpg`` or ``.jpeg`` (any case)."""
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ["jpg", "jpeg"]


def get_bucket():
    """Create a boto3 S3 client for the configured endpoint (SigV4)."""
    return boto3.client(
        's3',
        endpoint_url=const.AWS_S3_ENDPOINT_URL,
        config=boto3.session.Config(signature_version='s3v4'),
        region_name='us-east-1',
    )


@page_inventory.route("/inventory/<item_id>/upload-photo", methods=["POST"])
@login_required
def upload_photo(item_id):
    """Store an uploaded JPEG as the item's photo and drop stale thumbnails.

    Responds 404 for an unknown item and 400 for a missing, non-JPEG or too
    small (< 576px on the shorter side) upload; redirects to the item view
    on success.
    """
    item = db.inventory.find_one(
        filter={"_id": ObjectId(item_id)}, projection={"thumbs": 1})
    if not item:
        return "Item not found", 404
    if "file" not in request.files:
        return "No file part", 400
    file = request.files["file"]
    if not file.filename:
        return "No selected file", 400
    if file and is_image_ext(secure_filename(file.filename)):
        try:
            file.seek(0)
            img = JPEGImage(blob=file.read())
        except Exception:
            # Narrowed from a bare except so SystemExit/KeyboardInterrupt
            # are no longer swallowed.
            return "Not a valid JPEG", 400
        if min(img.width, img.height) < 576:
            return "Image must have smallest dimension of at least 576px", 400
        bucket = get_bucket()
        # Rewind: the stream was consumed by the validation read above.
        file.seek(0)
        bucket.upload_fileobj(file, const.BUCKET_NAME, item_id)
        db.inventory.update_one(
            {"_id": ObjectId(item_id)}, {"$set": {"has_photo": True}})
        delete_thumbs(item)
        return redirect("/m/inventory/%s/view" % item_id)
    else:
        return "File is not valid", 400


@page_inventory.route("/m/photo/<item_id>/<dimension>")
def get_scaled_photo(item_id=None, dimension=0, retry=2):
    """Serve the item's thumbnail for ``dimension``, creating it on demand.

    ``retry`` bounds the self-recursion used after (re)generating a
    thumbnail; when it is exhausted an empty 500 response is returned.
    Responds 404 for unsupported or non-numeric dimensions, unknown items,
    and items whose original photo cannot be fetched.
    """
    # The URL variable arrives as a string; recursive calls pass an int.
    try:
        dimension = int(dimension)
    except (TypeError, ValueError):
        return abort(404)
    if retry <= 0:
        return "", 500
    if dimension not in [240, 480, 576, 600]:
        return abort(404)
    item = db.inventory.find_one(
        filter={"_id": ObjectId(item_id)}, projection={"thumbs": 1})
    if not item:
        return abort(404)
    thumb = item.get("thumbs", {}).get(str(dimension))
    if not thumb:
        if make_thumb(item, dimension) is None:
            # No original photo could be fetched, so nothing can be scaled.
            return abort(404)
        return get_scaled_photo(item_id, dimension, retry - 1)
    blob = get_item(thumb["name"])
    if not blob:
        # The database references a thumbnail the bucket did not return:
        # forget all thumbnails and regenerate.
        delete_thumbs(item)
        return get_scaled_photo(item_id, dimension, retry - 1)
    img = JPEGImage(blob=blob)
    response = make_response(img.as_blob())
    response.headers.set('Content-Type', 'image/jpeg')
    response.cache_control.public = True
    response.cache_control.immutable = True
    response.cache_control.max_age = 31536000
    return response


def get_item(key):
    """Return the bytes stored under ``key`` in the bucket, or None on failure."""
    try:
        bucket = get_bucket()
        r = bucket.get_object(Bucket=const.BUCKET_NAME, Key=key)
        return r['Body'].read()
    except Exception:
        # Best effort: any fetch error is reported to the caller as None.
        return None


def make_thumb(item, dimension):
    """Create, upload and register the ``dimension`` thumbnail of ``item``.

    Returns the scaled image, or None when the original photo could not be
    fetched from the bucket.
    """
    source = get_item(str(item["_id"]))
    if source is None:
        return None
    img = scale_image(source, dimension)
    bucket = get_bucket()
    thumb_name = "thumb_%s_%d" % (item["_id"], dimension)
    bucket.put_object(Body=img.as_blob(), Bucket=const.BUCKET_NAME, Key=thumb_name)
    db.inventory.update_one({
        "_id": ObjectId(item["_id"]),
    }, {
        "$set": {
            "thumbs.%d.name" % dimension: thumb_name,
        },
    })
    return img


def scale_image(img, dimension):
    """Scale a JPEG blob so its shorter side is ``dimension``, then crop to 4:3.

    The image is first transformed according to its EXIF orientation, when
    one is present. The crop is centred, with offsets rounded down to a
    multiple of 8.
    """
    img = JPEGImage(blob=img)
    if img.exif_orientation:
        img = img.exif_autotransform()
    if img.width < img.height:
        img = img.downscale(dimension, img.height * dimension // img.width, 100)
    else:
        img = img.downscale(img.width * dimension // img.height, dimension, 100)
    ratio = 4 / 3
    crop_height = min(img.width / ratio, img.height)
    crop_width = crop_height * ratio
    # "& ~7" clears the low three bits, i.e. rounds down to a multiple of 8.
    # NOTE(review): presumably to match the JPEG block size - confirm.
    crop_x = int((img.width - crop_width) / 2) & ~7
    crop_y = int((img.height - crop_height) / 2) & ~7
    return img.crop(crop_x, crop_y, int(crop_width), int(crop_height))


def delete_thumbs(item):
    """Delete every stored thumbnail of ``item`` and unset its ``thumbs`` field."""
    bucket = get_bucket()
    for thumb in item.get("thumbs", {}).values():
        if thumb.get("name"):
            bucket.delete_object(Bucket=const.BUCKET_NAME, Key=thumb["name"])
    # NOTE(review): original nesting was lost; the unset is assumed to run
    # once, after the loop.
    db.inventory.update_one({
        "_id": ObjectId(item["_id"])
    }, {
        "$unset": {
            "thumbs": ""
        },
    })


@page_inventory.route("/m/inventory/assign-slug/<slug>")
@page_inventory.route("/m/inventory/clone-with-slug/<slug>")
@page_inventory.route("/m/inventory")
def view_inventory(slug=None):
    """List inventory items, optionally as a picker for a slug action.

    Anonymous visitors only see public items. With ``slug`` (members only)
    the picker template is used to choose the item to clone or to assign the
    slug to. Filtering and sorting are delegated to ``build_query``.
    """
    user = request.args.get("user", type=ObjectId)
    owner = request.args.get("owner", type=ObjectId)
    grid = request.args.get("grid", type=str) == "true"
    owner_disabled = request.args.get("owner_disabled", type=str) == "true"
    user_disabled = request.args.get("user_disabled", type=str) == "true"
    sort_fields = {
        "last_seen": "Last seen",
        "inventory_audit_timestamp": "Last audited",
        "name": "Name",
        "type": "Inventory type",
    }
    fields = [
        ("type", "Exclude type", list),
        ("tags", "Tags", list),
    ]
    q = {"type": {"$ne": "token"}}
    template = "inventory.html"
    public_view = False
    if not read_user():
        q.update({"inventory.public": True})
        template = "inventory_public.html"
        public_view = True
    else:
        fields.append(("inventory.owner.username", "Owner", str))
        fields.append(("inventory.user.username", "User", str))
    if slug and not public_view:
        template = "inventory_pick.html"
        if request.path.startswith("/m/inventory/clone-with-slug"):
            action_path = "clone-by-slug"
            action_label = "Clone"
            action_help = "Cloning item to new slug"
            pick = True
        elif request.path.startswith("/m/inventory/assign-slug"):
            # Only items without a slug can have one assigned.
            q.update({"shortener.slug": None})
            action_path = "edit-by-slug"
            action_label = "Assign slug"
            action_help = "Assigning slug"
            pick = True
    # NOTE(review): original nesting was lost; the grid switch is assumed to
    # apply at function level.
    if grid:
        template = "inventory_grid.html"
    if user:
        q["inventory.user.username"] = user
    if owner:
        q["inventory.owner.username"] = owner
    q2 = {"type": {"$ne": "token"}}
    if not public_view and owner_disabled:
        q2["Owner.enabled"] = False
    if not public_view and user_disabled:
        q2["User.enabled"] = False
    q, selectors, sort_field, sort_field_final, sort_direction = build_query(dict(q), fields, sort_fields)
    items = db.inventory.aggregate([
        {"$match": q},
        {"$match": q2},
        {"$sort": {sort_field_final: 1 if sort_direction == "asc" else -1}}
    ])
    return render_template(template, **locals())


@page_inventory.route("/m/inventory/<item_id>/audit", methods=["POST"])
@login_required(groups=["k-space:janitors"])
def view_inventory_audit(item_id):
    """Record the current user and time as the item's latest audit."""
    user = read_user()
    db.inventory.update_one({
        "_id": ObjectId(item_id),
    }, {
        "$set": {
            "inventory.audit.username": user["username"],
            "inventory.audit.timestamp": datetime.utcnow(),
        },
    })
    return redirect("/m/inventory/%s/view" % item_id)


@page_inventory.route("/m/inventory/<item_id>/claim", methods=["POST"])
@login_required
def view_inventory_claim(item_id):
    """Make the current user the owner of an item that has no owner."""
    user = read_user()
    # The owner condition is part of the filter, so an already-owned item
    # is left untouched.
    db.inventory.update_one({
        "_id": ObjectId(item_id),
        "inventory.owner.username": None
    }, {
        "$set": {
            "inventory.owner.username": user["username"],
            "inventory.owner.display_name": user["name"],
        },
    })
    return redirect("/m/inventory/%s/view" % item_id)


@page_inventory.route("/m/inventory/<item_id>/use", methods=["POST"])
@login_required
def view_inventory_use(item_id):
    """Mark the current user as the user of a usable, unused item.

    Responds 404 when the item is missing, not usable or already in use;
    otherwise announces the change via ``spam`` and redirects to the item.
    """
    user = read_user()
    item = db.inventory.find_one({
        "_id": ObjectId(item_id),
        "inventory.usable": True,
        "inventory.user.username": None
    })
    if not item:
        return abort(404)
    db.inventory.update_one({
        "_id": ObjectId(item["_id"])
    }, {
        "$set": {
            "inventory.user.username": user["username"],
            "inventory.user.display_name": user["name"],
        },
    })
    item_name = format_name(item)
    msg = "%s has started using %s" % (user["name"], item_name)
    if item.get("shortener") and item["shortener"].get("slug"):
        msg += ("\nk6.ee/%s" % item["shortener"]["slug"])
    spam(msg)
    return redirect("/m/inventory/%s/view" % item_id)


@page_inventory.route("/m/inventory/<item_id>/vacate", methods=["POST"])
@login_required
def view_inventory_vacate(item_id):
    """Release a usable item that the current user is using.

    Responds 404 unless the item exists, is usable and is used by the
    current user; otherwise announces the change via ``spam`` and redirects
    to the item.
    """
    user = read_user()
    item = db.inventory.find_one({
        "_id": ObjectId(item_id),
        "inventory.usable": True,
        "inventory.user.username": user["username"]
    })
    if not item:
        return abort(404)
    db.inventory.update_one({
        "_id": ObjectId(item["_id"])
    }, {
        "$unset": {
            "inventory.user": ""
        },
    })
    item_name = format_name(item)
    msg = "%s has stopped using %s" % (user["name"], item_name)
    if item.get("shortener") and item["shortener"].get("slug"):
        msg += ("\nk6.ee/%s" % item["shortener"]["slug"])
    spam(msg)
    return redirect("/m/inventory/%s/view" % item_id)