# Source code for tardis.utilities.utils
import csv
import logging
from contextlib import contextmanager
from io import StringIO
from typing import (
    Any,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    TypeVar,
    Tuple,
    Union,
)

from .attributedict import AttributeDict
# Module-level logger shared by the helpers in this file; the dotted name
# makes it a descendant of the "cobald.runtime" logger.
logger = logging.getLogger("cobald.runtime.tardis.utilities.utils")
def cmd_option_formatter(options: AttributeDict, prefix: str, separator: str) -> str:
    """
    Render a mapping of options as one command line fragment

    :param options: option names mapped to their values; a value of ``None``
        marks an option that is emitted without separator and value
    :type options: AttributeDict
    :param prefix: text put in front of every option name
    :type prefix: str
    :param separator: text put between an option name and its value
    :type separator: str
    :return: all rendered options joined by single spaces
    :rtype: str
    """
    rendered = []
    for name, value in options.items():
        if value is None:
            # value-less option: only the prefixed name is emitted
            rendered.append(f"{prefix}{name}")
        else:
            rendered.append(f"{prefix}{name}{separator}{value}")
    return " ".join(rendered)
def htcondor_cmd_option_formatter(options: AttributeDict) -> str:
    """
    Render options in the form ``-name value``

    :param options: option names mapped to their values
    :type options: AttributeDict
    :return: result of :func:`cmd_option_formatter` called with ``-`` as
        prefix and a single space as separator
    :rtype: str
    """
    htcondor_style = {"prefix": "-", "separator": " "}
    return cmd_option_formatter(options, **htcondor_style)
def csv_parser(
    input_csv: str,
    fieldnames: Union[List, Tuple],
    delimiter: str = "\t",
    replacements: Optional[dict] = None,
    skipinitialspace: bool = False,
    skiptrailingspace: bool = False,
) -> Iterator[Dict]:
    """
    Parses CSV formatted input

    This is a generator: rows are parsed lazily and yielded one at a time.
    If more than one field name is given, lines that do not contain the
    delimiter are dropped before parsing. Fields beyond the given field names
    are collected by :class:`csv.DictReader` into a list stored under the key
    ``None``; such lists are passed through without applying replacements.

    :param input_csv: CSV formatted input
    :type input_csv: str
    :param fieldnames: corresponding field names
    :type fieldnames: Union[List, Tuple]
    :param delimiter: delimiter between entries
    :type delimiter: str
    :param replacements: fields to be replaced, maps a field value to the
        value that is yielded instead
    :type replacements: dict
    :param skipinitialspace: ignore whitespace immediately following the delimiter
    :type skipinitialspace: bool
    :param skiptrailingspace: ignore whitespace at the end of each csv row
    :type skiptrailingspace: bool
    :return: generator yielding one dictionary per row, mapping field names
        to the (possibly replaced) values
    :rtype: Iterator[Dict]
    """
    if skiptrailingspace:
        # NOTE: str.strip is applied to every line, so whitespace at the
        # beginning of a line is removed as well
        input_csv = "\n".join(line.strip() for line in input_csv.splitlines())

    if len(fieldnames) > 1:
        # a line without delimiter cannot hold more than one field: skip it
        input_csv = "\n".join(
            line for line in input_csv.splitlines() if delimiter in line
        )

    replacements = replacements or {}

    def _replace(value):
        """Return the replacement registered for value or value itself"""
        try:
            known = value in replacements
        except TypeError:
            # Unhashable value, i.e. the list of surplus fields created by
            # DictReader: it can never be a key of replacements
            return value
        return replacements[value] if known else value

    with StringIO(input_csv) as csv_input:
        csv_reader = csv.DictReader(
            csv_input,
            fieldnames=fieldnames,
            delimiter=delimiter,
            skipinitialspace=skipinitialspace,
        )
        for row in csv_reader:
            yield {key: _replace(value) for key, value in row.items()}
@contextmanager
def disable_logging(level):
    """
    Context manager suppressing log messages while its block is executed

    On entry ``logging.disable(level)`` is called; on exit, also if the block
    raises an exception, the override is removed again by calling
    ``logging.disable(logging.NOTSET)``. The override is always reset to
    ``logging.NOTSET``, not to a value that was active before entering.

    :param level: level handed to :func:`logging.disable`
    """
    logging.disable(level)
    try:
        yield
    finally:
        # must run on errors too, otherwise logging stays disabled for the
        # remaining lifetime of the process
        logging.disable(logging.NOTSET)
def machine_meta_data_translation(
    machine_meta_data: AttributeDict, meta_data_translation_mapping: AttributeDict
):
    """
    Helper function converting the units of the machine_meta_data into the
    units required by the overlay batch system (OBS)

    Every value of machine_meta_data is multiplied by the conversion factor
    stored under the same key in meta_data_translation_mapping.

    :param machine_meta_data: Machine Meta Data (Cores, Memory, Disk)
    :param meta_data_translation_mapping: Map used for the translation of meta
        data, contains conversion factors
    :return: Converted meta data with units expected by the OBS
    :rtype: dict
    :raises KeyError: if a conversion factor is missing; the failure is logged
        with level critical before the exception is re-raised
    """
    translated = {}
    try:
        for key, value in machine_meta_data.items():
            conversion_factor = meta_data_translation_mapping[key]
            translated[key] = conversion_factor * value
    except KeyError as ke:
        logger.critical(
            f"machine_meta_data_translation failed: no translation known for {ke}"
        )
        raise
    return translated
def load_states(resources):
    """
    Replace the state name of every resource entry by a state object

    For each entry the attribute of ``tardis.resources.dronestates`` named
    like ``str(entry["state"])`` is looked up, called without arguments and
    the result is stored back into ``entry["state"]``. The entries are
    modified in place.

    :param resources: iterable of mapping-like entries having a "state" item
    :return: the object passed in as resources
    """
    # imported at call time instead of module level
    # NOTE(review): presumably to avoid a circular import - confirm
    import tardis.resources.dronestates

    state_module = tardis.resources.dronestates
    for resource in resources:
        state_name = str(resource["state"])
        resource["state"] = getattr(state_module, state_name)()
    return resources
def submit_cmd_option_formatter(options: AttributeDict) -> str:
    """
    Render short and long submit options as one command line fragment

    Options found in ``options.short`` are rendered as ``-name value``, the
    ones in ``options.long`` as ``--name=value``; short options come first.
    A group is skipped if an AttributeError is raised while it is looked up
    or formatted.

    :param options: object providing the option groups ``short`` and/or ``long``
    :type options: AttributeDict
    :return: rendered groups separated by a single space, without leading or
        trailing whitespace
    :rtype: str
    """
    # (group name, prefix, separator) in the order of appearance in the output
    option_styles = (("short", "-", " "), ("long", "--", "="))
    rendered_groups = []
    for option_type, prefix, separator in option_styles:
        try:
            rendered_groups.append(
                cmd_option_formatter(
                    getattr(options, option_type),
                    prefix=prefix,
                    separator=separator,
                )
            )
        except AttributeError:
            continue
    # strip removes the blank left over if one of the groups rendered empty
    return " ".join(rendered_groups).strip()
T = TypeVar("T")
# Unique marker used as default argument to tell "no default given" apart from
# every value (including None) a caller may pass as default
sentinel = object()


def convert_to(
    value: Any, convert_to_type: Callable[[Any], T], default: Any = sentinel
) -> T:
    """
    Convert value by calling convert_to_type, optionally falling back to a
    default value

    :param value: value to be converted
    :param convert_to_type: callable performing the conversion, e.g. ``int``
    :param default: returned if the conversion raises a ValueError
    :return: the converted value, or default if the conversion failed and a
        default was given
    :raises ValueError: if the conversion failed and no default was given
    """
    try:
        return convert_to_type(value)
    except ValueError:
        if default is sentinel:
            # no fallback available: report the failure instead of handing
            # the internal marker object to the caller
            raise
        return default
def drone_environment_to_str(
    drone_environment: Dict,
    seperator: str,
    prefix: str,
    customize_key: Callable = lambda x: x,
    customize_value: Callable = lambda x: x,
) -> str:
    """
    Render a drone environment as a string of ``key=value`` assignments

    :param drone_environment: variable names mapped to their values
    :type drone_environment: Dict
    :param seperator: text put between two assignments
    :type seperator: str
    :param prefix: text put in front of every assignment
    :type prefix: str
    :param customize_key: callable applied to every key before rendering,
        keys are used unchanged by default
    :type customize_key: Callable
    :param customize_value: callable applied to every value before rendering,
        values are used unchanged by default
    :type customize_value: Callable
    :return: all assignments joined by seperator
    :rtype: str
    """
    assignments = []
    for key, value in drone_environment.items():
        assignment = f"{prefix}{customize_key(key)}={customize_value(value)}"
        assignments.append(assignment)
    return seperator.join(assignments)