Source code for tardis.adapters.sites.lancium

from aiolancium.client import Authenticator, LanciumClient
from simple_rest_client.exceptions import AuthError, ClientError

from ...exceptions.tardisexceptions import (
    TardisError,
    TardisResourceStatusUpdateFailed,
    TardisDroneCrashed,
)
from ...interfaces.siteadapter import SiteAdapter, ResourceStatus
from ...utilities.attributedict import AttributeDict, convert_to_attribute_dict
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.staticmapping import StaticMapping

from contextlib import contextmanager
from datetime import datetime
from functools import partial
from typing import Dict

import logging

logger = logging.getLogger("cobald.runtime.tardis.adapters.sites.lancium")


async def lancium_status_updater(client: LanciumClient) -> Dict:
    """
    Fetch the job overview via ``client`` and index the jobs by their id.

    :param client: Client used to call ``jobs.show_jobs()``
    :return: Dictionary mapping each job's ``"id"`` entry to the complete job
        entry as found in the ``"jobs"`` list of the response
    """
    response = await client.jobs.show_jobs()
    logger.debug(f"Show jobs returned {response}")

    jobs_by_id = {}
    for job in response["jobs"]:
        # later entries win in case of duplicated ids, as in a dict comprehension
        jobs_by_id[job["id"]] = job
    return jobs_by_id
class LanciumAdapter(SiteAdapter):
    """
    Site adapter that manages drones as jobs via a :py:class:`LanciumClient`.

    Jobs are created, submitted, terminated and deleted through the client's
    ``jobs`` API. The job status is looked up in an :py:class:`AsyncCacheMap`
    that is filled by :py:func:`lancium_status_updater`.
    """

    # Translation of the job status strings handled by this adapter into
    # `ResourceStatus` values. The key "delete pending" contains a space and
    # can therefore not be written as a keyword argument, which is why the
    # mapping is handed to `StaticMapping` via `**` expansion in `__init__`.
    resource_status_translation = {
        "created": ResourceStatus.Booting,
        "submitted": ResourceStatus.Booting,
        "queued": ResourceStatus.Booting,
        "ready": ResourceStatus.Booting,
        "running": ResourceStatus.Running,
        "error": ResourceStatus.Error,
        "finished": ResourceStatus.Stopped,
        "delete pending": ResourceStatus.Stopped,
        "deleted": ResourceStatus.Deleted,
    }

    def __init__(self, machine_type: str, site_name: str):
        """
        Set up the API client, the response translation and the status cache.

        :param machine_type: Machine type handled by this adapter instance
        :param site_name: Name of the site this adapter instance belongs to
        """
        self._machine_type = machine_type
        self._site_name = site_name

        auth = Authenticator(api_key=self.configuration.api_key)
        self.client = LanciumClient(api_url=self.configuration.api_url, auth=auth)

        # Maps the attribute names used on the TARDIS side to the keys of a
        # job entry.
        key_translator = StaticMapping(
            remote_resource_uuid="id",
            drone_uuid="name",
            resource_status="status",
        )

        # Per-key conversion of the values of a job entry. The status
        # translator is bound as a default argument, so it is created only once.
        translator_functions = StaticMapping(
            status=lambda x, translator=StaticMapping(
                **self.resource_status_translation
            ): translator[x],
            id=int,
            name=str,
        )

        # Pre-bind both translators so callers only need to pass the response.
        self.handle_response = partial(
            self.handle_response,
            key_translator=key_translator,
            translator_functions=translator_functions,
        )

        self._lancium_status = AsyncCacheMap(
            update_coroutine=partial(lancium_status_updater, self.client),
            # NOTE(review): presumably `max_age` is configured in minutes and
            # converted to seconds here - confirm against AsyncCacheMap
            max_age=self.configuration.max_age * 60,
        )

    async def deploy_resource(
        self, resource_attributes: AttributeDict
    ) -> AttributeDict:
        """
        Create and submit a job for the drone described by ``resource_attributes``.

        The job specification is based on the machine type configuration.
        Its ``name``, ``resources`` and ``environment`` entries are overwritten,
        updated or extended with the values TARDIS requires.

        :param resource_attributes: Attributes of the drone, providing
            ``drone_uuid`` and ``obs_machine_meta_data_translation_mapping``
        :return: The translated ``"job"`` entry of the create job response
        """
        # `dict()` only copies the top level. Nested containers are still the
        # very same objects referenced by the machine type configuration, so
        # they must not be modified in place. Otherwise every deployment would
        # append its drone environment to a shared list and leak it into all
        # subsequently created jobs.
        specs = dict(self.machine_type_configuration)

        # Overwrite, update or extend entries necessary for TARDIS to work properly
        specs["name"] = resource_attributes.drone_uuid

        specs["resources"] = {
            **specs.get("resources", {}),
            "core_count": self.machine_meta_data.Cores,
            "memory": self.machine_meta_data.Memory,
            "scratch": self.machine_meta_data.Disk,
        }

        drone_environment = self.drone_environment(
            resource_attributes.drone_uuid,
            resource_attributes.obs_machine_meta_data_translation_mapping,
        )
        specs["environment"] = [
            *specs.get("environment", []),
            *(
                {"variable": f"TardisDrone{key}", "value": str(value)}
                for key, value in drone_environment.items()
            ),
        ]

        create_response = await self.client.jobs.create_job(job=specs)
        logger.debug(f"{self.site_name} create job returned {create_response}")

        submit_response = await self.client.jobs.submit_job(
            id=create_response["job"]["id"]
        )
        logger.debug(f"{self.site_name} submit job returned {submit_response}")

        return self.handle_response(create_response["job"])

    async def resource_status(
        self, resource_attributes: AttributeDict
    ) -> AttributeDict:
        """
        Look up the current status of a drone's job in the status cache.

        A job that is missing from the cache is reported with status
        ``"deleted"``, unless the resource was created after the last cache
        update. Sets ``resource_attributes["updated"]`` to the current time.

        :param resource_attributes: Attributes of the drone, providing
            ``remote_resource_uuid`` and ``created``
        :return: ``resource_attributes`` merged with the translated job status
        :raises TardisResourceStatusUpdateFailed: If the job is not in the
            cache and the cache was last updated before the resource was created
        """
        await self._lancium_status.update_status()
        # In case the created timestamp is after last update timestamp of the
        # asynccachemap, no decision about the current state can be given,
        # since map is updated asynchronously.
        try:
            resource_uuid = resource_attributes.remote_resource_uuid
            resource_status = self._lancium_status[int(resource_uuid)]
        except KeyError as err:
            if (
                self._lancium_status.last_update - resource_attributes.created
            ).total_seconds() < 0:
                raise TardisResourceStatusUpdateFailed from err
            else:
                resource_status = {
                    "id": resource_attributes.remote_resource_uuid,
                    "status": "deleted",
                }
        logger.debug(f"{self.site_name} has status {resource_status}.")
        resource_attributes["updated"] = datetime.now()
        return convert_to_attribute_dict(
            {**resource_attributes, **self.handle_response(resource_status)}
        )

    async def stop_resource(self, resource_attributes: AttributeDict):
        """
        Call ``terminate_job`` for the job belonging to the drone.

        :param resource_attributes: Attributes of the drone, providing
            ``remote_resource_uuid``
        :return: The response of the ``terminate_job`` call
        """
        response = await self.client.jobs.terminate_job(
            id=int(resource_attributes.remote_resource_uuid)
        )
        logger.debug(f"{self.site_name} stop resource returned {response}")
        return response

    async def terminate_resource(self, resource_attributes: AttributeDict):
        """
        Call ``delete_job`` for the job belonging to the drone.

        :param resource_attributes: Attributes of the drone, providing
            ``remote_resource_uuid``
        :return: The response of the ``delete_job`` call
        """
        response = await self.client.jobs.delete_job(
            id=int(resource_attributes.remote_resource_uuid)
        )
        logger.debug(f"{self.site_name} terminate resource returned {response}")
        return response

    @contextmanager
    def handle_exceptions(self):
        """
        Context manager translating exceptions into TARDIS exceptions.

        :raises TardisDroneCrashed: On a ``ClientError`` with status code 404
        :raises TardisResourceStatusUpdateFailed: On a ``ClientError`` with
            status code 409
        :raises TardisError: On ``AuthError``, any other ``ClientError`` and
            any other ``Exception``
        """
        try:
            yield
        except AuthError as ae:
            # AuthError inherits ClientError but does not contain a response
            # therefore handle AuthError separately before ClientError
            raise TardisError from ae
        except ClientError as ce:
            status_code = ce.response.status_code
            if status_code == 404:
                # Drone does not exist anymore
                raise TardisDroneCrashed from ce
            elif status_code == 409:
                # Current status of the drone does not allow the performed operation
                # Let us wait until the next resource status update
                raise TardisResourceStatusUpdateFailed from ce
            raise TardisError from ce
        except Exception as ex:
            raise TardisError from ex