mirror of
https://github.com/laurivosandi/certidude
synced 2025-10-30 17:09:19 +00:00
Migrate renewal to mutually authenticated TLS connection
This commit is contained in:
@@ -4,7 +4,7 @@ import logging
|
||||
import json
|
||||
import os
|
||||
import hashlib
|
||||
from asn1crypto import pem
|
||||
from asn1crypto import pem, x509
|
||||
from asn1crypto.csr import CertificationRequest
|
||||
from base64 import b64decode
|
||||
from certidude import config, push, errors
|
||||
@@ -89,43 +89,29 @@ class RequestListResource(AuthorityHandler):
|
||||
cert_pk = cert["tbs_certificate"]["subject_public_key_info"].native
|
||||
csr_pk = csr["certification_request_info"]["subject_pk_info"].native
|
||||
|
||||
if cert_pk == csr_pk: # Same public key, assume renewal
|
||||
expires = cert["tbs_certificate"]["validity"]["not_after"].native.replace(tzinfo=None)
|
||||
renewal_header = req.get_header("X-Renewal-Signature")
|
||||
try:
|
||||
buf = req.get_header("X-SSL-CERT")
|
||||
header, _, der_bytes = pem.unarmor(buf.replace("\t", "").encode("ascii"))
|
||||
handshake_cert = x509.Certificate.load(der_bytes)
|
||||
except:
|
||||
raise
|
||||
else:
|
||||
# Same public key
|
||||
if cert_pk == csr_pk:
|
||||
# Used mutually authenticated TLS handshake, assume renewal
|
||||
if handshake_cert.native == cert.native:
|
||||
for subnet in config.RENEWAL_SUBNETS:
|
||||
if req.context.get("remote_addr") in subnet:
|
||||
resp.set_header("Content-Type", "application/x-x509-user-cert")
|
||||
_, resp.body = self.authority._sign(csr, body, overwrite=True)
|
||||
logger.info("Renewing certificate for %s as %s is whitelisted", common_name, req.context.get("remote_addr"))
|
||||
return
|
||||
|
||||
if not renewal_header:
|
||||
# No header supplied, redirect to signed API call
|
||||
resp.status = falcon.HTTP_SEE_OTHER
|
||||
resp.location = os.path.join(os.path.dirname(req.relative_uri), "signed", common_name)
|
||||
return
|
||||
|
||||
try:
|
||||
renewal_signature = b64decode(renewal_header)
|
||||
except (TypeError, ValueError):
|
||||
logger.error("Renewal failed, bad signature supplied for %s", common_name)
|
||||
reasons.append("Renewal failed, bad signature supplied")
|
||||
else:
|
||||
try:
|
||||
asymmetric.rsa_pss_verify(
|
||||
asymmetric.load_certificate(cert),
|
||||
renewal_signature, buf + body, "sha512")
|
||||
except SignatureError:
|
||||
logger.error("Renewal failed, invalid signature supplied for %s", common_name)
|
||||
reasons.append("Renewal failed, invalid signature supplied")
|
||||
else:
|
||||
# At this point renewal signature was valid but we need to perform some extra checks
|
||||
if datetime.utcnow() > expires:
|
||||
logger.error("Renewal failed, current certificate for %s has expired", common_name)
|
||||
reasons.append("Renewal failed, current certificate expired")
|
||||
elif not config.CERTIFICATE_RENEWAL_ALLOWED:
|
||||
logger.error("Renewal requested for %s, but not allowed by authority settings", common_name)
|
||||
reasons.append("Renewal requested, but not allowed by authority settings")
|
||||
else:
|
||||
resp.set_header("Content-Type", "application/x-x509-user-cert")
|
||||
_, resp.body = self.authority._sign(csr, body, overwrite=True)
|
||||
logger.info("Renewed certificate for %s", common_name)
|
||||
return
|
||||
|
||||
|
||||
"""
|
||||
Process automatic signing if the IP address is whitelisted,
|
||||
|
||||
Reference in New Issue
Block a user