mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-31 01:19:11 +00:00 
			
		
		
		
	tests: Add e-mailing and more cli commands
This commit is contained in:
		| @@ -37,12 +37,7 @@ class RevocationListResource(object): | ||||
|                     "Content-Disposition", | ||||
|                     ("attachment; filename=%s-crl.pem" % const.HOSTNAME).encode("ascii")) | ||||
|                 logger.debug(u"Serving revocation list to %s in PEM format", req.context.get("remote_addr")) | ||||
|                 try: | ||||
|                 resp.body = export_crl() | ||||
|                     logger.debug(u"Serving %s to client", resp.body) | ||||
|                 except: | ||||
|                     logger.debug(u"Failed to export CRL, are you sure signer is running?") | ||||
|                     raise falcon.HTTPInternalServerError("Failed to export CRL") | ||||
|         else: | ||||
|             logger.debug(u"Client %s asked revocation list in unsupported format" % req.context.get("remote_addr")) | ||||
|             raise falcon.HTTPUnsupportedMediaType( | ||||
|   | ||||
| @@ -206,9 +206,9 @@ def certidude_request(fork, renew, no_wait): | ||||
|                     if config[section_type,section_name]["leftcert"] != endpoint_certificate_path: | ||||
|                         continue | ||||
|  | ||||
|                     if config[section_type,section_name]["left"] == "%defaultroute": | ||||
|                     if config[section_type,section_name].get("left", "") == "%defaultroute": | ||||
|                         config[section_type,section_name]["auto"] = "start" # This is client | ||||
|                     elif config[section_type,section_name]["leftsourceip"]: | ||||
|                     elif config[section_type,section_name].get("leftsourceip", ""): | ||||
|                         config[section_type,section_name]["auto"] = "add" # This is server | ||||
|                     else: | ||||
|                         config[section_type,section_name]["auto"] = "route" # This is site-to-site tunnel | ||||
| @@ -616,7 +616,8 @@ def certidude_setup_strongswan_server(authority, common_name, subnet, route): | ||||
| @click.command("client", help="Set up strongSwan client") | ||||
| @click.argument("authority") | ||||
| @click.argument("remote") | ||||
| def certidude_setup_strongswan_client(authority, remote): | ||||
| @click.option("--common-name", "-cn", default=const.HOSTNAME, help="Common name, %s by default" % const.HOSTNAME) | ||||
| def certidude_setup_strongswan_client(authority, remote, common_name): | ||||
|     # Install dependencies | ||||
|     apt("strongswan") | ||||
|     rpm("strongswan") | ||||
| @@ -631,10 +632,10 @@ def certidude_setup_strongswan_client(authority, remote): | ||||
|     else: | ||||
|         client_config.add_section(authority) | ||||
|         client_config.set(authority, "trigger", "interface up") | ||||
|         client_config.set(authority, "common name", const.HOSTNAME) | ||||
|         client_config.set(authority, "request path", "%s/ipsec.d/reqs/%s.pem" % (const.STRONGSWAN_PREFIX, const.HOSTNAME)) | ||||
|         client_config.set(authority, "key path", "%s/ipsec.d/private/%s.pem" % (const.STRONGSWAN_PREFIX, const.HOSTNAME)) | ||||
|         client_config.set(authority, "certificate path", "%s/ipsec.d/certs/%s.pem" % (const.STRONGSWAN_PREFIX, const.HOSTNAME)) | ||||
|         client_config.set(authority, "common name", common_name) | ||||
|         client_config.set(authority, "request path", "%s/ipsec.d/reqs/%s.pem" % (const.STRONGSWAN_PREFIX, common_name)) | ||||
|         client_config.set(authority, "key path", "%s/ipsec.d/private/%s.pem" % (const.STRONGSWAN_PREFIX, common_name)) | ||||
|         client_config.set(authority, "certificate path", "%s/ipsec.d/certs/%s.pem" % (const.STRONGSWAN_PREFIX, common_name)) | ||||
|         client_config.set(authority, "authority path", "%s/ipsec.d/cacerts/ca.pem" % const.STRONGSWAN_PREFIX) | ||||
|         client_config.set(authority, "revocations path", "%s/ipsec.d/crls/ca.pem" % const.STRONGSWAN_PREFIX) | ||||
|         with open(const.CLIENT_CONFIG_PATH + ".part", 'wb') as fh: | ||||
| @@ -687,7 +688,8 @@ def certidude_setup_strongswan_client(authority, remote): | ||||
| @click.command("networkmanager", help="Set up strongSwan client via NetworkManager") | ||||
| @click.argument("authority") # Certidude server | ||||
| @click.argument("remote") # StrongSwan gateway | ||||
| def certidude_setup_strongswan_networkmanager(authority, remote): | ||||
| @click.option("--common-name", "-cn", default=const.HOSTNAME, help="Common name, %s by default" % const.HOSTNAME) | ||||
| def certidude_setup_strongswan_networkmanager(authority, remote, common_name): | ||||
|     # Install dependencies | ||||
|     apt("strongswan-nm") | ||||
|     rpm("NetworkManager-strongswan-gnome") | ||||
| @@ -703,10 +705,10 @@ def certidude_setup_strongswan_networkmanager(authority, remote): | ||||
|     else: | ||||
|         client_config.add_section(authority) | ||||
|         client_config.set(authority, "trigger", "interface up") | ||||
|         client_config.set(authority, "common name", const.HOSTNAME) | ||||
|         client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % const.HOSTNAME) | ||||
|         client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % const.HOSTNAME) | ||||
|         client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % const.HOSTNAME) | ||||
|         client_config.set(authority, "common name", common_name) | ||||
|         client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % common_name) | ||||
|         client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % common_name) | ||||
|         client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % common_name) | ||||
|         client_config.set(authority, "authority path",  "/etc/ipsec.d/cacerts/ca.pem") | ||||
|         client_config.set(authority, "revocations path",  "/etc/ipsec.d/crls/ca.pem") | ||||
|         with open(const.CLIENT_CONFIG_PATH + ".part", 'wb') as fh: | ||||
| @@ -735,20 +737,20 @@ def certidude_setup_strongswan_networkmanager(authority, remote): | ||||
| @click.argument("authority") | ||||
| @click.argument("remote") # OpenVPN gateway | ||||
| @click.option("--common-name", "-cn", default=const.HOSTNAME, help="Common name, %s by default" % const.HOSTNAME) | ||||
| def certidude_setup_openvpn_networkmanager(authority, remote): | ||||
| def certidude_setup_openvpn_networkmanager(authority, remote, common_name): | ||||
|     # Create corresponding section in /etc/certidude/client.conf | ||||
|     client_config = ConfigParser() | ||||
|     if os.path.exists(const.CLIENT_CONFIG_PATH): | ||||
|         client_config.readfp(open(const.CLIENT_CONFIG_PATH)) | ||||
|     if client_config.has_section(server): | ||||
|     if client_config.has_section(authority): | ||||
|         click.echo("Section '%s' already exists in %s, remove to regenerate" % (authority, const.CLIENT_CONFIG_PATH)) | ||||
|     else: | ||||
|         client_config.add_section(authority) | ||||
|         client_config.set(authority, "trigger", "interface up") | ||||
|         client_config.set(authority, "common name", const.HOSTNAME) | ||||
|         client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % const.HOSTNAME) | ||||
|         client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % const.HOSTNAME) | ||||
|         client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % const.HOSTNAME) | ||||
|         client_config.set(authority, "common name", common_name) | ||||
|         client_config.set(authority, "request path", "/etc/ipsec.d/reqs/%s.pem" % common_name) | ||||
|         client_config.set(authority, "key path", "/etc/ipsec.d/private/%s.pem" % common_name) | ||||
|         client_config.set(authority, "certificate path", "/etc/ipsec.d/certs/%s.pem" % common_name) | ||||
|         client_config.set(authority, "authority path",  "/etc/ipsec.d/cacerts/ca.pem") | ||||
|         client_config.set(authority, "revocations path",  "/etc/ipsec.d/crls/ca.pem") | ||||
|         with open(const.CLIENT_CONFIG_PATH + ".part", 'wb') as fh: | ||||
|   | ||||
| @@ -140,8 +140,7 @@ expired dir = {{ directory }}/expired/ | ||||
| # uncomment mail sender address to enable e-mails. | ||||
| # Make sure used e-mail address is reachable for end users. | ||||
| name = Certificate management | ||||
| address = | ||||
| ;address = certificates@example.com | ||||
| address = certificates@example.lan | ||||
|  | ||||
| [tagging] | ||||
| owner/string = Owner | ||||
|   | ||||
| @@ -6,6 +6,29 @@ import pytest | ||||
| import shutil | ||||
| import os | ||||
|  | ||||
| smtp=None | ||||
| inbox=[] | ||||
|  | ||||
| class DummySMTP(object): | ||||
|     def __init__(self,address): | ||||
|         self.address=address | ||||
|  | ||||
|     def login(self,username,password): | ||||
|         self.username=username | ||||
|         self.password=password | ||||
|  | ||||
|     def sendmail(self,from_address,to_address,fullmessage): | ||||
|         global inbox | ||||
|         inbox.append(fullmessage) | ||||
|         return [] | ||||
|  | ||||
|     def quit(self): | ||||
|         self.has_quit=True | ||||
|  | ||||
| # this is the actual monkey patch (simply replacing one class with another) | ||||
| import smtplib | ||||
| smtplib.SMTP=DummySMTP | ||||
|  | ||||
| runner = CliRunner() | ||||
|  | ||||
| @pytest.fixture(scope='module') | ||||
| @@ -59,6 +82,11 @@ def test_cli_setup_authority(): | ||||
|         os.unlink("/etc/certidude/client.conf") | ||||
|     if os.path.exists("/run/certidude"): | ||||
|         shutil.rmtree("/run/certidude") | ||||
|     if os.path.exists("/var/log/certidude.log"): | ||||
|         os.unlink("/var/log/certidude.log") | ||||
|  | ||||
|     with open("/etc/ipsec.conf", "w") as fh: # TODO: make compatible with Fedora | ||||
|         pass | ||||
|  | ||||
|     # Remove OpenVPN stuff | ||||
|     if os.path.exists("/etc/openvpn"): | ||||
| @@ -135,27 +163,32 @@ def test_cli_setup_authority(): | ||||
|  | ||||
|     r = client().simulate_post("/api/request/", body=buf) | ||||
|     assert r.status_code == 415 # wrong content type | ||||
|     assert not inbox | ||||
|  | ||||
|     r = client().simulate_post("/api/request/", | ||||
|         body=buf, | ||||
|         headers={"content-type":"application/pkcs10"}) | ||||
|     assert r.status_code == 202 # success | ||||
|     assert "Stored request " in inbox.pop(), inbox | ||||
|  | ||||
|     r = client().simulate_post("/api/request/", | ||||
|         body=buf, | ||||
|         headers={"content-type":"application/pkcs10"}) | ||||
|     assert r.status_code == 202 # already exists, same keypair so it's ok | ||||
|     assert not inbox | ||||
|  | ||||
|     r = client().simulate_post("/api/request/", | ||||
|         query_string="wait=true", | ||||
|         body=buf, | ||||
|         headers={"content-type":"application/pkcs10"}) | ||||
|     assert r.status_code == 303 # redirect to long poll | ||||
|     assert not inbox | ||||
|  | ||||
|     r = client().simulate_post("/api/request/", | ||||
|         body=generate_csr(cn=u"test"), | ||||
|         headers={"content-type":"application/pkcs10"}) | ||||
|     assert r.status_code == 409 # duplicate cn, different keypair | ||||
|     assert not inbox | ||||
|  | ||||
|     r = client().simulate_get("/api/request/test/", headers={"Accept":"application/json"}) | ||||
|     assert r.status_code == 200 # fetch as JSON ok | ||||
| @@ -177,6 +210,7 @@ def test_cli_setup_authority(): | ||||
|         headers={"content-type":"application/pkcs10"}) | ||||
|     assert r.status_code == 200 # autosign successful | ||||
|     assert r.headers.get('content-type') == "application/x-pem-file" | ||||
|     assert "Signed " in inbox.pop(), inbox | ||||
|  | ||||
|     # TODO: submit messed up CSR-s: no CN, empty CN etc | ||||
|  | ||||
| @@ -192,7 +226,7 @@ def test_cli_setup_authority(): | ||||
|         return | ||||
|     else: | ||||
|         os.waitpid(child_pid, 0) | ||||
|     assert os.getuid() == 0 and os.getgid() == 0, "Serve dropped permissions incorrectly!" | ||||
|         assert not inbox # forked processes don't reach the mailbox | ||||
|  | ||||
|     # Test session API call | ||||
|     r = client().simulate_get("/api/", headers={"Authorization":usertoken}) | ||||
| @@ -313,6 +347,7 @@ def test_cli_setup_authority(): | ||||
|     r = client().simulate_delete("/api/signed/test/", | ||||
|         headers={"Authorization":admintoken}) | ||||
|     assert r.status_code == 200, r.text | ||||
|     assert "Revoked " in inbox.pop(), inbox | ||||
|  | ||||
|  | ||||
|     # Log can be read only by admin | ||||
| @@ -342,6 +377,7 @@ def test_cli_setup_authority(): | ||||
|         body="user=userbot", # TODO: test nonexistant user | ||||
|         headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken}) | ||||
|     assert r.status_code == 200 # token generated by admin | ||||
|     assert "Token for " in inbox.pop(), inbox | ||||
|  | ||||
|     r2 = client().simulate_get("/api/token/", | ||||
|         query_string="u=userbot&t=1493184342&c=ac9b71421d5741800c5a4905b20c1072594a2df863e60ba836464888786bf2a6", | ||||
| @@ -353,11 +389,13 @@ def test_cli_setup_authority(): | ||||
|             "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36"}) | ||||
|     assert r2.status_code == 200 # token consumed by anyone on Fedora | ||||
|     assert r2.headers.get('content-type') == "application/x-openvpn" | ||||
|     assert "Signed " in inbox.pop(), inbox | ||||
|  | ||||
|     config.BUNDLE_FORMAT = "p12" # Switch to PKCS#12 | ||||
|     r2 = client().simulate_get("/api/token/", query_string=r.content) | ||||
|     assert r2.status_code == 200 # token consumed by anyone on unknown device | ||||
|     assert r2.headers.get('content-type') == "application/x-pkcs12" | ||||
|     assert "Signed " in inbox.pop(), inbox | ||||
|  | ||||
|     result = runner.invoke(cli, ['setup', 'openvpn', 'server', "-cn", "vpn.example.lan", "ca.example.lan"]) | ||||
|     assert not result.exception, result.output | ||||
| @@ -365,6 +403,18 @@ def test_cli_setup_authority(): | ||||
|     result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "roadwarrior1", "ca.example.lan", "vpn.example.lan"]) | ||||
|     assert not result.exception, result.output | ||||
|  | ||||
|     result = runner.invoke(cli, ['setup', 'strongswan', 'server', "-cn", "ipsec.example.lan", "ca.example.lan"]) | ||||
|     assert not result.exception, result.output | ||||
|  | ||||
|     result = runner.invoke(cli, ['setup', 'strongswan', 'client', "-cn", "roadwarrior2", "ca.example.lan", "ipsec.example.lan"]) | ||||
|     assert not result.exception, result.output | ||||
|  | ||||
|     result = runner.invoke(cli, ['setup', 'openvpn', 'networkmanager', "-cn", "roadwarrior3", "ca.example.lan", "vpn.example.lan"]) | ||||
|     assert not result.exception, result.output | ||||
|  | ||||
|     result = runner.invoke(cli, ['setup', 'strongswan', 'networkmanager', "-cn", "roadwarrior4", "ca.example.lan", "ipsec.example.lan"]) | ||||
|     assert not result.exception, result.output | ||||
|  | ||||
|     import os | ||||
|     if not os.path.exists("/etc/openvpn/keys"): | ||||
|         os.makedirs("/etc/openvpn/keys") | ||||
| @@ -410,4 +460,6 @@ def test_cli_setup_authority(): | ||||
|     with open("/run/certidude/server.pid") as fh: | ||||
|         os.kill(int(fh.read()), 1) | ||||
|  | ||||
|     assert len(inbox) == 0, inbox # Make sure all messages were checked | ||||
|  | ||||
|     os.waitpid(server_pid, 0) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user