created "init_config_token_demo"

This commit is contained in:
Oscar Krause 2025-04-10 21:38:25 +02:00
parent 7ce79ec95b
commit c15cdee610
2 changed files with 190 additions and 122 deletions

View File

@ -6,7 +6,7 @@ from datetime import datetime, timedelta, UTC
from hashlib import sha256
from json import loads as json_loads
from os import getenv as env
from os.path import join, dirname
from os.path import join, dirname, isfile
from uuid import uuid4
from dateutil.relativedelta import relativedelta
@ -18,10 +18,11 @@ from jose.constants import ALGORITHMS
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from starlette.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse, JSONResponse as JSONr, HTMLResponse as HTMLr, Response, RedirectResponse
from starlette.responses import StreamingResponse, JSONResponse as JSONr, HTMLResponse as HTMLr, Response, \
RedirectResponse
from orm import Origin, Lease, init as db_init, migrate
from util import PrivateKey, PublicKey, load_file
from util import PrivateKey, PublicKey, load_file, Cert
# Load variables
load_dotenv('../version.env')
@ -421,7 +422,7 @@ async def leasing_v1_config_token(request: Request):
logger.debug(f'Headers: {request.headers}')
logger.debug(f'Request: {j}')
# todo: THIS IS A DEMO ONLY - THIS ENDPOINT GENERATES A NEW ROOT-CA EVERY TIME IT IS CALLED !!!
# todo: THIS IS A DEMO ONLY
###
#
@ -429,6 +430,14 @@ async def leasing_v1_config_token(request: Request):
#
###
root_private_key_filename = join(dirname(__file__), 'cert/my_demo_root_private_key.pem')
root_certificate_filename = join(dirname(__file__), 'cert/my_demo_root_certificate.pem')
ca_private_key_filename = join(dirname(__file__), 'cert/my_demo_ca_private_key.pem')
ca_certificate_filename = join(dirname(__file__), 'cert/my_demo_ca_certificate.pem')
si_private_key_filename = join(dirname(__file__), 'cert/my_demo_si_private_key.pem')
si_certificate_filename = join(dirname(__file__), 'cert/my_demo_si_certificate.pem')
def init_config_token_demo():
from cryptography import x509
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
@ -463,6 +472,18 @@ async def leasing_v1_config_token(request: Request):
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
my_root_private_key_as_pem = my_root_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(root_private_key_filename, 'wb') as f:
f.write(my_root_private_key_as_pem)
with open(root_certificate_filename, 'wb') as f:
f.write(my_root_certificate.public_bytes(encoding=Encoding.PEM))
""" Create CA (Intermediate) Key and Certificate """
# create ca keypair
@ -498,8 +519,17 @@ async def leasing_v1_config_token(request: Request):
), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
# with open('caChain_my.pem', 'wb') as f:
# f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM))
my_ca_private_key_as_pem = my_ca_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(ca_private_key_filename, 'wb') as f:
f.write(my_ca_private_key_as_pem)
with open(ca_certificate_filename, 'wb') as f:
f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM))
""" Create Service-Instance Key and Certificate """
@ -517,15 +547,15 @@ async def leasing_v1_config_token(request: Request):
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
# with open('instance.private.pem', 'wb') as f:
# f.write(my_si_private_key_as_pem)
with open(si_private_key_filename, 'wb') as f:
f.write(my_si_private_key_as_pem)
# with open('instance.public.pem', 'wb') as f:
# f.write(my_si_public_key_as_pem)
# create si-certificate subject
my_si_subject = x509.Name([
#x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF),
# x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF),
x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')),
])
@ -551,7 +581,7 @@ async def leasing_v1_config_token(request: Request):
my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value
), critical=False)
.add_extension(x509.SubjectAlternativeName([
#x509.DNSName(INSTANCE_REF)
# x509.DNSName(INSTANCE_REF)
x509.DNSName(j.get('service_instance_ref'))
]), critical=False)
.sign(my_ca_private_key, hashes.SHA256()))
@ -559,8 +589,22 @@ async def leasing_v1_config_token(request: Request):
my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e
my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix
# with open('cert_my.pem', 'wb') as f:
# f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM))
with open(si_certificate_filename, 'wb') as f:
f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM))
if not (isfile(root_private_key_filename)
and isfile(ca_private_key_filename)
and isfile(ca_certificate_filename)
and isfile(si_private_key_filename)
and isfile(si_certificate_filename)):
init_config_token_demo()
my_ca_certificate = Cert.from_file(ca_certificate_filename)
my_si_certificate = Cert.from_file(si_certificate_filename)
my_si_private_key = PrivateKey.from_file(si_private_key_filename)
my_si_private_key_as_pem = my_si_private_key.pem()
my_si_public_key = my_si_private_key.public_key().raw()
my_si_public_key_as_pem = my_si_private_key.public_key().pem()
""" build out payload """
@ -590,8 +634,8 @@ async def leasing_v1_config_token(request: Request):
my_jwt_encode_key = jwk.construct(my_si_private_key_as_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256)
config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
response_ca_chain = my_ca_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8')
response_si_certificate = my_si_certificate.public_bytes(encoding=Encoding.PEM).decode('utf-8')
response_ca_chain = my_ca_certificate.pem().decode('utf-8')
response_si_certificate = my_si_certificate.pem().decode('utf-8')
response = {
"certificateConfiguration": {
@ -600,8 +644,8 @@ async def leasing_v1_config_token(request: Request):
# 76 chars per line
"publicCert": response_si_certificate,
"publicKey": {
"exp": int(my_si_certificate.public_key().public_numbers().e),
"mod": [hex(my_si_certificate.public_key().public_numbers().n)[2:]],
"exp": int(my_si_certificate.raw().public_key().public_numbers().e),
"mod": [hex(my_si_certificate.raw().public_key().public_numbers().n)[2:]],
},
},
"configToken": config_token,

View File

@ -3,6 +3,7 @@ import logging
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey, generate_private_key
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.x509 import load_pem_x509_certificate, Certificate
logging.basicConfig()
@ -76,6 +77,29 @@ class PublicKey:
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
class Cert:
def __init__(self, data: bytes):
self.__cert = load_pem_x509_certificate(data)
@staticmethod
def from_file(filename: str) -> "Cert":
log = logging.getLogger(__name__)
log.debug(f'Importing Certificate from "{filename}"')
with open(filename, 'rb') as f:
data = f.read()
return Cert(data=data.strip())
def raw(self) -> Certificate:
return self.__cert
def pem(self) -> bytes:
return self.__cert.public_bytes(encoding=serialization.Encoding.PEM)
def load_file(filename: str) -> bytes:
log = logging.getLogger(f'{__name__}')
log.debug(f'Loading contents of file "{filename}')