mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-30 17:09:19 +00:00 
			
		
		
		
	tests: Add tests for machine keytab auth
This commit is contained in:
		
							
								
								
									
										12
									
								
								.travis.yml
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								.travis.yml
									
									
									
									
									
								
							| @@ -1,8 +1,6 @@ | ||||
| sudo: required | ||||
| language: python | ||||
| dist: | ||||
|   - trusty | ||||
|   - xenial | ||||
| dist: trusty | ||||
| python: | ||||
|   - "2.7" | ||||
| after_success: | ||||
| @@ -10,18 +8,12 @@ after_success: | ||||
| virtualenv: | ||||
|   system_site_packages: true | ||||
| install: | ||||
|   - echo "127.0.0.1 localhost" | sudo tee /etc/hosts | ||||
|   - echo "127.0.1.1 ca.example.lan ca" | sudo tee -a /etc/hosts | ||||
|   - echo "127.0.0.1 vpn.example.lan vpn" | sudo tee -a /etc/hosts | ||||
|   - echo "127.0.0.1 www.example.lan www" | sudo tee -a /etc/hosts | ||||
|   - sudo mkdir -p /etc/systemd/system | ||||
|   - sudo mkdir -p /etc/systemd/system # Until Travis is stuck with 14.04 | ||||
|   - sudo pip install -r requirements.txt | ||||
|   - sudo pip install codecov pytest-cov requests-kerberos | ||||
|   - sudo pip install -e . | ||||
| script: | ||||
|   - sudo find /home/ -type d -exec chmod 755 {} \; # Allow certidude serve to read templates | ||||
|   - sudo useradd adminbot -G sudo -p '$1$PBkf5waA$n9EV6WJ7PS6lyGWkgeTPf1' | ||||
|   - sudo useradd userbot -G users -p '$1$PBkf5waA$n9EV6WJ7PS6lyGWkgeTPf1' -c "User Bot,,," | ||||
|   - sudo chmod 777 . # Allow forked processes to write .coverage files | ||||
|   - sudo coverage run --parallel-mode --source certidude -m py.test tests | ||||
|   - sudo coverage combine | ||||
|   | ||||
| @@ -45,18 +45,21 @@ class RequestListResource(object): | ||||
|         Handle domain computer automatic enrollment | ||||
|         """ | ||||
|         machine = req.context.get("machine") | ||||
|         if config.MACHINE_ENROLLMENT_ALLOWED and machine: | ||||
|             if common_name.value != machine: | ||||
|                 raise falcon.HTTPBadRequest( | ||||
|                     "Bad request", | ||||
|                     "Common name %s differs from Kerberos credential %s!" % (common_name.value, machine)) | ||||
|         if machine: | ||||
|             if config.MACHINE_ENROLLMENT_ALLOWED: | ||||
|                 if common_name.value != machine: | ||||
|                     raise falcon.HTTPBadRequest( | ||||
|                         "Bad request", | ||||
|                         "Common name %s differs from Kerberos credential %s!" % (common_name.value, machine)) | ||||
|  | ||||
|             # Automatic enroll with Kerberos machine cerdentials | ||||
|             resp.set_header("Content-Type", "application/x-pem-file") | ||||
|             cert, resp.body = authority._sign(csr, body, overwrite=True) | ||||
|             logger.info(u"Automatically enrolled Kerberos authenticated machine %s from %s", | ||||
|                 machine, req.context.get("remote_addr")) | ||||
|             return | ||||
|                 # Automatic enroll with Kerberos machine cerdentials | ||||
|                 resp.set_header("Content-Type", "application/x-pem-file") | ||||
|                 cert, resp.body = authority._sign(csr, body, overwrite=True) | ||||
|                 logger.info(u"Automatically enrolled Kerberos authenticated machine %s from %s", | ||||
|                     machine, req.context.get("remote_addr")) | ||||
|                 return | ||||
|             else: | ||||
|                 reasons.append("Machine enrollment not allowed") | ||||
|  | ||||
|         """ | ||||
|         Attempt to renew certificate using currently valid key pair | ||||
|   | ||||
| @@ -13,8 +13,6 @@ from certidude import config, const | ||||
|  | ||||
| logger = logging.getLogger("api") | ||||
|  | ||||
| os.environ["KRB5_KTNAME"] = config.KERBEROS_KEYTAB | ||||
|  | ||||
| def authenticate(optional=False): | ||||
|     import falcon | ||||
|     def wrapper(func): | ||||
| @@ -31,6 +29,8 @@ def authenticate(optional=False): | ||||
|                     "No Kerberos ticket offered, are you sure you've logged in with domain user account?", | ||||
|                     ["Negotiate"]) | ||||
|  | ||||
|             os.environ["KRB5_KTNAME"] = config.KERBEROS_KEYTAB | ||||
|  | ||||
|             server_creds = gssapi.creds.Credentials( | ||||
|                 usage='accept', | ||||
|                 name=gssapi.names.Name('HTTP/%s'% const.FQDN)) | ||||
|   | ||||
| @@ -87,10 +87,11 @@ def setup_client(prefix="client_", dh=False): | ||||
|  | ||||
|  | ||||
| @click.command("request", help="Run processes for requesting certificates and configuring services") | ||||
| @click.option("-k", "--system-keytab-required", default=False, is_flag=True, help="Offer system keytab for auth") | ||||
| @click.option("-r", "--renew", default=False, is_flag=True, help="Renew now") | ||||
| @click.option("-f", "--fork", default=False, is_flag=True, help="Fork to background") | ||||
| @click.option("-nw", "--no-wait", default=False, is_flag=True, help="Return immideately if server doesn't autosign") | ||||
| def certidude_request(fork, renew, no_wait): | ||||
| def certidude_request(fork, renew, no_wait, system_keytab_required): | ||||
|     # Here let's try to avoid compiling packages from scratch | ||||
|     rpm("openssl") or \ | ||||
|     apt("openssl python-cryptography python-jinja2") or \ | ||||
| @@ -164,14 +165,16 @@ def certidude_request(fork, renew, no_wait): | ||||
|             endpoint_revocations_path = "/var/lib/certidude/%s/ca_crl.pem" % authority | ||||
|         # TODO: Create directories automatically | ||||
|  | ||||
|         system_keytab_required = False | ||||
|         if clients.get(authority, "trigger") == "domain joined": | ||||
|             system_keytab_required = True | ||||
|             if not os.path.exists("/etc/krb5.keytab"): | ||||
|                 continue | ||||
|         elif clients.get(authority, "trigger") != "interface up": | ||||
|             continue | ||||
|  | ||||
|         if system_keytab_required: | ||||
|             # Stop further processing if command line argument said so or trigger expects domain membership | ||||
|             if not os.path.exists("/etc/krb5.keytab"): | ||||
|                 continue | ||||
|  | ||||
|         pid_path = os.path.join(const.RUN_DIR, authority + ".pid") | ||||
|  | ||||
|         try: | ||||
| @@ -736,11 +739,11 @@ def certidude_setup_authority(username, kerberos_keytab, nginx_config, country, | ||||
|  | ||||
|     if not os.path.exists("/etc/apt/sources.list.d/nginx-stable-trusty.list"): | ||||
|         os.system("add-apt-repository -y ppa:nginx/stable") | ||||
|         os.system("apt update") | ||||
|         os.system("apt-get update") | ||||
|     if not os.path.exists("/usr/lib/nginx/modules/ngx_nchan_module.so"): | ||||
|         os.system("apt install -y libnginx-mod-nchan") | ||||
|         os.system("apt-get install -y libnginx-mod-nchan") | ||||
|     if not os.path.exists("/usr/sbin/nginx"): | ||||
|         os.system("apt install -y nginx") | ||||
|         os.system("apt-get install -y nginx") | ||||
|  | ||||
|     from cryptography import x509 | ||||
|     from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID | ||||
|   | ||||
| @@ -191,19 +191,22 @@ def certidude_request_certificate(server, system_keytab_required, key_path, requ | ||||
|     # If machine is joined to domain attempt to present machine credentials for authentication | ||||
|     if system_keytab_required: | ||||
|         os.environ["KRB5CCNAME"]="/tmp/ca.ticket" | ||||
|         # If Samba configuration exists assume NetBIOS name was used in keytab | ||||
|         if os.path.exists("/etc/samba/smb.conf"): | ||||
|             from configparser import ConfigParser | ||||
|             cp = ConfigParser(delimiters=("=")) | ||||
|             cp.readfp(open("/etc/samba/smb.conf")) | ||||
|             name = cp.get("global", "netbios name") | ||||
|             os.system("kinit -S HTTP/%s -k %s$" % (server, name)) | ||||
|         else: | ||||
|             os.system("kinit -S HTTP/%s -k %s$" % (server, const.HOSTNAME.lower())) # Mac OS X | ||||
|             os.system("kinit -S HTTP/%s -k %s$" % (server, const.HOSTNAME.upper())) # Fedora /w SSSD | ||||
|  | ||||
|         # Mac OS X has keytab with lowercase hostname | ||||
|         cmd = "kinit -S HTTP/%s -k %s$" % (server, const.HOSTNAME.lower()) | ||||
|         click.echo("Executing: %s" % cmd) | ||||
|         if os.system(cmd): | ||||
|             # Fedora /w SSSD has keytab with uppercase hostname | ||||
|             cmd = "kinit -S HTTP/%s -k %s$" % (server, const.HOSTNAME.upper()) | ||||
|             if os.system(cmd): | ||||
|                 # Failed, probably /etc/krb5.keytab contains spaghetti | ||||
|                 raise ValueError("Failed to initialize TGT using machine keytab") | ||||
|         assert os.path.exists("/tmp/ca.ticket"), "Ticket not created!" | ||||
|         click.echo("Initialized Kerberos TGT using machine keytab") | ||||
|         from requests_kerberos import HTTPKerberosAuth, OPTIONAL | ||||
|         auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL, force_preemptive=True) | ||||
|     else: | ||||
|         click.echo("Not using machine keytab") | ||||
|         auth = None | ||||
|  | ||||
|     click.echo("Submitting to %s, waiting for response..." % request_url) | ||||
|   | ||||
| @@ -136,8 +136,6 @@ def clean_server(): | ||||
|                 os.kill(int(fh.read()), 15) | ||||
|             except OSError: | ||||
|                 pass | ||||
|     if os.path.exists("/etc/krb5.keytab"): | ||||
|         os.unlink("/etc/krb5.keytab") | ||||
|     if os.path.exists("/etc/certidude/server.keytab"): | ||||
|         os.unlink("/etc/certidude/server.keytab") | ||||
|     if os.path.exists("/var/lib/samba/"): | ||||
| @@ -148,21 +146,39 @@ def clean_server(): | ||||
|     shutil.copyfile("/etc/resolv.conf.orig", "/etc/resolv.conf") | ||||
|  | ||||
| def test_cli_setup_authority(): | ||||
|     # apt install nano git build-essential python-dev libkrb5-dev | ||||
|     import os | ||||
|     import sys | ||||
|  | ||||
|     assert os.getuid() == 0, "Run tests as root in a clean VM or container" | ||||
|  | ||||
|     assert not os.environ.get("KRB5CCNAME"), "Environment contaminated" | ||||
|     assert not os.environ.get("KRB5_KTNAME"), "Environment contaminated" | ||||
|  | ||||
|  | ||||
|     if not os.path.exists("/etc/resolv.conf.orig"): | ||||
|         shutil.copyfile("/etc/resolv.conf", "/etc/resolv.conf.orig") | ||||
|  | ||||
|     clean_server() | ||||
|     clean_client() | ||||
|  | ||||
|     # TODO: set hostname to 'ca' | ||||
|     with open("/etc/hosts", "w") as fh: | ||||
|         fh.write("127.0.0.1 localhost\n") | ||||
|         fh.write("127.0.1.1 ca.example.lan ca\n") | ||||
|         fh.write("127.0.0.1 vpn.example.lan vpn\n") | ||||
|         fh.write("127.0.0.1 www.example.lan www\n") | ||||
|  | ||||
|     with open("/etc/passwd") as fh: # TODO: Better | ||||
|         buf = fh.read() | ||||
|         if "adminbot" not in buf: | ||||
|             os.system("useradd adminbot -G sudo -p '$1$PBkf5waA$n9EV6WJ7PS6lyGWkgeTPf1'") | ||||
|         if "userbot" not in buf: | ||||
|             os.system("useradd userbot -G users -p '$1$PBkf5waA$n9EV6WJ7PS6lyGWkgeTPf1' -c 'User Bot,,,'") | ||||
|  | ||||
|     # Bootstrap domain controller here, | ||||
|     # Samba startup takes some time | ||||
|     os.system("apt install -y samba krb5-user winbind") | ||||
|     os.system("apt-get install -y samba krb5-user winbind") | ||||
|     if os.path.exists("/etc/samba/smb.conf"): | ||||
|         os.unlink("/etc/samba/smb.conf") | ||||
|     os.system("samba-tool domain provision --server-role=dc --domain=EXAMPLE --realm=EXAMPLE.LAN --host-name=ca") | ||||
| @@ -170,6 +186,8 @@ def test_cli_setup_authority(): | ||||
|     os.system("samba-tool user add adminbot S4l4k4l4 --given-name='Admin' --surname='Bot'") | ||||
|     os.system("samba-tool group addmembers 'Domain Admins' adminbot") | ||||
|     os.system("samba-tool user setpassword administrator --newpassword=S4l4k4l4") | ||||
|     if os.path.exists("/etc/krb5.keytab"): | ||||
|         os.unlink("/etc/krb5.keytab") | ||||
|     os.symlink("/var/lib/samba/private/secrets.keytab", "/etc/krb5.keytab") | ||||
|     os.chmod("/var/lib/samba/private/secrets.keytab", 0644) # To allow access to certidude server | ||||
|     if os.path.exists("/etc/krb5.conf"): # Remove the one from krb5-user package | ||||
| @@ -180,6 +198,18 @@ def test_cli_setup_authority(): | ||||
|     # TODO: dig -t srv perhaps? | ||||
|     os.system("samba") | ||||
|  | ||||
|     # Samba bind 636 late (probably generating keypair) | ||||
|     # so LDAPS connections below will fail | ||||
|     timeout = 0 | ||||
|     while timeout < 30: | ||||
|         if os.path.exists("/var/lib/samba/private/tls/cert.pem"): | ||||
|             break | ||||
|         sleep(1) | ||||
|         timeout += 1 | ||||
|     else: | ||||
|         assert False, "Samba startup timed out" | ||||
|  | ||||
|  | ||||
|     from certidude.cli import entry_point as cli | ||||
|     from certidude import const | ||||
|  | ||||
| @@ -809,6 +839,25 @@ def test_cli_setup_authority(): | ||||
|     assert "Writing certificate to:" in result.output, result.output | ||||
|  | ||||
|  | ||||
|     ###################################### | ||||
|     ### Test revocation on client side ### | ||||
|     ###################################### | ||||
|  | ||||
|     # First revoke on server side | ||||
|     child_pid = os.fork() | ||||
|     if not child_pid: | ||||
|         result = runner.invoke(cli, ['revoke', 'roadwarrior4']) | ||||
|         assert not result.exception, result.output | ||||
|         return | ||||
|     else: | ||||
|         os.waitpid(child_pid, 0) | ||||
|  | ||||
|     # Make sure check is ran on the client side | ||||
|     result = runner.invoke(cli, ["request", "--no-wait"]) | ||||
|     assert not result.exception, result.output | ||||
|     assert "Certificate has been revoked, wiping keys and certificates" in result.output, result.output | ||||
|     assert "Writing certificate to:" in result.output, result.output | ||||
|  | ||||
|  | ||||
|     #################################### | ||||
|     ### Switch to Kerberos/LDAP auth ### | ||||
| @@ -845,6 +894,8 @@ def test_cli_setup_authority(): | ||||
|     os.system("sed -e 's/backends = pam/backends = kerberos ldap/g' -i /etc/certidude/server.conf") | ||||
|     os.system("sed -e 's/backend = posix/backend = ldap/g' -i /etc/certidude/server.conf") | ||||
|     os.system("sed -e 's/dc1/ca/g' -i /etc/cron.hourly/certidude") | ||||
|     os.system("sed -e 's/autosign subnets =.*/autosign subnets =/g' -i /etc/certidude/server.conf") | ||||
|     os.system("sed -e 's/machine enrollment =.*/machine enrollment = allowed/g' -i /etc/certidude/server.conf") | ||||
|  | ||||
|     # Update server credential cache | ||||
|     with open("/etc/cron.hourly/certidude") as fh: | ||||
| @@ -912,24 +963,46 @@ def test_cli_setup_authority(): | ||||
|     assert r.status_code == 200, r.text | ||||
|  | ||||
|  | ||||
|     ########################### | ||||
|     ### Machine keytab auth ### | ||||
|     ########################### | ||||
|  | ||||
|     assert not os.environ.get("KRB5_KTNAME"), "Environment contaminated" | ||||
|  | ||||
|     mach_pid = os.fork() # Otherwise results in Terminated, needs investigation why | ||||
|     if not mach_pid: | ||||
|         clean_client() | ||||
|  | ||||
|         # Test non-matching CN | ||||
|         result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "somethingelse", "ca.example.lan", "vpn.example.lan"]) | ||||
|         assert not result.exception, result.output | ||||
|  | ||||
|         with open("/etc/certidude/client.conf", "a") as fh: | ||||
|             fh.write("insecure = true\n") | ||||
|  | ||||
|         result = runner.invoke(cli, ["request", "--no-wait", "--system-keytab-required"]) | ||||
|         assert result.exception, result.output # Bad request 400 | ||||
|  | ||||
|         # With matching CN it should work | ||||
|         clean_client() | ||||
|  | ||||
|         result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "ca", "ca.example.lan", "vpn.example.lan"]) | ||||
|         assert not result.exception, result.output | ||||
|  | ||||
|         with open("/etc/certidude/client.conf", "a") as fh: | ||||
|             fh.write("insecure = true\n") | ||||
|  | ||||
|         result = runner.invoke(cli, ["request", "--no-wait", "--system-keytab-required"]) | ||||
|         assert not result.exception, result.output | ||||
|         assert "Writing certificate to:" in result.output, result.output | ||||
|     else: | ||||
|         os.waitpid(mach_pid, 0) | ||||
|  | ||||
|  | ||||
|     ################### | ||||
|     ### Final tests ### | ||||
|     ################### | ||||
|  | ||||
|     # Test revocation on command-line | ||||
|     child_pid = os.fork() | ||||
|     if not child_pid: | ||||
|         result = runner.invoke(cli, ['revoke', 'roadwarrior4']) | ||||
|         assert not result.exception, result.output | ||||
|         return | ||||
|     else: | ||||
|         os.waitpid(child_pid, 0) | ||||
|  | ||||
|     # Test revocation check on client side | ||||
|     result = runner.invoke(cli, ["request", "--no-wait"]) | ||||
|     assert not result.exception, result.output | ||||
|     assert "Certificate has been revoked, wiping keys and certificates" in result.output, result.output | ||||
|     assert "Writing certificate to:" in result.output, result.output | ||||
|  | ||||
|     result = runner.invoke(cli, ['list', '-srv']) | ||||
|     assert not result.exception, result.output | ||||
|   | ||||
		Reference in New Issue
	
	Block a user