Source code for tardis.adapters.sites.moab

from ...exceptions.executorexceptions import CommandExecutionFailure
from ...exceptions.tardisexceptions import TardisError
from ...exceptions.tardisexceptions import TardisTimeout
from ...exceptions.tardisexceptions import TardisResourceStatusUpdateFailed
from ...interfaces.executor import Executor
from ...interfaces.siteadapter import ResourceStatus
from ...interfaces.siteadapter import SiteAdapter
from ...utilities.staticmapping import StaticMapping
from ...utilities.asyncbulkcall import AsyncBulkCall
from ...utilities.attributedict import AttributeDict
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.utils import (
    convert_to,
    drone_environment_to_str,
    submit_cmd_option_formatter,
)

from asyncio import TimeoutError
from contextlib import contextmanager
from functools import partial
from typing import Iterable, Mapping, Tuple

import asyncssh
import logging
import warnings
from xml.dom import minidom

logger = logging.getLogger("cobald.runtime.tardis.adapters.sites.moab")


[docs]async def showq( *resource_attributes: Tuple[AttributeDict, ...], executor: Executor ) -> Iterable[Mapping]: showq_active_cmd = "showq --xml -w user=${USER}" showq_completed_cmd = "showq -c --xml -w user=${USER}" logger.debug("Moab status update is running.") combined_response_stdout = "" for cmd in (showq_active_cmd, showq_completed_cmd): response = await executor.run_command(cmd) combined_response_stdout += response.stdout # combine two XML outputs to one xml_output = minidom.parseString( combined_response_stdout.replace("\n", "").replace("</Data><Data>", "") ) xml_jobs_list = xml_output.getElementsByTagName("queue") # parse XML output moab_resource_status = {} for queue in xml_jobs_list: queue_jobs_list = queue.getElementsByTagName("job") for line in queue_jobs_list: moab_resource_status[line.attributes["JobID"].value] = { "JobID": line.attributes["JobID"].value, "State": line.attributes["State"].value, } logger.debug("Moab status update completed") return ( moab_resource_status.get( str(resource.remote_resource_uuid), # assume that jobs that do not show up (anymore) in squeue have # State Completed (Deleted) { "State": "Completed", }, ) for resource in resource_attributes )
[docs]class MoabAdapter(SiteAdapter): def __init__(self, machine_type: str, site_name: str): self._machine_type = machine_type self._site_name = site_name try: self._startup_command = self.machine_type_configuration.StartupCommand except AttributeError: if not hasattr(self.configuration, "StartupCommand"): raise warnings.warn( "StartupCommand has been moved to the machine_type_configuration!", DeprecationWarning, stacklevel=2, ) self._startup_command = self.configuration.StartupCommand self._executor = getattr(self.configuration, "executor", ShellExecutor()) key_translator = StaticMapping( remote_resource_uuid="JobID", resource_status="State" ) # see job state codes at https://computing.llnl.gov/tutorials/moab/#JobStates translator_functions = StaticMapping( State=lambda x, translator=StaticMapping( BatchHold=ResourceStatus.Stopped, Canceling=ResourceStatus.Running, CANCELLED=ResourceStatus.Deleted, Completed=ResourceStatus.Deleted, COMPLETED=ResourceStatus.Deleted, COMPLETING=ResourceStatus.Running, Deffered=ResourceStatus.Booting, Depend=ResourceStatus.Error, Dependency=ResourceStatus.Error, FAILED=ResourceStatus.Error, Idle=ResourceStatus.Booting, JobHeldUser=ResourceStatus.Stopped, Migrated=ResourceStatus.Booting, NODE_FAIL=ResourceStatus.Error, NotQueued=ResourceStatus.Error, PENDING=ResourceStatus.Booting, Priority=ResourceStatus.Booting, Removed=ResourceStatus.Deleted, Resources=ResourceStatus.Booting, Running=ResourceStatus.Running, RUNNING=ResourceStatus.Running, Staging=ResourceStatus.Booting, Starting=ResourceStatus.Booting, Suspended=ResourceStatus.Stopped, SUSPENDED=ResourceStatus.Stopped, SystemHold=ResourceStatus.Stopped, TimeLimit=ResourceStatus.Deleted, TIMEOUT=ResourceStatus.Deleted, UserHold=ResourceStatus.Stopped, Vacated=ResourceStatus.Deleted, ): translator[x], JobID=int, ) self.handle_response = partial( self.handle_response, key_translator=key_translator, translator_functions=translator_functions, ) bulk_size = getattr(self.configuration, "bulk_size", 100) bulk_delay = getattr(self.configuration, "bulk_delay", 1.0) self._showq = AsyncBulkCall( partial(showq, executor=self._executor), size=bulk_size, delay=bulk_delay, )
[docs] async def deploy_resource( self, resource_attributes: AttributeDict ) -> AttributeDict: msub_cmdline_option_string = self.msub_cmdline_options( resource_attributes.drone_uuid, resource_attributes.obs_machine_meta_data_translation_mapping, ) request_command = f"msub {msub_cmdline_option_string} {self._startup_command}" result = await self._executor.run_command(request_command) logger.debug(f"{self.site_name} servers create returned {result}") remote_resource_uuid = int(result.stdout) return AttributeDict( remote_resource_uuid=remote_resource_uuid, resource_status=ResourceStatus.Booting, )
[docs] async def resource_status( self, resource_attributes: AttributeDict ) -> AttributeDict: return self.handle_response(await self._showq(resource_attributes))
[docs] async def terminate_resource(self, resource_attributes: AttributeDict) -> None: request_command = f"canceljob {resource_attributes.remote_resource_uuid}" try: response = await self._executor.run_command(request_command) except CommandExecutionFailure as cf: if cf.exit_code == 1: logger.warning( f"{self.site_name} servers terminate returned {cf.stdout}." "Potentially already terminated." ) else: raise cf else: logger.debug(f"{self.site_name} servers terminate returned {response}")
[docs] async def stop_resource(self, resource_attributes: AttributeDict) -> None: logger.debug("MOAB jobs cannot be stopped gracefully. Terminating instead.") await self.terminate_resource(resource_attributes)
[docs] def msub_cmdline_options(self, drone_uuid, machine_meta_data_translation_mapping): sbatch_options = self.machine_type_configuration.get( "SubmitOptions", AttributeDict() ) walltime = self.machine_type_configuration.Walltime mem = self.machine_meta_data.Memory node_type = self.machine_type_configuration.NodeType drone_environment = drone_environment_to_str( self.drone_environment(drone_uuid, machine_meta_data_translation_mapping), seperator=",", prefix="TardisDrone", customize_value=lambda x: convert_to(x, int, x), ) return submit_cmd_option_formatter( AttributeDict( short=AttributeDict( **sbatch_options.get("short", AttributeDict()), j="oe", m="p", l=f"walltime={walltime},mem={mem}gb,nodes={node_type}", v=f"{drone_environment}", ), long=AttributeDict( **sbatch_options.get("long", AttributeDict()), ), ) )
[docs] @contextmanager def handle_exceptions(self): try: yield except TimeoutError as te: raise TardisTimeout from te except asyncssh.Error as exc: logger.warning("SSH connection failed: " + str(exc)) raise TardisResourceStatusUpdateFailed from exc except IndexError as ide: raise TardisResourceStatusUpdateFailed from ide except TardisResourceStatusUpdateFailed: raise except CommandExecutionFailure as cef: raise TardisResourceStatusUpdateFailed from cef except Exception as ex: raise TardisError from ex