mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-31 09:29:13 +00:00 
			
		
		
		
	tests: Fix signer shutdown and add tests for event source
This commit is contained in:
		| @@ -1236,6 +1236,10 @@ def certidude_serve(port, listen, fork, exit_handler): | |||||||
|             app.add_route("/api/exit/", ExitResource()) |             app.add_route("/api/exit/", ExitResource()) | ||||||
|         httpd.serve_forever() |         httpd.serve_forever() | ||||||
|  |  | ||||||
|  |         # Shut down signer as well | ||||||
|  |         assert authority.signer_exec("exit") == "ok" | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| @click.command("yubikey", help="Set up Yubikey as client authentication token") | @click.command("yubikey", help="Set up Yubikey as client authentication token") | ||||||
| @click.argument("authority") | @click.argument("authority") | ||||||
|   | |||||||
| @@ -138,9 +138,7 @@ def clean_server(): | |||||||
|                 pass |                 pass | ||||||
|     if os.path.exists("/etc/certidude/server.keytab"): |     if os.path.exists("/etc/certidude/server.keytab"): | ||||||
|         os.unlink("/etc/certidude/server.keytab") |         os.unlink("/etc/certidude/server.keytab") | ||||||
|     if os.path.exists("/var/lib/samba/"): |     os.system("rm -Rfv /var/lib/samba/*") | ||||||
|         shutil.rmtree("/var/lib/samba") |  | ||||||
|     os.makedirs("/var/lib/samba") |  | ||||||
|  |  | ||||||
|     # Restore initial resolv.conf |     # Restore initial resolv.conf | ||||||
|     shutil.copyfile("/etc/resolv.conf.orig", "/etc/resolv.conf") |     shutil.copyfile("/etc/resolv.conf.orig", "/etc/resolv.conf") | ||||||
| @@ -595,18 +593,54 @@ def test_cli_setup_authority(): | |||||||
|  |  | ||||||
|  |  | ||||||
|     # Test session API call |     # Test session API call | ||||||
|     r = client().simulate_get("/api/", headers={"Authorization":usertoken}) |  | ||||||
|     assert r.status_code == 200 |  | ||||||
|     r = client().simulate_get("/api/", headers={"Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 200 |  | ||||||
|     r = client().simulate_get("/api/", headers={"Accept":"text/plain", "Authorization":admintoken}) |  | ||||||
|     assert r.status_code == 415 # invalid media type |  | ||||||
|     r = client().simulate_get("/api/") |     r = client().simulate_get("/api/") | ||||||
|     assert r.status_code == 401 |     assert r.status_code == 401 | ||||||
|     assert "Please authenticate" in r.text |     assert "Please authenticate" in r.text | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/", headers={"Accept":"text/plain", "Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 415 # invalid media type | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/", headers={"Authorization":usertoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     assert r.headers.get('content-type').startswith("application/json") | ||||||
|  |     assert r.json, r.text | ||||||
|  |     assert not r.json.get("authority"), r.text # No permissions to admin | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/", headers={"Authorization":admintoken}) | ||||||
|  |     assert r.status_code == 200 | ||||||
|  |     assert r.headers.get('content-type').startswith("application/json") | ||||||
|  |     assert "/ev/sub/" in r.text, r.text | ||||||
|  |     assert r.json, r.text | ||||||
|  |     assert r.json.get("authority"), r.text | ||||||
|  |     assert r.json.get("authority").get("events"), r.text | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     ################################# | ||||||
|  |     ### Subscribe to event source ### | ||||||
|  |     ################################# | ||||||
|  |  | ||||||
|  |     ev_pid = os.fork() | ||||||
|  |     if not ev_pid: | ||||||
|  |         url = r.json.get("authority").get("events") | ||||||
|  |         if url.startswith("/"): # Expand URL | ||||||
|  |             url = "http://ca.example.lan" + url | ||||||
|  |         r = requests.get(url, headers={"Accept": "text/event-stream"}, stream=True) | ||||||
|  |         lines = ["data: userbot@fedora-15417dc5", "event: request-signed"] # In reverse order! | ||||||
|  |         assert r.status_code == 200, r.text | ||||||
|  |         for line in r.iter_lines(): | ||||||
|  |             if not line or line.startswith("id:") or line.startswith(":"): | ||||||
|  |                 continue | ||||||
|  |             assert line == lines.pop(), line | ||||||
|  |             if not lines: | ||||||
|  |                 return | ||||||
|  |         assert False, r.text # This should not happen | ||||||
|  |         return | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     ####################### | ||||||
|  |     ### Token mechanism ### | ||||||
|  |     ####################### | ||||||
|  |  | ||||||
|     # Test token mech |  | ||||||
|     r = client().simulate_post("/api/token/") |     r = client().simulate_post("/api/token/") | ||||||
|     assert r.status_code == 404, r.text |     assert r.status_code == 404, r.text | ||||||
|  |  | ||||||
| @@ -1011,9 +1045,6 @@ def test_cli_setup_authority(): | |||||||
|     result = runner.invoke(cli, ['cron']) |     result = runner.invoke(cli, ['cron']) | ||||||
|     assert not result.exception, result.output |     assert not result.exception, result.output | ||||||
|  |  | ||||||
|     # Shut down signer |  | ||||||
|     assert authority.signer_exec("exit") == "ok" |  | ||||||
|  |  | ||||||
|     # Shut down server |     # Shut down server | ||||||
|     requests.get("http://ca.example.lan/api/exit") |     requests.get("http://ca.example.lan/api/exit") | ||||||
|     os.waitpid(server_pid, 0) |     os.waitpid(server_pid, 0) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user