mirror of
https://github.com/laurivosandi/certidude
synced 2025-09-05 21:31:19 +00:00
tests: Handle forking
This commit is contained in:
@@ -1,10 +1,10 @@
|
||||
import subprocess
|
||||
import pwd
|
||||
from click.testing import CliRunner
|
||||
from datetime import datetime, timedelta
|
||||
from time import sleep
|
||||
import pytest
|
||||
|
||||
# pkill py && rm -Rfv ~/.certidude && TRAVIS=1 py.test tests
|
||||
import shutil
|
||||
import os
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
@@ -33,8 +33,11 @@ def generate_csr(cn=None):
|
||||
return buf
|
||||
|
||||
def test_cli_setup_authority():
|
||||
import shutil
|
||||
import os
|
||||
import sys
|
||||
|
||||
assert os.getuid() == 0, "Run tests as root in a clean VM or container"
|
||||
|
||||
if os.path.exists("/run/certidude/signer.pid"):
|
||||
with open("/run/certidude/signer.pid") as fh:
|
||||
try:
|
||||
@@ -69,7 +72,12 @@ def test_cli_setup_authority():
|
||||
from certidude import const
|
||||
|
||||
result = runner.invoke(cli, ['setup', 'authority'])
|
||||
os.setgid(0) # Restore GID
|
||||
os.umask(0022)
|
||||
|
||||
assert not result.exception, result.output
|
||||
assert os.getuid() == 0 and os.getgid() == 0, "Serve dropped permissions incorrectly!"
|
||||
|
||||
|
||||
from certidude import config, authority
|
||||
assert authority.ca_cert.serial_number >= 0x100000000000000000000000000000000000000
|
||||
@@ -79,19 +87,20 @@ def test_cli_setup_authority():
|
||||
|
||||
# Start server before any signing operations are performed
|
||||
config.CERTIFICATE_RENEWAL_ALLOWED = True
|
||||
result = runner.invoke(cli, ['serve', '-f', '-p', '80', '-l', '127.0.1.1'])
|
||||
assert not result.exception, result.output
|
||||
|
||||
server_pid = os.fork()
|
||||
if not server_pid:
|
||||
# Fork to prevent umask, setuid, setgid side effects
|
||||
result = runner.invoke(cli, ['serve', '-p', '80', '-l', '127.0.1.1'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
|
||||
sleep(1) # Wait for serve to start up
|
||||
|
||||
import requests
|
||||
|
||||
# Test CA certificate fetch
|
||||
buf = open("/var/lib/certidude/ca.example.lan/ca_crt.pem").read()
|
||||
|
||||
r = client().simulate_get("/api/certificate")
|
||||
assert r.status_code == 200
|
||||
assert r.headers.get('content-type') == "application/x-x509-ca-cert"
|
||||
assert r.text == buf
|
||||
|
||||
r = requests.get("http://ca.example.lan/api/certificate")
|
||||
assert r.status_code == 200
|
||||
assert r.headers.get('content-type') == "application/x-x509-ca-cert"
|
||||
@@ -107,7 +116,7 @@ def test_cli_setup_authority():
|
||||
|
||||
# Check that we can retrieve empty CRL
|
||||
assert authority.export_crl(), "Failed to export CRL"
|
||||
r = client().simulate_get("/api/revoked/")
|
||||
r = requests.get("http://ca.example.lan/api/revoked/")
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
|
||||
@@ -116,13 +125,10 @@ def test_cli_setup_authority():
|
||||
assert not result.exception, result.output
|
||||
|
||||
# Test static
|
||||
r = client().simulate_get("/nonexistant.html")
|
||||
assert r.status_code == 404, r.text
|
||||
r = client().simulate_get("/index.html")
|
||||
assert r.status_code == 200, r.text
|
||||
r = requests.get("http://ca.example.lan/index.html")
|
||||
assert r.status_code == 200, "server responded %s, logs say %s" % (r.text, open("/var/log/certidude.log").read())
|
||||
|
||||
assert r.status_code == 200, r.text # if this breaks certidude serve has no read access to static folder
|
||||
r = requests.get("http://ca.example.lan/nonexistant.html")
|
||||
assert r.status_code == 404, r.text
|
||||
|
||||
# Test request submission
|
||||
buf = generate_csr(cn=u"test")
|
||||
@@ -141,7 +147,7 @@ def test_cli_setup_authority():
|
||||
assert r.status_code == 202 # already exists, same keypair so it's ok
|
||||
|
||||
r = client().simulate_post("/api/request/",
|
||||
query_string="wait=1",
|
||||
query_string="wait=true",
|
||||
body=buf,
|
||||
headers={"content-type":"application/pkcs10"})
|
||||
assert r.status_code == 303 # redirect to long poll
|
||||
@@ -165,7 +171,8 @@ def test_cli_setup_authority():
|
||||
r = client().simulate_get("/api/request/nonexistant/", headers={"Accept":"application/json"})
|
||||
assert r.status_code == 404 # nonexistant common names
|
||||
|
||||
r = client().simulate_post("/api/request/", query_string="autosign=1",
|
||||
r = client().simulate_post("/api/request/",
|
||||
query_string="autosign=1",
|
||||
body=buf,
|
||||
headers={"content-type":"application/pkcs10"})
|
||||
assert r.status_code == 200 # autosign successful
|
||||
@@ -176,17 +183,16 @@ def test_cli_setup_authority():
|
||||
# Test command line interface
|
||||
result = runner.invoke(cli, ['list', '-srv'])
|
||||
assert not result.exception, result.output
|
||||
result = runner.invoke(cli, ['sign', 'test', '-o'])
|
||||
assert not result.exception, result.output
|
||||
result = runner.invoke(cli, ['revoke', 'test'])
|
||||
assert not result.exception, result.output
|
||||
authority.generate_ovpn_bundle(u"test2")
|
||||
authority.generate_pkcs12_bundle(u"test3")
|
||||
result = runner.invoke(cli, ['list', '-srv'])
|
||||
assert not result.exception, result.output
|
||||
result = runner.invoke(cli, ['cron'])
|
||||
assert not result.exception, result.output
|
||||
|
||||
# Some commands have side effects (setuid, setgid etc)
|
||||
child_pid = os.fork()
|
||||
if not child_pid:
|
||||
result = runner.invoke(cli, ['sign', 'test', '-o'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(child_pid, 0)
|
||||
assert os.getuid() == 0 and os.getgid() == 0, "Serve dropped permissions incorrectly!"
|
||||
|
||||
# Test session API call
|
||||
r = client().simulate_get("/api/", headers={"Authorization":usertoken})
|
||||
@@ -203,120 +209,110 @@ def test_cli_setup_authority():
|
||||
r = client().simulate_get("/api/signed/nonexistant/")
|
||||
assert r.status_code == 404, r.text
|
||||
|
||||
r = client().simulate_get("/api/signed/test2/")
|
||||
r = client().simulate_get("/api/signed/test/")
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.headers.get('content-type') == "application/x-pem-file"
|
||||
|
||||
r = client().simulate_get("/api/signed/test2/", headers={"Accept":"application/json"})
|
||||
r = client().simulate_get("/api/signed/test/", headers={"Accept":"application/json"})
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.headers.get('content-type') == "application/json"
|
||||
|
||||
r = client().simulate_get("/api/signed/test2/", headers={"Accept":"text/plain"})
|
||||
r = client().simulate_get("/api/signed/test/", headers={"Accept":"text/plain"})
|
||||
assert r.status_code == 415, r.text
|
||||
|
||||
# Test revocations API call
|
||||
r = client().simulate_get("/api/revoked/",
|
||||
headers={"Accept":"application/x-pem-file"})
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.headers.get('content-type') == "application/x-pem-file"
|
||||
|
||||
r = requests.get("http://ca.example.lan/api/revoked/",
|
||||
headers={"Accept":"application/x-pem-file"})
|
||||
assert r.status_code == 200, "Server responded with %s, server logs say %s" % (r.text, open("/var/log/certidude.log").read())
|
||||
assert r.status_code == 200, r.text # if this breaks certidude serve has no access to signer socket
|
||||
assert r.headers.get('content-type') == "application/x-pem-file"
|
||||
|
||||
r = client().simulate_get("/api/revoked/")
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.headers.get('content-type') == "application/x-pkcs7-crl"
|
||||
|
||||
r = requests.get("http://ca.example.lan/api/revoked/")
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.headers.get('content-type') == "application/x-pkcs7-crl"
|
||||
|
||||
r = client().simulate_get("/api/revoked/",
|
||||
headers={"Accept":"text/plain"})
|
||||
assert r.status_code == 415, r.text
|
||||
|
||||
r = client().simulate_get("/api/revoked/", query_string="wait=true",
|
||||
r = client().simulate_get("/api/revoked/",
|
||||
query_string="wait=true",
|
||||
headers={"Accept":"application/x-pem-file"})
|
||||
assert r.status_code == 303, r.text
|
||||
|
||||
# Test attribute fetching API call
|
||||
r = client().simulate_get("/api/signed/test2/attr/")
|
||||
r = client().simulate_get("/api/signed/test/attr/")
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":admintoken})
|
||||
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 404, r.text
|
||||
|
||||
# Insert lease as if VPN gateway had submitted it
|
||||
path, _, _ = authority.get_signed("test2")
|
||||
path, _, _ = authority.get_signed("test")
|
||||
from xattr import setxattr
|
||||
setxattr(path, "user.lease.address", b"127.0.0.1")
|
||||
setxattr(path, "user.lease.last_seen", b"random")
|
||||
r = client().simulate_get("/api/signed/test2/attr/")
|
||||
r = client().simulate_get("/api/signed/test/attr/")
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
# Test lease retrieval
|
||||
r = client().simulate_get("/api/signed/test2/lease/")
|
||||
r = client().simulate_get("/api/signed/test/lease/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":usertoken})
|
||||
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_get("/api/signed/test2/lease/", headers={"Authorization":admintoken})
|
||||
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text
|
||||
assert r.headers.get('content-type') == "application/json; charset=UTF-8"
|
||||
|
||||
|
||||
# Tags should not be visible anonymously
|
||||
r = client().simulate_get("/api/signed/test2/tag/")
|
||||
r = client().simulate_get("/api/signed/test/tag/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":usertoken})
|
||||
r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_get("/api/signed/test2/tag/", headers={"Authorization":admintoken})
|
||||
r = client().simulate_get("/api/signed/test/tag/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
# Tags can be added only by admin
|
||||
r = client().simulate_post("/api/signed/test2/tag/")
|
||||
r = client().simulate_post("/api/signed/test/tag/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_post("/api/signed/test2/tag/",
|
||||
r = client().simulate_post("/api/signed/test/tag/",
|
||||
headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_post("/api/signed/test2/tag/",
|
||||
r = client().simulate_post("/api/signed/test/tag/",
|
||||
body="key=other&value=something",
|
||||
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
# Tags can be overwritten only by admin
|
||||
r = client().simulate_put("/api/signed/test2/tag/other/")
|
||||
r = client().simulate_put("/api/signed/test/tag/other/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_put("/api/signed/test2/tag/other/",
|
||||
r = client().simulate_put("/api/signed/test/tag/other/",
|
||||
headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_put("/api/signed/test2/tag/other/",
|
||||
r = client().simulate_put("/api/signed/test/tag/other/",
|
||||
body="value=else",
|
||||
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
# Tags can be deleted only by admin
|
||||
r = client().simulate_delete("/api/signed/test2/tag/else/")
|
||||
r = client().simulate_delete("/api/signed/test/tag/else/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_delete("/api/signed/test2/tag/else/",
|
||||
r = client().simulate_delete("/api/signed/test/tag/else/",
|
||||
headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_delete("/api/signed/test2/tag/else/",
|
||||
r = client().simulate_delete("/api/signed/test/tag/else/",
|
||||
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
|
||||
# Test revocation
|
||||
r = client().simulate_delete("/api/signed/test2/")
|
||||
r = client().simulate_delete("/api/signed/test/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_delete("/api/signed/test2/",
|
||||
r = client().simulate_delete("/api/signed/test/",
|
||||
headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_delete("/api/signed/test2/",
|
||||
r = client().simulate_delete("/api/signed/test/",
|
||||
headers={"Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text
|
||||
result = runner.invoke(cli, ['revoke', 'test3'])
|
||||
assert not result.exception, result.output
|
||||
|
||||
|
||||
# Log can be read only by admin
|
||||
@@ -351,7 +347,8 @@ def test_cli_setup_authority():
|
||||
query_string="u=userbot&t=1493184342&c=ac9b71421d5741800c5a4905b20c1072594a2df863e60ba836464888786bf2a6",
|
||||
headers={"content-type": "application/x-www-form-urlencoded", "Authorization":admintoken})
|
||||
assert r2.status_code == 403 # invalid checksum
|
||||
r2 = client().simulate_get("/api/token/", query_string=r.content,
|
||||
r2 = client().simulate_get("/api/token/",
|
||||
query_string=r.content,
|
||||
headers={"User-Agent":"Mozilla/5.0 (X11; Fedora; Linux x86_64) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36"})
|
||||
assert r2.status_code == 200 # token consumed by anyone on Fedora
|
||||
@@ -362,7 +359,6 @@ def test_cli_setup_authority():
|
||||
assert r2.status_code == 200 # token consumed by anyone on unknown device
|
||||
assert r2.headers.get('content-type') == "application/x-pkcs12"
|
||||
|
||||
|
||||
result = runner.invoke(cli, ['setup', 'openvpn', 'server', "-cn", "vpn.example.lan", "ca.example.lan"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
@@ -379,9 +375,39 @@ def test_cli_setup_authority():
|
||||
# pregen dhparam
|
||||
result = runner.invoke(cli, ["request", "--no-wait"])
|
||||
assert not result.exception, "server responded %s, server logs say %s" % (result.output, open("/var/log/certidude.log").read())
|
||||
result = runner.invoke(cli, ['sign', 'vpn.example.lan'])
|
||||
assert not result.exception, result.output
|
||||
|
||||
child_pid = os.fork()
|
||||
if not child_pid:
|
||||
result = runner.invoke(cli, ['sign', 'vpn.example.lan'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(child_pid, 0)
|
||||
|
||||
result = runner.invoke(cli, ["request", "--no-wait"])
|
||||
assert not result.exception, result.output
|
||||
result = runner.invoke(cli, ["request", "--renew"])
|
||||
assert not result.exception, result.output
|
||||
|
||||
# Test revocation on command-line
|
||||
child_pid = os.fork()
|
||||
if not child_pid:
|
||||
result = runner.invoke(cli, ['revoke', 'vpn.example.lan'])
|
||||
assert not result.exception, result.output
|
||||
return
|
||||
else:
|
||||
os.waitpid(child_pid, 0)
|
||||
|
||||
result = runner.invoke(cli, ['list', '-srv'])
|
||||
assert not result.exception, result.output
|
||||
result = runner.invoke(cli, ['cron'])
|
||||
assert not result.exception, result.output
|
||||
|
||||
# Shut down signer
|
||||
assert authority.signer_exec("exit") == "ok"
|
||||
|
||||
# Shut down server
|
||||
with open("/run/certidude/server.pid") as fh:
|
||||
os.kill(int(fh.read()), 1)
|
||||
|
||||
os.waitpid(server_pid, 0)
|
||||
|
Reference in New Issue
Block a user