From e23912c49ab14ac237ceeb39ec555a13b5f8f030 Mon Sep 17 00:00:00 2001 From: Oscar Krause Date: Wed, 16 Apr 2025 07:54:23 +0200 Subject: [PATCH] code refactorings - removed INSTANCE_KEY_RSA and INSTANCE_KEY_PUB from configuration and implemented CASetup instead --- .DEBIAN/env.default | 4 - .PKGBUILD/fastapi-dls.default | 4 - README.md | 6 +- app/main.py | 231 ++++------------------------------ app/util.py | 197 ++++++++++++++++++++++++++++- test/main.py | 29 ++--- 6 files changed, 236 insertions(+), 235 deletions(-) diff --git a/.DEBIAN/env.default b/.DEBIAN/env.default index 835f29e..0bb6549 100644 --- a/.DEBIAN/env.default +++ b/.DEBIAN/env.default @@ -21,7 +21,3 @@ DATABASE=sqlite:////etc/fastapi-dls/db.sqlite #SITE_KEY_XID="00000000-0000-0000-0000-000000000000" #INSTANCE_REF="10000000-0000-0000-0000-000000000001" #ALLOTMENT_REF="20000000-0000-0000-0000-000000000001" - -# Site-wide signing keys -INSTANCE_KEY_RSA=/etc/fastapi-dls/instance.private.pem -INSTANCE_KEY_PUB=/etc/fastapi-dls/instance.public.pem diff --git a/.PKGBUILD/fastapi-dls.default b/.PKGBUILD/fastapi-dls.default index 0bfb664..97258ae 100644 --- a/.PKGBUILD/fastapi-dls.default +++ b/.PKGBUILD/fastapi-dls.default @@ -19,10 +19,6 @@ DATABASE="sqlite:////var/lib/fastapi-dls/db.sqlite" SITE_KEY_XID="<>" INSTANCE_REF="<>" -# Site-wide signing keys -INSTANCE_KEY_RSA="/var/lib/fastapi-dls/instance.private.pem" -INSTANCE_KEY_PUB="/var/lib/fastapi-dls/instance.public.pem" - # TLS certificate INSTANCE_SSL_CERT="/var/lib/fastapi-dls/cert/webserver.crt" INSTANCE_SSL_KEY="/var/lib/fastapi-dls/cert/webserver.key" diff --git a/README.md b/README.md index 25d3588..9bc0094 100644 --- a/README.md +++ b/README.md @@ -435,9 +435,7 @@ After first success you have to replace `--issue` with `--renew`. | `CORS_ORIGINS` | `https://{DLS_URL}` | Sets `Access-Control-Allow-Origin` header (comma separated string) \*2 | | `SITE_KEY_XID` | `00000000-0000-0000-0000-000000000000` | Site identification uuid | | `INSTANCE_REF` | `10000000-0000-0000-0000-000000000001` | Instance identification uuid | -| `ALLOTMENT_REF` | `20000000-0000-0000-0000-000000000001` | Allotment identification uuid | -| `INSTANCE_KEY_RSA` | `/cert/instance.private.pem` | Site-wide private RSA key for singing JWTs \*3 | -| `INSTANCE_KEY_PUB` | `/cert/instance.public.pem` | Site-wide public key \*3 | +| `ALLOTMENT_REF` | `20000000-0000-0000-0000-000000000001` | Allotment identification uuid | | \*1 For example, if the lease period is one day and the renewal period is 20%, the client attempts to renew its license every 4.8 hours. If network connectivity is lost, the loss of connectivity is detected during license renewal and the @@ -445,8 +443,6 @@ client has 19.2 hours in which to re-establish connectivity before its license e \*2 Always use `https`, since guest-drivers only support secure connections! -\*3 If you recreate your instance keys you need to **recreate client-token for each guest**! - # Setup (Client) **The token file has to be copied! It's not enough to C&P file contents, because there can be special characters.** diff --git a/app/main.py b/app/main.py index 8652b62..0894adb 100644 --- a/app/main.py +++ b/app/main.py @@ -6,7 +6,7 @@ from datetime import datetime, timedelta, UTC from hashlib import sha256 from json import loads as json_loads, dumps as json_dumps from os import getenv as env -from os.path import join, dirname, isfile +from os.path import join, dirname from uuid import uuid4 from dateutil.relativedelta import relativedelta @@ -21,7 +21,7 @@ from sqlalchemy.orm import sessionmaker from starlette.middleware.cors import CORSMiddleware from orm import Origin, Lease, init as db_init, migrate -from util import PrivateKey, PublicKey, load_file, Cert, ProductMapping +from util import CASetup, PrivateKey, PublicKey, load_file, Cert, ProductMapping # Load variables load_dotenv('../version.env') @@ -42,8 +42,6 @@ DLS_PORT = int(env('DLS_PORT', '443')) SITE_KEY_XID = str(env('SITE_KEY_XID', '00000000-0000-0000-0000-000000000000')) INSTANCE_REF = str(env('INSTANCE_REF', '10000000-0000-0000-0000-000000000001')) ALLOTMENT_REF = str(env('ALLOTMENT_REF', '20000000-0000-0000-0000-000000000001')) -INSTANCE_KEY_RSA = PrivateKey.from_file(str(env('INSTANCE_KEY_RSA', join(dirname(__file__), 'cert/instance.private.pem')))) -INSTANCE_KEY_PUB = PublicKey.from_file(str(env('INSTANCE_KEY_PUB', join(dirname(__file__), 'cert/instance.public.pem')))) TOKEN_EXPIRE_DELTA = relativedelta(days=int(env('TOKEN_EXPIRE_DAYS', 1)), hours=int(env('TOKEN_EXPIRE_HOURS', 0))) LEASE_EXPIRE_DELTA = relativedelta(days=int(env('LEASE_EXPIRE_DAYS', 90)), hours=int(env('LEASE_EXPIRE_HOURS', 0))) LEASE_RENEWAL_PERIOD = float(env('LEASE_RENEWAL_PERIOD', 0.15)) @@ -53,9 +51,15 @@ CORS_ORIGINS = str(env('CORS_ORIGINS', '')).split(',') if (env('CORS_ORIGINS')) DT_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' PRODUCT_MAPPING = ProductMapping(filename=join(dirname(__file__), 'static/product_mapping.json')) +# Create certificate chain and signing keys +ca_setup = CASetup(service_instance_ref=INSTANCE_REF) +my_ca_certificate = Cert.from_file(ca_setup.ca_certificate_filename) +my_si_certificate = Cert.from_file(ca_setup.si_certificate_filename) +my_si_private_key = PrivateKey.from_file(ca_setup.si_private_key_filename) +my_si_public_key = my_si_private_key.public_key() -jwt_encode_key = jwk.construct(INSTANCE_KEY_RSA.pem(), algorithm=ALGORITHMS.RS256) -jwt_decode_key = jwk.construct(INSTANCE_KEY_PUB.pem(), algorithm=ALGORITHMS.RS256) +jwt_encode_key = jwk.construct(my_si_private_key.pem(), algorithm=ALGORITHMS.RS256) +jwt_decode_key = jwk.construct(my_si_private_key.public_key().pem(), algorithm=ALGORITHMS.RS256) # Logging LOG_LEVEL = logging.DEBUG if DEBUG else logging.INFO @@ -268,10 +272,10 @@ async def _client_token(): }, "service_instance_public_key_configuration": { "service_instance_public_key_me": { - "mod": hex(INSTANCE_KEY_PUB.raw().public_numbers().n)[2:], - "exp": int(INSTANCE_KEY_PUB.raw().public_numbers().e), + "mod": my_si_public_key.mod(), + "exp": my_si_public_key.exp(), }, - "service_instance_public_key_pem": INSTANCE_KEY_PUB.pem().decode('utf-8'), + "service_instance_public_key_pem": my_si_private_key.public_key().pem().decode('utf-8'), "key_retention_mode": "LATEST_ONLY" }, } @@ -431,190 +435,6 @@ async def leasing_v1_config_token(request: Request): logger.debug(f'Headers: {request.headers}') logger.debug(f'Request: {j}') - # todo: THIS IS A DEMO ONLY - - ### - # - # https://git.collinwebdesigns.de/nvidia/nls/-/blob/main/src/test/test_config_token.py - # - ### - - root_private_key_filename = join(dirname(__file__), 'cert/my_demo_root_private_key.pem') - root_certificate_filename = join(dirname(__file__), 'cert/my_demo_root_certificate.pem') - ca_private_key_filename = join(dirname(__file__), 'cert/my_demo_ca_private_key.pem') - ca_certificate_filename = join(dirname(__file__), 'cert/my_demo_ca_certificate.pem') - si_private_key_filename = join(dirname(__file__), 'cert/my_demo_si_private_key.pem') - si_certificate_filename = join(dirname(__file__), 'cert/my_demo_si_certificate.pem') - - def init_config_token_demo(): - from cryptography import x509 - from cryptography.hazmat._oid import NameOID - from cryptography.hazmat.primitives import serialization, hashes - from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key - from cryptography.hazmat.primitives.serialization import Encoding - - """ Create Root Key and Certificate """ - - # create root keypair - my_root_private_key = generate_private_key(public_exponent=65537, key_size=4096) - my_root_public_key = my_root_private_key.public_key() - - # create root-certificate subject - my_root_subject = x509.Name([ - x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), - x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), - x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Root CA'), - ]) - - # create self-signed root-certificate - my_root_certificate = ( - x509.CertificateBuilder() - .subject_name(my_root_subject) - .issuer_name(my_root_subject) - .public_key(my_root_public_key) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) - .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) - .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False) - .sign(my_root_private_key, hashes.SHA256())) - - my_root_private_key_as_pem = my_root_private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ) - - with open(root_private_key_filename, 'wb') as f: - f.write(my_root_private_key_as_pem) - - with open(root_certificate_filename, 'wb') as f: - f.write(my_root_certificate.public_bytes(encoding=Encoding.PEM)) - - """ Create CA (Intermediate) Key and Certificate """ - - # create ca keypair - my_ca_private_key = generate_private_key(public_exponent=65537, key_size=4096) - my_ca_public_key = my_ca_private_key.public_key() - - # create ca-certificate subject - my_ca_subject = x509.Name([ - x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), - x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), - x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), - x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), - x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Intermediate CA'), - ]) - - # create self-signed ca-certificate - my_ca_certificate = ( - x509.CertificateBuilder() - .subject_name(my_ca_subject) - .issuer_name(my_root_subject) - .public_key(my_ca_public_key) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) - .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) - .add_extension(x509.KeyUsage(digital_signature=False, key_encipherment=False, key_cert_sign=True, - key_agreement=False, content_commitment=False, data_encipherment=False, - crl_sign=True, encipher_only=False, decipher_only=False), critical=True) - .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_ca_public_key), critical=False) - # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_root_public_key), critical=False) - .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( - my_root_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value - ), critical=False) - .sign(my_root_private_key, hashes.SHA256())) - - my_ca_private_key_as_pem = my_ca_private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ) - - with open(ca_private_key_filename, 'wb') as f: - f.write(my_ca_private_key_as_pem) - - with open(ca_certificate_filename, 'wb') as f: - f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM)) - - """ Create Service-Instance Key and Certificate """ - - # create si keypair - my_si_private_key = generate_private_key(public_exponent=65537, key_size=2048) - my_si_public_key = my_si_private_key.public_key() - - my_si_private_key_as_pem = my_si_private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption(), - ) - my_si_public_key_as_pem = my_si_public_key.public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ) - - with open(si_private_key_filename, 'wb') as f: - f.write(my_si_private_key_as_pem) - - # with open('instance.public.pem', 'wb') as f: - # f.write(my_si_public_key_as_pem) - - # create si-certificate subject - my_si_subject = x509.Name([ - # x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF), - x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')), - ]) - - # create self-signed si-certificate - my_si_certificate = ( - x509.CertificateBuilder() - .subject_name(my_si_subject) - .issuer_name(my_ca_subject) - .public_key(my_si_public_key) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) - .add_extension(x509.KeyUsage(digital_signature=True, key_encipherment=True, key_cert_sign=False, - key_agreement=True, content_commitment=False, data_encipherment=False, - crl_sign=False, encipher_only=False, decipher_only=False), critical=True) - .add_extension(x509.ExtendedKeyUsage([ - x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, - x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH] - ), critical=False) - .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_si_public_key), critical=False) - # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_ca_public_key), critical=False) - .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( - my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value - ), critical=False) - .add_extension(x509.SubjectAlternativeName([ - # x509.DNSName(INSTANCE_REF) - x509.DNSName(j.get('service_instance_ref')) - ]), critical=False) - .sign(my_ca_private_key, hashes.SHA256())) - - my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e - my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix - - with open(si_certificate_filename, 'wb') as f: - f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM)) - - if not (isfile(root_private_key_filename) - and isfile(ca_private_key_filename) - and isfile(ca_certificate_filename) - and isfile(si_private_key_filename) - and isfile(si_certificate_filename)): - init_config_token_demo() - - my_ca_certificate = Cert.from_file(ca_certificate_filename) - my_si_certificate = Cert.from_file(si_certificate_filename) - my_si_private_key = PrivateKey.from_file(si_private_key_filename) - my_si_private_key_as_pem = my_si_private_key.pem() - my_si_public_key = my_si_private_key.public_key().raw() - my_si_public_key_as_pem = my_si_private_key.public_key().pem() - """ build out payload """ cur_time = datetime.now(UTC) @@ -631,16 +451,16 @@ async def leasing_v1_config_token(request: Request): "service_instance_ref": j.get('service_instance_ref'), "service_instance_public_key_configuration": { "service_instance_public_key_me": { - "mod": hex(my_si_public_key.public_numbers().n)[2:], - "exp": int(my_si_public_key.public_numbers().e), + "mod": my_si_public_key.mod(), + "exp": my_si_public_key.exp(), }, # 64 chars per line (pem default) - "service_instance_public_key_pem": my_si_public_key_as_pem.decode('utf-8').strip(), + "service_instance_public_key_pem": my_si_private_key.public_key().pem().decode('utf-8').strip(), "key_retention_mode": "LATEST_ONLY" }, } - my_jwt_encode_key = jwk.construct(my_si_private_key_as_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) + my_jwt_encode_key = jwk.construct(my_si_private_key.pem().decode('utf-8'), algorithm=ALGORITHMS.RS256) config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256) response_ca_chain = my_ca_certificate.pem().decode('utf-8') @@ -722,17 +542,15 @@ async def leasing_v1_lessor(request: Request): } content = json_dumps(response, separators=(',', ':')) - content = f'{content}\n'.encode('utf-8') - signature = INSTANCE_KEY_RSA.generate_signature(content) + content = f'{content}\n'.encode('ascii') + signature = my_si_private_key.generate_signature(content) headers = { 'Content-Type': 'application/json', 'access-control-expose-headers': 'X-NLS-Signature', 'X-NLS-Signature': f'{signature.hex().encode()}' } - x = Response(content=content, media_type='text/plain') - x.raw_headers = [(k.encode("latin-1"), v.encode("latin-1")) for k, v in headers.items()] - return x + return Response(content=content, media_type='text/plain', headers=headers) # return JSONr(content, headers=headers) # venv/lib/python3.9/site-packages/nls_services_lease/test/test_lease_multi_controller.py @@ -784,17 +602,16 @@ async def leasing_v1_lease_renew(request: Request, lease_ref: str): Lease.renew(db, entity, expires, cur_time) content = json_dumps(response, separators=(',', ':')) - content = f'{content}\n'.encode('utf-8') - signature = INSTANCE_KEY_RSA.generate_signature(content) + content = f'{content}\n'.encode('ascii') + signature = my_si_private_key.generate_signature(content) headers = { 'Content-Type': 'application/json', 'access-control-expose-headers': 'X-NLS-Signature', 'X-NLS-Signature': f'{signature.hex().encode()}' } - x = Response(content=content, media_type='text/plain') - x.raw_headers = [(k.encode("latin-1"), v.encode("latin-1")) for k, v in headers.items()] - return x + return Response(content=content, media_type='text/plain', headers=headers) #return JSONr(content, headers=headers) + # venv/lib/python3.9/site-packages/nls_services_lease/test/test_lease_single_controller.py diff --git a/app/util.py b/app/util.py index 953b928..7b60026 100644 --- a/app/util.py +++ b/app/util.py @@ -1,15 +1,204 @@ import logging +from datetime import datetime, UTC, timedelta from json import loads as json_loads -from cryptography.hazmat.primitives import serialization +from os.path import join, dirname, isfile + +from cryptography import x509 +from cryptography.hazmat._oid import NameOID +from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey, generate_private_key from cryptography.hazmat.primitives.hashes import SHA256 +from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key from cryptography.x509 import load_pem_x509_certificate, Certificate logging.basicConfig() +class CASetup: + ### + # + # https://git.collinwebdesigns.de/nvidia/nls/-/blob/main/src/test/test_config_token.py + # + ### + + def __init__(self, service_instance_ref: str): + self.service_instance_ref = service_instance_ref + self.root_private_key_filename = join(dirname(__file__), 'cert/my_demo_root_private_key.pem') + self.root_certificate_filename = join(dirname(__file__), 'cert/my_demo_root_certificate.pem') + self.ca_private_key_filename = join(dirname(__file__), 'cert/my_demo_ca_private_key.pem') + self.ca_certificate_filename = join(dirname(__file__), 'cert/my_demo_ca_certificate.pem') + self.si_private_key_filename = join(dirname(__file__), 'cert/my_demo_si_private_key.pem') + self.si_public_key_filename = join(dirname(__file__), 'cert/my_demo_si_public_key.pem') + self.si_certificate_filename = join(dirname(__file__), 'cert/my_demo_si_certificate.pem') + + if not (isfile(self.root_private_key_filename) + and isfile(self.ca_private_key_filename) + and isfile(self.ca_certificate_filename) + and isfile(self.si_private_key_filename) + and isfile(self.si_certificate_filename)): + self.init_config_token_demo() + + def init_config_token_demo(self): + """ Create Root Key and Certificate """ + + # create root keypair + my_root_private_key = generate_private_key(public_exponent=65537, key_size=4096) + my_root_public_key = my_root_private_key.public_key() + + # create root-certificate subject + my_root_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), + x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Root CA'), + ]) + + # create self-signed root-certificate + my_root_certificate = ( + x509.CertificateBuilder() + .subject_name(my_root_subject) + .issuer_name(my_root_subject) + .public_key(my_root_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False) + .sign(my_root_private_key, hashes.SHA256())) + + my_root_private_key_as_pem = my_root_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + + with open(self.root_private_key_filename, 'wb') as f: + f.write(my_root_private_key_as_pem) + + with open(self.root_certificate_filename, 'wb') as f: + f.write(my_root_certificate.public_bytes(encoding=Encoding.PEM)) + + """ Create CA (Intermediate) Key and Certificate """ + + # create ca keypair + my_ca_private_key = generate_private_key(public_exponent=65537, key_size=4096) + my_ca_public_key = my_ca_private_key.public_key() + + # create ca-certificate subject + my_ca_subject = x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), + x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'), + x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Intermediate CA'), + ]) + + # create self-signed ca-certificate + my_ca_certificate = ( + x509.CertificateBuilder() + .subject_name(my_ca_subject) + .issuer_name(my_root_subject) + .public_key(my_ca_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .add_extension(x509.KeyUsage( + digital_signature=False, + key_encipherment=False, + key_cert_sign=True, + key_agreement=False, + content_commitment=False, + data_encipherment=False, + crl_sign=True, + encipher_only=False, + decipher_only=False), + critical=True + ) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_ca_public_key), critical=False) + # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_root_public_key), critical=False) + .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + my_root_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value + ), critical=False) + .sign(my_root_private_key, hashes.SHA256())) + + my_ca_private_key_as_pem = my_ca_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + + with open(self.ca_private_key_filename, 'wb') as f: + f.write(my_ca_private_key_as_pem) + + with open(self.ca_certificate_filename, 'wb') as f: + f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM)) + + """ Create Service-Instance Key and Certificate """ + + # create si keypair + my_si_private_key = generate_private_key(public_exponent=65537, key_size=2048) + my_si_public_key = my_si_private_key.public_key() + + my_si_private_key_as_pem = my_si_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + my_si_public_key_as_pem = my_si_public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + with open(self.si_private_key_filename, 'wb') as f: + f.write(my_si_private_key_as_pem) + + with open(self.si_public_key_filename, 'wb') as f: + f.write(my_si_public_key_as_pem) + + # create si-certificate subject + my_si_subject = x509.Name([ + # x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF), + x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')), + ]) + + # create self-signed si-certificate + my_si_certificate = ( + x509.CertificateBuilder() + .subject_name(my_si_subject) + .issuer_name(my_ca_subject) + .public_key(my_si_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.KeyUsage(digital_signature=True, key_encipherment=True, key_cert_sign=False, + key_agreement=True, content_commitment=False, data_encipherment=False, + crl_sign=False, encipher_only=False, decipher_only=False), critical=True) + .add_extension(x509.ExtendedKeyUsage([ + x509.oid.ExtendedKeyUsageOID.SERVER_AUTH, + x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH] + ), critical=False) + .add_extension(x509.SubjectKeyIdentifier.from_public_key(my_si_public_key), critical=False) + # .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_ca_public_key), critical=False) + .add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value + ), critical=False) + .add_extension(x509.SubjectAlternativeName([ + # x509.DNSName(INSTANCE_REF) + x509.DNSName(self.service_instance_ref) + ]), critical=False) + .sign(my_ca_private_key, hashes.SHA256())) + + my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e + my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix + + with open(self.si_certificate_filename, 'wb') as f: + f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM)) + + class PrivateKey: def __init__(self, data: bytes): @@ -82,6 +271,12 @@ class PublicKey: format=serialization.PublicFormat.SubjectPublicKeyInfo ) + def mod(self) -> str: + return hex(self.__key.public_numbers().n)[2:] + + def exp(self): + return int(self.__key.public_numbers().e) + def verify_signature(self, signature: bytes, data: bytes) -> bytes: return self.__key.verify(signature, data, padding=PKCS1v15(), algorithm=SHA256()) diff --git a/test/main.py b/test/main.py index 664b754..2547412 100644 --- a/test/main.py +++ b/test/main.py @@ -4,7 +4,6 @@ from base64 import b64encode as b64enc from calendar import timegm from datetime import datetime, UTC from hashlib import sha256 -from os.path import dirname, join from uuid import uuid4, UUID from dateutil.relativedelta import relativedelta @@ -17,21 +16,23 @@ sys.path.append('../') sys.path.append('../app') from app import main -from util import PrivateKey, PublicKey +from util import CASetup, PrivateKey, PublicKey client = TestClient(main.app) +# Instance INSTANCE_REF = '10000000-0000-0000-0000-000000000001' ORIGIN_REF, ALLOTMENT_REF, SECRET = str(uuid4()), '20000000-0000-0000-0000-000000000001', 'HelloWorld' -# INSTANCE_KEY_RSA = generate_key() -# INSTANCE_KEY_PUB = INSTANCE_KEY_RSA.public_key() +# CA & Signing +ca_setup = CASetup(service_instance_ref=INSTANCE_REF) +my_si_private_key = PrivateKey.from_file(ca_setup.si_private_key_filename) +my_si_private_key_as_pem = my_si_private_key.pem() +my_si_public_key = my_si_private_key.public_key() +my_si_public_key_as_pem = my_si_private_key.public_key().pem() -INSTANCE_KEY_RSA = PrivateKey.from_file(str(join(dirname(__file__), '../app/cert/instance.private.pem'))) -INSTANCE_KEY_PUB = PublicKey.from_file(str(join(dirname(__file__), '../app/cert/instance.public.pem'))) - -jwt_encode_key = jwk.construct(INSTANCE_KEY_RSA.pem(), algorithm=ALGORITHMS.RS256) -jwt_decode_key = jwk.construct(INSTANCE_KEY_PUB.pem(), algorithm=ALGORITHMS.RS256) +jwt_encode_key = jwk.construct(my_si_private_key_as_pem, algorithm=ALGORITHMS.RS256) +jwt_decode_key = jwk.construct(my_si_public_key_as_pem, algorithm=ALGORITHMS.RS256) def __bearer_token(origin_ref: str) -> str: @@ -41,10 +42,10 @@ def __bearer_token(origin_ref: str) -> str: def test_signing(): - signature_set_header = INSTANCE_KEY_RSA.generate_signature(b'Hello') + signature_set_header = my_si_private_key.generate_signature(b'Hello') # test plain - INSTANCE_KEY_PUB.verify_signature(signature_set_header, b'Hello') + my_si_public_key.verify_signature(signature_set_header, b'Hello') # test "X-NLS-Signature: b'....' x_nls_signature_header_value = f'{signature_set_header.hex().encode()}' @@ -54,7 +55,7 @@ def test_signing(): # test eval signature_get_header = eval(x_nls_signature_header_value) signature_get_header = bytes.fromhex(signature_get_header.decode('ascii')) - INSTANCE_KEY_PUB.verify_signature(signature_get_header, b'Hello') + my_si_public_key.verify_signature(signature_get_header, b'Hello') def test_index(): @@ -233,7 +234,7 @@ def test_leasing_v1_lessor(): assert len(signature) == 512 signature = bytes.fromhex(signature.decode('ascii')) assert len(signature) == 256 - INSTANCE_KEY_PUB.verify_signature(signature, response.content) + my_si_public_key.verify_signature(signature, response.content) lease_result_list = response.json().get('lease_result_list') assert len(lease_result_list) == 1 @@ -271,7 +272,7 @@ def test_leasing_v1_lease_renew(): assert len(signature) == 512 signature = bytes.fromhex(signature.decode('ascii')) assert len(signature) == 256 - INSTANCE_KEY_PUB.verify_signature(signature, response.content) + my_si_public_key.verify_signature(signature, response.content) lease_ref = response.json().get('lease_ref') assert len(lease_ref) == 36