certidude/tests/test_cli.py

268 lines
9.8 KiB
Python
Raw Normal View History

2017-03-13 15:20:41 +00:00
import os
2017-04-25 10:06:59 +00:00
import requests
2017-04-25 18:47:41 +00:00
import subprocess
import pwd
2017-04-25 10:52:10 +00:00
from falcon import testing
2015-09-09 05:31:48 +00:00
from click.testing import CliRunner
from certidude.cli import entry_point as cli
2016-09-18 15:30:31 +00:00
from datetime import datetime, timedelta
2017-04-25 13:04:11 +00:00
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import rsa, padding
2017-03-13 15:20:41 +00:00
from cryptography.hazmat.primitives import hashes, serialization
2017-04-25 13:04:11 +00:00
from cryptography.hazmat.backends import default_backend
2017-03-13 15:20:41 +00:00
from cryptography.x509.oid import NameOID
2017-04-25 10:52:10 +00:00
import pytest
2017-04-25 10:06:59 +00:00
from xattr import setxattr
2015-09-09 05:31:48 +00:00
2017-04-25 13:04:11 +00:00
# pkill py && rm -Rfv ~/.certidude && TRAVIS=1 py.test tests
2015-09-09 05:31:48 +00:00
# Shared click CliRunner; the test below drives the certidude CLI entry
# point through runner.invoke().
runner = CliRunner()
2017-04-25 10:52:10 +00:00
@pytest.fixture(scope='module')
def client():
    """Return a falcon ``TestClient`` wrapping a newly built certidude app.

    ``certidude.api`` is imported at call time rather than at module
    import time.

    NOTE(review): the test in this file invokes ``client()`` directly
    instead of requesting it as a fixture argument; pytest 4 and later
    reject direct fixture calls -- confirm the pytest version in use.
    """
    from certidude.api import certidude_app
    app = certidude_app()
    return testing.TestClient(app)
def generate_csr(cn=None):
    """Build a PEM-encoded PKCS#10 certificate signing request.

    A new 1024-bit RSA key is generated on every call, so two requests
    created for the same common name never share a keypair.

    :param cn: common name (text) to place in the subject. When None the
        request is signed with an empty subject instead of failing.
    :return: the signed request serialized as PEM bytes.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=1024,  # presumably kept small so key generation is quick
        backend=default_backend())

    # The builder refuses to sign a request that has no subject at all, so
    # always set one; it is simply empty when no common name was given.
    attributes = []
    if cn is not None:
        attributes.append(x509.NameAttribute(NameOID.COMMON_NAME, cn))
    builder = x509.CertificateSigningRequestBuilder().subject_name(
        x509.Name(attributes))

    request = builder.sign(key, hashes.SHA256(), default_backend())
    return request.public_bytes(serialization.Encoding.PEM)
2017-04-25 10:52:10 +00:00
def test_cli_setup_authority():
    """Exercise the certidude CLI and HTTP API end to end.

    The steps are strictly order dependent: the authority is set up first,
    then requests are submitted, signed and revoked, and finally the
    lease, tag, revocation and static endpoints are checked with
    anonymous, regular-user and administrator credentials.
    """
    result = runner.invoke(cli, ['setup', 'authority'])
    assert not result.exception

    # Imported only after `setup authority` has been invoked; presumably
    # these modules read the configuration written by that step.
    from certidude import const, config, authority
    assert authority.ca_cert.serial_number >= 0x100000000000000000000000000000000000000
    assert authority.ca_cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff

    # cryptography reports the validity bounds as naive datetimes in UTC,
    # so compare against UTC "now" rather than local time.
    now = datetime.utcnow()
    assert authority.ca_cert.not_valid_before < now
    assert authority.ca_cert.not_valid_after > now + timedelta(days=7000)

    # Password is bot, users created by Travis
    usertoken = "Basic dXNlcmJvdDpib3Q="
    admintoken = "Basic YWRtaW5ib3Q6Ym90"

    result = runner.invoke(cli, ['users'])
    assert not result.exception

    # Try starting up forked server
    result = runner.invoke(cli, ['serve', '-f', '-p', '8080'])
    assert not result.exception

    # Check that we can retrieve empty CRL
    r = client().simulate_get("/api/revoked/")
    assert r.status_code == 200

    # Test command line interface
    result = runner.invoke(cli, ['list', '-srv'])
    assert not result.exception

    # Test CA certificate fetch
    r = client().simulate_get("/api/certificate")
    assert r.status_code == 200
    assert r.headers.get('content-type') == "application/x-x509-ca-cert"

    # Test request submission
    buf = generate_csr(cn=u"test")

    r = client().simulate_post("/api/request/", body=buf)
    assert r.status_code == 415  # wrong content type

    r = client().simulate_post("/api/request/",
        body=buf,
        headers={"content-type":"application/pkcs10"})
    assert r.status_code == 202  # success

    r = client().simulate_post("/api/request/",
        body=buf,
        headers={"content-type":"application/pkcs10"})
    assert r.status_code == 202  # already exists, same keypair so it's ok

    r = client().simulate_post("/api/request/",
        query_string="wait=1",
        body=buf,
        headers={"content-type":"application/pkcs10"})
    assert r.status_code == 303  # redirect to long poll

    r = client().simulate_post("/api/request/",
        body=generate_csr(cn=u"test"),
        headers={"content-type":"application/pkcs10"})
    assert r.status_code == 409  # duplicate cn, different keypair

    r = client().simulate_get("/api/request/test/", headers={"Accept":"application/json"})
    assert r.status_code == 200  # fetch as JSON ok
    assert r.headers.get('content-type') == "application/json"

    r = client().simulate_get("/api/request/test/", headers={"Accept":"application/x-pem-file"})
    assert r.status_code == 200  # fetch as PEM ok
    assert r.headers.get('content-type') == "application/x-pem-file"

    r = client().simulate_get("/api/request/test/", headers={"Accept":"text/plain"})
    assert r.status_code == 415  # not available as plaintext

    r = client().simulate_get("/api/request/nonexistant/", headers={"Accept":"application/json"})
    assert r.status_code == 404  # nonexistent common name

    r = client().simulate_post("/api/request/", query_string="autosign=1",
        body=buf,
        headers={"content-type":"application/pkcs10"})
    assert r.status_code == 200  # autosign successful
    assert r.headers.get('content-type') == "application/x-pem-file"

    # TODO: submit messed up CSR-s: no CN, empty CN etc

    # Test command line interface
    result = runner.invoke(cli, ['list', '-srv'])
    assert not result.exception

    result = runner.invoke(cli, ['sign', 'test', '-o'])
    assert not result.exception

    result = runner.invoke(cli, ['revoke', 'test'])
    assert not result.exception

    # Create the certificates used by the API checks below (test2) and by
    # the final CLI revocation (test3).
    authority.generate_ovpn_bundle(u"test2")
    authority.generate_pkcs12_bundle(u"test3")

    result = runner.invoke(cli, ['list', '-srv'])
    assert not result.exception

    result = runner.invoke(cli, ['cron'])
    assert not result.exception

    # Test session API call
    r = client().simulate_get("/api/", headers={"Authorization":usertoken})
    assert r.status_code == 200

    r = client().simulate_get("/api/", headers={"Authorization":admintoken})
    assert r.status_code == 200

    r = client().simulate_get("/api/")
    assert r.status_code == 401

    # Test signed certificate API call
    r = client().simulate_get("/api/signed/nonexistant/")
    assert r.status_code == 404

    r = client().simulate_get("/api/signed/test2/")
    assert r.status_code == 200
    assert r.headers.get('content-type') == "application/x-pem-file"

    r = client().simulate_get("/api/signed/test2/", headers={"Accept":"application/json"})
    assert r.status_code == 200
    assert r.headers.get('content-type') == "application/json"

    r = client().simulate_get("/api/signed/test2/", headers={"Accept":"text/plain"})
    assert r.status_code == 415

    # Test revocations API call
    r = client().simulate_get("/api/revoked/")
    assert r.status_code == 200
    assert r.headers.get('content-type') == "application/x-pkcs7-crl"

    r = client().simulate_get("/api/revoked/",
        headers={"Accept":"application/x-pem-file"})
    assert r.status_code == 200
    assert r.headers.get('content-type') == "application/x-pem-file"

    r = client().simulate_get("/api/revoked/",
        headers={"Accept":"text/plain"})
    assert r.status_code == 415

    r = client().simulate_get("/api/revoked/", query_string="wait=true",
        headers={"Accept":"application/x-pem-file"})
    assert r.status_code == 303

    # Test attribute fetching API call; no lease has been recorded yet
    r = client().simulate_get("/api/signed/test2/attr/")
    assert r.status_code == 403

    r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":admintoken})
    assert r.status_code == 404

    # Insert lease as if VPN gateway had submitted it
    path, _, _ = authority.get_signed("test2")
    setxattr(path, "user.lease.address", b"127.0.0.1")
    setxattr(path, "user.lease.last_seen", b"random")

    r = client().simulate_get("/api/signed/test2/attr/")
    assert r.status_code == 200

    # Test lease retrieval
    r = client().simulate_get("/api/signed/test2/lease/")
    assert r.status_code == 401

    r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":usertoken})
    assert r.status_code == 403

    r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":admintoken})
    assert r.status_code == 200
    assert r.headers.get('content-type') == "application/json; charset=UTF-8"

    # Tags should not be visible anonymously
    r = client().simulate_get("/api/signed/test2/tag/")
    assert r.status_code == 401

    r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":usertoken})
    assert r.status_code == 403

    r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":admintoken})
    assert r.status_code == 200

    # Tags can be added only by admin
    r = client().simulate_post("/api/signed/test2/tag/")
    assert r.status_code == 401

    r = client().simulate_post("/api/signed/test2/tag/",
        headers={"Authorization":usertoken})
    assert r.status_code == 403

    r = client().simulate_post("/api/signed/test2/tag/",
        body="key=other&value=something",
        headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
    assert r.status_code == 200

    # Tags can be overwritten only by admin
    r = client().simulate_put("/api/signed/test2/tag/other/")
    assert r.status_code == 401

    r = client().simulate_put("/api/signed/test2/tag/other/",
        headers={"Authorization":usertoken})
    assert r.status_code == 403

    r = client().simulate_put("/api/signed/test2/tag/other/",
        body="value=else",
        headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
    assert r.status_code == 200

    # Tags can be deleted only by admin
    r = client().simulate_delete("/api/signed/test2/tag/else/")
    assert r.status_code == 401

    r = client().simulate_delete("/api/signed/test2/tag/else/",
        headers={"Authorization":usertoken})
    assert r.status_code == 403

    r = client().simulate_delete("/api/signed/test2/tag/else/",
        headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
    assert r.status_code == 200

    # Test revocation
    r = client().simulate_delete("/api/signed/test2/")
    assert r.status_code == 401

    r = client().simulate_delete("/api/signed/test2/",
        headers={"Authorization":usertoken})
    assert r.status_code == 403

    r = client().simulate_delete("/api/signed/test2/",
        headers={"Authorization":admintoken})
    assert r.status_code == 200

    result = runner.invoke(cli, ['revoke', 'test3'])
    assert not result.exception

    # Test static
    # NOTE(review): these checks issue DELETE rather than GET against the
    # static paths -- confirm this is intentional.
    r = client().simulate_delete("/nonexistant.html")
    assert r.status_code == 404

    r = client().simulate_delete("/index.html")
    assert r.status_code == 200
2017-04-25 10:52:10 +00:00