Source code for chemfit.executor_utils

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generic,
    ParamSpec,
    TypeVar,
    cast,
)

from chemfit.abstract_objective_function import EvaluateContext, ExecutorLike

if TYPE_CHECKING:
    from collections.abc import Iterable

T_co = TypeVar("T_co", covariant=True)
P = ParamSpec("P")


[docs] class AttachContextAsReturnValue(Generic[T_co]): """ Wrap a callable so it also returns serialized context state. This helper is primarily used when executing functions through an executor that may run in a separate process. In that case, mutations to an ``EvaluateContext`` made inside the worker are not reflected in the caller's original context object. This wrapper makes those side effects explicit by returning the function result together with ``ctx.__getstate__()``. The wrapped callable is expected to receive an ``EvaluateContext`` as its final positional argument. """
[docs] def __init__(self, func: Callable[..., T_co]) -> None: """ Initialize the wrapper. Args: func: Callable whose final positional argument must be an ``EvaluateContext``. """ self.func = func
[docs] def __call__(self, *args: Any) -> tuple[T_co, dict[str, Any]]: """ Call the wrapped function and return its result with context state. Args: *args: Positional arguments forwarded to the wrapped callable. The final argument must be an ``EvaluateContext``. Returns: A tuple containing: - The return value of the wrapped callable. - The serialized state of the provided ``EvaluateContext``. Raises: AssertionError: If the final positional argument is not an ``EvaluateContext``. """ ctx = cast("EvaluateContext", args[-1]) assert isinstance(ctx, EvaluateContext) return (self.func(*args), ctx.to_result_state())
[docs] def map_with_context( executor: ExecutorLike, fn: Callable[..., T_co], *iterables: Iterable[Any], ctxs: Iterable[EvaluateContext], timeout: float | None = None, chunksize: int = 1, ) -> list[T_co]: """ Map a function over iterables and propagate context side effects. This helper behaves like ``executor.map(...)`` for callables whose final positional argument is an ``EvaluateContext``. Each worker returns both the function result and the serialized state of its context. After execution, the input context objects are updated in place via ``EvaluateContext.__setstate__()`` so that caller-visible context state reflects mutations performed inside the executor. This is especially useful for executors that may run work in separate processes, where in-worker mutations to context objects would otherwise not be visible to the caller. Args: executor: Executor used to evaluate the mapped calls. fn: Callable to evaluate. Its final positional argument must be an ``EvaluateContext``. *iterables: Iterables supplying the non-context positional arguments for each mapped call. ctxs: Iterable of evaluation contexts, one per mapped call. timeout: Maximum number of seconds to wait for results. If ``None``, wait indefinitely. chunksize: Approximate chunk size passed through to ``executor.map(...)``. Returns: A list of function return values in executor map order. Side Effects: Updates each context in ``ctxs`` in place using the serialized state returned from the corresponding worker execution. """ fn_with_ctx_ret = AttachContextAsReturnValue(func=fn) return_vals_with_ctx = executor.map( fn_with_ctx_ret, *iterables, ctxs, timeout=timeout, chunksize=chunksize ) return_vals = [] for ctx_in, (r, ctx_r) in zip(ctxs, return_vals_with_ctx): ctx_in.apply_result_state(ctx_r) return_vals.append(r) return return_vals