Source code for tardis.adapters.sites.fakesite

from ...exceptions.tardisexceptions import TardisError
from ...interfaces.siteadapter import ResourceStatus
from ...interfaces.siteadapter import SiteAdapter
from ...utilities.attributedict import AttributeDict
from ...utilities.staticmapping import StaticMapping

from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta
from functools import partial
from uuid import uuid4

import asyncio


class FakeSiteAdapter(SiteAdapter):
    """
    Site adapter that fabricates all of its answers locally.

    Every operation first waits for the configured ``api_response_delay`` and
    then either builds a response itself or updates the passed resource
    attributes directly.
    """

    def __init__(self, machine_type: str, site_name: str) -> None:
        """
        Store the identifiers and read the timing settings of the fake site.

        :param machine_type: Machine type this adapter instance is created for.
        :param site_name: Name of the site this adapter instance belongs to.
        """
        self._machine_type = machine_type
        self._site_name = site_name

        # Both settings are kept as objects; their ``get_value()`` is called
        # each time a delay or boot time is needed.
        self._api_response_delay = self.configuration.api_response_delay
        self._resource_boot_time = self.configuration.resource_boot_time

        # The fabricated responses already use the target key names, so every
        # key is translated to itself and no translator functions are needed.
        identity_keys = StaticMapping(
            remote_resource_uuid="remote_resource_uuid",
            resource_status="resource_status",
        )
        no_value_translators = StaticMapping()

        # Pre-bind both translators so that the methods below only have to
        # pass the response itself.
        self.handle_response = partial(
            self.handle_response,
            key_translator=identity_keys,
            translator_functions=no_value_translators,
        )

    async def deploy_resource(
        self, resource_attributes: AttributeDict
    ) -> AttributeDict:
        """
        Pretend to deploy a resource.

        :param resource_attributes: Attributes of the resource to deploy; not
            read by this implementation.
        :return: Handled response carrying a freshly generated
            ``remote_resource_uuid`` and ``ResourceStatus.Booting``.
        """
        await asyncio.sleep(self._api_response_delay.get_value())
        return self.handle_response(
            AttributeDict(
                remote_resource_uuid=uuid4().hex,
                resource_status=ResourceStatus.Booting,
            )
        )

    async def resource_status(
        self, resource_attributes: AttributeDict
    ) -> AttributeDict:
        """
        Report a booting resource as running once its boot time has elapsed.

        :param resource_attributes: Attributes of the resource; ``created`` is
            read, and ``resource_status`` is read if the boot time has elapsed.
        :return: Handled response with ``ResourceStatus.Running`` if the boot
            time has passed and the resource is (assumed to be) booting,
            otherwise an empty ``AttributeDict`` so that nothing is changed.
        """
        await asyncio.sleep(self._api_response_delay.get_value())
        creation_time = resource_attributes.created
        boot_seconds = self._resource_boot_time.get_value()

        # The resource is still within its boot period: do not change anything.
        time_since_creation = datetime.now() - creation_time
        if not time_since_creation > timedelta(seconds=boot_seconds):
            return AttributeDict()

        # When cobald is restarted, "resource_status" is not set. Since this is
        # a FakeAdapter, we can safely start the cycle again by assuming
        # ResourceStatus.Booting and let TARDIS manage the drone's life cycle.
        known_status = resource_attributes.get(
            "resource_status", ResourceStatus.Booting
        )
        if known_status is not ResourceStatus.Booting:
            return AttributeDict()  # do not change anything

        return self.handle_response(
            AttributeDict(resource_status=ResourceStatus.Running)
        )

    async def stop_resource(self, resource_attributes: AttributeDict) -> None:
        """
        Pretend to stop a resource.

        :param resource_attributes: Attributes of the resource to stop; its
            ``resource_status`` is overwritten in place.
        """
        await asyncio.sleep(self._api_response_delay.get_value())
        # The status is set to ResourceStatus.Stopped by hand, so that the
        # life cycle comes to an end.
        resource_attributes.resource_status = ResourceStatus.Stopped

    async def terminate_resource(self, resource_attributes: AttributeDict) -> None:
        """
        Pretend to terminate a resource.

        :param resource_attributes: Attributes of the resource to terminate;
            its ``resource_status`` is overwritten in place.
        """
        await asyncio.sleep(self._api_response_delay.get_value())
        # The status is set to ResourceStatus.Deleted by hand, so that the
        # life cycle is ended.
        resource_attributes.resource_status = ResourceStatus.Deleted

    @contextmanager
    def handle_exceptions(self) -> None:
        """
        Context manager converting any ``Exception`` into a ``TardisError``.

        :raises TardisError: If the managed block raises an ``Exception``; the
            original exception is attached as the cause.
        """
        try:
            yield
        except Exception as error:
            raise TardisError from error