refactored variables with "nv_" and "my_" prefix

This commit is contained in:
Oscar Krause 2025-03-20 21:02:00 +01:00
parent ffed4798f7
commit 27cd0597be

View File

@ -4,17 +4,24 @@ from datetime import datetime, UTC, timedelta
from cryptography import x509 from cryptography import x509
from cryptography.hazmat._oid import NameOID from cryptography.hazmat._oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key
from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key, Encoding from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key, Encoding
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from jose import jwk, jws from jose import jwk, jws
from jose.constants import ALGORITHMS from jose.constants import ALGORITHMS
"""
Any variables prefixed with `NV_` or `nv_` are original values, dumped from an NLS registration/activation process.
Any variables prefixed with `MY_` or `my_` are variables which are set by fastapi-dls and are "faked".
"""
### DEFAULTS ### DEFAULTS
INSTANCE_SITE_ID = '4e53a171-103b-4946-9ed8-5f4c0ee750d9' # SELECT xid FROM request_routing.service_instance
INSTANCE_KEY_RSA = """-----BEGIN RSA PRIVATE KEY----- NV_SI_SITE_ID = '4e53a171-103b-4946-9ed8-5f4c0ee750d9'
# SELECT value FROM si_<xid>.service_instance_artifact WHERE namespace = 'service_instance.client.all' and name = 'private_key'
NV_SI_KEY_RSA = """-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAyIz6i48cFn4XOK0S2GTYpLMU85xzJ1fmQmA2nC6Zod2V4BxN MIIEpAIBAAKCAQEAyIz6i48cFn4XOK0S2GTYpLMU85xzJ1fmQmA2nC6Zod2V4BxN
Xqk+9y8nvdzZVELxmC+N47ZQGV/J5cquIadx0V42F3JTryJFDuZ+7fQsNlXUX3og Xqk+9y8nvdzZVELxmC+N47ZQGV/J5cquIadx0V42F3JTryJFDuZ+7fQsNlXUX3og
UQhhgvHuluhDJQSvdZAzpguS7N+gJGCGbGk1pZBYL2JtTDTWSIcWsQtD/w9DAPEk UQhhgvHuluhDJQSvdZAzpguS7N+gJGCGbGk1pZBYL2JtTDTWSIcWsQtD/w9DAPEk
@ -42,7 +49,8 @@ LdUhyKEt/if0EJd09UbH6+T7aqkuw4HthF8ab2FSlLcyQ6t0UYUlTwCLTHsquqeu
5+Le7DO89hskB8DKr4Oobmr12eulCf81UDYWSKhDYeqrBJyf3PopLg== 5+Le7DO89hskB8DKr4Oobmr12eulCf81UDYWSKhDYeqrBJyf3PopLg==
-----END RSA PRIVATE KEY----- -----END RSA PRIVATE KEY-----
""" """
INSTANCE_KEY_PUB = """-----BEGIN PUBLIC KEY----- # SELECT value FROM service_instance_artifact WHERE namespace = 'service_instance.client.all' and name = 'public_key'
NV_SI_KEY_PUB = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyIz6i48cFn4XOK0S2GTY MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyIz6i48cFn4XOK0S2GTY
pLMU85xzJ1fmQmA2nC6Zod2V4BxNXqk+9y8nvdzZVELxmC+N47ZQGV/J5cquIadx pLMU85xzJ1fmQmA2nC6Zod2V4BxNXqk+9y8nvdzZVELxmC+N47ZQGV/J5cquIadx
0V42F3JTryJFDuZ+7fQsNlXUX3ogUQhhgvHuluhDJQSvdZAzpguS7N+gJGCGbGk1 0V42F3JTryJFDuZ+7fQsNlXUX3ogUQhhgvHuluhDJQSvdZAzpguS7N+gJGCGbGk1
@ -52,9 +60,9 @@ ayrneYAdIbEAQSQg+Z40npUwNKW71Ue2NsG3SoGWuj3lTyEVXlLsAw0bsCDVLipM
WwIDAQAB WwIDAQAB
-----END PUBLIC KEY----- -----END PUBLIC KEY-----
""" """
CLIENT_TOKEN_EXPIRE_DELTA = relativedelta(years=12) MY_CLIENT_TOKEN_EXPIRE_DELTA = relativedelta(years=12)
CONFIG_TOKEN_RESPONSE = { NV_CONFIG_TOKEN_RESPONSE = {
"certificateConfiguration": { "certificateConfiguration": {
"caChain": [ "caChain": [
"-----BEGIN CERTIFICATE-----\r\nMIIF3TCCA8WgAwIBAgIUCpVszfecRrnPa3EGwPKuyWESBmMwDQYJKoZIhvcNAQELBQAwcjELMAkG\r\nA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExDzANBgNVBAoTBk52aWRpYTEnMCUGA1UECxMe\r\nTnZpZGlhIExpY2Vuc2luZyBTZXJ2aWNlIChOTFMpMRQwEgYDVQQDEwtOTFMgUm9vdCBDQTAeFw0y\r\nNDA5MjYwNzM4MTlaFw0zNDA5MjQwNzM4NDlaMHoxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxp\r\nZm9ybmlhMQ8wDQYDVQQKEwZOdmlkaWExJzAlBgNVBAsTHk52aWRpYSBMaWNlbnNpbmcgU2Vydmlj\r\nZSAoTkxTKTEcMBoGA1UEAxMTTkxTIEludGVybWVkaWF0ZSBDQTCCAiIwDQYJKoZIhvcNAQEBBQAD\r\nggIPADCCAgoCggIBAOIb5ZcYWR78WkJipEW4cOB2d3WkXhjzA9Omj0SBnA6fJad+zObguInmkgyB\r\nUC/0xMnHeEH1WQpZ0yZE1rdH0ziwPy07hmCgjMSC8iXSfV4QXoHzsQy80HSbD3dr0A5Fk9UrWdJu\r\nIlLnwqTfUjxMSqiVYbGI2JLVLDIPjnrCKgZ//vVTFWiMDQaGInDz5Qo3azHIt1Sw3u47/b88TzmK\r\ni3TMbjtAR3djlhQfJBY6nUdP8wWy2Fntx9fO7U723sp6cnGtHnbXGpon/QqxlPjT4RXXm1QmFQ/d\r\nyUmvmjoiJsCQ3v2KFJNei2bkUS29ZKPr4TGokojOilESQAQTLo+5s0cN7ZtPWvwZ4uets84GCRP5\r\ndC+aKoNQ7cg06A1tA3SxEL9r6D2LaTiheuWKFNiIJZzfmmbTPExsKt4Nzmv72wfG2i2+sY6l4f5x\r\nEFiKybn2EY1Hjpt0J3vL/goOOt/ejRtS5qKco3pu6zZBBWqB1qesA813AGgqbscht4y4m414rPmQ\r\naHA2PTe0JRDcradK75chFUOvLeIYD1Hy0XTxNxlhRA/5mFd2GkWZmtsW3D1iAV73VHAEvWDS0hXB\r\ng60B0y4d3fyYxI+pOTaZzsh0PAC2jUqDOhQ7dKELeYUKWsEDDMq9mg2bxqSNoQnQbITIsbu7IELu\r\nvmxIWT1omRptd5LrAgMBAAGjYzBhMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0G\r\nA1UdDgQWBBRKNST8UPeZYQgLZLEKMBGklaADHjAfBgNVHSMEGDAWgBRiEXE0RonjkPN+XBjnSQbo\r\nA8X3ajANBgkqhkiG9w0BAQsFAAOCAgEAEq5FaQWhTWt1hNfoz/BeDQ68O9PEGGveCPouElE8s/uG\r\nPHYSJpmg7dq5Qoxb5dpdq1mJX2rTgixJu/iC3uRUsirdH6wsVjjqz4YsoAz5VqjlkriFJpXlfOpp\r\nw18ex5C5p4x3TrlPCowMgf9h6VBR1iCq3VikVVguqSPP/zf9G3Qhitvqs0+m7KJnbwFA/bDLMET8\r\nTJS/r4XKQYisXfu95XrG2TTCaOwytqx+uepqwB74tFMznfdjzKyztqGwniKLrcZ3kOuM4cyo5ZT4\r\nOORCV6FWmbRq2OtttI4o85zsVNkY1JF8hvyvjygRiX5dQROza5EStkXvGO6532atFU43KNJvLanZ\r\nZTaxIJvZGWeKvrH+HTCANp11cgq5qcRRltQHb7KWweYNM4nyCjyBQm5vTm7g1uVI7llVm2Txx5dT\r\n5OtenaohmJIr6POeq8Y2Z+DJ8s3UpZoZCc3Vj5PQyNZiAx2ErN6XgrsmljG3w6+k2ooLpT9Sr1Ql\r\nKc8okN5SJGUOLuFI+h8jX1hHqpQejjNKy3UkTzjosYNq6Kk0h2Tl1i8iO+wY4Wb3GbL6GtP1rcjI\r\np/d9mxPNJONlp4a0koaMEpHTODT/xyVjU7FkUyKE9Uj1O/1lBEANYsFrQGfmuHAZTGf9J+cvkrz3\r\n56OFWPHcA7gxkpU8wftrVMLFeDvLIGc=\r\n-----END CERTIFICATE-----" "-----BEGIN CERTIFICATE-----\r\nMIIF3TCCA8WgAwIBAgIUCpVszfecRrnPa3EGwPKuyWESBmMwDQYJKoZIhvcNAQELBQAwcjELMAkG\r\nA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExDzANBgNVBAoTBk52aWRpYTEnMCUGA1UECxMe\r\nTnZpZGlhIExpY2Vuc2luZyBTZXJ2aWNlIChOTFMpMRQwEgYDVQQDEwtOTFMgUm9vdCBDQTAeFw0y\r\nNDA5MjYwNzM4MTlaFw0zNDA5MjQwNzM4NDlaMHoxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxp\r\nZm9ybmlhMQ8wDQYDVQQKEwZOdmlkaWExJzAlBgNVBAsTHk52aWRpYSBMaWNlbnNpbmcgU2Vydmlj\r\nZSAoTkxTKTEcMBoGA1UEAxMTTkxTIEludGVybWVkaWF0ZSBDQTCCAiIwDQYJKoZIhvcNAQEBBQAD\r\nggIPADCCAgoCggIBAOIb5ZcYWR78WkJipEW4cOB2d3WkXhjzA9Omj0SBnA6fJad+zObguInmkgyB\r\nUC/0xMnHeEH1WQpZ0yZE1rdH0ziwPy07hmCgjMSC8iXSfV4QXoHzsQy80HSbD3dr0A5Fk9UrWdJu\r\nIlLnwqTfUjxMSqiVYbGI2JLVLDIPjnrCKgZ//vVTFWiMDQaGInDz5Qo3azHIt1Sw3u47/b88TzmK\r\ni3TMbjtAR3djlhQfJBY6nUdP8wWy2Fntx9fO7U723sp6cnGtHnbXGpon/QqxlPjT4RXXm1QmFQ/d\r\nyUmvmjoiJsCQ3v2KFJNei2bkUS29ZKPr4TGokojOilESQAQTLo+5s0cN7ZtPWvwZ4uets84GCRP5\r\ndC+aKoNQ7cg06A1tA3SxEL9r6D2LaTiheuWKFNiIJZzfmmbTPExsKt4Nzmv72wfG2i2+sY6l4f5x\r\nEFiKybn2EY1Hjpt0J3vL/goOOt/ejRtS5qKco3pu6zZBBWqB1qesA813AGgqbscht4y4m414rPmQ\r\naHA2PTe0JRDcradK75chFUOvLeIYD1Hy0XTxNxlhRA/5mFd2GkWZmtsW3D1iAV73VHAEvWDS0hXB\r\ng60B0y4d3fyYxI+pOTaZzsh0PAC2jUqDOhQ7dKELeYUKWsEDDMq9mg2bxqSNoQnQbITIsbu7IELu\r\nvmxIWT1omRptd5LrAgMBAAGjYzBhMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0G\r\nA1UdDgQWBBRKNST8UPeZYQgLZLEKMBGklaADHjAfBgNVHSMEGDAWgBRiEXE0RonjkPN+XBjnSQbo\r\nA8X3ajANBgkqhkiG9w0BAQsFAAOCAgEAEq5FaQWhTWt1hNfoz/BeDQ68O9PEGGveCPouElE8s/uG\r\nPHYSJpmg7dq5Qoxb5dpdq1mJX2rTgixJu/iC3uRUsirdH6wsVjjqz4YsoAz5VqjlkriFJpXlfOpp\r\nw18ex5C5p4x3TrlPCowMgf9h6VBR1iCq3VikVVguqSPP/zf9G3Qhitvqs0+m7KJnbwFA/bDLMET8\r\nTJS/r4XKQYisXfu95XrG2TTCaOwytqx+uepqwB74tFMznfdjzKyztqGwniKLrcZ3kOuM4cyo5ZT4\r\nOORCV6FWmbRq2OtttI4o85zsVNkY1JF8hvyvjygRiX5dQROza5EStkXvGO6532atFU43KNJvLanZ\r\nZTaxIJvZGWeKvrH+HTCANp11cgq5qcRRltQHb7KWweYNM4nyCjyBQm5vTm7g1uVI7llVm2Txx5dT\r\n5OtenaohmJIr6POeq8Y2Z+DJ8s3UpZoZCc3Vj5PQyNZiAx2ErN6XgrsmljG3w6+k2ooLpT9Sr1Ql\r\nKc8okN5SJGUOLuFI+h8jX1hHqpQejjNKy3UkTzjosYNq6Kk0h2Tl1i8iO+wY4Wb3GbL6GtP1rcjI\r\np/d9mxPNJONlp4a0koaMEpHTODT/xyVjU7FkUyKE9Uj1O/1lBEANYsFrQGfmuHAZTGf9J+cvkrz3\r\n56OFWPHcA7gxkpU8wftrVMLFeDvLIGc=\r\n-----END CERTIFICATE-----"
@ -72,39 +80,39 @@ CONFIG_TOKEN_RESPONSE = {
### VARIABLES ### VARIABLES
instance_private_key = load_pem_private_key(INSTANCE_KEY_RSA.encode('utf-8'), password=None) nv_si_private_key = load_pem_private_key(NV_SI_KEY_RSA.encode('utf-8'), password=None)
instance_private_key_pem = instance_private_key.private_bytes( nv_si_private_key_pem = nv_si_private_key.private_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
) )
instance_public_key = load_pem_public_key(INSTANCE_KEY_PUB.encode('utf-8')) nv_si_public_key = load_pem_public_key(NV_SI_KEY_PUB.encode('utf-8'))
instance_public_key_pem = instance_public_key.public_bytes( nv_si_public_key_pem = nv_si_public_key.public_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
instance_public_key_exp = instance_public_key.public_numbers().e nv_si_public_key_exp = nv_si_public_key.public_numbers().e
instance_public_key_mod = f'{instance_public_key.public_numbers().n:x}' # hex value without "0x" prefix nv_si_public_key_mod = f'{nv_si_public_key.public_numbers().n:x}' # hex value without "0x" prefix
### TESTS ### TESTS
def test_private_public_key_equals_public_key(): def test_private_public_key_equals_public_key():
""" """
test that the public-key exported from private-key is the same as the public-key test that the public-key exported from private-key is the same as the defined public-key
:return: :return:
""" """
public_key_from_private_key_as_pem = instance_private_key.public_key().public_bytes( nv_public_key_from_private_key_as_pem = nv_si_private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
public_key_as_pem = instance_public_key.public_bytes( nv_public_key_as_pem = nv_si_public_key.public_bytes(
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
assert public_key_as_pem == public_key_from_private_key_as_pem assert nv_public_key_as_pem == nv_public_key_from_private_key_as_pem
def test_official_cert(): def test_official_cert():
@ -114,21 +122,20 @@ def test_official_cert():
:return: :return:
""" """
response_certificate_configuration = CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') nv_response_certificate_configuration = NV_CONFIG_TOKEN_RESPONSE.get('certificateConfiguration')
response_public_cert = response_certificate_configuration.get('publicCert').encode('utf-8') nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8')
response_public_key = response_certificate_configuration.get('publicKey') nv_response_public_key = nv_response_certificate_configuration.get('publicKey')
nv_response_parsed_cert = x509.load_pem_x509_certificate(nv_response_public_cert)
response_parsed_cert = x509.load_pem_x509_certificate(response_public_cert, default_backend()) nv_response_parsed_cert_exp = nv_response_parsed_cert.public_key().public_numbers().e
nv_response_parsed_cert_mod = f'{nv_response_parsed_cert.public_key().public_numbers().n:x}' # hex value without "0x" prefix
assert nv_response_parsed_cert_exp == nv_response_public_key.get('exp')
assert nv_response_parsed_cert_mod == nv_response_public_key.get('mod')[0]
response_parsed_cert_exp = response_parsed_cert.public_key().public_numbers().e nv_issuer = f'CN=NLS Intermediate CA,OU=Nvidia Licensing Service (NLS),O=Nvidia,ST=California,C=US'
response_parsed_cert_mod = f'{response_parsed_cert.public_key().public_numbers().n:x}' # hex value without "0x" prefix nv_subject = f'CN={NV_SI_SITE_ID}'
assert response_parsed_cert_exp == response_public_key.get('exp') assert nv_response_parsed_cert.issuer.rfc4514_string() == nv_issuer
assert response_parsed_cert_mod == response_public_key.get('mod')[0] assert nv_response_parsed_cert.subject.rfc4514_string() == nv_subject
issuer = f'CN=NLS Intermediate CA,OU=Nvidia Licensing Service (NLS),O=Nvidia,ST=California,C=US'
subject = f'CN={INSTANCE_SITE_ID}'
assert response_parsed_cert.issuer.rfc4514_string() == issuer
assert response_parsed_cert.subject.rfc4514_string() == subject
def test_official_config_token_response(): def test_official_config_token_response():
@ -138,43 +145,45 @@ def test_official_config_token_response():
:return: :return:
""" """
response_certificate_configuration = CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') nv_response_certificate_configuration = NV_CONFIG_TOKEN_RESPONSE.get('certificateConfiguration')
response_public_cert = response_certificate_configuration.get('publicCert').encode('utf-8') nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8')
jwt_decode_key = jwk.construct(response_public_cert, algorithm=ALGORITHMS.RS256) nv_jwt_decode_key = jwk.construct(nv_response_public_cert, algorithm=ALGORITHMS.RS256)
response_config_token = CONFIG_TOKEN_RESPONSE.get('configToken') nv_response_config_token = NV_CONFIG_TOKEN_RESPONSE.get('configToken')
payload = jws.verify(response_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) payload = jws.verify(nv_response_config_token, key=nv_jwt_decode_key, algorithms=ALGORITHMS.RS256)
payload = json.loads(payload) payload = json.loads(payload)
assert payload.get('iss') == 'NLS Service Instance' assert payload.get('iss') == 'NLS Service Instance'
assert payload.get('aud') == 'NLS Licensed Client' assert payload.get('aud') == 'NLS Licensed Client'
assert payload.get('service_instance_ref') == INSTANCE_SITE_ID assert payload.get('service_instance_ref') == NV_SI_SITE_ID
service_instance_public_key_configuration = payload.get('service_instance_public_key_configuration') nv_si_public_key_configuration = payload.get('service_instance_public_key_configuration')
service_instance_public_key_me = service_instance_public_key_configuration.get('service_instance_public_key_me') nv_si_public_key_me = nv_si_public_key_configuration.get('service_instance_public_key_me')
assert service_instance_public_key_me.get('mod') == instance_public_key_mod assert nv_si_public_key_me.get('mod') == nv_si_public_key_mod
assert service_instance_public_key_me.get('exp') == instance_public_key_exp assert nv_si_public_key_me.get('exp') == nv_si_public_key_exp
def test_our_config_token(): def test_our_config_token():
""" """
test our config-token and test jwt-signing test our config-token with nvidia data and test jwt-signing.
THIS TEST WILL FAIL UNTIL WE HAVE FOUND THE CORRECT PRIVATE KEY
:return: :return:
""" """
response_certificate_configuration = CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') nv_response_certificate_configuration = NV_CONFIG_TOKEN_RESPONSE.get('certificateConfiguration')
response_public_cert = response_certificate_configuration.get('publicCert').encode('utf-8') nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8')
nv_response_parsed_cert = x509.load_pem_x509_certificate(nv_response_public_cert)
response_parsed_cert = x509.load_pem_x509_certificate(response_public_cert, default_backend()) jwt_decode_key = jwk.construct(nv_response_parsed_cert.public_bytes(encoding=Encoding.PEM),
jwt_decode_key = jwk.construct(response_parsed_cert.public_bytes(encoding=Encoding.PEM), algorithm=ALGORITHMS.RS256) algorithm=ALGORITHMS.RS256)
### build payload ### build payload
cur_time = datetime.now(UTC) cur_time = datetime.now(UTC)
exp_time = cur_time + CLIENT_TOKEN_EXPIRE_DELTA exp_time = cur_time + MY_CLIENT_TOKEN_EXPIRE_DELTA
payload = { my_payload = {
"iss": "NLS Service Instance", "iss": "NLS Service Instance",
"aud": "NLS Licensed Client", "aud": "NLS Licensed Client",
"iat": timegm(cur_time.timetuple()), "iat": timegm(cur_time.timetuple()),
@ -182,54 +191,53 @@ def test_our_config_token():
"exp": timegm(exp_time.timetuple()), "exp": timegm(exp_time.timetuple()),
"protocol_version": "2.0", "protocol_version": "2.0",
"d_name": "DLS", "d_name": "DLS",
"service_instance_ref": INSTANCE_SITE_ID, "service_instance_ref": NV_SI_SITE_ID,
"service_instance_public_key_configuration": { "service_instance_public_key_configuration": {
"service_instance_public_key_me": { "service_instance_public_key_me": {
"mod": hex(instance_public_key.public_numbers().n)[2:], "mod": hex(nv_si_public_key.public_numbers().n)[2:],
"exp": int(instance_public_key.public_numbers().e), "exp": int(nv_si_public_key.public_numbers().e),
}, },
"service_instance_public_key_pem": instance_public_key_pem.decode('utf-8'), "service_instance_public_key_pem": nv_si_public_key_pem.decode('utf-8'),
"key_retention_mode": "LATEST_ONLY" "key_retention_mode": "LATEST_ONLY"
}, },
} }
# todo: maybe DLS_SI_CERTIFICATE['private_key'] # todo: maybe DLS_SI_CERTIFICATE['private_key']
# sign_key = load_key(join(dirname(__file__), 'cert\\....pem')) # our_correct_sign_key = load_key('our_correct_private_key.pem').export_key().decode('utf-8')
# sign_key = jwk.construct(sign_key.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS256) # our_correct_sign_key = jwk.construct(our_correct_sign_key, algorithm=ALGORITHMS.RS256)
sign_key = jwk.construct(instance_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) nv_sign_key = jwk.construct(nv_si_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256)
# config_token = jws.sign(payload, key=sign_key, headers=None, algorithm=ALGORITHMS.RS256) # our_correct_config_token = jws.sign(payload, key=our_correct_sign_key, headers=None, algorithm=ALGORITHMS.RS256)
# until we have not found the correct private key,
# "jwt_encode_key" has invalid signature (can't be verified with DLS_SI_CERTIFICATE['certificate']) # "jwt_encode_key" has invalid signature (can't be verified with DLS_SI_CERTIFICATE['certificate'])
config_token = jws.sign(payload, key=sign_key, headers=None, algorithm=ALGORITHMS.RS256) my_config_token = jws.sign(my_payload, key=nv_sign_key, headers=None, algorithm=ALGORITHMS.RS256)
response = { my_response = {
"certificateConfiguration": { "certificateConfiguration": {
"caChain": CONFIG_TOKEN_RESPONSE['certificateConfiguration']['caChain'], "caChain": NV_CONFIG_TOKEN_RESPONSE['certificateConfiguration']['caChain'],
"publicCert": CONFIG_TOKEN_RESPONSE['certificateConfiguration']['publicCert'], "publicCert": NV_CONFIG_TOKEN_RESPONSE['certificateConfiguration']['publicCert'],
"publicKey": { "publicKey": {
"exp": int(response_parsed_cert.public_key().public_numbers().e), "exp": int(nv_response_parsed_cert.public_key().public_numbers().e),
"mod": [hex(response_parsed_cert.public_key().public_numbers().n)[2:]], "mod": [hex(nv_response_parsed_cert.public_key().public_numbers().n)[2:]],
}, },
}, },
"configToken": config_token, "configToken": my_config_token,
} }
### ###
our_config_token = response.get('configToken') my_config_token = my_response.get('configToken')
payload = jws.verify(our_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) # error! payload = jws.verify(my_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) # error!
payload = json.loads(payload) payload = json.loads(payload)
assert payload.get('iss') == 'NLS Service Instance' assert payload.get('iss') == 'NLS Service Instance'
assert payload.get('aud') == 'NLS Licensed Client' assert payload.get('aud') == 'NLS Licensed Client'
assert payload.get('service_instance_ref') == INSTANCE_SITE_ID assert payload.get('service_instance_ref') == NV_SI_SITE_ID
service_instance_public_key_configuration = payload.get('service_instance_public_key_configuration') service_si_public_key_configuration = payload.get('service_instance_public_key_configuration')
service_instance_public_key_me = service_instance_public_key_configuration.get('service_instance_public_key_me') service_si_public_key_me = service_si_public_key_configuration.get('service_instance_public_key_me')
assert service_si_public_key_me.get('mod') == nv_si_public_key_mod
assert service_instance_public_key_me.get('mod') == instance_public_key_mod assert service_si_public_key_me.get('exp') == nv_si_public_key_exp
assert service_instance_public_key_me.get('exp') == instance_public_key_exp
def test_our_config_token_with_our_key(): def test_our_config_token_with_our_key():
@ -239,8 +247,25 @@ def test_our_config_token_with_our_key():
:return: :return:
""" """
my_private_key = generate_private_key(public_exponent=65537, key_size=2024)
my_public_key = my_private_key.public_key()
my_private_key_pem = my_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
my_public_key_pem = my_public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
my_si_public_key_exp = my_public_key.public_numbers().e
my_si_public_key_mod = f'{my_public_key.public_numbers().n:x}' # hex value without "0x" prefix
# create ca-certificate subject # create ca-certificate subject
ca_subject = x509.Name([ my_ca_subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"DE"), x509.NameAttribute(NameOID.COUNTRY_NAME, u"DE"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Hessen"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Hessen"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"Wiesbaden"), x509.NameAttribute(NameOID.LOCALITY_NAME, u"Wiesbaden"),
@ -249,18 +274,19 @@ def test_our_config_token_with_our_key():
]) ])
# create self-signed ca-certificate # create self-signed ca-certificate
ca_certificate = (x509.CertificateBuilder() my_ca_certificate = (
.subject_name(ca_subject) x509.CertificateBuilder()
.issuer_name(ca_subject) .subject_name(my_ca_subject)
.public_key(instance_private_key.public_key()) .issuer_name(my_ca_subject)
.serial_number(x509.random_serial_number()) .public_key(my_public_key)
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) .serial_number(x509.random_serial_number())
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10))
.sign(instance_private_key, hashes.SHA256())) .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
.sign(my_private_key, hashes.SHA256()))
# create certificate subject # create certificate subject
subject = x509.Name([ my_subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'),
@ -268,26 +294,27 @@ def test_our_config_token_with_our_key():
x509.NameAttribute(NameOID.COMMON_NAME, u"NLS Intermediate CA"), x509.NameAttribute(NameOID.COMMON_NAME, u"NLS Intermediate CA"),
]) ])
issuer_name = ca_subject if ca_subject is not None else subject my_issuer_name = my_ca_subject if my_ca_subject is not None else my_subject
# create self-signed certificate # create self-signed certificate
certificate = (x509.CertificateBuilder() my_certificate = (
.subject_name(subject) x509.CertificateBuilder()
.issuer_name(issuer_name) .subject_name(my_subject)
.public_key(instance_private_key.public_key()) .issuer_name(my_issuer_name)
.serial_number(x509.random_serial_number()) .public_key(my_public_key)
.not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) .serial_number(x509.random_serial_number())
.not_valid_after(datetime.now(tz=UTC) + timedelta(days=3650)) .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1))
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=False) .not_valid_after(datetime.now(tz=UTC) + timedelta(days=3650))
.sign(instance_private_key, hashes.SHA256()) .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=False)
) .sign(my_private_key, hashes.SHA256())
)
jwt_decode_key = jwk.construct(certificate.public_bytes(encoding=Encoding.PEM), algorithm=ALGORITHMS.RS256) my_jwt_decode_key = jwk.construct(my_certificate.public_bytes(encoding=Encoding.PEM), algorithm=ALGORITHMS.RS256)
### build payload ### build payload
cur_time = datetime.now(UTC) cur_time = datetime.now(UTC)
exp_time = cur_time + CLIENT_TOKEN_EXPIRE_DELTA exp_time = cur_time + MY_CLIENT_TOKEN_EXPIRE_DELTA
payload = { payload = {
"iss": "NLS Service Instance", "iss": "NLS Service Instance",
@ -297,43 +324,43 @@ def test_our_config_token_with_our_key():
"exp": timegm(exp_time.timetuple()), "exp": timegm(exp_time.timetuple()),
"protocol_version": "2.0", "protocol_version": "2.0",
"d_name": "DLS", "d_name": "DLS",
"service_instance_ref": INSTANCE_SITE_ID, "service_instance_ref": NV_SI_SITE_ID,
"service_instance_public_key_configuration": { "service_instance_public_key_configuration": {
"service_instance_public_key_me": { "service_instance_public_key_me": {
"mod": hex(instance_public_key.public_numbers().n)[2:], "mod": hex(my_public_key.public_numbers().n)[2:],
"exp": int(instance_public_key.public_numbers().e), "exp": int(my_public_key.public_numbers().e),
}, },
"service_instance_public_key_pem": instance_public_key_pem.decode('utf-8'), "service_instance_public_key_pem": my_public_key_pem.decode('utf-8'),
"key_retention_mode": "LATEST_ONLY" "key_retention_mode": "LATEST_ONLY"
}, },
} }
sign_key = jwk.construct(instance_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) my_sign_key = jwk.construct(my_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256)
config_token = jws.sign(payload, key=sign_key, headers=None, algorithm=ALGORITHMS.RS256) my_config_token = jws.sign(payload, key=my_sign_key, headers=None, algorithm=ALGORITHMS.RS256)
response = { response = {
"certificateConfiguration": { "certificateConfiguration": {
"caChain": [ca_certificate.public_bytes(encoding=Encoding.PEM)], "caChain": [my_ca_certificate.public_bytes(encoding=Encoding.PEM)],
"publicCert": certificate.public_bytes(encoding=Encoding.PEM), "publicCert": my_ca_certificate.public_bytes(encoding=Encoding.PEM),
"publicKey": { "publicKey": {
"exp": int(certificate.public_key().public_numbers().e), "exp": int(my_certificate.public_key().public_numbers().e),
"mod": [hex(certificate.public_key().public_numbers().n)[2:]], "mod": [hex(my_certificate.public_key().public_numbers().n)[2:]],
}, },
}, },
"configToken": config_token, "configToken": my_config_token,
} }
### ###
response_config_token = response.get('configToken') my_response_config_token = response.get('configToken')
payload = jws.verify(response_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) # error! payload = jws.verify(my_response_config_token, key=my_jwt_decode_key, algorithms=ALGORITHMS.RS256)
payload = json.loads(payload) payload = json.loads(payload)
assert payload.get('iss') == 'NLS Service Instance' assert payload.get('iss') == 'NLS Service Instance'
assert payload.get('aud') == 'NLS Licensed Client' assert payload.get('aud') == 'NLS Licensed Client'
assert payload.get('service_instance_ref') == INSTANCE_SITE_ID assert payload.get('service_instance_ref') == NV_SI_SITE_ID
service_instance_public_key_configuration = payload.get('service_instance_public_key_configuration') my_si_public_key_configuration = payload.get('service_instance_public_key_configuration')
service_instance_public_key_me = service_instance_public_key_configuration.get('service_instance_public_key_me') my_si_public_key_me = my_si_public_key_configuration.get('service_instance_public_key_me')
assert service_instance_public_key_me.get('mod') == instance_public_key_mod assert my_si_public_key_me.get('mod') == my_si_public_key_mod
assert service_instance_public_key_me.get('exp') == instance_public_key_exp assert my_si_public_key_me.get('exp') == my_si_public_key_exp