diff --git a/certidude/auth.py b/certidude/auth.py index df19c9d..4fc743f 100644 --- a/certidude/auth.py +++ b/certidude/auth.py @@ -89,12 +89,13 @@ def authenticate(optional=False): basic, token = req.auth.split(" ", 1) user, passwd = b64decode(token).split(":", 1) - click.echo("Connecting to %s as %s" % (config.LDAP_AUTHENTICATION_URI, user)) + upn = "%s@%s" % (user, const.DOMAIN) + click.echo("Connecting to %s as %s" % (config.LDAP_AUTHENTICATION_URI, upn)) conn = ldap.initialize(config.LDAP_AUTHENTICATION_URI) conn.set_option(ldap.OPT_REFERRALS, 0) try: - conn.simple_bind_s("%s@%s" % (user, const.DOMAIN), passwd) + conn.simple_bind_s(upn, passwd) except ldap.STRONG_AUTH_REQUIRED: logger.critical("LDAP server demands encryption, use ldaps:// instead of ldaps://") raise @@ -168,21 +169,10 @@ def login_optional(func): return authenticate(optional=True)(func) def authorize_admin(func): - def whitelist_authorize_admin(resource, req, resp, *args, **kwargs): - # Check for username whitelist - if not req.context.get("user") or req.context.get("user") not in config.ADMIN_WHITELIST: - logger.info(u"Rejected access to administrative call %s by %s from %s, user not whitelisted", - req.env["PATH_INFO"], req.context.get("user"), req.context.get("remote_addr")) - raise falcon.HTTPForbidden("Forbidden", "User %s not whitelisted" % req.context.get("user")) - return func(resource, req, resp, *args, **kwargs) - - def authorize_admin(resource, req, resp, *args, **kwargs): + def wrapped(resource, req, resp, *args, **kwargs): if req.context.get("user").is_admin(): req.context["admin_authorized"] = True return func(resource, req, resp, *args, **kwargs) logger.info(u"User '%s' not authorized to access administrative API", req.context.get("user").name) raise falcon.HTTPForbidden("Forbidden", "User not authorized to perform administrative operations") - - if config.AUTHORIZATION_BACKEND == "whitelist": - return whitelist_authorize_admin - return authorize_admin + return wrapped diff --git a/certidude/config.py b/certidude/config.py index 2023bc7..c4c7ae7 100644 --- a/certidude/config.py +++ b/certidude/config.py @@ -73,8 +73,6 @@ LONG_POLL_SUBSCRIBE = cp.get("push", "long poll subscribe") LOGGING_BACKEND = cp.get("logging", "backend") -USERS_WHITELIST = set([j for j in cp.get("authorization", "user whitelist").split(" ") if j]) -ADMINS_WHITELIST = set([j for j in cp.get("authorization", "admin whitelist").split(" ") if j]) USERS_GROUP = cp.get("authorization", "posix user group") ADMIN_GROUP = cp.get("authorization", "posix admin group") LDAP_USER_FILTER = cp.get("authorization", "ldap user filter") diff --git a/certidude/const.py b/certidude/const.py index b3b6310..c8222b6 100644 --- a/certidude/const.py +++ b/certidude/const.py @@ -23,11 +23,7 @@ except socket.gaierror: click.echo("Failed to resolve fully qualified hostname of this machine, make sure hostname -f works") sys.exit(254) -if "." in FQDN: - HOSTNAME, DOMAIN = FQDN.split(".", 1) -else: - HOSTNAME, DOMAIN = FQDN, "local" - click.echo("Unable to determine domain of this computer, falling back to local") +HOSTNAME, DOMAIN = FQDN.split(".", 1) # TODO: lazier, otherwise gets evaluated before installing package if os.path.exists("/etc/strongswan/ipsec.conf"): # fedora dafuq?! diff --git a/certidude/relational.py b/certidude/relational.py index ca60ae3..e1e3b60 100644 --- a/certidude/relational.py +++ b/certidude/relational.py @@ -86,17 +86,3 @@ class RelationalMixin(object): cursor.close() conn.close() return tuple(g()) - - - def sql_fetchone(self, query, *args): - conn = self.sql_connect() - cursor = conn.cursor() - cursor.execute(query, args) - cols = [j[0] for j in cursor.description] - - for row in cursor: - r = dict(zip(cols, row)) - cursor.close() - conn.close() - return r - return None diff --git a/certidude/user.py b/certidude/user.py index 6232a4c..82367d5 100644 --- a/certidude/user.py +++ b/certidude/user.py @@ -7,8 +7,6 @@ from certidude import const, config class User(object): def __init__(self, username, mail, given_name="", surname=""): - if "@" not in mail: - raise ValueError("Invalid e-mail %s" % repr(mail)) self.name = username self.mail = mail self.given_name = given_name diff --git a/tests/test_cli.py b/tests/test_cli.py index c15663e..f820081 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -168,6 +168,7 @@ def test_cli_setup_authority(): os.system("samba-tool domain provision --server-role=dc --domain=EXAMPLE --realm=EXAMPLE.LAN --host-name=ca") os.system("samba-tool user add userbot S4l4k4l4 --given-name='User' --surname='Bot'") os.system("samba-tool user add adminbot S4l4k4l4 --given-name='Admin' --surname='Bot'") + os.system("samba-tool group addmembers 'Domain Admins' adminbot") os.system("samba-tool user setpassword administrator --newpassword=S4l4k4l4") os.symlink("/var/lib/samba/private/secrets.keytab", "/etc/krb5.keytab") os.chmod("/var/lib/samba/private/secrets.keytab", 0644) # To allow access to certidude server @@ -245,8 +246,12 @@ def test_cli_setup_authority(): usertoken = "Basic dXNlcmJvdDpib3Q=" admintoken = "Basic YWRtaW5ib3Q6Ym90" + result = runner.invoke(cli, ['users']) assert not result.exception, result.output + assert "user;userbot;User;Bot;userbot@example.lan" in result.output + assert "admin;adminbot;;;adminbot@example.lan" in result.output + # TODO: assert nothing else is in the list # Check that we can retrieve empty CRL assert authority.export_crl(), "Failed to export CRL" @@ -445,6 +450,8 @@ def test_cli_setup_authority(): query_string = "client=test&address=127.0.0.1", headers={"Authorization":admintoken}) assert r.status_code == 200, r.text # lease update ok + r = client().simulate_get("/api/signed/nonexistant/script/") + assert r.status_code == 404, r.text # cert not found r = client().simulate_get("/api/signed/test/script/") assert r.status_code == 200, r.text # script render ok assert "uci set " in r.text, r.text @@ -835,7 +842,7 @@ def test_cli_setup_authority(): # Certidude would auth against domain controller os.system("sed -e 's/ldap uri = ldaps:.*/ldap uri = ldaps:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf") os.system("sed -e 's/ldap uri = ldap:.*/ldap uri = ldap:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf") - os.system("sed -e 's/backends = pam/backends = kerberos/g' -i /etc/certidude/server.conf") + os.system("sed -e 's/backends = pam/backends = kerberos ldap/g' -i /etc/certidude/server.conf") os.system("sed -e 's/backend = posix/backend = ldap/g' -i /etc/certidude/server.conf") os.system("sed -e 's/dc1/ca/g' -i /etc/cron.hourly/certidude") @@ -845,29 +852,36 @@ def test_cli_setup_authority(): assert "ldap/ca.example.lan" in cronjob, cronjob os.system("/etc/cron.hourly/certidude") - result = runner.invoke(cli, ['users']) - assert not result.exception, result.output - # TODO: assert "Administrator@example.lan" in result.output - server_pid = os.fork() # Fork to prevent environment contamination if not server_pid: # Apply /etc/certidude/server.conf changes reload(config) reload(user) reload(auth) - assert isinstance(user.User.objects, user.ActiveDirectoryUserManager), user.User.objects + + result = runner.invoke(cli, ['users']) + assert not result.exception, result.output + assert "user;userbot;User;Bot;userbot@example.lan" in result.output + assert "admin;adminbot;Admin;Bot;adminbot@example.lan" in result.output + assert "admin;Administrator;Administrator;;Administrator@example.lan" in result.output + result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1', '-e']) assert not result.exception, result.output return sleep(1) # Wait for serve to start up + + ##################### + ### Kerberos auth ### + ##################### + # TODO: pip install requests-kerberos from requests_kerberos import HTTPKerberosAuth, OPTIONAL auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL, force_preemptive=True) - # Test session API call + # Test Kerberos auth r = requests.get("http://ca.example.lan/api/") assert r.status_code == 401, r.text assert "No Kerberos ticket offered" in r.text, r.text @@ -877,6 +891,23 @@ def test_cli_setup_authority(): assert r.status_code == 200, r.text + ################# + ### LDAP auth ### + ################# + + # Test LDAP bind auth fallback + usertoken = "Basic dXNlcmJvdDpTNGw0azRsNA==" + admintoken = "Basic YWRtaW5ib3Q6UzRsNGs0bDQ=" + + with open("/etc/ldap/ldap.conf", "w") as fh: + fh.write("TLS_REQCERT allow") # TODO: Correct way + + # curl http://ca.example.lan/api/ -u adminbot:S4l4k4l4 -H "User-agent: Android" -H "Referer: http://ca.example.lan" + r = requests.get("http://ca.example.lan/api/", + headers={"Authorization":usertoken, "User-Agent": "Android", "Referer":"http://ca.example.lan/"}) + assert r.status_code == 200, r.text + + ################### ### Final tests ### ###################