Source code for chemfit.async_helpers

import asyncio
from collections.abc import Iterable
from typing import Any, Callable

from chemfit.abstract_objective_function import EvaluateContext


[docs] async def async_eval_one( obj: Callable[[dict[str, Any], EvaluateContext], float], params: dict[str, Any], ctx: EvaluateContext, ): """ Evaluate one objective call asynchronously. This helper runs a single evaluation of an objective function in a background thread using ``asyncio.to_thread``. It is intended for concurrent or parallel evaluation of stateless objectives that accept a `(params, ctx)` signature. Args: obj (Callable[[dict[str, Any], EvaluateContext], float]): Objective-like callable. Typically an ``ObjectiveFunctor`` or a wrapper around one. Must be synchronous and thread-safe when provided distinct ``EvaluateContext`` instances. params (dict[str, Any]): Parameter dictionary for this evaluation. ctx (EvaluateContext): Context that will be populated during evaluation. Returns: float: The scalar loss computed by ``obj``. Notes: - This function does **not** create the context; callers must supply one context per evaluation. - ``async_eval_one`` does not modify ``obj``. If ``obj`` stores per-evaluation state internally, it is **not** safe to use with this function. """ return await asyncio.to_thread(obj, params, ctx)
[docs] async def async_eval_many( obj: Callable[[dict[str, Any], EvaluateContext], float], params_list: Iterable[dict[str, Any]], ctxs: Iterable[EvaluateContext], ): """ Evaluate multiple objective calls concurrently. This helper schedules a batch of evaluations of the same objective using ``asyncio.gather``. Each evaluation receives its own distinct ``EvaluateContext``. All evaluations run concurrently via ``asyncio.to_thread`` and therefore do not block the event loop. Args: obj (Callable[[dict[str, Any], EvaluateContext], float]): Objective-like callable. Must be compatible with concurrent calls when provided separate contexts. params_list (Iterable[dict[str, Any]]): Iterable of parameter dictionaries. One evaluation is performed per entry. ctxs (Iterable[EvaluateContext]): Iterable of contexts. Must have the same length as ``params_list``. Each context is populated independently. Returns: list[float]: A list of scalar loss values in the same order as ``params_list``. Raises: ValueError: If ``params_list`` and ``ctxs`` have mismatched lengths when zipped with ``strict=True``. Notes: - The caller is responsible for allocating exactly one ``EvaluateContext`` per evaluation. - This function does not modify shared state on ``obj``; if the objective stores per-evaluation data in ``self`` instead of in ``ctx``, it cannot be used safely. """ tasks = [ async_eval_one(obj, p, ctx) for p, ctx in zip(params_list, ctxs, strict=True) ] return await asyncio.gather(*tasks)