import click
import grp
import os
import pwd
from certidude import const, config

# Numeric value of ldap.SCOPE_SUBTREE. Spelled out here so that python-ldap
# only has to be importable when the LDAP backend is actually used.
_LDAP_SCOPE_SUBTREE = 2


class User(object):
    """Account record returned by the configured accounts backend.

    Identity is defined by the mail address: equality and hashing both use
    ``mail`` only. The backend manager is attached as ``User.objects`` at the
    bottom of this module.
    """

    def __init__(self, username, mail, given_name="", surname=""):
        self.name = username
        self.mail = mail
        self.given_name = given_name
        self.surname = surname

    def format(self):
        """Return a ``(display_name, address)`` pair.

        When a given name or surname is set the pair is
        ``("Given Surname", "<mail>")``; otherwise it is ``(None, mail)``.
        """
        display_name = " ".join(
            part for part in (self.given_name, self.surname) if part)
        if display_name:
            return display_name, "<%s>" % self.mail
        return None, self.mail

    def __repr__(self):
        return " ".join([part for part in self.format() if part])

    def __hash__(self):
        return hash(self.mail)

    def __eq__(self, other):
        # Returning NotImplemented (instead of asserting) lets comparisons
        # with None or unrelated objects evaluate to False rather than raise,
        # and keeps the check alive when Python runs with -O.
        if not isinstance(other, User):
            return NotImplemented
        return self.mail == other.mail

    def is_admin(self):
        """Return whether the backend considers this user an administrator.

        The backend is asked once; the answer is cached on the instance in
        ``_is_admin``.
        """
        if not hasattr(self, "_is_admin"):
            self._is_admin = self.objects.is_admin(self)
        return self._is_admin

    class DoesNotExist(Exception):
        """Raised by the LDAP backend when a username matches no entry."""
        pass


class PosixUserManager(object):
    """Accounts backend backed by the local passwd and group databases."""

    def get(self, username):
        """Build a ``User`` for a local account.

        The first comma-separated GECOS field is used as the full name and is
        split on the first space into given name and surname; a single-word
        name is kept as the given name, matching the LDAP backend. The mail
        address is ``username@config.MAIL_SUFFIX``.

        ``pwd.getpwnam`` raises ``KeyError`` for an unknown username; that is
        propagated unchanged.
        """
        full_name = pwd.getpwnam(username).pw_gecos.split(",")[0]
        mail = "%s@%s" % (username, config.MAIL_SUFFIX)
        given_name, _, surname = full_name.partition(" ")
        return User(username, mail, given_name, surname)

    def filter_admins(self):
        """Yield a ``User`` for every member of ``config.ADMIN_GROUP``."""
        for username in grp.getgrnam(config.ADMIN_GROUP).gr_mem:
            yield self.get(username)

    def is_admin(self, user):
        """Return True if ``user.name`` is a member of ``config.ADMIN_GROUP``."""
        return user.name in grp.getgrnam(config.ADMIN_GROUP).gr_mem

    def all(self):
        """Yield members of ``config.USERS_GROUP`` followed by the admins.

        Each username is yielded at most once, so a user who belongs to both
        groups no longer appears twice.
        """
        seen = set()
        for username in grp.getgrnam(config.USERS_GROUP).gr_mem:
            if username in seen:
                continue
            seen.add(username)
            yield self.get(username)
        for user in self.filter_admins():
            if user.name in seen:
                continue
            seen.add(user.name)
            yield user


class DirectoryConnection(object):
    """Context manager yielding an LDAP connection bound via SASL GSSAPI.

    While the connection is open ``KRB5CCNAME`` points at
    ``config.LDAP_GSSAPI_CRED_CACHE``; on exit (or on a failed bind) the
    variable is put back to whatever it was before.
    """

    def __enter__(self):
        # Imported lazily so the module loads without python-ldap installed
        # when another backend is configured.
        import ldap
        import ldap.sasl

        # TODO: Implement simple bind
        if not os.path.exists(config.LDAP_GSSAPI_CRED_CACHE):
            raise ValueError("Ticket cache at %s not initialized, unable to "
                "authenticate with computer account against LDAP server!"
                % config.LDAP_GSSAPI_CRED_CACHE)

        # Remember the caller's value so it can be restored instead of being
        # unconditionally deleted afterwards.
        self._previous_ccname = os.environ.get("KRB5CCNAME")
        os.environ["KRB5CCNAME"] = config.LDAP_GSSAPI_CRED_CACHE
        try:
            self.conn = ldap.initialize(config.LDAP_ACCOUNTS_URI, bytes_mode=False)
            self.conn.set_option(ldap.OPT_REFERRALS, 0)
            click.echo("Connecing to %s using Kerberos ticket cache from %s" %
                (config.LDAP_ACCOUNTS_URI, config.LDAP_GSSAPI_CRED_CACHE))
            self.conn.sasl_interactive_bind_s('', ldap.sasl.gssapi())
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the
            # environment has to be cleaned up here.
            self._restore_environment()
            raise
        return self.conn

    def __exit__(self, type, value, traceback):
        try:
            self.conn.unbind_s()
        finally:
            # Runs even if unbinding fails.
            self._restore_environment()  # prevent contaminating environment

    def _restore_environment(self):
        """Put ``KRB5CCNAME`` back to its value from before ``__enter__``."""
        if self._previous_ccname is None:
            # pop() with a default tolerates the variable already being gone,
            # e.g. after a nested connection has exited.
            os.environ.pop("KRB5CCNAME", None)
        else:
            os.environ["KRB5CCNAME"] = self._previous_ccname


class ActiveDirectoryUserManager(object):
    """Accounts backend that looks users up over LDAP."""

    @staticmethod
    def _escape(value):
        """Escape ``value`` for safe interpolation into an LDAP filter."""
        from ldap.filter import escape_filter_chars
        return escape_filter_chars(value)

    @staticmethod
    def _search(ft, attribs):
        """Run a subtree search under ``config.LDAP_BASE``.

        Returns a list of ``(dn, entry)`` pairs, skipping results whose DN is
        empty. The connection is closed before returning, so callers never
        hold it open while iterating.
        """
        with DirectoryConnection() as conn:
            results = conn.search_s(
                config.LDAP_BASE, _LDAP_SCOPE_SUBTREE, ft, list(attribs))
        return [(dn, entry) for dn, entry in results if dn]

    @staticmethod
    def _first(entry, attribute):
        """Return the first value of ``attribute`` in ``entry``, or None."""
        values = entry.get(attribute)
        return values[0] if values else None

    def _build_user(self, username, entry):
        """Create a ``User`` from a search result entry.

        Attribute values are treated as bytes and decoded as UTF-8. The name
        comes from ``givenName``/``sn`` when both are present, otherwise from
        ``cn`` split on the first space. The mail address falls back from
        ``config.LDAP_MAIL_ATTRIBUTE`` to ``userPrincipalName`` to
        ``username@const.DOMAIN``.
        """
        given_name = self._first(entry, "givenName")
        surname = self._first(entry, "sn")
        if not (given_name and surname):
            common_name = self._first(entry, "cn") or b""
            given_name, _, surname = common_name.partition(b" ")

        mail = (self._first(entry, config.LDAP_MAIL_ATTRIBUTE)
            or self._first(entry, "userPrincipalName"))
        if mail:
            mail = mail.decode("utf-8")
        else:
            mail = "%s@%s" % (username, const.DOMAIN)

        return User(username, mail,
            given_name.decode("utf-8"), surname.decode("utf-8"))

    def get(self, username):
        """Return the ``User`` matching ``username``.

        The username is escaped before being substituted into
        ``config.LDAP_USER_FILTER``.

        Raises ``User.DoesNotExist`` when the search yields no entry.
        """
        ft = config.LDAP_USER_FILTER % self._escape(username)
        attribs = ["cn", "givenName", "sn", config.LDAP_MAIL_ATTRIBUTE,
            "userPrincipalName"]
        for dn, entry in self._search(ft, attribs):
            return self._build_user(username, entry)
        raise User.DoesNotExist("User %s does not exist" % username)

    def filter(self, ft):
        """Yield a ``User`` for every entry matching the LDAP filter ``ft``.

        ``ft`` is used verbatim; callers are responsible for its contents.
        """
        attribs = ["givenName", "sn", "sAMAccountName", "cn",
            config.LDAP_MAIL_ATTRIBUTE, "userPrincipalName"]
        for dn, entry in self._search(ft, attribs):
            account_name = self._first(entry, "sAMAccountName")
            if account_name is None:
                # Without an account name there is no username to report.
                continue
            yield self._build_user(account_name.decode("utf-8"), entry)

    def filter_admins(self):
        """
        Return admin User objects
        """
        return self.filter(config.LDAP_ADMIN_FILTER % "*")

    def all(self):
        """
        Return all valid User objects
        """
        return self.filter(ft=config.LDAP_USER_FILTER % "*")

    def is_admin(self, user):
        """Return True if ``user.name`` matches ``config.LDAP_ADMIN_FILTER``."""
        ft = config.LDAP_ADMIN_FILTER % self._escape(user.name)
        return bool(self._search(ft, ["cn"]))


# Attach the manager for the configured accounts backend to User.
if config.ACCOUNTS_BACKEND == "ldap":
    User.objects = ActiveDirectoryUserManager()
elif config.ACCOUNTS_BACKEND == "posix":
    User.objects = PosixUserManager()
else:
    raise NotImplementedError("Accounts backend %s not supported" % repr(config.ACCOUNTS_BACKEND))