1
0
mirror of https://github.com/laurivosandi/certidude synced 2024-12-23 00:25:18 +00:00

Merge pull request #5 from plaes/tests

Add basic test support via travis
This commit is contained in:
Lauri Võsandi 2015-10-02 20:55:57 +03:00
commit 34823d6739
8 changed files with 125 additions and 35 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
.goutputstream* .goutputstream*
*.swp
# C extensions # C extensions
*.so *.so

16
.travis.yml Normal file
View File

@ -0,0 +1,16 @@
sudo: false
language: python
python:
#- "2.6"
#- "2.7"
#- "3.2"
- "3.3"
- "3.4"
- "3.5"
install:
- pip install -r requirements.txt
- pip install --editable .
script: py.test
cache:
directories:
- $HOME/.cache/pip

View File

@ -4,7 +4,6 @@
import asyncore import asyncore
import click import click
import logging import logging
import netifaces
import os import os
import pwd import pwd
import re import re
@ -41,8 +40,6 @@ assert hasattr(crypto.X509Req(), "get_extensions"), "You're running too old vers
# keyUsage, extendedKeyUsage - https://www.openssl.org/docs/apps/x509v3_config.html # keyUsage, extendedKeyUsage - https://www.openssl.org/docs/apps/x509v3_config.html
# strongSwan key paths - https://wiki.strongswan.org/projects/1/wiki/SimpleCA # strongSwan key paths - https://wiki.strongswan.org/projects/1/wiki/SimpleCA
config = CertificateAuthorityConfig()
# Parse command-line argument defaults from environment # Parse command-line argument defaults from environment
HOSTNAME = socket.gethostname() HOSTNAME = socket.gethostname()
USERNAME = os.environ.get("USER") USERNAME = os.environ.get("USER")
@ -61,9 +58,13 @@ if os.getuid() >= 1000:
else: else:
FIRST_NAME = gecos FIRST_NAME = gecos
DEFAULT_ROUTE, PRIMARY_INTERFACE = netifaces.gateways().get("default").get(2)
PRIMARY_ALIASES = netifaces.ifaddresses(PRIMARY_INTERFACE).get(2) def load_config():
PRIMARY_ADDRESS = PRIMARY_ALIASES[0].get("addr") path = os.getenv('CERTIDUDE_CONF')
if path and os.path.isfile(path):
return CertificateAuthorityConfig(path)
return CertificateAuthorityConfig()
@click.command("spawn", help="Run privilege isolated signer processes") @click.command("spawn", help="Run privilege isolated signer processes")
@click.option("-k", "--kill", default=False, is_flag=True, help="Kill previous instances") @click.option("-k", "--kill", default=False, is_flag=True, help="Kill previous instances")
@ -100,6 +101,7 @@ def certidude_spawn(kill, no_interaction):
os.system("mknod -m 444 %s c 1 9" % os.path.join(chroot_dir, "dev", "urandom")) os.system("mknod -m 444 %s c 1 9" % os.path.join(chroot_dir, "dev", "urandom"))
ca_loaded = False ca_loaded = False
config = load_config()
for ca in config.all_authorities(): for ca in config.all_authorities():
socket_path = os.path.join(signer_dir, ca.slug + ".sock") socket_path = os.path.join(signer_dir, ca.slug + ".sock")
pidfile_path = os.path.join(signer_dir, ca.slug + ".pid") pidfile_path = os.path.join(signer_dir, ca.slug + ".pid")
@ -175,7 +177,7 @@ def certidude_setup_client(quiet, **kwargs):
@click.option("--org-unit", "-ou", help="Organizational unit") @click.option("--org-unit", "-ou", help="Organizational unit")
@click.option("--email-address", "-m", default=EMAIL, help="E-mail associated with the request, '%s' by default" % EMAIL) @click.option("--email-address", "-m", default=EMAIL, help="E-mail associated with the request, '%s' by default" % EMAIL)
@click.option("--subnet", "-s", default="192.168.33.0/24", type=ip_network, help="OpenVPN subnet, 192.168.33.0/24 by default") @click.option("--subnet", "-s", default="192.168.33.0/24", type=ip_network, help="OpenVPN subnet, 192.168.33.0/24 by default")
@click.option("--local", "-l", default=PRIMARY_ADDRESS, help="OpenVPN listening address, %s" % PRIMARY_ADDRESS) @click.option("--local", "-l", default="127.0.0.1", help="OpenVPN listening address, defaults to 127.0.0.1")
@click.option("--port", "-p", default=1194, type=click.IntRange(1,60000), help="OpenVPN listening port, 1194 by default") @click.option("--port", "-p", default=1194, type=click.IntRange(1,60000), help="OpenVPN listening port, 1194 by default")
@click.option('--proto', "-t", default="udp", type=click.Choice(['udp', 'tcp']), help="OpenVPN transport protocol, UDP by default") @click.option('--proto', "-t", default="udp", type=click.Choice(['udp', 'tcp']), help="OpenVPN transport protocol, UDP by default")
@click.option("--route", "-r", type=ip_network, multiple=True, help="Subnets to advertise via this connection, multiple allowed") @click.option("--route", "-r", type=ip_network, multiple=True, help="Subnets to advertise via this connection, multiple allowed")
@ -288,10 +290,10 @@ def certidude_setup_openvpn_client(url, config, email_address, common_name, org_
@click.argument("url") @click.argument("url")
@click.option("--common-name", "-cn", default=HOSTNAME, help="Common name, %s by default" % HOSTNAME) @click.option("--common-name", "-cn", default=HOSTNAME, help="Common name, %s by default" % HOSTNAME)
@click.option("--org-unit", "-ou", help="Organizational unit") @click.option("--org-unit", "-ou", help="Organizational unit")
@click.option("--fqdn", "-f", default=HOSTNAME, help="Fully qualified hostname, %s by default" % PRIMARY_ADDRESS) @click.option("--fqdn", "-f", default=HOSTNAME, help="Fully qualified hostname, %s by default" % HOSTNAME)
@click.option("--email-address", "-m", default=EMAIL, help="E-mail associated with the request, %s by default" % EMAIL) @click.option("--email-address", "-m", default=EMAIL, help="E-mail associated with the request, %s by default" % EMAIL)
@click.option("--subnet", "-s", default="192.168.33.0/24", type=ip_network, help="IPsec virtual subnet, 192.168.33.0/24 by default") @click.option("--subnet", "-s", default="192.168.33.0/24", type=ip_network, help="IPsec virtual subnet, 192.168.33.0/24 by default")
@click.option("--local", "-l", default=PRIMARY_ADDRESS, help="IPsec gateway address, %s" % PRIMARY_ADDRESS) @click.option("--local", "-l", default="127.0.0.1", help="IPsec gateway address, defaults to 127.0.0.1")
@click.option("--route", "-r", type=ip_network, multiple=True, help="Subnets to advertise via this connection, multiple allowed") @click.option("--route", "-r", type=ip_network, multiple=True, help="Subnets to advertise via this connection, multiple allowed")
@click.option("--config", "-o", @click.option("--config", "-o",
default="/etc/ipsec.conf", default="/etc/ipsec.conf",
@ -449,7 +451,6 @@ def certidude_setup_production(username, hostname, push_server, nginx_config, uw
@click.command("authority", help="Set up Certificate Authority in a directory") @click.command("authority", help="Set up Certificate Authority in a directory")
@click.option("--group", "-g", default="certidude", help="Group for file permissions, certidude by default")
@click.option("--parent", "-p", help="Parent CA, none by default") @click.option("--parent", "-p", help="Parent CA, none by default")
@click.option("--common-name", "-cn", default=HOSTNAME, help="Common name, hostname by default") @click.option("--common-name", "-cn", default=HOSTNAME, help="Common name, hostname by default")
@click.option("--country", "-c", default="ee", help="Country, Estonia by default") @click.option("--country", "-c", default="ee", help="Country, Estonia by default")
@ -467,16 +468,18 @@ def certidude_setup_production(username, hostname, push_server, nginx_config, uw
@click.option("--inbox", default="imap://user:pass@host:port/INBOX", help="Inbound e-mail server") @click.option("--inbox", default="imap://user:pass@host:port/INBOX", help="Inbound e-mail server")
@click.option("--outbox", default="smtp://localhost", help="Outbound e-mail server") @click.option("--outbox", default="smtp://localhost", help="Outbound e-mail server")
@click.argument("directory") @click.argument("directory")
def certidude_setup_authority(parent, country, state, locality, organization, organizational_unit, common_name, directory, certificate_lifetime, authority_lifetime, revocation_list_lifetime, pkcs11, group, crl_distribution_url, ocsp_responder_url, email_address, inbox, outbox): def certidude_setup_authority(parent, country, state, locality, organization, organizational_unit, common_name, directory, certificate_lifetime, authority_lifetime, revocation_list_lifetime, pkcs11, crl_distribution_url, ocsp_responder_url, email_address, inbox, outbox):
logging.info("Creating certificate authority in %s", directory)
_, _, uid, gid, gecos, root, shell = pwd.getpwnam(group)
os.setgid(gid)
slug = os.path.basename(directory[:-1] if directory.endswith('/') else directory) slug = os.path.basename(directory[:-1] if directory.endswith('/') else directory)
if not slug: if not slug:
raise ValueError("Please supply proper target path") raise click.ClickException("Please supply proper target path")
# Make sure slug is valid
if not re.match(r"^[_a-zA-Z0-9]+$", slug):
raise click.ClickException("CA name can contain only alphanumeric and '_' characters")
click.echo("CA configuration files are saved to: {}".format(os.path.abspath(slug))) if os.path.lexists(directory):
raise click.ClickException("Output directory {} already exists.".format(directory))
click.echo("CA configuration files are saved to: {}".format(directory))
click.echo("Generating 4096-bit RSA key...") click.echo("Generating 4096-bit RSA key...")
@ -553,10 +556,10 @@ def certidude_setup_authority(parent, country, state, locality, organization, or
click.echo("Signing %s..." % subject2dn(ca.get_subject())) click.echo("Signing %s..." % subject2dn(ca.get_subject()))
# openssl x509 -in ca_crt.pem -outform DER | sha1sum # openssl x509 -in ca_crt.pem -outform DER | sha256sum
# openssl x509 -fingerprint -in ca_crt.pem # openssl x509 -fingerprint -in ca_crt.pem
ca.sign(key, "sha1") ca.sign(key, "sha256")
os.umask(0o027) os.umask(0o027)
if not os.path.exists(directory): if not os.path.exists(directory):
@ -577,7 +580,6 @@ def certidude_setup_authority(parent, country, state, locality, organization, or
with open(ca_crt, "wb") as fh: with open(ca_crt, "wb") as fh:
fh.write(crypto.dump_certificate(crypto.FILETYPE_PEM, ca)) fh.write(crypto.dump_certificate(crypto.FILETYPE_PEM, ca))
os.umask(0o077) os.umask(0o077)
with open(ca_key, "wb") as fh: with open(ca_key, "wb") as fh:
fh.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key)) fh.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
@ -588,7 +590,6 @@ def certidude_setup_authority(parent, country, state, locality, organization, or
click.echo("You need to copy the contents of the 'openssl.cnf.example'") click.echo("You need to copy the contents of the 'openssl.cnf.example'")
click.echo("to system-wide OpenSSL configuration file, usually located") click.echo("to system-wide OpenSSL configuration file, usually located")
click.echo("at /etc/ssl/openssl.cnf") click.echo("at /etc/ssl/openssl.cnf")
click.echo()
click.echo() click.echo()
click.echo("Use following commands to inspect the newly created files:") click.echo("Use following commands to inspect the newly created files:")
@ -644,7 +645,16 @@ def certidude_list(ca, show_key_type, show_extensions, show_path):
click.echo(" | | Key usage: " + j.key_usage) click.echo(" | | Key usage: " + j.key_usage)
click.echo(" | |") click.echo(" | |")
for ca in config.all_authorities(): config = load_config()
wanted_list = None
if ca:
missing = list(set(ca) - set(config.ca_list))
if missing:
raise click.NoSuchOption(option_name='', message="Unable to find certificate authority.", possibilities=config.ca_list)
wanted_list = ca
for ca in config.all_authorities(wanted_list):
click.echo("Certificate authority " + click.style(ca.slug, fg="blue")) click.echo("Certificate authority " + click.style(ca.slug, fg="blue"))
# if ca.certificate.email_address: # if ca.certificate.email_address:
# click.echo(" \u2709 %s" % ca.certificate.email_address) # click.echo(" \u2709 %s" % ca.certificate.email_address)
@ -705,11 +715,13 @@ def certidude_list(ca, show_key_type, show_extensions, show_path):
@click.command("list", help="List Certificate Authorities") @click.command("list", help="List Certificate Authorities")
@click.argument("ca") @click.argument("ca")
@config.pop_certificate_authority() #@config.pop_certificate_authority()
def cert_list(ca): def cert_list(ca):
mapping = {} mapping = {}
config = load_config()
click.echo("Listing certificates for: %s" % ca.certificate.subject.CN) click.echo("Listing certificates for: %s" % ca.certificate.subject.CN)
for serial, reason, timestamp in ca.get_revoked(): for serial, reason, timestamp in ca.get_revoked():
@ -732,6 +744,7 @@ def cert_list(ca):
@click.option("--overwrite", "-o", default=False, is_flag=True, help="Revoke valid certificate with same CN") @click.option("--overwrite", "-o", default=False, is_flag=True, help="Revoke valid certificate with same CN")
@click.option("--lifetime", "-l", help="Lifetime") @click.option("--lifetime", "-l", help="Lifetime")
def certidude_sign(common_name, overwrite, lifetime): def certidude_sign(common_name, overwrite, lifetime):
config = load_config()
def iterate(): def iterate():
for ca in config.all_authorities(): for ca in config.all_authorities():
for request in ca.get_requests(): for request in ca.get_requests():

View File

@ -1,11 +1,8 @@
import click import click
import logging
import netifaces
import os import os
import urllib.request import urllib.request
from certidude.wrappers import Certificate, Request from certidude.wrappers import Certificate, Request
from certidude.signer import SignServer
from OpenSSL import crypto from OpenSSL import crypto
def expand_paths(): def expand_paths():

View File

@ -107,14 +107,29 @@ class CertificateAuthorityConfig(object):
authority = CertificateAuthority(slug, **dirs) authority = CertificateAuthority(slug, **dirs)
return authority return authority
def all_authorities(self):
for section in self._config: def all_authorities(self, wanted=None):
if section.startswith("CA_"): for ca in self.ca_list:
if wanted and ca not in wanted:
continue
try: try:
yield self.instantiate_authority(section[3:]) yield self.instantiate_authority(ca)
except FileNotFoundError: except FileNotFoundError:
pass pass
@property
def ca_list(self):
"""
Returns sorted list of CA-s defined in the configuration file.
"""
l = [s[3:] for s in self._config if s.startswith("CA_")]
# Sanity check for duplicates (although ConfigParser fails earlier)
if len(l) != len(set(l)):
raise ValueError
return sorted(l)
def pop_certificate_authority(self): def pop_certificate_authority(self):
def wrapper(func): def wrapper(func):
def wrapped(*args, **kwargs): def wrapped(*args, **kwargs):

View File

@ -7,7 +7,6 @@ idna==2.0
Jinja2==2.8 Jinja2==2.8
ldap3==0.9.8.8 ldap3==0.9.8.8
MarkupSafe==0.23 MarkupSafe==0.23
netifaces==0.10.4
pyasn1==0.1.8 pyasn1==0.1.8
pycountry==1.14 pycountry==1.14
pycparser==2.14 pycparser==2.14

View File

@ -21,7 +21,6 @@ setup(
"click", "click",
"falcon", "falcon",
"jinja2", "jinja2",
"netifaces",
"pyopenssl", "pyopenssl",
"pycountry", "pycountry",
"humanize", "humanize",

50
tests/test_cli.py Normal file
View File

@ -0,0 +1,50 @@
import os
import pwd
import pytest
from click.testing import CliRunner
from certidude.cli import entry_point as cli
runner = CliRunner()
def user_check(name='certidude'):
try:
pwd.getpwnam(name)
return False
except KeyError:
pass
return True
def test_cli_setup_authority():
# Authority setup
# TODO: parent, common-name, country, state, locality
# {authority,certificate,revocation-list}-lifetime
# organization, organizational-unit
# pkcs11
# {crl-distribution,ocsp-responder}-url
# email-address
# inbox, outbox
with runner.isolated_filesystem():
result = runner.invoke(cli, ['setup', 'authority', 'ca'])
assert not result.exception
# Check whether required files were generated
for f in ('ca_key.pem', 'ca_crt.pem', 'ca_crl.pem',
'serial', 'openssl.cnf.example'):
assert os.path.isfile(os.path.join('ca', f))
for d in ('requests', 'revoked', 'signed'):
assert os.path.isdir(os.path.join('ca', d))
def test_cli_setup_authority_invalid_name():
with runner.isolated_filesystem():
result = runner.invoke(cli, ['setup', 'authority'])
assert result.exception
result = runner.invoke(cli, ['setup', 'authority', '""'])
assert result.exception
def test_cli_setup_authority_overwrite():
with runner.isolated_filesystem():
os.mkdir('foo')
result = runner.invoke(cli, ['setup', 'authority', 'foo'])
assert result.exception