Files
inventory-app/inventory-app/main.py
rasmus 194420d375 drop dead s3 code
use region from env as well
2025-12-29 00:19:57 +02:00

239 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
import threading
import urllib
from configparser import ConfigParser
from datetime import datetime, timedelta
from functools import wraps
from http.server import BaseHTTPRequestHandler, HTTPServer
import bleach
import const
import jinja2
import markdown
import segno
from common import User, devenv, format_name, get_users
from doorboy import page_doorboy
from flask import Flask, abort, g, redirect, request
from inventory import page_inventory
from jinja2 import Environment, FileSystemLoader
from markupsafe import Markup
from oidc import login_required, page_oidc
from prometheus_client import CollectorRegistry, Gauge, generate_latest
from prometheus_flask_exporter import PrometheusMetrics
from pymongo import MongoClient
from wtforms import SelectMultipleField, ValidationError, widgets
def check_foreign_key_format(item):
owner = item.get("inventory", {}).get("owner", {})
user = item.get("inventory", {}).get("user", {})
if owner.get("foreign_id", False):
return not bool(owner.get("username"))
if user.get("foreign_id", False):
return not bool(user.get("username"))
def render_user_display_name(username):
if not username:
return ""
display_name = g.users_lookup.get(username, User()).display_name
return display_name or username
def render_markdown(text):
if not text:
return ""
cleaned = bleach.clean(text)
md = markdown.markdown(cleaned)
return Markup(md)
def render_timeago(dt):
if not dt:
return ""
return Markup("<time class=\"timeago\" datetime=\"%s+0000\">%s</time>" % (dt, dt))
def render_last_seen(machine, stale_minutes=30):
stale_timestamp = datetime.utcnow() - timedelta(minutes=stale_minutes)
if machine and machine.get("last_seen"):
if machine.get("last_seen") < stale_timestamp:
return Markup("opacity: 25%;")
def render_owner_link(item):
username = item.get("inventory", {}).get("owner", {}).get("username", False)
if username:
return Markup("<a href=\"/m/user/%s\">%s</a>" % (username, render_user_display_name(username)))
else:
return Markup("<form method=\"post\" action=\"/m/inventory/%s/claim\"><button type=\"submit\" class=\"waves-effect waves-light btn\">Mine!</button></form>" % (item["_id"]))
def render_user_link(item):
username = item.get("inventory", {}).get("user", {}).get("username", False)
if username:
return Markup("<a href=\"/m/user/%s\">%s</a>" % (username, render_user_display_name(username)))
elif item.get("inventory", {}).get("usable"):
return Markup("<form method=\"post\" action=\"/m/inventory/%s/use\"><button type=\"submit\" class=\"waves-effect waves-light btn\">I use it now!</button></form>" % (item["_id"]))
else:
return ""
def is_list(value):
return isinstance(value, list)
def render_qr_code(slug):
qr = segno.make(str("http://k6.ee/" + slug), micro=False, mode='byte', mask=3, error='L', boost_error=False)
return Markup(qr.svg_inline(light='black', dark='yellow', scale=3, border=1))
def render_location_link(location: str):
if location == "":
return
linkstart = location.find("k6.ee/")
if linkstart == -1:
return
return location[linkstart:].split(" ", 1)[0]
jinja2.filters.FILTERS['format_name'] = format_name
jinja2.filters.FILTERS['markdown'] = render_markdown
jinja2.filters.FILTERS['timeago'] = render_timeago
jinja2.filters.FILTERS['last_seen'] = render_last_seen
jinja2.filters.FILTERS['owner_link'] = render_owner_link
jinja2.filters.FILTERS['user_link'] = render_user_link
jinja2.filters.FILTERS['is_list'] = is_list
jinja2.filters.FILTERS['quote_plus'] = lambda u: urllib.parse.quote_plus(u)
jinja2.filters.FILTERS['check_foreign_key_format'] = check_foreign_key_format
jinja2.filters.FILTERS['display_name'] = render_user_display_name
jinja2.filters.FILTERS['qr_code'] = render_qr_code
jinja2.filters.FILTERS['location_link'] = render_location_link
env = Environment(loader=FileSystemLoader('templates/'))
class ReverseProxied(object):
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
scheme = environ.get('HTTP_X_FORWARDED_PROTO')
if scheme and scheme in ['http', 'https']:
environ['wsgi.url_scheme'] = scheme
return self.app(environ, start_response)
app = Flask(__name__)
app.secret_key = const.COOKIES_SECRET_KEY
app.wsgi_app = ReverseProxied(app.wsgi_app)
app.register_blueprint(page_inventory)
app.register_blueprint(page_oidc)
app.register_blueprint(page_doorboy)
metrics = PrometheusMetrics(app, group_by="path")
mongoclient = MongoClient(const.MONGO_URI)
mongodb = mongoclient.get_default_database()
mongodb.inventory.create_index("shortener.slug", sparse=True, unique=True)
mongodb.inventory.create_index("token.uid_hash", sparse=True, unique=True)
#mongodb.inventory.create_index("token.uid_hash", unique=True)
CATEGORY_COLORS = (
('membership-fee', '#acc236'),
('rent', '#f53794'),
('accounting-fees', '#f67019'),
('office-supplies', '#58595b'),
('server-room', '#166a8f'),
('training', '#00a950'),
('membership-fee-company', '#4dc9f6'),
('snack-machine', '#8549ba'),
('other', '#537bc4'),
)
INCOME_CATEGORIES = "membership-fee", "workshop"
cp = ConfigParser()
cp.read("/config/provision.conf")
PROVISION_CONFIGS = {}
for section in cp.sections():
if not cp.getint(section, "enabled"):
continue
PROVISION_CONFIGS[section] = {
"entrypoint": cp.get(section, "entrypoint"),
"admin": cp.get(section, "admin"),
"type": cp.get(section, "type"),
"comment": cp.get(section, "comment"),
"username": cp.get(section, "username"),
"password": cp.get(section, "password"),
"port": cp.get(section, "port"),
}
def dev_only(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not devenv:
return abort(404)
return f(*args, **kwargs)
return decorated_function
@app.before_request
def do_before_request():
g.users, g.users_lookup = get_users()
@app.context_processor
def inject_context():
return dict(devenv=devenv)
def name_check(form, field):
if field.data != field.data.strip():
raise ValidationError("Name must not contain leading or trailing space")
if any(c.isdigit() for c in field.data):
raise ValidationError("Name must not contain numbers")
# just to manually enter login_required annotation
@app.route("/login")
@login_required
def login_dummy():
return redirect(request.args.get("redirect_url") or "/m/inventory")
@app.route("/")
def index():
return redirect("/m/inventory")
def get_mac_metrics():
registry = CollectorRegistry()
inventory_item = Gauge('inventory_item_info', 'Information about inventory item', ['mac', 'slug', 'owner', 'user', 'name'], registry=registry)
items = mongodb.inventory.find(
filter = { "mac": { "$exists": True } },
projection = {
"mac": 1,
"name": 1,
"shortener.slug": 1,
"inventory.owner": 1,
"inventory.user": 1,
}
)
for item in items:
inventory_item.labels(mac=item.mac, owner=item.get("owner", {}).get("username"), user=item.get("user", {}).get("username"), slug=item.slug, name=item.name).set(1)
return generate_latest(registry)
class MetricsHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/metrics':
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.end_headers()
self.wfile.write(get_mac_metrics())
else:
self.send_response(404)
self.end_headers()
class MultiCheckboxField(SelectMultipleField):
widget = widgets.ListWidget(prefix_label=False)
option_widget = widgets.CheckboxInput()
if __name__ == '__main__':
if not devenv:
metrics_address = ('', 8000)
httpd = HTTPServer(metrics_address, MetricsHandler)
metrics_thread = threading.Thread(target=httpd.serve_forever)
metrics_thread.daemon = True
metrics_thread.start()
app.run(debug=devenv, host='::')