mirror of
https://github.com/laurivosandi/certidude
synced 2024-11-14 00:46:44 +00:00
tests: Cover LDAP auth and more
This commit is contained in:
parent
a4a31ca2c6
commit
545febf3d0
@ -89,12 +89,13 @@ def authenticate(optional=False):
|
|||||||
basic, token = req.auth.split(" ", 1)
|
basic, token = req.auth.split(" ", 1)
|
||||||
user, passwd = b64decode(token).split(":", 1)
|
user, passwd = b64decode(token).split(":", 1)
|
||||||
|
|
||||||
click.echo("Connecting to %s as %s" % (config.LDAP_AUTHENTICATION_URI, user))
|
upn = "%s@%s" % (user, const.DOMAIN)
|
||||||
|
click.echo("Connecting to %s as %s" % (config.LDAP_AUTHENTICATION_URI, upn))
|
||||||
conn = ldap.initialize(config.LDAP_AUTHENTICATION_URI)
|
conn = ldap.initialize(config.LDAP_AUTHENTICATION_URI)
|
||||||
conn.set_option(ldap.OPT_REFERRALS, 0)
|
conn.set_option(ldap.OPT_REFERRALS, 0)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
conn.simple_bind_s("%s@%s" % (user, const.DOMAIN), passwd)
|
conn.simple_bind_s(upn, passwd)
|
||||||
except ldap.STRONG_AUTH_REQUIRED:
|
except ldap.STRONG_AUTH_REQUIRED:
|
||||||
logger.critical("LDAP server demands encryption, use ldaps:// instead of ldaps://")
|
logger.critical("LDAP server demands encryption, use ldaps:// instead of ldaps://")
|
||||||
raise
|
raise
|
||||||
@ -168,21 +169,10 @@ def login_optional(func):
|
|||||||
return authenticate(optional=True)(func)
|
return authenticate(optional=True)(func)
|
||||||
|
|
||||||
def authorize_admin(func):
|
def authorize_admin(func):
|
||||||
def whitelist_authorize_admin(resource, req, resp, *args, **kwargs):
|
def wrapped(resource, req, resp, *args, **kwargs):
|
||||||
# Check for username whitelist
|
|
||||||
if not req.context.get("user") or req.context.get("user") not in config.ADMIN_WHITELIST:
|
|
||||||
logger.info(u"Rejected access to administrative call %s by %s from %s, user not whitelisted",
|
|
||||||
req.env["PATH_INFO"], req.context.get("user"), req.context.get("remote_addr"))
|
|
||||||
raise falcon.HTTPForbidden("Forbidden", "User %s not whitelisted" % req.context.get("user"))
|
|
||||||
return func(resource, req, resp, *args, **kwargs)
|
|
||||||
|
|
||||||
def authorize_admin(resource, req, resp, *args, **kwargs):
|
|
||||||
if req.context.get("user").is_admin():
|
if req.context.get("user").is_admin():
|
||||||
req.context["admin_authorized"] = True
|
req.context["admin_authorized"] = True
|
||||||
return func(resource, req, resp, *args, **kwargs)
|
return func(resource, req, resp, *args, **kwargs)
|
||||||
logger.info(u"User '%s' not authorized to access administrative API", req.context.get("user").name)
|
logger.info(u"User '%s' not authorized to access administrative API", req.context.get("user").name)
|
||||||
raise falcon.HTTPForbidden("Forbidden", "User not authorized to perform administrative operations")
|
raise falcon.HTTPForbidden("Forbidden", "User not authorized to perform administrative operations")
|
||||||
|
return wrapped
|
||||||
if config.AUTHORIZATION_BACKEND == "whitelist":
|
|
||||||
return whitelist_authorize_admin
|
|
||||||
return authorize_admin
|
|
||||||
|
@ -73,8 +73,6 @@ LONG_POLL_SUBSCRIBE = cp.get("push", "long poll subscribe")
|
|||||||
|
|
||||||
LOGGING_BACKEND = cp.get("logging", "backend")
|
LOGGING_BACKEND = cp.get("logging", "backend")
|
||||||
|
|
||||||
USERS_WHITELIST = set([j for j in cp.get("authorization", "user whitelist").split(" ") if j])
|
|
||||||
ADMINS_WHITELIST = set([j for j in cp.get("authorization", "admin whitelist").split(" ") if j])
|
|
||||||
USERS_GROUP = cp.get("authorization", "posix user group")
|
USERS_GROUP = cp.get("authorization", "posix user group")
|
||||||
ADMIN_GROUP = cp.get("authorization", "posix admin group")
|
ADMIN_GROUP = cp.get("authorization", "posix admin group")
|
||||||
LDAP_USER_FILTER = cp.get("authorization", "ldap user filter")
|
LDAP_USER_FILTER = cp.get("authorization", "ldap user filter")
|
||||||
|
@ -23,11 +23,7 @@ except socket.gaierror:
|
|||||||
click.echo("Failed to resolve fully qualified hostname of this machine, make sure hostname -f works")
|
click.echo("Failed to resolve fully qualified hostname of this machine, make sure hostname -f works")
|
||||||
sys.exit(254)
|
sys.exit(254)
|
||||||
|
|
||||||
if "." in FQDN:
|
HOSTNAME, DOMAIN = FQDN.split(".", 1)
|
||||||
HOSTNAME, DOMAIN = FQDN.split(".", 1)
|
|
||||||
else:
|
|
||||||
HOSTNAME, DOMAIN = FQDN, "local"
|
|
||||||
click.echo("Unable to determine domain of this computer, falling back to local")
|
|
||||||
|
|
||||||
# TODO: lazier, otherwise gets evaluated before installing package
|
# TODO: lazier, otherwise gets evaluated before installing package
|
||||||
if os.path.exists("/etc/strongswan/ipsec.conf"): # fedora dafuq?!
|
if os.path.exists("/etc/strongswan/ipsec.conf"): # fedora dafuq?!
|
||||||
|
@ -86,17 +86,3 @@ class RelationalMixin(object):
|
|||||||
cursor.close()
|
cursor.close()
|
||||||
conn.close()
|
conn.close()
|
||||||
return tuple(g())
|
return tuple(g())
|
||||||
|
|
||||||
|
|
||||||
def sql_fetchone(self, query, *args):
|
|
||||||
conn = self.sql_connect()
|
|
||||||
cursor = conn.cursor()
|
|
||||||
cursor.execute(query, args)
|
|
||||||
cols = [j[0] for j in cursor.description]
|
|
||||||
|
|
||||||
for row in cursor:
|
|
||||||
r = dict(zip(cols, row))
|
|
||||||
cursor.close()
|
|
||||||
conn.close()
|
|
||||||
return r
|
|
||||||
return None
|
|
||||||
|
@ -7,8 +7,6 @@ from certidude import const, config
|
|||||||
|
|
||||||
class User(object):
|
class User(object):
|
||||||
def __init__(self, username, mail, given_name="", surname=""):
|
def __init__(self, username, mail, given_name="", surname=""):
|
||||||
if "@" not in mail:
|
|
||||||
raise ValueError("Invalid e-mail %s" % repr(mail))
|
|
||||||
self.name = username
|
self.name = username
|
||||||
self.mail = mail
|
self.mail = mail
|
||||||
self.given_name = given_name
|
self.given_name = given_name
|
||||||
|
@ -168,6 +168,7 @@ def test_cli_setup_authority():
|
|||||||
os.system("samba-tool domain provision --server-role=dc --domain=EXAMPLE --realm=EXAMPLE.LAN --host-name=ca")
|
os.system("samba-tool domain provision --server-role=dc --domain=EXAMPLE --realm=EXAMPLE.LAN --host-name=ca")
|
||||||
os.system("samba-tool user add userbot S4l4k4l4 --given-name='User' --surname='Bot'")
|
os.system("samba-tool user add userbot S4l4k4l4 --given-name='User' --surname='Bot'")
|
||||||
os.system("samba-tool user add adminbot S4l4k4l4 --given-name='Admin' --surname='Bot'")
|
os.system("samba-tool user add adminbot S4l4k4l4 --given-name='Admin' --surname='Bot'")
|
||||||
|
os.system("samba-tool group addmembers 'Domain Admins' adminbot")
|
||||||
os.system("samba-tool user setpassword administrator --newpassword=S4l4k4l4")
|
os.system("samba-tool user setpassword administrator --newpassword=S4l4k4l4")
|
||||||
os.symlink("/var/lib/samba/private/secrets.keytab", "/etc/krb5.keytab")
|
os.symlink("/var/lib/samba/private/secrets.keytab", "/etc/krb5.keytab")
|
||||||
os.chmod("/var/lib/samba/private/secrets.keytab", 0644) # To allow access to certidude server
|
os.chmod("/var/lib/samba/private/secrets.keytab", 0644) # To allow access to certidude server
|
||||||
@ -245,8 +246,12 @@ def test_cli_setup_authority():
|
|||||||
usertoken = "Basic dXNlcmJvdDpib3Q="
|
usertoken = "Basic dXNlcmJvdDpib3Q="
|
||||||
admintoken = "Basic YWRtaW5ib3Q6Ym90"
|
admintoken = "Basic YWRtaW5ib3Q6Ym90"
|
||||||
|
|
||||||
|
|
||||||
result = runner.invoke(cli, ['users'])
|
result = runner.invoke(cli, ['users'])
|
||||||
assert not result.exception, result.output
|
assert not result.exception, result.output
|
||||||
|
assert "user;userbot;User;Bot;userbot@example.lan" in result.output
|
||||||
|
assert "admin;adminbot;;;adminbot@example.lan" in result.output
|
||||||
|
# TODO: assert nothing else is in the list
|
||||||
|
|
||||||
# Check that we can retrieve empty CRL
|
# Check that we can retrieve empty CRL
|
||||||
assert authority.export_crl(), "Failed to export CRL"
|
assert authority.export_crl(), "Failed to export CRL"
|
||||||
@ -445,6 +450,8 @@ def test_cli_setup_authority():
|
|||||||
query_string = "client=test&address=127.0.0.1",
|
query_string = "client=test&address=127.0.0.1",
|
||||||
headers={"Authorization":admintoken})
|
headers={"Authorization":admintoken})
|
||||||
assert r.status_code == 200, r.text # lease update ok
|
assert r.status_code == 200, r.text # lease update ok
|
||||||
|
r = client().simulate_get("/api/signed/nonexistant/script/")
|
||||||
|
assert r.status_code == 404, r.text # cert not found
|
||||||
r = client().simulate_get("/api/signed/test/script/")
|
r = client().simulate_get("/api/signed/test/script/")
|
||||||
assert r.status_code == 200, r.text # script render ok
|
assert r.status_code == 200, r.text # script render ok
|
||||||
assert "uci set " in r.text, r.text
|
assert "uci set " in r.text, r.text
|
||||||
@ -835,7 +842,7 @@ def test_cli_setup_authority():
|
|||||||
# Certidude would auth against domain controller
|
# Certidude would auth against domain controller
|
||||||
os.system("sed -e 's/ldap uri = ldaps:.*/ldap uri = ldaps:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
os.system("sed -e 's/ldap uri = ldaps:.*/ldap uri = ldaps:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
||||||
os.system("sed -e 's/ldap uri = ldap:.*/ldap uri = ldap:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
os.system("sed -e 's/ldap uri = ldap:.*/ldap uri = ldap:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
||||||
os.system("sed -e 's/backends = pam/backends = kerberos/g' -i /etc/certidude/server.conf")
|
os.system("sed -e 's/backends = pam/backends = kerberos ldap/g' -i /etc/certidude/server.conf")
|
||||||
os.system("sed -e 's/backend = posix/backend = ldap/g' -i /etc/certidude/server.conf")
|
os.system("sed -e 's/backend = posix/backend = ldap/g' -i /etc/certidude/server.conf")
|
||||||
os.system("sed -e 's/dc1/ca/g' -i /etc/cron.hourly/certidude")
|
os.system("sed -e 's/dc1/ca/g' -i /etc/cron.hourly/certidude")
|
||||||
|
|
||||||
@ -845,29 +852,36 @@ def test_cli_setup_authority():
|
|||||||
assert "ldap/ca.example.lan" in cronjob, cronjob
|
assert "ldap/ca.example.lan" in cronjob, cronjob
|
||||||
os.system("/etc/cron.hourly/certidude")
|
os.system("/etc/cron.hourly/certidude")
|
||||||
|
|
||||||
result = runner.invoke(cli, ['users'])
|
|
||||||
assert not result.exception, result.output
|
|
||||||
# TODO: assert "Administrator@example.lan" in result.output
|
|
||||||
|
|
||||||
server_pid = os.fork() # Fork to prevent environment contamination
|
server_pid = os.fork() # Fork to prevent environment contamination
|
||||||
if not server_pid:
|
if not server_pid:
|
||||||
# Apply /etc/certidude/server.conf changes
|
# Apply /etc/certidude/server.conf changes
|
||||||
reload(config)
|
reload(config)
|
||||||
reload(user)
|
reload(user)
|
||||||
reload(auth)
|
reload(auth)
|
||||||
|
|
||||||
assert isinstance(user.User.objects, user.ActiveDirectoryUserManager), user.User.objects
|
assert isinstance(user.User.objects, user.ActiveDirectoryUserManager), user.User.objects
|
||||||
|
|
||||||
|
result = runner.invoke(cli, ['users'])
|
||||||
|
assert not result.exception, result.output
|
||||||
|
assert "user;userbot;User;Bot;userbot@example.lan" in result.output
|
||||||
|
assert "admin;adminbot;Admin;Bot;adminbot@example.lan" in result.output
|
||||||
|
assert "admin;Administrator;Administrator;;Administrator@example.lan" in result.output
|
||||||
|
|
||||||
result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1', '-e'])
|
result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1', '-e'])
|
||||||
assert not result.exception, result.output
|
assert not result.exception, result.output
|
||||||
return
|
return
|
||||||
|
|
||||||
sleep(1) # Wait for serve to start up
|
sleep(1) # Wait for serve to start up
|
||||||
|
|
||||||
|
|
||||||
|
#####################
|
||||||
|
### Kerberos auth ###
|
||||||
|
#####################
|
||||||
|
|
||||||
# TODO: pip install requests-kerberos
|
# TODO: pip install requests-kerberos
|
||||||
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
|
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
|
||||||
auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL, force_preemptive=True)
|
auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL, force_preemptive=True)
|
||||||
|
|
||||||
# Test session API call
|
# Test Kerberos auth
|
||||||
r = requests.get("http://ca.example.lan/api/")
|
r = requests.get("http://ca.example.lan/api/")
|
||||||
assert r.status_code == 401, r.text
|
assert r.status_code == 401, r.text
|
||||||
assert "No Kerberos ticket offered" in r.text, r.text
|
assert "No Kerberos ticket offered" in r.text, r.text
|
||||||
@ -877,6 +891,23 @@ def test_cli_setup_authority():
|
|||||||
assert r.status_code == 200, r.text
|
assert r.status_code == 200, r.text
|
||||||
|
|
||||||
|
|
||||||
|
#################
|
||||||
|
### LDAP auth ###
|
||||||
|
#################
|
||||||
|
|
||||||
|
# Test LDAP bind auth fallback
|
||||||
|
usertoken = "Basic dXNlcmJvdDpTNGw0azRsNA=="
|
||||||
|
admintoken = "Basic YWRtaW5ib3Q6UzRsNGs0bDQ="
|
||||||
|
|
||||||
|
with open("/etc/ldap/ldap.conf", "w") as fh:
|
||||||
|
fh.write("TLS_REQCERT allow") # TODO: Correct way
|
||||||
|
|
||||||
|
# curl http://ca.example.lan/api/ -u adminbot:S4l4k4l4 -H "User-agent: Android" -H "Referer: http://ca.example.lan"
|
||||||
|
r = requests.get("http://ca.example.lan/api/",
|
||||||
|
headers={"Authorization":usertoken, "User-Agent": "Android", "Referer":"http://ca.example.lan/"})
|
||||||
|
assert r.status_code == 200, r.text
|
||||||
|
|
||||||
|
|
||||||
###################
|
###################
|
||||||
### Final tests ###
|
### Final tests ###
|
||||||
###################
|
###################
|
||||||
|
Loading…
Reference in New Issue
Block a user