From ba9dca910f1695ee4c178e78205e45951075a802 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lauri=20V=C3=B5sandi?=
Date: Tue, 25 Apr 2017 13:06:59 +0300
Subject: [PATCH] Add tests for API calls
---
 certidude/api/attrib.py | 6 ++++-
 tests/test_cli.py | 55 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 60 insertions(+), 1 deletion(-)
diff --git a/certidude/api/attrib.py b/certidude/api/attrib.py
index 740f8c6..0d0c50e 100644
--- a/certidude/api/attrib.py
+++ b/certidude/api/attrib.py
@@ -35,7 +35,11 @@ class AttributeResource(object):
 current = current[component]
 current[key] = value
- whitelist = ip_address(attribs.get("user").get("lease").get("address").decode("ascii"))
+ try:
+ whitelist = ip_address(attribs.get("user").get("lease").get("address").decode("ascii"))
+ except AttributeError: # TODO: probably race condition
+ raise falcon.HTTPForbidden("Forbidden",
+ "Attributes only accessible to the machine")
 if req.context.get("remote_addr") != whitelist:
 logger.info("Attribute access denied from %s, expected %s for %s",
diff --git a/tests/test_cli.py b/tests/test_cli.py
index dcbd117..3c80a3e 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,9 +1,11 @@
 import os
+import requests
 from click.testing import CliRunner
 from certidude.cli import entry_point as cli
 from datetime import datetime, timedelta
 from cryptography.hazmat.primitives import hashes, serialization
 from cryptography.x509.oid import NameOID
+from xattr import setxattr
 runner = CliRunner()
@@ -37,6 +39,10 @@ def test_cli_setup_authority():
 authority.store_request(
 csr.sign(key, hashes.SHA256(), default_backend()).public_bytes(serialization.Encoding.PEM))
+ # Check that we can retrieve empty CRL
+ r = requests.get("http://localhost:8080/api/revoked")
+ assert r.status_code == 200
+
 result = runner.invoke(cli, ['list', '-srv'])
 assert not result.exception
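Review bot: The `except AttributeError` around the lease lookup denies the request without writing a log line, while the address-mismatch branch just below logs its denial, so requests for a certificate with no recorded lease leave no trace for anyone reviewing access attempts. The `# TODO: probably race condition` comment also looks off: the test added in this commit gets the 403 simply because no `user.lease.address` attribute has been set yet. Catching AttributeError over the whole chained expression can hide unrelated attribute errors as well, and a malformed stored address would presumably still escape as an uncaught ValueError from `ip_address`. An explicit lookup with a logged denial is sketched below; the handler's body starts and ends outside this hunk.

```python
# … unchanged: attribute tree assembled into attribs …
lease = (attribs.get("user") or {}).get("lease") or {}
address = lease.get("address")
if address is None:
    # No lease address stored for this certificate: deny, and log it like the mismatch case
    logger.info("Attribute access denied from %s, no lease address recorded",
        req.context.get("remote_addr"))
    raise falcon.HTTPForbidden("Forbidden",
        "Attributes only accessible to the machine")
whitelist = ip_address(address.decode("ascii"))
# … unchanged: comparison of remote_addr against whitelist …
```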
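Review bot: The `requests.get` call added in the `@@ -37,6 +39,10 @@` hunk passes no `timeout`, and neither do the calls in the hunk at `@@ -54,3 +60,52 @@`, so a server on localhost:8080 that accepts the connection but never answers would hang the test run instead of failing it. `r = requests.get("http://localhost:8080/api/revoked", timeout=5)` bounds that (5 is an example value). The comment says the CRL is empty, but only the status code is asserted; checking the body or at least the content type here would make the test match its comment. test_cli_setup_authority starts and ends outside this hunk.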
@@ -54,3 +60,52 @@ def test_cli_setup_authority():
 result = runner.invoke(cli, ['cron'])
 assert not result.exception
+
+
+ # Test CA certificate fetch
+ r = requests.get("http://localhost:8080/api/certificate")
+ assert r.status_code == 200
+ assert r.headers.get('content-type') == "application/x-x509-ca-cert"
+
+
+ # Test signed certificate API call
+ r = requests.get("http://localhost:8080/api/signed/test2")
+ assert r.status_code == 200
+ assert r.headers.get('content-type') == "application/x-pem-file"
+
+ r = requests.get("http://localhost:8080/api/signed/test2", headers={"Accept":"application/json"})
+ assert r.status_code == 200
+ assert r.headers.get('content-type') == "application/json"
+
+
+ # Test revocations API call
+ r = requests.get("http://localhost:8080/api/revoked")
+ assert r.status_code == 200
+ assert r.headers.get('content-type') == "application/x-pkcs7-crl"
+
+ r = requests.get("http://localhost:8080/api/revoked",
+ headers={"Accept":"application/x-pem-file"})
+ assert r.status_code == 200
+ assert r.headers.get('content-type') == "application/x-pem-file"
+
+ # Test attribute fetching API call
+ r = requests.get("http://localhost:8080/api/signed/test2/attr/")
+ assert r.status_code == 403
+
+ path, _, _ = authority.get_signed("test2")
+ setxattr(path, "user.lease.address", b"127.0.0.1")
+
+ r = requests.get("http://localhost:8080/api/signed/test2/attr/")
+ assert r.status_code == 200
+
+ # Tags should not be visible anonymously
+ r = requests.get("http://localhost:8080/api/signed/test2/tag/")
+ assert r.status_code == 401
+
+
+ # Revoke all valid ones
+ result = runner.invoke(cli, ['revoke', 'test2'])
+ assert not result.exception
+
+ result = runner.invoke(cli, ['revoke', 'test3'])
+ assert not result.exception
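Review bot: The attribute test covers a missing lease (403) and a matching lease (200) but never the address-mismatch branch, which is the check that actually keeps one machine from reading another's attributes. Before the matching address is set, a case such as `setxattr(path, "user.lease.address", b"127.0.0.2")` followed by `assert requests.get("http://localhost:8080/api/signed/test2/attr/").status_code == 403` would pin it. 127.0.0.2 is a constructed sample, and the 403 is presumed from the handler's other denial path. The two `revoke` calls at the end are not followed by a fetch of `/api/revoked` or `/api/signed/test2`, so nothing confirms the revocation is visible through the API. test_cli_setup_authority begins outside this hunk.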