mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-31 01:19:11 +00:00 
			
		
		
		
	Add request submission API call tests
This commit is contained in:
		| @@ -74,7 +74,7 @@ LONG_POLL_SUBSCRIBE = cp.get("push", "long poll subscribe") | |||||||
| if os.getenv("TRAVIS"): # TODO: include nginx setup in Travis | if os.getenv("TRAVIS"): # TODO: include nginx setup in Travis | ||||||
|     EVENT_SOURCE_PUBLISH = "" |     EVENT_SOURCE_PUBLISH = "" | ||||||
|     LONG_POLL_PUBLISH = "" |     LONG_POLL_PUBLISH = "" | ||||||
|     LONG_POLL_SUBSCRIBE = "" |     LONG_POLL_SUBSCRIBE = "//nonexistant/lp/sub/%s" | ||||||
|  |  | ||||||
| LOGGING_BACKEND = cp.get("logging", "backend") | LOGGING_BACKEND = cp.get("logging", "backend") | ||||||
|  |  | ||||||
|   | |||||||
| @@ -4,11 +4,16 @@ from falcon import testing | |||||||
| from click.testing import CliRunner | from click.testing import CliRunner | ||||||
| from certidude.cli import entry_point as cli | from certidude.cli import entry_point as cli | ||||||
| from datetime import datetime, timedelta | from datetime import datetime, timedelta | ||||||
|  | from cryptography import x509 | ||||||
|  | from cryptography.hazmat.primitives.asymmetric import rsa, padding | ||||||
| from cryptography.hazmat.primitives import hashes, serialization | from cryptography.hazmat.primitives import hashes, serialization | ||||||
|  | from cryptography.hazmat.backends import default_backend | ||||||
| from cryptography.x509.oid import NameOID | from cryptography.x509.oid import NameOID | ||||||
| import pytest | import pytest | ||||||
| from xattr import setxattr | from xattr import setxattr | ||||||
|  |  | ||||||
|  | # pkill py && rm -Rfv ~/.certidude && TRAVIS=1 py.test tests | ||||||
|  |  | ||||||
| runner = CliRunner() | runner = CliRunner() | ||||||
|  |  | ||||||
| @pytest.fixture(scope='module') | @pytest.fixture(scope='module') | ||||||
| @@ -16,13 +21,22 @@ def client(): | |||||||
|     from certidude.api import certidude_app |     from certidude.api import certidude_app | ||||||
|     return testing.TestClient(certidude_app()) |     return testing.TestClient(certidude_app()) | ||||||
|  |  | ||||||
|  | def generate_csr(): | ||||||
|  |     key = rsa.generate_private_key( | ||||||
|  |         public_exponent=65537, | ||||||
|  |         key_size=1024, | ||||||
|  |         backend=default_backend()) | ||||||
|  |     csr = x509.CertificateSigningRequestBuilder( | ||||||
|  |         ).subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"test")])) | ||||||
|  |     buf = csr.sign(key, hashes.SHA256(), default_backend() | ||||||
|  |         ).public_bytes(serialization.Encoding.PEM) | ||||||
|  |     return buf | ||||||
|  |  | ||||||
| def test_cli_setup_authority(): | def test_cli_setup_authority(): | ||||||
|     result = runner.invoke(cli, ['setup', 'authority']) |     result = runner.invoke(cli, ['setup', 'authority']) | ||||||
|     assert not result.exception |     assert not result.exception | ||||||
|     from certidude import const, config |  | ||||||
|  |  | ||||||
|     from certidude import authority |     from certidude import const, config, authority | ||||||
|     assert authority.ca_cert.serial_number >= 0x100000000000000000000000000000000000000 |     assert authority.ca_cert.serial_number >= 0x100000000000000000000000000000000000000 | ||||||
|     assert authority.ca_cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff |     assert authority.ca_cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff | ||||||
|     assert authority.ca_cert.not_valid_before < datetime.now() |     assert authority.ca_cert.not_valid_before < datetime.now() | ||||||
| @@ -32,76 +46,95 @@ def test_cli_setup_authority(): | |||||||
|     result = runner.invoke(cli, ['serve', '-f', '-p', '8080']) |     result = runner.invoke(cli, ['serve', '-f', '-p', '8080']) | ||||||
|     assert not result.exception |     assert not result.exception | ||||||
|  |  | ||||||
|     from cryptography import x509 |  | ||||||
|     from cryptography.hazmat.primitives.asymmetric import rsa, padding |  | ||||||
|     from cryptography.hazmat.backends import default_backend |  | ||||||
|     key = rsa.generate_private_key( |  | ||||||
|         public_exponent=65537, |  | ||||||
|         key_size=4096, |  | ||||||
|         backend=default_backend() |  | ||||||
|     ) |  | ||||||
|  |  | ||||||
|     csr = x509.CertificateSigningRequestBuilder( |  | ||||||
|         ).subject_name(x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u"test")])) |  | ||||||
|  |  | ||||||
|     authority.store_request( |  | ||||||
|         csr.sign(key, hashes.SHA256(), default_backend()).public_bytes(serialization.Encoding.PEM)) |  | ||||||
|  |  | ||||||
|     # Check that we can retrieve empty CRL |     # Check that we can retrieve empty CRL | ||||||
|     r = client().simulate_get("/api/revoked/") |     r = client().simulate_get("/api/revoked/") | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     # Test command line interface | ||||||
|     result = runner.invoke(cli, ['list', '-srv']) |     result = runner.invoke(cli, ['list', '-srv']) | ||||||
|     assert not result.exception |     assert not result.exception | ||||||
|  |  | ||||||
|     result = runner.invoke(cli, ['sign', 'test', '-o']) |  | ||||||
|     assert not result.exception |  | ||||||
|  |  | ||||||
|     result = runner.invoke(cli, ['revoke', 'test']) |  | ||||||
|     assert not result.exception |  | ||||||
|  |  | ||||||
|     authority.generate_ovpn_bundle(u"test2") |  | ||||||
|     authority.generate_pkcs12_bundle(u"test3") |  | ||||||
|  |  | ||||||
|     result = runner.invoke(cli, ['list', '-srv']) |  | ||||||
|     assert not result.exception |  | ||||||
|  |  | ||||||
|     result = runner.invoke(cli, ['cron']) |  | ||||||
|     assert not result.exception |  | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Test CA certificate fetch |     # Test CA certificate fetch | ||||||
|     r = client().simulate_get("/api/certificate") |     r = client().simulate_get("/api/certificate") | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|     assert r.headers.get('content-type') == "application/x-x509-ca-cert" |     assert r.headers.get('content-type') == "application/x-x509-ca-cert" | ||||||
|  |  | ||||||
|  |     # Test request submission | ||||||
|  |     buf = generate_csr() | ||||||
|  |  | ||||||
|  |     r = client().simulate_post("/api/request/", body=buf) | ||||||
|  |     assert r.status_code == 415 # wrong content type | ||||||
|  |  | ||||||
|  |     r = client().simulate_post("/api/request/", | ||||||
|  |         body=buf, | ||||||
|  |         headers={"content-type":"application/pkcs10"}) | ||||||
|  |     assert r.status_code == 202 # success | ||||||
|  |  | ||||||
|  |     r = client().simulate_post("/api/request/", | ||||||
|  |         body=buf, | ||||||
|  |         headers={"content-type":"application/pkcs10"}) | ||||||
|  |     assert r.status_code == 202 # already exists, same keypair so it's ok | ||||||
|  |  | ||||||
|  |     r = client().simulate_post("/api/request/", | ||||||
|  |         query_string="wait=1", | ||||||
|  |         body=buf, | ||||||
|  |         headers={"content-type":"application/pkcs10"}) | ||||||
|  |     assert r.status_code == 303 # redirect to long poll | ||||||
|  |  | ||||||
|  |     r = client().simulate_post("/api/request/", | ||||||
|  |         body=generate_csr(), | ||||||
|  |         headers={"content-type":"application/pkcs10"}) | ||||||
|  |     assert r.status_code == 409 # duplicate cn, different keypair | ||||||
|  |  | ||||||
|  |     # Test command line interface | ||||||
|  |     result = runner.invoke(cli, ['list', '-srv']) | ||||||
|  |     assert not result.exception | ||||||
|  |     result = runner.invoke(cli, ['sign', 'test', '-o']) | ||||||
|  |     assert not result.exception | ||||||
|  |     result = runner.invoke(cli, ['revoke', 'test']) | ||||||
|  |     assert not result.exception | ||||||
|  |     authority.generate_ovpn_bundle(u"test2") | ||||||
|  |     authority.generate_pkcs12_bundle(u"test3") | ||||||
|  |     result = runner.invoke(cli, ['list', '-srv']) | ||||||
|  |     assert not result.exception | ||||||
|  |     result = runner.invoke(cli, ['cron']) | ||||||
|  |     assert not result.exception | ||||||
|  |  | ||||||
|     # Test signed certificate API call |     # Test signed certificate API call | ||||||
|     r = client().simulate_get("/api/signed/nonexistant") |     r = client().simulate_get("/api/signed/nonexistant/") | ||||||
|     assert r.status_code == 404 |     assert r.status_code == 404 | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/signed/test2") |     r = client().simulate_get("/api/signed/test2/") | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|     assert r.headers.get('content-type') == "application/x-pem-file" |     assert r.headers.get('content-type') == "application/x-pem-file" | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/signed/test2", headers={"Accept":"application/json"}) |     r = client().simulate_get("/api/signed/test2/", headers={"Accept":"application/json"}) | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|     assert r.headers.get('content-type') == "application/json" |     assert r.headers.get('content-type') == "application/json" | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/signed/test2", headers={"Accept":"text/plain"}) |     r = client().simulate_get("/api/signed/test2/", headers={"Accept":"text/plain"}) | ||||||
|     assert r.status_code == 415 |     assert r.status_code == 415 | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Test revocations API call |     # Test revocations API call | ||||||
|     r = client().simulate_get("/api/revoked") |     r = client().simulate_get("/api/revoked/") | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|     assert r.headers.get('content-type') == "application/x-pkcs7-crl" |     assert r.headers.get('content-type') == "application/x-pkcs7-crl" | ||||||
|  |  | ||||||
|     r = client().simulate_get("/api/revoked", |     r = client().simulate_get("/api/revoked/", | ||||||
|         headers={"Accept":"application/x-pem-file"}) |         headers={"Accept":"application/x-pem-file"}) | ||||||
|     assert r.status_code == 200 |     assert r.status_code == 200 | ||||||
|     assert r.headers.get('content-type') == "application/x-pem-file" |     assert r.headers.get('content-type') == "application/x-pem-file" | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/revoked/", | ||||||
|  |         headers={"Accept":"text/plain"}) | ||||||
|  |     assert r.status_code == 415 | ||||||
|  |  | ||||||
|  |     r = client().simulate_get("/api/revoked/", query_string="wait=true", | ||||||
|  |         headers={"Accept":"application/x-pem-file"}) | ||||||
|  |     assert r.status_code == 303 | ||||||
|  |  | ||||||
|     # Test attribute fetching API call |     # Test attribute fetching API call | ||||||
|     r = client().simulate_get("/api/signed/test2/attr/") |     r = client().simulate_get("/api/signed/test2/attr/") | ||||||
|     assert r.status_code == 403 |     assert r.status_code == 403 | ||||||
| @@ -116,7 +149,6 @@ def test_cli_setup_authority(): | |||||||
|     r = client().simulate_get("/api/signed/test2/tag/") |     r = client().simulate_get("/api/signed/test2/tag/") | ||||||
|     assert r.status_code == 401 |     assert r.status_code == 401 | ||||||
|  |  | ||||||
|  |  | ||||||
|     # Revoke all valid ones |     # Revoke all valid ones | ||||||
|     result = runner.invoke(cli, ['revoke', 'test2']) |     result = runner.invoke(cli, ['revoke', 'test2']) | ||||||
|     assert not result.exception |     assert not result.exception | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user