1
0
mirror of https://github.com/laurivosandi/certidude synced 2024-11-05 12:50:35 +00:00
certidude/certidude/api/tag.py
Lauri Võsandi bfdd8c4887 Several updates #3
* Move SessionResource and CertificateAuthorityResource to api/session.py
* Log browser user agent for logins
* Remove static sink from backend, nginx always serves static now
* Don't emit 'attribute-update' event if no attributes were changed
* Better CN extraction from DN during lease update
* Log user who deleted request
* Remove long polling CRL fetch API call and relevant test
* Merge auth decorators ldap_authenticate, kerberos_authenticate, pam_authenticate
* Add 'kerberos subnets' to distinguish authentication method
* Add 'admin subnets' to filter traffic to administrative API calls
* Highlight recent log events
* Links to switch between 2, 3 and 4 column layouts in the dashboard
* Restored certidude client snippets in request dialog
* Various bugfixes, improved log messages
2018-05-04 08:55:49 +00:00

87 lines
3.0 KiB
Python

import logging
from xattr import getxattr, removexattr, setxattr
from certidude import push
from certidude.decorators import serialize, csrf_protection
from .utils import AuthorityHandler
from .utils.firewall import login_required, authorize_admin
logger = logging.getLogger(__name__)
class TagResource(AuthorityHandler):
    """List and add tags attached to a signed certificate.

    Tags are read from and written to the ``user.xdg.tags`` extended
    attribute of the path returned by ``self.authority.get_signed(cn)``,
    as a comma-separated, UTF-8 encoded list. An entry is either a bare
    value or a ``key=value`` pair.
    """

    @serialize
    @login_required
    @authorize_admin
    def on_get(self, req, resp, cn):
        """Return the tags stored for ``cn`` as a list of dicts.

        Each dict carries ``id`` (the raw entry), ``key`` and ``value``.
        Entries without ``=`` are reported under the key ``"other"``.
        An ``IOError`` from ``getxattr`` is taken to mean the attribute
        does not exist and yields an empty list.
        """
        path, _buf, _cert, _signed, _expires = self.authority.get_signed(cn)
        try:
            raw = getxattr(path, "user.xdg.tags")
        except IOError:  # No user.xdg.tags attribute
            return []

        listing = []
        for entry in raw.decode("utf-8").split(","):
            name, separator, content = entry.partition("=")
            if not separator:
                # Bare value: file it under the catch-all key
                name, content = "other", entry
            listing.append({"id": entry, "key": name, "value": content})
        return listing

    @csrf_protection
    @login_required
    @authorize_admin
    def on_post(self, req, resp, cn):
        """Add a tag built from the required ``key`` and ``value`` params.

        With ``key == "other"`` only the value is stored, otherwise the
        entry is ``key=value``. The attribute is rewritten, the change is
        logged and a ``tag-update`` event is published for ``cn``.
        """
        path, _buf, _cert, _signed, _expires = self.authority.get_signed(cn)
        key = req.get_param("key", required=True)
        value = req.get_param("value", required=True)

        try:
            stored = getxattr(path, "user.xdg.tags")
        except IOError:
            # Attribute missing, start from scratch
            current = set()
        else:
            current = set(stored.decode("utf-8").split(","))

        current.add(value if key == "other" else "%s=%s" % (key, value))
        payload = ",".join(current).encode("utf-8")
        setxattr(path, "user.xdg.tags", payload)

        message = "Tag %s=%s set for %s by %s" % (
            key, value, cn, req.context.get("user"))
        logger.info(message)
        push.publish("tag-update", cn)
class TagDetailResource(object):
    """Update or remove a single tag of a signed certificate.

    Tags live in the ``user.xdg.tags`` extended attribute of the path
    returned by ``authority.get_signed(cn)``, stored as a comma-separated,
    UTF-8 encoded list of bare values and ``key=value`` pairs.
    """

    def __init__(self, authority):
        """Keep a reference to the authority used to look up certificates."""
        self.authority = authority

    @staticmethod
    def _read_tags(path):
        """Return the set of tag entries stored on ``path``.

        An ``IOError`` from ``getxattr`` is treated as "attribute not
        present" and results in an empty set.
        """
        try:
            raw = getxattr(path, "user.xdg.tags")
        except IOError:
            return set()
        return set(raw.decode("utf-8").split(","))

    @csrf_protection
    @login_required
    @authorize_admin
    def on_put(self, req, resp, cn, tag):
        """Replace the entry ``tag`` with one carrying the new ``value``.

        ``value`` is a required request parameter. If ``tag`` has the form
        ``key=...`` the key is kept and only the value part is replaced;
        otherwise the bare value is replaced. A ``tag`` that is not
        currently stored is simply added. The change is logged and a
        ``tag-update`` event is published for ``cn``.
        """
        path, buf, cert, signed, expires = self.authority.get_signed(cn)
        value = req.get_param("value", required=True)
        tags = self._read_tags(path)
        tags.discard(tag)  # absent tag is fine, the new entry is added anyway
        if "=" in tag:
            tags.add("%s=%s" % (tag.partition("=")[0], value))
        else:
            tags.add(value)
        setxattr(path, "user.xdg.tags", ",".join(tags).encode("utf-8"))
        logger.info("Tag %s set to %s for %s by %s",
            tag, value, cn, req.context.get("user"))
        push.publish("tag-update", cn)

    @csrf_protection
    @login_required
    @authorize_admin
    def on_delete(self, req, resp, cn, tag):
        """Remove the entry ``tag`` from the tags stored for ``cn``.

        If the attribute is missing or does not contain ``tag`` the call
        is a no-op: nothing is written and no event is published. When the
        last tag is removed the attribute itself is deleted. Otherwise the
        remaining entries are written back, the removal is logged and a
        ``tag-update`` event is published for ``cn``.
        """
        path, buf, cert, signed, expires = self.authority.get_signed(cn)
        tags = self._read_tags(path)
        if tag not in tags:
            # Nothing to remove; avoid an unhandled KeyError/IOError and do
            # not announce an update that did not happen
            return
        tags.remove(tag)
        if tags:
            # Encode like on_put does so the stored value is always bytes
            setxattr(path, "user.xdg.tags", ",".join(tags).encode("utf-8"))
        else:
            removexattr(path, "user.xdg.tags")
        logger.info("Tag %s removed for %s by %s",
            tag, cn, req.context.get("user"))
        push.publish("tag-update", cn)