code refactorings - removed INSTANCE_KEY_RSA and INSTANCE_KEY_PUB from configuration and implemented CASetup instead

This commit is contained in:
Oscar Krause 2025-04-16 07:54:23 +02:00
parent b9e78dbeeb
commit e23912c49a
6 changed files with 236 additions and 235 deletions

View File

@ -21,7 +21,3 @@ DATABASE=sqlite:////etc/fastapi-dls/db.sqlite
#SITE_KEY_XID="00000000-0000-0000-0000-000000000000"
#INSTANCE_REF="10000000-0000-0000-0000-000000000001"
#ALLOTMENT_REF="20000000-0000-0000-0000-000000000001"
# Site-wide signing keys
INSTANCE_KEY_RSA=/etc/fastapi-dls/instance.private.pem
INSTANCE_KEY_PUB=/etc/fastapi-dls/instance.public.pem

View File

@ -19,10 +19,6 @@ DATABASE="sqlite:////var/lib/fastapi-dls/db.sqlite"
SITE_KEY_XID="<<sitekey>>"
INSTANCE_REF="<<instanceref>>"
# Site-wide signing keys
INSTANCE_KEY_RSA="/var/lib/fastapi-dls/instance.private.pem"
INSTANCE_KEY_PUB="/var/lib/fastapi-dls/instance.public.pem"
# TLS certificate
INSTANCE_SSL_CERT="/var/lib/fastapi-dls/cert/webserver.crt"
INSTANCE_SSL_KEY="/var/lib/fastapi-dls/cert/webserver.key"

View File

@ -435,9 +435,7 @@ After first success you have to replace `--issue` with `--renew`.
| `CORS_ORIGINS` | `https://{DLS_URL}` | Sets `Access-Control-Allow-Origin` header (comma separated string) \*2 |
| `SITE_KEY_XID` | `00000000-0000-0000-0000-000000000000` | Site identification uuid |
| `INSTANCE_REF` | `10000000-0000-0000-0000-000000000001` | Instance identification uuid |
| `ALLOTMENT_REF` | `20000000-0000-0000-0000-000000000001` | Allotment identification uuid |
| `INSTANCE_KEY_RSA` | `<app-dir>/cert/instance.private.pem` | Site-wide private RSA key for singing JWTs \*3 |
| `INSTANCE_KEY_PUB` | `<app-dir>/cert/instance.public.pem` | Site-wide public key \*3 |
| `ALLOTMENT_REF` | `20000000-0000-0000-0000-000000000001` | Allotment identification uuid | |
\*1 For example, if the lease period is one day and the renewal period is 20%, the client attempts to renew its license
every 4.8 hours. If network connectivity is lost, the loss of connectivity is detected during license renewal and the
@ -445,8 +443,6 @@ client has 19.2 hours in which to re-establish connectivity before its license e
\*2 Always use `https`, since guest-drivers only support secure connections!
\*3 If you recreate your instance keys you need to **recreate client-token for each guest**!
# Setup (Client)
**The token file has to be copied! It's not enough to C&P file contents, because there can be special characters.**

View File

@ -6,7 +6,7 @@ from datetime import datetime, timedelta, UTC
from hashlib import sha256
from json import loads as json_loads, dumps as json_dumps
from os import getenv as env
from os.path import join, dirname, isfile
from os.path import join, dirname
from uuid import uuid4
from dateutil.relativedelta import relativedelta
@ -21,7 +21,7 @@ from sqlalchemy.orm import sessionmaker
from starlette.middleware.cors import CORSMiddleware
from orm import Origin, Lease, init as db_init, migrate
from util import PrivateKey, PublicKey, load_file, Cert, ProductMapping
from util import CASetup, PrivateKey, PublicKey, load_file, Cert, ProductMapping
# Load variables
load_dotenv('../version.env')
@ -42,8 +42,6 @@ DLS_PORT = int(env('DLS_PORT', '443'))
SITE_KEY_XID = str(env('SITE_KEY_XID', '00000000-0000-0000-0000-000000000000'))
INSTANCE_REF = str(env('INSTANCE_REF', '10000000-0000-0000-0000-000000000001'))
ALLOTMENT_REF = str(env('ALLOTMENT_REF', '20000000-0000-0000-0000-000000000001'))
INSTANCE_KEY_RSA = PrivateKey.from_file(str(env('INSTANCE_KEY_RSA', join(dirname(__file__), 'cert/instance.private.pem'))))
INSTANCE_KEY_PUB = PublicKey.from_file(str(env('INSTANCE_KEY_PUB', join(dirname(__file__), 'cert/instance.public.pem'))))
TOKEN_EXPIRE_DELTA = relativedelta(days=int(env('TOKEN_EXPIRE_DAYS', 1)), hours=int(env('TOKEN_EXPIRE_HOURS', 0)))
LEASE_EXPIRE_DELTA = relativedelta(days=int(env('LEASE_EXPIRE_DAYS', 90)), hours=int(env('LEASE_EXPIRE_HOURS', 0)))
LEASE_RENEWAL_PERIOD = float(env('LEASE_RENEWAL_PERIOD', 0.15))
@ -53,9 +51,15 @@ CORS_ORIGINS = str(env('CORS_ORIGINS', '')).split(',') if (env('CORS_ORIGINS'))
DT_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
PRODUCT_MAPPING = ProductMapping(filename=join(dirname(__file__), 'static/product_mapping.json'))
# Create certificate chain and signing keys
ca_setup = CASetup(service_instance_ref=INSTANCE_REF)
my_ca_certificate = Cert.from_file(ca_setup.ca_certificate_filename)
my_si_certificate = Cert.from_file(ca_setup.si_certificate_filename)
my_si_private_key = PrivateKey.from_file(ca_setup.si_private_key_filename)
my_si_public_key = my_si_private_key.public_key()
jwt_encode_key = jwk.construct(INSTANCE_KEY_RSA.pem(), algorithm=ALGORITHMS.RS256)
jwt_decode_key = jwk.construct(INSTANCE_KEY_PUB.pem(), algorithm=ALGORITHMS.RS256)
jwt_encode_key = jwk.construct(my_si_private_key.pem(), algorithm=ALGORITHMS.RS256)
jwt_decode_key = jwk.construct(my_si_private_key.public_key().pem(), algorithm=ALGORITHMS.RS256)
# Logging
LOG_LEVEL = logging.DEBUG if DEBUG else logging.INFO
@ -268,10 +272,10 @@ async def _client_token():
},
"service_instance_public_key_configuration": {
"service_instance_public_key_me": {
"mod": hex(INSTANCE_KEY_PUB.raw().public_numbers().n)[2:],
"exp": int(INSTANCE_KEY_PUB.raw().public_numbers().e),
"mod": my_si_public_key.mod(),
"exp": my_si_public_key.exp(),
},
"service_instance_public_key_pem": INSTANCE_KEY_PUB.pem().decode('utf-8'),
"service_instance_public_key_pem": my_si_private_key.public_key().pem().decode('utf-8'),
"key_retention_mode": "LATEST_ONLY"
},
}
@ -431,190 +435,6 @@ async def leasing_v1_config_token(request: Request):
logger.debug(f'Headers: {request.headers}')
logger.debug(f'Request: {j}')
# todo: THIS IS A DEMO ONLY
###
#
# https://git.collinwebdesigns.de/nvidia/nls/-/blob/main/src/test/test_config_token.py
#
###
root_private_key_filename = join(dirname(__file__), 'cert/my_demo_root_private_key.pem')
root_certificate_filename = join(dirname(__file__), 'cert/my_demo_root_certificate.pem')
ca_private_key_filename = join(dirname(__file__), 'cert/my_demo_ca_private_key.pem')
ca_certificate_filename = join(dirname(__file__), 'cert/my_demo_ca_certificate.pem')
si_private_key_filename = join(dirname(__file__), 'cert/my_demo_si_private_key.pem')
si_certificate_filename = join(dirname(__file__), 'cert/my_demo_si_certificate.pem')
def init_config_token_demo():
from cryptography import x509
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key
from cryptography.hazmat.primitives.serialization import Encoding
""" Create Root Key and Certificate """
# create root keypair
my_root_private_key = generate_private_key(public_exponent=65537, key_size=4096)
my_root_public_key = my_root_private_key.public_key()
# create root-certificate subject
my_root_subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'),
x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Root CA'),
])
# create self-signed root-certificate
my_root_certificate = (
x509.CertificateBuilder()
.subject_name(my_root_subject)
.issuer_name(my_root_subject)
.public_key(my_root_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
my_root_private_key_as_pem = my_root_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(root_private_key_filename, 'wb') as f:
f.write(my_root_private_key_as_pem)
with open(root_certificate_filename, 'wb') as f:
f.write(my_root_certificate.public_bytes(encoding=Encoding.PEM))
""" Create CA (Intermediate) Key and Certificate """
# create ca keypair
my_ca_private_key = generate_private_key(public_exponent=65537, key_size=4096)
my_ca_public_key = my_ca_private_key.public_key()
# create ca-certificate subject
my_ca_subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'),
x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Intermediate CA'),
])
# create self-signed ca-certificate
my_ca_certificate = (
x509.CertificateBuilder()
.subject_name(my_ca_subject)
.issuer_name(my_root_subject)
.public_key(my_ca_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(x509.KeyUsage(digital_signature=False, key_encipherment=False, key_cert_sign=True,
key_agreement=False, content_commitment=False, data_encipherment=False,
crl_sign=True, encipher_only=False, decipher_only=False), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_ca_public_key), critical=False)
# .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_root_public_key), critical=False)
.add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
my_root_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value
), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
my_ca_private_key_as_pem = my_ca_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(ca_private_key_filename, 'wb') as f:
f.write(my_ca_private_key_as_pem)
with open(ca_certificate_filename, 'wb') as f:
f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM))
""" Create Service-Instance Key and Certificate """
# create si keypair
my_si_private_key = generate_private_key(public_exponent=65537, key_size=2048)
my_si_public_key = my_si_private_key.public_key()
my_si_private_key_as_pem = my_si_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
my_si_public_key_as_pem = my_si_public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
with open(si_private_key_filename, 'wb') as f:
f.write(my_si_private_key_as_pem)
# with open('instance.public.pem', 'wb') as f:
# f.write(my_si_public_key_as_pem)
# create si-certificate subject
my_si_subject = x509.Name([
# x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF),
x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')),
])
# create self-signed si-certificate
my_si_certificate = (
x509.CertificateBuilder()
.subject_name(my_si_subject)
.issuer_name(my_ca_subject)
.public_key(my_si_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10))
.add_extension(x509.KeyUsage(digital_signature=True, key_encipherment=True, key_cert_sign=False,
key_agreement=True, content_commitment=False, data_encipherment=False,
crl_sign=False, encipher_only=False, decipher_only=False), critical=True)
.add_extension(x509.ExtendedKeyUsage([
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]
), critical=False)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_si_public_key), critical=False)
# .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_ca_public_key), critical=False)
.add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value
), critical=False)
.add_extension(x509.SubjectAlternativeName([
# x509.DNSName(INSTANCE_REF)
x509.DNSName(j.get('service_instance_ref'))
]), critical=False)
.sign(my_ca_private_key, hashes.SHA256()))
my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e
my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix
with open(si_certificate_filename, 'wb') as f:
f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM))
if not (isfile(root_private_key_filename)
and isfile(ca_private_key_filename)
and isfile(ca_certificate_filename)
and isfile(si_private_key_filename)
and isfile(si_certificate_filename)):
init_config_token_demo()
my_ca_certificate = Cert.from_file(ca_certificate_filename)
my_si_certificate = Cert.from_file(si_certificate_filename)
my_si_private_key = PrivateKey.from_file(si_private_key_filename)
my_si_private_key_as_pem = my_si_private_key.pem()
my_si_public_key = my_si_private_key.public_key().raw()
my_si_public_key_as_pem = my_si_private_key.public_key().pem()
""" build out payload """
cur_time = datetime.now(UTC)
@ -631,16 +451,16 @@ async def leasing_v1_config_token(request: Request):
"service_instance_ref": j.get('service_instance_ref'),
"service_instance_public_key_configuration": {
"service_instance_public_key_me": {
"mod": hex(my_si_public_key.public_numbers().n)[2:],
"exp": int(my_si_public_key.public_numbers().e),
"mod": my_si_public_key.mod(),
"exp": my_si_public_key.exp(),
},
# 64 chars per line (pem default)
"service_instance_public_key_pem": my_si_public_key_as_pem.decode('utf-8').strip(),
"service_instance_public_key_pem": my_si_private_key.public_key().pem().decode('utf-8').strip(),
"key_retention_mode": "LATEST_ONLY"
},
}
my_jwt_encode_key = jwk.construct(my_si_private_key_as_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256)
my_jwt_encode_key = jwk.construct(my_si_private_key.pem().decode('utf-8'), algorithm=ALGORITHMS.RS256)
config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
response_ca_chain = my_ca_certificate.pem().decode('utf-8')
@ -722,17 +542,15 @@ async def leasing_v1_lessor(request: Request):
}
content = json_dumps(response, separators=(',', ':'))
content = f'{content}\n'.encode('utf-8')
signature = INSTANCE_KEY_RSA.generate_signature(content)
content = f'{content}\n'.encode('ascii')
signature = my_si_private_key.generate_signature(content)
headers = {
'Content-Type': 'application/json',
'access-control-expose-headers': 'X-NLS-Signature',
'X-NLS-Signature': f'{signature.hex().encode()}'
}
x = Response(content=content, media_type='text/plain')
x.raw_headers = [(k.encode("latin-1"), v.encode("latin-1")) for k, v in headers.items()]
return x
return Response(content=content, media_type='text/plain', headers=headers) # return JSONr(content, headers=headers)
# venv/lib/python3.9/site-packages/nls_services_lease/test/test_lease_multi_controller.py
@ -784,17 +602,16 @@ async def leasing_v1_lease_renew(request: Request, lease_ref: str):
Lease.renew(db, entity, expires, cur_time)
content = json_dumps(response, separators=(',', ':'))
content = f'{content}\n'.encode('utf-8')
signature = INSTANCE_KEY_RSA.generate_signature(content)
content = f'{content}\n'.encode('ascii')
signature = my_si_private_key.generate_signature(content)
headers = {
'Content-Type': 'application/json',
'access-control-expose-headers': 'X-NLS-Signature',
'X-NLS-Signature': f'{signature.hex().encode()}'
}
x = Response(content=content, media_type='text/plain')
x.raw_headers = [(k.encode("latin-1"), v.encode("latin-1")) for k, v in headers.items()]
return x
return Response(content=content, media_type='text/plain', headers=headers) #return JSONr(content, headers=headers)
# venv/lib/python3.9/site-packages/nls_services_lease/test/test_lease_single_controller.py

View File

@ -1,15 +1,204 @@
import logging
from datetime import datetime, UTC, timedelta
from json import loads as json_loads
from cryptography.hazmat.primitives import serialization
from os.path import join, dirname, isfile
from cryptography import x509
from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey, generate_private_key
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key
from cryptography.x509 import load_pem_x509_certificate, Certificate
logging.basicConfig()
class CASetup:
###
#
# https://git.collinwebdesigns.de/nvidia/nls/-/blob/main/src/test/test_config_token.py
#
###
def __init__(self, service_instance_ref: str):
self.service_instance_ref = service_instance_ref
self.root_private_key_filename = join(dirname(__file__), 'cert/my_demo_root_private_key.pem')
self.root_certificate_filename = join(dirname(__file__), 'cert/my_demo_root_certificate.pem')
self.ca_private_key_filename = join(dirname(__file__), 'cert/my_demo_ca_private_key.pem')
self.ca_certificate_filename = join(dirname(__file__), 'cert/my_demo_ca_certificate.pem')
self.si_private_key_filename = join(dirname(__file__), 'cert/my_demo_si_private_key.pem')
self.si_public_key_filename = join(dirname(__file__), 'cert/my_demo_si_public_key.pem')
self.si_certificate_filename = join(dirname(__file__), 'cert/my_demo_si_certificate.pem')
if not (isfile(self.root_private_key_filename)
and isfile(self.ca_private_key_filename)
and isfile(self.ca_certificate_filename)
and isfile(self.si_private_key_filename)
and isfile(self.si_certificate_filename)):
self.init_config_token_demo()
def init_config_token_demo(self):
""" Create Root Key and Certificate """
# create root keypair
my_root_private_key = generate_private_key(public_exponent=65537, key_size=4096)
my_root_public_key = my_root_private_key.public_key()
# create root-certificate subject
my_root_subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'),
x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Root CA'),
])
# create self-signed root-certificate
my_root_certificate = (
x509.CertificateBuilder()
.subject_name(my_root_subject)
.issuer_name(my_root_subject)
.public_key(my_root_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_root_public_key), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
my_root_private_key_as_pem = my_root_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(self.root_private_key_filename, 'wb') as f:
f.write(my_root_private_key_as_pem)
with open(self.root_certificate_filename, 'wb') as f:
f.write(my_root_certificate.public_bytes(encoding=Encoding.PEM))
""" Create CA (Intermediate) Key and Certificate """
# create ca keypair
my_ca_private_key = generate_private_key(public_exponent=65537, key_size=4096)
my_ca_public_key = my_ca_private_key.public_key()
# create ca-certificate subject
my_ca_subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'Nvidia Licensing Service (NLS)'),
x509.NameAttribute(NameOID.COMMON_NAME, u'NLS Intermediate CA'),
])
# create self-signed ca-certificate
my_ca_certificate = (
x509.CertificateBuilder()
.subject_name(my_ca_subject)
.issuer_name(my_root_subject)
.public_key(my_ca_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.add_extension(x509.KeyUsage(
digital_signature=False,
key_encipherment=False,
key_cert_sign=True,
key_agreement=False,
content_commitment=False,
data_encipherment=False,
crl_sign=True,
encipher_only=False,
decipher_only=False),
critical=True
)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_ca_public_key), critical=False)
# .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_root_public_key), critical=False)
.add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
my_root_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value
), critical=False)
.sign(my_root_private_key, hashes.SHA256()))
my_ca_private_key_as_pem = my_ca_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
with open(self.ca_private_key_filename, 'wb') as f:
f.write(my_ca_private_key_as_pem)
with open(self.ca_certificate_filename, 'wb') as f:
f.write(my_ca_certificate.public_bytes(encoding=Encoding.PEM))
""" Create Service-Instance Key and Certificate """
# create si keypair
my_si_private_key = generate_private_key(public_exponent=65537, key_size=2048)
my_si_public_key = my_si_private_key.public_key()
my_si_private_key_as_pem = my_si_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
my_si_public_key_as_pem = my_si_public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
with open(self.si_private_key_filename, 'wb') as f:
f.write(my_si_private_key_as_pem)
with open(self.si_public_key_filename, 'wb') as f:
f.write(my_si_public_key_as_pem)
# create si-certificate subject
my_si_subject = x509.Name([
# x509.NameAttribute(NameOID.COMMON_NAME, INSTANCE_REF),
x509.NameAttribute(NameOID.COMMON_NAME, j.get('service_instance_ref')),
])
# create self-signed si-certificate
my_si_certificate = (
x509.CertificateBuilder()
.subject_name(my_si_subject)
.issuer_name(my_ca_subject)
.public_key(my_si_public_key)
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10))
.add_extension(x509.KeyUsage(digital_signature=True, key_encipherment=True, key_cert_sign=False,
key_agreement=True, content_commitment=False, data_encipherment=False,
crl_sign=False, encipher_only=False, decipher_only=False), critical=True)
.add_extension(x509.ExtendedKeyUsage([
x509.oid.ExtendedKeyUsageOID.SERVER_AUTH,
x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]
), critical=False)
.add_extension(x509.SubjectKeyIdentifier.from_public_key(my_si_public_key), critical=False)
# .add_extension(x509.AuthorityKeyIdentifier.from_issuer_public_key(my_ca_public_key), critical=False)
.add_extension(x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
my_ca_certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier).value
), critical=False)
.add_extension(x509.SubjectAlternativeName([
# x509.DNSName(INSTANCE_REF)
x509.DNSName(self.service_instance_ref)
]), critical=False)
.sign(my_ca_private_key, hashes.SHA256()))
my_si_public_key_exp = my_si_certificate.public_key().public_numbers().e
my_si_public_key_mod = f'{my_si_certificate.public_key().public_numbers().n:x}' # hex value without "0x" prefix
with open(self.si_certificate_filename, 'wb') as f:
f.write(my_si_certificate.public_bytes(encoding=Encoding.PEM))
class PrivateKey:
def __init__(self, data: bytes):
@ -82,6 +271,12 @@ class PublicKey:
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
def mod(self) -> str:
return hex(self.__key.public_numbers().n)[2:]
def exp(self):
return int(self.__key.public_numbers().e)
def verify_signature(self, signature: bytes, data: bytes) -> bytes:
return self.__key.verify(signature, data, padding=PKCS1v15(), algorithm=SHA256())

View File

@ -4,7 +4,6 @@ from base64 import b64encode as b64enc
from calendar import timegm
from datetime import datetime, UTC
from hashlib import sha256
from os.path import dirname, join
from uuid import uuid4, UUID
from dateutil.relativedelta import relativedelta
@ -17,21 +16,23 @@ sys.path.append('../')
sys.path.append('../app')
from app import main
from util import PrivateKey, PublicKey
from util import CASetup, PrivateKey, PublicKey
client = TestClient(main.app)
# Instance
INSTANCE_REF = '10000000-0000-0000-0000-000000000001'
ORIGIN_REF, ALLOTMENT_REF, SECRET = str(uuid4()), '20000000-0000-0000-0000-000000000001', 'HelloWorld'
# INSTANCE_KEY_RSA = generate_key()
# INSTANCE_KEY_PUB = INSTANCE_KEY_RSA.public_key()
# CA & Signing
ca_setup = CASetup(service_instance_ref=INSTANCE_REF)
my_si_private_key = PrivateKey.from_file(ca_setup.si_private_key_filename)
my_si_private_key_as_pem = my_si_private_key.pem()
my_si_public_key = my_si_private_key.public_key()
my_si_public_key_as_pem = my_si_private_key.public_key().pem()
INSTANCE_KEY_RSA = PrivateKey.from_file(str(join(dirname(__file__), '../app/cert/instance.private.pem')))
INSTANCE_KEY_PUB = PublicKey.from_file(str(join(dirname(__file__), '../app/cert/instance.public.pem')))
jwt_encode_key = jwk.construct(INSTANCE_KEY_RSA.pem(), algorithm=ALGORITHMS.RS256)
jwt_decode_key = jwk.construct(INSTANCE_KEY_PUB.pem(), algorithm=ALGORITHMS.RS256)
jwt_encode_key = jwk.construct(my_si_private_key_as_pem, algorithm=ALGORITHMS.RS256)
jwt_decode_key = jwk.construct(my_si_public_key_as_pem, algorithm=ALGORITHMS.RS256)
def __bearer_token(origin_ref: str) -> str:
@ -41,10 +42,10 @@ def __bearer_token(origin_ref: str) -> str:
def test_signing():
signature_set_header = INSTANCE_KEY_RSA.generate_signature(b'Hello')
signature_set_header = my_si_private_key.generate_signature(b'Hello')
# test plain
INSTANCE_KEY_PUB.verify_signature(signature_set_header, b'Hello')
my_si_public_key.verify_signature(signature_set_header, b'Hello')
# test "X-NLS-Signature: b'....'
x_nls_signature_header_value = f'{signature_set_header.hex().encode()}'
@ -54,7 +55,7 @@ def test_signing():
# test eval
signature_get_header = eval(x_nls_signature_header_value)
signature_get_header = bytes.fromhex(signature_get_header.decode('ascii'))
INSTANCE_KEY_PUB.verify_signature(signature_get_header, b'Hello')
my_si_public_key.verify_signature(signature_get_header, b'Hello')
def test_index():
@ -233,7 +234,7 @@ def test_leasing_v1_lessor():
assert len(signature) == 512
signature = bytes.fromhex(signature.decode('ascii'))
assert len(signature) == 256
INSTANCE_KEY_PUB.verify_signature(signature, response.content)
my_si_public_key.verify_signature(signature, response.content)
lease_result_list = response.json().get('lease_result_list')
assert len(lease_result_list) == 1
@ -271,7 +272,7 @@ def test_leasing_v1_lease_renew():
assert len(signature) == 512
signature = bytes.fromhex(signature.decode('ascii'))
assert len(signature) == 256
INSTANCE_KEY_PUB.verify_signature(signature, response.content)
my_si_public_key.verify_signature(signature, response.content)
lease_ref = response.json().get('lease_ref')
assert len(lease_ref) == 36