# Source code for tardis.utilities.asyncbulkcall

from functools import cached_property
from typing import TypeVar, Generic, Iterable, List, Tuple, Optional, Set
from typing_extensions import Protocol
import asyncio
import time
import sys


# Type variables parameterizing a bulk command:
# ``T`` is the per-call task type, ``R`` the per-task result type.
T = TypeVar("T")
R = TypeVar("R")


class BulkCommand(Protocol[T, R]):
    """
    Protocol of callables suitable for :py:class:`~.BulkExecution`

    A bulk command receives an arbitrary number of tasks and is expected to
    provide an iterable with exactly one result per task. As a shortcut, it
    may provide a single :py:data:`None` to indicate that there are no
    results at all. Raising an unhandled :py:class:`Exception` signals that
    every task failed with that :py:class:`Exception`.
    """

    async def __call__(self, *__tasks: T) -> Optional[Iterable[R]]:
        ...  # noqa E704
class AsyncBulkCall(Generic[T, R]):
    """
    Framework for queueing and executing several tasks via bulk commands

    :param command: async callable that executes several tasks
    :param size: maximum number of tasks to execute in one bulk
    :param delay: maximum time window for tasks to execute in one bulk
    :param concurrent: how often the `command` may be executed at the same time

    Given some bulk-task callable ``(T, ...) -> (R, ...)`` (the ``command``),
    :py:class:`~.BulkExecution` represents a single-task callable ``(T) -> R``.
    Single-task calls are buffered for a moment according to ``size`` and
    ``delay``, then executed in bulk with ``concurrent`` calls to ``command``.
    Each :py:class:`~.BulkExecution` should represent a different ``command``
    (for example, ``rm`` or ``mkdir``) collecting similar tasks (for example,
    ``rm foo`` and ``rm bar`` to ``rm foo bar``).

    Both ``size`` and ``delay`` control how long to queue tasks at most before
    starting to execute them. The ``concurrent`` parameter controls how many
    bulks may run at once; when concurrency is low tasks may be waiting for
    execution even past ``size`` and ``delay``. Possible values for
    ``concurrent`` are :py:data:`None` for unlimited concurrency or an integer
    above 0 to set a precise concurrency limit.

    .. note::

        If the ``command`` requires additional arguments, wrap it via
        :py:func:`~functools.partial`, for example
        ``AsyncBulkCall(partial(async_rm, force=True), ...)``.
    """

    def __init__(
        self,
        command: BulkCommand[T, R],
        size: int,
        delay: float,
        concurrent: Optional[int] = None,
    ):
        self._command = command
        self._size = size
        self._delay = delay
        # ``None`` (unlimited) is modelled as a practically-unreachable bound
        self._concurrency = sys.maxsize if concurrent is None else concurrent
        # task handling dispatch from queue to command execution
        self._dispatch_task: Optional[asyncio.Task] = None
        # tasks handling individual command executions; strong refs, see bpo#44665
        self._bulk_tasks: Set[asyncio.Task] = set()
        self._verify_settings()

    @cached_property
    def _concurrent(self) -> "asyncio.BoundedSemaphore":
        """synchronized counter for active commands"""
        # created lazily so the instance may be built outside a running loop
        return asyncio.BoundedSemaphore(value=self._concurrency)

    @cached_property
    def _queue(self) -> "asyncio.Queue[Tuple[T, asyncio.Future[R]]]":
        """queue of outstanding tasks"""
        return asyncio.Queue()

    def _verify_settings(self):
        """Validate ``size``, ``delay`` and ``concurrent``; raise :py:exc:`ValueError` otherwise"""
        if not isinstance(self._size, int) or self._size <= 0:
            raise ValueError(f"expected 'size' > 0, got {self._size!r} instead")
        if self._delay <= 0:
            raise ValueError(f"expected 'delay' > 0, got {self._delay!r} instead")
        if not isinstance(self._concurrency, int) or self._concurrency <= 0:
            raise ValueError(
                "'concurrent' must be None or an integer above 0"
                f", got {self._concurrency!r} instead"
            )

    async def __call__(self, __task: T) -> R:
        """Queue a ``task`` for bulk execution and return the result when available"""
        # NOTE: `get_running_loop` replaces the deprecated `get_event_loop`;
        # inside a coroutine both resolve to the currently running loop.
        result: "asyncio.Future[R]" = asyncio.get_running_loop().create_future()
        # queue item first so that the dispatch task does not finish before
        self._queue.put_nowait((__task, result))
        # ensure there is a worker to dispatch items for command execution
        if self._dispatch_task is None:
            self._dispatch_task = asyncio.ensure_future(self._bulk_dispatch())
        return await result

    async def _bulk_dispatch(self):
        """Collect tasks into bulks and dispatch them for command execution"""
        while not self._queue.empty():
            bulk = list(zip(*(await self._get_bulk())))  # noqa B905
            if not bulk:
                continue
            tasks, futures = bulk
            # limit concurrent bulk execution
            # We must make sure *here* that a new bulk can be launched, but
            # we must release the claim *in the task* when it is done.
            await self._concurrent.acquire()
            task = asyncio.ensure_future(self._bulk_execute(tuple(tasks), futures))
            # BUGFIX: the callback must *call* ``release()``; merely referencing
            # the bound method never freed the semaphore slot, permanently
            # stalling dispatch once ``concurrent`` bulks had been launched.
            task.add_done_callback(lambda _: self._concurrent.release())
            # track tasks via strong references to avoid them being garbage collected.
            # see bpo#44665
            self._bulk_tasks.add(task)
            task.add_done_callback(lambda _, task=task: self._bulk_tasks.discard(task))
            # yield to the event loop so that this dispatch loop does not
            # arbitrarily delay other tasks on the fast paths for `_get_bulk`
            # and `acquire`.
            await asyncio.sleep(0)
        # allow `__call__` to start a fresh dispatcher for the next burst
        self._dispatch_task = None

    async def _get_bulk(self) -> "List[Tuple[T, asyncio.Future[R]]]":
        """Fetch the next bulk from the internal queue"""
        max_items, queue = self._size, self._queue
        # always pull in at least one item asynchronously
        # this avoids stalling for very low delays and efficiently waits for items
        results = [await queue.get()]
        queue.task_done()
        deadline = time.monotonic() + self._delay
        while len(results) < max_items and time.monotonic() < deadline:
            try:
                if queue.empty():
                    item = await asyncio.wait_for(
                        queue.get(), deadline - time.monotonic()
                    )
                else:
                    item = queue.get_nowait()
            except asyncio.TimeoutError:
                break
            else:
                results.append(item)
                queue.task_done()
        return results

    async def _bulk_execute(
        self, tasks: Tuple[T, ...], futures: "List[asyncio.Future[R]]"
    ) -> None:
        """Execute several ``tasks`` in bulk and set their ``futures``' result"""
        try:
            results = await self._command(*tasks)
            # make sure we can cleanly match input to output
            results = [None] * len(futures) if results is None else list(results)
            if len(results) != len(futures):
                raise RuntimeError(
                    f"bulk command {self._command} provided {len(results)} results"
                    f", expected {len(futures)} results or 'None'"
                )
        except Exception as task_exception:
            # the entire bulk failed: propagate the same error to every caller
            for future in futures:
                future.set_exception(task_exception)
        else:
            for future, result in zip(futures, results):  # noqa B905
                future.set_result(result)