Refactor imports to use explicit submodule imports and organize class/function imports

This commit is contained in:
olszomal 2024-09-06 10:56:55 +02:00 committed by Michał Trojnara
parent 27686c0b0c
commit 4ee429792d

View File

@ -4,14 +4,61 @@
import os import os
import datetime import datetime
import cryptography import cryptography
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
# Explicit imports of cryptography submodules
import cryptography.x509
import cryptography.x509.oid
import cryptography.hazmat.primitives.hashes
import cryptography.hazmat.primitives.asymmetric.rsa
import cryptography.hazmat.primitives.serialization
import cryptography.hazmat.primitives.serialization.pkcs12 import cryptography.hazmat.primitives.serialization.pkcs12
# Import classes and functions from the cryptography module
from cryptography.x509 import (
AuthorityKeyIdentifier,
BasicConstraints,
Certificate,
CertificateBuilder,
CertificateRevocationListBuilder,
CRLDistributionPoints,
CRLNumber,
CRLReason,
DistributionPoint,
DNSName,
ExtendedKeyUsage,
KeyUsage,
Name,
NameAttribute,
NameConstraints,
random_serial_number,
RevokedCertificateBuilder,
ReasonFlags,
SubjectKeyIdentifier,
UniformResourceIdentifier
)
from cryptography.x509.oid import (
ExtendedKeyUsageOID,
NameOID
)
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.asymmetric.rsa import (
generate_private_key,
RSAPrivateKey
)
from cryptography.hazmat.primitives.serialization import (
BestAvailableEncryption,
Encoding,
NoEncryption,
PrivateFormat
)
from cryptography.hazmat.primitives.serialization.pkcs12 import serialize_key_and_certificates
try:
if cryptography.__version__ >= '38.0.0':
from cryptography.hazmat.primitives.serialization.pkcs12 import PBES
except ImportError:
pass
RESULT_PATH = os.getcwd() RESULT_PATH = os.getcwd()
CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/") CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/")
@ -30,25 +77,25 @@ class X509Extensions():
self.port = cdp_port self.port = cdp_port
self.name = cdp_name self.name = cdp_name
def create_x509_name(self, common_name) -> x509.Name: def create_x509_name(self, common_name) -> Name:
"""Return x509.Name""" """Return x509.Name"""
return x509.Name( return Name(
[ [
x509.NameAttribute(NameOID.COUNTRY_NAME, "PL"), NameAttribute(NameOID.COUNTRY_NAME, "PL"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Mazovia Province"), NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Mazovia Province"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Warsaw"), NameAttribute(NameOID.LOCALITY_NAME, "Warsaw"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "osslsigncode"), NameAttribute(NameOID.ORGANIZATION_NAME, "osslsigncode"),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, self.unit_name), NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, self.unit_name),
x509.NameAttribute(NameOID.COMMON_NAME, common_name) NameAttribute(NameOID.COMMON_NAME, common_name)
] ]
) )
def create_x509_crldp(self) -> x509.CRLDistributionPoints: def create_x509_crldp(self) -> CRLDistributionPoints:
"""Return x509.CRLDistributionPoints""" """Return x509.CRLDistributionPoints"""
return x509.CRLDistributionPoints( return CRLDistributionPoints(
[ [
x509.DistributionPoint( DistributionPoint(
full_name=[x509.UniformResourceIdentifier( full_name=[UniformResourceIdentifier(
"http://127.0.0.1:" + str(self.port) + "/" + str(self.name)) "http://127.0.0.1:" + str(self.port) + "/" + str(self.name))
], ],
relative_name=None, relative_name=None,
@ -58,10 +105,10 @@ class X509Extensions():
] ]
) )
def create_x509_name_constraints(self) -> x509.NameConstraints: def create_x509_name_constraints(self) -> NameConstraints:
"""Return x509.NameConstraints""" """Return x509.NameConstraints"""
return x509.NameConstraints( return NameConstraints(
permitted_subtrees = [x509.DNSName('test.com'), x509.DNSName('test.org')], permitted_subtrees = [DNSName('test.com'), DNSName('test.org')],
excluded_subtrees = None excluded_subtrees = None
) )
@ -73,14 +120,14 @@ class IntermediateCACertificate(X509Extensions):
self.issuer_key = issuer_key self.issuer_key = issuer_key
super().__init__("Certification Authority", 0, None) super().__init__("Certification Authority", 0, None)
def make_cert(self) -> (x509.Certificate, rsa.RSAPrivateKey): def make_cert(self) -> (Certificate, RSAPrivateKey):
"""Generate intermediate CA certificate""" """Generate intermediate CA certificate"""
key = rsa.generate_private_key(public_exponent=65537, key_size=2048) key = generate_private_key(public_exponent=65537, key_size=2048)
key_public = key.public_key() key_public = key.public_key()
authority_key = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( authority_key = AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
self.issuer_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value self.issuer_cert.extensions.get_extension_for_class(SubjectKeyIdentifier).value
) )
key_usage = x509.KeyUsage( key_usage = KeyUsage(
digital_signature=True, digital_signature=True,
content_commitment=False, content_commitment=False,
key_encipherment=False, key_encipherment=False,
@ -92,22 +139,22 @@ class IntermediateCACertificate(X509Extensions):
decipher_only=False decipher_only=False
) )
cert = ( cert = (
x509.CertificateBuilder() CertificateBuilder()
.subject_name(self.create_x509_name("Intermediate CA")) .subject_name(self.create_x509_name("Intermediate CA"))
.issuer_name(self.issuer_cert.subject) .issuer_name(self.issuer_cert.subject)
.public_key(key_public) .public_key(key_public)
.serial_number(x509.random_serial_number()) .serial_number(random_serial_number())
.not_valid_before(date_20180101) .not_valid_before(date_20180101)
.not_valid_after(date_20180101 + datetime.timedelta(days=7300)) .not_valid_after(date_20180101 + datetime.timedelta(days=7300))
.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True) .add_extension(BasicConstraints(ca=True, path_length=0), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(key_public), critical=False) .add_extension(SubjectKeyIdentifier.from_public_key(key_public), critical=False)
.add_extension(authority_key, critical=False) .add_extension(authority_key, critical=False)
.add_extension(key_usage, critical=True) .add_extension(key_usage, critical=True)
.sign(self.issuer_key, hashes.SHA256()) .sign(self.issuer_key, SHA256())
) )
file_path=os.path.join(CERTS_PATH, "intermediateCA.pem") file_path=os.path.join(CERTS_PATH, "intermediateCA.pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(cert.public_bytes(encoding=Encoding.PEM))
return cert, key return cert, key
@ -116,7 +163,7 @@ class RootCACertificate(X509Extensions):
"""Base class for Root CA certificate""" """Base class for Root CA certificate"""
def __init__(self): def __init__(self):
self.key_usage = x509.KeyUsage( self.key_usage = KeyUsage(
digital_signature=True, digital_signature=True,
content_commitment=False, content_commitment=False,
key_encipherment=False, key_encipherment=False,
@ -129,7 +176,7 @@ class RootCACertificate(X509Extensions):
) )
super().__init__("Certification Authority", 0, None) super().__init__("Certification Authority", 0, None)
def make_cert(self) -> (x509.Certificate, rsa.RSAPrivateKey): def make_cert(self) -> (Certificate, RSAPrivateKey):
"""Generate CA certificates""" """Generate CA certificates"""
ca_root, root_key = self.make_ca_cert("Trusted Root CA", "CAroot.pem") ca_root, root_key = self.make_ca_cert("Trusted Root CA", "CAroot.pem")
ca_cert, ca_key = self.make_ca_cert("Root CA", "CACert.pem") ca_cert, ca_key = self.make_ca_cert("Root CA", "CACert.pem")
@ -138,52 +185,52 @@ class RootCACertificate(X509Extensions):
def make_ca_cert(self, common_name, file_name) -> None: def make_ca_cert(self, common_name, file_name) -> None:
"""Generate self-signed root CA certificate""" """Generate self-signed root CA certificate"""
ca_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) ca_key = generate_private_key(public_exponent=65537, key_size=2048)
ca_public = ca_key.public_key() ca_public = ca_key.public_key()
authority_key = x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_public) authority_key = AuthorityKeyIdentifier.from_issuer_public_key(ca_public)
name = self.create_x509_name(common_name) name = self.create_x509_name(common_name)
ca_cert = ( ca_cert = (
x509.CertificateBuilder() CertificateBuilder()
.subject_name(name) .subject_name(name)
.issuer_name(name) .issuer_name(name)
.public_key(ca_public) .public_key(ca_public)
.serial_number(x509.random_serial_number()) .serial_number(random_serial_number())
.not_valid_before(date_20170101) .not_valid_before(date_20170101)
.not_valid_after(date_20170101 + datetime.timedelta(days=7300)) .not_valid_after(date_20170101 + datetime.timedelta(days=7300))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) .add_extension(BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(ca_public), critical=False) .add_extension(SubjectKeyIdentifier.from_public_key(ca_public), critical=False)
.add_extension(authority_key, critical=False) .add_extension(authority_key, critical=False)
.add_extension(self.key_usage, critical=True) .add_extension(self.key_usage, critical=True)
.sign(ca_key, hashes.SHA256()) .sign(ca_key, SHA256())
) )
file_path=os.path.join(CERTS_PATH, file_name) file_path=os.path.join(CERTS_PATH, file_name)
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(ca_cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(ca_cert.public_bytes(encoding=Encoding.PEM))
return ca_cert, ca_key return ca_cert, ca_key
def make_cross_cert(self, ca_root, root_key, ca_cert, ca_key) -> None: def make_cross_cert(self, ca_root, root_key, ca_cert, ca_key) -> None:
"""Generate cross-signed root CA certificate""" """Generate cross-signed root CA certificate"""
ca_public = ca_key.public_key() ca_public = ca_key.public_key()
authority_key = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( authority_key = AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
ca_root.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value ca_root.extensions.get_extension_for_class(SubjectKeyIdentifier).value
) )
ca_cross = ( ca_cross = (
x509.CertificateBuilder() CertificateBuilder()
.subject_name(ca_cert.subject) .subject_name(ca_cert.subject)
.issuer_name(ca_root.subject) .issuer_name(ca_root.subject)
.public_key(ca_public) .public_key(ca_public)
.serial_number(ca_cert.serial_number) .serial_number(ca_cert.serial_number)
.not_valid_before(date_20180101) .not_valid_before(date_20180101)
.not_valid_after(date_20180101 + datetime.timedelta(days=7300)) .not_valid_after(date_20180101 + datetime.timedelta(days=7300))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) .add_extension(BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(ca_public), critical=False) .add_extension(SubjectKeyIdentifier.from_public_key(ca_public), critical=False)
.add_extension(authority_key, critical=False) .add_extension(authority_key, critical=False)
.add_extension(self.key_usage, critical=True) .add_extension(self.key_usage, critical=True)
.sign(root_key, hashes.SHA256()) .sign(root_key, SHA256())
) )
file_path=os.path.join(CERTS_PATH, "CAcross.pem") file_path=os.path.join(CERTS_PATH, "CAcross.pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(ca_cross.public_bytes(encoding=serialization.Encoding.PEM)) file.write(ca_cross.public_bytes(encoding=Encoding.PEM))
def write_key(self, key, file_name) -> None: def write_key(self, key, file_name) -> None:
"""Write a private RSA key""" """Write a private RSA key"""
@ -196,27 +243,27 @@ class RootCACertificate(X509Extensions):
file_path = os.path.join(CERTS_PATH, file_name + "p.pem") file_path = os.path.join(CERTS_PATH, file_name + "p.pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(key.private_bytes( file.write(key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=Encoding.PEM,
format=serialization.PrivateFormat.PKCS8, format=PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(PASSWORD.encode()) encryption_algorithm=BestAvailableEncryption(PASSWORD.encode())
) )
) )
# Write decrypted key in PEM format # Write decrypted key in PEM format
file_path = os.path.join(CERTS_PATH, file_name + ".pem") file_path = os.path.join(CERTS_PATH, file_name + ".pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(key.private_bytes( file.write(key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=Encoding.PEM,
format=serialization.PrivateFormat.PKCS8, format=PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=NoEncryption()
) )
) )
# Write the key in DER format # Write the key in DER format
file_path = os.path.join(CERTS_PATH, file_name + ".der") file_path = os.path.join(CERTS_PATH, file_name + ".der")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(key.private_bytes( file.write(key.private_bytes(
encoding=serialization.Encoding.DER, encoding=Encoding.DER,
format=serialization.PrivateFormat.PKCS8, format=PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=NoEncryption()
) )
) )
@ -227,13 +274,13 @@ class TSARootCACertificate(X509Extensions):
def __init__(self): def __init__(self):
super().__init__("Timestamp Authority Root CA", 0, None) super().__init__("Timestamp Authority Root CA", 0, None)
def make_cert(self) -> (x509.Certificate, rsa.RSAPrivateKey): def make_cert(self) -> (Certificate, RSAPrivateKey):
"""Generate a Time Stamp Authority certificate""" """Generate a Time Stamp Authority certificate"""
ca_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) ca_key = generate_private_key(public_exponent=65537, key_size=2048)
ca_public = ca_key.public_key() ca_public = ca_key.public_key()
authority_key = x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_public) authority_key = AuthorityKeyIdentifier.from_issuer_public_key(ca_public)
name = self.create_x509_name("TSA Root CA") name = self.create_x509_name("TSA Root CA")
key_usage = x509.KeyUsage( key_usage = KeyUsage(
digital_signature=False, digital_signature=False,
content_commitment=False, content_commitment=False,
key_encipherment=False, key_encipherment=False,
@ -245,22 +292,22 @@ class TSARootCACertificate(X509Extensions):
decipher_only=False decipher_only=False
) )
ca_cert = ( ca_cert = (
x509.CertificateBuilder() CertificateBuilder()
.subject_name(name) .subject_name(name)
.issuer_name(name) .issuer_name(name)
.public_key(ca_public) .public_key(ca_public)
.serial_number(x509.random_serial_number()) .serial_number(random_serial_number())
.not_valid_before(date_20170101) .not_valid_before(date_20170101)
.not_valid_after(date_20170101 + datetime.timedelta(days=7300)) .not_valid_after(date_20170101 + datetime.timedelta(days=7300))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) .add_extension(BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(ca_public), critical=False) .add_extension(SubjectKeyIdentifier.from_public_key(ca_public), critical=False)
.add_extension(authority_key, critical=False) .add_extension(authority_key, critical=False)
.add_extension(key_usage, critical=True) .add_extension(key_usage, critical=True)
.sign(ca_key, hashes.SHA256()) .sign(ca_key, SHA256())
) )
file_path=os.path.join(CERTS_PATH, "TSACA.pem") file_path=os.path.join(CERTS_PATH, "TSACA.pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(ca_cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(ca_cert.public_bytes(encoding=Encoding.PEM))
return ca_cert, ca_key return ca_cert, ca_key
@ -269,14 +316,14 @@ class TSARootCACertificate(X509Extensions):
file_path = os.path.join(CERTS_PATH, file_name + ".key") file_path = os.path.join(CERTS_PATH, file_name + ".key")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(key.private_bytes( file.write(key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=Encoding.PEM,
format=serialization.PrivateFormat.PKCS8, format=PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=NoEncryption()
) )
) )
class Certificate(X509Extensions): class LeafCertificate(X509Extensions):
"""Base class for a leaf certificate""" """Base class for a leaf certificate"""
def __init__(self, issuer_cert, issuer_key, unit_name, common_name, cdp_port, cdp_name): def __init__(self, issuer_cert, issuer_key, unit_name, common_name, cdp_port, cdp_name):
@ -286,78 +333,78 @@ class Certificate(X509Extensions):
self.common_name = common_name self.common_name = common_name
super().__init__(unit_name, cdp_port, cdp_name) super().__init__(unit_name, cdp_port, cdp_name)
def make_cert(self, public_key, not_before, days) -> x509.Certificate: def make_cert(self, public_key, not_before, days) -> Certificate:
"""Generate a leaf certificate""" """Generate a leaf certificate"""
authority_key = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( authority_key = AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
self.issuer_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value self.issuer_cert.extensions.get_extension_for_class(SubjectKeyIdentifier).value
) )
extended_key_usage = x509.ExtendedKeyUsage( extended_key_usage = ExtendedKeyUsage(
[x509.oid.ExtendedKeyUsageOID.CODE_SIGNING] [ExtendedKeyUsageOID.CODE_SIGNING]
) )
cert = ( cert = (
x509.CertificateBuilder() CertificateBuilder()
.subject_name(self.create_x509_name(self.common_name)) .subject_name(self.create_x509_name(self.common_name))
.issuer_name(self.issuer_cert.subject) .issuer_name(self.issuer_cert.subject)
.public_key(public_key) .public_key(public_key)
.serial_number(x509.random_serial_number()) .serial_number(random_serial_number())
.not_valid_before(not_before) .not_valid_before(not_before)
.not_valid_after(not_before + datetime.timedelta(days=days)) .not_valid_after(not_before + datetime.timedelta(days=days))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=False) .add_extension(BasicConstraints(ca=False, path_length=None), critical=False)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False) .add_extension(SubjectKeyIdentifier.from_public_key(public_key), critical=False)
.add_extension(authority_key, critical=False) .add_extension(authority_key, critical=False)
.add_extension(extended_key_usage, critical=False) .add_extension(extended_key_usage, critical=False)
.add_extension(self.create_x509_crldp(), critical=False) .add_extension(self.create_x509_crldp(), critical=False)
.sign(self.issuer_key, hashes.SHA256()) .sign(self.issuer_key, SHA256())
) )
# Write PEM file and attach intermediate certificate # Write PEM file and attach intermediate certificate
file_path = os.path.join(CERTS_PATH, self.common_name + ".pem") file_path = os.path.join(CERTS_PATH, self.common_name + ".pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(cert.public_bytes(encoding=Encoding.PEM))
file.write(self.issuer_cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(self.issuer_cert.public_bytes(encoding=Encoding.PEM))
return cert return cert
def revoke_cert(self, serial_number, file_name) -> None: def revoke_cert(self, serial_number, file_name) -> None:
"""Revoke a certificate""" """Revoke a certificate"""
revoked = ( revoked = (
x509.RevokedCertificateBuilder() RevokedCertificateBuilder()
.serial_number(serial_number) .serial_number(serial_number)
.revocation_date(date_20190101) .revocation_date(date_20190101)
.add_extension(x509.CRLReason(x509.ReasonFlags.superseded), critical=False) .add_extension(CRLReason(ReasonFlags.superseded), critical=False)
.build() .build()
) )
# Generate CRL # Generate CRL
authority_key = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( authority_key = AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
self.issuer_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value self.issuer_cert.extensions.get_extension_for_class(SubjectKeyIdentifier).value
) )
crl = ( crl = (
x509.CertificateRevocationListBuilder() CertificateRevocationListBuilder()
.issuer_name(self.issuer_cert.subject) .issuer_name(self.issuer_cert.subject)
.last_update(date_20190101) .last_update(date_20190101)
.next_update(date_20190101 + datetime.timedelta(days=7300)) .next_update(date_20190101 + datetime.timedelta(days=7300))
.add_extension(authority_key, critical=False) .add_extension(authority_key, critical=False)
.add_extension(x509.CRLNumber(4097), critical=False) .add_extension(CRLNumber(4097), critical=False)
.add_revoked_certificate(revoked) .add_revoked_certificate(revoked)
.sign(self.issuer_key, hashes.SHA256()) .sign(self.issuer_key, SHA256())
) )
# Write CRL file # Write CRL file
file_path = os.path.join(CERTS_PATH, file_name + ".pem") file_path = os.path.join(CERTS_PATH, file_name + ".pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(crl.public_bytes(encoding=serialization.Encoding.PEM)) file.write(crl.public_bytes(encoding=Encoding.PEM))
file_path = os.path.join(CERTS_PATH, file_name + ".der") file_path = os.path.join(CERTS_PATH, file_name + ".der")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(crl.public_bytes(encoding=serialization.Encoding.DER)) file.write(crl.public_bytes(encoding=Encoding.DER))
class LeafCACertificate(Certificate): class LeafCACertificate(LeafCertificate):
"""Base class for a leaf certificate""" """Base class for a leaf certificate"""
def __init__(self, issuer_cert, issuer_key, common, cdp_port): def __init__(self, issuer_cert, issuer_key, common, cdp_port):
super().__init__(issuer_cert, issuer_key, "CSP", common, cdp_port, "intermediateCA") super().__init__(issuer_cert, issuer_key, "CSP", common, cdp_port, "intermediateCA")
class LeafTSACertificate(Certificate): class LeafTSACertificate(LeafCertificate):
"""Base class for a TSA leaf certificate""" """Base class for a TSA leaf certificate"""
def __init__(self, issuer_cert, issuer_key, common, cdp_port): def __init__(self, issuer_cert, issuer_key, common, cdp_port):
@ -366,40 +413,40 @@ class LeafTSACertificate(Certificate):
self.common_name = common self.common_name = common
super().__init__(issuer_cert, issuer_key, "Timestamp Root CA", common, cdp_port, "TSACA") super().__init__(issuer_cert, issuer_key, "Timestamp Root CA", common, cdp_port, "TSACA")
def make_cert(self, public_key, not_before, days) -> x509.Certificate: def make_cert(self, public_key, not_before, days) -> Certificate:
"""Generate a TSA leaf certificate""" """Generate a TSA leaf certificate"""
authority_key = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( authority_key = AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
self.issuer_cert.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value self.issuer_cert.extensions.get_extension_for_class(SubjectKeyIdentifier).value
) )
# The TSA signing certificate must have exactly one extended key usage # The TSA signing certificate must have exactly one extended key usage
# assigned to it: timeStamping. The extended key usage must also be critical, # assigned to it: timeStamping. The extended key usage must also be critical,
# otherwise the certificate is going to be refused. # otherwise the certificate is going to be refused.
extended_key_usage = x509.ExtendedKeyUsage( extended_key_usage = ExtendedKeyUsage(
[x509.oid.ExtendedKeyUsageOID.TIME_STAMPING] [ExtendedKeyUsageOID.TIME_STAMPING]
) )
cert = ( cert = (
x509.CertificateBuilder() CertificateBuilder()
.subject_name(self.create_x509_name(self.common_name)) .subject_name(self.create_x509_name(self.common_name))
.issuer_name(self.issuer_cert.subject) .issuer_name(self.issuer_cert.subject)
.public_key(public_key) .public_key(public_key)
.serial_number(x509.random_serial_number()) .serial_number(random_serial_number())
.not_valid_before(not_before) .not_valid_before(not_before)
.not_valid_after(not_before + datetime.timedelta(days=days)) .not_valid_after(not_before + datetime.timedelta(days=days))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True) .add_extension(BasicConstraints(ca=False, path_length=None), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False) .add_extension(SubjectKeyIdentifier.from_public_key(public_key), critical=False)
.add_extension(authority_key, critical=False) .add_extension(authority_key, critical=False)
.add_extension(extended_key_usage, critical=True) .add_extension(extended_key_usage, critical=True)
.add_extension(self.create_x509_crldp(), critical=False) .add_extension(self.create_x509_crldp(), critical=False)
.add_extension(self.create_x509_name_constraints(), critical=False) .add_extension(self.create_x509_name_constraints(), critical=False)
.sign(self.issuer_key, hashes.SHA256()) .sign(self.issuer_key, SHA256())
) )
# Write PEM file and attach intermediate certificate # Write PEM file and attach intermediate certificate
file_path = os.path.join(CERTS_PATH, self.common_name + ".pem") file_path = os.path.join(CERTS_PATH, self.common_name + ".pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(cert.public_bytes(encoding=Encoding.PEM))
file.write(self.issuer_cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(self.issuer_cert.public_bytes(encoding=Encoding.PEM))
return cert return cert
@ -435,7 +482,7 @@ class CertificateMaker():
issuer_cert, issuer_key = intermediate.make_cert() issuer_cert, issuer_key = intermediate.make_cert()
# Generate private RSA key # Generate private RSA key
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) private_key = generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key() public_key = private_key.public_key()
root.write_key(key=private_key, file_name="key") root.write_key(key=private_key, file_name="key")
@ -462,7 +509,7 @@ class CertificateMaker():
# Write DER file and attach intermediate certificate # Write DER file and attach intermediate certificate
file_path = os.path.join(CERTS_PATH, "cert.der") file_path = os.path.join(CERTS_PATH, "cert.der")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(cert.public_bytes(encoding=serialization.Encoding.DER)) file.write(cert.public_bytes(encoding=Encoding.DER))
def make_tsa_certs(self): def make_tsa_certs(self):
"""Make test TSA certificates""" """Make test TSA certificates"""
@ -472,7 +519,7 @@ class CertificateMaker():
issuer_cert, issuer_key = root.make_cert() issuer_cert, issuer_key = root.make_cert()
# Generate private RSA key # Generate private RSA key
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) private_key = generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key() public_key = private_key.public_key()
root.write_key(key=private_key, file_name="TSA") root.write_key(key=private_key, file_name="TSA")
@ -488,8 +535,8 @@ class CertificateMaker():
# Save the chain to be included in the TSA response # Save the chain to be included in the TSA response
file_path = os.path.join(CERTS_PATH, "tsa-chain.pem") file_path = os.path.join(CERTS_PATH, "tsa-chain.pem")
with open(file_path, mode="wb") as file: with open(file_path, mode="wb") as file:
file.write(cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(cert.public_bytes(encoding=Encoding.PEM))
file.write(issuer_cert.public_bytes(encoding=serialization.Encoding.PEM)) file.write(issuer_cert.public_bytes(encoding=Encoding.PEM))
def write_pkcs12_container(self, cert, key, issuer) -> None: def write_pkcs12_container(self, cert, key, issuer) -> None:
@ -501,16 +548,16 @@ class CertificateMaker():
# and private key encryption: DES-EDE3-CBC (vel 3DES_CBC) # and private key encryption: DES-EDE3-CBC (vel 3DES_CBC)
# pylint: disable=no-member # pylint: disable=no-member
encryption = ( encryption = (
serialization.PrivateFormat.PKCS12.encryption_builder() PrivateFormat.PKCS12.encryption_builder()
.key_cert_algorithm(serialization.pkcs12.PBES.PBESv1SHA1And3KeyTripleDESCBC) .key_cert_algorithm(PBES.PBESv1SHA1And3KeyTripleDESCBC)
.kdf_rounds(5000) .kdf_rounds(5000)
.build(PASSWORD.encode()) .build(PASSWORD.encode())
) )
else: else:
encryption = serialization.BestAvailableEncryption(PASSWORD.encode()) encryption = BestAvailableEncryption(PASSWORD.encode())
# Generate PKCS#12 struct # Generate PKCS#12 struct
pkcs12 = serialization.pkcs12.serialize_key_and_certificates( pkcs12 = serialize_key_and_certificates(
name=b'certificate', name=b'certificate',
key=key, key=key,
cert=cert, cert=cert,