mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-31 01:19:11 +00:00 
			
		
		
		
	tests: Test request deletion and signing API calls
This commit is contained in:
		| @@ -119,7 +119,7 @@ class RequestListResource(object): | |||||||
|                             logger.info("Autosigned %s as %s is whitelisted", common_name.value, req.context.get("remote_addr")) |                             logger.info("Autosigned %s as %s is whitelisted", common_name.value, req.context.get("remote_addr")) | ||||||
|                             return |                             return | ||||||
|                         except EnvironmentError: |                         except EnvironmentError: | ||||||
|                             logger.info("Autosign for %s failed, signed certificate already exists", |                             logger.info("Autosign for %s from %s failed, signed certificate already exists", | ||||||
|                                 common_name.value, req.context.get("remote_addr")) |                                 common_name.value, req.context.get("remote_addr")) | ||||||
|                             reason = "Autosign failed, signed certificate already exists" |                             reason = "Autosign failed, signed certificate already exists" | ||||||
|                         break |                         break | ||||||
|   | |||||||
| @@ -198,6 +198,7 @@ def delete_request(common_name): | |||||||
|     push.publish("request-deleted", common_name) |     push.publish("request-deleted", common_name) | ||||||
|  |  | ||||||
|     # Write empty certificate to long-polling URL |     # Write empty certificate to long-polling URL | ||||||
|  |     if config.LONG_POLL_PUBLISH: | ||||||
|         requests.delete( |         requests.delete( | ||||||
|             config.LONG_POLL_PUBLISH % hashlib.sha256(buf).hexdigest(), |             config.LONG_POLL_PUBLISH % hashlib.sha256(buf).hexdigest(), | ||||||
|             headers={"User-Agent": "Certidude API"}) |             headers={"User-Agent": "Certidude API"}) | ||||||
|   | |||||||
| @@ -6,6 +6,9 @@ import pytest | |||||||
| import shutil | import shutil | ||||||
| import os | import os | ||||||
|  |  | ||||||
|  | UA_FEDORA_FIREFOX = "Mozilla/5.0 (X11; Fedora; Linux x86_64) " \ | ||||||
|  |     "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36" | ||||||
|  |  | ||||||
| smtp=None | smtp=None | ||||||
| inbox=[] | inbox=[] | ||||||
|  |  | ||||||
| @@ -186,7 +189,6 @@ def test_cli_setup_authority(): | |||||||
|  |  | ||||||
|     r = client().simulate_post("/api/request/", body=buf) |     r = client().simulate_post("/api/request/", body=buf) | ||||||
|     assert r.status_code == 415 # wrong content type |     assert r.status_code == 415 # wrong content type | ||||||
|     assert not inbox |  | ||||||
|  |  | ||||||
|     r = client().simulate_post("/api/request/", |     r = client().simulate_post("/api/request/", | ||||||
|         body=buf, |         body=buf, | ||||||
| @@ -194,6 +196,26 @@ def test_cli_setup_authority(): | |||||||
|     assert r.status_code == 202 # success |     assert r.status_code == 202 # success | ||||||
|     assert "Stored request " in inbox.pop(), inbox |     assert "Stored request " in inbox.pop(), inbox | ||||||
|  |  | ||||||
|  |     # Test request deletion | ||||||
|  |     r = client().simulate_delete("/api/request/test/") | ||||||
|  |     assert r.status_code == 401, r.text | ||||||
|  |     r = client().simulate_delete("/api/request/test/", | ||||||
|  |         headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 403, r.text | ||||||
|  |     r = client().simulate_delete("/api/request/test/", | ||||||
|  |         headers={"User-Agent":UA_FEDORA_FIREFOX, "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 403, r.text # CSRF prevented | ||||||
|  |     r = client().simulate_delete("/api/request/test/", | ||||||
|  |         headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200, r.text | ||||||
|  |  | ||||||
|  |     # Test request submission corner cases | ||||||
|  |     r = client().simulate_post("/api/request/", | ||||||
|  |         body=buf, | ||||||
|  |         headers={"content-type":"application/pkcs10"}) | ||||||
|  |     assert r.status_code == 202 # success | ||||||
|  |     assert "Stored request " in inbox.pop(), inbox | ||||||
|  |  | ||||||
|     r = client().simulate_post("/api/request/", |     r = client().simulate_post("/api/request/", | ||||||
|         body=buf, |         body=buf, | ||||||
|         headers={"content-type":"application/pkcs10"}) |         headers={"content-type":"application/pkcs10"}) | ||||||
| @@ -227,6 +249,25 @@ def test_cli_setup_authority(): | |||||||
|     r = client().simulate_get("/api/request/nonexistant/", headers={"Accept":"application/json"}) |     r = client().simulate_get("/api/request/nonexistant/", headers={"Accept":"application/json"}) | ||||||
|     assert r.status_code == 404 # nonexistant common names |     assert r.status_code == 404 # nonexistant common names | ||||||
|  |  | ||||||
|  |     # TODO: submit messed up CSR-s: no CN, empty CN etc | ||||||
|  |  | ||||||
|  |     # Test command line interface | ||||||
|  |     result = runner.invoke(cli, ['list', '-srv']) | ||||||
|  |     assert not result.exception, result.output | ||||||
|  |  | ||||||
|  |     # Test sign API call | ||||||
|  |     r = client().simulate_patch("/api/request/test/") | ||||||
|  |     assert r.status_code == 401, r.text | ||||||
|  |     r = client().simulate_patch("/api/request/test/", | ||||||
|  |         headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 403, r.text | ||||||
|  |     r = client().simulate_patch("/api/request/test/", | ||||||
|  |         headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 201, r.text | ||||||
|  |     assert "Signed " in inbox.pop(), inbox | ||||||
|  |  | ||||||
|  |     # Test autosign | ||||||
|  |     buf = generate_csr(cn=u"test2") | ||||||
|     r = client().simulate_post("/api/request/", |     r = client().simulate_post("/api/request/", | ||||||
|         query_string="autosign=1", |         query_string="autosign=1", | ||||||
|         body=buf, |         body=buf, | ||||||
| @@ -234,22 +275,23 @@ def test_cli_setup_authority(): | |||||||
|     assert r.status_code == 200 # autosign successful |     assert r.status_code == 200 # autosign successful | ||||||
|     assert r.headers.get('content-type') == "application/x-pem-file" |     assert r.headers.get('content-type') == "application/x-pem-file" | ||||||
|     assert "Signed " in inbox.pop(), inbox |     assert "Signed " in inbox.pop(), inbox | ||||||
|  |     assert not inbox | ||||||
|  |  | ||||||
|     # TODO: submit messed up CSR-s: no CN, empty CN etc |     r = client().simulate_post("/api/request/", | ||||||
|  |         query_string="autosign=1", | ||||||
|  |         body=buf, | ||||||
|  |         headers={"content-type":"application/pkcs10"}) | ||||||
|  |     assert r.status_code == 303 # already signed, redirect to signed certificate | ||||||
|  |     assert not inbox | ||||||
|  |  | ||||||
|     # Test command line interface |     buf = generate_csr(cn=u"test2") | ||||||
|     result = runner.invoke(cli, ['list', '-srv']) |     r = client().simulate_post("/api/request/", | ||||||
|     assert not result.exception, result.output |         query_string="autosign=1", | ||||||
|  |         body=buf, | ||||||
|     # Some commands have side effects (setuid, setgid etc) |         headers={"content-type":"application/pkcs10"}) | ||||||
|     child_pid = os.fork() |     assert r.status_code == 202 # duplicate CN, request stored | ||||||
|     if not child_pid: |     assert "Stored request " in inbox.pop(), inbox | ||||||
|         result = runner.invoke(cli, ['sign', 'test', '-o']) |     assert not inbox | ||||||
|         assert not result.exception, result.output |  | ||||||
|         return |  | ||||||
|     else: |  | ||||||
|         os.waitpid(child_pid, 0) |  | ||||||
|         assert not inbox # forked processes don't reach the mailbox |  | ||||||
|  |  | ||||||
|     # Test session API call |     # Test session API call | ||||||
|     r = client().simulate_get("/api/", headers={"Authorization":usertoken}) |     r = client().simulate_get("/api/", headers={"Authorization":usertoken}) | ||||||
| @@ -258,6 +300,9 @@ def test_cli_setup_authority(): | |||||||
|     r = client().simulate_get("/api/", headers={"Authorization":admintoken}) |     r = client().simulate_get("/api/", headers={"Authorization":admintoken}) | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/", headers={"Accept":"text/plain", "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 415 # invalid media type | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/") |     r = client().simulate_get("/api/") | ||||||
|     assert r.status_code == 401 |     assert r.status_code == 401 | ||||||
|  |  | ||||||
| @@ -408,8 +453,7 @@ def test_cli_setup_authority(): | |||||||
|     assert r2.status_code == 403 # invalid checksum |     assert r2.status_code == 403 # invalid checksum | ||||||
|     r2 = client().simulate_get("/api/token/", |     r2 = client().simulate_get("/api/token/", | ||||||
|         query_string=r.content, |         query_string=r.content, | ||||||
|         headers={"User-Agent":"Mozilla/5.0 (X11; Fedora; Linux x86_64) " |         headers={"User-Agent":UA_FEDORA_FIREFOX}) | ||||||
|             "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36"}) |  | ||||||
|     assert r2.status_code == 200 # token consumed by anyone on Fedora |     assert r2.status_code == 200 # token consumed by anyone on Fedora | ||||||
|     assert r2.headers.get('content-type') == "application/x-openvpn" |     assert r2.headers.get('content-type') == "application/x-openvpn" | ||||||
|     assert "Signed " in inbox.pop(), inbox |     assert "Signed " in inbox.pop(), inbox | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user