mirror of
https://github.com/laurivosandi/certidude
synced 2024-12-22 08:15:18 +00:00
Bugfixes and test for SCEP
This commit is contained in:
parent
9b5511212e
commit
e25c774fa3
@ -90,9 +90,9 @@ class SessionResource(object):
|
||||
)
|
||||
|
||||
if req.context.get("user").is_admin():
|
||||
logger.info("Logged in authority administrator %s" % req.context.get("user"))
|
||||
logger.info("Logged in authority administrator %s from %s" % (req.context.get("user"), req.context.get("remote_addr")))
|
||||
else:
|
||||
logger.info("Logged in authority user %s" % req.context.get("user"))
|
||||
logger.info("Logged in authority user %s from %s" % (req.context.get("user"), req.context.get("remote_addr")))
|
||||
return dict(
|
||||
user = dict(
|
||||
name=req.context.get("user").name,
|
||||
|
@ -2,13 +2,12 @@ import click
|
||||
import hashlib
|
||||
import os
|
||||
from asn1crypto.util import timezone
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from asn1crypto import cms, algos, x509, ocsp
|
||||
from base64 import b64decode, b64encode
|
||||
from certbuilder import pem_armor_certificate
|
||||
from certidude import authority, push, config
|
||||
from certidude.firewall import whitelist_subnets
|
||||
from datetime import datetime, timedelta
|
||||
from oscrypto import keys, asymmetric, symmetric
|
||||
from oscrypto.errors import SignatureError
|
||||
|
||||
@ -35,7 +34,7 @@ class OCSPResource(object):
|
||||
if ext["extn_id"].native == "nonce":
|
||||
response_extensions.append(
|
||||
ocsp.ResponseDataExtension({
|
||||
'extn_id': "nonce",
|
||||
'extn_id': u"nonce",
|
||||
'critical': False,
|
||||
'extn_value': ext["extn_value"]
|
||||
})
|
||||
@ -62,7 +61,7 @@ class OCSPResource(object):
|
||||
name='revoked',
|
||||
value={
|
||||
'revocation_time': revoked,
|
||||
'revocation_reason': "key_compromise",
|
||||
'revocation_reason': u"key_compromise",
|
||||
})
|
||||
except EnvironmentError:
|
||||
status = ocsp.CertStatus(name="unknown", value=None)
|
||||
@ -70,7 +69,7 @@ class OCSPResource(object):
|
||||
responses.append({
|
||||
'cert_id': {
|
||||
'hash_algorithm': {
|
||||
'algorithm': "sha1"
|
||||
'algorithm': u"sha1"
|
||||
},
|
||||
'issuer_name_hash': server_certificate.asn1.subject.sha1,
|
||||
'issuer_key_hash': server_certificate.public_key.asn1.sha1,
|
||||
@ -89,12 +88,12 @@ class OCSPResource(object):
|
||||
})
|
||||
|
||||
resp.body = ocsp.OCSPResponse({
|
||||
'response_status': "successful",
|
||||
'response_status': u"successful",
|
||||
'response_bytes': {
|
||||
'response_type': 'basic_ocsp_response',
|
||||
'response_type': u"basic_ocsp_response",
|
||||
'response': {
|
||||
'tbs_response_data': response_data,
|
||||
'signature_algorithm': {'algorithm': "sha1_rsa"},
|
||||
'signature_algorithm': {'algorithm': u"sha1_rsa"},
|
||||
'signature': b64decode(authority.signer_exec("sign-pkcs7", b64encode(response_data.dump()))),
|
||||
'certs': [server_certificate.asn1]
|
||||
}
|
||||
|
@ -15,12 +15,12 @@ from oscrypto.errors import SignatureError
|
||||
class SetOfPrintableString(SetOf):
|
||||
_child_spec = PrintableString
|
||||
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.2'] = "message_type"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.3'] = "pki_status"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.4'] = "fail_info"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.5'] = "sender_nonce"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.6'] = "recipient_nonce"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.7'] = "trans_id"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.2'] = u"message_type"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.3'] = u"pki_status"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.4'] = u"fail_info"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.5'] = u"sender_nonce"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.6'] = u"recipient_nonce"
|
||||
cms.CMSAttributeType._map['2.16.840.1.113733.1.9.7'] = u"trans_id"
|
||||
|
||||
cms.CMSAttribute._oid_specs['message_type'] = SetOfPrintableString
|
||||
cms.CMSAttribute._oid_specs['pki_status'] = SetOfPrintableString
|
||||
@ -54,12 +54,12 @@ class SCEPResource(object):
|
||||
encrypted_container = b""
|
||||
attr_list = [
|
||||
cms.CMSAttribute({
|
||||
'type': "message_type",
|
||||
'values': ["3"]
|
||||
'type': u"message_type",
|
||||
'values': [u"3"]
|
||||
}),
|
||||
cms.CMSAttribute({
|
||||
'type': "pki_status",
|
||||
'values': ["2"] # rejected
|
||||
'type': u"pki_status",
|
||||
'values': [u"2"] # rejected
|
||||
})
|
||||
]
|
||||
|
||||
@ -139,7 +139,7 @@ class SCEPResource(object):
|
||||
|
||||
except SCEPError, e:
|
||||
attr_list.append(cms.CMSAttribute({
|
||||
'type': "fail_info",
|
||||
'type': u"fail_info",
|
||||
'values': ["%d" % e.code]
|
||||
}))
|
||||
else:
|
||||
@ -149,17 +149,17 @@ class SCEPResource(object):
|
||||
##################################
|
||||
|
||||
degenerate = cms.ContentInfo({
|
||||
'content_type': 'signed_data',
|
||||
'content_type': u"signed_data",
|
||||
'content': cms.SignedData({
|
||||
'version': 'v1',
|
||||
'version': u"v1",
|
||||
'certificates': [signed_certificate.asn1],
|
||||
'digest_algorithms': [cms.DigestAlgorithm({
|
||||
'algorithm':'md5'
|
||||
'algorithm': u"md5"
|
||||
})],
|
||||
'encap_content_info': {
|
||||
'content_type': 'data',
|
||||
'content_type': u"data",
|
||||
'content': cms.ContentInfo({
|
||||
'content_type': 'signed_data',
|
||||
'content_type': u"signed_data",
|
||||
'content': None
|
||||
}).dump()
|
||||
},
|
||||
@ -178,7 +178,7 @@ class SCEPResource(object):
|
||||
|
||||
ri = cms.RecipientInfo({
|
||||
'ktri': cms.KeyTransRecipientInfo({
|
||||
'version': 'v0',
|
||||
'version': u"v0",
|
||||
'rid': cms.RecipientIdentifier({
|
||||
'issuer_and_serial_number': cms.IssuerAndSerialNumber({
|
||||
'issuer': current_certificate.chosen["tbs_certificate"]["issuer"],
|
||||
@ -186,7 +186,7 @@ class SCEPResource(object):
|
||||
}),
|
||||
}),
|
||||
'key_encryption_algorithm': {
|
||||
'algorithm': 'rsa'
|
||||
'algorithm': u"rsa"
|
||||
},
|
||||
'encrypted_key': asymmetric.rsa_pkcs1v15_encrypt(
|
||||
asymmetric.load_certificate(current_certificate.chosen.dump()), key)
|
||||
@ -194,14 +194,14 @@ class SCEPResource(object):
|
||||
})
|
||||
|
||||
encrypted_container = cms.ContentInfo({
|
||||
'content_type': 'enveloped_data',
|
||||
'content_type': u"enveloped_data",
|
||||
'content': cms.EnvelopedData({
|
||||
'version': 'v1',
|
||||
'version': u"v1",
|
||||
'recipient_infos': [ri],
|
||||
'encrypted_content_info': {
|
||||
'content_type': 'data',
|
||||
'content_type': u"data",
|
||||
'content_encryption_algorithm': {
|
||||
'algorithm': 'des',
|
||||
'algorithm': u"des",
|
||||
'parameters': iv
|
||||
},
|
||||
'encrypted_content': encrypted_content
|
||||
@ -211,16 +211,16 @@ class SCEPResource(object):
|
||||
|
||||
attr_list = [
|
||||
cms.CMSAttribute({
|
||||
'type': 'message_digest',
|
||||
'type': u"message_digest",
|
||||
'values': [hashlib.sha1(encrypted_container).digest()]
|
||||
}),
|
||||
cms.CMSAttribute({
|
||||
'type': "message_type",
|
||||
'values': ["3"]
|
||||
'type': u"message_type",
|
||||
'values': [u"3"]
|
||||
}),
|
||||
cms.CMSAttribute({
|
||||
'type': "pki_status",
|
||||
'values': ["0"] # ok
|
||||
'type': u"pki_status",
|
||||
'values': [u"0"] # ok
|
||||
})
|
||||
]
|
||||
finally:
|
||||
@ -231,40 +231,40 @@ class SCEPResource(object):
|
||||
|
||||
attrs = cms.CMSAttributes(attr_list + [
|
||||
cms.CMSAttribute({
|
||||
'type': "recipient_nonce",
|
||||
'type': u"recipient_nonce",
|
||||
'values': [sender_nonce]
|
||||
}),
|
||||
cms.CMSAttribute({
|
||||
'type': 'trans_id',
|
||||
'type': u"trans_id",
|
||||
'values': [transaction_id]
|
||||
})
|
||||
])
|
||||
|
||||
signer = cms.SignerInfo({
|
||||
"signed_attrs": attrs,
|
||||
'version':'v1',
|
||||
'version': u"v1",
|
||||
'sid': cms.SignerIdentifier({
|
||||
'issuer_and_serial_number': cms.IssuerAndSerialNumber({
|
||||
'issuer': server_certificate.asn1["tbs_certificate"]["issuer"],
|
||||
'serial_number': server_certificate.asn1["tbs_certificate"]["serial_number"],
|
||||
}),
|
||||
}),
|
||||
'digest_algorithm': algos.DigestAlgorithm({'algorithm': 'sha1'}),
|
||||
'signature_algorithm': algos.SignedDigestAlgorithm({'algorithm': 'rsassa_pkcs1v15'}),
|
||||
'digest_algorithm': algos.DigestAlgorithm({'algorithm': u"sha1"}),
|
||||
'signature_algorithm': algos.SignedDigestAlgorithm({'algorithm': u"rsassa_pkcs1v15"}),
|
||||
'signature': b64decode(authority.signer_exec("sign-pkcs7", b64encode(b"\x31" + attrs.dump()[1:])))
|
||||
})
|
||||
|
||||
resp.append_header("Content-Type", "application/x-pki-message")
|
||||
resp.body = cms.ContentInfo({
|
||||
'content_type': 'signed_data',
|
||||
'content_type': u"signed_data",
|
||||
'content': cms.SignedData({
|
||||
'version': 'v1',
|
||||
'version': u"v1",
|
||||
'certificates': [x509.Certificate.load(server_certificate.asn1.dump())], # wat
|
||||
'digest_algorithms': [cms.DigestAlgorithm({
|
||||
'algorithm':'sha1'
|
||||
'algorithm': u"sha1"
|
||||
})],
|
||||
'encap_content_info': {
|
||||
'content_type': 'data',
|
||||
'content_type': u"data",
|
||||
'content': encrypted_container
|
||||
},
|
||||
'signer_infos': [signer]
|
||||
|
@ -18,14 +18,17 @@ class ScriptResource():
|
||||
else:
|
||||
script = config.SCRIPT_DEFAULT
|
||||
tags = []
|
||||
for tag in attribs.get("user").get("xdg").get("tags").split(","):
|
||||
if "=" in tag:
|
||||
k, v = tag.split("=", 1)
|
||||
else:
|
||||
k, v = "other", tag
|
||||
if k == "script":
|
||||
script = v
|
||||
tags.append(dict(id=tag, key=k, value=v))
|
||||
try:
|
||||
for tag in attribs.get("user").get("xdg").get("tags").split(","):
|
||||
if "=" in tag:
|
||||
k, v = tag.split("=", 1)
|
||||
else:
|
||||
k, v = "other", tag
|
||||
if k == "script":
|
||||
script = v
|
||||
tags.append(dict(id=tag, key=k, value=v))
|
||||
except AttributeError: # No tags
|
||||
pass
|
||||
|
||||
resp.set_header("Content-Type", "text/x-shellscript")
|
||||
resp.body = env.get_template(script).render(
|
||||
|
@ -1276,7 +1276,7 @@ def certidude_cron():
|
||||
@click.command("serve", help="Run server")
|
||||
@click.option("-e", "--exit-handler", default=False, is_flag=True, help="Install /api/exit/ handler")
|
||||
@click.option("-p", "--port", default=8080, help="Listen port")
|
||||
@click.option("-l", "--listen", default="127.0.0.1", help="Listen address")
|
||||
@click.option("-l", "--listen", default="127.0.1.1", help="Listen address")
|
||||
@click.option("-f", "--fork", default=False, is_flag=True, help="Fork to background")
|
||||
def certidude_serve(port, listen, fork, exit_handler):
|
||||
import pwd
|
||||
|
@ -50,9 +50,14 @@ def whitelist_subject(func):
|
||||
except IOError:
|
||||
raise falcon.HTTPNotFound()
|
||||
else:
|
||||
inner_address = getxattr(path, "user.lease.inner_address").decode("ascii")
|
||||
if req.context.get("remote_addr") != ip_address(inner_address):
|
||||
try:
|
||||
inner_address = getxattr(path, "user.lease.inner_address").decode("ascii")
|
||||
except IOError:
|
||||
raise falcon.HTTPForbidden("Forbidden", "Remote address %s not whitelisted" % req.context.get("remote_addr"))
|
||||
return func(self, req, resp, cn, *args, **kwargs)
|
||||
else:
|
||||
if req.context.get("remote_addr") != ip_address(inner_address):
|
||||
raise falcon.HTTPForbidden("Forbidden", "Remote address %s mismatch" % req.context.get("remote_addr"))
|
||||
else:
|
||||
return func(self, req, resp, cn, *args, **kwargs)
|
||||
return wrapped
|
||||
|
||||
|
@ -42,7 +42,8 @@ def send(template, to=None, include_admins=True, attachments=(), **context):
|
||||
part.add_header('Content-Disposition', 'attachment', filename=suggested_filename)
|
||||
part.set_payload(attachment)
|
||||
msg.attach(part)
|
||||
click.echo("Sending to: %s" % msg["to"])
|
||||
|
||||
conn = smtplib.SMTP("localhost")
|
||||
conn.sendmail(config.MAILER_ADDRESS, [u.mail for u in recipients], msg.as_string())
|
||||
if config.MAILER_ADDRESS:
|
||||
click.echo("Sending to: %s" % msg["to"])
|
||||
conn = smtplib.SMTP("localhost")
|
||||
conn.sendmail(config.MAILER_ADDRESS, [u.mail for u in recipients], msg.as_string())
|
||||
|
@ -37,6 +37,10 @@ server {
|
||||
alias /var/www/html/.well-known/;
|
||||
}
|
||||
|
||||
# Rewrite /cgi-bin/pkiclient.exe to /api/scep for SCEP protocol
|
||||
location /cgi-bin/pkiclient.exe {
|
||||
rewrite /cgi-bin/pkiclient.exe /api/scep/ last;
|
||||
}
|
||||
|
||||
{% if not push_server %}
|
||||
# This only works with nchan, for Debian 9 just apt install libnginx-mod-nchan
|
||||
|
@ -485,13 +485,19 @@ def test_cli_setup_authority():
|
||||
|
||||
# Test attribute fetching API call
|
||||
r = client().simulate_get("/api/signed/test/attr/")
|
||||
assert r.status_code == 401, r.text
|
||||
r = client().simulate_get("/api/signed/test/attr/", headers={"Authorization":usertoken})
|
||||
assert r.status_code == 403, r.text
|
||||
r = client().simulate_get("/api/signed/nonexistant/attr/")
|
||||
assert r.status_code == 404, r.text
|
||||
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken})
|
||||
r = client().simulate_get("/api/signed/test/attr/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text
|
||||
r = client().simulate_get("/api/signed/nonexistant/attr/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 404, r.text
|
||||
|
||||
# Insert lease
|
||||
r = client().simulate_get("/api/signed/test/script/")
|
||||
assert r.status_code == 403, r.text # script not authorized
|
||||
r = client().simulate_get("/api/signed/test/lease/", headers={"Authorization":admintoken})
|
||||
assert r.status_code == 404, r.text
|
||||
r = client().simulate_post("/api/lease/",
|
||||
query_string = "client=test&inner_address=127.0.0.1&outer_address=8.8.8.8",
|
||||
headers={"Authorization":admintoken})
|
||||
@ -500,21 +506,16 @@ def test_cli_setup_authority():
|
||||
assert r.status_code == 404, r.text # cert not found
|
||||
r = client().simulate_get("/api/signed/test/script/")
|
||||
assert r.status_code == 200, r.text # script render ok
|
||||
assert "uci set " in r.text, r.text
|
||||
assert "curl http://ca.example.lan/api/signed/test/attr " in r.text, r.text
|
||||
|
||||
r = client().simulate_post("/api/lease/",
|
||||
query_string = "client=test&inner_address=127.0.0.1&outer_address=8.8.8.8&serial=0",
|
||||
headers={"Authorization":admintoken})
|
||||
assert r.status_code == 403, r.text # invalid serial number supplied
|
||||
r = client().simulate_get("/api/signed/test/attr/")
|
||||
assert r.status_code == 200, r.text # read okay from own address
|
||||
r = client().simulate_post("/api/lease/",
|
||||
query_string = "client=test&inner_address=1.2.3.4&outer_address=8.8.8.8",
|
||||
headers={"Authorization":admintoken})
|
||||
assert r.status_code == 200, r.text # lease update ok
|
||||
r = client().simulate_get("/api/signed/test/attr/")
|
||||
assert r.status_code == 403, r.text # read failed from other address
|
||||
|
||||
|
||||
# Test lease retrieval
|
||||
r = client().simulate_get("/api/signed/test/lease/")
|
||||
@ -978,6 +979,7 @@ def test_cli_setup_authority():
|
||||
os.system("sed -e 's/machine enrollment =.*/machine enrollment = allowed/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/scep subnets =.*/scep subnets = 0.0.0.0\\/0/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/ocsp subnets =.*/ocsp subnets = 0.0.0.0\\/0/g' -i /etc/certidude/server.conf")
|
||||
os.system("sed -e 's/address = certificates@example.lan/address =/g' -i /etc/certidude/server.conf")
|
||||
from certidude.common import pip
|
||||
pip("asn1crypto certbuilder")
|
||||
|
||||
@ -1103,6 +1105,20 @@ def test_cli_setup_authority():
|
||||
os.waitpid(mach_pid, 0)
|
||||
|
||||
|
||||
##################
|
||||
### SCEP tests ###
|
||||
##################
|
||||
|
||||
os.umask(0022)
|
||||
assert not os.system("git clone https://github.com/certnanny/sscep /tmp/sscep")
|
||||
assert not os.system("cd /tmp/sscep && ./Configure && make sscep_dyn")
|
||||
assert not os.system("/tmp/sscep/sscep_dyn getca -c /tmp/sscep/ca.pem -u http://ca.example.lan/cgi-bin/pkiclient.exe")
|
||||
assert not os.system("openssl genrsa -out /tmp/key.pem 1024")
|
||||
assert not os.system("echo '.\n.\n.\n.\n.\ntest8\n\n\n\n' | openssl req -new -sha256 -key /tmp/key.pem -out /tmp/req.pem")
|
||||
assert not os.system("/tmp/sscep/sscep_dyn enroll -c /tmp/sscep/ca.pem -u http://ca.example.lan/cgi-bin/pkiclient.exe -k /tmp/key.pem -r /tmp/req.pem -l /tmp/cert.pem")
|
||||
# TODO: test e-mails at this point
|
||||
|
||||
|
||||
###################
|
||||
### Final tests ###
|
||||
###################
|
||||
|
Loading…
Reference in New Issue
Block a user