diff --git a/certidude/cli.py b/certidude/cli.py index fcfd0ec..03278f4 100755 --- a/certidude/cli.py +++ b/certidude/cli.py @@ -1236,6 +1236,10 @@ def certidude_serve(port, listen, fork, exit_handler): app.add_route("/api/exit/", ExitResource()) httpd.serve_forever() + # Shut down signer as well + assert authority.signer_exec("exit") == "ok" + + @click.command("yubikey", help="Set up Yubikey as client authentication token") @click.argument("authority") diff --git a/tests/test_cli.py b/tests/test_cli.py index 660d92b..375c5cd 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -138,9 +138,7 @@ def clean_server(): pass if os.path.exists("/etc/certidude/server.keytab"): os.unlink("/etc/certidude/server.keytab") - if os.path.exists("/var/lib/samba/"): - shutil.rmtree("/var/lib/samba") - os.makedirs("/var/lib/samba") + os.system("rm -Rfv /var/lib/samba/*") # Restore initial resolv.conf shutil.copyfile("/etc/resolv.conf.orig", "/etc/resolv.conf") @@ -595,18 +593,54 @@ def test_cli_setup_authority(): # Test session API call - r = client().simulate_get("/api/", headers={"Authorization":usertoken}) - assert r.status_code == 200 - r = client().simulate_get("/api/", headers={"Authorization":admintoken}) - assert r.status_code == 200 - r = client().simulate_get("/api/", headers={"Accept":"text/plain", "Authorization":admintoken}) - assert r.status_code == 415 # invalid media type r = client().simulate_get("/api/") assert r.status_code == 401 assert "Please authenticate" in r.text + r = client().simulate_get("/api/", headers={"Accept":"text/plain", "Authorization":admintoken}) + assert r.status_code == 415 # invalid media type + + r = client().simulate_get("/api/", headers={"Authorization":usertoken}) + assert r.status_code == 200 + assert r.headers.get('content-type').startswith("application/json") + assert r.json, r.text + assert not r.json.get("authority"), r.text # No permissions to admin + + r = client().simulate_get("/api/", headers={"Authorization":admintoken}) + assert r.status_code == 200 + assert r.headers.get('content-type').startswith("application/json") + assert "/ev/sub/" in r.text, r.text + assert r.json, r.text + assert r.json.get("authority"), r.text + assert r.json.get("authority").get("events"), r.text + + + ################################# + ### Subscribe to event source ### + ################################# + + ev_pid = os.fork() + if not ev_pid: + url = r.json.get("authority").get("events") + if url.startswith("/"): # Expand URL + url = "http://ca.example.lan" + url + r = requests.get(url, headers={"Accept": "text/event-stream"}, stream=True) + lines = ["data: userbot@fedora-15417dc5", "event: request-signed"] # In reverse order! + assert r.status_code == 200, r.text + for line in r.iter_lines(): + if not line or line.startswith("id:") or line.startswith(":"): + continue + assert line == lines.pop(), line + if not lines: + return + assert False, r.text # This should not happen + return + + + ####################### + ### Token mechanism ### + ####################### - # Test token mech r = client().simulate_post("/api/token/") assert r.status_code == 404, r.text @@ -1011,9 +1045,6 @@ def test_cli_setup_authority(): result = runner.invoke(cli, ['cron']) assert not result.exception, result.output - # Shut down signer - assert authority.signer_exec("exit") == "ok" - # Shut down server requests.get("http://ca.example.lan/api/exit") os.waitpid(server_pid, 0)