Source code for tardis.resources.drone

from typing import List, Union, Optional, Type

from tardis.agents.batchsystemagent import BatchSystemAgent
from tardis.agents.siteagent import SiteAgent
from tardis.interfaces.plugin import Plugin
from tardis.interfaces.state import State
from .dronestates import RequestState
from .dronestates import DownState
from ..plugins.sqliteregistry import SqliteRegistry
from ..utilities.attributedict import AttributeDict
from ..utilities.utils import load_states
from cobald.daemon import service
from cobald.interfaces import Pool

from backports.cached_property import cached_property
from datetime import datetime

import asyncio
import logging
import uuid

logger = logging.getLogger("cobald.runtime.tardis.resources.drone")

[docs]@service(flavour=asyncio) class Drone(Pool): def __init__( self, site_agent: SiteAgent, batch_system_agent: BatchSystemAgent, plugins: Optional[List[Plugin]] = None, remote_resource_uuid=None, drone_uuid=None, state: Optional[State] = None, created: float = None, updated: float = None, ): self._site_agent = site_agent self._batch_system_agent = batch_system_agent self._plugins = plugins or [] self._state = state self.resource_attributes = AttributeDict( site_name=self._site_agent.site_name, machine_type=self.site_agent.machine_type, obs_machine_meta_data_translation_mapping=self.batch_system_agent.machine_meta_data_translation_mapping, # noqa B950 remote_resource_uuid=remote_resource_uuid, created=created or, updated=updated or, drone_uuid=drone_uuid or self.site_agent.drone_uuid(uuid.uuid4().hex[:10]), ) self._allocation = 0.0 self._demand = self.maximum_demand self._utilisation = 0.0 self._supply = 0.0 @property def allocation(self) -> float: return self._allocation @property def batch_system_agent(self) -> BatchSystemAgent: return self._batch_system_agent @cached_property def _database(self) -> Optional[SqliteRegistry]: for plugin in self._plugins: if isinstance(plugin, SqliteRegistry): return plugin
[docs] async def database_state(self) -> Optional[Type[State]]: try: return load_states( await self._database.get_resource_state( self.resource_attributes.drone_uuid ) )[0]["state"] except (IndexError, AttributeError): return None
@property def demand(self) -> float: return self._demand @demand.setter def demand(self, value: float): self._demand = value @property def heartbeat_interval(self) -> int: return self.site_agent.drone_heartbeat_interval @property def minimum_lifetime(self) -> [int, None]: return self.site_agent.drone_minimum_lifetime @property def maximum_demand(self) -> float: return self.site_agent.machine_meta_data["Cores"] @property def supply(self) -> float: return self._supply @property def utilisation(self) -> float: return self._utilisation @property def site_agent(self) -> SiteAgent: return self._site_agent
[docs] async def run(self): if self.state is None: # The state of a newly created Drone is None, since the plugins need # to be notified on the first state change. As calling the # ``set_state`` coroutine is not possible in the constructor, we # initiate the first state change here await self.set_state(RequestState()) while True: current_state = self.state await if isinstance(current_state, DownState): logger.debug( f"Garbage Collect Drone: {self.resource_attributes.drone_uuid}" ) self._demand = 0 return await asyncio.sleep(self.heartbeat_interval)
[docs] def register_plugins(self, observer: Union[List[Plugin], Plugin]) -> None: self._plugins.append(observer)
[docs] def remove_plugins(self, observer: Union[List[Plugin], Plugin]) -> None: self._plugins.remove(observer)
[docs] async def set_state(self, state: State) -> None: """Should be replaced by asynchronous state.setter property once available""" if state.__class__ != self.state.__class__: self.resource_attributes.updated = self._state = state await self.notify_plugins() else: self._state = state
@property def state(self) -> State: return self._state
[docs] async def notify_plugins(self) -> None: for plugin in self._plugins: await plugin.notify(self.state, self.resource_attributes)