1
0
mirror of https://github.com/laurivosandi/certidude synced 2024-12-22 16:25:17 +00:00

tests: Better coverage for tagging tests

This commit is contained in:
Lauri Võsandi 2017-05-04 09:14:47 +00:00
parent 2ffcc64d86
commit 94944e37f1
4 changed files with 74 additions and 31 deletions

View File

@ -67,12 +67,19 @@ class RequestListResource(object):
pass
else:
if cert.public_key().public_numbers() == csr.public_key().public_numbers():
try:
renewal_signature = b64decode(req.get_header("X-Renewal-Signature"))
except TypeError, ValueError: # No header supplied, redirect to signed API call
renewal_header = req.get_header("X-Renewal-Signature")
if not renewal_header:
# No header supplied, redirect to signed API call
resp.status = falcon.HTTP_SEE_OTHER
resp.location = os.path.join(os.path.dirname(req.relative_uri), "signed", common_name.value)
return
try:
renewal_signature = b64decode(renewal_header)
except TypeError, ValueError:
logger.error("Renewal failed, bad signature supplied for %s", common_name.value)
reason = "Renewal failed, bad signature supplied"
else:
try:
verifier = cert.public_key().verifier(

View File

@ -56,7 +56,7 @@ def store_request(buf, overwrite=False):
"""
if not buf:
raise ValueError("No certificate supplied") # No certificate supplied
raise ValueError("No signing request supplied")
csr = x509.load_pem_x509_csr(buf, backend=default_backend())
common_name, = csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
@ -259,24 +259,20 @@ def generate_pkcs12_bundle(common_name, owner=None):
cert, cert_buf = _sign(csr, buf, overwrite=True)
# Generate P12, currently supported only by PyOpenSSL
try:
from OpenSSL import crypto
except ImportError:
raise
else:
p12 = crypto.PKCS12()
p12.set_privatekey(
crypto.load_privatekey(
crypto.FILETYPE_PEM,
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())))
p12.set_certificate(
crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf))
p12.set_ca_certificates([
crypto.load_certificate(crypto.FILETYPE_PEM, ca_buf)])
return p12.export("1234"), cert
from OpenSSL import crypto
p12 = crypto.PKCS12()
p12.set_privatekey(
crypto.load_privatekey(
crypto.FILETYPE_PEM,
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())))
p12.set_certificate(
crypto.load_certificate(crypto.FILETYPE_PEM, cert_buf))
p12.set_ca_certificates([
crypto.load_certificate(crypto.FILETYPE_PEM, ca_buf)])
return p12.export("1234"), cert
def sign(common_name, overwrite=False):

View File

@ -1208,13 +1208,14 @@ def certidude_serve(port, listen, fork):
logger.debug("Started Certidude at %s", const.FQDN)
drop_privileges()
def quit_handler(*args, **kwargs):
click.echo("Shutting down HTTP server...")
import threading
threading.Thread(target=httpd.shutdown).start()
raise KeyboardInterrupt
signal.signal(signal.SIGHUP, quit_handler)
httpd.serve_forever()
try:
httpd.serve_forever()
except KeyboardInterrupt:
click.echo("Caught Ctrl-C, server stopped")
@click.command("yubikey", help="Set up Yubikey as client authentication token")

View File

@ -146,6 +146,20 @@ def test_cli_setup_authority():
assert authority.ca_cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
assert authority.ca_cert.not_valid_before < datetime.now()
assert authority.ca_cert.not_valid_after > datetime.now() + timedelta(days=7000)
assert authority.server_flags("lauri@fedora-123") == False
assert authority.server_flags("fedora-123") == False
assert authority.server_flags("vpn.example.lan") == True
assert authority.server_flags("lauri@a.b.c") == False
# Generate garbage
with open("/var/lib/certidude/ca.example.lan/bla", "w") as fh:
pass
with open("/var/lib/certidude/ca.example.lan/requests/bla", "w") as fh:
pass
with open("/var/lib/certidude/ca.example.lan/signed/bla", "w") as fh:
pass
with open("/var/lib/certidude/ca.example.lan/revoked/bla", "w") as fh:
pass
# Start server before any signing operations are performed
config.CERTIFICATE_RENEWAL_ALLOWED = True
@ -198,6 +212,8 @@ def test_cli_setup_authority():
r = requests.get("http://ca.example.lan/../nonexistant.html")
assert r.status_code == 403, r.text
r = client().simulate_get("/")
assert r.status_code == 200, r.text
r = client().simulate_get("/index.html")
assert r.status_code == 200, r.text
r = client().simulate_get("/nonexistant.html")
@ -314,6 +330,15 @@ def test_cli_setup_authority():
assert "Stored request " in inbox.pop(), inbox
assert not inbox
buf = generate_csr(cn=u"test2.example.lan")
r = client().simulate_post("/api/request/",
query_string="autosign=1",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 202 # server CN, request stored
assert "Stored request " in inbox.pop(), inbox
assert not inbox
# Test signed certificate API call
r = client().simulate_get("/api/signed/nonexistant/")
assert r.status_code == 404, r.text
@ -390,6 +415,10 @@ def test_cli_setup_authority():
body="key=other&value=something",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
r = client().simulate_post("/api/signed/test/tag/",
body="key=location&value=Tallinn",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
# Tags can be overwritten only by admin
r = client().simulate_put("/api/signed/test/tag/something/")
@ -401,6 +430,11 @@ def test_cli_setup_authority():
body="value=else",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
r = client().simulate_put("/api/signed/test/tag/location=Tallinn/",
body="value=Tartu",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
assert getxattr(path, "user.xdg.tags") == "location=Tartu,else"
# Tags can be deleted only by admin
r = client().simulate_delete("/api/signed/test/tag/else/")
@ -411,6 +445,9 @@ def test_cli_setup_authority():
r = client().simulate_delete("/api/signed/test/tag/else/",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
r = client().simulate_delete("/api/signed/test/tag/location=Tartu/",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
assert "user.xdg.tags" not in listxattr(path, "user.xdg.tags")
@ -502,7 +539,7 @@ def test_cli_setup_authority():
assert not result.exception, result.output
result = runner.invoke(cli, ["setup", "nginx", "-cn", "www.example.lan", "ca.example.lan"])
assert not result.exception, result.output # blah already exists, remove to regenerate
assert not result.exception, result.output # client conf already exists, remove to regenerate
import os
@ -528,6 +565,8 @@ def test_cli_setup_authority():
result = runner.invoke(cli, ["request", "--renew", "--no-wait"])
assert not result.exception, result.output
assert "Writing certificate to:" in result.output, result.output
assert "Attached renewal signature" in result.output, result.output
assert "refused to sign immideately" not in result.output, result.output
# Test nginx setup
assert os.system("nginx -t") == 0, "Generated nginx config was invalid"
@ -549,7 +588,7 @@ def test_cli_setup_authority():
assert not result.exception, result.output
result = runner.invoke(cli, ['setup', 'openvpn', 'server', "-cn", "vpn.example.lan", "ca.example.lan"])
assert not result.exception, result.output # blah already exists, remove to regenerate
assert not result.exception, result.output # client conf already exists, remove to regenerate
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
@ -578,7 +617,7 @@ def test_cli_setup_authority():
assert not result.exception, result.output
result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "roadwarrior1", "ca.example.lan", "vpn.example.lan"])
assert not result.exception, result.output # blah already exists, remove to regenerate
assert not result.exception, result.output # client conf already exists, remove to regenerate
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
@ -602,7 +641,7 @@ def test_cli_setup_authority():
assert not result.exception, result.output
result = runner.invoke(cli, ['setup', 'strongswan', 'server', "-cn", "ipsec.example.lan", "ca.example.lan"])
assert not result.exception, result.output # blah already exists, remove to regenerate
assert not result.exception, result.output # client conf already exists, remove to regenerate
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
@ -631,7 +670,7 @@ def test_cli_setup_authority():
assert not result.exception, result.output
result = runner.invoke(cli, ['setup', 'strongswan', 'client', "-cn", "roadwarrior2", "ca.example.lan", "ipsec.example.lan"])
assert not result.exception, result.output # blah already exists, remove to regenerate
assert not result.exception, result.output # client conf already exists, remove to regenerate
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")