mirror of
https://github.com/laurivosandi/certidude
synced 2024-12-23 00:25:18 +00:00
Lauri Võsandi
59bedc1f16
* Migrate to Python 3 * Update token generator mechanism * Switch to Bootstrap 4 * Switch from Iconmonstr to Font Awesome icons * Rename default CA common name to "Certidude at ca.example.lan" * Add self-enroll for the TLS server certificates * TLS client auth for lease updating * Compile assets from npm packages to /var/lib/certidude/ca.example.lan/assets
163 lines
5.8 KiB
Python
163 lines
5.8 KiB
Python
|
|
import click
|
|
import grp
|
|
import os
|
|
import pwd
|
|
from certidude import const, config
|
|
|
|
class User(object):
|
|
def __init__(self, username, mail, given_name="", surname=""):
|
|
self.name = username
|
|
self.mail = mail
|
|
self.given_name = given_name
|
|
self.surname = surname
|
|
|
|
def __repr__(self):
|
|
if self.given_name and self.surname:
|
|
return "%s %s <%s>" % (self.given_name, self.surname, self.mail)
|
|
else:
|
|
return self.mail
|
|
|
|
def __hash__(self):
|
|
return hash(self.mail)
|
|
|
|
def __eq__(self, other):
|
|
assert isinstance(other, User), "%s is not instance of User" % repr(other)
|
|
return self.mail == other.mail
|
|
|
|
def is_admin(self):
|
|
if not hasattr(self, "_is_admin"):
|
|
self._is_admin = self.objects.is_admin(self)
|
|
return self._is_admin
|
|
|
|
class DoesNotExist(Exception):
|
|
pass
|
|
|
|
|
|
class PosixUserManager(object):
|
|
def get(self, username):
|
|
_, _, _, _, gecos, _, _ = pwd.getpwnam(username)
|
|
gecos = gecos.split(",")
|
|
full_name = gecos[0]
|
|
mail = "%s@%s" % (username, const.DOMAIN)
|
|
if full_name and " " in full_name:
|
|
given_name, surname = full_name.split(" ", 1)
|
|
return User(username, mail, given_name, surname)
|
|
return User(username, mail)
|
|
|
|
def filter_admins(self):
|
|
_, _, gid, members = grp.getgrnam(config.ADMIN_GROUP)
|
|
for username in members:
|
|
yield self.get(username)
|
|
|
|
def is_admin(self, user):
|
|
import grp
|
|
_, _, gid, members = grp.getgrnam(config.ADMIN_GROUP)
|
|
return user.name in members
|
|
|
|
def all(self):
|
|
_, _, gid, members = grp.getgrnam(config.USERS_GROUP)
|
|
for username in members:
|
|
yield self.get(username)
|
|
for user in self.filter_admins(): # TODO: dedup
|
|
yield user
|
|
|
|
|
|
class DirectoryConnection(object):
|
|
def __enter__(self):
|
|
import ldap
|
|
import ldap.sasl
|
|
|
|
# TODO: Implement simple bind
|
|
if not os.path.exists(config.LDAP_GSSAPI_CRED_CACHE):
|
|
raise ValueError("Ticket cache at %s not initialized, unable to "
|
|
"authenticate with computer account against LDAP server!" % config.LDAP_GSSAPI_CRED_CACHE)
|
|
os.environ["KRB5CCNAME"] = config.LDAP_GSSAPI_CRED_CACHE
|
|
self.conn = ldap.initialize(config.LDAP_ACCOUNTS_URI, bytes_mode=False)
|
|
self.conn.set_option(ldap.OPT_REFERRALS, 0)
|
|
click.echo("Connecing to %s using Kerberos ticket cache from %s" %
|
|
(config.LDAP_ACCOUNTS_URI, config.LDAP_GSSAPI_CRED_CACHE))
|
|
self.conn.sasl_interactive_bind_s('', ldap.sasl.gssapi())
|
|
return self.conn
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
self.conn.unbind_s()
|
|
|
|
|
|
class ActiveDirectoryUserManager(object):
|
|
def get(self, username):
|
|
# TODO: Sanitize username
|
|
with DirectoryConnection() as conn:
|
|
ft = config.LDAP_USER_FILTER % username
|
|
attribs = "cn", "givenName", "sn", "mail", "userPrincipalName"
|
|
r = conn.search_s(config.LDAP_BASE, 2, ft, attribs)
|
|
for dn, entry in r:
|
|
if not dn:
|
|
continue
|
|
if entry.get("givenname") and entry.get("sn"):
|
|
given_name, = entry.get("givenName")
|
|
surname, = entry.get("sn")
|
|
else:
|
|
cn, = entry.get("cn")
|
|
if b" " in cn:
|
|
given_name, surname = cn.split(b" ", 1)
|
|
else:
|
|
given_name, surname = cn, b""
|
|
|
|
mail, = entry.get("mail") or entry.get("userPrincipalName") or ((username + "@" + const.DOMAIN).encode("ascii"),)
|
|
return User(username, mail.decode("ascii"),
|
|
given_name.decode("utf-8"), surname.decode("utf-8"))
|
|
raise User.DoesNotExist("User %s does not exist" % username)
|
|
|
|
def filter(self, ft):
|
|
with DirectoryConnection() as conn:
|
|
attribs = "givenName", "surname", "samaccountname", "cn", "mail", "userPrincipalName"
|
|
r = conn.search_s(config.LDAP_BASE, 2, ft, attribs)
|
|
for dn,entry in r:
|
|
if not dn:
|
|
continue
|
|
username, = entry.get("sAMAccountName")
|
|
cn, = entry.get("cn")
|
|
mail, = entry.get("mail") or entry.get("userPrincipalName") or (username + b"@" + const.DOMAIN.encode("ascii"),)
|
|
if entry.get("givenName") and entry.get("sn"):
|
|
given_name, = entry.get("givenName")
|
|
surname, = entry.get("sn")
|
|
else:
|
|
cn, = entry.get("cn")
|
|
if b" " in cn:
|
|
given_name, surname = cn.split(b" ", 1)
|
|
else:
|
|
given_name, surname = cn, b""
|
|
yield User(username.decode("utf-8"), mail.decode("utf-8"),
|
|
given_name.decode("utf-8"), surname.decode("utf-8"))
|
|
|
|
def filter_admins(self):
|
|
"""
|
|
Return admin User objects
|
|
"""
|
|
return self.filter(config.LDAP_ADMIN_FILTER % "*")
|
|
|
|
def all(self):
|
|
"""
|
|
Return all valid User objects
|
|
"""
|
|
return self.filter(ft=config.LDAP_USER_FILTER % "*")
|
|
|
|
def is_admin(self, user):
|
|
with DirectoryConnection() as conn:
|
|
ft = config.LDAP_ADMIN_FILTER % user.name
|
|
r = conn.search_s(config.LDAP_BASE, 2, ft, ["cn"])
|
|
for dn, entry in r:
|
|
if not dn:
|
|
continue
|
|
return True
|
|
return False
|
|
|
|
if config.ACCOUNTS_BACKEND == "ldap":
|
|
User.objects = ActiveDirectoryUserManager()
|
|
elif config.ACCOUNTS_BACKEND == "posix":
|
|
User.objects = PosixUserManager()
|
|
else:
|
|
raise NotImplementedError("Authorization backend %s not supported" % repr(config.AUTHORIZATION_BACKEND))
|
|
|