diff --git a/certidude/api/__init__.py b/certidude/api/__init__.py index df4975a..3902ef4 100644 --- a/certidude/api/__init__.py +++ b/certidude/api/__init__.py @@ -255,7 +255,7 @@ def certidude_app(log_handlers=[]): # Add SCEP handler if we have any whitelisted subnets if config.SCEP_SUBNETS: from .scep import SCEPResource - app.add_route("/api/scep/", SCEPResource()) + app.add_route("/api/scep/", SCEPResource(authority)) # Add sink for serving static files app.add_sink(StaticResource(os.path.join(__file__, "..", "..", "static"))) diff --git a/certidude/api/scep.py b/certidude/api/scep.py index 7c1aa95..57b9115 100644 --- a/certidude/api/scep.py +++ b/certidude/api/scep.py @@ -5,7 +5,7 @@ from asn1crypto import cms, algos, x509 from asn1crypto.core import ObjectIdentifier, SetOf, PrintableString from base64 import b64decode, b64encode from certbuilder import pem_armor_certificate -from certidude import authority, push, config +from certidude import push, config from certidude.firewall import whitelist_subnets from oscrypto import keys, asymmetric, symmetric from oscrypto.errors import SignatureError @@ -37,11 +37,14 @@ class SCEPBadTime(SCEPError): code = 3 class SCEPBadCertId(SCEPError): code = 4 class SCEPResource(object): + def __init__(self, authority): + self.authority = authority + @whitelist_subnets(config.SCEP_SUBNETS) def on_get(self, req, resp): operation = req.get_param("operation", required=True) if operation.lower() == "getcacert": - resp.body = keys.parse_certificate(authority.certificate_buf).dump() + resp.body = keys.parse_certificate(self.authority.certificate_buf).dump() resp.append_header("Content-Type", "application/x-x509-ca-cert") return @@ -120,17 +123,17 @@ class SCEPResource(object): encrypted_content = encrypted_content_info['encrypted_content'].native recipient, = encrypted_envelope['recipient_infos'] - if recipient.native["rid"]["serial_number"] != authority.certificate.serial_number: + if recipient.native["rid"]["serial_number"] != self.authority.certificate.serial_number: raise SCEPBadCertId() # Since CA private key is not directly readable here, we'll redirect it to signer socket key = asymmetric.rsa_pkcs1v15_decrypt( - authority.private_key, + self.authority.private_key, recipient.native["encrypted_key"]) if len(key) == 8: key = key * 3 # Convert DES to 3DES buf = symmetric.tripledes_cbc_pkcs5_decrypt(key, encrypted_content, iv) - _, _, common_name = authority.store_request(buf, overwrite=True) - cert, buf = authority.sign(common_name, overwrite=True) + _, _, common_name = self.authority.store_request(buf, overwrite=True) + cert, buf = self.authority.sign(common_name, overwrite=True) signed_certificate = asymmetric.load_certificate(buf) content = signed_certificate.asn1.dump() @@ -242,14 +245,14 @@ class SCEPResource(object): 'version': "v1", 'sid': cms.SignerIdentifier({ 'issuer_and_serial_number': cms.IssuerAndSerialNumber({ - 'issuer': authority.certificate.issuer, - 'serial_number': authority.certificate.serial_number, + 'issuer': self.authority.certificate.issuer, + 'serial_number': self.authority.certificate.serial_number, }), }), 'digest_algorithm': algos.DigestAlgorithm({'algorithm': "sha1"}), 'signature_algorithm': algos.SignedDigestAlgorithm({'algorithm': "rsassa_pkcs1v15"}), 'signature': asymmetric.rsa_pkcs1v15_sign( - authority.private_key, + self.authority.private_key, b"\x31" + attrs.dump()[1:], "sha1" ) @@ -260,7 +263,7 @@ class SCEPResource(object): 'content_type': "signed_data", 'content': cms.SignedData({ 'version': "v1", - 'certificates': [authority.certificate], + 'certificates': [self.authority.certificate], 'digest_algorithms': [cms.DigestAlgorithm({ 'algorithm': "sha1" })],