Madis Mägi
8b9ac4b92b
All checks were successful
ci/woodpecker/manual/woodpecker Pipeline was successful
121 lines
4.1 KiB
Python
121 lines
4.1 KiB
Python
import os
|
|
import collections.abc
|
|
from functools import wraps
|
|
|
|
import requests
|
|
from bson.objectid import ObjectId
|
|
from flask import g, request
|
|
from flask_wtf import FlaskForm
|
|
from pymongo import MongoClient
|
|
from kubernetes import client, config
|
|
|
|
import const
|
|
|
|
devenv = const.ENVIRONMENT_TYPE == "DEV"
|
|
OIDC_USERS_NAMESPACE = os.getenv("OIDC_USERS_NAMESPACE")
|
|
|
|
db = MongoClient(const.MONGO_URI).get_default_database()
|
|
|
|
def get_users():
|
|
config.load_incluster_config()
|
|
api_instance = client.CustomObjectsApi()
|
|
ret = api_instance.list_namespaced_custom_object("codemowers.io", "v1alpha1", OIDC_USERS_NAMESPACE, "oidcgatewayusers")
|
|
return ret["items"]
|
|
|
|
users = get_users()
|
|
users_lookup = {u['metadata']['name'] : u for u in users}
|
|
|
|
class CustomForm(FlaskForm):
|
|
# quite hacky
|
|
def populate_dict(self, d):
|
|
for name, field in self._fields.items():
|
|
if name != "csrf_token":
|
|
d[name] = field.data
|
|
|
|
|
|
def inventory_fetch(owner=True, item_type=None):
|
|
def wrapper(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
user = read_user()
|
|
d = {
|
|
"_id": ObjectId(kwargs.pop("item_id"))
|
|
}
|
|
if item_type:
|
|
d["type"] = item_type
|
|
if owner:
|
|
d["inventory.owner.username"] = user["username"]
|
|
kwargs["item"] = db.inventory.find_one(d)
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
return wrapper
|
|
|
|
def format_name(item):
|
|
if item.get("locker"):
|
|
return "Locker %d" % item.get("locker").get("number")
|
|
elif item.get("mac"):
|
|
return "Machine %s" % item.get("hostname", item.get("mac"))
|
|
elif item.get("desk"):
|
|
return "Desk %s" % item.get("desk").get("number")
|
|
return item.get("name", "Unknown item")
|
|
|
|
def flatten(d, parent_key='', sep='.'):
|
|
items = []
|
|
for k, v in d.items():
|
|
new_key = parent_key + sep + k if parent_key else k
|
|
if isinstance(v, collections.abc.MutableMapping):
|
|
items.extend(flatten(v, new_key, sep=sep).items())
|
|
else:
|
|
items.append((new_key, v))
|
|
return dict(items)
|
|
|
|
def spam(msg):
|
|
if devenv:
|
|
print(msg)
|
|
else:
|
|
url = "https://hooks.slack.com/services/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"
|
|
requests.post(url, json={"text": msg })
|
|
|
|
def build_query(base_query, fields=[], sort_fields={}):
|
|
selectors = []
|
|
q = base_query.copy()
|
|
for attr, title, tp in fields:
|
|
key = attr.replace(".", "_")
|
|
if key in ("inventory_owner_username", "inventory_user_username"):
|
|
val = request.args.get(key, type=dict)
|
|
elif tp == list:
|
|
val = request.args.getlist(key)
|
|
val = list(map(str, val))
|
|
else:
|
|
val = request.args.get(key, type=tp)
|
|
results = db.inventory.find(base_query).distinct(attr)
|
|
if key in ("inventory_owner_username", "inventory_user_username"):
|
|
results = [{"username": u, "display_name": users_lookup[u]["spec"]["customProfile"]["name"]} for u in results]
|
|
results = sorted(results, key = lambda k: k["display_name"])
|
|
elif tp != list:
|
|
results = sorted(results)
|
|
results = list(map(tp, results))
|
|
selectors.append((key, title, results, val))
|
|
if val:
|
|
if tp == list and attr == "type":
|
|
q[attr] = { "$nin" : val }
|
|
elif tp == list:
|
|
q[attr] = { "$in" : val }
|
|
else:
|
|
q[attr] = val
|
|
|
|
for s in selectors:
|
|
if s[0] != "type":
|
|
sort_fields[s[0]] = s[1]
|
|
sort_field = request.args.get("sort_field", "", type=str)
|
|
if sort_field not in sort_fields.keys():
|
|
sort_field = "last_seen"
|
|
if sort_field not in ["nic_vendor", "last_seen"]:
|
|
sort_field_final = sort_field.replace("_", ".")
|
|
else:
|
|
sort_field_final = sort_field
|
|
sort_direction = request.args.get("sort_direction", type=str)
|
|
if sort_direction not in ["asc", "desc"]:
|
|
sort_direction = "desc"
|
|
return q, selectors, sort_field, sort_field_final, sort_direction
|