inventory-app/inventory-app/main.py
Madis Mägi 2cfccc22a0
All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
Move Doorboy page over from members site
2023-08-07 14:50:05 +03:00

222 lines
7.0 KiB
Python
Executable File

#!/usr/bin/env python3
import hashlib
import json
import secrets
import unicodedata
import urllib
from collections import Counter
from configparser import ConfigParser
from datetime import datetime, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from functools import wraps
from time import sleep
import bleach
import flask
import jinja2
import ldap
import markdown
import pymongo
import requests
import safe
from flask import Flask, abort, g, make_response, redirect, render_template, request, session
from flask_wtf import FlaskForm, RecaptchaField
from jinja2 import Environment, FileSystemLoader
from ldap import modlist
from markupsafe import Markup
from prometheus_flask_exporter import PrometheusMetrics
from pymongo import MongoClient
from werkzeug import exceptions
from wtforms import (
BooleanField,
PasswordField,
SelectField,
SelectMultipleField,
StringField,
ValidationError,
validators,
widgets,
)
from wtforms.fields import FormField
from wtforms.fields import DateField, EmailField
from wtforms.form import Form
from wtforms.validators import DataRequired
import const
from common import CustomForm, devenv, flatten, format_name, spam, users_lookup, User
from inventory import page_inventory
from oidc import page_oidc, login_required, read_user
from doorboy import page_doorboy
from api import page_api
def check_foreign_key_format(item):
    """Tell whether an item's owner or user reference lacks a username.

    A reference is flagged when its ``foreign_id`` is set (truthy) but its
    ``username`` is missing or empty.

    Args:
        item: Inventory document (mapping). The ``inventory.owner`` and
            ``inventory.user`` sub-documents are optional.

    Returns:
        ``True`` if the owner or the user reference is flagged,
        ``False`` otherwise.
    """
    inventory = item.get("inventory", {})
    references = (inventory.get("owner", {}), inventory.get("user", {}))
    # Inspect both references: a well-formed owner must not hide a broken
    # user, and the result is always a real bool rather than None.
    return any(
        bool(ref.get("foreign_id", False)) and not ref.get("username")
        for ref in references
    )
def render_user_display_name(username):
    """Jinja filter: map a username to the name shown in the UI.

    Returns an empty string for a falsy ``username``. Otherwise the
    ``display_name`` found through ``users_lookup`` is returned, falling
    back to the username itself when no display name is available.
    """
    if username:
        profile = users_lookup.get(username, User())
        return profile.display_name or username
    return ""
def render_markdown(text):
    """Jinja filter: render user-supplied Markdown as HTML markup.

    Returns an empty string for falsy input. Otherwise the text is passed
    through ``bleach.clean`` and then converted with ``markdown.markdown``;
    the result is wrapped in ``Markup`` so templates do not escape it again.
    """
    if text:
        # NOTE(review): sanitising runs *before* the Markdown conversion, so
        # HTML generated by the Markdown step itself (e.g. link targets) is
        # not filtered - confirm this ordering is intended.
        html = markdown.markdown(bleach.clean(text))
        return Markup(html)
    return ""
def render_timeago(dt):
    """Jinja filter: wrap a timestamp in a ``<time class="timeago">`` tag.

    Args:
        dt: Timestamp to display; its string form is used both for the
            ``datetime`` attribute (suffixed with ``+0000``) and for the
            element text.

    Returns:
        An empty string for a falsy ``dt``, otherwise the tag as ``Markup``.
    """
    if not dt:
        return ""
    # Format through Markup's % operator (not plain str %) so the
    # interpolated values are HTML-escaped before being marked safe.
    return Markup("<time class=\"timeago\" datetime=\"%s+0000\">%s</time>") % (dt, dt)
def render_last_seen(machine, stale_minutes=30):
    """Jinja filter: produce a dimming CSS declaration for stale machines.

    Args:
        machine: Mapping with an optional ``last_seen`` datetime, or a
            falsy value.
        stale_minutes: Age in minutes after which ``last_seen`` counts as
            stale.

    Returns:
        ``Markup("opacity: 25%;")`` when ``last_seen`` is older than the
        threshold, otherwise an empty string (never ``None``, which a
        template would render as the literal text "None").
    """
    if not machine:
        return ""
    last_seen = machine.get("last_seen")
    if not last_seen:
        return ""
    # Compared against a naive UTC "now"; assumes last_seen is stored as a
    # naive UTC datetime - TODO confirm.
    stale_timestamp = datetime.utcnow() - timedelta(minutes=stale_minutes)
    if last_seen < stale_timestamp:
        return Markup("opacity: 25%;")
    return ""
def render_owner_link(item):
    """Jinja filter: render the owner cell of an inventory item.

    Args:
        item: Inventory document; must contain ``_id`` when no owner
            username is set.

    Returns:
        ``Markup`` with a link to the owner's page when
        ``inventory.owner.username`` is set, otherwise a small POST form
        targeting ``/m/inventory/<_id>/claim``.
    """
    username = item.get("inventory", {}).get("owner", {}).get("username", False)
    # Values are interpolated with Markup's % operator so that usernames and
    # display names are HTML-escaped instead of being injected verbatim.
    if username:
        return Markup("<a href=\"/m/user/%s\">%s</a>") % (username, render_user_display_name(username))
    return Markup("<form method=\"post\" action=\"/m/inventory/%s/claim\"><button type=\"submit\" class=\"waves-effect waves-light btn\">Mine!</button></form>") % (item["_id"],)
def render_user_link(item):
    """Jinja filter: render the current-user cell of an inventory item.

    Args:
        item: Inventory document; must contain ``_id`` when the item is
            usable and has no user username.

    Returns:
        ``Markup`` with a link to the user's page when
        ``inventory.user.username`` is set; a POST form targeting
        ``/m/inventory/<_id>/use`` when the item is flagged ``usable``;
        otherwise an empty string.
    """
    inventory = item.get("inventory", {})
    username = inventory.get("user", {}).get("username", False)
    # Values are interpolated with Markup's % operator so that usernames and
    # display names are HTML-escaped instead of being injected verbatim.
    if username:
        return Markup("<a href=\"/m/user/%s\">%s</a>") % (username, render_user_display_name(username))
    if inventory.get("usable"):
        return Markup("<form method=\"post\" action=\"/m/inventory/%s/use\"><button type=\"submit\" class=\"waves-effect waves-light btn\">I use it now!</button></form>") % (item["_id"],)
    return ""
def is_list(value) -> bool:
    """Jinja filter: report whether ``value`` is a ``list`` instance."""
    return isinstance(value, list)
# Register the project's custom template filters in Jinja's filter table.
# This happens before any Environment is constructed below.
jinja2.filters.FILTERS.update({
    'format_name': format_name,
    'markdown': render_markdown,
    'timeago': render_timeago,
    'last_seen': render_last_seen,
    'owner_link': render_owner_link,
    'user_link': render_user_link,
    'is_list': is_list,
    'quote_plus': lambda u: urllib.parse.quote_plus(u),
    'check_foreign_key_format': check_foreign_key_format,
    'display_name': render_user_display_name,
})

# Stand-alone Jinja environment that loads templates from ./templates/.
env = Environment(loader=FileSystemLoader('templates/'))
class ReverseProxied:
    """WSGI middleware that adopts the scheme announced by a reverse proxy.

    When the request carries an ``X-Forwarded-Proto`` header whose value is
    ``http`` or ``https``, that value replaces ``wsgi.url_scheme`` in the
    WSGI environ before the wrapped application is invoked.
    """

    def __init__(self, app):
        # The wrapped WSGI callable.
        self.app = app

    def __call__(self, environ, start_response):
        forwarded_proto = environ.get('HTTP_X_FORWARDED_PROTO')
        # Any other value (including a missing header) leaves the environ
        # untouched.
        if forwarded_proto in ('http', 'https'):
            environ['wsgi.url_scheme'] = forwarded_proto
        return self.app(environ, start_response)
# Flask application, wrapped so the scheme forwarded by the proxy is used.
app = Flask(__name__)
app.wsgi_app = ReverseProxied(app.wsgi_app)

# Mount the feature blueprints.
for _blueprint in (page_inventory, page_oidc, page_api, page_doorboy):
    app.register_blueprint(_blueprint)

# Prometheus metrics, grouped by request path.
metrics = PrometheusMetrics(app, group_by="path")
app.config['SECRET_KEY'] = const.SECRET_KEY

# MongoDB connection; the database is the default one named in MONGO_URI.
mongoclient = MongoClient(const.MONGO_URI)
mongodb = mongoclient.get_default_database()

# Sparse unique indexes: uniqueness is only enforced for documents that
# actually contain the indexed field.
for _collection, _field in (
    (mongodb.member, "ad.username"),
    (mongodb.inventory, "shortener.slug"),
    (mongodb.inventory, "token.uid_hash"),
):
    _collection.create_index(_field, sparse=True, unique=True)
# (category slug, hex colour) pairs, kept in a fixed order.
CATEGORY_COLORS = (
    ("membership-fee", "#acc236"),
    ("rent", "#f53794"),
    ("accounting-fees", "#f67019"),
    ("office-supplies", "#58595b"),
    ("server-room", "#166a8f"),
    ("training", "#00a950"),
    ("membership-fee-company", "#4dc9f6"),
    ("snack-machine", "#8549ba"),
    ("other", "#537bc4"),
)

# Category slugs listed as income categories.
INCOME_CATEGORIES = ("membership-fee", "workshop")
# Load the provisioning targets. Only sections whose "enabled" option parses
# to a non-zero integer are kept; every kept section must define all of the
# options listed in _PROVISION_OPTIONS.
cp = ConfigParser()
cp.read("/config/provision.conf")

_PROVISION_OPTIONS = (
    "entrypoint",
    "admin",
    "type",
    "comment",
    "username",
    "password",
    "port",
)

PROVISION_CONFIGS = {}
for section in cp.sections():
    if cp.getint(section, "enabled"):
        PROVISION_CONFIGS[section] = {
            option: cp.get(section, option) for option in _PROVISION_OPTIONS
        }
def dev_only(f):
    """Decorator: expose a view only when ``devenv`` is truthy.

    Outside the development environment the wrapped view is never called
    and the request is aborted with HTTP 404 instead.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        if devenv:
            return f(*args, **kwargs)
        return abort(404)

    return guarded
@app.context_processor
def inject_context():
    """Provide values that every rendered template can access."""
    return {
        "devenv": devenv,
        "inventory_assets_base_url": const.INVENTORY_ASSETS_BASE_URL,
        "members_host": const.MEMBERS_HOST,
    }
def name_check(form, field):
    """WTForms validator for person names.

    Raises:
        ValidationError: if the value has leading or trailing whitespace,
            or if it contains any digit character.
    """
    value = field.data
    if value.strip() != value:
        raise ValidationError("Name must not contain leading or trailing space")
    for character in value:
        if character.isdigit():
            raise ValidationError("Name must not contain numbers")
@app.route("/login")
@login_required
def login_dummy():
    """Entry point for triggering the login flow by hand.

    The view has no logic of its own: it exists so that a visit to
    ``/login`` passes through ``login_required``, after which the user is
    redirected to the inventory page.
    """
    return redirect("/m/inventory")
@app.route("/")
def index():
    """Redirect the site root to the inventory page."""
    return redirect("/m/inventory")
@app.route("/me")
@login_required
def view_profile():
    """Greet the logged-in user by name.

    Returns:
        The text ``Hello <name>``, where the name comes from ``read_user()``.
    """
    user = read_user()
    # A str returned from a Flask view is sent as an HTML response, so the
    # name is escaped rather than echoed verbatim.
    return f"Hello {Markup.escape(user['name'])}"
@app.route("/hello")
def view_hello():
    """Unauthenticated endpoint that returns a fixed greeting."""
    return "Hello!"
class MultiCheckboxField(SelectMultipleField):
    """Multiple-choice field rendered as a list of checkboxes.

    Uses ``ListWidget`` (with ``prefix_label=False``) for the field as a
    whole and ``CheckboxInput`` for every individual option.
    """

    # Widget used for each individual choice.
    option_widget = widgets.CheckboxInput()
    # Container widget for the whole field.
    widget = widgets.ListWidget(prefix_label=False)
if __name__ == '__main__':
    # Run Flask's built-in server when executed directly; it binds to '::'
    # and debug mode follows the devenv flag.
    app.run(host='::', debug=devenv)