mirror of
				https://github.com/laurivosandi/certidude
				synced 2025-10-31 09:29:13 +00:00 
			
		
		
		
	api: request: drop usage of global authority import
This commit is contained in:
		| @@ -220,8 +220,8 @@ def certidude_app(log_handlers=[]): | ||||
|     # Certificate authority API calls | ||||
|     app.add_route("/api/certificate/", CertificateAuthorityResource()) | ||||
|     app.add_route("/api/signed/{cn}/", SignedCertificateDetailResource()) | ||||
|     app.add_route("/api/request/{cn}/", RequestDetailResource()) | ||||
|     app.add_route("/api/request/", RequestListResource()) | ||||
|     app.add_route("/api/request/{cn}/", RequestDetailResource(authority)) | ||||
|     app.add_route("/api/request/", RequestListResource(authority)) | ||||
|     app.add_route("/api/", SessionResource()) | ||||
|  | ||||
|     if config.USER_ENROLLMENT_ALLOWED: # TODO: add token enable/disable flag for config | ||||
|   | ||||
| @@ -1,4 +1,3 @@ | ||||
|  | ||||
| import click | ||||
| import falcon | ||||
| import logging | ||||
| @@ -9,7 +8,7 @@ import hashlib | ||||
| from asn1crypto import pem | ||||
| from asn1crypto.csr import CertificationRequest | ||||
| from base64 import b64decode | ||||
| from certidude import config, authority, push, errors | ||||
| from certidude import config, push, errors | ||||
| from certidude.auth import login_required, login_optional, authorize_admin | ||||
| from certidude.decorators import csrf_protection, MyEncoder, serialize | ||||
| from certidude.firewall import whitelist_subnets, whitelist_content_types | ||||
| @@ -28,6 +27,9 @@ curl -f -L -H "Content-type: application/pkcs10" --data-binary @test.csr \ | ||||
| """ | ||||
|  | ||||
| class RequestListResource(object): | ||||
|     def __init__(self, authority): | ||||
|         self.authority = authority | ||||
|  | ||||
|     @login_optional | ||||
|     @whitelist_subnets(config.REQUEST_SUBNETS) | ||||
|     @whitelist_content_types("application/pkcs10") | ||||
| @@ -61,7 +63,7 @@ class RequestListResource(object): | ||||
|  | ||||
|                 # Automatic enroll with Kerberos machine cerdentials | ||||
|                 resp.set_header("Content-Type", "application/x-pem-file") | ||||
|                 cert, resp.body = authority._sign(csr, body, overwrite=True) | ||||
|                 cert, resp.body = self.authority._sign(csr, body, overwrite=True) | ||||
|                 logger.info("Automatically enrolled Kerberos authenticated machine %s from %s", | ||||
|                     machine, req.context.get("remote_addr")) | ||||
|                 return | ||||
| @@ -72,7 +74,7 @@ class RequestListResource(object): | ||||
|         Attempt to renew certificate using currently valid key pair | ||||
|         """ | ||||
|         try: | ||||
|             path, buf, cert, signed, expires = authority.get_signed(common_name) | ||||
|             path, buf, cert, signed, expires = self.authority.get_signed(common_name) | ||||
|         except EnvironmentError: | ||||
|             pass # No currently valid certificate for this common name | ||||
|         else: | ||||
| @@ -112,7 +114,7 @@ class RequestListResource(object): | ||||
|                             reasons.append("Renewal requested, but not allowed by authority settings") | ||||
|                         else: | ||||
|                             resp.set_header("Content-Type", "application/x-x509-user-cert") | ||||
|                             _, resp.body = authority._sign(csr, body, overwrite=True) | ||||
|                             _, resp.body = self.authority._sign(csr, body, overwrite=True) | ||||
|                             logger.info("Renewed certificate for %s", common_name) | ||||
|                             return | ||||
|  | ||||
| @@ -122,12 +124,12 @@ class RequestListResource(object): | ||||
|         autosigning was requested and certificate can be automatically signed | ||||
|         """ | ||||
|         if req.get_param_as_bool("autosign"): | ||||
|             if not authority.server_flags(common_name): | ||||
|             if not self.authority.server_flags(common_name): | ||||
|                 for subnet in config.AUTOSIGN_SUBNETS: | ||||
|                     if req.context.get("remote_addr") in subnet: | ||||
|                         try: | ||||
|                             resp.set_header("Content-Type", "application/x-pem-file") | ||||
|                             _, resp.body = authority._sign(csr, body) | ||||
|                             _, resp.body = self.authority._sign(csr, body) | ||||
|                             logger.info("Autosigned %s as %s is whitelisted", common_name, req.context.get("remote_addr")) | ||||
|                             return | ||||
|                         except EnvironmentError: | ||||
| @@ -142,7 +144,7 @@ class RequestListResource(object): | ||||
|  | ||||
|         # Attempt to save the request otherwise | ||||
|         try: | ||||
|             request_path, _, _ = authority.store_request(body, | ||||
|             request_path, _, _ = self.authority.store_request(body, | ||||
|                 address=str(req.context.get("remote_addr"))) | ||||
|         except errors.RequestExists: | ||||
|             reasons.append("Same request already uploaded exists") | ||||
| @@ -176,13 +178,16 @@ class RequestListResource(object): | ||||
|  | ||||
|  | ||||
| class RequestDetailResource(object): | ||||
|     def __init__(self, authority): | ||||
|         self.authority = authority | ||||
|  | ||||
|     def on_get(self, req, resp, cn): | ||||
|         """ | ||||
|         Fetch certificate signing request as PEM | ||||
|         """ | ||||
|  | ||||
|         try: | ||||
|             path, buf, _, submitted = authority.get_request(cn) | ||||
|             path, buf, _, submitted = self.authority.get_request(cn) | ||||
|         except errors.RequestDoesNotExist: | ||||
|             logger.warning("Failed to serve non-existant request %s to %s", | ||||
|                 cn, req.context.get("remote_addr")) | ||||
| @@ -206,7 +211,7 @@ class RequestDetailResource(object): | ||||
|             resp.body = json.dumps(dict( | ||||
|                 submitted = submitted, | ||||
|                 common_name = cn, | ||||
|                 server = authority.server_flags(cn), | ||||
|                 server = self.authority.server_flags(cn), | ||||
|                 address = getxattr(path, "user.request.address").decode("ascii"), # TODO: move to authority.py | ||||
|                 md5sum = hashlib.md5(buf).hexdigest(), | ||||
|                 sha1sum = hashlib.sha1(buf).hexdigest(), | ||||
| @@ -225,7 +230,7 @@ class RequestDetailResource(object): | ||||
|         Sign a certificate signing request | ||||
|         """ | ||||
|         try: | ||||
|             cert, buf = authority.sign(cn, | ||||
|             cert, buf = self.authority.sign(cn, | ||||
|                 profile=req.get_param("profile", default="default"), | ||||
|                 overwrite=True, | ||||
|                 signer=req.context.get("user").name) | ||||
| @@ -244,7 +249,7 @@ class RequestDetailResource(object): | ||||
|     @authorize_admin | ||||
|     def on_delete(self, req, resp, cn): | ||||
|         try: | ||||
|             authority.delete_request(cn) | ||||
|             self.authority.delete_request(cn) | ||||
|             # Logging implemented in the function above | ||||
|         except errors.RequestDoesNotExist as e: | ||||
|             resp.body = "No certificate signing request for %s found" % cn | ||||
|   | ||||
		Reference in New Issue
	
	Block a user