Files
inventory-app/inventory-app/common.py
2025-12-22 18:17:06 +02:00

139 lines
4.9 KiB
Python

import collections.abc
import os
from dataclasses import dataclass, field
from functools import wraps
from typing import List, Optional

import const
from bson.objectid import ObjectId
from flask import g, request
from flask_wtf import FlaskForm
from kubernetes import client, config
from oidc import read_user
from pymongo import MongoClient
# True when const.ENVIRONMENT_TYPE is "DEV".
devenv = const.ENVIRONMENT_TYPE == "DEV"
# Kubernetes namespace whose "oidcusers" objects are listed by
# get_users_inner(); None when the environment variable is unset.
OIDC_USERS_NAMESPACE = os.getenv("OIDC_USERS_NAMESPACE")
# Shared handle to the default database of const.MONGO_URI, created at import
# time.
db = MongoClient(const.MONGO_URI).get_default_database()
@dataclass
class User:
    """Account record describing one user of the inventory application.

    Every attribute is optional, so ``User()`` yields an empty record.
    Attributes can also be read with subscript syntax
    (``user["username"]``); an unknown name raises ``AttributeError``.
    """

    # The scalar attributes default to None, hence Optional[...] rather than
    # a bare ``str`` annotation.
    username: Optional[str] = None
    display_name: Optional[str] = None
    slack_id: Optional[str] = None
    user_type: Optional[str] = None
    # default_factory gives every instance its own list.
    groups: List[str] = field(default_factory=list)

    def __getitem__(self, item):
        """Return the attribute named ``item`` (dict-style read access)."""
        return getattr(self, item)
def get_users_inner():
    """Yield a ``User`` for every ``oidcusers`` custom object.

    Loads the in-cluster Kubernetes configuration and lists the
    ``codemowers.cloud/v1beta1`` ``oidcusers`` objects in
    ``OIDC_USERS_NAMESPACE``. Because this is a generator, nothing is
    loaded or requested until the first item is consumed.
    """
    config.load_incluster_config()
    custom_objects = client.CustomObjectsApi()
    response = custom_objects.list_namespaced_custom_object(
        "codemowers.cloud", "v1beta1", OIDC_USERS_NAMESPACE, "oidcusers"
    )
    for obj in response["items"]:
        username = obj['metadata']['name']
        # "status" may be missing entirely; fall back to an empty mapping.
        status = obj.get("status", {})
        display_name = status.get("profile", {}).get("name")
        slack_id = status.get("slackId")
        # TODO: read from status when ready
        user_type = obj.get("spec", {}).get("type")
        # Groups are rendered as "<prefix>:<name>".
        groups = [
            f"{grp['prefix']}:{grp['name']}"
            for grp in status.get("groups", [])
        ]
        yield User(username, display_name, slack_id, user_type, groups)
def get_users():
    """Return all users as ``(users, users_lookup)``.

    ``users`` is the list produced by ``get_users_inner()`` and
    ``users_lookup`` maps each username to its ``User``; if a username
    occurs more than once, the last occurrence wins.
    """
    users = []
    users_lookup = {}
    for user in get_users_inner():
        users.append(user)
        users_lookup[user.username] = user
    return users, users_lookup
class CustomForm(FlaskForm):
    """``FlaskForm`` with a helper that copies field values into a dict."""

    # quite hacky
    def populate_dict(self, d):
        """Write every field's ``data`` into ``d`` in place, keyed by name.

        The ``csrf_token`` field is skipped.
        """
        for field_name, form_field in self._fields.items():
            if field_name == "csrf_token":
                continue
            d[field_name] = form_field.data
def inventory_fetch(owner=True, item_type=None):
    """Decorator factory that swaps an ``item_id`` kwarg for the item itself.

    The decorated function must be called with an ``item_id`` keyword
    argument. It is removed and replaced by ``item``, the result of
    ``db.inventory.find_one`` for that id.

    Args:
        owner: When true, only match items whose
            ``inventory.owner.username`` equals the current user's username.
        item_type: When truthy, only match items whose ``type`` equals it.
    """
    def decorator(view):
        @wraps(view)
        def fetch_then_call(*args, **kwargs):
            # The user is read first, even when ``owner`` is false.
            current_user = read_user()
            lookup = {"_id": ObjectId(kwargs.pop("item_id"))}
            if item_type:
                lookup["type"] = item_type
            if owner:
                lookup["inventory.owner.username"] = current_user["username"]
            kwargs["item"] = db.inventory.find_one(lookup)
            return view(*args, **kwargs)
        return fetch_then_call
    return decorator
def format_name(item):
    """Return a human-readable label for an inventory item.

    The first truthy key among ``locker``, ``mac`` and ``desk`` (checked in
    that order) decides the format. Otherwise the item's ``name`` is
    returned, or "Unidentified item" when that key is absent.
    """
    locker = item.get("locker")
    if locker:
        return f"Locker {locker.get('number')}"

    mac = item.get("mac")
    if mac:
        machine_name = item.get("name", "unidentified")
        return f"Machine {machine_name} ({mac})"

    desk = item.get("desk")
    if desk:
        return f"Desk {desk.get('number')}"

    return item.get("name", "Unidentified item")
def flatten(d, parent_key='', sep='.'):
    """Collapse a nested mapping into a single-level dict.

    Keys of nested mappings are joined to their parent's key with ``sep``.
    ``parent_key`` is the prefix for this level and is empty at the top.
    Empty nested mappings contribute no entries.
    """
    flat = {}
    for key, value in d.items():
        path = key if not parent_key else parent_key + sep + key
        if isinstance(value, collections.abc.MutableMapping):
            # Recurse and merge the child's flattened entries.
            flat.update(flatten(value, path, sep=sep))
        else:
            flat[path] = value
    return flat
def build_query(base_query, fields=(), sort_fields=None):
    """Build a MongoDB filter and the filter/sort state from the request args.

    Args:
        base_query: Filter dict that always applies. It is shallow-copied
            before per-field conditions are added, and is also the query
            used to collect the distinct values offered for each field.
        fields: Iterable of ``(attr, title, tp)`` triples. ``attr`` is the
            dotted document path, ``title`` its label and ``tp`` the type
            used to parse the request value (``list`` means multi-valued).
        sort_fields: Optional dict of sort key -> title. Every selector
            except ``"type"`` is added to it in place, so a dict passed by
            the caller is updated. When omitted, a fresh dict is created
            for each call.

    Returns:
        A tuple ``(q, selectors, sort_field, sort_field_final,
        sort_direction)``. ``selectors`` is a list of
        ``(key, title, results, val)`` tuples, one per field.
    """
    # A shared mutable default would accumulate sort keys across calls
    # because this function writes into ``sort_fields``.
    if sort_fields is None:
        sort_fields = {}

    top_usernames = ['k-space']
    selectors = []
    q = base_query.copy()
    for attr, title, tp in fields:
        # Request argument names use "_" where the document path uses ".".
        key = attr.replace(".", "_")
        if tp == list:
            val = list(map(str, request.args.getlist(key)))
        else:
            val = request.args.get(key, type=tp)

        results = db.inventory.find(base_query).distinct(attr)
        if key in ("inventory_owner_username", "inventory_user_username"):
            # Show display names, falling back to the raw username.
            results = [
                {
                    "username": u,
                    "display_name": g.users_lookup.get(u, User()).display_name or u,
                }
                for u in results
            ]
            results = sorted(results, key=lambda k: k["display_name"])
            # Usernames in ``top_usernames`` are listed before the others.
            top = [k for k in results if k["username"] in top_usernames]
            rest = [k for k in results if k["username"] not in top_usernames]
            results = top + rest
        elif tp != list:
            results = sorted(results)
            results = list(map(tp, results))
        selectors.append((key, title, results, val))

        if val:
            if tp == list and attr == "type":
                # For "type" the selected values are excluded.
                q[attr] = {"$nin": val}
            elif tp == list:
                q[attr] = {"$in": val}
            else:
                q[attr] = val

    for selector_key, selector_title, _results, _val in selectors:
        if selector_key != "type":
            sort_fields[selector_key] = selector_title

    sort_field = request.args.get("sort_field", "", type=str)
    if sort_field not in sort_fields:
        sort_field = "last_seen"
    # Map the request key back to a dotted path, except for these two
    # fields, whose names are used as they are.
    if sort_field not in ["nic_vendor", "last_seen"]:
        sort_field_final = sort_field.replace("_", ".")
    else:
        sort_field_final = sort_field

    sort_direction = request.args.get("sort_direction", type=str)
    if sort_direction not in ["asc", "desc"]:
        sort_direction = "desc"
    return q, selectors, sort_field, sort_field_final, sort_direction