mirror of
https://github.com/laurivosandi/certidude
synced 2025-09-05 21:31:19 +00:00
tests: Preliminary tests for Kerberos/LDAP auth
This commit is contained in:
@@ -93,6 +93,7 @@ def clean_server():
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
if os.path.exists("/var/lib/certidude/ca.example.lan"):
|
||||
shutil.rmtree("/var/lib/certidude/ca.example.lan")
|
||||
if os.path.exists("/etc/certidude/server.conf"):
|
||||
@@ -126,10 +127,25 @@ def clean_server():
|
||||
if os.path.exists("/etc/openvpn/keys"):
|
||||
shutil.rmtree("/etc/openvpn/keys")
|
||||
|
||||
# System packages
|
||||
os.system("apt purge -y nginx libnginx-mod-nchan openvpn strongswan")
|
||||
os.system("apt-get -y autoremove")
|
||||
# Stop samba
|
||||
if os.path.exists("/run/samba/samba.pid"):
|
||||
with open("/run/samba/samba.pid") as fh:
|
||||
try:
|
||||
os.kill(int(fh.read()), 15)
|
||||
except OSError:
|
||||
pass
|
||||
if os.path.exists("/etc/krb5.conf"):
|
||||
os.unlink("/etc/krb5.conf")
|
||||
if os.path.exists("/etc/krb5.keytab"):
|
||||
os.unlink("/etc/krb5.keytab")
|
||||
if os.path.exists("/etc/certidude/server.keytab"):
|
||||
os.unlink("/etc/certidude/server.keytab")
|
||||
if os.path.exists("/var/lib/samba/"):
|
||||
shutil.rmtree("/var/lib/samba")
|
||||
os.makedirs("/var/lib/samba")
|
||||
|
||||
# Restore initial resolv.conf
|
||||
shutil.copyfile("/etc/resolv.conf.orig", "/etc/resolv.conf")
|
||||
|
||||
def test_cli_setup_authority():
|
||||
import os
|
||||
@@ -137,9 +153,30 @@ def test_cli_setup_authority():
|
||||
|
||||
assert os.getuid() == 0, "Run tests as root in a clean VM or container"
|
||||
|
||||
if not os.path.exists("/etc/resolv.conf.orig"):
|
||||
shutil.copyfile("/etc/resolv.conf", "/etc/resolv.conf.orig")
|
||||
|
||||
clean_server()
|
||||
clean_client()
|
||||
|
||||
|
||||
# Bootstrap domain controller here,
|
||||
# Samba startup takes some time
|
||||
os.system("apt install -y samba krb5-user winbind")
|
||||
if os.path.exists("/etc/samba/smb.conf"):
|
||||
os.unlink("/etc/samba/smb.conf")
|
||||
os.system("samba-tool domain provision --server-role=dc --domain=EXAMPLE --realm=EXAMPLE.LAN --host-name=ca")
|
||||
os.system("samba-tool user add userbot S4l4k4l4 --given-name='User' --surname='Bot'")
|
||||
os.system("samba-tool user add adminbot S4l4k4l4 --given-name='Admin' --surname='Bot'")
|
||||
os.system("samba-tool user setpassword administrator --newpassword=S4l4k4l4")
|
||||
os.symlink("/var/lib/samba/private/secrets.keytab", "/etc/krb5.keytab")
|
||||
os.chmod("/var/lib/samba/private/secrets.keytab", 0644) # To allow access to certidude server
|
||||
os.symlink("/var/lib/samba/private/krb5.conf", "/etc/krb5.conf")
|
||||
with open("/etc/resolv.conf", "w") as fh:
|
||||
fh.write("nameserver 127.0.0.1\nsearch example.lan\n")
|
||||
# TODO: dig -t srv perhaps?
|
||||
os.system("samba")
|
||||
|
||||
from certidude.cli import entry_point as cli
|
||||
from certidude import const
|
||||
|
||||
@@ -156,7 +193,7 @@ def test_cli_setup_authority():
|
||||
assert os.system("nginx -t") == 0, "invalid nginx configuration"
|
||||
assert os.path.exists("/run/nginx.pid"), "nginx wasn't started up properly"
|
||||
|
||||
from certidude import config, authority
|
||||
from certidude import config, authority, auth, user
|
||||
assert authority.ca_cert.serial_number >= 0x100000000000000000000000000000000000000
|
||||
assert authority.ca_cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
|
||||
assert authority.ca_cert.not_valid_before < datetime.now()
|
||||
@@ -182,7 +219,7 @@ def test_cli_setup_authority():
|
||||
server_pid = os.fork()
|
||||
if not server_pid:
|
||||
# Fork to prevent umask, setuid, setgid side effects
|
||||
result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1'])
|
||||
result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1', '-e'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
|
||||
@@ -519,15 +556,13 @@ def test_cli_setup_authority():
|
||||
# Test session API call
|
||||
r = client().simulate_get("/api/", headers={"Authorization":usertoken})
|
||||
assert r.status_code == 200
|
||||
|
||||
r = client().simulate_get("/api/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 200
|
||||
|
||||
r = client().simulate_get("/api/", headers={"Accept":"text/plain", "Authorization":admintoken})
|
||||
assert r.status_code == 415 # invalid media type
|
||||
|
||||
r = client().simulate_get("/api/")
|
||||
assert r.status_code == 401
|
||||
assert "Please authenticate" in r.text
|
||||
|
||||
|
||||
# Test token mech
|
||||
@@ -568,9 +603,14 @@ def test_cli_setup_authority():
|
||||
# Beyond this point don't use client()
|
||||
const.STORAGE_PATH = "/tmp/"
|
||||
|
||||
|
||||
#############
|
||||
### nginx ###
|
||||
#############
|
||||
|
||||
# In this case nginx is set up as web server with TLS certificates
|
||||
# generated by certidude.
|
||||
|
||||
clean_client()
|
||||
|
||||
result = runner.invoke(cli, ["setup", "nginx", "-cn", "www", "ca.example.lan"])
|
||||
@@ -612,11 +652,15 @@ def test_cli_setup_authority():
|
||||
# Test nginx setup
|
||||
assert os.system("nginx -t") == 0, "Generated nginx config was invalid"
|
||||
|
||||
# TODO: test client verification with curl
|
||||
|
||||
|
||||
###############
|
||||
### OpenVPN ###
|
||||
###############
|
||||
|
||||
# First OpenVPN server is set up
|
||||
|
||||
clean_client()
|
||||
|
||||
if not os.path.exists("/etc/openvpn/keys"):
|
||||
@@ -651,7 +695,8 @@ def test_cli_setup_authority():
|
||||
assert os.path.exists("/tmp/ca.example.lan/server_cert.pem")
|
||||
assert os.path.exists("/etc/openvpn/site-to-client.conf")
|
||||
|
||||
# Reset config
|
||||
# Secondly OpenVPN client is set up
|
||||
|
||||
os.unlink("/etc/certidude/client.conf")
|
||||
os.unlink("/etc/certidude/services.conf")
|
||||
|
||||
@@ -669,8 +714,9 @@ def test_cli_setup_authority():
|
||||
assert "Writing certificate to:" in result.output, result.output
|
||||
assert os.path.exists("/etc/openvpn/client-to-site.conf")
|
||||
|
||||
# TODO: Check that tunnel interfaces came up, perhaps try to ping?
|
||||
# TODO: assert key, req, cert paths were included correctly in OpenVPN config
|
||||
# TODO: test client verification with curl
|
||||
|
||||
|
||||
###############
|
||||
### IPSec ###
|
||||
@@ -755,6 +801,74 @@ def test_cli_setup_authority():
|
||||
|
||||
|
||||
|
||||
####################################
|
||||
### Switch to Kerberos/LDAP auth ###
|
||||
####################################
|
||||
|
||||
# Shut down current instance
|
||||
requests.get("http://ca.example.lan/api/exit")
|
||||
requests.get("http://ca.example.lan/api/")
|
||||
os.waitpid(server_pid, 0)
|
||||
|
||||
# Hacks, note that CA is domain controller
|
||||
assert os.system("kdestroy") == 0
|
||||
assert not os.path.exists("/tmp/krb5cc_0")
|
||||
|
||||
assert os.system("echo S4l4k4l4 | kinit administrator") == 0
|
||||
assert os.path.exists("/tmp/krb5cc_0")
|
||||
os.system("sed -e 's/CA/CA\\nkerberos method = system keytab/' -i /etc/samba/smb.conf ")
|
||||
|
||||
# Create service principals
|
||||
spn_pid = os.fork()
|
||||
if not spn_pid:
|
||||
assert os.getuid() == 0 and os.getgid() == 0
|
||||
os.environ["KRB5_KTNAME"] = "FILE:/etc/certidude/server.keytab"
|
||||
assert os.system("net ads keytab add HTTP -k") == 0
|
||||
assert os.path.exists("/etc/certidude/server.keytab")
|
||||
os.system("chown root:certidude /etc/certidude/server.keytab")
|
||||
os.system("chmod 640 /etc/certidude/server.keytab")
|
||||
return
|
||||
else:
|
||||
os.waitpid(spn_pid, 0)
|
||||
|
||||
os.system("sed -e 's/ldap uri = ldaps:.*/ldap uri = ldaps:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/ldap uri = ldap:.*/ldap uri = ldap:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/backends = pam/backends = kerberos/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/backend = posix/backend = ldap/g' -i /etc/certidude/server.conf")
|
||||
os.system("/etc/cron.hourly/certidude") # Update server credential cache
|
||||
|
||||
result = runner.invoke(cli, ['users'])
|
||||
assert not result.exception, result.output
|
||||
# TODO: assert "Administrator@example.lan" in result.output
|
||||
|
||||
server_pid = os.fork() # Fork to prevent environment contamination
|
||||
if not server_pid:
|
||||
# Apply /etc/certidude/server.conf changes
|
||||
reload(config)
|
||||
reload(user)
|
||||
reload(auth)
|
||||
|
||||
assert isinstance(user.User.objects, user.ActiveDirectoryUserManager), user.User.objects
|
||||
result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1', '-e'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
|
||||
sleep(1) # Wait for serve to start up
|
||||
|
||||
# TODO: pip install requests-kerberos
|
||||
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
|
||||
auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL, force_preemptive=True)
|
||||
|
||||
# Test session API call
|
||||
r = requests.get("http://ca.example.lan/api/")
|
||||
assert r.status_code == 401, r.text
|
||||
assert "No Kerberos ticket offered" in r.text, r.text
|
||||
r = requests.get("http://ca.example.lan/api/", headers={"Authorization": "Negotiate blerrgh"})
|
||||
assert r.status_code == 400, r.text
|
||||
r = requests.get("http://ca.example.lan/api/", auth=auth)
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
|
||||
###################
|
||||
### Final tests ###
|
||||
###################
|
||||
@@ -783,18 +897,18 @@ def test_cli_setup_authority():
|
||||
assert authority.signer_exec("exit") == "ok"
|
||||
|
||||
# Shut down server
|
||||
with open("/run/certidude/server.pid") as fh:
|
||||
os.kill(int(fh.read()), 1)
|
||||
requests.get("http://ca.example.lan/api/exit")
|
||||
os.waitpid(server_pid, 0)
|
||||
|
||||
# Note: STORAGE_PATH was mangled above, hence it's /tmp not /var/lib/certidude
|
||||
assert open("/etc/apparmor.d/local/usr.lib.ipsec.charon").read() == "/tmp/** r,\n"
|
||||
|
||||
assert len(inbox) == 0, inbox # Make sure all messages were checked
|
||||
|
||||
os.waitpid(server_pid, 0)
|
||||
|
||||
os.system("service nginx stop")
|
||||
os.system("service openvpn stop")
|
||||
os.system("ipsec stop")
|
||||
|
||||
clean_server()
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_cli_setup_authority()
|
||||
|
Reference in New Issue
Block a user