diff --git a/src/test/test.py b/src/test/test.py index 72cda2b..39cfbcf 100644 --- a/src/test/test.py +++ b/src/test/test.py @@ -4,17 +4,24 @@ from datetime import datetime, UTC, timedelta from cryptography import x509 from cryptography.hazmat._oid import NameOID -from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key from cryptography.hazmat.primitives.serialization import load_pem_private_key, load_pem_public_key, Encoding from dateutil.relativedelta import relativedelta from jose import jwk, jws from jose.constants import ALGORITHMS +""" +Any variables prefixed with `NV_` or `nv_` are original values, dumped from an NLS registration/activation process. +Any variables prefixed with `MY_` or `my_` are variables which are set by fastapi-dls and are "faked". +""" + ### DEFAULTS -INSTANCE_SITE_ID = '4e53a171-103b-4946-9ed8-5f4c0ee750d9' -INSTANCE_KEY_RSA = """-----BEGIN RSA PRIVATE KEY----- +# SELECT xid FROM request_routing.service_instance +NV_SI_SITE_ID = '4e53a171-103b-4946-9ed8-5f4c0ee750d9' +# SELECT value FROM si_.service_instance_artifact WHERE namespace = 'service_instance.client.all' and name = 'private_key' +NV_SI_KEY_RSA = """-----BEGIN RSA PRIVATE KEY----- MIIEpAIBAAKCAQEAyIz6i48cFn4XOK0S2GTYpLMU85xzJ1fmQmA2nC6Zod2V4BxN Xqk+9y8nvdzZVELxmC+N47ZQGV/J5cquIadx0V42F3JTryJFDuZ+7fQsNlXUX3og UQhhgvHuluhDJQSvdZAzpguS7N+gJGCGbGk1pZBYL2JtTDTWSIcWsQtD/w9DAPEk @@ -42,7 +49,8 @@ LdUhyKEt/if0EJd09UbH6+T7aqkuw4HthF8ab2FSlLcyQ6t0UYUlTwCLTHsquqeu 5+Le7DO89hskB8DKr4Oobmr12eulCf81UDYWSKhDYeqrBJyf3PopLg== -----END RSA PRIVATE KEY----- """ -INSTANCE_KEY_PUB = """-----BEGIN PUBLIC KEY----- +# SELECT value FROM service_instance_artifact WHERE namespace = 'service_instance.client.all' and name = 'public_key' +NV_SI_KEY_PUB = """-----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyIz6i48cFn4XOK0S2GTY pLMU85xzJ1fmQmA2nC6Zod2V4BxNXqk+9y8nvdzZVELxmC+N47ZQGV/J5cquIadx 0V42F3JTryJFDuZ+7fQsNlXUX3ogUQhhgvHuluhDJQSvdZAzpguS7N+gJGCGbGk1 @@ -52,9 +60,9 @@ ayrneYAdIbEAQSQg+Z40npUwNKW71Ue2NsG3SoGWuj3lTyEVXlLsAw0bsCDVLipM WwIDAQAB -----END PUBLIC KEY----- """ -CLIENT_TOKEN_EXPIRE_DELTA = relativedelta(years=12) +MY_CLIENT_TOKEN_EXPIRE_DELTA = relativedelta(years=12) -CONFIG_TOKEN_RESPONSE = { +NV_CONFIG_TOKEN_RESPONSE = { "certificateConfiguration": { "caChain": [ "-----BEGIN CERTIFICATE-----\r\nMIIF3TCCA8WgAwIBAgIUCpVszfecRrnPa3EGwPKuyWESBmMwDQYJKoZIhvcNAQELBQAwcjELMAkG\r\nA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWExDzANBgNVBAoTBk52aWRpYTEnMCUGA1UECxMe\r\nTnZpZGlhIExpY2Vuc2luZyBTZXJ2aWNlIChOTFMpMRQwEgYDVQQDEwtOTFMgUm9vdCBDQTAeFw0y\r\nNDA5MjYwNzM4MTlaFw0zNDA5MjQwNzM4NDlaMHoxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxp\r\nZm9ybmlhMQ8wDQYDVQQKEwZOdmlkaWExJzAlBgNVBAsTHk52aWRpYSBMaWNlbnNpbmcgU2Vydmlj\r\nZSAoTkxTKTEcMBoGA1UEAxMTTkxTIEludGVybWVkaWF0ZSBDQTCCAiIwDQYJKoZIhvcNAQEBBQAD\r\nggIPADCCAgoCggIBAOIb5ZcYWR78WkJipEW4cOB2d3WkXhjzA9Omj0SBnA6fJad+zObguInmkgyB\r\nUC/0xMnHeEH1WQpZ0yZE1rdH0ziwPy07hmCgjMSC8iXSfV4QXoHzsQy80HSbD3dr0A5Fk9UrWdJu\r\nIlLnwqTfUjxMSqiVYbGI2JLVLDIPjnrCKgZ//vVTFWiMDQaGInDz5Qo3azHIt1Sw3u47/b88TzmK\r\ni3TMbjtAR3djlhQfJBY6nUdP8wWy2Fntx9fO7U723sp6cnGtHnbXGpon/QqxlPjT4RXXm1QmFQ/d\r\nyUmvmjoiJsCQ3v2KFJNei2bkUS29ZKPr4TGokojOilESQAQTLo+5s0cN7ZtPWvwZ4uets84GCRP5\r\ndC+aKoNQ7cg06A1tA3SxEL9r6D2LaTiheuWKFNiIJZzfmmbTPExsKt4Nzmv72wfG2i2+sY6l4f5x\r\nEFiKybn2EY1Hjpt0J3vL/goOOt/ejRtS5qKco3pu6zZBBWqB1qesA813AGgqbscht4y4m414rPmQ\r\naHA2PTe0JRDcradK75chFUOvLeIYD1Hy0XTxNxlhRA/5mFd2GkWZmtsW3D1iAV73VHAEvWDS0hXB\r\ng60B0y4d3fyYxI+pOTaZzsh0PAC2jUqDOhQ7dKELeYUKWsEDDMq9mg2bxqSNoQnQbITIsbu7IELu\r\nvmxIWT1omRptd5LrAgMBAAGjYzBhMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8EBTADAQH/MB0G\r\nA1UdDgQWBBRKNST8UPeZYQgLZLEKMBGklaADHjAfBgNVHSMEGDAWgBRiEXE0RonjkPN+XBjnSQbo\r\nA8X3ajANBgkqhkiG9w0BAQsFAAOCAgEAEq5FaQWhTWt1hNfoz/BeDQ68O9PEGGveCPouElE8s/uG\r\nPHYSJpmg7dq5Qoxb5dpdq1mJX2rTgixJu/iC3uRUsirdH6wsVjjqz4YsoAz5VqjlkriFJpXlfOpp\r\nw18ex5C5p4x3TrlPCowMgf9h6VBR1iCq3VikVVguqSPP/zf9G3Qhitvqs0+m7KJnbwFA/bDLMET8\r\nTJS/r4XKQYisXfu95XrG2TTCaOwytqx+uepqwB74tFMznfdjzKyztqGwniKLrcZ3kOuM4cyo5ZT4\r\nOORCV6FWmbRq2OtttI4o85zsVNkY1JF8hvyvjygRiX5dQROza5EStkXvGO6532atFU43KNJvLanZ\r\nZTaxIJvZGWeKvrH+HTCANp11cgq5qcRRltQHb7KWweYNM4nyCjyBQm5vTm7g1uVI7llVm2Txx5dT\r\n5OtenaohmJIr6POeq8Y2Z+DJ8s3UpZoZCc3Vj5PQyNZiAx2ErN6XgrsmljG3w6+k2ooLpT9Sr1Ql\r\nKc8okN5SJGUOLuFI+h8jX1hHqpQejjNKy3UkTzjosYNq6Kk0h2Tl1i8iO+wY4Wb3GbL6GtP1rcjI\r\np/d9mxPNJONlp4a0koaMEpHTODT/xyVjU7FkUyKE9Uj1O/1lBEANYsFrQGfmuHAZTGf9J+cvkrz3\r\n56OFWPHcA7gxkpU8wftrVMLFeDvLIGc=\r\n-----END CERTIFICATE-----" @@ -72,39 +80,39 @@ CONFIG_TOKEN_RESPONSE = { ### VARIABLES -instance_private_key = load_pem_private_key(INSTANCE_KEY_RSA.encode('utf-8'), password=None) -instance_private_key_pem = instance_private_key.private_bytes( +nv_si_private_key = load_pem_private_key(NV_SI_KEY_RSA.encode('utf-8'), password=None) +nv_si_private_key_pem = nv_si_private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption() ) -instance_public_key = load_pem_public_key(INSTANCE_KEY_PUB.encode('utf-8')) -instance_public_key_pem = instance_public_key.public_bytes( +nv_si_public_key = load_pem_public_key(NV_SI_KEY_PUB.encode('utf-8')) +nv_si_public_key_pem = nv_si_public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) -instance_public_key_exp = instance_public_key.public_numbers().e -instance_public_key_mod = f'{instance_public_key.public_numbers().n:x}' # hex value without "0x" prefix +nv_si_public_key_exp = nv_si_public_key.public_numbers().e +nv_si_public_key_mod = f'{nv_si_public_key.public_numbers().n:x}' # hex value without "0x" prefix ### TESTS def test_private_public_key_equals_public_key(): """ - test that the public-key exported from private-key is the same as the public-key + test that the public-key exported from private-key is the same as the defined public-key :return: """ - public_key_from_private_key_as_pem = instance_private_key.public_key().public_bytes( + nv_public_key_from_private_key_as_pem = nv_si_private_key.public_key().public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) - public_key_as_pem = instance_public_key.public_bytes( + nv_public_key_as_pem = nv_si_public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo ) - assert public_key_as_pem == public_key_from_private_key_as_pem + assert nv_public_key_as_pem == nv_public_key_from_private_key_as_pem def test_official_cert(): @@ -114,21 +122,20 @@ def test_official_cert(): :return: """ - response_certificate_configuration = CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') - response_public_cert = response_certificate_configuration.get('publicCert').encode('utf-8') - response_public_key = response_certificate_configuration.get('publicKey') + nv_response_certificate_configuration = NV_CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') + nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8') + nv_response_public_key = nv_response_certificate_configuration.get('publicKey') + nv_response_parsed_cert = x509.load_pem_x509_certificate(nv_response_public_cert) - response_parsed_cert = x509.load_pem_x509_certificate(response_public_cert, default_backend()) + nv_response_parsed_cert_exp = nv_response_parsed_cert.public_key().public_numbers().e + nv_response_parsed_cert_mod = f'{nv_response_parsed_cert.public_key().public_numbers().n:x}' # hex value without "0x" prefix + assert nv_response_parsed_cert_exp == nv_response_public_key.get('exp') + assert nv_response_parsed_cert_mod == nv_response_public_key.get('mod')[0] - response_parsed_cert_exp = response_parsed_cert.public_key().public_numbers().e - response_parsed_cert_mod = f'{response_parsed_cert.public_key().public_numbers().n:x}' # hex value without "0x" prefix - assert response_parsed_cert_exp == response_public_key.get('exp') - assert response_parsed_cert_mod == response_public_key.get('mod')[0] - - issuer = f'CN=NLS Intermediate CA,OU=Nvidia Licensing Service (NLS),O=Nvidia,ST=California,C=US' - subject = f'CN={INSTANCE_SITE_ID}' - assert response_parsed_cert.issuer.rfc4514_string() == issuer - assert response_parsed_cert.subject.rfc4514_string() == subject + nv_issuer = f'CN=NLS Intermediate CA,OU=Nvidia Licensing Service (NLS),O=Nvidia,ST=California,C=US' + nv_subject = f'CN={NV_SI_SITE_ID}' + assert nv_response_parsed_cert.issuer.rfc4514_string() == nv_issuer + assert nv_response_parsed_cert.subject.rfc4514_string() == nv_subject def test_official_config_token_response(): @@ -138,43 +145,45 @@ def test_official_config_token_response(): :return: """ - response_certificate_configuration = CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') - response_public_cert = response_certificate_configuration.get('publicCert').encode('utf-8') - jwt_decode_key = jwk.construct(response_public_cert, algorithm=ALGORITHMS.RS256) + nv_response_certificate_configuration = NV_CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') + nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8') + nv_jwt_decode_key = jwk.construct(nv_response_public_cert, algorithm=ALGORITHMS.RS256) - response_config_token = CONFIG_TOKEN_RESPONSE.get('configToken') + nv_response_config_token = NV_CONFIG_TOKEN_RESPONSE.get('configToken') - payload = jws.verify(response_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) + payload = jws.verify(nv_response_config_token, key=nv_jwt_decode_key, algorithms=ALGORITHMS.RS256) payload = json.loads(payload) assert payload.get('iss') == 'NLS Service Instance' assert payload.get('aud') == 'NLS Licensed Client' - assert payload.get('service_instance_ref') == INSTANCE_SITE_ID + assert payload.get('service_instance_ref') == NV_SI_SITE_ID - service_instance_public_key_configuration = payload.get('service_instance_public_key_configuration') - service_instance_public_key_me = service_instance_public_key_configuration.get('service_instance_public_key_me') - assert service_instance_public_key_me.get('mod') == instance_public_key_mod - assert service_instance_public_key_me.get('exp') == instance_public_key_exp + nv_si_public_key_configuration = payload.get('service_instance_public_key_configuration') + nv_si_public_key_me = nv_si_public_key_configuration.get('service_instance_public_key_me') + assert nv_si_public_key_me.get('mod') == nv_si_public_key_mod + assert nv_si_public_key_me.get('exp') == nv_si_public_key_exp def test_our_config_token(): """ - test our config-token and test jwt-signing + test our config-token with nvidia data and test jwt-signing. + + THIS TEST WILL FAIL UNTIL WE HAVE FOUND THE CORRECT PRIVATE KEY :return: """ - response_certificate_configuration = CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') - response_public_cert = response_certificate_configuration.get('publicCert').encode('utf-8') - - response_parsed_cert = x509.load_pem_x509_certificate(response_public_cert, default_backend()) - jwt_decode_key = jwk.construct(response_parsed_cert.public_bytes(encoding=Encoding.PEM), algorithm=ALGORITHMS.RS256) + nv_response_certificate_configuration = NV_CONFIG_TOKEN_RESPONSE.get('certificateConfiguration') + nv_response_public_cert = nv_response_certificate_configuration.get('publicCert').encode('utf-8') + nv_response_parsed_cert = x509.load_pem_x509_certificate(nv_response_public_cert) + jwt_decode_key = jwk.construct(nv_response_parsed_cert.public_bytes(encoding=Encoding.PEM), + algorithm=ALGORITHMS.RS256) ### build payload cur_time = datetime.now(UTC) - exp_time = cur_time + CLIENT_TOKEN_EXPIRE_DELTA + exp_time = cur_time + MY_CLIENT_TOKEN_EXPIRE_DELTA - payload = { + my_payload = { "iss": "NLS Service Instance", "aud": "NLS Licensed Client", "iat": timegm(cur_time.timetuple()), @@ -182,54 +191,53 @@ def test_our_config_token(): "exp": timegm(exp_time.timetuple()), "protocol_version": "2.0", "d_name": "DLS", - "service_instance_ref": INSTANCE_SITE_ID, + "service_instance_ref": NV_SI_SITE_ID, "service_instance_public_key_configuration": { "service_instance_public_key_me": { - "mod": hex(instance_public_key.public_numbers().n)[2:], - "exp": int(instance_public_key.public_numbers().e), + "mod": hex(nv_si_public_key.public_numbers().n)[2:], + "exp": int(nv_si_public_key.public_numbers().e), }, - "service_instance_public_key_pem": instance_public_key_pem.decode('utf-8'), + "service_instance_public_key_pem": nv_si_public_key_pem.decode('utf-8'), "key_retention_mode": "LATEST_ONLY" }, } # todo: maybe DLS_SI_CERTIFICATE['private_key'] - # sign_key = load_key(join(dirname(__file__), 'cert\\....pem')) - # sign_key = jwk.construct(sign_key.export_key().decode('utf-8'), algorithm=ALGORITHMS.RS256) - sign_key = jwk.construct(instance_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) + # our_correct_sign_key = load_key('our_correct_private_key.pem').export_key().decode('utf-8') + # our_correct_sign_key = jwk.construct(our_correct_sign_key, algorithm=ALGORITHMS.RS256) + nv_sign_key = jwk.construct(nv_si_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) - # config_token = jws.sign(payload, key=sign_key, headers=None, algorithm=ALGORITHMS.RS256) + # our_correct_config_token = jws.sign(payload, key=our_correct_sign_key, headers=None, algorithm=ALGORITHMS.RS256) + # until we have not found the correct private key, # "jwt_encode_key" has invalid signature (can't be verified with DLS_SI_CERTIFICATE['certificate']) - config_token = jws.sign(payload, key=sign_key, headers=None, algorithm=ALGORITHMS.RS256) + my_config_token = jws.sign(my_payload, key=nv_sign_key, headers=None, algorithm=ALGORITHMS.RS256) - response = { + my_response = { "certificateConfiguration": { - "caChain": CONFIG_TOKEN_RESPONSE['certificateConfiguration']['caChain'], - "publicCert": CONFIG_TOKEN_RESPONSE['certificateConfiguration']['publicCert'], + "caChain": NV_CONFIG_TOKEN_RESPONSE['certificateConfiguration']['caChain'], + "publicCert": NV_CONFIG_TOKEN_RESPONSE['certificateConfiguration']['publicCert'], "publicKey": { - "exp": int(response_parsed_cert.public_key().public_numbers().e), - "mod": [hex(response_parsed_cert.public_key().public_numbers().n)[2:]], + "exp": int(nv_response_parsed_cert.public_key().public_numbers().e), + "mod": [hex(nv_response_parsed_cert.public_key().public_numbers().n)[2:]], }, }, - "configToken": config_token, + "configToken": my_config_token, } ### - our_config_token = response.get('configToken') + my_config_token = my_response.get('configToken') - payload = jws.verify(our_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) # error! + payload = jws.verify(my_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) # error! payload = json.loads(payload) - assert payload.get('iss') == 'NLS Service Instance' assert payload.get('aud') == 'NLS Licensed Client' - assert payload.get('service_instance_ref') == INSTANCE_SITE_ID + assert payload.get('service_instance_ref') == NV_SI_SITE_ID - service_instance_public_key_configuration = payload.get('service_instance_public_key_configuration') - service_instance_public_key_me = service_instance_public_key_configuration.get('service_instance_public_key_me') - - assert service_instance_public_key_me.get('mod') == instance_public_key_mod - assert service_instance_public_key_me.get('exp') == instance_public_key_exp + service_si_public_key_configuration = payload.get('service_instance_public_key_configuration') + service_si_public_key_me = service_si_public_key_configuration.get('service_instance_public_key_me') + assert service_si_public_key_me.get('mod') == nv_si_public_key_mod + assert service_si_public_key_me.get('exp') == nv_si_public_key_exp def test_our_config_token_with_our_key(): @@ -239,8 +247,25 @@ def test_our_config_token_with_our_key(): :return: """ + my_private_key = generate_private_key(public_exponent=65537, key_size=2024) + my_public_key = my_private_key.public_key() + + my_private_key_pem = my_private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + ) + my_public_key_pem = my_public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + + my_si_public_key_exp = my_public_key.public_numbers().e + my_si_public_key_mod = f'{my_public_key.public_numbers().n:x}' # hex value without "0x" prefix + + # create ca-certificate subject - ca_subject = x509.Name([ + my_ca_subject = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, u"DE"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Hessen"), x509.NameAttribute(NameOID.LOCALITY_NAME, u"Wiesbaden"), @@ -249,18 +274,19 @@ def test_our_config_token_with_our_key(): ]) # create self-signed ca-certificate - ca_certificate = (x509.CertificateBuilder() - .subject_name(ca_subject) - .issuer_name(ca_subject) - .public_key(instance_private_key.public_key()) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) - .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) - .sign(instance_private_key, hashes.SHA256())) + my_ca_certificate = ( + x509.CertificateBuilder() + .subject_name(my_ca_subject) + .issuer_name(my_ca_subject) + .public_key(my_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=365 * 10)) + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + .sign(my_private_key, hashes.SHA256())) # create certificate subject - subject = x509.Name([ + my_subject = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'California'), x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'Nvidia'), @@ -268,26 +294,27 @@ def test_our_config_token_with_our_key(): x509.NameAttribute(NameOID.COMMON_NAME, u"NLS Intermediate CA"), ]) - issuer_name = ca_subject if ca_subject is not None else subject + my_issuer_name = my_ca_subject if my_ca_subject is not None else my_subject # create self-signed certificate - certificate = (x509.CertificateBuilder() - .subject_name(subject) - .issuer_name(issuer_name) - .public_key(instance_private_key.public_key()) - .serial_number(x509.random_serial_number()) - .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) - .not_valid_after(datetime.now(tz=UTC) + timedelta(days=3650)) - .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=False) - .sign(instance_private_key, hashes.SHA256()) - ) + my_certificate = ( + x509.CertificateBuilder() + .subject_name(my_subject) + .issuer_name(my_issuer_name) + .public_key(my_public_key) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.now(tz=UTC) - timedelta(days=1)) + .not_valid_after(datetime.now(tz=UTC) + timedelta(days=3650)) + .add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=False) + .sign(my_private_key, hashes.SHA256()) + ) - jwt_decode_key = jwk.construct(certificate.public_bytes(encoding=Encoding.PEM), algorithm=ALGORITHMS.RS256) + my_jwt_decode_key = jwk.construct(my_certificate.public_bytes(encoding=Encoding.PEM), algorithm=ALGORITHMS.RS256) ### build payload cur_time = datetime.now(UTC) - exp_time = cur_time + CLIENT_TOKEN_EXPIRE_DELTA + exp_time = cur_time + MY_CLIENT_TOKEN_EXPIRE_DELTA payload = { "iss": "NLS Service Instance", @@ -297,43 +324,43 @@ def test_our_config_token_with_our_key(): "exp": timegm(exp_time.timetuple()), "protocol_version": "2.0", "d_name": "DLS", - "service_instance_ref": INSTANCE_SITE_ID, + "service_instance_ref": NV_SI_SITE_ID, "service_instance_public_key_configuration": { "service_instance_public_key_me": { - "mod": hex(instance_public_key.public_numbers().n)[2:], - "exp": int(instance_public_key.public_numbers().e), + "mod": hex(my_public_key.public_numbers().n)[2:], + "exp": int(my_public_key.public_numbers().e), }, - "service_instance_public_key_pem": instance_public_key_pem.decode('utf-8'), + "service_instance_public_key_pem": my_public_key_pem.decode('utf-8'), "key_retention_mode": "LATEST_ONLY" }, } - sign_key = jwk.construct(instance_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) - config_token = jws.sign(payload, key=sign_key, headers=None, algorithm=ALGORITHMS.RS256) + my_sign_key = jwk.construct(my_private_key_pem.decode('utf-8'), algorithm=ALGORITHMS.RS256) + my_config_token = jws.sign(payload, key=my_sign_key, headers=None, algorithm=ALGORITHMS.RS256) response = { "certificateConfiguration": { - "caChain": [ca_certificate.public_bytes(encoding=Encoding.PEM)], - "publicCert": certificate.public_bytes(encoding=Encoding.PEM), + "caChain": [my_ca_certificate.public_bytes(encoding=Encoding.PEM)], + "publicCert": my_ca_certificate.public_bytes(encoding=Encoding.PEM), "publicKey": { - "exp": int(certificate.public_key().public_numbers().e), - "mod": [hex(certificate.public_key().public_numbers().n)[2:]], + "exp": int(my_certificate.public_key().public_numbers().e), + "mod": [hex(my_certificate.public_key().public_numbers().n)[2:]], }, }, - "configToken": config_token, + "configToken": my_config_token, } ### - response_config_token = response.get('configToken') + my_response_config_token = response.get('configToken') - payload = jws.verify(response_config_token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256) # error! + payload = jws.verify(my_response_config_token, key=my_jwt_decode_key, algorithms=ALGORITHMS.RS256) payload = json.loads(payload) assert payload.get('iss') == 'NLS Service Instance' assert payload.get('aud') == 'NLS Licensed Client' - assert payload.get('service_instance_ref') == INSTANCE_SITE_ID + assert payload.get('service_instance_ref') == NV_SI_SITE_ID - service_instance_public_key_configuration = payload.get('service_instance_public_key_configuration') - service_instance_public_key_me = service_instance_public_key_configuration.get('service_instance_public_key_me') - assert service_instance_public_key_me.get('mod') == instance_public_key_mod - assert service_instance_public_key_me.get('exp') == instance_public_key_exp + my_si_public_key_configuration = payload.get('service_instance_public_key_configuration') + my_si_public_key_me = my_si_public_key_configuration.get('service_instance_public_key_me') + assert my_si_public_key_me.get('mod') == my_si_public_key_mod + assert my_si_public_key_me.get('exp') == my_si_public_key_exp