Source code for tardis.utilities.asynccachemap

from ..exceptions.executorexceptions import CommandExecutionFailure
from collections.abc import Mapping
from datetime import datetime
from datetime import timedelta

import asyncio
import logging
import json

logger = logging.getLogger("cobald.runtime.tardis.utilities.asynccachemap")


[docs]class AsyncCacheMap(Mapping): def __init__(self, update_coroutine, max_age: int = 60 * 15): self._update_coroutine = update_coroutine self._max_age = max_age self._last_update = datetime.fromtimestamp(0) self._data = {} self._lock = None @property def _async_lock(self): # Create lock once tardis event loop is running. # To avoid got Future <Future pending> attached to a different loop exception if not self._lock: self._lock = asyncio.Lock() return self._lock @property def last_update(self) -> datetime: return self._last_update
[docs] async def update_status(self) -> None: current_time = datetime.now() async with self._async_lock: if (current_time - self._last_update) > timedelta(seconds=self._max_age): try: data = await self._update_coroutine() except json.decoder.JSONDecodeError as je: logger.warning( f"AsyncMap update_status failed: Could not decode json {je}" ) except CommandExecutionFailure as cf: logger.warning(f"AsyncMap update_status failed: {cf}") else: self._data = data self._last_update = current_time
def __iter__(self): return iter(self._data) def __getitem__(self, item): return self._data[item] def __len__(self): return len(self._data) def __eq__(self, other): if not isinstance(other, AsyncCacheMap): return False return ( self._update_coroutine == other._update_coroutine and self._max_age == other._max_age and self._last_update == other._last_update and self._data == other._data and self._lock == other._lock )