mirror of
https://github.com/laurivosandi/certidude
synced 2024-11-16 18:06:44 +00:00
Add tests for API calls
This commit is contained in:
parent
d5edbe50c5
commit
ba9dca910f
@ -35,7 +35,11 @@ class AttributeResource(object):
|
|||||||
current = current[component]
|
current = current[component]
|
||||||
current[key] = value
|
current[key] = value
|
||||||
|
|
||||||
|
try:
|
||||||
whitelist = ip_address(attribs.get("user").get("lease").get("address").decode("ascii"))
|
whitelist = ip_address(attribs.get("user").get("lease").get("address").decode("ascii"))
|
||||||
|
except AttributeError: # TODO: probably race condition
|
||||||
|
raise falcon.HTTPForbidden("Forbidden",
|
||||||
|
"Attributes only accessible to the machine")
|
||||||
|
|
||||||
if req.context.get("remote_addr") != whitelist:
|
if req.context.get("remote_addr") != whitelist:
|
||||||
logger.info("Attribute access denied from %s, expected %s for %s",
|
logger.info("Attribute access denied from %s, expected %s for %s",
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
import os
|
import os
|
||||||
|
import requests
|
||||||
from click.testing import CliRunner
|
from click.testing import CliRunner
|
||||||
from certidude.cli import entry_point as cli
|
from certidude.cli import entry_point as cli
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from cryptography.hazmat.primitives import hashes, serialization
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
from cryptography.x509.oid import NameOID
|
from cryptography.x509.oid import NameOID
|
||||||
|
from xattr import setxattr
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
|
|
||||||
@ -37,6 +39,10 @@ def test_cli_setup_authority():
|
|||||||
authority.store_request(
|
authority.store_request(
|
||||||
csr.sign(key, hashes.SHA256(), default_backend()).public_bytes(serialization.Encoding.PEM))
|
csr.sign(key, hashes.SHA256(), default_backend()).public_bytes(serialization.Encoding.PEM))
|
||||||
|
|
||||||
|
# Check that we can retrieve empty CRL
|
||||||
|
r = requests.get("http://localhost:8080/api/revoked")
|
||||||
|
assert r.status_code == 200
|
||||||
|
|
||||||
result = runner.invoke(cli, ['list', '-srv'])
|
result = runner.invoke(cli, ['list', '-srv'])
|
||||||
assert not result.exception
|
assert not result.exception
|
||||||
|
|
||||||
@ -54,3 +60,52 @@ def test_cli_setup_authority():
|
|||||||
|
|
||||||
result = runner.invoke(cli, ['cron'])
|
result = runner.invoke(cli, ['cron'])
|
||||||
assert not result.exception
|
assert not result.exception
|
||||||
|
|
||||||
|
|
||||||
|
# Test CA certificate fetch
|
||||||
|
r = requests.get("http://localhost:8080/api/certificate")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.headers.get('content-type') == "application/x-x509-ca-cert"
|
||||||
|
|
||||||
|
|
||||||
|
# Test signed certificate API call
|
||||||
|
r = requests.get("http://localhost:8080/api/signed/test2")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.headers.get('content-type') == "application/x-pem-file"
|
||||||
|
|
||||||
|
r = requests.get("http://localhost:8080/api/signed/test2", headers={"Accept":"application/json"})
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.headers.get('content-type') == "application/json"
|
||||||
|
|
||||||
|
|
||||||
|
# Test revocations API call
|
||||||
|
r = requests.get("http://localhost:8080/api/revoked")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.headers.get('content-type') == "application/x-pkcs7-crl"
|
||||||
|
|
||||||
|
r = requests.get("http://localhost:8080/api/revoked",
|
||||||
|
headers={"Accept":"application/x-pem-file"})
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.headers.get('content-type') == "application/x-pem-file"
|
||||||
|
|
||||||
|
# Test attribute fetching API call
|
||||||
|
r = requests.get("http://localhost:8080/api/signed/test2/attr/")
|
||||||
|
assert r.status_code == 403
|
||||||
|
|
||||||
|
path, _, _ = authority.get_signed("test2")
|
||||||
|
setxattr(path, "user.lease.address", b"127.0.0.1")
|
||||||
|
|
||||||
|
r = requests.get("http://localhost:8080/api/signed/test2/attr/")
|
||||||
|
assert r.status_code == 200
|
||||||
|
|
||||||
|
# Tags should not be visible anonymously
|
||||||
|
r = requests.get("http://localhost:8080/api/signed/test2/tag/")
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
# Revoke all valid ones
|
||||||
|
result = runner.invoke(cli, ['revoke', 'test2'])
|
||||||
|
assert not result.exception
|
||||||
|
|
||||||
|
result = runner.invoke(cli, ['revoke', 'test3'])
|
||||||
|
assert not result.exception
|
||||||
|
Loading…
Reference in New Issue
Block a user