# Instance Token

[TOC]

# Generate `dls_instance_token.tok`

- Generated by `https://<dls-host>/service_instance_manager/v1/service-instance/identity-token`
- Using binary `service_instance_token_manager.cpython-312-x86_64-linux-gnu.so` file
-
`service_instance_token_manager_controller.py`

```python
# ServiceInstanceTokenManager comes from the compiled module
# "service_instance_token_manager.cpython-312-x86_64-linux-gnu.so"
_serviceInstanceTokenManager = ServiceInstanceTokenManager()
# ... (excerpt: the controller code in between is omitted)
# siit_get() receives the request's event_args; presumably its result is the
# identity-token data returned by the endpoint - TODO confirm
data = _serviceInstanceTokenManager.siit_get(event_args)
```
# Load and Parse `license.bin`

For *logs* and *patches* see [src/nls/patches-and-logs](/src/nls/patches-and-logs).

We have this [private key](LicensingFlow.md#public-keys-private-keys-certificates) - it is stored in the DB table `service_instance_artifact`, key `service_instance.identity`:

`si_identity_private_key = self._get_si_identity_private_key(service_instance_xid=service_instance_xid)`

## `LicenseFileInstallationManager`

> `self.processor.read_license_payload()` calls `LicenseFileProcessor.read_license_payload()`
`license_file_installation_manager.py` ```python class LicenseFileInstallationManager: @retry(ExceptionToCheck=InterfaceError, tries=3, delay=0.05) def license_file_installation(self, event_args): kwargs = self._validate_and_return_args(event_args) service_instance_xid = self._validate_si_xid_header(event_args) # ensure no upgrade job is in progress self._check_migration_job(event_args, service_instance_xid) request: LicenseFileRequest = kwargs['request'] try: si_model = self.dal.get_service_instance(service_instance_xid) deployment_token, payload, product_mapping_token, api_key_models, dls_certificate_token, dls_feat_display_map_token = self._read_license_file( service_instance_xid=service_instance_xid, request=request ) # Register SI if it is UNINITIALIZED self._validate_si_state(si_model, deployment_token, event_args) generated_data = {} generated_data = self.dal.insert_license_file( license_file_xid=payload.header.license_allocation_file_xid, license_file_timestamp=payload.header.license_allocation_file_timestamp, license_allocation_list=payload.license_allocation_list, service_instance_xid=service_instance_xid, product_mapping_list=product_mapping_token.product_mapping_info, api_key_models=api_key_models, dls_certificate_token=dls_certificate_token, dls_feat_display_map_token=dls_feat_display_map_token, si_model=si_model, _license=request.license, generated_data=generated_data ) # Audit this in case of DLS if self._is_dls(si_model) and generated_data is not None: for value in generated_data.values(): if "la_xid" in value: self.audit_event_processor.audit_license_server_installation_event(value["la_xid"], si_model, event_args.get('headers')) except Exception as ex: log.error(f'Error processing license allocation file for service_instance: {service_instance_xid} Error {ex}') raise def _read_license_file(self, service_instance_xid, request): if not LicenseFileInstallationManager._is_dls_pre_registered(): # if its not pre-registered then go the standard route 
return self._read_lf_with_si_bound_keys(service_instance_xid, request) # X - THIS LINE IS IMPORTANT si_preregistered_private_key = LicenseFileInstallationManager._get_si_preregistered_private_key() try: # X - THIS LINE IS IMPORTANT return self.processor.read_license_payload(request.license, si_preregistered_private_key) except Exception as ex: # if decryption fails with SI pre-registered private key, possibility that user has acknowledged on NLP # so try with SI bound instance keys log.error(f"error decrypting license file with pre-registered identity key, trying SI identity private key, {ex}") return self._read_lf_with_si_bound_keys(service_instance_xid, request) def _read_lf_with_si_bound_keys(self, service_instance_xid, request): si_identity_private_key = self._get_si_identity_private_key(service_instance_xid=service_instance_xid) # if it fails with generic SI private key, try with SI bound key deployment_token, payload, product_mapping_token, api_keys_models, dls_certificate_token, dls_feat_display_map_token = self.processor.read_license_payload(request.license, si_identity_private_key) return deployment_token, payload, product_mapping_token, api_keys_models, dls_certificate_token, dls_feat_display_map_token def _get_si_identity_private_key(self, service_instance_xid): # if this fails look for global private key because it means that the incoming file has pre-registered token try: # Get SI Identity private key to decrypt this license file si_identity_private_key = self.dal.get_si_artifact( service_instance_xid, si_constants.SERVICE_INSTANCE_IDENTITY_NAMESPACE, si_constants.ARTIFACT_NAME_PRIVATE_KEY ) return si_identity_private_key.value except NotFoundError as ex: log.error(f'Error fetching artifacts for SI attached to this license file', ex) raise BadRequestError("Failed to process license allocation file") @staticmethod def _is_dls_pre_registered(): return os.path.exists(si_constants.SI_IS_PRE_REGISTRATION_MARKER) @staticmethod def 
_get_si_preregistered_private_key(): _global_private_key = PrivateKey.from_data(os.getenv("DLS_PRE_REGISTRATION_PRIVATE_KEY")) return _global_private_key.data ```
## `LicenseFileProcessor`
`license_file_processor.py` ```python class LicenseFileProcessor: def build_license_payload(self, license_allocation_file_xid, license_allocation_file_timestamp, license_allocation_list, public_key_string, deployment_token, product_mapping_token=None, api_keys_response=None, dls_certificate_properties=None, dls_feature_display_mapping_token=None): # Generate license file container with laf and preamble license_file_container = LicenseFileContainer() product_mapping_token_base64 = self.generate_base64_encrypted_string(product_mapping_token, public_key_string) api_keys_response_encrypted_base64 = self.get_api_key_encrypted_encoded_val(api_keys_response, public_key_string) encrypted_dls_certificate_token = self.get_encrypted_dls_certificate_token(dls_certificate_properties, public_key_string) encrypted_dls_feature_display_mapping_token = self._get_encrypted_dls_feature_display_mapping_token(dls_feature_display_mapping_token, public_key_string) license_file_container.preamble = LicenseFilePreamble( deployment_token=deployment_token, product_mapping_token=product_mapping_token_base64, api_keys_response=api_keys_response_encrypted_base64, dls_certificate_token=encrypted_dls_certificate_token, dls_feature_display_mapping_token=encrypted_dls_feature_display_mapping_token ) # process license file payload license_file_payload = LicenseFilePayload() license_file_payload.header = LicenseFilePayloadHeader( license_allocation_file_xid=license_allocation_file_xid, license_allocation_file_timestamp=license_allocation_file_timestamp.isoformat() ) license_file_payload.license_allocation_list = license_allocation_list # Generate license file response # need special UUIDEncoder because license_file_payload contains UUID objects payload_str = json_dumps(license_file_payload.to_dict(), cls=UUIDEncoder) public_key = PublicKey.from_data(public_key_string) encrypted_payload_str = public_key.encrypt_aes(payload_str) encrypted_payload_str = 
base64.b64encode(encrypted_payload_str.encode('utf-8')).decode('utf-8') license_file_container.payload = encrypted_payload_str # dump LicenseFileContainer response to JSON and base64 encode it license_container_str = json_dumps(license_file_container.to_dict(), cls=UUIDEncoder) license_container_str = base64.b64encode(license_container_str.encode('utf-8')).decode('utf-8') return license_container_str def read_license_payload(self, license_container_str, private_key_string): try: # Decode whole string object into LicenseFileContainer object license_container_str = base64.b64decode(license_container_str.encode('utf-8')).decode('utf-8') license_file_container = LicenseFileContainer.from_dict(JsonUtils.from_json(license_container_str)) # Decode preamble and payload encrypted_payload_str = base64.b64decode(license_file_container.payload.encode('utf-8')).decode('utf-8') # X - THIS LINE IS IMPORTANT private_key = PrivateKey.from_data(private_key_string) license_file_decoded = private_key.decrypt_aes(encrypted_payload_str) payload = LicenseFilePayload.from_dict(JsonUtils.from_json(license_file_decoded)) # Decode product mapping data product_mapping_token = license_file_container.preamble.product_mapping_token if product_mapping_token is not None and product_mapping_token != "": encrypted_product_mapping_token = base64.b64decode(license_file_container.preamble.product_mapping_token.encode('utf-8')).decode('utf-8') private_key = PrivateKey.from_data(private_key_string) decrypted_product_mapping_token = private_key.decrypt_aes(encrypted_product_mapping_token) product_mapping_token = ProductMappingFileContainer.from_dict(JsonUtils.from_json(decrypted_product_mapping_token)) # Api Key preamble api_key_models = self._get_api_key_preamble(license_file_container, private_key_string) # Cert response preamble dls_certificate_token = self._get_dls_certificate_token_preamble(license_file_container, private_key_string) # Feature display mapping token dls_feat_display_map_token = 
self._get_dls_feature_display_mapping_token(license_file_container, private_key_string) except (UnicodeDecodeError, BinAsciiError) as be: log.exception(f'Error processing license file, invalid license file: {be}') raise BadRequestError('Invalid license file format') from be except JSONDecodeError: raise BadRequestError('Invalid license file object') except ValueError as e: if "Incorrect decryption" in str(e) or "Ciphertext too large" in str(e): log.exception(f'Error decrypting license allocation file : {e}') raise BadRequestError('Invalid license file for this service instance') else: raise except Exception as be: log.exception(f'Error processing license allocation file : {be}') raise BadRequestError('Error processing license allocation file') from be return license_file_container.preamble.deployment_token, payload, product_mapping_token, api_key_models, dls_certificate_token, dls_feat_display_map_token ```
# Other Code

Interestingly, the `service_instance.deployment` **public key** is used for encryption here. We do not have the private key for that one. See:

```diff
public_key_string=si_deployment_public_key.value
```
`return_file_export_manager.py` ```python class ReturnFileExportManager: def return_file_export_handler(self, event_args, params, dal): if 'file_timestamp' not in event_args: # file_timestamp not in event_args means original request on primary, # so we get current time as file_timestamp license_allocation_file_timestamp = datetime.utcnow() # modify incoming event_args parameter to add file_timestamp, # so broadcaster to sends file_timestamp to secondary event_args['file_timestamp'] = license_allocation_file_timestamp else: # file_timestamp in event_args means replication call on secondary # so we use file_timestamp from event_args license_allocation_file_timestamp = event_args['file_timestamp'] license_allocation_file_xid = self.processor.get_license_file_xid() log.info(f'Generating license allocation return file: {license_allocation_file_xid}') # Generate license allocation data license_allocation = LicenseAllocation() license_allocation.header = LicenseAllocationHeader(params.license_allotment_xid) log.info(f'Generating return for license allocation: {params.license_allotment_xid}') license_allocation.object_list = self._get_object_list(params, dal) try: si_deployment_public_key = dal.get_si_artifact_for_license_allotment( params.license_allotment_xid, si_constants.SERVICE_INSTANCE_DEPLOYMENT_NAMESPACE, si_constants.ARTIFACT_NAME_PUBLIC_KEY ) except NotFoundError as ex: log.error(f'Error fetching artifacts for SI attached to this license allocation return file', ex) raise BadRequestError("Failed to return license allocation file") # Build license file payload string encrypted_payload_str = self.processor.build_license_payload( license_allocation_file_xid=license_allocation_file_xid, license_allocation_file_timestamp=license_allocation_file_timestamp, license_allocation_list=[license_allocation], public_key_string=si_deployment_public_key.value, deployment_token="") # insert LAF record dal.insert_file_creation_record(license_allocation_file_xid, 
license_allocation_file_timestamp, params.license_allotment_xid, encrypted_payload_str) response = ReturnFileResponse(return_license=encrypted_payload_str) return response ```
`dls_license_file_installation_dal.py` ```python class DlsLicenseFileInstallationDal: def insert_file_creation_record(self, schema, license_file_xid, license_file_timestamp, license_allotment_xid, license_allocation_file, session=None): insert_file_creation_record_query = f""" insert into {schema}.license_allotment_file_publication (xid, license_allotment_xid, publication_detail) values (:xid, :la_xid, :publication_detail) on conflict (xid) do update set license_allotment_xid = :la_xid, publication_detail = :publication_detail """ publication_detail_dict = { 'timestamp': license_file_timestamp.isoformat(), 'license': license_allocation_file, } publication_detail = json_dumps(publication_detail_dict) session.execute(insert_file_creation_record_query, {'xid': license_file_xid, 'la_xid': license_allotment_xid, 'publication_detail': publication_detail}) ```