1
0
mirror of https://github.com/laurivosandi/certidude synced 2024-12-22 16:25:17 +00:00

Add AKID and SKID

This commit is contained in:
Lauri Võsandi 2016-03-29 08:47:43 +03:00
parent ff71ca42d7
commit acc0e29109
2 changed files with 98 additions and 69 deletions

View File

@ -863,7 +863,15 @@ def certidude_setup_authority(parent, country, state, locality, organization, or
crypto.X509Extension(
b"subjectAltName",
False,
"DNS: %s, email: %s" % (common_name.encode("ascii"), email_address.encode("ascii")))
(u"DNS: %s, email: %s" % (common_name, email_address)).encode("ascii"))
])
ca.add_extensions([
crypto.X509Extension(
b"authorityKeyIdentifier",
False,
b"keyid:always",
issuer = ca)
])
if ocsp_responder_url:

View File

@ -7,25 +7,21 @@ import os
import asyncore
import asynchat
from certidude import constants, config
from datetime import datetime
from OpenSSL import crypto
"""
Signer processes are spawned per private key.
Private key should only be readable by root.
Signer process starts up as root, reads private key,
drops privileges and awaits for opcodes (sign-request, export-crl) at UNIX domain socket
under /run/certidude/signer/
The main motivation behind the concept is to mitigate private key leaks
by confining it to a separate process.
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import Encoding
from datetime import datetime, timedelta
from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID
import random
Note that signer process uses basicConstraints, keyUsage and extendedKeyUsage
attributes from openssl.cnf via CertificateAuthority wrapper class.
Hence it's possible only to sign such certificates via the signer process,
making it hard to take advantage of hacked Certidude server, eg. being able to sign
certificate authoirty (basicConstraints=CA:TRUE) or
TLS server certificates (extendedKeyUsage=serverAuth).
"""
DN_WHITELIST = NameOID.COMMON_NAME, NameOID.GIVEN_NAME, NameOID.SURNAME, \
NameOID.EMAIL_ADDRESS
SERIAL_MIN = 0x1000000000000000000000000000000000000000
SERIAL_MAX = 0xffffffffffffffffffffffffffffffffffffffff
def raw_sign(private_key, ca_cert, request, basic_constraints, lifetime, key_usage=None, extended_key_usage=None):
"""
@ -42,15 +38,20 @@ def raw_sign(private_key, ca_cert, request, basic_constraints, lifetime, key_usa
# Set issuer
cert.set_issuer(ca_cert.get_subject())
# Copy attributes from CA
if ca_cert.get_subject().C:
cert.get_subject().C = ca_cert.get_subject().C
if ca_cert.get_subject().ST:
cert.get_subject().ST = ca_cert.get_subject().ST
if ca_cert.get_subject().L:
cert.get_subject().L = ca_cert.get_subject().L
if ca_cert.get_subject().O:
cert.get_subject().O = ca_cert.get_subject().O
# Set SKID and AKID extensions
cert.add_extensions([
crypto.X509Extension(
b"subjectKeyIdentifier",
False,
b"hash",
subject = cert),
crypto.X509Extension(
b"authorityKeyIdentifier",
False,
b"keyid:always",
issuer = ca_cert)
])
# Copy attributes from request
cert.get_subject().CN = request.get_subject().CN
@ -98,10 +99,8 @@ def raw_sign(private_key, ca_cert, request, basic_constraints, lifetime, key_usa
cert.gmtime_adj_notBefore(-3600)
cert.gmtime_adj_notAfter(lifetime * 24 * 60 * 60)
# Generate serial from 0x10000000000000000000 to 0xffffffffffffffffffff
cert.set_serial_number(random.randint(
0x1000000000000000000000000000000000000000,
0xffffffffffffffffffffffffffffffffffffffff))
# Generate random serial
cert.set_serial_number(random.randint(SERIAL_MIN, SERIAL_MAX))
cert.sign(private_key, 'sha256')
return cert
@ -114,57 +113,82 @@ class SignHandler(asynchat.async_chat):
self.server = server
def parse_command(self, cmd, body=""):
now = datetime.utcnow()
if cmd == "export-crl":
"""
Generate CRL object based on certificate serial number and revocation timestamp
"""
crl = crypto.CRL()
builder = x509.CertificateRevocationListBuilder(
).last_update(now
).next_update(now + timedelta(days=1)
).issuer_name(self.server.certificate.issuer
).add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(
self.server.certificate.public_key()), False)
if body:
for line in body.split("\n"):
serial_number, timestamp = line.split(":")
# TODO: Assert serial against regex
revocation = crypto.Revoked()
revocation.set_rev_date(datetime.utcfromtimestamp(int(timestamp)).strftime("%Y%m%d%H%M%SZ").encode("ascii"))
revocation.set_reason(b"keyCompromise")
revocation.set_serial(serial_number.encode("ascii"))
crl.add_revoked(revocation)
revocation = x509.RevokedCertificateBuilder(
).serial_number(int(serial_number, 16)
).revocation_date(datetime.utcfromtimestamp(int(timestamp))
).add_extension(x509.CRLReason(x509.ReasonFlags.key_compromise), False
).build(default_backend())
builder = builder.add_revoked_certificate(revocation)
self.send(crl.export(
self.server.certificate,
crl = builder.sign(
self.server.private_key,
crypto.FILETYPE_PEM,
config.REVOCATION_LIST_LIFETIME))
hashes.SHA512(),
default_backend())
self.send(crl.public_bytes(Encoding.PEM))
elif cmd == "ocsp-request":
NotImplemented # TODO: Implement OCSP
elif cmd == "sign-request":
request = crypto.load_certificate_request(crypto.FILETYPE_PEM, body)
request = x509.load_pem_x509_csr(body, default_backend())
subject = x509.Name([n for n in request.subject if n.oid in DN_WHITELIST])
for e in request.get_extensions():
key = e.get_short_name().decode("ascii")
if key not in constants.EXTENSION_WHITELIST:
raise ValueError("Certificte Signing Request contains extension '%s' which is not whitelisted" % key)
cert = x509.CertificateBuilder(
).subject_name(subject
).serial_number(random.randint(SERIAL_MIN, SERIAL_MAX)
).issuer_name(self.server.certificate.issuer
).public_key(request.public_key()
).not_valid_before(now - timedelta(hours=1)
).not_valid_after(now + timedelta(days=config.CERTIFICATE_LIFETIME)
).add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True,
).add_extension(x509.KeyUsage(
digital_signature=True,
key_encipherment=True,
content_commitment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False), critical=True,
).add_extension(x509.ExtendedKeyUsage(
[ExtendedKeyUsageOID.CLIENT_AUTH]
), critical=True,
).add_extension(
x509.SubjectKeyIdentifier.from_public_key(request.public_key()),
critical=False
).add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(
self.server.certificate.public_key()),
critical=False
).sign(self.server.private_key, hashes.SHA512(), default_backend())
# TODO: Potential exploits during PEM parsing?
cert = raw_sign(
self.server.private_key,
self.server.certificate,
request,
basic_constraints=config.CERTIFICATE_BASIC_CONSTRAINTS,
key_usage=config.CERTIFICATE_KEY_USAGE_FLAGS,
extended_key_usage=config.CERTIFICATE_EXTENDED_KEY_USAGE_FLAGS,
lifetime=config.CERTIFICATE_LIFETIME)
self.send(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
self.send(cert.public_bytes(serialization.Encoding.PEM))
else:
raise NotImplementedError("Unknown command: %s" % cmd)
self.close_when_done()
def found_terminator(self):
args = (b"".join(self.buffer)).decode("ascii").split("\n", 1)
args = (b"".join(self.buffer)).split("\n", 1)
self.parse_command(*args)
self.buffer = []
@ -184,20 +208,17 @@ class SignServer(asyncore.dispatcher):
self.bind(config.SIGNER_SOCKET_PATH)
self.listen(5)
# Load CA private key and certificate
self.private_key = crypto.load_privatekey(crypto.FILETYPE_PEM,
open(config.AUTHORITY_PRIVATE_KEY_PATH).read())
self.certificate = crypto.load_certificate(crypto.FILETYPE_PEM,
open(config.AUTHORITY_CERTIFICATE_PATH).read())
self.private_key = serialization.load_pem_private_key(
open(config.AUTHORITY_PRIVATE_KEY_PATH).read(),
password=None, # TODO: Ask password for private key?
backend=default_backend())
self.certificate = x509.load_pem_x509_certificate(
open(config.AUTHORITY_CERTIFICATE_PATH).read(),
backend=default_backend())
# Perhaps perform chroot as well, currently results in
# (<class 'OpenSSL.crypto.Error'>:[('random number generator', 'SSLEAY_RAND_BYTES', 'PRNG not seeded')
# probably needs partially populated /dev in chroot
# Dropping privileges
# Drop privileges
_, _, uid, gid, gecos, root, shell = pwd.getpwnam("nobody")
#os.chroot("/run/certidude/signer/jail")
os.setgid(gid)
os.setuid(uid)