code styling - migrated direct dict access to '.get()'

This commit is contained in:
Oscar Krause 2023-01-03 09:20:18 +01:00
parent 2b8c468270
commit 8f9d95056f
2 changed files with 27 additions and 28 deletions

View File

@ -229,21 +229,21 @@ async def _client_token():
async def auth_v1_origin(request: Request): async def auth_v1_origin(request: Request):
j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow() j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow()
origin_ref = j['candidate_origin_ref'] origin_ref = j.get('candidate_origin_ref')
logging.info(f'> [ origin ]: {origin_ref}: {j}') logging.info(f'> [ origin ]: {origin_ref}: {j}')
data = Origin( data = Origin(
origin_ref=origin_ref, origin_ref=origin_ref,
hostname=j['environment']['hostname'], hostname=j.get('environment').get('hostname'),
guest_driver_version=j['environment']['guest_driver_version'], guest_driver_version=j.get('environment').get('guest_driver_version'),
os_platform=j['environment']['os_platform'], os_version=j['environment']['os_version'], os_platform=j.get('environment').get('os_platform'), os_version=j.get('environment').get('os_version'),
) )
Origin.create_or_update(db, data) Origin.create_or_update(db, data)
response = { response = {
"origin_ref": origin_ref, "origin_ref": origin_ref,
"environment": j['environment'], "environment": j.get('environment'),
"svc_port_set_list": None, "svc_port_set_list": None,
"node_url_list": None, "node_url_list": None,
"node_query_order": None, "node_query_order": None,
@ -260,20 +260,20 @@ async def auth_v1_origin(request: Request):
async def auth_v1_origin_update(request: Request): async def auth_v1_origin_update(request: Request):
j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow() j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow()
origin_ref = j['origin_ref'] origin_ref = j.get('origin_ref')
logging.info(f'> [ update ]: {origin_ref}: {j}') logging.info(f'> [ update ]: {origin_ref}: {j}')
data = Origin( data = Origin(
origin_ref=origin_ref, origin_ref=origin_ref,
hostname=j['environment']['hostname'], hostname=j.get('environment').get('hostname'),
guest_driver_version=j['environment']['guest_driver_version'], guest_driver_version=j.get('environment').get('guest_driver_version'),
os_platform=j['environment']['os_platform'], os_version=j['environment']['os_version'], os_platform=j.get('environment').get('os_platform'), os_version=j.get('environment').get('os_version'),
) )
Origin.create_or_update(db, data) Origin.create_or_update(db, data)
response = { response = {
"environment": j['environment'], "environment": j.get('environment'),
"prompts": None, "prompts": None,
"sync_timestamp": cur_time.isoformat() "sync_timestamp": cur_time.isoformat()
} }
@ -288,7 +288,7 @@ async def auth_v1_origin_update(request: Request):
async def auth_v1_code(request: Request): async def auth_v1_code(request: Request):
j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow() j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow()
origin_ref = j['origin_ref'] origin_ref = j.get('origin_ref')
logging.info(f'> [ code ]: {origin_ref}: {j}') logging.info(f'> [ code ]: {origin_ref}: {j}')
delta = relativedelta(minutes=15) delta = relativedelta(minutes=15)
@ -297,8 +297,8 @@ async def auth_v1_code(request: Request):
payload = { payload = {
'iat': timegm(cur_time.timetuple()), 'iat': timegm(cur_time.timetuple()),
'exp': timegm(expires.timetuple()), 'exp': timegm(expires.timetuple()),
'challenge': j['code_challenge'], 'challenge': j.get('code_challenge'),
'origin_ref': j['origin_ref'], 'origin_ref': j.get('origin_ref'),
'key_ref': SITE_KEY_XID, 'key_ref': SITE_KEY_XID,
'kid': SITE_KEY_XID 'kid': SITE_KEY_XID
} }
@ -320,13 +320,13 @@ async def auth_v1_code(request: Request):
@app.post('/auth/v1/token', description='exchange auth code and verifier for token') @app.post('/auth/v1/token', description='exchange auth code and verifier for token')
async def auth_v1_token(request: Request): async def auth_v1_token(request: Request):
j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow() j, cur_time = json.loads((await request.body()).decode('utf-8')), datetime.utcnow()
payload = jwt.decode(token=j['auth_code'], key=jwt_decode_key) payload = jwt.decode(token=j.get('auth_code'), key=jwt_decode_key)
origin_ref = payload['origin_ref'] origin_ref = payload.get('origin_ref')
logging.info(f'> [ auth ]: {origin_ref}: {j}') logging.info(f'> [ auth ]: {origin_ref}: {j}')
# validate the code challenge # validate the code challenge
if payload['challenge'] != b64enc(sha256(j['code_verifier'].encode('utf-8')).digest()).rstrip(b'=').decode('utf-8'): if payload.get('challenge') != b64enc(sha256(j.get('code_verifier').encode('utf-8')).digest()).rstrip(b'=').decode('utf-8'):
raise HTTPException(status_code=401, detail='expected challenge did not match verifier') raise HTTPException(status_code=401, detail='expected challenge did not match verifier')
access_expires_on = cur_time + TOKEN_EXPIRE_DELTA access_expires_on = cur_time + TOKEN_EXPIRE_DELTA
@ -360,7 +360,7 @@ async def leasing_v1_lessor(request: Request):
j, token, cur_time = json.loads((await request.body()).decode('utf-8')), __get_token(request), datetime.utcnow() j, token, cur_time = json.loads((await request.body()).decode('utf-8')), __get_token(request), datetime.utcnow()
origin_ref = token.get('origin_ref') origin_ref = token.get('origin_ref')
scope_ref_list = j['scope_ref_list'] scope_ref_list = j.get('scope_ref_list')
logging.info(f'> [ create ]: {origin_ref}: create leases for scope_ref_list {scope_ref_list}') logging.info(f'> [ create ]: {origin_ref}: create leases for scope_ref_list {scope_ref_list}')
lease_result_list = [] lease_result_list = []
@ -491,7 +491,7 @@ async def leasing_v1_lessor_lease_remove(request: Request):
async def leasing_v1_lessor_shutdown(request: Request): async def leasing_v1_lessor_shutdown(request: Request):
j, cur_time = json.loads((await request.body()).decode('utf-8')) j, cur_time = json.loads((await request.body()).decode('utf-8'))
token = j['token'] token = j.get('token')
token = jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False}) token = jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
origin_ref = token.get('origin_ref') origin_ref = token.get('origin_ref')

View File

@ -44,11 +44,10 @@ def test_index():
assert response.status_code == 200 assert response.status_code == 200
def test_health(): def test_health():
response = client.get('/-/health') response = client.get('/-/health')
assert response.status_code == 200 assert response.status_code == 200
assert response.json()['status'] == 'up' assert response.json().get('status') == 'up'
def test_config(): def test_config():
@ -105,7 +104,7 @@ def test_auth_v1_origin():
response = client.post('/auth/v1/origin', json=payload) response = client.post('/auth/v1/origin', json=payload)
assert response.status_code == 200 assert response.status_code == 200
assert response.json()['origin_ref'] == ORIGIN_REF assert response.json().get('origin_ref') == ORIGIN_REF
def auth_v1_origin_update(): def auth_v1_origin_update():
@ -126,7 +125,7 @@ def auth_v1_origin_update():
response = client.post('/auth/v1/origin/update', json=payload) response = client.post('/auth/v1/origin/update', json=payload)
assert response.status_code == 200 assert response.status_code == 200
assert response.json()['origin_ref'] == ORIGIN_REF assert response.json().get('origin_ref') == ORIGIN_REF
def test_auth_v1_code(): def test_auth_v1_code():
@ -138,8 +137,8 @@ def test_auth_v1_code():
response = client.post('/auth/v1/code', json=payload) response = client.post('/auth/v1/code', json=payload)
assert response.status_code == 200 assert response.status_code == 200
payload = jwt.get_unverified_claims(token=response.json()['auth_code']) payload = jwt.get_unverified_claims(token=response.json().get('auth_code'))
assert payload['origin_ref'] == ORIGIN_REF assert payload.get('origin_ref') == ORIGIN_REF
def test_auth_v1_token(): def test_auth_v1_token():
@ -163,9 +162,9 @@ def test_auth_v1_token():
response = client.post('/auth/v1/token', json=payload) response = client.post('/auth/v1/token', json=payload)
assert response.status_code == 200 assert response.status_code == 200
token = response.json()['auth_token'] token = response.json().get('auth_token')
payload = jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False}) payload = jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
assert payload['origin_ref'] == ORIGIN_REF assert payload.get('origin_ref') == ORIGIN_REF
def test_leasing_v1_lessor(): def test_leasing_v1_lessor():
@ -193,7 +192,7 @@ def test_leasing_v1_lessor_lease():
response = client.get('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)}) response = client.get('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)})
assert response.status_code == 200 assert response.status_code == 200
active_lease_list = response.json()['active_lease_list'] active_lease_list = response.json().get('active_lease_list')
assert len(active_lease_list) == 1 assert len(active_lease_list) == 1
assert active_lease_list[0] == LEASE_REF assert active_lease_list[0] == LEASE_REF
@ -218,6 +217,6 @@ def test_leasing_v1_lessor_lease_remove():
response = client.delete('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)}) response = client.delete('/leasing/v1/lessor/leases', headers={'authorization': __bearer_token(ORIGIN_REF)})
assert response.status_code == 200 assert response.status_code == 200
released_lease_list = response.json()['released_lease_list'] released_lease_list = response.json().get('released_lease_list')
assert len(released_lease_list) == 1 assert len(released_lease_list) == 1
assert released_lease_list[0] == LEASE_REF assert released_lease_list[0] == LEASE_REF