Source code for tardis.adapters.batchsystems.slurm

"""SLURM Batch system Adapter"""

import logging

from functools import partial

from typing import Iterable

from ...configuration.configuration import Configuration
from ...exceptions.executorexceptions import CommandExecutionFailure
from ...interfaces.batchsystemadapter import BatchSystemAdapter
from ...interfaces.batchsystemadapter import MachineStatus
from ...interfaces.executor import Executor
from ...utilities.utils import submit_cmd_option_formatter
from ...utilities.utils import csv_parser
from ...utilities.executors.shellexecutor import ShellExecutor
from ...utilities.asynccachemap import AsyncCacheMap
from ...utilities.attributedict import AttributeDict


async def slurm_status_updater(
    options: AttributeDict, attributes: AttributeDict, executor: Executor
) -> dict:
    """
    Query ``sinfo`` and collect the status of the SLURM nodes.

    The values of ``attributes`` are passed to ``sinfo --Format`` and the
    resulting output columns are named after the keys of ``attributes``.

    :param options: Additional parameters for the ``sinfo`` command
    :type options: AttributeDict
    :param attributes: Mapping of result column names to ``sinfo`` format
        fields; the columns ``CPUs``, ``TotalMem``, ``AllocMem`` and
        ``Features`` must be present
    :type attributes: AttributeDict
    :raises CommandExecutionFailure: if ``sinfo`` could not be executed
        (logged as a warning and re-raised)
    :return: Dictionary mapping each node's ``Features`` value to its parsed
        row; rows whose ``Features`` value is ``None`` are left out
    :rtype: dict
    """
    extra_options = submit_cmd_option_formatter(options)
    format_fields = ",".join(str(field) for field in attributes.values())

    cmd = f'sinfo --Format="{format_fields}" -e --noheader -r'
    if extra_options:
        cmd = f"{cmd} {extra_options}"

    logging.debug(f"SLURM status update is running. Command: {cmd}")
    try:
        response = await executor.run_command(cmd)
    except CommandExecutionFailure as failure:
        logging.warning(f"SLURM's sinfo could not be executed! {str(failure)}")
        raise

    node_status = {}
    rows = csv_parser(
        input_csv=response.stdout,
        fieldnames=tuple(attributes.keys()),
        delimiter=" ",
        # presumably maps sinfo's literal "undefined" to None
        replacements=dict(undefined=None),
        skipinitialspace=True,
        skiptrailingspace=True,
    )
    for row in rows:
        # slash-separated CPU counts -> list of floats
        row["CPUs"] = list(map(float, row["CPUs"].split("/")))
        total_mem = float(row["TotalMem"])
        row["TotalMem"] = total_mem
        row["FreeMem"] = total_mem - float(row["AllocMem"])

        features = row["Features"]
        if features is None:
            # no key to look the node up by, so it cannot be tracked
            continue
        node_status[features] = row

    logging.debug("SLURM status update finished.")
    return node_status
class SlurmAdapter(BatchSystemAdapter):
    """
    :py:class:`~tardis.adapters.batchsystems.slurm.SlurmAdapter` implements the
    TARDIS interface to dynamically integrate and manage opportunistic resources
    with the SLURM Batch System.

    Node information is collected with ``sinfo`` via ``slurm_status_updater``
    and cached in an ``AsyncCacheMap``. The cache is keyed by each node's
    ``Features`` value, which is what the ``drone_uuid`` arguments of the
    methods below are looked up by.
    """

    def __init__(self):
        config = Configuration()
        self._executor = getattr(config.BatchSystem, "executor", ShellExecutor())

        try:
            self.slurm_options = config.BatchSystem.options
        except AttributeError:
            # no extra sinfo options configured
            self.slurm_options = {}

        # result column name -> sinfo ``--Format`` field
        attributes = {
            "State": "statelong",
            "CPUs": "cpusstate",
            "AllocMem": "allocmem",
            "TotalMem": "memory",
            "Features": "features",
            "Machine": "nodehost",
        }

        self._slurm_status = AsyncCacheMap(
            update_coroutine=partial(
                slurm_status_updater, self.slurm_options, attributes, self._executor
            ),
            # ``* 60``: max_age appears to be configured in minutes while the
            # cache expects seconds -- TODO confirm against AsyncCacheMap
            max_age=config.BatchSystem.max_age * 60,
        )

    async def disintegrate_machine(self, drone_uuid: str) -> None:
        """
        SLURM does not require any specific disintegration procedure (at least
        in Freiburg).

        :param drone_uuid: Uuid of the worker node, for some sites corresponding
            to the host name of the drone.
        :type drone_uuid: str
        :return: None
        """
        return

    async def drain_machine(self, drone_uuid: str) -> None:
        """
        Drain a machine in the SLURM batch system, which means that no new jobs
        will be accepted.

        The node is drained via ``scontrol update ... State=DRAIN``. A
        ``drone_uuid`` that is not present in the ``sinfo`` status is silently
        ignored.

        :param drone_uuid: Uuid of the worker node, for some sites corresponding
            to the host name of the drone.
        :type drone_uuid: str
        :return: None
        """
        await self._slurm_status.update_status()
        try:
            machine = self._slurm_status[drone_uuid]["Machine"]
        except KeyError:
            # node not (or no longer) reported by sinfo -> nothing to drain
            return

        cmd = f"scontrol update NodeName={machine} State=DRAIN Reason='COBalD/TARDIS'"
        await self._executor.run_command(cmd)

    async def integrate_machine(self, drone_uuid: str) -> None:
        """
        SLURM does not require any specific integration procedure (if the Drones
        take care of it themselves).

        :param drone_uuid: Uuid of the worker node, for some sites corresponding
            to the host name of the drone.
        :type drone_uuid: str
        :return: None
        """
        return None

    async def get_resource_ratios(self, drone_uuid: str) -> Iterable[float]:
        """
        Get the ratio of used over total resources for a worker node in Slurm.

        The CPU ratio is ``(total CPUs - idle CPUs) / total CPUs``. With sinfo's
        ``allocated/idle/other/total`` CPU state format, this counts both
        allocated CPUs and CPUs in the "other" state as used. The memory ratio
        is ``allocated memory / total memory``.

        :param drone_uuid: Uuid of the worker node, for some sites corresponding
            to the host name of the drone.
        :type drone_uuid: str
        :return: Iterable of float containing the ratios (CPU, memory); empty
            if the node is unknown to SLURM
        :rtype: Iterable[float]
        """
        await self._slurm_status.update_status()
        try:
            slurm_status = self._slurm_status[drone_uuid]
        except KeyError:
            # unknown node: no ratios; an empty tuple matches the declared
            # Iterable[float] return type
            return ()

        cpus = slurm_status["CPUs"]
        total_mem = slurm_status["TotalMem"]
        return (
            (cpus[3] - cpus[1]) / cpus[3],
            # TotalMem - FreeMem equals the allocated memory
            (total_mem - slurm_status["FreeMem"]) / total_mem,
        )

    async def get_allocation(self, drone_uuid: str) -> float:
        """
        Get the allocation of a worker node in SLURM, which is defined as maximum
        of the ratios of requested over total resources (CPU, Memory, Disk, etc.).

        :param drone_uuid: Uuid of the worker node, for some sites corresponding
            to the host name of the drone.
        :type drone_uuid: str
        :return: The allocation of a worker node as described above; 0.0 if
            the node is unknown.
        :rtype: float
        """
        return max(await self.get_resource_ratios(drone_uuid), default=0.0)

    async def get_machine_status(self, drone_uuid: str) -> MachineStatus:
        """
        Get the status of a worker node in SLURM (Available, Draining, Drained,
        NotAvailable).

        Nodes unknown to SLURM and node states not listed in the mapping are
        reported as ``MachineStatus.NotAvailable``.

        :param drone_uuid: Uuid of the worker node, for some sites corresponding
            to the host name of the drone.
        :type drone_uuid: str
        :return: The machine status in SLURM (Available, Draining, Drained,
            NotAvailable)
        :rtype: MachineStatus
        """
        # '*' means the machine didn't respond for a while
        # 'allocated+' means that node is allocated to one or more active jobs plus one
        # or more jobs in COMPLETING
        status_mapping = {
            "allocated": MachineStatus.Available,
            "allocated+": MachineStatus.Available,
            "mixed": MachineStatus.Available,
            "idle": MachineStatus.Available,
            "completing": MachineStatus.Available,
            "draining": MachineStatus.Draining,
            # a node that finished draining (e.g. after drain_machine) must be
            # reported as Drained, consistent with the '*' variants below
            "down": MachineStatus.Drained,
            "down*": MachineStatus.Drained,
            "drained": MachineStatus.Drained,
            "drained*": MachineStatus.Drained,
            "fail": MachineStatus.Drained,
            "failing": MachineStatus.Drained,
            "future": MachineStatus.Drained,
            "maint": MachineStatus.Drained,
            "reboot": MachineStatus.Drained,
            "power_down": MachineStatus.Drained,
            "powering_down": MachineStatus.Drained,
            "reserved": MachineStatus.NotAvailable,
            "unknown": MachineStatus.Drained,
            "power_up": MachineStatus.NotAvailable,
        }

        await self._slurm_status.update_status()
        try:
            machine_status = self._slurm_status[drone_uuid]
        except KeyError:
            return MachineStatus.NotAvailable

        return status_mapping.get(machine_status["State"], MachineStatus.NotAvailable)

    async def get_utilisation(self, drone_uuid: str) -> float:
        """
        Get the utilization of a worker node in Slurm, which is defined as minimum
        of the ratios of requested over total resources (CPU, Memory, Disk, etc.).

        :param drone_uuid: Uuid of the worker node, for some sites corresponding
            to the host name of the drone.
        :type drone_uuid: str
        :return: The utilization of a worker node as described above; 0.0 if
            the node is unknown.
        :rtype: float
        """
        return min(await self.get_resource_ratios(drone_uuid), default=0.0)

    @property
    def machine_meta_data_translation_mapping(self) -> AttributeDict:
        """
        The machine meta data translation mapping is used to translate units of
        the machine meta data in ``TARDIS`` to values expected by the
        Slurm batch system adapter.

        :return: Machine meta data translation mapping (per-resource
            multiplication factors; presumably GB -> MB for Memory and Disk)
        :rtype: AttributeDict
        """
        return AttributeDict(Cores=1, Memory=1000, Disk=1000)