from collections import defaultdict
from datetime import datetime
import asyncio
import logging
from typing import TYPE_CHECKING
from typing import Type
from ..exceptions.tardisexceptions import TardisAuthError
from ..exceptions.tardisexceptions import TardisDroneCrashed
from ..exceptions.tardisexceptions import TardisTimeout
from ..exceptions.tardisexceptions import TardisQuotaExceeded
from ..exceptions.tardisexceptions import TardisResourceStatusUpdateFailed
from ..interfaces.batchsystemadapter import MachineStatus
from ..interfaces.state import State
from ..interfaces.siteadapter import ResourceStatus
from ..utilities.pipeline import StopProcessing
if TYPE_CHECKING:
from tardis.resources.drone import Drone
# Module-level logger used by every pipeline step and drone state defined below.
logger = logging.getLogger("cobald.runtime.tardis.resources.dronestates")
async def batchsystem_machine_status(
    state_transition, drone: "Drone", current_state: Type[State]
):
    """
    Pipeline step choosing the follow-up state from the batch system's view.

    The drone's batch system agent is asked for the machine status of this
    drone. That status is used as key into ``state_transition`` and the entry
    found there is called to produce the result.

    :param state_transition: mapping from machine status to a zero-argument
        callable creating the follow-up state
    :param drone: drone whose machine status is queried
    :param current_state: state class running the pipeline (not used here)
    :return: whatever the selected ``state_transition`` entry returns
    """
    agent = drone.batch_system_agent
    status = await agent.get_machine_status(
        drone_uuid=drone.resource_attributes["drone_uuid"]
    )
    make_next_state = state_transition[status]
    return make_next_state()
async def check_remote_draining(
    state_transition, drone: "Drone", current_state: Type[State]
):
    """
    Pipeline step reacting to a drain request stored in the drone database.

    If the state reported by ``drone.database_state()`` is ``DrainState`` and
    differs from ``current_state``, the pipeline is stopped with a fresh
    instance of that state as result. In every other case the transition
    table is handed on unchanged.

    :param state_transition: transition table passed through the pipeline
    :param drone: drone whose database state is inspected
    :param current_state: state class currently running the pipeline
    :return: ``state_transition`` unchanged
    :raises StopProcessing: carrying a ``DrainState`` instance when the
        database asks for draining
    """
    remote_state = await drone.database_state()
    # Nothing to do unless the database explicitly asks for DrainState and the
    # drone is not already in that very state.
    if remote_state is not DrainState or remote_state is current_state:
        return state_transition
    raise StopProcessing(last_result=remote_state())
async def check_demand(state_transition, drone: "Drone", current_state: Type[State]):
    """
    Pipeline step that retires a drone as soon as its demand is falsy.

    Without demand the drone's supply is reset to ``0.0`` and the pipeline is
    stopped: a drone in ``BootingState`` goes to ``CleanupState``, a drone in
    any other state goes to ``DrainState``.

    :param state_transition: transition table passed through the pipeline
    :param drone: drone whose demand is checked
    :param current_state: state class currently running the pipeline
    :return: ``state_transition`` unchanged while there is demand
    :raises StopProcessing: carrying the static follow-up state when there is
        no demand
    """
    if drone.demand:
        return state_transition

    drone._supply = 0.0
    # Static state transition, independent of the transition table.
    follow_up = CleanupState() if current_state in (BootingState,) else DrainState()
    raise StopProcessing(last_result=follow_up)
async def check_minimum_lifetime(
    state_transition, drone: "Drone", current_state: Type[State]
):
    """
    Pipeline step draining a drone once its minimum lifetime has passed.

    When ``drone.minimum_lifetime`` is truthy, the seconds elapsed since
    ``drone.resource_attributes.updated`` are compared against it. Exceeding
    the limit stops the pipeline with ``DrainState`` as result.

    :param state_transition: transition table passed through the pipeline
    :param drone: drone whose lifetime is checked
    :param current_state: state class running the pipeline (not used here)
    :return: ``state_transition`` unchanged if no limit is set or it has not
        been exceeded
    :raises StopProcessing: carrying a ``DrainState`` instance when the limit
        has been exceeded
    """
    if not drone.minimum_lifetime:
        return state_transition

    since_update = datetime.now() - drone.resource_attributes.updated
    if since_update.total_seconds() > drone.minimum_lifetime:
        raise StopProcessing(last_result=DrainState())
    return state_transition
async def resource_status(state_transition, drone: "Drone", current_state: Type[State]):
    """
    Pipeline step refreshing the resource attributes via the site agent.

    The attributes returned by ``drone.site_agent.resource_status`` are merged
    into ``drone.resource_attributes``. The updated ``resource_status``
    attribute is then used as key into ``state_transition`` and the entry
    found there is called.

    :param state_transition: mapping from resource status to a zero-argument
        callable
    :param drone: drone whose resource attributes are refreshed
    :param current_state: state class currently running the pipeline
    :return: whatever the selected ``state_transition`` entry returns
    :raises StopProcessing: carrying a new ``current_state`` instance on
        auth, timeout or status update failures, or a ``CleanupState``
        instance if the drone crashed
    """
    try:
        refreshed = await drone.site_agent.resource_status(drone.resource_attributes)
        drone.resource_attributes.update(refreshed)
        logger.debug(f"Resource attributes: {drone.resource_attributes}")
    except (TardisAuthError, TardisTimeout, TardisResourceStatusUpdateFailed) as err:
        # Stay in the current state so that the status query is retried.
        raise StopProcessing(last_result=current_state()) from err
    except TardisDroneCrashed as tdc:
        # Crashed resources are handed over to the cleanup.
        raise StopProcessing(last_result=CleanupState()) from tdc

    # Only reached without an exception, since both handlers above raise.
    make_next = state_transition[drone.resource_attributes.resource_status]
    return make_next()
class RequestState(State):
    """
    Drone state in which the resource is requested from the site.

    Follow-up states: ``BootingState`` after a successful deployment,
    ``DownState`` on auth, timeout, quota or status update failures and
    ``CleanupState`` if the drone crashed.
    """

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Deploy the resource through the site agent and set the next state.

        :param drone: drone to deploy; its resource attributes are updated
            with the values returned by ``deploy_resource``
        """
        logger.info(f"Drone {drone.resource_attributes} in RequestState")
        try:
            deployed = await drone.site_agent.deploy_resource(
                drone.resource_attributes
            )
            drone.resource_attributes.update(deployed)
        except (
            TardisAuthError,
            TardisTimeout,
            TardisQuotaExceeded,
            TardisResourceStatusUpdateFailed,
        ):
            follow_up = DownState()
        except TardisDroneCrashed:
            follow_up = CleanupState()
        else:
            follow_up = BootingState()
        await drone.set_state(follow_up)
class BootingState(State):
    """
    Drone state that checks the demand and the resource status reported by
    the site in order to select the follow-up state.
    """

    # Follow-up state per resource status. The lambdas postpone the name
    # lookup, so states defined further down in the module can be referenced.
    transition = {
        ResourceStatus.Booting: lambda: BootingState(),
        ResourceStatus.Running: lambda: IntegrateState(),
        ResourceStatus.Deleted: lambda: DownState(),
        ResourceStatus.Stopped: lambda: CleanupState(),
        ResourceStatus.Error: lambda: CleanupState(),
    }

    # Steps are executed in the listed order.
    processing_pipeline = [check_demand, resource_status]

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Run the processing pipeline and apply the resulting state.

        :param drone: drone this state is executed for
        """
        logger.info(f"Drone {drone.resource_attributes} in BootingState")
        follow_up = await cls.run_processing_pipeline(drone)
        await drone.set_state(follow_up)
class IntegrateState(State):
    """
    Drone state asking the batch system agent to integrate the machine.

    The follow-up state is always ``IntegratingState``.
    """

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Integrate the drone into the batch system, then switch state.

        :param drone: drone identified by the ``drone_uuid`` entry of its
            resource attributes
        """
        logger.info(f"Drone {drone.resource_attributes} in IntegrateState")
        agent = drone.batch_system_agent
        await agent.integrate_machine(
            drone_uuid=drone.resource_attributes["drone_uuid"]
        )
        # Static state transition.
        await drone.set_state(IntegratingState())
class IntegratingState(State):
    """
    Drone state that combines the resource status from the site with the
    machine status from the batch system to select the follow-up state.
    """

    # Two-level table: the resource status selects a mapping, which is then
    # indexed by the machine status. For a resource that is not running the
    # machine status is irrelevant, so a defaultdict yields the same state
    # class for every key.
    transition = {
        ResourceStatus.Running: lambda: {
            MachineStatus.NotAvailable: lambda: IntegratingState(),
            MachineStatus.Available: lambda: AvailableState(),
            MachineStatus.Draining: lambda: DrainingState(),
            MachineStatus.Drained: lambda: DisintegrateState(),
        },
        ResourceStatus.Booting: lambda: defaultdict(lambda: BootingState),
        ResourceStatus.Deleted: lambda: defaultdict(lambda: DownState),
        ResourceStatus.Stopped: lambda: defaultdict(lambda: CleanupState),
        ResourceStatus.Error: lambda: defaultdict(lambda: CleanupState),
    }

    # Steps are executed in the listed order.
    processing_pipeline = [resource_status, batchsystem_machine_status]

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Run the processing pipeline and apply the resulting state.

        :param drone: drone this state is executed for
        """
        logger.info(f"Drone {drone.resource_attributes} in IntegratingState")
        follow_up = await cls.run_processing_pipeline(drone)
        await drone.set_state(follow_up)
class AvailableState(State):
    """
    Drone state for a drone that is integrated in the batch system.

    The pipeline checks for a drain request in the database, the demand and
    the minimum lifetime before resource and machine status are evaluated.
    While the drone stays in this state, its allocation, utilisation and
    supply are refreshed.
    """

    # Two-level table: the resource status selects a mapping, which is then
    # indexed by the machine status. For a resource that is not running the
    # machine status is irrelevant, so a defaultdict yields the same state
    # class for every key.
    transition = {
        ResourceStatus.Running: lambda: {
            MachineStatus.Available: lambda: AvailableState(),
            MachineStatus.NotAvailable: lambda: ShutDownState(),
            MachineStatus.Draining: lambda: DrainingState(),
            MachineStatus.Drained: lambda: DisintegrateState(),
        },
        ResourceStatus.Booting: lambda: defaultdict(lambda: BootingState),
        ResourceStatus.Deleted: lambda: defaultdict(lambda: DownState),
        ResourceStatus.Stopped: lambda: defaultdict(lambda: CleanupState),
        ResourceStatus.Error: lambda: defaultdict(lambda: CleanupState),
    }

    # Steps are executed in the listed order.
    processing_pipeline = [
        check_remote_draining,
        check_demand,
        check_minimum_lifetime,
        resource_status,
        batchsystem_machine_status,
    ]

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Run the processing pipeline, refresh the drone's figures if it stays
        available and apply the resulting state.

        :param drone: drone this state is executed for
        """
        logger.info(f"Drone {drone.resource_attributes} in AvailableState")
        follow_up = await cls.run_processing_pipeline(drone)

        if isinstance(follow_up, AvailableState):
            agent = drone.batch_system_agent
            uuid = drone.resource_attributes["drone_uuid"]
            drone._allocation = await agent.get_allocation(drone_uuid=uuid)
            drone._utilisation = await agent.get_utilisation(drone_uuid=uuid)
            drone._supply = drone.maximum_demand

        await drone.set_state(follow_up)
class DrainState(State):
    """
    Drone state asking the batch system agent to drain the machine.

    The follow-up state is always ``DrainingState``.
    """

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Trigger draining of the drone, wait briefly, then switch state.

        :param drone: drone identified by the ``drone_uuid`` entry of its
            resource attributes
        """
        logger.info(f"Drone {drone.resource_attributes} in DrainState")
        agent = drone.batch_system_agent
        await agent.drain_machine(drone_uuid=drone.resource_attributes["drone_uuid"])
        # NOTE(review): presumably gives the batch system time to register the
        # drain request before the status is polled again - confirm.
        await asyncio.sleep(0.5)
        # Static state transition.
        await drone.set_state(DrainingState())
class DrainingState(State):
    """
    Drone state that combines the resource status from the site with the
    machine status from the batch system while the drone is being drained.
    """

    # Two-level table: the resource status selects a mapping, which is then
    # indexed by the machine status. For a resource that is not running the
    # machine status is irrelevant, so a defaultdict yields the same state
    # class for every key.
    transition = {
        ResourceStatus.Running: lambda: {
            MachineStatus.Draining: lambda: DrainingState(),
            MachineStatus.Available: lambda: DrainState(),
            MachineStatus.Drained: lambda: DisintegrateState(),
            MachineStatus.NotAvailable: lambda: ShutDownState(),
        },
        # In case the job is retried by HTCondor, resources can transition to
        # BootingState again. In this case the job should be removed.
        ResourceStatus.Booting: lambda: defaultdict(lambda: CleanupState),
        ResourceStatus.Deleted: lambda: defaultdict(lambda: DownState),
        ResourceStatus.Stopped: lambda: defaultdict(lambda: CleanupState),
        ResourceStatus.Error: lambda: defaultdict(lambda: CleanupState),
    }

    # Steps are executed in the listed order.
    processing_pipeline = [resource_status, batchsystem_machine_status]

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Run the processing pipeline and apply the resulting state.

        :param drone: drone this state is executed for
        """
        logger.info(f"Drone {drone.resource_attributes} in DrainingState")
        follow_up = await cls.run_processing_pipeline(drone)
        await drone.set_state(follow_up)
class DisintegrateState(State):
    """
    Drone state asking the batch system agent to disintegrate the machine.

    The follow-up state is always ``ShutDownState``.
    """

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Remove the drone from the batch system, then switch state.

        :param drone: drone identified by the ``drone_uuid`` entry of its
            resource attributes
        """
        logger.info(f"Drone {drone.resource_attributes} in DisintegrateState")
        agent = drone.batch_system_agent
        await agent.disintegrate_machine(
            drone_uuid=drone.resource_attributes["drone_uuid"]
        )
        # Static state transition.
        await drone.set_state(ShutDownState())
class ShutDownState(State):
    """
    Drone state that stops the resource at the site.

    If the resource is still running, ``stop_resource`` is called on the site
    agent and the drone moves on to ``ShuttingDownState``. Should that call
    fail with ``TardisResourceStatusUpdateFailed``, the drone stays in
    ``ShutDownState``.
    """

    # Follow-up state per resource status reported by the site.
    transition = {
        # In case the job is retried by HTCondor, resources can transition to
        # BootingState again. In this case the job should be removed.
        ResourceStatus.Booting: lambda: CleanupState(),
        ResourceStatus.Running: lambda: ShuttingDownState(),
        ResourceStatus.Stopped: lambda: CleanupState(),
        ResourceStatus.Deleted: lambda: DownState(),
        ResourceStatus.Error: lambda: CleanupState(),
    }

    processing_pipeline = [resource_status]

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Determine the follow-up state and stop the resource if required.

        :param drone: drone whose resource is stopped
        """
        logger.info(f"Drone {drone.resource_attributes} in ShutDownState")
        logger.debug(
            f"Stopping VM with ID {drone.resource_attributes.remote_resource_uuid}"
        )
        follow_up = await cls.run_processing_pipeline(drone)

        # Only a resource that is still running has to be stopped.
        if not isinstance(follow_up, ShuttingDownState):
            await drone.set_state(follow_up)
            return

        try:
            await drone.site_agent.stop_resource(drone.resource_attributes)
        except TardisResourceStatusUpdateFailed:
            logger.warning(
                f"Calling stop_resource failed for drone "
                f"{drone.resource_attributes.drone_uuid}"
            )
            # Stay in this state, so stopping is attempted again.
            follow_up = ShutDownState()
        await drone.set_state(follow_up)
class ShuttingDownState(State):
    """
    Drone state that checks the resource status reported by the site after
    the resource has been asked to stop.
    """

    # Follow-up state per resource status reported by the site.
    transition = {
        # In case the job is retried by HTCondor, resources can transition to
        # BootingState again. In this case the job should be removed.
        ResourceStatus.Booting: lambda: CleanupState(),
        ResourceStatus.Running: lambda: ShuttingDownState(),
        ResourceStatus.Stopped: lambda: CleanupState(),
        ResourceStatus.Deleted: lambda: DownState(),
        ResourceStatus.Error: lambda: CleanupState(),
    }

    processing_pipeline = [resource_status]

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Run the processing pipeline and apply the resulting state.

        :param drone: drone this state is executed for
        """
        logger.info(f"Drone {drone.resource_attributes} in ShuttingDownState")
        logger.debug(
            f"Checking Status of drone with ID "
            f"{drone.resource_attributes.remote_resource_uuid}"
        )
        follow_up = await cls.run_processing_pipeline(drone)
        await drone.set_state(follow_up)
class CleanupState(State):
    """
    Drone state that terminates the resource at the site.

    As long as the pipeline yields ``CleanupState``, ``terminate_resource``
    is called on the site agent. A crashed drone is moved to ``DownState``,
    whereas a failed status update keeps the drone in ``CleanupState``.
    """

    # Follow-up state per resource status reported by the site.
    transition = {
        ResourceStatus.Booting: lambda: CleanupState(),
        ResourceStatus.Running: lambda: DrainState(),
        ResourceStatus.Stopped: lambda: CleanupState(),
        ResourceStatus.Deleted: lambda: DownState(),
        ResourceStatus.Error: lambda: CleanupState(),
    }

    processing_pipeline = [resource_status]

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Determine the follow-up state and terminate the resource if required.

        :param drone: drone whose resource is terminated
        """
        logger.info(f"Drone {drone.resource_attributes} in CleanupState")
        follow_up = await cls.run_processing_pipeline(drone)

        # Any other follow-up state is applied without terminating anything.
        if not isinstance(follow_up, CleanupState):
            await drone.set_state(follow_up)
            return

        try:
            logger.debug(
                f"Destroying VM with ID "
                f"{drone.resource_attributes.remote_resource_uuid}"
            )
            await drone.site_agent.terminate_resource(drone.resource_attributes)
        except TardisDroneCrashed:
            logger.warning(
                f"Calling terminate_resource failed for drone "
                f"{drone.resource_attributes.drone_uuid}. Drone crashed!"
            )
            follow_up = DownState()
        except TardisResourceStatusUpdateFailed:
            # The follow-up state remains CleanupState.
            logger.warning(
                f"Calling terminate_resource failed for drone "
                f"{drone.resource_attributes.drone_uuid}. Will retry later!"
            )
        await drone.set_state(follow_up)
class DownState(State):
    """
    Drone state that sets the demand of the drone to zero.

    No follow-up state is set by this state.
    """

    @classmethod
    async def run(cls, drone: "Drone"):
        """
        Log the state and reset the drone's demand.

        :param drone: drone whose demand is set to ``0``
        """
        logger.info(f"Drone {drone.resource_attributes} in DownState")
        drone.demand = 0