From 47aded48d5117a427c96660f9ca4276de0328de7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lauri=20V=C3=B5sandi?= Date: Wed, 3 May 2017 14:42:37 +0000 Subject: [PATCH] tests: Add e-mailing and more cli commands --- certidude/api/revoked.py | 7 +--- certidude/cli.py | 38 +++++++++--------- certidude/templates/server/server.conf | 3 +- tests/test_cli.py | 54 +++++++++++++++++++++++++- 4 files changed, 75 insertions(+), 27 deletions(-) diff --git a/certidude/api/revoked.py b/certidude/api/revoked.py index 4ee0d6e..1522c65 100644 --- a/certidude/api/revoked.py +++ b/certidude/api/revoked.py @@ -37,12 +37,7 @@ class RevocationListResource(object): "Content-Disposition", ("attachment; filename=%s-crl.pem" % const.HOSTNAME).encode("ascii")) logger.debug(u"Serving revocation list to %s in PEM format", req.context.get("remote_addr")) - try: - resp.body = export_crl() - logger.debug(u"Serving %s to client", resp.body) - except: - logger.debug(u"Failed to export CRL, are you sure signer is running?") - raise falcon.HTTPInternalServerError("Failed to export CRL") + resp.body = export_crl() else: logger.debug(u"Client %s asked revocation list in unsupported format" % req.context.get("remote_addr")) raise falcon.HTTPUnsupportedMediaType( diff --git a/certidude/cli.py b/certidude/cli.py index 89e7e3d..8d4d91e 100755 --- a/certidude/cli.py +++ b/certidude/cli.py @@ -206,9 +206,9 @@ def certidude_request(fork, renew, no_wait): if config[section_type,section_name]["leftcert"] != endpoint_certificate_path: continue - if config[section_type,section_name]["left"] == "%defaultroute": + if config[section_type,section_name].get("left", "") == "%defaultroute": config[section_type,section_name]["auto"] = "start" # This is client - elif config[section_type,section_name]["leftsourceip"]: + elif config[section_type,section_name].get("leftsourceip", ""): config[section_type,section_name]["auto"] = "add" # This is server else: config[section_type,section_name]["auto"] = "route" # This is site-to-site tunnel @@ -616,7 +616,8 @@ def certidude_setup_strongswan_server(authority, common_name, subnet, route): @click.command("client", help="Set up strongSwan client") @click.argument("authority") @click.argument("remote") -def certidude_setup_strongswan_client(authority, remote): +@click.option("--common-name", "-cn", default=const.HOSTNAME, help="Common name, %s by default" % const.HOSTNAME) +def certidude_setup_strongswan_client(authority, remote, common_name): # Install dependencies apt("strongswan") rpm("strongswan") @@ -631,10 +632,10 @@ def certidude_setup_strongswan_client(authority, remote): else: client_config.add_section(authority) client_config.set(authority, "trigger", "interface up") - client_config.set(authority, "common name", const.HOSTNAME) - client_config.set(authority, "request path", "%s/ipsec.d/reqs/%s.pem" % (const.STRONGSWAN_PREFIX, const.HOSTNAME)) - client_config.set(authority, "key path", "%s/ipsec.d/private/%s.pem" % (const.STRONGSWAN_PREFIX, const.HOSTNAME)) - client_config.set(authority, "certificate path", "%s/ipsec.d/certs/%s.pem" % (const.STRONGSWAN_PREFIX, const.HOSTNAME)) + client_config.set(authority, "common name", common_name) + client_config.set(authority, "request path", "%s/ipsec.d/reqs/%s.pem" % (const.STRONGSWAN_PREFIX, common_name)) + client_config.set(authority, "key path", "%s/ipsec.d/private/%s.pem" % (const.STRONGSWAN_PREFIX, common_name)) + client_config.set(authority, "certificate path", "%s/ipsec.d/certs/%s.pem" % (const.STRONGSWAN_PREFIX, common_name)) client_config.set(authority, "authority path", "%s/ipsec.d/cacerts/ca.pem" % const.STRONGSWAN_PREFIX) client_config.set(authority, "revocations path", "%s/ipsec.d/crls/ca.pem" % const.STRONGSWAN_PREFIX) with open(const.CLIENT_CONFIG_PATH + ".part", 'wb') as fh: @@ -687,7 +688,8 @@ def certidude_setup_strongswan_client(authority, remote): @click.command("networkmanager", help="Set up strongSwan client via NetworkManager") @click.argument("authority") # Certidude server @click.argument("remote") # StrongSwan gateway -def certidude_setup_strongswan_networkmanager(authority, remote): +@click.option("--common-name", "-cn", default=const.HOSTNAME, help="Common name, %s by default" % const.HOSTNAME) +def certidude_setup_strongswan_networkmanager(authority, remote, common_name): # Install dependencies apt("strongswan-nm") rpm("NetworkManager-strongswan-gnome") @@ -703,10 +705,10 @@ def certidude_setup_strongswan_networkmanager(authority, remote): else: client_config.add_section(authority) client_config.set(authority, "trigger", "interface up") - client_config.set(authority, "common name", const.HOSTNAME) - client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % const.HOSTNAME) - client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % const.HOSTNAME) - client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % const.HOSTNAME) + client_config.set(authority, "common name", common_name) + client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % common_name) + client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % common_name) + client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % common_name) client_config.set(authority, "authority path", "/etc/ipsec.d/cacerts/ca.pem") client_config.set(authority, "revocations path", "/etc/ipsec.d/crls/ca.pem") with open(const.CLIENT_CONFIG_PATH + ".part", 'wb') as fh: @@ -735,20 +737,20 @@ def certidude_setup_strongswan_networkmanager(authority, remote): @click.argument("authority") @click.argument("remote") # OpenVPN gateway @click.option("--common-name", "-cn", default=const.HOSTNAME, help="Common name, %s by default" % const.HOSTNAME) -def certidude_setup_openvpn_networkmanager(authority, remote): +def certidude_setup_openvpn_networkmanager(authority, remote, common_name): # Create corresponding section in /etc/certidude/client.conf client_config = ConfigParser() if os.path.exists(const.CLIENT_CONFIG_PATH): client_config.readfp(open(const.CLIENT_CONFIG_PATH)) - if client_config.has_section(server): + if client_config.has_section(authority): click.echo("Section '%s' already exists in %s, remove to regenerate" % (authority, const.CLIENT_CONFIG_PATH)) else: client_config.add_section(authority) client_config.set(authority, "trigger", "interface up") - client_config.set(authority, "common name", const.HOSTNAME) - client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % const.HOSTNAME) - client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % const.HOSTNAME) - client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % const.HOSTNAME) + client_config.set(authority, "common name", common_name) + client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % common_name) + client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % common_name) + client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % common_name) client_config.set(authority, "authority path", "/etc/ipsec.d/cacerts/ca.pem") client_config.set(authority, "revocations path", "/etc/ipsec.d/crls/ca.pem") with open(const.CLIENT_CONFIG_PATH + ".part", 'wb') as fh: diff --git a/certidude/templates/server/server.conf b/certidude/templates/server/server.conf index 55cbb03..6bfe0f5 100644 --- a/certidude/templates/server/server.conf +++ b/certidude/templates/server/server.conf @@ -140,8 +140,7 @@ expired dir = {{ directory }}/expired/ # uncomment mail sender address to enable e-mails. # Make sure used e-mail address is reachable for end users. name = Certificate management -address = -;address = certificates@example.com +address = certificates@example.lan [tagging] owner/string = Owner diff --git a/tests/test_cli.py b/tests/test_cli.py index 8f09c32..4c219a7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -6,6 +6,29 @@ import pytest import shutil import os +smtp=None +inbox=[] + +class DummySMTP(object): + def __init__(self,address): + self.address=address + + def login(self,username,password): + self.username=username + self.password=password + + def sendmail(self,from_address,to_address,fullmessage): + global inbox + inbox.append(fullmessage) + return [] + + def quit(self): + self.has_quit=True + +# this is the actual monkey patch (simply replacing one class with another) +import smtplib +smtplib.SMTP=DummySMTP + runner = CliRunner() @pytest.fixture(scope='module') @@ -59,6 +82,11 @@ def test_cli_setup_authority(): os.unlink("/etc/certidude/client.conf") if os.path.exists("/run/certidude"): shutil.rmtree("/run/certidude") + if os.path.exists("/var/log/certidude.log"): + os.unlink("/var/log/certidude.log") + + with open("/etc/ipsec.conf", "w") as fh: # TODO: make compatible with Fedora + pass # Remove OpenVPN stuff if os.path.exists("/etc/openvpn"): @@ -135,27 +163,32 @@ def test_cli_setup_authority(): r = client().simulate_post("/api/request/", body=buf) assert r.status_code == 415 # wrong content type + assert not inbox r = client().simulate_post("/api/request/", body=buf, headers={"content-type":"application/pkcs10"}) assert r.status_code == 202 # success + assert "Stored request " in inbox.pop(), inbox r = client().simulate_post("/api/request/", body=buf, headers={"content-type":"application/pkcs10"}) assert r.status_code == 202 # already exists, same keypair so it's ok + assert not inbox r = client().simulate_post("/api/request/", query_string="wait=true", body=buf, headers={"content-type":"application/pkcs10"}) assert r.status_code == 303 # redirect to long poll + assert not inbox r = client().simulate_post("/api/request/", body=generate_csr(cn=u"test"), headers={"content-type":"application/pkcs10"}) assert r.status_code == 409 # duplicate cn, different keypair + assert not inbox r = client().simulate_get("/api/request/test/", headers={"Accept":"application/json"}) assert r.status_code == 200 # fetch as JSON ok @@ -177,6 +210,7 @@ def test_cli_setup_authority(): headers={"content-type":"application/pkcs10"}) assert r.status_code == 200 # autosign successful assert r.headers.get('content-type') == "application/x-pem-file" + assert "Signed " in inbox.pop(), inbox # TODO: submit messed up CSR-s: no CN, empty CN etc @@ -192,7 +226,7 @@ def test_cli_setup_authority(): return else: os.waitpid(child_pid, 0) - assert os.getuid() == 0 and os.getgid() == 0, "Serve dropped permissions incorrectly!" + assert not inbox # forked processes don't reach the mailbox # Test session API call r = client().simulate_get("/api/", headers={"Authorization":usertoken}) @@ -313,6 +347,7 @@ def test_cli_setup_authority(): r = client().simulate_delete("/api/signed/test/", headers={"Authorization":admintoken}) assert r.status_code == 200, r.text + assert "Revoked " in inbox.pop(), inbox # Log can be read only by admin @@ -342,6 +377,7 @@ def test_cli_setup_authority(): body="user=userbot", # TODO: test nonexistant user headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) assert r.status_code == 200 # token generated by admin + assert "Token for " in inbox.pop(), inbox r2 = client().simulate_get("/api/token/", query_string="u=userbot&t=1493184342&c=ac9b71421d5741800c5a4905b20c1072594a2df863e60ba836464888786bf2a6", @@ -353,11 +389,13 @@ def test_cli_setup_authority(): "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36"}) assert r2.status_code == 200 # token consumed by anyone on Fedora assert r2.headers.get('content-type') == "application/x-openvpn" + assert "Signed " in inbox.pop(), inbox config.BUNDLE_FORMAT = "p12" # Switch to PKCS#12 r2 = client().simulate_get("/api/token/", query_string=r.content) assert r2.status_code == 200 # token consumed by anyone on unknown device assert r2.headers.get('content-type') == "application/x-pkcs12" + assert "Signed " in inbox.pop(), inbox result = runner.invoke(cli, ['setup', 'openvpn', 'server', "-cn", "vpn.example.lan", "ca.example.lan"]) assert not result.exception, result.output @@ -365,6 +403,18 @@ def test_cli_setup_authority(): result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "roadwarrior1", "ca.example.lan", "vpn.example.lan"]) assert not result.exception, result.output + result = runner.invoke(cli, ['setup', 'strongswan', 'server', "-cn", "ipsec.example.lan", "ca.example.lan"]) + assert not result.exception, result.output + + result = runner.invoke(cli, ['setup', 'strongswan', 'client', "-cn", "roadwarrior2", "ca.example.lan", "ipsec.example.lan"]) + assert not result.exception, result.output + + result = runner.invoke(cli, ['setup', 'openvpn', 'networkmanager', "-cn", "roadwarrior3", "ca.example.lan", "vpn.example.lan"]) + assert not result.exception, result.output + + result = runner.invoke(cli, ['setup', 'strongswan', 'networkmanager', "-cn", "roadwarrior4", "ca.example.lan", "ipsec.example.lan"]) + assert not result.exception, result.output + import os if not os.path.exists("/etc/openvpn/keys"): os.makedirs("/etc/openvpn/keys") @@ -410,4 +460,6 @@ def test_cli_setup_authority(): with open("/run/certidude/server.pid") as fh: os.kill(int(fh.read()), 1) + assert len(inbox) == 0, inbox # Make sure all messages were checked + os.waitpid(server_pid, 0)