split image functions to inventory_image.py

This commit is contained in:
2025-12-29 00:50:11 +02:00
parent 194420d375
commit 44c64cc44e
3 changed files with 161 additions and 149 deletions

View File

@@ -1,26 +1,22 @@
import re
import boto3
import pymongo
import urllib
from datetime import datetime, date, timedelta
from botocore.exceptions import ClientError
from datetime import date, datetime, timedelta
import const
import inventory_image
import pymongo
from bson.objectid import ObjectId
from common import CustomForm, build_query, flatten
from flask import Blueprint, abort, g, redirect, render_template, request, url_for
from jpegtran import JPEGImage
from oidc import do_login, login_required, read_user
from pymongo import MongoClient
from werkzeug.utils import secure_filename
from wtforms import BooleanField, SelectField, StringField
from wtforms.fields import FormField
from wtforms.form import Form
from wtforms.validators import Length
import const
from common import CustomForm, build_query, flatten
from oidc import login_required, read_user, do_login
page_inventory = Blueprint("inventory", __name__)
db = MongoClient(const.MONGO_URI).get_default_database()
channel = "inventory"
@login_required
@page_inventory.route("/m/inventory/by-mac/<mac>", methods=['GET'])
@@ -50,25 +46,11 @@ def view_inventory_view(item_id):
can_audit = "k-space:inventory:audit" in user.get("groups", [])
can_edit = check_edit_permission(item_id)
is_using = item_user and item_user == user["username"]
photo_url = get_image_url(item_id)
photo_url = inventory_image.get_image_url(item_id)
# pylance: disable=unused-variable
constants = {"MACADDRESS_OUTLINK_BASEURL": const.MACADDRESS_OUTLINK_BASEURL}
return render_template(template , **locals())
def get_image_url(item_id):
bucket=get_bucket()
try:
return bucket.generate_presigned_url(
ClientMethod='get_object',
Params={
'Bucket': const.BUCKET_NAME,
'Key': item_id
},
ExpiresIn=3600
)
except ClientError:
return None
def fetch_members_select():
top_usernames= ['k-space', None]
choices = [(None, None)]
@@ -256,129 +238,6 @@ def save_inventory_item(item_id=None, **_):
return render_template("inventory_edit.html", **locals())
return redirect("/m/inventory/%s/view" % item_id)
def is_image_ext(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ["jpg", "jpeg"]
# AWS S3 credentials / env is automagically imported https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-environment-variables
# AWS_ACCESS_KEY_ID, AWS_DEFAULT_REGION, etc
def get_bucket(): #TODO
return boto3.client('s3',
endpoint_url=const.AWS_S3_ENDPOINT_URL,
config=boto3.session.Config(signature_version='s3v4'))
@page_inventory.route("/inventory/<item_id>/upload-photo", methods=["POST"])
@login_required
def upload_photo(item_id):
user = read_user()
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return "Item not found", 404
if item.get("type") == "key" and "k-space:inventory:keys" not in user.get("groups", []):
return abort(403)
if "file" not in request.files:
return "No file part", 400
file = request.files["file"]
if not file.filename:
return "No selected file", 400
if file and is_image_ext(secure_filename(file.filename)):
try:
file.seek(0)
img = JPEGImage(blob=file.read())
except:
return "Not a valid JPEG", 400
if min(img.width, img.height) < 576:
return "Image must have smallest dimension of at least 576px", 400
bucket = get_bucket()
file.seek(0)
bucket.upload_fileobj(file, const.BUCKET_NAME, item_id)
db.inventory.update_one({ "_id": ObjectId(item_id) }, {"$set": {"has_photo": True}})
delete_thumbs(item)
return redirect("/m/inventory/%s/view" % item_id)
else:
return "File is not valid", 400
@page_inventory.app_template_filter('thumbnail')
def thumbnail_filter(item_id, dimension):
return get_scaled_photo(item_id, dimension)
@page_inventory.route("/m/photo/<item_id>/<dimension>")
def get_scaled_photo(item_id=None, dimension=0, retry=2):
dimension = int(dimension) #why
if retry <= 0:
return "", 500
if dimension not in [240, 480, 576, 600]:
return abort(404)
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return abort(404)
if item.get("thumbs", {}).get(str(dimension)):
thumb = item["thumbs"][str(dimension)]
img_url = get_image_url(thumb["name"])
if not img_url:
delete_thumbs(item)
return get_scaled_photo(item_id, dimension, retry - 1)
else:
make_thumb(item, dimension)
return get_scaled_photo(item_id, dimension, retry - 1)
return img_url
def get_item(key):
try:
bucket = get_bucket()
r = bucket.get_object(Bucket=const.BUCKET_NAME, Key=key)
return r['Body'].read()
except:
return None
def make_thumb(item, dimension):
img = get_item(str(item["_id"]))
img = scale_image(img, dimension)
bucket = get_bucket()
thumb_name = "thumb_%s_%d" % (item["_id"], dimension)
bucket.put_object(Body=img.as_blob(), Bucket=const.BUCKET_NAME, Key=thumb_name)
db.inventory.update_one({
"_id": ObjectId(item["_id"]),
}, {
"$set": {
"thumbs.%d.name" % dimension : thumb_name,
},
})
return img
def scale_image(img, dimension):
img = JPEGImage(blob=img)
if img.exif_orientation:
img = img.exif_autotransform()
if img.width < img.height:
img = img.downscale(dimension, img.height * dimension // img.width, 100)
else:
img = img.downscale(img.width * dimension // img.height, dimension, 100)
ratio = 4 / 3
crop_height = min(img.width / ratio, img.height)
crop_width = crop_height * ratio
crop_x = int((img.width - crop_width) / 2) & ~7
crop_y = int((img.height - crop_height) / 2) & ~7
return img.crop(crop_x, crop_y, int(crop_width), int(crop_height))
def delete_thumbs(item):
bucket = get_bucket()
for _, thumb in item.get("thumbs", {}).items():
if thumb.get("name"):
bucket.delete_object(Bucket=const.BUCKET_NAME, Key=thumb["name"])
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$unset": {
"thumbs": ""
},
})
@page_inventory.route("/m/inventory/assign-mac/<slug>")
@page_inventory.route("/m/inventory/assign-slug/<slug>")

View File

@@ -0,0 +1,151 @@
import boto3
import const
from botocore.exceptions import ClientError
from bson.objectid import ObjectId
from flask import Blueprint, abort, redirect, request
from jpegtran import JPEGImage
from oidc import login_required, read_user
from pymongo import MongoClient
from werkzeug.utils import secure_filename
page_inventory_image = Blueprint("inventory_image", __name__)
db = MongoClient(const.MONGO_URI).get_default_database()
def is_image_ext(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ["jpg", "jpeg"]
# AWS S3 credentials / env is automagically imported https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-environment-variables
# AWS_ACCESS_KEY_ID, AWS_DEFAULT_REGION, etc
def get_bucket(): #TODO
return boto3.client('s3',
endpoint_url=const.AWS_S3_ENDPOINT_URL,
config=boto3.session.Config(signature_version='s3v4'))
@page_inventory_image.route("/inventory/<item_id>/upload-photo", methods=["POST"])
@login_required
def upload_photo(item_id):
user = read_user()
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return "Item not found", 404
if item.get("type") == "key" and "k-space:inventory:keys" not in user.get("groups", []):
return abort(403)
if "file" not in request.files:
return "No file part", 400
file = request.files["file"]
if not file.filename:
return "No selected file", 400
if file and is_image_ext(secure_filename(file.filename)):
try:
file.seek(0)
img = JPEGImage(blob=file.read())
except:
return "Not a valid JPEG", 400
if min(img.width, img.height) < 576:
return "Image must have smallest dimension of at least 576px", 400
bucket = get_bucket()
file.seek(0)
bucket.upload_fileobj(file, const.BUCKET_NAME, item_id)
db.inventory.update_one({ "_id": ObjectId(item_id) }, {"$set": {"has_photo": True}})
delete_thumbs(item)
return redirect("/m/inventory/%s/view" % item_id)
else:
return "File is not valid", 400
@page_inventory_image.app_template_filter('thumbnail')
def thumbnail_filter(item_id, dimension):
return get_scaled_photo(item_id, dimension)
@page_inventory_image.route("/m/photo/<item_id>/<dimension>")
def get_scaled_photo(item_id=None, dimension=0, retry=2):
dimension = int(dimension) #why
if retry <= 0:
return "", 500
if dimension not in [240, 480, 576, 600]:
return abort(404)
item = db.inventory.find_one(filter = { "_id": ObjectId(item_id) }, projection = { "thumbs": 1 })
if not item:
return abort(404)
if item.get("thumbs", {}).get(str(dimension)):
thumb = item["thumbs"][str(dimension)]
img_url = get_image_url(thumb["name"])
if not img_url:
delete_thumbs(item)
return get_scaled_photo(item_id, dimension, retry - 1)
else:
make_thumb(item, dimension)
return get_scaled_photo(item_id, dimension, retry - 1)
return img_url
def get_item(key):
try:
bucket = get_bucket()
r = bucket.get_object(Bucket=const.BUCKET_NAME, Key=key)
return r['Body'].read()
except:
return None
def make_thumb(item, dimension):
img = get_item(str(item["_id"]))
img = scale_image(img, dimension)
bucket = get_bucket()
thumb_name = "thumb_%s_%d" % (item["_id"], dimension)
bucket.put_object(Body=img.as_blob(), Bucket=const.BUCKET_NAME, Key=thumb_name)
db.inventory.update_one({
"_id": ObjectId(item["_id"]),
}, {
"$set": {
"thumbs.%d.name" % dimension : thumb_name,
},
})
return img
def scale_image(img, dimension):
img = JPEGImage(blob=img)
if img.exif_orientation:
img = img.exif_autotransform()
if img.width < img.height:
img = img.downscale(dimension, img.height * dimension // img.width, 100)
else:
img = img.downscale(img.width * dimension // img.height, dimension, 100)
ratio = 4 / 3
crop_height = min(img.width / ratio, img.height)
crop_width = crop_height * ratio
crop_x = int((img.width - crop_width) / 2) & ~7
crop_y = int((img.height - crop_height) / 2) & ~7
return img.crop(crop_x, crop_y, int(crop_width), int(crop_height))
def delete_thumbs(item):
bucket = get_bucket()
for _, thumb in item.get("thumbs", {}).items():
if thumb.get("name"):
bucket.delete_object(Bucket=const.BUCKET_NAME, Key=thumb["name"])
db.inventory.update_one({
"_id": ObjectId(item["_id"])
}, {
"$unset": {
"thumbs": ""
},
})
def get_image_url(item_id):
bucket=get_bucket()
try:
return bucket.generate_presigned_url(
ClientMethod='get_object',
Params={
'Bucket': const.BUCKET_NAME,
'Key': item_id
},
ExpiresIn=3600
)
except ClientError:
return None

View File

@@ -15,6 +15,7 @@ from common import User, devenv, format_name, get_users
from doorboy import page_doorboy
from flask import Flask, abort, g, redirect, request
from inventory import page_inventory
from inventory_image import page_inventory_image
from jinja2 import Environment, FileSystemLoader
from markupsafe import Markup
from oidc import login_required, page_oidc
@@ -119,6 +120,7 @@ app = Flask(__name__)
app.secret_key = const.COOKIES_SECRET_KEY
app.wsgi_app = ReverseProxied(app.wsgi_app)
app.register_blueprint(page_inventory)
app.register_blueprint(page_inventory_image)
app.register_blueprint(page_oidc)
app.register_blueprint(page_doorboy)
metrics = PrometheusMetrics(app, group_by="path")