Move inventory to new repo
This commit is contained in:
108
inventory-app/common.py
Normal file
108
inventory-app/common.py
Normal file
@@ -0,0 +1,108 @@
|
||||
import collections.abc
|
||||
from functools import wraps
|
||||
|
||||
import requests
|
||||
from bson.objectid import ObjectId
|
||||
from flask import g, request
|
||||
from flask_wtf import FlaskForm
|
||||
from pymongo import MongoClient
|
||||
|
||||
import const
|
||||
|
||||
devenv = const.ENVIRONMENT_TYPE == "DEV"
|
||||
|
||||
db = MongoClient(const.MONGO_URI).get_default_database()
|
||||
|
||||
|
||||
class CustomForm(FlaskForm):
|
||||
# quite hacky
|
||||
def populate_dict(self, d):
|
||||
for name, field in self._fields.items():
|
||||
if name != "csrf_token":
|
||||
d[name] = field.data
|
||||
|
||||
|
||||
def inventory_fetch(owner=True, item_type=None):
|
||||
def wrapper(f):
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
d = {
|
||||
"_id": ObjectId(kwargs.pop("item_id"))
|
||||
}
|
||||
if item_type:
|
||||
d["type"] = item_type
|
||||
if owner:
|
||||
d["inventory.owner.foreign_id"] = g.user["_id"]
|
||||
kwargs["item"] = db.inventory.find_one(d)
|
||||
return f(*args, **kwargs)
|
||||
return decorated_function
|
||||
return wrapper
|
||||
|
||||
def format_name(item):
|
||||
if item.get("locker"):
|
||||
return "Locker %d" % item.get("locker").get("number")
|
||||
elif item.get("mac"):
|
||||
return "Machine %s" % item.get("hostname", item.get("mac"))
|
||||
elif item.get("desk"):
|
||||
return "Desk %s" % item.get("desk").get("number")
|
||||
return item.get("name", "Unknown item")
|
||||
|
||||
def flatten(d, parent_key='', sep='.'):
|
||||
items = []
|
||||
for k, v in d.items():
|
||||
new_key = parent_key + sep + k if parent_key else k
|
||||
if isinstance(v, collections.abc.MutableMapping):
|
||||
items.extend(flatten(v, new_key, sep=sep).items())
|
||||
else:
|
||||
items.append((new_key, v))
|
||||
return dict(items)
|
||||
|
||||
def spam(msg):
|
||||
if devenv:
|
||||
print(msg)
|
||||
else:
|
||||
url = "https://hooks.slack.com/services/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"
|
||||
requests.post(url, json={"text": msg })
|
||||
|
||||
def build_query(base_query, fields=[], sort_fields={}):
|
||||
selectors = []
|
||||
q = base_query.copy()
|
||||
for attr, title, tp in fields:
|
||||
key = attr.replace(".", "_")
|
||||
if tp == list:
|
||||
val = request.args.getlist(key)
|
||||
val = list(map(str, val))
|
||||
else:
|
||||
val = request.args.get(key, type=tp)
|
||||
results = db.inventory.find(base_query).distinct(attr)
|
||||
if tp == ObjectId:
|
||||
results = sorted(results, key = lambda k: k["display_name"])
|
||||
elif tp != list:
|
||||
results = sorted(results)
|
||||
results = list(map(tp, results))
|
||||
selectors.append((key, title, results, val))
|
||||
if val:
|
||||
if tp == ObjectId:
|
||||
attr = attr + ".foreign_id"
|
||||
q[attr] = val
|
||||
elif tp == list and attr == "type":
|
||||
q[attr] = { "$nin" : val }
|
||||
elif tp == list:
|
||||
q[attr] = { "$in" : val }
|
||||
else:
|
||||
q[attr] = val
|
||||
|
||||
for s in selectors:
|
||||
if s[0] != "type":
|
||||
sort_fields[s[0]] = s[1]
|
||||
sort_field = request.args.get("sort_field", "", type=str)
|
||||
if sort_field not in sort_fields.keys():
|
||||
sort_field = "last_seen"
|
||||
if sort_field not in ["nic_vendor", "last_seen"]:
|
||||
sort_field_final = sort_field.replace("_", ".")
|
||||
else:
|
||||
sort_field_final = sort_field
|
||||
sort_direction = request.args.get("sort_direction", type=str)
|
||||
if sort_direction not in ["asc", "desc"]:
|
||||
sort_direction = "desc"
|
||||
return q, selectors, sort_field, sort_field_final, sort_direction
|
Reference in New Issue
Block a user