mirror of
https://github.com/laurivosandi/certidude
synced 2025-09-05 21:31:19 +00:00
Migrate from cryptography.io to oscrypto
This commit is contained in:
@@ -80,12 +80,6 @@ def clean_client():
|
||||
|
||||
|
||||
def clean_server():
|
||||
if os.path.exists("/run/certidude/signer.pid"):
|
||||
with open("/run/certidude/signer.pid") as fh:
|
||||
try:
|
||||
os.kill(int(fh.read()), 15)
|
||||
except OSError:
|
||||
pass
|
||||
if os.path.exists("/run/certidude/server.pid"):
|
||||
with open("/run/certidude/server.pid") as fh:
|
||||
try:
|
||||
@@ -239,16 +233,18 @@ def test_cli_setup_authority():
|
||||
os.setgid(0) # Restore GID
|
||||
os.umask(0022)
|
||||
|
||||
# Make sure nginx is running
|
||||
assert not result.exception, result.output
|
||||
assert os.getuid() == 0 and os.getgid() == 0, "Serve dropped permissions incorrectly!"
|
||||
assert os.system("nginx -t") == 0, "invalid nginx configuration"
|
||||
os.system("service nginx restart")
|
||||
assert os.path.exists("/run/nginx.pid"), "nginx wasn't started up properly"
|
||||
|
||||
from certidude import config, authority, auth, user
|
||||
assert authority.ca_cert.serial_number >= 0x100000000000000000000000000000000000000
|
||||
assert authority.ca_cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
|
||||
assert authority.ca_cert.not_valid_before < datetime.now()
|
||||
assert authority.ca_cert.not_valid_after > datetime.now() + timedelta(days=7000)
|
||||
assert authority.certificate.serial_number >= 0x100000000000000000000000000000000000000
|
||||
assert authority.certificate.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
|
||||
assert authority.certificate["tbs_certificate"]["validity"]["not_before"].native.replace(tzinfo=None) < datetime.utcnow()
|
||||
assert authority.certificate["tbs_certificate"]["validity"]["not_after"].native.replace(tzinfo=None) > datetime.utcnow() + timedelta(days=7000)
|
||||
assert authority.server_flags("lauri@fedora-123") == False
|
||||
assert authority.server_flags("fedora-123") == False
|
||||
assert authority.server_flags("vpn.example.lan") == True
|
||||
@@ -412,12 +408,12 @@ def test_cli_setup_authority():
|
||||
assert not result.exception, result.output
|
||||
|
||||
# Test sign API call
|
||||
r = client().simulate_patch("/api/request/test/")
|
||||
r = client().simulate_post("/api/request/test/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_patch("/api/request/test/",
|
||||
r = client().simulate_post("/api/request/test/",
|
||||
headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_patch("/api/request/test/",
|
||||
r = client().simulate_post("/api/request/test/",
|
||||
headers={"Authorization":admintoken})
|
||||
assert r.status_code == 201, r.text
|
||||
assert "Signed " in inbox.pop(), inbox
|
||||
@@ -476,7 +472,7 @@ def test_cli_setup_authority():
|
||||
# Test revocations API call
|
||||
r = client().simulate_get("/api/revoked/",
|
||||
headers={"Accept":"application/x-pem-file"})
|
||||
assert r.status_code == 200, r.text # if this breaks certidude serve has no access to signer socket
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.headers.get('content-type') == "application/x-pem-file"
|
||||
|
||||
r = client().simulate_get("/api/revoked/")
|
||||
@@ -672,29 +668,11 @@ def test_cli_setup_authority():
|
||||
assert "/ev/sub/" in r.text, r.text
|
||||
assert r.json, r.text
|
||||
assert r.json.get("authority"), r.text
|
||||
assert r.json.get("authority").get("events"), r.text
|
||||
|
||||
|
||||
#################################
|
||||
### Subscribe to event source ###
|
||||
#################################
|
||||
|
||||
ev_pid = os.fork()
|
||||
if not ev_pid:
|
||||
url = r.json.get("authority").get("events")
|
||||
if url.startswith("/"): # Expand URL
|
||||
url = "http://ca.example.lan" + url
|
||||
r = requests.get(url, headers={"Accept": "text/event-stream"}, stream=True)
|
||||
lines = ["data: userbot@fedora-15417dc5", "event: request-signed"] # In reverse order!
|
||||
assert r.status_code == 200, r.text
|
||||
for line in r.iter_lines():
|
||||
if not line or line.startswith("id:") or line.startswith(":"):
|
||||
continue
|
||||
assert line == lines.pop(), line
|
||||
if not lines:
|
||||
return
|
||||
assert False, r.text # This should not happen
|
||||
return
|
||||
ev_url = r.json.get("authority").get("events")
|
||||
assert ev_url, r.text
|
||||
if ev_url.startswith("/"): # Expand URL
|
||||
ev_url = "http://ca.example.lan" + ev_url
|
||||
assert ev_url.startswith("http://ca.example.lan/ev/sub/")
|
||||
|
||||
|
||||
#######################
|
||||
@@ -704,6 +682,7 @@ def test_cli_setup_authority():
|
||||
r = client().simulate_post("/api/token/")
|
||||
assert r.status_code == 404, r.text
|
||||
|
||||
"""
|
||||
config.BUNDLE_FORMAT = "ovpn"
|
||||
config.USER_ENROLLMENT_ALLOWED = True
|
||||
|
||||
@@ -734,6 +713,7 @@ def test_cli_setup_authority():
|
||||
assert r2.status_code == 200 # token consumed by anyone on unknown device
|
||||
assert r2.headers.get('content-type') == "application/x-pkcs12"
|
||||
assert "Signed " in inbox.pop(), inbox
|
||||
"""
|
||||
|
||||
# Beyond this point don't use client()
|
||||
const.STORAGE_PATH = "/tmp/"
|
||||
@@ -765,12 +745,13 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ["request", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
assert "refused to sign" in result.output, result.output
|
||||
assert "(Autosign failed, only client certificates allowed to be signed automatically)" in result.output, result.output
|
||||
|
||||
child_pid = os.fork()
|
||||
if not child_pid:
|
||||
result = runner.invoke(cli, ['sign', 'www.example.lan'])
|
||||
result = runner.invoke(cli, ["sign", "www.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
assert "Publishing request-signed event 'www.example.lan' on http://localhost/ev/pub/" in result.output, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(child_pid, 0)
|
||||
@@ -785,13 +766,10 @@ def test_cli_setup_authority():
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
#assert "Writing certificate to:" in result.output, result.output
|
||||
assert "Attached renewal signature" in result.output, result.output
|
||||
#assert "refused to sign immideately" not in result.output, result.output
|
||||
|
||||
# Test nginx setup
|
||||
assert os.system("nginx -t") == 0, "Generated nginx config was invalid"
|
||||
|
||||
# TODO: test client verification with curl
|
||||
|
||||
|
||||
###############
|
||||
### OpenVPN ###
|
||||
@@ -818,13 +796,17 @@ def test_cli_setup_authority():
|
||||
|
||||
result = runner.invoke(cli, ["request", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert "(Autosign failed, only client certificates allowed to be signed automatically)" in result.output, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
|
||||
assert not os.path.exists("/var/lib/certidude/ca.example.lan/signed/vpn.example.lan.pem")
|
||||
|
||||
child_pid = os.fork()
|
||||
if not child_pid:
|
||||
result = runner.invoke(cli, ['sign', 'vpn.example.lan'])
|
||||
assert not os.path.exists("/var/lib/certidude/ca.example.lan/signed/vpn.example.lan.pem")
|
||||
result = runner.invoke(cli, ["sign", "vpn.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
assert "overwrit" not in result.output, result.output
|
||||
assert "Publishing request-signed event 'vpn.example.lan' on http://localhost/ev/pub/" in result.output, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(child_pid, 0)
|
||||
@@ -859,10 +841,164 @@ def test_cli_setup_authority():
|
||||
# TODO: Check that tunnel interfaces came up, perhaps try to ping?
|
||||
# TODO: assert key, req, cert paths were included correctly in OpenVPN config
|
||||
|
||||
clean_client()
|
||||
|
||||
###############
|
||||
result = runner.invoke(cli, ['setup', 'openvpn', 'networkmanager', "-cn", "roadwarrior3", "ca.example.lan", "vpn.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["request", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
assert "Writing certificate to:" in result.output, result.output
|
||||
|
||||
|
||||
#################################
|
||||
### Subscribe to event source ###
|
||||
#################################
|
||||
|
||||
ev_pid = os.fork()
|
||||
if not ev_pid:
|
||||
r = requests.get(ev_url, headers={"Accept": "text/event-stream"}, stream=True)
|
||||
assert r.status_code == 200, r.text
|
||||
i = r.iter_lines()
|
||||
assert i.next() == ": hi"
|
||||
assert not i.next()
|
||||
|
||||
# IPSec gateway below
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Served CA certificate ')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: request-submitted", "%s; %s" % (i.next(), i.next())
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next() == "data: ipsec.example.lan"
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Stored signing request ipsec.example.lan ')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Stored signing request ipsec.example.lan ')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: request-signed"
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: ipsec.example.lan')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Served certificate ipsec.example.lan')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Served certificate ipsec.example.lan')
|
||||
assert not i.next()
|
||||
|
||||
# IPsec client as service enroll
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: request-signed", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: roadwarrior2')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Autosigned roadwarrior2')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Autosigned roadwarrior2')
|
||||
assert not i.next()
|
||||
|
||||
|
||||
|
||||
# IPSec client using Networkmanger enroll
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Served CA certificate ')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Serving revocation list (PEM)')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: request-signed", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: roadwarrior4')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next()
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Autosigned roadwarrior4')
|
||||
assert not i.next()
|
||||
|
||||
assert i.next() == "event: log-entry", i.next() # FIXME
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: {"message": "Autosigned roadwarrior4')
|
||||
assert not i.next()
|
||||
|
||||
|
||||
# Revoke
|
||||
|
||||
assert i.next() == "event: certificate-revoked", i.next() # why?!
|
||||
assert i.next().startswith("id:")
|
||||
assert i.next().startswith('data: roadwarrior4')
|
||||
assert not i.next()
|
||||
|
||||
|
||||
return
|
||||
|
||||
|
||||
#############
|
||||
### IPSec ###
|
||||
###############
|
||||
#############
|
||||
|
||||
# Setup gateway
|
||||
|
||||
clean_client()
|
||||
|
||||
@@ -882,11 +1018,15 @@ def test_cli_setup_authority():
|
||||
result = runner.invoke(cli, ["request", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
assert not os.path.exists("/var/lib/certidude/ca.example.lan/signed/ipsec.example.lan.pem")
|
||||
|
||||
child_pid = os.fork()
|
||||
if not child_pid:
|
||||
result = runner.invoke(cli, ['sign', 'ipsec.example.lan'])
|
||||
assert not os.path.exists("/var/lib/certidude/ca.example.lan/signed/ipsec.example.lan.pem")
|
||||
result = runner.invoke(cli, ["sign", "ipsec.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
assert "overwrit" not in result.output, result.output
|
||||
assert "Publishing request-signed event 'ipsec.example.lan' on http://localhost/ev/pub/" in result.output, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(child_pid, 0)
|
||||
@@ -898,7 +1038,8 @@ def test_cli_setup_authority():
|
||||
assert "Writing certificate to:" in result.output, result.output
|
||||
assert os.path.exists("/tmp/ca.example.lan/server_cert.pem")
|
||||
|
||||
# Reset config
|
||||
# IPSec client as service
|
||||
|
||||
os.unlink("/etc/certidude/client.conf")
|
||||
os.unlink("/etc/certidude/services.conf")
|
||||
|
||||
@@ -917,23 +1058,7 @@ def test_cli_setup_authority():
|
||||
|
||||
assert "Writing certificate to:" in result.output, result.output
|
||||
|
||||
|
||||
######################
|
||||
### NetworkManager ###
|
||||
######################
|
||||
|
||||
clean_client()
|
||||
|
||||
result = runner.invoke(cli, ['setup', 'openvpn', 'networkmanager', "-cn", "roadwarrior3", "ca.example.lan", "vpn.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
with open("/etc/certidude/client.conf", "a") as fh:
|
||||
fh.write("insecure = true\n")
|
||||
|
||||
result = runner.invoke(cli, ["request", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
assert not os.path.exists("/run/certidude/ca.example.lan.pid"), result.output
|
||||
assert "Writing certificate to:" in result.output, result.output
|
||||
# IPSec using NetworkManager
|
||||
|
||||
clean_client()
|
||||
|
||||
@@ -1158,11 +1283,15 @@ def test_cli_setup_authority():
|
||||
##################
|
||||
|
||||
os.umask(0022)
|
||||
assert not os.system("git clone https://github.com/certnanny/sscep /tmp/sscep")
|
||||
assert not os.system("cd /tmp/sscep && ./Configure && make sscep_dyn")
|
||||
if not os.path.exists("/tmp/sscep"):
|
||||
assert not os.system("git clone https://github.com/certnanny/sscep /tmp/sscep")
|
||||
if not os.path.exists("/tmp/sscep/sscep_dyn"):
|
||||
assert not os.system("cd /tmp/sscep && ./Configure && make sscep_dyn")
|
||||
assert not os.system("/tmp/sscep/sscep_dyn getca -c /tmp/sscep/ca.pem -u http://ca.example.lan/cgi-bin/pkiclient.exe")
|
||||
assert not os.system("openssl genrsa -out /tmp/key.pem 1024")
|
||||
assert not os.system("echo '.\n.\n.\n.\n.\ntest8\n\n\n\n' | openssl req -new -sha256 -key /tmp/key.pem -out /tmp/req.pem")
|
||||
if not os.path.exists("/tmp/key.pem"):
|
||||
assert not os.system("openssl genrsa -out /tmp/key.pem 1024")
|
||||
if not os.path.exists("/tmp/req.pem"):
|
||||
assert not os.system("echo '.\n.\n.\n.\n.\ntest8\n\n\n\n' | openssl req -new -sha256 -key /tmp/key.pem -out /tmp/req.pem")
|
||||
assert not os.system("/tmp/sscep/sscep_dyn enroll -c /tmp/sscep/ca.pem -u http://ca.example.lan/cgi-bin/pkiclient.exe -k /tmp/key.pem -r /tmp/req.pem -l /tmp/cert.pem")
|
||||
# TODO: test e-mails at this point
|
||||
|
||||
|
Reference in New Issue
Block a user