From c15cdee6103c5dcd439f439d1a1dcda0e0aef210 Mon Sep 17 00:00:00 2001 From: Oscar Krause Date: Thu, 10 Apr 2025 21:38:25 +0200 Subject: [PATCH] created "init_config_token_demo" --- app/main.py | 288 ++++++++++++++++++++++++++++++---------------------- app/util.py | 24 +++++ 2 files changed, 190 insertions(+), 122 deletions(-) diff --git a/app/main.py b/app/main.py index b3d5247..17bff68 100644 --- a/app/main.py +++ b/app/main.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta, UTC from hashlib import sha256 from json import loads as json_loads from os import getenv as env -from os.path import join, dirname +from os.path import join, dirname, isfile from uuid import uuid4 from dateutil.relativedelta import relativedelta @@ -18,10 +18,11 @@ from jose.constants import ALGORITHMS from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from starlette.middleware.cors import CORSMiddleware -from starlette.responses import StreamingResponse, JSONResponse as JSONr, HTMLResponse as HTMLr, Response, RedirectResponse +from starlette.responses import StreamingResponse, JSONResponse as JSONr, HTMLResponse as HTMLr, Response, \ + RedirectResponse from orm import Origin, Lease, init as db_init, migrate -from util import PrivateKey, PublicKey, load_file +from util import PrivateKey, PublicKey, load_file, Cert # Load variables load_dotenv('../version.env') @@ -421,7 +422,7 @@ async def leasing_v1_config_token(request: Request): logger.debug(f'Headers: {request.headers}') logger.debug(f'Request: {j}') - # todo: THIS IS A DEMO ONLY - THIS ENDPOINT GENERATES A NEW ROOT-CA EVERY TIME IT IS CALLED !!! + # todo: THIS IS A DEMO ONLY ### # @@ -429,138 +430,181 @@ async def leasing_v1_config_token(request: Request): # ### - from cryptography import x509 - from cryptography.hazmat._oid import NameOID - from cryptography.hazmat.primitives import serialization, hashes - from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key - from cryptography.hazmat.primitives.serialization import Encoding + root_private_key_filename = join(dirname(__file__), 'cert/my_demo_root_private_key.pem') + root_certificate_filename = join(dirname(__file__), 'cert/my_demo_root_certificate.pem') + ca_private_key_filename = join(dirname(__file__), 'cert/my_demo_ca_private_key.pem') + ca_certificate_filename = join(dirname(__file__), 'cert/my_demo_ca_certificate.pem') + si_private_key_filename = join(dirname(__file__), 'cert/my_demo_si_private_key.pem') + si_certificate_filename = join(dirname(__file__), 'cert/my_demo_si_certificate.pem') - """ Create Root Key and Certificate """ + def init_config_token_demo(): + from cryptography import x509 + from cryptography.hazmat._oid import NameOID + from cryptography.hazmat.primitives import serialization, hashes + from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key + from cryptography.hazmat.primitives.serialization import Encoding - # create root keypair - my_root_private_key = generate_private_key(public_exponent=65537, key_size=4096) - my_root_public_key = my_root_private_key.public_key() + """ Create Root Key and Certificate """ - # create root-certificate subject - my_root_subject = x509.Name([ - x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), - x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), - x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Root CA'), - ]) + # create root keypair + my_root_private_key = generate_private_key(public_exponent=65537, key_size=4096) + my_root_public_key = my_root_private_key.public_key() - # create self-signed root-certificate - my_root_certificate = ( - x509.CertificateBuilder() - .subject_name(my_root_subject) - .issuer_name(my_root_subject) - .public_key(my_root_public_key) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) - .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) - .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False) - .sign(my_root_private_key, hashes.SHA256())) + # create root-certificate subject + my_root_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), + x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Root CA'), + ]) - """ Create CA (Intermediate) Key and Certificate """ + # create self-signed root-certificate + my_root_certificate = ( + x509.CertificateBuilder() + .subject_name(my_root_subject) + .issuer_name(my_root_subject) + .public_key(my_root_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False) + .sign(my_root_private_key, hashes.SHA256())) - # create ca keypair - my_ca_private_key = generate_private_key(public_exponent=65537, key_size=4096) - my_ca_public_key = my_ca_private_key.public_key() + my_root_private_key_as_pem = my_root_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) - # create ca-certificate subject - my_ca_subject = x509.Name([ - x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), - x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), - x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Intermediate CA'), - ]) + with open(root_private_key_filename, 'wb') as f: + f.write(my_root_private_key_as_pem) - # create self-signed ca-certificate - my_ca_certificate = ( - x509.CertificateBuilder() - .subject_name(my_ca_subject) - .issuer_name(my_root_subject) - .public_key(my_ca_public_key) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) - .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) - .add_extension(x509.KeyUsage(digital_signature=False, key_encipherment=False, key_cert_sign=True, - key_agreement=False, content_commitment=False, data_encipherment=False, - crl_sign=True, encipher_only=False, decipher_only=False), critical=True) - .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_ca_public_key), critical=False) - # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_root_public_key), critical=False) - .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( - my_root_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value - ), critical=False) - .sign(my_root_private_key, hashes.SHA256())) + with open(root_certificate_filename, 'wb') as f: + f.write(my_root_certificate.public_bytes(encoding=Encoding.PEM)) - # with open('caChain_my.pem', 'wb') as f: - # f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM)) + """ Create CA (Intermediate) Key and Certificate """ - """ Create Service-Instance Key and Certificate """ + # create ca keypair + my_ca_private_key = generate_private_key(public_exponent=65537, key_size=4096) + my_ca_public_key = my_ca_private_key.public_key() - # create si keypair - my_si_private_key = generate_private_key(public_exponent=65537, key_size=2048) - my_si_public_key = my_si_private_key.public_key() + # create ca-certificate subject + my_ca_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), + x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Intermediate CA'), + ]) - my_si_private_key_as_pem = my_si_private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ) - my_si_public_key_as_pem = my_si_public_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ) + # create self-signed ca-certificate + my_ca_certificate = ( + x509.CertificateBuilder() + .subject_name(my_ca_subject) + .issuer_name(my_root_subject) + .public_key(my_ca_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension(x509.KeyUsage(digital_signature=False, key_encipherment=False, key_cert_sign=True, + key_agreement=False, content_commitment=False, data_encipherment=False, + crl_sign=True, encipher_only=False, decipher_only=False), critical=True) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_ca_public_key), critical=False) + # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_root_public_key), critical=False) + .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + my_root_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value + ), critical=False) + .sign(my_root_private_key, hashes.SHA256())) - # with open('instance.private.pem', 'wb') as f: - # f.write(my_si_private_key_as_pem) + my_ca_private_key_as_pem = my_ca_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) - # with open('instance.public.pem', 'wb') as f: - # f.write(my_si_public_key_as_pem) + with open(ca_private_key_filename, 'wb') as f: + f.write(my_ca_private_key_as_pem) - # create si-certificate subject - my_si_subject = x509.Name([ - #x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF), - x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')), - ]) + with open(ca_certificate_filename, 'wb') as f: + f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM)) - # create self-signed si-certificate - my_si_certificate = ( - x509.CertificateBuilder() - .subject_name(my_si_subject) - .issuer_name(my_ca_subject) - .public_key(my_si_public_key) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) - .add_extension(x509.KeyUsage(digital_signature=True, key_encipherment=True, key_cert_sign=False, - key_agreement=True, content_commitment=False, data_encipherment=False, - crl_sign=False, encipher_only=False, decipher_only=False), critical=True) - .add_extension(x509.ExtendedKeyUsage([ - x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, - x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH] - ), critical=False) - .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_si_public_key), critical=False) - # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_ca_public_key), critical=False) - .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( - my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value - ), critical=False) - .add_extension(x509.SubjectAlternativeName([ - #x509.DNSName(INSTANCE_REF) - x509.DNSName(j.get('service_instance_ref')) - ]), critical=False) - .sign(my_ca_private_key, hashes.SHA256())) + """ Create Service-Instance Key and Certificate """ - my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e - my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix + # create si keypair + my_si_private_key = generate_private_key(public_exponent=65537, key_size=2048) + my_si_public_key = my_si_private_key.public_key() - # with open('cert_my.pem', 'wb') as f: - # f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM)) + my_si_private_key_as_pem = my_si_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + my_si_public_key_as_pem = my_si_public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + with open(si_private_key_filename, 'wb') as f: + f.write(my_si_private_key_as_pem) + + # with open('instance.public.pem', 'wb') as f: + # f.write(my_si_public_key_as_pem) + + # create si-certificate subject + my_si_subject = x509.Name([ + # x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF), + x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')), + ]) + + # create self-signed si-certificate + my_si_certificate = ( + x509.CertificateBuilder() + .subject_name(my_si_subject) + .issuer_name(my_ca_subject) + .public_key(my_si_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.KeyUsage(digital_signature=True, key_encipherment=True, key_cert_sign=False, + key_agreement=True, content_commitment=False, data_encipherment=False, + crl_sign=False, encipher_only=False, decipher_only=False), critical=True) + .add_extension(x509.ExtendedKeyUsage([ + x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, + x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH] + ), critical=False) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_si_public_key), critical=False) + # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_ca_public_key), critical=False) + .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value + ), critical=False) + .add_extension(x509.SubjectAlternativeName([ + # x509.DNSName(INSTANCE_REF) + x509.DNSName(j.get('service_instance_ref')) + ]), critical=False) + .sign(my_ca_private_key, hashes.SHA256())) + + my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e + my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix + + with open(si_certificate_filename, 'wb') as f: + f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM)) + + if not (isfile(root_private_key_filename) + and isfile(ca_private_key_filename) + and isfile(ca_certificate_filename) + and isfile(si_private_key_filename) + and isfile(si_certificate_filename)): + init_config_token_demo() + + my_ca_certificate = Cert.from_file(ca_certificate_filename) + my_si_certificate = Cert.from_file(si_certificate_filename) + my_si_private_key = PrivateKey.from_file(si_private_key_filename) + my_si_private_key_as_pem = my_si_private_key.pem() + my_si_public_key = my_si_private_key.public_key().raw() + my_si_public_key_as_pem = my_si_private_key.public_key().pem() """ build out payload """ @@ -590,8 +634,8 @@ async def leasing_v1_config_token(request: Request): my_jwt_encode_key = jwk.construct(my_si_private_key_as_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256) - response_ca_chain = my_ca_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8') - response_si_certificate = my_si_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8') + response_ca_chain = my_ca_certificate.pem().decode('utf-8') + response_si_certificate = my_si_certificate.pem().decode('utf-8') response = { "certificateConfiguration": { @@ -600,8 +644,8 @@ async def leasing_v1_config_token(request: Request): # 76 chars per line "publicCert": response_si_certificate, "publicKey": { - "exp": int(my_si_certificate.public_key().public_numbers().e), - "mod": [hex(my_si_certificate.public_key().public_numbers().n)[2:]], + "exp": int(my_si_certificate.raw().public_key().public_numbers().e), + "mod": [hex(my_si_certificate.raw().public_key().public_numbers().n)[2:]], }, }, "configToken": config_token, diff --git a/app/util.py b/app/util.py index 1aae17b..14f20f2 100644 --- a/app/util.py +++ b/app/util.py @@ -3,6 +3,7 @@ import logging from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey, generate_private_key from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key +from cryptography.x509 import load_pem_x509_certificate, Certificate logging.basicConfig() @@ -76,6 +77,29 @@ class PublicKey: format=serialization.PublicFormat.SubjectPublicKeyInfo ) + +class Cert: + + def __init__(self, data: bytes): + self.__cert = load_pem_x509_certificate(data) + + @staticmethod + def from_file(filename: str) -> "Cert": + log = logging.getLogger(__name__) + log.debug(f'Importing Certificate from "{filename}"') + + with open(filename, 'rb') as f: + data = f.read() + + return Cert(data=data.strip()) + + def raw(self) -> Certificate: + return self.__cert + + def pem(self) -> bytes: + return self.__cert.public_bytes(encoding=serialization.Encoding.PEM) + + def load_file(filename: str) -> bytes: log = logging.getLogger(f'{__name__}') log.debug(f'Loading contents of file "{filename}')