certidude/tests/test_cli.py

958 lines
38 KiB
Python
Raw Normal View History

2017-04-25 18:47:41 +00:00
import pwd
2015-09-09 05:31:48 +00:00
from click.testing import CliRunner
2016-09-18 15:30:31 +00:00
from datetime import datetime, timedelta
2017-05-03 07:04:52 +00:00
from time import sleep
2017-04-25 10:52:10 +00:00
import pytest
2017-05-03 07:04:52 +00:00
import shutil
import os
2017-04-25 13:04:11 +00:00
# User-Agent header value passed by the tests that need to look like a
# browser request (see the "User-Agent" headers further down).
# NOTE(review): despite the constant's name, the string itself identifies a
# Chrome/WebKit browser on Fedora, not Firefox.
UA_FEDORA_FIREFOX = "Mozilla/5.0 (X11; Fedora; Linux x86_64) " \
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36"
# Not referenced anywhere in the visible part of this file.
smtp=None
# Every message handed to DummySMTP.sendmail() is appended here; the tests
# pop entries to check which notification mails were generated.
inbox=[]
class DummySMTP(object):
    """In-memory stand-in for ``smtplib.SMTP`` used by the tests.

    Nothing is sent over the network: each message handed to
    :meth:`sendmail` is appended to the module-level ``inbox`` list so the
    tests can inspect it afterwards.
    """

    def __init__(self, address):
        """Remember the server address; no connection is opened."""
        self.address = address
        # Give the attributes set by login()/quit() a defined value up front,
        # so reading them before those calls does not raise AttributeError.
        self.username = None
        self.password = None
        self.has_quit = False

    def login(self, username, password):
        """Record the credentials instead of authenticating."""
        self.username = username
        self.password = password

    def sendmail(self, from_address, to_address, fullmessage):
        """Store ``fullmessage`` in ``inbox`` and return an empty list.

        ``from_address`` and ``to_address`` are accepted but not used.
        """
        # inbox is only mutated, never rebound, so no ``global`` is required.
        inbox.append(fullmessage)
        return []

    def quit(self):
        """Flag the session as closed."""
        self.has_quit = True
# this is the actual monkey patch (simply replacing one class with another)
# From here on, code that looks up smtplib.SMTP at call time gets DummySMTP,
# so outgoing mail ends up in the module-level inbox list.
import smtplib
smtplib.SMTP=DummySMTP
2015-09-09 05:31:48 +00:00
# Single click CliRunner shared by the tests in this module to invoke
# certidude CLI commands.
runner = CliRunner()
2017-04-25 10:52:10 +00:00
@pytest.fixture(scope='module')
def client():
    """Return a falcon ``TestClient`` wrapped around a new certidude app.

    ``certidude.api`` and ``falcon.testing`` are imported at call time, not
    at module import time.

    NOTE(review): code in this file also calls ``client()`` directly rather
    than requesting it as a fixture argument; newer pytest releases reject
    direct fixture calls -- confirm the pytest version this suite targets.
    """
    from certidude.api import certidude_app
    from falcon import testing
    return testing.TestClient(certidude_app())
2017-04-25 10:52:10 +00:00
def generate_csr(cn=None):
    """Generate a fresh RSA key and return a PEM-encoded CSR signed with it.

    The private key is discarded when the function returns.

    :param cn: common name for the request subject.  When ``None`` the
        request is issued with an empty subject instead of failing.
    :returns: the certificate signing request in PEM encoding.
    """
    from cryptography import x509
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.backends import default_backend
    from cryptography.x509.oid import NameOID

    # NOTE(review): 1024-bit keys are presumably chosen to keep the tests
    # fast; they are not suitable outside of a test suite.
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=1024,
        backend=default_backend())

    attributes = []
    if cn is not None:
        attributes.append(x509.NameAttribute(NameOID.COMMON_NAME, cn))

    # CertificateSigningRequestBuilder.sign() raises ValueError when no
    # subject was set, so always set one -- possibly empty -- to keep the
    # default cn=None usable.
    builder = x509.CertificateSigningRequestBuilder().subject_name(
        x509.Name(attributes))
    request = builder.sign(key, hashes.SHA256(), default_backend())
    return request.public_bytes(serialization.Encoding.PEM)
2017-04-25 10:52:10 +00:00
2017-05-03 21:03:51 +00:00
def clean_client():
    """Reset the client-side state left behind by an earlier run.

    Deletes the certidude client and services configuration, removes the
    ``*.pem`` files from ``/tmp/ca.example.lan`` and truncates the IPsec
    configuration and secrets files.  Must run with uid and gid 0.
    """
    import errno

    assert os.getuid() == 0 and os.getgid() == 0

    def remove_file(path):
        # Try-and-handle instead of exists()+unlink(): no window between the
        # check and the removal, and dangling symlinks (for which
        # os.path.exists() is False) are removed as well.
        try:
            os.unlink(path)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise

    remove_file("/etc/certidude/client.conf")
    remove_file("/etc/certidude/services.conf")

    # Remove client storage area
    storage = "/tmp/ca.example.lan"
    if os.path.exists(storage):
        for filename in os.listdir(storage):
            if filename.endswith(".pem"):
                remove_file(os.path.join(storage, filename))

    # Reset IPsec stuff: opening for writing truncates (or creates) the file
    # TODO: make compatible with Fedora
    for path in ("/etc/ipsec.conf", "/etc/ipsec.secrets"):
        with open(path, "w"):
            pass
2017-05-03 21:03:51 +00:00
def clean_server():
    """Tear down the server-side state left behind by an earlier run.

    In order: sends SIGTERM to the processes named in the certidude signer
    and server pid files, removes the authority directory, server
    configuration, runtime directory, log file, cron job and systemd unit,
    removes the nginx and OpenVPN files created for the tests, stops samba
    and wipes its keytabs and state directory, and finally restores
    ``/etc/resolv.conf`` from ``/etc/resolv.conf.orig``.

    Raises whatever ``shutil.copyfile`` raises if the ``.orig`` backup does
    not exist.
    """
    import errno
    import signal

    def remove_file(path):
        # Try-and-handle instead of exists()+unlink(): os.path.exists() is
        # False for a dangling symlink, which would then be left behind.
        try:
            os.unlink(path)
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise

    def remove_tree(path):
        if os.path.exists(path):
            shutil.rmtree(path)

    def terminate(pidfile):
        # Best effort: SIGTERM the process recorded in pidfile, if any.
        try:
            with open(pidfile) as fh:
                pid = int(fh.read())
        except (IOError, OSError) as err:
            if err.errno != errno.ENOENT:
                raise
            return
        except ValueError:
            return  # empty or corrupt pid file, nothing to signal
        if pid <= 0:
            return  # 0 / negative would signal whole process groups
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            pass  # process could not be signalled; ignored as before

    terminate("/run/certidude/signer.pid")
    terminate("/run/certidude/server.pid")

    remove_tree("/var/lib/certidude/ca.example.lan")
    remove_file("/etc/certidude/server.conf")
    remove_tree("/run/certidude")
    remove_file("/var/log/certidude.log")
    remove_file("/etc/cron.hourly/certidude")

    # systemd
    remove_file("/etc/systemd/system/certidude.service")

    # Remove nginx stuff
    # NOTE(review): sites-enabled entries are presumably symlinks into
    # sites-available; they dangle once their target above them is gone.
    for path in (
            "/etc/nginx/sites-available/ca.conf",
            "/etc/nginx/sites-enabled/ca.conf",
            "/etc/nginx/sites-available/certidude.conf",
            "/etc/nginx/sites-enabled/certidude.conf",
            "/etc/nginx/conf.d/tls.conf"):
        remove_file(path)

    # Remove OpenVPN stuff
    if os.path.exists("/etc/openvpn"):
        for filename in os.listdir("/etc/openvpn"):
            if filename.endswith(".conf"):
                remove_file(os.path.join("/etc/openvpn", filename))
        remove_tree("/etc/openvpn/keys")

    # Stop samba
    terminate("/run/samba/samba.pid")
    remove_file("/etc/krb5.keytab")
    remove_file("/etc/certidude/server.keytab")
    if os.path.exists("/var/lib/samba/"):
        # Recreate the directory empty after wiping it
        shutil.rmtree("/var/lib/samba")
        os.makedirs("/var/lib/samba")

    # Restore initial resolv.conf
    shutil.copyfile("/etc/resolv.conf.orig", "/etc/resolv.conf")
def test_cli_setup_authority():
import os
import sys
assert os.getuid() == 0, "Run tests as root in a clean VM or container"
if not os.path.exists("/etc/resolv.conf.orig"):
shutil.copyfile("/etc/resolv.conf", "/etc/resolv.conf.orig")
clean_server()
2017-05-03 21:03:51 +00:00
clean_client()
# Bootstrap domain controller here,
# Samba startup takes some time
os.system("apt install -y samba krb5-user winbind")
if os.path.exists("/etc/samba/smb.conf"):
os.unlink("/etc/samba/smb.conf")
os.system("samba-tool domain provision --server-role=dc --domain=EXAMPLE --realm=EXAMPLE.LAN --host-name=ca")
os.system("samba-tool user add userbot S4l4k4l4 --given-name='User' --surname='Bot'")
os.system("samba-tool user add adminbot S4l4k4l4 --given-name='Admin' --surname='Bot'")
2017-05-07 22:14:58 +00:00
os.system("samba-tool group addmembers 'Domain Admins' adminbot")
os.system("samba-tool user setpassword administrator --newpassword=S4l4k4l4")
os.symlink("/var/lib/samba/private/secrets.keytab", "/etc/krb5.keytab")
os.chmod("/var/lib/samba/private/secrets.keytab", 0644) # To allow access to certidude server
2017-05-07 19:28:50 +00:00
if os.path.exists("/etc/krb5.conf"): # Remove the one from krb5-user package
os.unlink("/etc/krb5.conf")
os.symlink("/var/lib/samba/private/krb5.conf", "/etc/krb5.conf")
with open("/etc/resolv.conf", "w") as fh:
fh.write("nameserver 127.0.0.1\nsearch example.lan\n")
# TODO: dig -t srv perhaps?
os.system("samba")
2017-05-01 17:06:39 +00:00
from certidude.cli import entry_point as cli
from certidude import const
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ['setup', 'authority', '-s'])
os.setgid(0) # Restore GID
os.umask(0022)
result = runner.invoke(cli, ['setup', 'authority']) # For if-else branches
2017-05-03 07:04:52 +00:00
os.setgid(0) # Restore GID
os.umask(0022)
assert not result.exception, result.output
2017-05-03 07:04:52 +00:00
assert os.getuid() == 0 and os.getgid() == 0, "Serve dropped permissions incorrectly!"
assert os.system("nginx -t") == 0, "invalid nginx configuration"
assert os.path.exists("/run/nginx.pid"), "nginx wasn't started up properly"
2015-09-09 05:31:48 +00:00
from certidude import config, authority, auth, user
2017-03-26 20:44:47 +00:00
assert authority.ca_cert.serial_number >= 0x100000000000000000000000000000000000000
assert authority.ca_cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
2017-03-13 15:20:41 +00:00
assert authority.ca_cert.not_valid_before < datetime.now()
assert authority.ca_cert.not_valid_after > datetime.now() + timedelta(days=7000)
assert authority.server_flags("lauri@fedora-123") == False
assert authority.server_flags("fedora-123") == False
assert authority.server_flags("vpn.example.lan") == True
assert authority.server_flags("lauri@a.b.c") == False
# Generate garbage
with open("/var/lib/certidude/ca.example.lan/bla", "w") as fh:
pass
with open("/var/lib/certidude/ca.example.lan/requests/bla", "w") as fh:
pass
with open("/var/lib/certidude/ca.example.lan/signed/bla", "w") as fh:
pass
with open("/var/lib/certidude/ca.example.lan/revoked/bla", "w") as fh:
pass
2017-03-13 15:20:41 +00:00
# Start server before any signing operations are performed
config.CERTIFICATE_RENEWAL_ALLOWED = True
2017-05-03 07:04:52 +00:00
server_pid = os.fork()
if not server_pid:
# Fork to prevent umask, setuid, setgid side effects
result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1', '-e'])
2017-05-03 07:04:52 +00:00
assert not result.exception, result.output
return
sleep(1) # Wait for serve to start up
2017-05-01 20:15:45 +00:00
import requests
2017-05-01 21:52:27 +00:00
# Test CA certificate fetch
buf = open("/var/lib/certidude/ca.example.lan/ca_crt.pem").read()
r = requests.get("http://ca.example.lan/api/certificate")
assert r.status_code == 200
assert r.headers.get('content-type') == "application/x-x509-ca-cert"
assert r.text == buf
2017-05-04 07:38:49 +00:00
r = client().simulate_get("/api/certificate")
assert r.status_code == 200
assert r.headers.get('content-type') == "application/x-x509-ca-cert"
assert r.text == buf
2017-05-01 21:52:27 +00:00
# Password is bot, users created by Travis
2017-04-25 18:47:41 +00:00
usertoken = "Basic dXNlcmJvdDpib3Q="
admintoken = "Basic YWRtaW5ib3Q6Ym90"
2017-05-07 22:14:58 +00:00
2017-04-25 18:47:41 +00:00
result = runner.invoke(cli, ['users'])
assert not result.exception, result.output
2017-05-07 22:14:58 +00:00
assert "user;userbot;User;Bot;userbot@example.lan" in result.output
assert "admin;adminbot;;;adminbot@example.lan" in result.output
# TODO: assert nothing else is in the list
2017-04-25 18:47:41 +00:00
2017-04-25 10:06:59 +00:00
# Check that we can retrieve empty CRL
2017-05-01 21:52:27 +00:00
assert authority.export_crl(), "Failed to export CRL"
2017-05-03 07:04:52 +00:00
r = requests.get("http://ca.example.lan/api/revoked/")
assert r.status_code == 200, r.text
2017-04-25 10:06:59 +00:00
2017-04-25 13:04:11 +00:00
# Test command line interface
2017-03-26 21:16:01 +00:00
result = runner.invoke(cli, ['list', '-srv'])
assert not result.exception, result.output
2017-03-26 21:16:01 +00:00
# Test static
2017-05-01 21:41:34 +00:00
r = requests.get("http://ca.example.lan/index.html")
2017-05-03 07:04:52 +00:00
assert r.status_code == 200, r.text # if this breaks certidude serve has no read access to static folder
r = requests.get("http://ca.example.lan/nonexistant.html")
assert r.status_code == 404, r.text
2017-05-04 07:38:49 +00:00
r = requests.get("http://ca.example.lan/../nonexistant.html")
assert r.status_code == 400, r.text
2017-05-04 07:38:49 +00:00
r = client().simulate_get("/")
assert r.status_code == 200, r.text
2017-05-04 07:38:49 +00:00
r = client().simulate_get("/index.html")
assert r.status_code == 200, r.text
r = client().simulate_get("/nonexistant.html")
assert r.status_code == 404, r.text
r = client().simulate_get("/../nonexistant.html")
assert r.status_code == 400, r.text
2017-04-25 13:04:11 +00:00
# Test request submission
buf = generate_csr(cn=u"test")
2017-04-25 13:04:11 +00:00
r = client().simulate_post("/api/request/", body=buf)
assert r.status_code == 415 # wrong content type
r = client().simulate_post("/api/request/",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 202 # success
assert "Stored request " in inbox.pop(), inbox
assert os.path.exists("/var/lib/certidude/ca.example.lan/requests/test.pem")
2017-04-25 13:04:11 +00:00
# Test request deletion
r = client().simulate_delete("/api/request/test/")
assert r.status_code == 401, r.text
r = client().simulate_delete("/api/request/test/",
headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
r = client().simulate_delete("/api/request/test/",
headers={"User-Agent":UA_FEDORA_FIREFOX, "Authorization":admintoken})
assert r.status_code == 403, r.text # CSRF prevented
assert os.path.exists("/var/lib/certidude/ca.example.lan/requests/test.pem")
r = client().simulate_delete("/api/request/test/",
headers={"Authorization":admintoken})
assert r.status_code == 200, r.text
r = client().simulate_delete("/api/request/nonexistant/",
headers={"Authorization":admintoken})
assert r.status_code == 404, r.text
# Test request submission corner cases
r = client().simulate_post("/api/request/",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 202 # success
assert "Stored request " in inbox.pop(), inbox
2017-04-25 13:04:11 +00:00
r = client().simulate_post("/api/request/",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 202 # already exists, same keypair so it's ok
assert not inbox
2017-04-25 13:04:11 +00:00
r = client().simulate_post("/api/request/",
2017-05-03 07:04:52 +00:00
query_string="wait=true",
2017-04-25 13:04:11 +00:00
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 303 # redirect to long poll
assert not inbox
2017-04-25 13:04:11 +00:00
r = client().simulate_post("/api/request/",
body=generate_csr(cn=u"test"),
2017-04-25 13:04:11 +00:00
headers={"content-type":"application/pkcs10"})
assert r.status_code == 409 # duplicate cn, different keypair
assert not inbox
2017-04-25 13:04:11 +00:00
r = client().simulate_get("/api/request/test/", headers={"Accept":"application/json"})
assert r.status_code == 200 # fetch as JSON ok
assert r.headers.get('content-type') == "application/json"
r = client().simulate_get("/api/request/test/", headers={"Accept":"application/x-pem-file"})
assert r.status_code == 200 # fetch as PEM ok
assert r.headers.get('content-type') == "application/x-pem-file"
r = client().simulate_get("/api/request/test/", headers={"Accept":"text/plain"})
assert r.status_code == 415 # not available as plaintext
r = client().simulate_get("/api/request/nonexistant/", headers={"Accept":"application/json"})
assert r.status_code == 404 # nonexistant common names
# TODO: submit messed up CSR-s: no CN, empty CN etc
# Test command line interface
result = runner.invoke(cli, ['list', '-srv'])
assert not result.exception, result.output
# Test sign API call
r = client().simulate_patch("/api/request/test/")
assert r.status_code == 401, r.text
r = client().simulate_patch("/api/request/test/",
headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
r = client().simulate_patch("/api/request/test/",
headers={"Authorization":admintoken})
assert r.status_code == 201, r.text
assert "Signed " in inbox.pop(), inbox
# Test autosign
buf = generate_csr(cn=u"test2")
2017-05-03 07:04:52 +00:00
r = client().simulate_post("/api/request/",
query_string="autosign=1",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 200 # autosign successful
assert r.headers.get('content-type') == "application/x-pem-file"
assert "Signed " in inbox.pop(), inbox
assert not inbox
r = client().simulate_post("/api/request/",
query_string="autosign=1",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 303 # already signed, redirect to signed certificate
assert not inbox
2017-04-25 10:06:59 +00:00
buf = generate_csr(cn=u"test2")
r = client().simulate_post("/api/request/",
query_string="autosign=1",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 202 # duplicate CN, request stored
assert "Stored request " in inbox.pop(), inbox
assert not inbox
buf = generate_csr(cn=u"test2.example.lan")
r = client().simulate_post("/api/request/",
query_string="autosign=1",
body=buf,
headers={"content-type":"application/pkcs10"})
assert r.status_code == 202 # server CN, request stored
assert "Stored request " in inbox.pop(), inbox
assert not inbox
2017-04-25 10:06:59 +00:00
# Test signed certificate API call
2017-04-25 13:04:11 +00:00
r = client().simulate_get("/api/signed/nonexistant/")
assert r.status_code == 404, r.text
2017-04-25 10:58:21 +00:00
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/")
assert r.status_code == 200, r.text
2017-04-25 10:06:59 +00:00
assert r.headers.get('content-type') == "application/x-pem-file"
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/", headers={"Accept":"application/json"})
assert r.status_code == 200, r.text
2017-04-25 10:06:59 +00:00
assert r.headers.get('content-type') == "application/json"
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/", headers={"Accept":"text/plain"})
assert r.status_code == 415, r.text
2017-04-25 10:58:21 +00:00
2017-04-25 10:06:59 +00:00
# Test revocations API call
2017-04-25 13:04:11 +00:00
r = client().simulate_get("/api/revoked/",
2017-04-25 10:06:59 +00:00
headers={"Accept":"application/x-pem-file"})
2017-05-03 07:04:52 +00:00
assert r.status_code == 200, r.text # if this breaks certidude serve has no access to signer socket
2017-05-01 20:15:45 +00:00
assert r.headers.get('content-type') == "application/x-pem-file"
r = client().simulate_get("/api/revoked/")
assert r.status_code == 200, r.text
assert r.headers.get('content-type') == "application/x-pkcs7-crl"
2017-04-25 13:04:11 +00:00
r = client().simulate_get("/api/revoked/",
headers={"Accept":"text/plain"})
assert r.status_code == 415, r.text
2017-04-25 13:04:11 +00:00
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/revoked/",
query_string="wait=true",
2017-04-25 13:04:11 +00:00
headers={"Accept":"application/x-pem-file"})
assert r.status_code == 303, r.text
2017-04-25 13:04:11 +00:00
2017-04-25 10:06:59 +00:00
# Test attribute fetching API call
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/attr/")
assert r.status_code == 403, r.text
r = client().simulate_get("/api/signed/nonexistant/attr/")
assert r.status_code == 404, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken})
assert r.status_code == 404, r.text
2017-04-25 10:06:59 +00:00
2017-05-04 09:35:31 +00:00
# Insert lease
r = client().simulate_post("/api/lease/",
query_string = "client=test&address=127.0.0.1",
headers={"Authorization":admintoken})
assert r.status_code == 200, r.text # lease update ok
2017-05-07 22:14:58 +00:00
r = client().simulate_get("/api/signed/nonexistant/script/")
assert r.status_code == 404, r.text # cert not found
r = client().simulate_get("/api/signed/test/script/")
assert r.status_code == 200, r.text # script render ok
assert "uci set " in r.text, r.text
r = client().simulate_post("/api/lease/",
query_string = "client=test&address=127.0.0.1&serial=0",
headers={"Authorization":admintoken})
assert r.status_code == 403, r.text # invalid serial number supplied
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/attr/")
assert r.status_code == 200, r.text # read okay from own address
r = client().simulate_post("/api/lease/",
query_string = "client=test&address=1.2.3.4",
headers={"Authorization":admintoken})
assert r.status_code == 200, r.text # lease update ok
r = client().simulate_get("/api/signed/test/attr/")
assert r.status_code == 403, r.text # read failed from other address
2017-04-25 10:06:59 +00:00
# Test lease retrieval
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/lease/")
assert r.status_code == 401, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken})
assert r.status_code == 200, r.text
assert r.headers.get('content-type') == "application/json; charset=UTF-8"
2017-04-25 10:06:59 +00:00
# Tags should not be visible anonymously
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/tag/")
assert r.status_code == 401, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":admintoken})
assert r.status_code == 200, r.text
2017-04-25 18:47:41 +00:00
# Tags can be added only by admin
2017-05-03 07:04:52 +00:00
r = client().simulate_post("/api/signed/test/tag/")
assert r.status_code == 401, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_post("/api/signed/test/tag/",
headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_post("/api/signed/test/tag/",
body="key=other&value=something",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
r = client().simulate_post("/api/signed/test/tag/",
body="key=location&value=Tallinn",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
2017-04-25 18:47:41 +00:00
# Tags can be overwritten only by admin
2017-05-04 07:38:49 +00:00
r = client().simulate_put("/api/signed/test/tag/something/")
assert r.status_code == 401, r.text
2017-05-04 07:38:49 +00:00
r = client().simulate_put("/api/signed/test/tag/something/",
headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
2017-05-04 07:38:49 +00:00
r = client().simulate_put("/api/signed/test/tag/something/",
body="value=else",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
r = client().simulate_put("/api/signed/test/tag/location=Tallinn/",
body="value=Tartu",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
2017-05-04 09:35:31 +00:00
r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":admintoken})
assert r.status_code == 200, r.text
assert r.text == '[{"value": "Tartu", "key": "location", "id": "location=Tartu"}, {"value": "else", "key": "other", "id": "else"}]', r.text
# Tags can be deleted only by admin
2017-05-03 07:04:52 +00:00
r = client().simulate_delete("/api/signed/test/tag/else/")
assert r.status_code == 401, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_delete("/api/signed/test/tag/else/",
headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_delete("/api/signed/test/tag/else/",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
r = client().simulate_delete("/api/signed/test/tag/location=Tartu/",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200, r.text
2017-05-04 09:35:31 +00:00
r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":admintoken})
assert r.status_code == 200, r.text
assert r.text == "[]", r.text
2017-04-25 10:06:59 +00:00
# Test revocation
2017-05-03 07:04:52 +00:00
r = client().simulate_delete("/api/signed/test/")
assert r.status_code == 401, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_delete("/api/signed/test/",
headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
2017-05-03 07:04:52 +00:00
r = client().simulate_delete("/api/signed/test/",
headers={"Authorization":admintoken})
assert r.status_code == 200, r.text
assert "Revoked " in inbox.pop(), inbox
2017-04-25 10:52:10 +00:00
2017-04-25 21:10:12 +00:00
# Log can be read only by admin
r = client().simulate_get("/api/log/")
assert r.status_code == 401, r.text
2017-04-25 21:10:12 +00:00
r = client().simulate_get("/api/log/",
headers={"Authorization":usertoken})
assert r.status_code == 403, r.text
2017-04-25 21:10:12 +00:00
r = client().simulate_get("/api/log/",
headers={"Authorization":admintoken})
assert r.status_code == 200, r.text
2017-04-25 21:10:12 +00:00
assert r.headers.get('content-type') == "application/json; charset=UTF-8"
2017-04-26 06:13:41 +00:00
2017-05-04 07:38:49 +00:00
# Test session API call
r = client().simulate_get("/api/", headers={"Authorization":usertoken})
assert r.status_code == 200
r = client().simulate_get("/api/", headers={"Authorization":admintoken})
assert r.status_code == 200
r = client().simulate_get("/api/", headers={"Accept":"text/plain", "Authorization":admintoken})
assert r.status_code == 415 # invalid media type
r = client().simulate_get("/api/")
assert r.status_code == 401
assert "Please authenticate" in r.text
2017-05-04 07:38:49 +00:00
2017-04-26 06:13:41 +00:00
# Test token mech
r = client().simulate_post("/api/token/")
assert r.status_code == 404, r.text
2017-04-26 06:13:41 +00:00
config.BUNDLE_FORMAT = "ovpn"
config.USER_ENROLLMENT_ALLOWED = True
r = client().simulate_post("/api/token/")
assert r.status_code == 401 # needs auth
r = client().simulate_post("/api/token/",
headers={"Authorization":usertoken})
assert r.status_code == 403 # regular user forbidden
r = client().simulate_post("/api/token/",
body="user=userbot", # TODO: test nonexistant user
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
assert r.status_code == 200 # token generated by admin
assert "Token for " in inbox.pop(), inbox
2017-04-26 06:13:41 +00:00
r2 = client().simulate_get("/api/token/",
query_string="u=userbot&t=1493184342&c=ac9b71421d5741800c5a4905b20c1072594a2df863e60ba836464888786bf2a6",
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
2017-04-26 06:26:14 +00:00
assert r2.status_code == 403 # invalid checksum
2017-05-03 07:04:52 +00:00
r2 = client().simulate_get("/api/token/",
query_string=r.content,
headers={"User-Agent":UA_FEDORA_FIREFOX})
2017-04-26 06:26:14 +00:00
assert r2.status_code == 200 # token consumed by anyone on Fedora
assert r2.headers.get('content-type') == "application/x-openvpn"
assert "Signed " in inbox.pop(), inbox
2017-04-26 06:26:14 +00:00
config.BUNDLE_FORMAT = "p12" # Switch to PKCS#12
r2 = client().simulate_get("/api/token/", query_string=r.content)
assert r2.status_code == 200 # token consumed by anyone on unknown device
assert r2.headers.get('content-type') == "application/x-pkcs12"
assert "Signed " in inbox.pop(), inbox
2017-05-03 21:03:51 +00:00
# Beyond this point don't use client()
const.STORAGE_PATH = "/tmp/"
2017-05-03 21:03:51 +00:00
#############
### nginx ###
#############
# In this case nginx is set up as web server with TLS certificates
# generated by certidude.
2017-05-03 21:03:51 +00:00
clean_client()
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ["setup", "nginx", "-cn", "www", "ca.example.lan"])
assert result.exception # FQDN required
2017-05-03 21:03:51 +00:00
result = runner.invoke(cli, ["setup", "nginx", "-cn", "www.example.lan", "ca.example.lan"])
assert not result.exception, result.output
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ["setup", "nginx", "-cn", "www.example.lan", "ca.example.lan"])
assert not result.exception, result.output # client conf already exists, remove to regenerate
2017-05-04 06:40:47 +00:00
2017-05-03 21:03:51 +00:00
import os
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
2017-05-04 06:40:47 +00:00
assert "refused to sign" in result.output, result.output
2017-05-03 21:03:51 +00:00
child_pid = os.fork()
if not child_pid:
result = runner.invoke(cli, ['sign', 'www.example.lan'])
assert not result.exception, result.output
return
else:
os.waitpid(child_pid, 0)
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
2017-05-03 21:03:51 +00:00
assert "Writing certificate to:" in result.output, result.output
2017-05-03 21:03:51 +00:00
result = runner.invoke(cli, ["request", "--renew", "--no-wait"])
assert not result.exception, result.output
2017-05-03 21:03:51 +00:00
assert "Writing certificate to:" in result.output, result.output
assert "Attached renewal signature" in result.output, result.output
assert "refused to sign immideately" not in result.output, result.output
2017-05-03 21:03:51 +00:00
# Test nginx setup
assert os.system("nginx -t") == 0, "Generated nginx config was invalid"
# TODO: test client verification with curl
2017-05-03 21:03:51 +00:00
###############
### OpenVPN ###
###############
# First OpenVPN server is set up
2017-05-03 21:03:51 +00:00
clean_client()
if not os.path.exists("/etc/openvpn/keys"):
os.makedirs("/etc/openvpn/keys")
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ['setup', 'openvpn', 'server', "-cn", "vpn", "ca.example.lan"])
assert result.exception, result.output
2017-05-03 21:03:51 +00:00
result = runner.invoke(cli, ['setup', 'openvpn', 'server', "-cn", "vpn.example.lan", "ca.example.lan"])
assert not result.exception, result.output
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ['setup', 'openvpn', 'server', "-cn", "vpn.example.lan", "ca.example.lan"])
assert not result.exception, result.output # client conf already exists, remove to regenerate
2017-05-04 06:40:47 +00:00
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
result = runner.invoke(cli, ["request", "--no-wait"])
2017-05-03 21:03:51 +00:00
assert not result.exception, result.output
2017-05-03 07:04:52 +00:00
child_pid = os.fork()
if not child_pid:
result = runner.invoke(cli, ['sign', 'vpn.example.lan'])
assert not result.exception, result.output
return
else:
os.waitpid(child_pid, 0)
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
2017-05-03 21:03:51 +00:00
assert "Writing certificate to:" in result.output, result.output
assert os.path.exists("/tmp/ca.example.lan/server_cert.pem")
assert os.path.exists("/etc/openvpn/site-to-client.conf")
2017-05-03 21:03:51 +00:00
# Secondly OpenVPN client is set up
2017-05-03 21:03:51 +00:00
os.unlink("/etc/certidude/client.conf")
os.unlink("/etc/certidude/services.conf")
result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "roadwarrior1", "ca.example.lan", "vpn.example.lan"])
assert not result.exception, result.output
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ['setup', 'openvpn', 'client', "-cn", "roadwarrior1", "ca.example.lan", "vpn.example.lan"])
assert not result.exception, result.output # client conf already exists, remove to regenerate
2017-05-04 06:40:47 +00:00
2017-05-03 21:03:51 +00:00
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
assert "Writing certificate to:" in result.output, result.output
assert os.path.exists("/etc/openvpn/client-to-site.conf")
2017-05-03 21:03:51 +00:00
# TODO: Check that tunnel interfaces came up, perhaps try to ping?
# TODO: assert key, req, cert paths were included correctly in OpenVPN config
2017-05-03 21:03:51 +00:00
###############
### IPSec ###
###############
clean_client()
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ['setup', 'strongswan', 'server', "-cn", "ipsec", "ca.example.lan"])
assert result.exception, result.output # FQDN required
2017-05-03 21:03:51 +00:00
result = runner.invoke(cli, ['setup', 'strongswan', 'server', "-cn", "ipsec.example.lan", "ca.example.lan"])
assert not result.exception, result.output
assert open("/etc/ipsec.secrets").read() == ": RSA /tmp/ca.example.lan/server_key.pem\n"
2017-05-03 21:03:51 +00:00
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ['setup', 'strongswan', 'server', "-cn", "ipsec.example.lan", "ca.example.lan"])
assert not result.exception, result.output # client conf already exists, remove to regenerate
2017-05-04 06:40:47 +00:00
2017-05-03 21:03:51 +00:00
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
child_pid = os.fork()
if not child_pid:
result = runner.invoke(cli, ['sign', 'ipsec.example.lan'])
assert not result.exception, result.output
return
else:
os.waitpid(child_pid, 0)
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
assert "Writing certificate to:" in result.output, result.output
assert os.path.exists("/tmp/ca.example.lan/server_cert.pem")
# Reset config
os.unlink("/etc/certidude/client.conf")
os.unlink("/etc/certidude/services.conf")
result = runner.invoke(cli, ['setup', 'strongswan', 'client', "-cn", "roadwarrior2", "ca.example.lan", "ipsec.example.lan"])
assert not result.exception, result.output
2017-05-03 07:04:52 +00:00
2017-05-04 06:40:47 +00:00
result = runner.invoke(cli, ['setup', 'strongswan', 'client', "-cn", "roadwarrior2", "ca.example.lan", "ipsec.example.lan"])
assert not result.exception, result.output # client conf already exists, remove to regenerate
2017-05-04 06:40:47 +00:00
2017-05-03 21:03:51 +00:00
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
assert "Writing certificate to:" in result.output, result.output
######################
### NetworkManager ###
######################
2017-05-04 06:40:47 +00:00
clean_client()
2017-05-03 21:03:51 +00:00
result = runner.invoke(cli, ['setup', 'openvpn', 'networkmanager', "-cn", "roadwarrior3", "ca.example.lan", "vpn.example.lan"])
assert not result.exception, result.output
2017-05-04 06:40:47 +00:00
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
assert "Writing certificate to:" in result.output, result.output
clean_client()
2017-05-03 21:03:51 +00:00
result = runner.invoke(cli, ['setup', 'strongswan', 'networkmanager', "-cn", "roadwarrior4", "ca.example.lan", "ipsec.example.lan"])
assert not result.exception, result.output
2017-05-04 06:40:47 +00:00
with open("/etc/certidude/client.conf", "a") as fh:
fh.write("insecure = true\n")
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
assert "Writing certificate to:" in result.output, result.output
2017-05-03 21:03:51 +00:00
####################################
### Switch to Kerberos/LDAP auth ###
####################################
# Shut down current instance
requests.get("http://ca.example.lan/api/exit")
requests.get("http://ca.example.lan/api/")
os.waitpid(server_pid, 0)
# (re)auth against DC
assert os.system("kdestroy") == 0
assert not os.path.exists("/tmp/krb5cc_0")
assert os.system("echo S4l4k4l4 | kinit administrator") == 0
assert os.path.exists("/tmp/krb5cc_0")
# Fork to not contaminate environment while creating service principal
spn_pid = os.fork()
if not spn_pid:
os.system("sed -e 's/CA/CA\\nkerberos method = system keytab/' -i /etc/samba/smb.conf ")
os.environ["KRB5_KTNAME"] = "FILE:/etc/certidude/server.keytab"
assert os.system("net ads keytab add HTTP -k") == 0
assert os.path.exists("/etc/certidude/server.keytab")
os.system("chown root:certidude /etc/certidude/server.keytab")
os.system("chmod 640 /etc/certidude/server.keytab")
return
else:
os.waitpid(spn_pid, 0)
# Make modifications to /etc/certidude/server.conf so
# Certidude would auth against domain controller
os.system("sed -e 's/ldap uri = ldaps:.*/ldap uri = ldaps:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
os.system("sed -e 's/ldap uri = ldap:.*/ldap uri = ldap:\\/\\/ca.example.lan/g' -i /etc/certidude/server.conf")
2017-05-07 22:14:58 +00:00
os.system("sed -e 's/backends = pam/backends = kerberos ldap/g' -i /etc/certidude/server.conf")
os.system("sed -e 's/backend = posix/backend = ldap/g' -i /etc/certidude/server.conf")
os.system("sed -e 's/dc1/ca/g' -i /etc/cron.hourly/certidude")
# Update server credential cache
with open("/etc/cron.hourly/certidude") as fh:
cronjob = fh.read()
assert "ldap/ca.example.lan" in cronjob, cronjob
os.system("/etc/cron.hourly/certidude")
server_pid = os.fork() # Fork to prevent environment contamination
if not server_pid:
# Apply /etc/certidude/server.conf changes
reload(config)
reload(user)
reload(auth)
assert isinstance(user.User.objects, user.ActiveDirectoryUserManager), user.User.objects
2017-05-07 22:14:58 +00:00
result = runner.invoke(cli, ['users'])
assert not result.exception, result.output
assert "user;userbot;User;Bot;userbot@example.lan" in result.output
assert "admin;adminbot;Admin;Bot;adminbot@example.lan" in result.output
assert "admin;Administrator;Administrator;;Administrator@example.lan" in result.output
result = runner.invoke(cli, ['serve', '-p', '8080', '-l', '127.0.1.1', '-e'])
assert not result.exception, result.output
return
sleep(1) # Wait for serve to start up
2017-05-07 22:14:58 +00:00
#####################
### Kerberos auth ###
#####################
# TODO: pip install requests-kerberos
from requests_kerberos import HTTPKerberosAuth, OPTIONAL
auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL, force_preemptive=True)
2017-05-07 22:14:58 +00:00
# Test Kerberos auth
r = requests.get("http://ca.example.lan/api/")
assert r.status_code == 401, r.text
assert "No Kerberos ticket offered" in r.text, r.text
r = requests.get("http://ca.example.lan/api/", headers={"Authorization": "Negotiate blerrgh"})
assert r.status_code == 400, r.text
assert "Malformed token" in r.text
r = requests.get("http://ca.example.lan/api/", headers={"Authorization": "Negotiate TlRMTVNTUAABAAAAl4II4gAAAAAAAAAAAAAAAAAAAAAKADk4AAAADw=="})
assert r.status_code == 400, r.text
assert "Unsupported authentication mechanism (NTLM" in r.text
r = requests.get("http://ca.example.lan/api/", auth=auth)
assert r.status_code == 200, r.text
2017-05-07 22:14:58 +00:00
#################
### LDAP auth ###
#################
# Test LDAP bind auth fallback
usertoken = "Basic dXNlcmJvdDpTNGw0azRsNA=="
admintoken = "Basic YWRtaW5ib3Q6UzRsNGs0bDQ="
with open("/etc/ldap/ldap.conf", "w") as fh:
fh.write("TLS_REQCERT allow") # TODO: Correct way
# curl http://ca.example.lan/api/ -u adminbot:S4l4k4l4 -H "User-agent: Android" -H "Referer: http://ca.example.lan"
r = requests.get("http://ca.example.lan/api/",
headers={"Authorization":usertoken, "User-Agent": "Android", "Referer":"http://ca.example.lan/"})
assert r.status_code == 200, r.text
2017-05-03 21:03:51 +00:00
###################
### Final tests ###
###################
2017-05-03 07:04:52 +00:00
# Test revocation on command-line
child_pid = os.fork()
if not child_pid:
2017-05-04 07:38:49 +00:00
result = runner.invoke(cli, ['revoke', 'roadwarrior4'])
2017-05-03 07:04:52 +00:00
assert not result.exception, result.output
return
else:
os.waitpid(child_pid, 0)
2017-05-04 07:38:49 +00:00
# Test revocation check on client side
result = runner.invoke(cli, ["request", "--no-wait"])
assert not result.exception, result.output
assert "Certificate has been revoked, wiping keys and certificates" in result.output, result.output
assert "Writing certificate to:" in result.output, result.output
2017-05-03 07:04:52 +00:00
result = runner.invoke(cli, ['list', '-srv'])
assert not result.exception, result.output
result = runner.invoke(cli, ['cron'])
assert not result.exception, result.output
# Shut down signer
assert authority.signer_exec("exit") == "ok"
# Shut down server
requests.get("http://ca.example.lan/api/exit")
os.waitpid(server_pid, 0)
2017-05-03 07:04:52 +00:00
# Note: STORAGE_PATH was mangled above, hence it's /tmp not /var/lib/certidude
assert open("/etc/apparmor.d/local/usr.lib.ipsec.charon").read() == "/tmp/** r,\n"
assert len(inbox) == 0, inbox # Make sure all messages were checked
os.system("service nginx stop")
os.system("service openvpn stop")
os.system("ipsec stop")
clean_server()
# Script entry point: when this file is executed directly (rather than
# imported as a module), run the end-to-end CLI test function once.
if __name__ == "__main__":
    test_cli_setup_authority()