fastapi-dls/app/util.py

51 lines
1.8 KiB
Python
Raw Normal View History

2022-12-28 10:53:56 +00:00
def load_file(filename) -> bytes:
with open(filename, 'rb') as file:
content = file.read()
return content
2022-12-29 19:40:42 +00:00
def load_key(filename) -> "RsaKey":
try:
# Crypto | Cryptodome on Debian
from Crypto.PublicKey import RSA
from Crypto.PublicKey.RSA import RsaKey
except ModuleNotFoundError:
from Cryptodome.PublicKey import RSA
from Cryptodome.PublicKey.RSA import RsaKey
2022-12-28 10:53:56 +00:00
return RSA.import_key(extern_key=load_file(filename), passphrase=None)
2022-12-29 19:40:42 +00:00
def generate_key() -> "RsaKey":
try:
# Crypto | Cryptodome on Debian
from Crypto.PublicKey import RSA
from Crypto.PublicKey.RSA import RsaKey
except ModuleNotFoundError:
from Cryptodome.PublicKey import RSA
from Cryptodome.PublicKey.RSA import RsaKey
return RSA.generate(bits=2048)
def ha_replicate(logger: "logging.Logger", ha_replicate: str, ha_role: str, version: str, dls_url: str, dls_port: int, site_key_xid: str, instance_ref: str, origins: list["Origin"], leases: list["Lease"]):
from datetime import datetime
import httpx
data = {
'VERSION': str(version),
'HA_REPLICATE': f'{dls_url}:{dls_port}',
'SITE_KEY_XID': str(site_key_xid),
'INSTANCE_REF': str(instance_ref),
'origins': [origin.serialize() for origin in origins],
'leases': [lease.serialize() for lease in leases],
2023-02-28 08:00:20 +00:00
'sync_timestamp': datetime.utcnow().isoformat(),
}
2023-02-28 10:21:52 +00:00
r = httpx.put(f'https://{ha_replicate}/-/ha/replicate', json=data, verify=False)
if r.status_code == 202:
logger.info(f'Successfully replicated this node ({ha_role}) to "{ha_replicate}".')
else:
logger.error(f'Failed to replicate this node ({ha_role}) to "{ha_replicate}": {r.status_code} - {r.content}')