This commit is contained in:
Oscar Krause 2023-02-28 12:18:43 +01:00
parent 9a370f817a
commit 3921fc44f7
3 changed files with 19 additions and 14 deletions

View File

@ -265,9 +265,11 @@ async def _ha_replicate_to_ha(request: Request, background_tasks: BackgroundTask
if HA_REPLICATE is None or HA_ROLE is None: if HA_REPLICATE is None or HA_ROLE is None:
logger.warning('HA replicate endpoint triggerd, but no value for "HA_REPLICATE" or "HA_ROLE" is set!') logger.warning('HA replicate endpoint triggerd, but no value for "HA_REPLICATE" or "HA_ROLE" is set!')
return JSONr(status_code=503, content={'status': 503, 'detail': 'no value for "HA_REPLICATE" or "HA_ROLE" set'}) return JSONr(status_code=503, content={'status': 503, 'detail': 'no value for "HA_REPLICATE" or "HA_ROLE" set'})
session = sessionmaker(bind=db)() session = sessionmaker(bind=db)()
origins = session.query(Origin).all() origins = [origin.serialize() for origin in session.query(Origin).all()]
leases = session.query(Lease).all() leases = [lease.serialize(renewal_period=LEASE_RENEWAL_PERIOD, renewal_delta=LEASE_RENEWAL_DELTA) for lease in session.query(Lease).all()]
background_tasks.add_task(ha_replicate, logger, HA_REPLICATE, HA_ROLE, VERSION, DLS_URL, DLS_PORT, SITE_KEY_XID, INSTANCE_REF, origins, leases) background_tasks.add_task(ha_replicate, logger, HA_REPLICATE, HA_ROLE, VERSION, DLS_URL, DLS_PORT, SITE_KEY_XID, INSTANCE_REF, origins, leases)
return JSONr(status_code=202, content=None) return JSONr(status_code=202, content=None)
@ -295,7 +297,7 @@ async def _ha_replicate_by_ha(request: Request):
logger.error(f'Version missmatch on HA replication task!') logger.error(f'Version missmatch on HA replication task!')
return JSONr(status_code=503, content={'status': 503, 'detail': 'Missmatch for "INSTANCE_REF"'}) return JSONr(status_code=503, content={'status': 503, 'detail': 'Missmatch for "INSTANCE_REF"'})
sync_timestamp, max_seconds_behind = j.get('sync_timestamp'), 30 sync_timestamp, max_seconds_behind = datetime.fromisoformat(j.get('sync_timestamp')), 30
if sync_timestamp <= cur_time - timedelta(seconds=max_seconds_behind): if sync_timestamp <= cur_time - timedelta(seconds=max_seconds_behind):
logger.error(f'Request time more than {max_seconds_behind}s behind!') logger.error(f'Request time more than {max_seconds_behind}s behind!')
return JSONr(status_code=503, content={'status': 503, 'detail': 'Request time behind'}) return JSONr(status_code=503, content={'status': 503, 'detail': 'Request time behind'})
@ -310,7 +312,7 @@ async def _ha_replicate_by_ha(request: Request):
for lease in leases: for lease in leases:
lease_ref = lease.get('lease_ref') lease_ref = lease.get('lease_ref')
x = Lease.find_by_lease_ref(db, lease_ref) x = Lease.find_by_lease_ref(db, lease_ref)
if x.lease_updated > remote_time: if x is not None and x.lease_updated > sync_timestamp:
continue continue
logging.info(f'> [ ha ]: lease {lease_ref}') logging.info(f'> [ ha ]: lease {lease_ref}')
data = Lease.deserialize(lease) data = Lease.deserialize(lease)

View File

@ -110,10 +110,9 @@ class Lease(Base):
return Lease( return Lease(
lease_ref=j.get('lease_ref'), lease_ref=j.get('lease_ref'),
origin_ref=j.get('origin_ref'), origin_ref=j.get('origin_ref'),
lease_created=j.get('lease_created'), lease_created=datetime.fromisoformat(j.get('lease_created')),
lease_expires=j.get('lease_expires'), lease_expires=datetime.fromisoformat(j.get('lease_expires')),
lease_updated=j.get('lease_updated'), lease_updated=datetime.fromisoformat(j.get('lease_updated')),
lease_renewal=j.get('lease_renewal'),
) )
@staticmethod @staticmethod

View File

@ -28,23 +28,27 @@ def generate_key() -> "RsaKey":
return RSA.generate(bits=2048) return RSA.generate(bits=2048)
def ha_replicate(logger: "logging.Logger", ha_replicate: str, ha_role: str, version: str, dls_url: str, dls_port: int, site_key_xid: str, instance_ref: str, origins: list["Origin"], leases: list["Lease"]): def ha_replicate(logger: "logging.Logger", ha_replicate: str, ha_role: str, version: str, dls_url: str, dls_port: int, site_key_xid: str, instance_ref: str, origins: list, leases: list) -> bool:
from datetime import datetime from datetime import datetime
import httpx import httpx
if f'{dls_url}:{dls_port}' == ha_replicate:
logger.error(f'Failed to replicate this node ({ha_role}) to "{ha_replicate}": can\'t replicate to itself')
return False
data = { data = {
'VERSION': str(version), 'VERSION': str(version),
'HA_REPLICATE': f'{dls_url}:{dls_port}', 'HA_REPLICATE': f'{dls_url}:{dls_port}',
'SITE_KEY_XID': str(site_key_xid), 'SITE_KEY_XID': str(site_key_xid),
'INSTANCE_REF': str(instance_ref), 'INSTANCE_REF': str(instance_ref),
'origins': [origin.serialize() for origin in origins], 'origins': origins,
'leases': [lease.serialize() for lease in leases], 'leases': leases,
'sync_timestamp': datetime.utcnow().isoformat(), 'sync_timestamp': datetime.utcnow().isoformat(),
} }
r = httpx.put(f'https://{ha_replicate}/-/ha/replicate', json=data, verify=False) r = httpx.put(f'https://{ha_replicate}/-/ha/replicate', json=data, verify=False)
if r.status_code == 202: if r.status_code == 202:
logger.info(f'Successfully replicated this node ({ha_role}) to "{ha_replicate}".') logger.info(f'Successfully replicated this node ({ha_role}) to "{ha_replicate}".')
else: return True
logger.error(f'Failed to replicate this node ({ha_role}) to "{ha_replicate}": {r.status_code} - {r.content}') logger.error(f'Failed to replicate this node ({ha_role}) to "{ha_replicate}": {r.status_code} - {r.content}')
return False