mirror of
https://github.com/laurivosandi/certidude
synced 2025-09-05 21:31:19 +00:00
Several updates #2
* Reverse RDN components for all certs * Less side effects in unittests * Split help dialog shell snippets into separate files * Restore 'admin subnets' config option * Embedded subnets, IKE and ESP proposals now configurable in builder.conf * Use expr instead of bc for math operations in shell * Better frontend support for Let's Encrypt certificates
This commit is contained in:
@@ -49,7 +49,7 @@ def client():
|
||||
|
||||
def generate_csr(cn=None):
|
||||
|
||||
public_key, private_key = asymmetric.generate_pair('rsa', bit_size=2048)
|
||||
public_key, private_key = asymmetric.generate_pair('ec', curve="secp384r1")
|
||||
builder = CSRBuilder({ 'common_name': cn }, public_key)
|
||||
request = builder.build(private_key)
|
||||
return pem_armor_csr(request)
|
||||
@@ -116,6 +116,10 @@ def clean_server():
|
||||
"/etc/nginx/sites-enabled/certidude.conf",
|
||||
"/etc/nginx/conf.d/tls.conf",
|
||||
"/etc/certidude/server.keytab",
|
||||
"/tmp/sscep/ca.pem",
|
||||
"/tmp/key.pem",
|
||||
"/tmp/req.pem",
|
||||
"/tmp/cert.pem",
|
||||
]
|
||||
|
||||
for filename in files:
|
||||
@@ -142,14 +146,19 @@ def clean_server():
|
||||
# Restore initial resolv.conf
|
||||
shutil.copyfile("/etc/resolv.conf.orig", "/etc/resolv.conf")
|
||||
|
||||
def assert_cleanliness():
|
||||
assert os.getuid() == 0, "Environment contaminated, UID: %d" % os.getuid()
|
||||
assert os.getgid() == 0, "Environment contaminated, GID: %d" % os.getgid()
|
||||
assert not os.environ.get("KRB5_KTNAME"), "Environment contaminated, KRB5_KTNAME=%s" % os.environ.get("KRB5_KTNAME")
|
||||
assert not os.environ.get("KRB5CCNAME"), "Environment contaminated, KRB5CCNAME=%s" % os.environ.get("KRB5CCNAME")
|
||||
|
||||
def test_cli_setup_authority():
|
||||
assert os.getuid() == 0, "Run tests as root in a clean VM or container"
|
||||
assert check_output(["/bin/hostname", "-f"]) == b"ca.example.lan\n", "As a safety precaution, unittests only run in a machine whose hostanme -f is ca.example.lan"
|
||||
|
||||
os.system("DEBIAN_FRONTEND=noninteractive apt-get install -qq -y git build-essential python-dev libkrb5-dev")
|
||||
os.system("DEBIAN_FRONTEND=noninteractive apt-get install -qq -y git build-essential python-dev libkrb5-dev samba krb5-user winbind bc")
|
||||
|
||||
assert not os.environ.get("KRB5CCNAME"), "Environment contaminated"
|
||||
assert not os.environ.get("KRB5_KTNAME"), "Environment contaminated"
|
||||
assert_cleanliness()
|
||||
|
||||
# Mock Fedora
|
||||
for util in "/usr/bin/chcon", "/usr/bin/dnf", "/usr/bin/update-ca-trust", "/usr/sbin/dmidecode":
|
||||
@@ -196,34 +205,23 @@ def test_cli_setup_authority():
|
||||
assert const.HOSTNAME == "ca"
|
||||
assert const.DOMAIN == "example.lan"
|
||||
|
||||
# Bootstrap authority
|
||||
bootstrap_pid = os.fork() # TODO: this shouldn't be necessary
|
||||
if not bootstrap_pid:
|
||||
assert os.getuid() == 0 and os.getgid() == 0
|
||||
result = runner.invoke(cli, ["setup", "authority"])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(bootstrap_pid, 0)
|
||||
os.system("certidude setup authority --elliptic-curve")
|
||||
|
||||
assert os.getuid() == 0 and os.getgid() == 0, "Environment contaminated"
|
||||
assert_cleanliness()
|
||||
|
||||
|
||||
# Make sure nginx is running
|
||||
assert os.system("nginx -t") == 0, "invalid nginx configuration"
|
||||
os.system("service nginx restart")
|
||||
assert os.path.exists("/run/nginx.pid"), "nginx wasn't started up properly"
|
||||
|
||||
# Make sure we generated legit CA certificate
|
||||
from certidude import config, authority, auth, user
|
||||
from certidude import config, authority, user
|
||||
assert authority.certificate.serial_number >= 0x100000000000000000000000000000000000000
|
||||
assert authority.certificate.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
|
||||
assert authority.certificate["tbs_certificate"]["validity"]["not_before"].native.replace(tzinfo=None) < datetime.utcnow()
|
||||
assert authority.certificate["tbs_certificate"]["validity"]["not_after"].native.replace(tzinfo=None) > datetime.utcnow() + timedelta(days=7000)
|
||||
assert authority.server_flags("lauri@fedora-123") == False
|
||||
assert authority.server_flags("fedora-123") == False
|
||||
assert authority.server_flags("vpn.example.lan") == True
|
||||
assert authority.server_flags("lauri@a.b.c") == False
|
||||
assert authority.certificate["tbs_certificate"]["validity"]["not_before"].native.replace(tzinfo=None) < datetime.utcnow()
|
||||
assert authority.public_key.algorithm == "ec"
|
||||
|
||||
# Generate garbage
|
||||
with open("/var/lib/certidude/ca.example.lan/bla", "w") as fh:
|
||||
@@ -237,19 +235,18 @@ def test_cli_setup_authority():
|
||||
|
||||
# Start server before any signing operations are performed
|
||||
config.CERTIFICATE_RENEWAL_ALLOWED = True
|
||||
assert_cleanliness()
|
||||
|
||||
server_pid = os.fork()
|
||||
if not server_pid:
|
||||
# Fork to prevent umask, setuid, setgid side effects
|
||||
result = runner.invoke(cli, ['serve'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
|
||||
sleep(1) # Wait for serve to start up
|
||||
import requests
|
||||
for j in range(0,10):
|
||||
r = requests.get("http://ca.example.lan/api/")
|
||||
if r.status_code != 502:
|
||||
break
|
||||
sleep(1)
|
||||
assert r.status_code == 401, "Timed out starting up the API backend"
|
||||
|
||||
# TODO: check that port 8080 is listening, otherwise app probably crashed
|
||||
|
||||
import requests
|
||||
|
||||
# Test CA certificate fetch
|
||||
buf = open("/var/lib/certidude/ca.example.lan/ca_cert.pem").read()
|
||||
@@ -621,10 +618,7 @@ def test_cli_setup_authority():
|
||||
assert r.status_code == 415 # invalid media type
|
||||
|
||||
r = client().simulate_get("/api/", headers={"Authorization":usertoken})
|
||||
assert r.status_code == 200
|
||||
assert r.headers.get('content-type').startswith("application/json")
|
||||
assert r.json, r.text
|
||||
assert not r.json.get("authority"), r.text # No permissions to admin
|
||||
assert r.status_code == 403 # regular users have no access
|
||||
|
||||
r = client().simulate_get("/api/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 200
|
||||
@@ -665,7 +659,6 @@ def test_cli_setup_authority():
|
||||
assert not result.exception, result.output # client conf already exists, remove to regenerate
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
fh.write("autosign = false\n")
|
||||
|
||||
assert not os.path.exists("/etc/certidude/authority/ca.example.lan/server_cert.pem")
|
||||
@@ -721,7 +714,6 @@ def test_cli_setup_authority():
|
||||
assert not result.exception, result.output # client conf already exists, remove to regenerate
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
fh.write("autosign = false\n")
|
||||
|
||||
assert not os.path.exists("/etc/certidude/authority/ca.example.lan/server_cert.pem")
|
||||
@@ -761,9 +753,6 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "roadwarrior1", "ca.example.lan", "vpn.example.lan"])
|
||||
assert not result.exception, result.output # client conf already exists, remove to regenerate
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["enroll", "--skip-self", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
@@ -778,9 +767,6 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ['setup', 'openvpn', 'networkmanager', "-cn", "roadwarrior3", "ca.example.lan", "vpn.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["enroll", "--skip-self", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
@@ -947,7 +933,6 @@ def test_cli_setup_authority():
|
||||
assert not os.path.exists("/var/lib/certidude/ca.example.lan/signed/ipsec.example.lan.pem")
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
fh.write("autosign = false\n")
|
||||
|
||||
result = runner.invoke(cli, ["enroll", "--skip-self", "--no-wait"])
|
||||
@@ -985,9 +970,6 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ['setup', 'strongswan', 'client', "-cn", "roadwarrior2", "ca.example.lan", "ipsec.example.lan"])
|
||||
assert not result.exception, result.output # client conf already exists, remove to regenerate
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["enroll", "--skip-self", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
@@ -1001,9 +983,6 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ['setup', 'strongswan', 'networkmanager', "-cn", "roadwarrior4", "ca.example.lan", "ipsec.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["enroll", "--skip-self", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
@@ -1075,15 +1054,9 @@ def test_cli_setup_authority():
|
||||
### Switch to Kerberos/LDAP auth ###
|
||||
####################################
|
||||
|
||||
# Shut down current instance
|
||||
os.kill(server_pid, 15)
|
||||
requests.get("http://ca.example.lan/api/")
|
||||
# sleep(2)
|
||||
# os.kill(server_pid, 9) # TODO: Figure out why doesn't shut down gracefully
|
||||
os.waitpid(server_pid, 0)
|
||||
os.system("systemctl stop certidude")
|
||||
|
||||
# Install packages
|
||||
os.system("apt-get install -y samba krb5-user winbind bc")
|
||||
clean_server()
|
||||
|
||||
# Bootstrap domain controller here,
|
||||
@@ -1114,17 +1087,6 @@ def test_cli_setup_authority():
|
||||
else:
|
||||
assert False, "Samba startup timed out"
|
||||
|
||||
# Bootstrap authority
|
||||
bootstrap_pid = os.fork() # TODO: this shouldn't be necessary
|
||||
if not bootstrap_pid:
|
||||
result = runner.invoke(cli, ["setup", "authority", "--skip-packages", "--elliptic-curve"])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(bootstrap_pid, 0)
|
||||
|
||||
assert os.getuid() == 0 and os.getgid() == 0, "Environment contaminated"
|
||||
|
||||
# (re)auth against DC
|
||||
assert os.system("kdestroy") == 0
|
||||
assert not os.path.exists("/tmp/krb5cc_0")
|
||||
@@ -1144,45 +1106,55 @@ def test_cli_setup_authority():
|
||||
else:
|
||||
os.waitpid(spn_pid, 0)
|
||||
|
||||
assert_cleanliness()
|
||||
r = requests.get("http://ca.example.lan/api/")
|
||||
assert r.status_code == 502, r.text
|
||||
|
||||
|
||||
|
||||
# Bootstrap authority
|
||||
assert not os.path.exists("/var/lib/certidude/ca.example.lan/ca_key.pem")
|
||||
os.system("certidude setup authority --skip-packages")
|
||||
|
||||
|
||||
# Make modifications to /etc/certidude/server.conf so
|
||||
# Certidude would auth against domain controller
|
||||
os.system("sed -e 's/ldap uri = ldaps:.*/ldap uri = ldaps:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/ldap uri = ldap:.*/ldap uri = ldap:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/dc1/ca/g' -i /etc/cron.hourly/certidude")
|
||||
os.system("sed -e 's/autosign subnets =.*/autosign subnets =/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/machine enrollment subnets =.*/machine enrollment subnets = 0.0.0.0\\/0/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/scep subnets =.*/scep subnets = 0.0.0.0\\/0/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/ocsp subnets =.*/ocsp subnets =/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/crl subnets =.*/crl subnets =/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/address = certificates@example.lan/address =/g' -i /etc/certidude/server.conf")
|
||||
from certidude.common import pip
|
||||
|
||||
# Update server credential cache
|
||||
os.system("sed -e 's/dc1/ca/g' -i /etc/cron.hourly/certidude")
|
||||
with open("/etc/cron.hourly/certidude") as fh:
|
||||
cronjob = fh.read()
|
||||
assert "ldap/ca.example.lan" in cronjob, cronjob
|
||||
os.system("/etc/cron.hourly/certidude")
|
||||
assert os.system("/etc/cron.hourly/certidude") == 0
|
||||
assert os.path.exists("/run/certidude/krb5cc")
|
||||
assert os.stat("/run/certidude/krb5cc").st_uid != 0, "Incorrect persmissions for /run/certidude/krb5cc"
|
||||
|
||||
server_pid = os.fork() # Fork to prevent environment contamination
|
||||
if not server_pid:
|
||||
# Apply /etc/certidude/server.conf changes
|
||||
reload(config)
|
||||
reload(user)
|
||||
reload(auth)
|
||||
assert isinstance(user.User.objects, user.ActiveDirectoryUserManager), user.User.objects
|
||||
# Start certidude backend
|
||||
assert os.system("systemctl restart certidude") == 0
|
||||
assert_cleanliness()
|
||||
|
||||
result = runner.invoke(cli, ['users'])
|
||||
assert not result.exception, result.output
|
||||
assert "user;userbot;User;Bot;userbot@example.lan" in result.output
|
||||
assert "admin;adminbot;Admin;Bot;adminbot@example.lan" in result.output
|
||||
assert "admin;Administrator;Administrator;;Administrator@example.lan" in result.output
|
||||
# Apply /etc/certidude/server.conf changes
|
||||
reload(config)
|
||||
reload(user)
|
||||
reload(authority)
|
||||
|
||||
assert authority.public_key.algorithm == "rsa"
|
||||
assert isinstance(user.User.objects, user.ActiveDirectoryUserManager), user.User.objects
|
||||
|
||||
result = runner.invoke(cli, ['users'])
|
||||
assert not result.exception, result.output
|
||||
assert "user;userbot;User;Bot;userbot@example.lan" in result.output
|
||||
assert "admin;adminbot;Admin;Bot;adminbot@example.lan" in result.output
|
||||
assert "admin;Administrator;Administrator;;Administrator@example.lan" in result.output
|
||||
|
||||
result = runner.invoke(cli, ['serve'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
|
||||
# Wait for serve to start up
|
||||
for j in range(0,10):
|
||||
@@ -1215,10 +1187,15 @@ def test_cli_setup_authority():
|
||||
### Kerberos auth ###
|
||||
#####################
|
||||
|
||||
# TODO: pip install requests-kerberos
|
||||
# TODO: pip3 install requests-kerberos
|
||||
assert_cleanliness()
|
||||
|
||||
assert os.stat("/run/certidude/krb5cc").st_uid != 0, "Incorrect persmissions for /run/certidude/krb5cc"
|
||||
|
||||
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
|
||||
auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL, force_preemptive=True)
|
||||
|
||||
|
||||
# Test Kerberos auth
|
||||
r = requests.get("http://ca.example.lan/api/")
|
||||
assert r.status_code == 401, r.text
|
||||
@@ -1229,6 +1206,8 @@ def test_cli_setup_authority():
|
||||
r = requests.get("http://ca.example.lan/api/", headers={"Authorization": "Negotiate TlRMTVNTUAABAAAAl4II4gAAAAAAAAAAAAAAAAAAAAAKADk4AAAADw=="})
|
||||
assert r.status_code == 400, r.text
|
||||
assert "Unsupported authentication mechanism (NTLM" in r.text
|
||||
assert os.system("echo S4l4k4l4 | kinit administrator") == 0
|
||||
assert os.stat("/run/certidude/krb5cc").st_uid != 0, "Incorrect persmissions for /run/certidude/krb5cc"
|
||||
r = requests.get("http://ca.example.lan/api/", auth=auth)
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
@@ -1247,14 +1226,15 @@ def test_cli_setup_authority():
|
||||
# curl http://ca.example.lan/api/ -u adminbot:S4l4k4l4 -H "User-agent: Android" -H "Referer: http://ca.example.lan"
|
||||
r = requests.get("http://ca.example.lan/api/",
|
||||
headers={"Authorization":usertoken, "User-Agent": "Android", "Referer":"http://ca.example.lan/"})
|
||||
#assert r.status_code == 200, r.text # TODO: Fails with 500 in Travis
|
||||
assert r.status_code == 400, r.text
|
||||
assert "expected Negotiate" in r.text, r.text
|
||||
|
||||
|
||||
###########################
|
||||
### Machine keytab auth ###
|
||||
###########################
|
||||
|
||||
assert not os.environ.get("KRB5_KTNAME"), "Environment contaminated"
|
||||
assert_cleanliness()
|
||||
|
||||
mach_pid = os.fork() # Otherwise results in Terminated, needs investigation why
|
||||
if not mach_pid:
|
||||
@@ -1264,9 +1244,6 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "somethingelse", "ca.example.lan", "vpn.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["enroll", "--skip-self", "--no-wait", "--kerberos"])
|
||||
assert result.exception, result.output # Bad request 400
|
||||
|
||||
@@ -1276,9 +1253,6 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "ca", "ca.example.lan", "vpn.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["enroll", "--skip-self", "--no-wait", "--kerberos"])
|
||||
assert not result.exception, result.output
|
||||
assert "Writing certificate to:" in result.output, result.output
|
||||
@@ -1291,6 +1265,8 @@ def test_cli_setup_authority():
|
||||
### SCEP tests ###
|
||||
##################
|
||||
|
||||
assert not os.path.exists("/tmp/sscep/ca.pem")
|
||||
|
||||
if not os.path.exists("/tmp/sscep"):
|
||||
assert not os.system("git clone https://github.com/certnanny/sscep /tmp/sscep")
|
||||
if not os.path.exists("/tmp/sscep/sscep_dyn"):
|
||||
@@ -1302,6 +1278,7 @@ def test_cli_setup_authority():
|
||||
assert not os.system("echo '.\n.\n.\n.\nGateway\ntest8\n\n\n\n' | openssl req -new -sha256 -key /tmp/key.pem -out /tmp/req.pem")
|
||||
assert not os.system("/tmp/sscep/sscep_dyn enroll -c /tmp/sscep/ca.pem -u http://ca.example.lan/cgi-bin/pkiclient.exe -k /tmp/key.pem -r /tmp/req.pem -l /tmp/cert.pem")
|
||||
# TODO: test e-mails at this point
|
||||
# TODO: add strongswan scep client tests here
|
||||
|
||||
|
||||
###################
|
||||
@@ -1314,12 +1291,7 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ['expire'])
|
||||
assert not result.exception, result.output
|
||||
|
||||
# Shut down server
|
||||
assert os.path.exists("/proc/%d" % server_pid)
|
||||
os.kill(server_pid, 15)
|
||||
# sleep(2)
|
||||
# os.kill(server_pid, 9)
|
||||
os.waitpid(server_pid, 0)
|
||||
assert os.system("systemctl stop certidude") == 0
|
||||
|
||||
# Note: STORAGE_PATH was mangled above, hence it's /tmp not /var/lib/certidude
|
||||
assert open("/etc/apparmor.d/local/usr.lib.ipsec.charon").read() == \
|
||||
|
Reference in New Issue
Block a user