api: scep: drop usage of global authority import

This commit is contained in:
Priit Laes 2018-02-03 12:53:19 +02:00
parent 239538371f
commit 1cfb1b3293
2 changed files with 14 additions and 11 deletions

View File

@ -255,7 +255,7 @@ def certidude_app(log_handlers=[]):
# Add SCEP handler if we have any whitelisted subnets # Add SCEP handler if we have any whitelisted subnets
if config.SCEP_SUBNETS: if config.SCEP_SUBNETS:
from .scep import SCEPResource from .scep import SCEPResource
app.add_route("/api/scep/", SCEPResource()) app.add_route("/api/scep/", SCEPResource(authority))
# Add sink for serving static files # Add sink for serving static files
app.add_sink(StaticResource(os.path.join(__file__, "..", "..", "static"))) app.add_sink(StaticResource(os.path.join(__file__, "..", "..", "static")))

View File

@ -5,7 +5,7 @@ from asn1crypto import cms, algos, x509
from asn1crypto.core import ObjectIdentifier, SetOf, PrintableString from asn1crypto.core import ObjectIdentifier, SetOf, PrintableString
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from certbuilder import pem_armor_certificate from certbuilder import pem_armor_certificate
from certidude import authority, push, config from certidude import push, config
from certidude.firewall import whitelist_subnets from certidude.firewall import whitelist_subnets
from oscrypto import keys, asymmetric, symmetric from oscrypto import keys, asymmetric, symmetric
from oscrypto.errors import SignatureError from oscrypto.errors import SignatureError
@ -37,11 +37,14 @@ class SCEPBadTime(SCEPError): code = 3
class SCEPBadCertId(SCEPError): code = 4 class SCEPBadCertId(SCEPError): code = 4
class SCEPResource(object): class SCEPResource(object):
def __init__(self, authority):
self.authority = authority
@whitelist_subnets(config.SCEP_SUBNETS) @whitelist_subnets(config.SCEP_SUBNETS)
def on_get(self, req, resp): def on_get(self, req, resp):
operation = req.get_param("operation", required=True) operation = req.get_param("operation", required=True)
if operation.lower() == "getcacert": if operation.lower() == "getcacert":
resp.body = keys.parse_certificate(authority.certificate_buf).dump() resp.body = keys.parse_certificate(self.authority.certificate_buf).dump()
resp.append_header("Content-Type", "application/x-x509-ca-cert") resp.append_header("Content-Type", "application/x-x509-ca-cert")
return return
@ -120,17 +123,17 @@ class SCEPResource(object):
encrypted_content = encrypted_content_info['encrypted_content'].native encrypted_content = encrypted_content_info['encrypted_content'].native
recipient, = encrypted_envelope['recipient_infos'] recipient, = encrypted_envelope['recipient_infos']
if recipient.native["rid"]["serial_number"] != authority.certificate.serial_number: if recipient.native["rid"]["serial_number"] != self.authority.certificate.serial_number:
raise SCEPBadCertId() raise SCEPBadCertId()
# Since CA private key is not directly readable here, we'll redirect it to signer socket # Since CA private key is not directly readable here, we'll redirect it to signer socket
key = asymmetric.rsa_pkcs1v15_decrypt( key = asymmetric.rsa_pkcs1v15_decrypt(
authority.private_key, self.authority.private_key,
recipient.native["encrypted_key"]) recipient.native["encrypted_key"])
if len(key) == 8: key = key * 3 # Convert DES to 3DES if len(key) == 8: key = key * 3 # Convert DES to 3DES
buf = symmetric.tripledes_cbc_pkcs5_decrypt(key, encrypted_content, iv) buf = symmetric.tripledes_cbc_pkcs5_decrypt(key, encrypted_content, iv)
_, _, common_name = authority.store_request(buf, overwrite=True) _, _, common_name = self.authority.store_request(buf, overwrite=True)
cert, buf = authority.sign(common_name, overwrite=True) cert, buf = self.authority.sign(common_name, overwrite=True)
signed_certificate = asymmetric.load_certificate(buf) signed_certificate = asymmetric.load_certificate(buf)
content = signed_certificate.asn1.dump() content = signed_certificate.asn1.dump()
@ -242,14 +245,14 @@ class SCEPResource(object):
'version': "v1", 'version': "v1",
'sid': cms.SignerIdentifier({ 'sid': cms.SignerIdentifier({
'issuer_and_serial_number': cms.IssuerAndSerialNumber({ 'issuer_and_serial_number': cms.IssuerAndSerialNumber({
'issuer': authority.certificate.issuer, 'issuer': self.authority.certificate.issuer,
'serial_number': authority.certificate.serial_number, 'serial_number': self.authority.certificate.serial_number,
}), }),
}), }),
'digest_algorithm': algos.DigestAlgorithm({'algorithm': "sha1"}), 'digest_algorithm': algos.DigestAlgorithm({'algorithm': "sha1"}),
'signature_algorithm': algos.SignedDigestAlgorithm({'algorithm': "rsassa_pkcs1v15"}), 'signature_algorithm': algos.SignedDigestAlgorithm({'algorithm': "rsassa_pkcs1v15"}),
'signature': asymmetric.rsa_pkcs1v15_sign( 'signature': asymmetric.rsa_pkcs1v15_sign(
authority.private_key, self.authority.private_key,
b"\x31" + attrs.dump()[1:], b"\x31" + attrs.dump()[1:],
"sha1" "sha1"
) )
@ -260,7 +263,7 @@ class SCEPResource(object):
'content_type': "signed_data", 'content_type': "signed_data",
'content': cms.SignedData({ 'content': cms.SignedData({
'version': "v1", 'version': "v1",
'certificates': [authority.certificate], 'certificates': [self.authority.certificate],
'digest_algorithms': [cms.DigestAlgorithm({ 'digest_algorithms': [cms.DigestAlgorithm({
'algorithm': "sha1" 'algorithm': "sha1"
})], })],