Source code for chemfit.executor_wrapper_cob
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from chemfit.abstract_objective_function import (
EvaluateContext,
ExecutorLike,
ObjectiveFunctor,
)
from chemfit.combined_objective_function import CombinedObjectiveFunction
from chemfit.executor_utils import map_with_context
[docs]
class ExecutorWrapperCOB(ObjectiveFunctor):
    """
    Evaluate the terms of a ``CombinedObjectiveFunction`` through an executor.

    The wrapper dispatches one ``cob.evaluate_term`` call per term via
    ``map_with_context`` and reduces the results with the wrapped objective's
    own ``filter_terms`` and ``apply_reduction``.

    If neither the evaluation context nor the wrapper carries an executor, a
    single ``ThreadPoolExecutor`` is created on first use, reused for all
    subsequent calls, and shut down when the wrapper is used as a context
    manager and the ``with`` block exits.
    """

    def __init__(
        self, cob: CombinedObjectiveFunction, executor: ExecutorLike | None = None
    ):
        """
        Initialize a concurrent wrapper for a combined objective.

        Args:
            cob: Combined objective function whose terms will be evaluated
                through the executor.
            executor: Optional default executor used when ``ctx.executor``
                is not set. The wrapper never shuts this executor down; its
                lifetime stays with the caller.
        """
        self.cob = cob
        self.executor: ExecutorLike | None = executor
        # Fallback pool owned by this wrapper. Created only when no other
        # executor is available, so that it can be reused across calls and
        # released in ``__exit__`` instead of leaking one pool per call.
        self._fallback_executor: ThreadPoolExecutor | None = None

    def __enter__(self):
        """
        Enter the wrapper context.

        Returns:
            The wrapper instance itself.
        """
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object,
    ) -> None:
        """
        Leave the wrapper context and release the wrapper-owned fallback pool.

        Only the lazily created ``ThreadPoolExecutor`` is shut down. An
        executor passed to ``__init__`` or supplied through ``ctx.executor``
        is left untouched. Exceptions are never suppressed.
        """
        pool, self._fallback_executor = self._fallback_executor, None
        if pool is not None:
            pool.shutdown(wait=True)

    def _resolve_executor(self, ctx: EvaluateContext) -> ExecutorLike:
        """Pick the executor for one evaluation: ctx, then self, then fallback."""
        configured = ctx.executor or self.executor
        if configured:
            return configured

        # Create the fallback at most once; a later call (even after
        # ``__exit__``) transparently gets a fresh pool.
        if self._fallback_executor is None:
            self._fallback_executor = ThreadPoolExecutor()
        return self._fallback_executor

    def __call__(
        self, parameters: dict[str, Any], ctx: EvaluateContext | None = None
    ) -> float:
        """
        Evaluate the wrapped combined objective using an executor.

        One child context is requested per objective term, the terms are
        evaluated through the selected executor, the results are passed
        through ``cob.filter_terms`` and finally reduced with
        ``cob.apply_reduction``.

        Executor selection follows this order:

        1. ``ctx.executor``, if set
        2. ``self.executor``, if set
        3. a ``ThreadPoolExecutor`` created lazily and cached on the wrapper

        Args:
            parameters: Parameter dictionary for the evaluation.
            ctx: Optional parent evaluation context. If ``None``, a new
                ``EvaluateContext`` is created.

        Returns:
            The reduced scalar loss computed from the evaluated terms.

        Side Effects:
            - Stores ``parameters`` in ``ctx.parameters``.
            - Records the number of terms in ``ctx.meta["n_terms"]``.
            - Spawns one child context per objective term through
              ``ctx.child_contexts`` (any bookkeeping of child results is
              done by that context manager, which is defined elsewhere).
            - Stores the final reduced loss in ``ctx.loss``.
        """
        if ctx is None:
            ctx = EvaluateContext()

        executor = self._resolve_executor(ctx)

        # The term count is used several times below; query it once.
        n_terms = self.cob.n_terms()

        ctx.parameters = parameters
        ctx.meta.update({"n_terms": n_terms})

        with ctx.child_contexts(
            n_terms, configurator=self.cob.child_context_configurator
        ) as child_contexts:
            # Every term receives the same parameter dictionary together
            # with its own index and its own child context.
            terms = map_with_context(
                executor,
                self.cob.evaluate_term,
                [parameters] * n_terms,
                range(n_terms),
                ctxs=child_contexts,
            )

        filtered_terms = self.cob.filter_terms(terms, ctx)
        ctx.loss = self.cob.apply_reduction(filtered_terms, ctx)

        return ctx.loss