Source code for tardis.adapters.sites.moab

from ...exceptions.executorexceptions import CommandExecutionFailure
from ...exceptions.tardisexceptions import TardisError
from ...exceptions.tardisexceptions import TardisTimeout
from ...exceptions.tardisexceptions import TardisResourceStatusUpdateFailed
from ...interfaces.siteadapter import ResourceStatus
from ...interfaces.siteadapter import SiteAdapter
from ...utilities.staticmapping import StaticMapping
from ...utilities.attributedict import AttributeDict
from ...utilities.attributedict import convert_to_attribute_dict
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.utils import (
    convert_to,
    drone_environment_to_str,
    submit_cmd_option_formatter,
)

from asyncio import TimeoutError
from contextlib import contextmanager
from functools import partial
from datetime import datetime

import asyncssh
import logging
import re
import warnings
from xml.dom import minidom

logger = logging.getLogger("cobald.runtime.tardis.adapters.sites.moab")


async def moab_status_updater(executor):
    """
    Collect the Moab job states of the current user, keyed by job id.

    A single shell command runs ``showq --xml`` twice for ``user=$(whoami)``:
    once plainly and once with ``-c`` (presumably completed jobs; confirm
    against the Moab docs). Both XML documents are merged into one before
    parsing.

    :param executor: object whose ``run_command`` coroutine returns a mapping
        with a ``"stdout"`` entry.
    :return: dict mapping each ``JobID`` string to
        ``{"JobID": <id>, "State": <state>}``, gathered from every ``<job>``
        element found below a ``<queue>`` element. If a job id appears more
        than once, the last occurrence wins.
    """
    cmd = "showq --xml -w user=$(whoami) && showq -c --xml -w user=$(whoami)"
    logger.debug("Moab status update is running.")
    response = await executor.run_command(cmd)

    # Each showq call prints its own <Data>...</Data> document. Dropping the
    # newlines and the seam between the two documents leaves a single root
    # element that minidom can parse.
    merged_xml = response["stdout"].replace("\n", "").replace("</Data><Data>", "")
    document = minidom.parseString(merged_xml)

    moab_resource_status = {
        job.attributes["JobID"].value: {
            "JobID": job.attributes["JobID"].value,
            "State": job.attributes["State"].value,
        }
        for queue in document.getElementsByTagName("queue")
        for job in queue.getElementsByTagName("job")
    }

    logger.debug("Moab status update completed")
    return moab_resource_status
class MoabAdapter(SiteAdapter):
    """
    Site adapter that runs drones as Moab batch jobs.

    Drones are submitted with ``msub`` and cancelled with ``canceljob``. Their
    states come from a cached, batched ``showq`` query built around
    ``moab_status_updater``. All shell commands go through ``self._executor``:
    the ``executor`` entry of the site configuration if present, otherwise a
    local ``ShellExecutor``.
    """

    def __init__(self, machine_type: str, site_name: str):
        self._machine_type = machine_type
        self._site_name = site_name

        try:
            self._startup_command = self.machine_type_configuration.StartupCommand
        except AttributeError:
            # Fall back to the deprecated site-level StartupCommand; if that is
            # missing too, re-raise the original AttributeError.
            if not hasattr(self.configuration, "StartupCommand"):
                raise
            warnings.warn(
                "StartupCommand has been moved to the machine_type_configuration!",
                DeprecationWarning,
                stacklevel=2,
            )
            self._startup_command = self.configuration.StartupCommand

        self._executor = getattr(self.configuration, "executor", ShellExecutor())

        # The states of all jobs are fetched in one batch and cached.
        # StatusUpdate is scaled by 60 (presumably minutes -> seconds for
        # AsyncCacheMap's max_age; confirm against AsyncCacheMap).
        self._moab_status = AsyncCacheMap(
            update_coroutine=partial(moab_status_updater, self._executor),
            max_age=self.configuration.StatusUpdate * 60,
        )

        key_translator = StaticMapping(
            remote_resource_uuid="JobID", resource_status="State"
        )

        # see job state codes at https://computing.llnl.gov/tutorials/moab/#JobStates
        translator_functions = StaticMapping(
            State=lambda x, translator=StaticMapping(
                BatchHold=ResourceStatus.Stopped,
                Canceling=ResourceStatus.Running,
                CANCELLED=ResourceStatus.Deleted,
                Completed=ResourceStatus.Deleted,
                COMPLETED=ResourceStatus.Deleted,
                COMPLETING=ResourceStatus.Running,
                # "Deferred" is the state name Moab reports; the misspelled
                # "Deffered" key is kept for backward compatibility.
                Deferred=ResourceStatus.Booting,
                Deffered=ResourceStatus.Booting,
                Depend=ResourceStatus.Error,
                Dependency=ResourceStatus.Error,
                FAILED=ResourceStatus.Error,
                Idle=ResourceStatus.Booting,
                JobHeldUser=ResourceStatus.Stopped,
                Migrated=ResourceStatus.Booting,
                NODE_FAIL=ResourceStatus.Error,
                NotQueued=ResourceStatus.Error,
                PENDING=ResourceStatus.Booting,
                Priority=ResourceStatus.Booting,
                Removed=ResourceStatus.Deleted,
                Resources=ResourceStatus.Booting,
                Running=ResourceStatus.Running,
                RUNNING=ResourceStatus.Running,
                Staging=ResourceStatus.Booting,
                Starting=ResourceStatus.Booting,
                Suspended=ResourceStatus.Stopped,
                SUSPENDED=ResourceStatus.Stopped,
                SystemHold=ResourceStatus.Stopped,
                TimeLimit=ResourceStatus.Deleted,
                TIMEOUT=ResourceStatus.Deleted,
                UserHold=ResourceStatus.Stopped,
                Vacated=ResourceStatus.Deleted,
            ): translator[x],
            JobID=int,
        )

        # Pre-bind the translators so later calls only pass the raw response
        # (plus any extra attributes as keyword arguments).
        self.handle_response = partial(
            self.handle_response,
            key_translator=key_translator,
            translator_functions=translator_functions,
        )

    async def deploy_resource(
        self, resource_attributes: AttributeDict
    ) -> AttributeDict:
        """
        Submit a new drone as a Moab job via ``msub``.

        :param resource_attributes: drone attributes; ``drone_uuid`` and
            ``obs_machine_meta_data_translation_mapping`` are used to build the
            submission options.
        :return: ``resource_attributes``, updated in place with the job id
            (``remote_resource_uuid``), ``created``/``updated`` timestamps and
            ``ResourceStatus.Booting``.
        :raises ValueError: if the ``msub`` output is not a plain integer.
        """
        msub_cmdline_option_string = self.msub_cmdline_options(
            resource_attributes.drone_uuid,
            resource_attributes.obs_machine_meta_data_translation_mapping,
        )
        request_command = f"msub {msub_cmdline_option_string} {self._startup_command}"
        result = await self._executor.run_command(request_command)
        logger.debug(f"{self.site_name} servers create returned {result}")
        # stdout is expected to hold only the job id; int() ignores surrounding
        # whitespace such as trailing newlines.
        remote_resource_uuid = int(result.stdout)
        resource_attributes.update(
            remote_resource_uuid=remote_resource_uuid,
            created=datetime.now(),
            updated=datetime.now(),
            resource_status=ResourceStatus.Booting,
        )
        return resource_attributes

    @staticmethod
    def check_remote_resource_uuid(resource_attributes, regex, response):
        """
        Verify that a ``canceljob`` reply refers to the expected job.

        :param resource_attributes: drone attributes holding
            ``remote_resource_uuid``; on success they are updated in place to
            ``ResourceStatus.Stopped`` with a fresh ``updated`` timestamp.
        :param regex: pattern whose first capture group is the job id; it is
            compiled with ``re.MULTILINE``.
        :param response: command output to search.
        :return: the job id found in ``response``, as ``int``.
        :raises IndexError: if ``regex`` does not match ``response``.
        :raises TardisError: if the reported id differs from the drone's id.
        """
        pattern = re.compile(regex, flags=re.MULTILINE)
        remote_resource_uuid = int(pattern.findall(response)[0])
        if remote_resource_uuid != int(resource_attributes.remote_resource_uuid):
            raise TardisError(
                f"Failed to terminate {resource_attributes.remote_resource_uuid}."
            )
        resource_attributes.update(
            resource_status=ResourceStatus.Stopped, updated=datetime.now()
        )
        return remote_resource_uuid

    async def resource_status(
        self, resource_attributes: AttributeDict
    ) -> AttributeDict:
        """
        Return the drone's attributes merged with its translated Moab state.

        :param resource_attributes: drone attributes; ``remote_resource_uuid``
            and ``created`` are read, and ``updated`` is refreshed in place.
        :return: a new attribute dict holding the merged attributes.
        :raises TardisResourceStatusUpdateFailed: if the job is missing from the
            cache and the cache was last updated before the drone was created.
        """
        # Ask the cache map to refresh (presumably only once it is older than
        # max_age; see AsyncCacheMap).
        await self._moab_status.update_status()
        # In case the created timestamp is after last update timestamp of the
        # asynccachemap, no decision about the current state can be given,
        # since map is updated asynchronously.
        try:
            resource_uuid = resource_attributes.remote_resource_uuid
            resource_status = self._moab_status[str(resource_uuid)]
        except KeyError as err:
            if (
                self._moab_status._last_update - resource_attributes.created
            ).total_seconds() < 0:
                raise TardisResourceStatusUpdateFailed from err
            # The cache is newer than the drone but no longer lists the job,
            # so the job is treated as finished.
            resource_status = {
                "JobID": resource_attributes.remote_resource_uuid,
                "State": "Completed",
            }
        logger.debug(f"{self.site_name} has status {resource_status}.")
        resource_attributes.update(updated=datetime.now())
        return convert_to_attribute_dict(
            {**resource_attributes, **self.handle_response(resource_status)}
        )

    async def terminate_resource(self, resource_attributes: AttributeDict):
        """
        Cancel the drone's Moab job with ``canceljob``.

        Exit code 1 with an "invalid job specified" error on stderr is accepted
        (the job is already gone), as long as the reported id matches the
        drone's. Any other command failure is re-raised.

        :param resource_attributes: drone attributes; updated in place to
            ``ResourceStatus.Stopped`` on success.
        :return: the translated response merged with ``resource_attributes``.
        :raises CommandExecutionFailure: for exit codes other than 1.
        :raises IndexError: if the command output does not match the expected
            pattern.
        :raises TardisError: if the reported job id differs from the drone's.
        """
        request_command = f"canceljob {resource_attributes.remote_resource_uuid}"
        try:
            response = await self._executor.run_command(request_command)
        except CommandExecutionFailure as cf:
            if cf.exit_code != 1:
                raise cf
            # The reason for the failure is reported on stderr (that is where
            # the job id is parsed from), so include it in the warning.
            logger.warning(
                f"{self.site_name} servers terminate returned {cf.stdout} "
                f"(stderr: {cf.stderr})"
            )
            remote_resource_uuid = self.check_remote_resource_uuid(
                resource_attributes,
                r"ERROR: invalid job specified \((\d*)\)",
                cf.stderr,
            )
        else:
            logger.debug(f"{self.site_name} servers terminate returned {response}")
            remote_resource_uuid = self.check_remote_resource_uuid(
                resource_attributes, r"^job \'(\d*)\' cancelled", response.stdout
            )

        # Use the same "JobID" key as key_translator so the id is translated
        # consistently with resource_status().
        return self.handle_response(
            {"JobID": remote_resource_uuid}, **resource_attributes
        )

    async def stop_resource(self, resource_attributes: AttributeDict):
        """Moab jobs cannot be stopped gracefully, so terminate the job instead."""
        logger.debug("MOAB jobs cannot be stopped gracefully. Terminating instead.")
        return await self.terminate_resource(resource_attributes)

    def msub_cmdline_options(self, drone_uuid, machine_meta_data_translation_mapping):
        """
        Build the ``msub`` option string for a new drone.

        The fixed options are ``-j oe``, ``-m p``, ``-l`` (walltime, memory and
        nodes from the machine type configuration/meta data) and ``-v`` (drone
        environment, comma separated, prefixed with ``TardisDrone``). Extra
        ``short``/``long`` options come from the machine type's
        ``SubmitOptions``. Short options must not repeat ``j``, ``m``, ``l`` or
        ``v``; a duplicate keyword raises ``TypeError``.

        :param drone_uuid: uuid of the drone being submitted.
        :param machine_meta_data_translation_mapping: mapping passed on to
            ``drone_environment``.
        :return: option string produced by ``submit_cmd_option_formatter``.
        """
        msub_options = self.machine_type_configuration.get(
            "SubmitOptions", AttributeDict()
        )
        walltime = self.machine_type_configuration.Walltime
        # Rendered as "<mem>gb" below, so Memory is presumably given in GB.
        mem = self.machine_meta_data.Memory
        node_type = self.machine_type_configuration.NodeType

        drone_environment = drone_environment_to_str(
            self.drone_environment(drone_uuid, machine_meta_data_translation_mapping),
            seperator=",",
            prefix="TardisDrone",
            customize_value=lambda x: convert_to(x, int, x),
        )

        return submit_cmd_option_formatter(
            AttributeDict(
                short=AttributeDict(
                    **msub_options.get("short", AttributeDict()),
                    j="oe",
                    m="p",
                    l=f"walltime={walltime},mem={mem}gb,nodes={node_type}",
                    v=f"{drone_environment}",
                ),
                long=AttributeDict(
                    **msub_options.get("long", AttributeDict()),
                ),
            )
        )

    @contextmanager
    def handle_exceptions(self):
        """
        Translate errors raised inside the ``with`` body into TARDIS exceptions.

        * ``asyncio.TimeoutError`` -> ``TardisTimeout``
        * ``asyncssh.Error`` (logged as a warning), ``IndexError`` (e.g. from
          ``check_remote_resource_uuid`` when a reply does not match) and
          ``CommandExecutionFailure`` -> ``TardisResourceStatusUpdateFailed``
        * ``TardisResourceStatusUpdateFailed`` is re-raised unchanged
        * any other ``Exception`` -> ``TardisError``
        """
        try:
            yield
        except TimeoutError as te:
            raise TardisTimeout from te
        except asyncssh.Error as exc:
            logger.warning("SSH connection failed: " + str(exc))
            raise TardisResourceStatusUpdateFailed from exc
        except IndexError as ide:
            raise TardisResourceStatusUpdateFailed from ide
        except TardisResourceStatusUpdateFailed:
            raise
        except CommandExecutionFailure as cef:
            raise TardisResourceStatusUpdateFailed from cef
        except Exception as ex:
            raise TardisError from ex