pinecrypt-gateway-ocsp-resp.../ocsp_responder.py

164 lines
5.6 KiB
Python
Executable File

#!/usr/bin/env python
import os
import pytz
from asn1crypto.util import timezone
from asn1crypto import ocsp, pem
from datetime import datetime, timedelta
from math import inf
from motor.motor_asyncio import AsyncIOMotorClient
from oscrypto import asymmetric
from prometheus_client import Counter, Histogram
from sanic import Sanic, response
from sanic_prometheus import monitor
from time import sleep
# Prometheus metrics published by the OCSP responder.

# Number of OCSP requests the handler accepted for processing.
ocsp_request_valid = Counter(
    name="pinecrypt_gateway_ocsp_request_valid",
    documentation="Valid OCSP requests",
)

# Distribution of how many certificates are queried in a single request.
ocsp_request_list_size = Histogram(
    name="pinecrypt_gateway_ocsp_request_list_size",
    documentation="Histogram of OCSP request list size",
    buckets=(1, 2, 3, inf),
)

# Distribution of the raw request body size.
ocsp_request_size_bytes = Histogram(
    name="pinecrypt_gateway_ocsp_request_size_bytes",
    documentation="Histogram of OCSP request size in bytes",
    buckets=(100, 200, 500, 1000, 2000, 5000, 10000, inf),
)

# Distribution of the number of nonce extensions carried by a request.
ocsp_request_nonces = Histogram(
    name="pinecrypt_gateway_ocsp_request_nonces",
    documentation="Histogram of nonce count per request",
    buckets=(1, 2, 3, inf),
)

# Per-certificate answers, partitioned by the "status" label.
ocsp_response_status = Counter(
    name="pinecrypt_gateway_ocsp_response_status",
    documentation="Status responses",
    labelnames=["status"],
)
# Sanic application instance; the listener and route defined further down in
# this file register themselves against it.
# NOTE(review): the application is named "events" although this file is the
# OCSP responder - possibly carried over from another service; confirm
# before renaming.
app = Sanic("events")
# Hook sanic_prometheus monitoring into the app and expose its metrics
# endpoint on this same application.
monitor(app).expose_endpoint()
def _load_ca_certificate(path, retry_interval=1):
    """Load the CA certificate stored at `path`, waiting for it to appear.

    While the file does not exist the call sleeps `retry_interval` seconds
    and tries again, indefinitely. Any error other than FileNotFoundError
    propagates to the caller.
    """
    while True:
        try:
            with open(path, "rb") as fh:
                return asymmetric.load_certificate(fh.read())
        except FileNotFoundError:
            # Presumably the certificate is provisioned by another component
            # during start-up; keep polling until it shows up.
            sleep(retry_interval)


# Load CA certificate (blocks until the file exists)
authority_cert = _load_ca_certificate("/server-secrets/ca_cert.pem")

# Load CA private key; unlike the certificate, a missing key file is fatal
with open("/authority-secrets/ca_key.pem", "rb") as fh:
    key_buf = fh.read()
# Only the third element of the unarmored PEM (the DER payload) is used.
header, _, key_der_bytes = pem.unarmor(key_buf)
private_key = asymmetric.load_private_key(key_der_bytes)
def env_flag(name, default=False):
    """Interpret the environment variable `name` as a boolean switch.

    Returns `default` when the variable is unset. An empty value and the
    common "off" spellings (``0``, ``false``, ``no``, ``off``; compared
    case-insensitively, surrounding whitespace ignored) yield False; any
    other value yields True.

    A plain ``bool(os.getenv(name))`` treats every non-empty string as true,
    so ``DEBUG=0`` or ``DEBUG=false`` would switch debugging *on*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() not in ("", "0", "false", "no", "off")


# Margin applied to both ends of the validity window of each OCSP answer.
CLOCK_SKEW_TOLERANCE = timedelta(minutes=5)
# Debug switch handed to the Sanic server.
DEBUG = env_flag("DEBUG")
# MongoDB connection string; the database named in the URI is the one used.
MONGO_URI = os.getenv("MONGO_URI", "mongodb://127.0.0.1:27017/default?replicaSet=rs0")
@app.listener("before_server_start")
async def setup_db(app, loop):
    """Attach the MongoDB database handle to the application context.

    Registered as a ``before_server_start`` listener. A Motor client is
    created from ``MONGO_URI`` and the database named in that URI is stored
    as ``app.ctx.db`` for the request handlers to use.
    """
    # TODO: find cleaner way to do this, for more see
    # https://github.com/sanic-org/sanic/issues/919
    mongo_client = AsyncIOMotorClient(MONGO_URI)
    app.ctx.db = mongo_client.get_default_database()
def _ocsp_error_response(status):
    """Build an unsigned OCSP error response that carries only `status`."""
    return response.raw(
        ocsp.OCSPResponse({"response_status": status}).dump(),
        headers={"Content-Type": "application/ocsp-response"})


def _cert_status(doc):
    """Map a certificate database document to its OCSP status.

    Returns a ``(label, ocsp.CertStatus)`` pair; `label` is the value used
    for the ``status`` label of the response counter. A missing document
    maps to ``unknown``.

    Raises ValueError when the document carries a status other than
    ``signed`` or ``revoked``.
    """
    if not doc:
        return "unknown", ocsp.CertStatus(name="unknown", value=None)
    if doc["status"] == "signed":
        return "good", ocsp.CertStatus(name="good", value=None)
    if doc["status"] == "revoked":
        return "revoked", ocsp.CertStatus(
            name="revoked",
            value={
                "revocation_time": doc["revoked"].replace(tzinfo=pytz.UTC),
                "revocation_reason": doc["revocation_reason"],
            })
    # This should not happen, if it does database is mangled
    raise ValueError("Invalid/unknown certificate status '%s'" % doc["status"])


@app.route("/api/ocsp/", methods=["POST"])
async def view_ocsp_responder(request):
    """Answer a DER-encoded OCSP request POSTed to ``/api/ocsp/``.

    Every serial number in the request list is looked up in the
    ``certidude_certificates`` collection and reported as ``good``,
    ``revoked`` or ``unknown``. Nonce extensions of the request are echoed
    back. The response is signed with the CA private key and returned with
    the ``application/ocsp-response`` content type.

    A body that cannot be parsed as an OCSP request, or that contains a
    non-positive serial number, is answered with an unsigned
    ``malformed_request`` OCSP response instead of an unhandled exception.

    Raises:
        KeyError: the CA key algorithm is neither ``ec`` nor ``rsa``.
        ValueError: a database document has an unexpected status.
    """
    # NOTE(review): responses are signed with SHA-1 based algorithms; kept
    # unchanged for client compatibility - consider moving to SHA-256.
    sign_algo, sign_func = {
        "ec": ("sha1_ecdsa", asymmetric.ecdsa_sign),
        "rsa": ("sha1_rsa", asymmetric.rsa_pkcs1v15_sign),
    }[authority_cert.public_key.algorithm]

    ocsp_request_size_bytes.observe(len(request.body))

    # The body is untrusted input: pull out everything that is needed from
    # it up front so that any decoding failure is handled in one place.
    try:
        tbs_request = ocsp.OCSPRequest.load(request.body)["tbs_request"]
        nonce_values = [
            ext["extn_value"]
            for ext in tbs_request["request_extensions"]
            if ext["extn_id"].native == "nonce"
        ]
        serials = [
            item["req_cert"]["serial_number"].native
            for item in tbs_request["request_list"]
        ]
    except (ValueError, TypeError):
        return _ocsp_error_response("malformed_request")

    # Explicit check rather than `assert`, which is stripped under -O.
    if any(serial <= 0 for serial in serials):
        return _ocsp_error_response("malformed_request")

    now = datetime.now(timezone.utc).replace(microsecond=0)
    response_extensions = [
        ocsp.ResponseDataExtension({
            "extn_id": "nonce",
            "critical": False,
            "extn_value": nonce_value
        })
        for nonce_value in nonce_values
    ]

    ocsp_request_nonces.observe(len(nonce_values))
    ocsp_request_valid.inc()
    ocsp_request_list_size.observe(len(serials))

    # NOTE(review): the CertID of every answer is built from this CA's SHA-1
    # hashes, regardless of the hash algorithm or issuer hashes found in the
    # request - confirm clients only query this CA using SHA-1 CertIDs.
    issuer_name_hash = authority_cert.asn1.subject.sha1
    issuer_key_hash = authority_cert.public_key.asn1.sha1

    responses = []
    for serial in serials:
        # Serial numbers are looked up as lowercase hex strings.
        doc = await app.ctx.db.certidude_certificates.find_one(
            {"serial_number": "%x" % serial})
        label, status = _cert_status(doc)
        ocsp_response_status.labels(label).inc()
        responses.append({
            "cert_id": {
                "hash_algorithm": {
                    "algorithm": "sha1"
                },
                "issuer_name_hash": issuer_name_hash,
                "issuer_key_hash": issuer_key_hash,
                "serial_number": serial,
            },
            "cert_status": status,
            # Widen the validity window on both sides to absorb clock skew.
            "this_update": now - CLOCK_SKEW_TOLERANCE,
            "next_update": now + timedelta(minutes=15) + CLOCK_SKEW_TOLERANCE,
            "single_extensions": []
        })

    response_data = ocsp.ResponseData({
        "responder_id": ocsp.ResponderId(name="by_key", value=issuer_key_hash),
        "produced_at": now,
        "responses": responses,
        "response_extensions": response_extensions
    })

    return response.raw(ocsp.OCSPResponse({
        "response_status": "successful",
        "response_bytes": {
            "response_type": "basic_ocsp_response",
            "response": {
                "tbs_response_data": response_data,
                "certs": [authority_cert.asn1],
                "signature_algorithm": {"algorithm": sign_algo},
                "signature": sign_func(private_key, response_data.dump(), "sha1")
            }
        }
    }).dump(), headers={"Content-Type": "application/ocsp-response"})
if __name__ == "__main__":
    # Start the server only when executed as a script, so that importing the
    # module (tooling, worker processes re-importing it) does not launch a
    # second server as a side effect.
    app.run(port=5001, debug=DEBUG)