chemfit package

Submodules

chemfit.abstract_objective_function module

class ChildContextConfigurator(*args, **kwargs)[source]

Bases: Protocol

Protocol for configuring child evaluation contexts.

The configurator is called once for each child context immediately after the parent context has spawned them. It may mutate the child context or the parent context in place to configure child-specific evaluation behavior, metadata, or resource usage.

The idx_child_ctx argument is the absolute index of the current child within the spawned batch.
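As an illustration of the call signature above, a configurator could derive a per-child setting from the child's index. This is only a sketch: the contexts are SimpleNamespace stand-ins rather than real EvaluateContext objects, and the config fields base_seed, seed, and is_last are hypothetical names chosen for the example.

```python
from types import SimpleNamespace


def seed_configurator(idx_child_ctx, child_ctx, num_children, parent_ctx):
    """Give every child a distinct seed derived from the parent's base seed."""
    # base_seed, seed and is_last are hypothetical config fields.
    base_seed = getattr(parent_ctx.config, "base_seed", 0)
    child_ctx.config.seed = base_seed + idx_child_ctx
    child_ctx.config.is_last = idx_child_ctx == num_children - 1


# Stand-in contexts: only the `config` attribute is modelled here.
parent = SimpleNamespace(config=SimpleNamespace(base_seed=100))
children = [SimpleNamespace(config=SimpleNamespace()) for _ in range(3)]
for i, child in enumerate(children):
    seed_configurator(i, child, len(children), parent)

seeds = [c.config.seed for c in children]
```

A callable of this shape could then be passed as the configurator argument of spawn_children() or child_contexts().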

__call__(idx_child_ctx: int, child_ctx: EvaluateContext, num_children: int, parent_ctx: EvaluateContext)[source]

Call self as a function.

__init__(*args, **kwargs)
class EvaluateContext(config: SimpleNamespace | None = None, shared: SimpleNamespace | None = None, executor: ExecutorLike | None = None)[source]

Bases: object

__init__(config: SimpleNamespace | None = None, shared: SimpleNamespace | None = None, executor: ExecutorLike | None = None)[source]

Container for per-evaluation state.

A new instance of EvaluateContext should generally be created for each evaluation of an objective function or quantity computation. Implementations write all per-call information into the context rather than storing it in the objective instance. This makes evaluation easier to reason about and compatible with concurrent execution.

The context may also own a single batch of child contexts representing nested sub-evaluations. Such child contexts can be created explicitly with spawn_children() or managed with the child_contexts() context manager.

Parameters:
  • config – Optional child-local evaluation configuration. This namespace is copied to spawned child contexts so that children inherit the parent's defaults but can be configured independently.

  • shared – Optional namespace for shared read-only state that may be reused across related contexts, such as parent/child evaluations.

  • executor – Optional executor-like object that can be used by evaluation code to schedule parallel work.

quantities

Intermediate quantities computed during evaluation. Implementations may leave this as None if no quantities are produced.

Type:

dict[str, Any] | None

parameters

Parameter dictionary used for this evaluation.

Type:

dict[str, Any] | None

loss

Final scalar loss value. Set by ObjectiveFunctor implementations.

Type:

float | None

meta

Free-form metadata dictionary. Implementations may add diagnostic or structural information here as needed. Metadata from child contexts may be collected into the parent.

Type:

dict[str, Any]

temp

Scratch space for temporary values during evaluation. Nothing stored here is part of the public API, and it is omitted from the dictionary returned by to_meta_data().

Type:

SimpleNamespace

config

Child-local evaluation configuration for this context.

shared

Shared state or resources reused across related contexts.

apply_result_state(state: dict[str, Any])[source]

Apply result-bearing state from another context.

Parameters:

state – State previously produced by to_result_state().

Side Effects:

Updates parameters, loss, quantities, and meta on this context.

child_contexts(n_children: int, configurator: ChildContextConfigurator | None = None, recursive: bool = True)[source]

Create a scoped child-context batch and collect its metadata on exit.

This context manager is a convenience wrapper around spawn_children() and collect_child_meta_data(). It is intended for nested evaluations where the component spawning child contexts is also responsible for collecting their metadata before returning.

Parameters:
  • n_children – Number of child contexts to create.

  • configurator – Optional configurator applied to each spawned child context.

  • recursive – Passed to collect_child_meta_data() when the scope exits.

Yields:

The list of spawned child contexts.

Notes

Child metadata is collected automatically when the context manager exits, even if an exception is raised inside the managed block.
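The guarantee in the note above (collection happens even when the managed block raises) is the usual try/finally pattern for context managers. The following is a minimal sketch of that pattern, not the library's implementation: MiniContext is a hypothetical stand-in, and the layout it stores under meta["children"] is an assumption.

```python
from contextlib import contextmanager


class MiniContext:
    """Hypothetical stand-in with just enough state to show the scoping."""

    def __init__(self):
        self.meta = {}
        self.children = []

    def spawn_children(self, n_children):
        self.children = [MiniContext() for _ in range(n_children)]
        return self.children

    def collect_child_meta_data(self):
        # Assumed layout: one metadata dict per child, in spawn order.
        self.meta["children"] = [dict(child.meta) for child in self.children]

    @contextmanager
    def child_contexts(self, n_children):
        children = self.spawn_children(n_children)
        try:
            yield children
        finally:
            # Runs on normal exit and when the managed block raises.
            self.collect_child_meta_data()


ctx = MiniContext()
try:
    with ctx.child_contexts(2) as kids:
        kids[0].meta["status"] = "done"
        raise RuntimeError("second term failed")
except RuntimeError:
    pass  # the metadata written before the failure is still collected
```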

collect_child_meta_data(recursive: bool = True)[source]

Collect metadata from child contexts.

The collected child metadata is stored in self.meta["children"].

Components that spawn child contexts are generally expected to collect their child metadata before returning to their caller. The child_contexts() context manager provides a convenient scoped way to do this automatically.

Parameters:

recursive – If True, metadata from all descendants is collected before serializing the immediate children. This produces a fully materialized metadata tree. If False, only the immediate children are serialized. This can be useful when nested components manage their own metadata collection and have already populated their meta fields.

Notes

In most cases recursive=True is the safest choice, since it ensures that nested child contexts are fully represented in the resulting metadata structure.
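The difference between the two modes can be sketched on a three-level tree. Node is a hypothetical stand-in for a context, and serializing a child as a shallow copy of its meta dictionary is an assumption made for the example.

```python
class Node:
    """Hypothetical stand-in for a context with meta and children."""

    def __init__(self, name):
        self.meta = {"name": name}
        self.children = []

    def collect_child_meta_data(self, recursive=True):
        if recursive:
            # Materialize the descendants first, so that each child's meta
            # already contains its own "children" entry when it is copied.
            for child in self.children:
                child.collect_child_meta_data(recursive=True)
        self.meta["children"] = [dict(child.meta) for child in self.children]


root, mid, leaf = Node("root"), Node("mid"), Node("leaf")
root.children = [mid]
mid.children = [leaf]

root.collect_child_meta_data(recursive=False)
shallow = root.meta["children"]   # "mid" appears without its own children

root.collect_child_meta_data(recursive=True)
deep = root.meta["children"]      # "mid" now carries "leaf" as well
```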

spawn_children(n_children: int, configurator: ChildContextConfigurator | None = None) list[EvaluateContext][source]

Create child contexts linked to this context.

Each child receives a deep copy of config, while sharing the same shared namespace and executor reference as the parent.

An EvaluateContext is intended to manage at most one batch of child contexts per evaluation. Calling spawn_children() again on the same context replaces the previous child batch.

In many cases, child_contexts() is the preferred interface, since it automatically collects child metadata when the nested evaluation scope exits.

Parameters:
  • n_children – Number of child contexts to create.

  • configurator – Optional configurator applied once to each spawned child context immediately after creation.

Returns:

The newly created child contexts.
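The copy semantics described for spawn_children() (deep-copied config, shared namespace and executor passed by reference, one batch per context) can be sketched as follows. MiniContext is a hypothetical stand-in that models only these attributes; it is not the library class.

```python
import copy
from types import SimpleNamespace


class MiniContext:
    """Hypothetical stand-in that models only config, shared, and children."""

    def __init__(self, config=None, shared=None, executor=None):
        self.config = config if config is not None else SimpleNamespace()
        self.shared = shared if shared is not None else SimpleNamespace()
        self.executor = executor
        self.children = []

    def spawn_children(self, n_children, configurator=None):
        # A second call replaces the previous batch instead of extending it.
        self.children = [
            MiniContext(copy.deepcopy(self.config), self.shared, self.executor)
            for _ in range(n_children)
        ]
        if configurator is not None:
            for idx, child in enumerate(self.children):
                configurator(idx, child, n_children, self)
        return self.children


parent = MiniContext(config=SimpleNamespace(options={"tol": 1e-3}))
kids = parent.spawn_children(2)
kids[0].config.options["tol"] = 1e-6  # only affects the first child
```

Because the config is deep-copied, even nested mutable values such as the options dictionary are independent per child, while anything placed in shared is visible to all related contexts.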

to_meta_data() dict[str, Any][source]

Return a dictionary summarizing the evaluation state.

Returns:

A dictionary containing the fields quantities, parameters, loss, and meta.

Return type:

dict[str, Any]

to_result_state() dict[str, Any][source]

Return the result-bearing state of this context.

Returns:

Dictionary containing the evaluation results recorded in this context. This state is intended for child/worker-to-parent synchronization and does not include shared resources or child context objects.

class ExecutorLike(*args, **kwargs)[source]

Bases: Protocol

Minimal executor protocol used for parallel evaluation.

This interface is modeled after concurrent.futures.Executor.

__init__(*args, **kwargs)
map(fn: Callable[..., T], *iterables: Iterable[Any], timeout: float | None = None, chunksize: int = 1) Iterable[T][source]
submit(fn: Callable[[...], T], /, *args, **kwargs) FutureLike[T][source]
class FutureLike(*args, **kwargs)[source]

Bases: Protocol, Generic[T]

Minimal protocol for future-like objects used by the evaluation framework.

This protocol intentionally mirrors a subset of the interface provided by concurrent.futures.Future.

__init__(*args, **kwargs)
cancel() bool[source]
result(timeout: float | None = None) T[source]
class ObjectiveFunctor[source]

Bases: object

__call__(parameters: dict[str, Any], ctx: EvaluateContext | None = None) float[source]

Evaluate the objective function.

Implementations should compute a scalar loss from the given parameter dictionary. All per-evaluation state must be written into the provided ctx. If no context is supplied, a new one should be created internally.

Parameters:
  • parameters (dict[str, Any]) – Mapping of parameter names to float values.

  • ctx (EvaluateContext | None) – Optional evaluation context. If None, a new EvaluateContext should be created.

Returns:

The computed scalar loss.

Return type:

float

Notes

  • Implementations should avoid mutating self during the call. All per-evaluation information should be placed in ctx instead.

  • This method is synchronous. For concurrent or asynchronous evaluation, use one EvaluateContext per call and invoke this method in multiple threads/tasks.
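The notes above amount to a simple discipline: keep self read-only during a call and write results into the context. A sketch of an objective that follows it, evaluated from several threads, is shown below. QuadraticObjective is a hypothetical example class, and SimpleNamespace stands in for EvaluateContext.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace


class QuadraticObjective:
    """Hypothetical objective: loss = (x - target) ** 2."""

    def __init__(self, target):
        self.target = target  # static configuration, never changed by calls

    def __call__(self, parameters, ctx=None):
        if ctx is None:
            ctx = SimpleNamespace()  # stand-in for a fresh EvaluateContext
        # All per-call information goes into ctx, not into self.
        ctx.parameters = dict(parameters)
        ctx.loss = (parameters["x"] - self.target) ** 2
        return ctx.loss


objective = QuadraticObjective(target=2.0)
params = [{"x": 0.0}, {"x": 2.0}, {"x": 5.0}]
ctxs = [SimpleNamespace() for _ in params]  # one context per call

with ThreadPoolExecutor(max_workers=3) as pool:
    losses = list(pool.map(objective, params, ctxs))
```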

class QuantityComputer[source]

Bases: object

__call__(parameters: dict[str, Any], ctx: EvaluateContext | None = None) dict[str, Any][source]

Compute quantities for the given parameters.

Parameters:
  • parameters (dict[str, Any]) – Parameter dictionary.

  • ctx (EvaluateContext | None) – Optional context. If None, a new one is created.

Returns:

The computed quantity dictionary.

Return type:

dict[str, Any]

Notes

Implementations of _compute must not mutate self. All per-evaluation information should be written into ctx.

Side Effects:

Stores parameters in ctx.parameters. Merges self.static_meta_data into ctx.meta. Stores the computed quantities in ctx.quantities.

__init__()[source]

Initialize a quantity computer.

A QuantityComputer maps a parameter dictionary to a dictionary of intermediate quantities, typically used by an objective function. Instances may hold static configuration, but should not store per-evaluation state internally.

static_meta_data

Static metadata associated with this quantity computer. This is merged into ctx.meta on each call.

Type:

dict[str, Any]

with_loss(loss_function: Callable[[...], float], /, **kwargs: Any) QuantityComputerObjectiveFunction[source]

Create a new QuantityComputerObjectiveFunction from this QuantityComputer.

Parameters:

loss_function (LossFunction) – The loss function to use.

Returns:

A new QuantityComputerObjectiveFunction that wraps this QuantityComputer.

Return type:

QuantityComputerObjectiveFunction

class QuantityComputerObjectiveFunction(loss_function: Callable[[dict[str, Any]], float] | Callable[[dict[str, Any], dict[str, Any]], float], quantity_computer: QuantityComputer)[source]

Bases: ObjectiveFunctor

__call__(parameters: dict[str, Any], ctx: EvaluateContext | None = None) float[source]

Compute the objective loss.

This method:
  1. Computes intermediate quantities using the quantity computer.

  2. Applies the loss function.

  3. Stores results in the evaluation context.

Parameters:
  • parameters (dict[str, Any]) – Parameter dictionary.

  • ctx (EvaluateContext | None) – Optional context. If None, a new one is created.

Returns:

The computed scalar loss.

Return type:

float

Side Effects:

Stores the computed loss in ctx.loss. Updates ctx.meta with self.static_meta_data after the wrapped QuantityComputer may have already added metadata. Populates ctx.quantities and ctx.parameters via the wrapped QuantityComputer.

Notes

loss_function may accept either (quantities) or (quantities, parameters) as positional args.
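One way to support both call shapes is to inspect the loss function's signature and count its positional parameters. The sketch below shows that idea only; how the library actually distinguishes the two forms is not stated here, and call_loss, energy_error, and regularized_error are hypothetical names.

```python
import inspect


def call_loss(loss_function, quantities, parameters):
    """Call loss_function with one or two positional arguments."""
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    n_positional = sum(
        1
        for p in inspect.signature(loss_function).parameters.values()
        if p.kind in positional_kinds
    )
    if n_positional >= 2:
        return loss_function(quantities, parameters)
    return loss_function(quantities)


def energy_error(quantities):
    return abs(quantities["energy"] - (-1.0))


def regularized_error(quantities, parameters):
    return abs(quantities["energy"] - (-1.0)) + 0.1 * parameters["sigma"] ** 2


quantities = {"energy": -0.5}
parameters = {"sigma": 2.0}
plain = call_loss(energy_error, quantities, parameters)
regularized = call_loss(regularized_error, quantities, parameters)
```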

__init__(loss_function: Callable[[dict[str, Any]], float] | Callable[[dict[str, Any], dict[str, Any]], float], quantity_computer: QuantityComputer) None[source]

Objective function composed of a QuantityComputer and a loss function.

This objective first computes intermediate quantities using quantity_computer and then applies loss_function to obtain a scalar loss.

Parameters:
  • loss_function (Callable) –

    A function with signature:

    loss_function(quantities) -> float or loss_function(quantities, parameters) -> float

  • quantity_computer (QuantityComputer) – Object responsible for computing intermediate quantities.

static_meta_data

Static metadata associated with this objective. Merged into ctx.meta on each call.

Type:

dict[str, Any]

chemfit.ase_objective_function module

class AtomsFactory(*args, **kwargs)[source]

Bases: Protocol

Protocol for a function that creates an ASE Atoms object.

__call__() Atoms[source]

Create an atoms object.

__init__(*args, **kwargs)
class AtomsPostProcessor(*args, **kwargs)[source]

Bases: Protocol

Protocol for a callable that post-processes an ASE Atoms object.

__call__(atoms: Atoms) None[source]

Modify the atoms in-place.

__init__(*args, **kwargs)
class CalculatorFactory(*args, **kwargs)[source]

Bases: Protocol

Protocol for a callable that attaches an ASE calculator to atoms.

Implementations are expected to construct or configure a calculator for the given Atoms object and assign it to atoms.calc.

__call__(atoms: Atoms) None[source]

Construct a calculator and overwrite atoms.calc.

__init__(*args, **kwargs)
class DefaultQuantityProcessor(filter_keys: list[str] | None = None)[source]

Bases: object

__call__(calc: Calculator, atoms: Atoms) dict[str, Any][source]

Call self as a function.

__init__(filter_keys: list[str] | None = None) None[source]

Initialize a default quantity processor that returns all results of the calculator.

The returned quantity dictionary contains all entries from calc.results plus "n_atoms". Any keys listed in filter_keys are excluded from the returned dictionary.

Parameters:

filter_keys – Optional list of keys to exclude from the returned quantity dictionary.
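The described behavior (all calculator results plus "n_atoms", minus the filtered keys) can be sketched with plain dictionaries. default_quantities is a hypothetical helper; the real processor takes a calculator and an Atoms object instead of a results dictionary and an atom count.

```python
def default_quantities(results, n_atoms, filter_keys=None):
    """Return results plus "n_atoms", minus any key in filter_keys."""
    excluded = set(filter_keys or [])
    quantities = {**results, "n_atoms": n_atoms}
    return {k: v for k, v in quantities.items() if k not in excluded}


# Stand-in for calc.results after an evaluation.
results = {"energy": -10.2, "forces": [[0.0, 0.0, 0.1]], "stress": None}
quantities = default_quantities(results, n_atoms=1, filter_keys=["stress"])
```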

class MinimizationASEComputer(dt: float = 0.01, fmax: float = 1e-05, max_steps: int = 2000, **kwargs)[source]

Bases: SinglePointASEComputer

ASE-based quantity computer using a locally optimized structure.

This computer evaluates quantities after performing a local geometry optimization using the ASE BFGS optimizer. Quantities are extracted from the relaxed structure using the configured quantity processors.

__init__(dt: float = 0.01, fmax: float = 1e-05, max_steps: int = 2000, **kwargs) None[source]

Initialize a MinimizationASEComputer.

All additional keyword arguments are forwarded to SinglePointASEComputer.__init__.

Parameters:
  • dt – Relaxation step-size parameter retained for compatibility with earlier implementations. Currently unused.

  • fmax – Force convergence criterion passed to the optimizer.

  • max_steps – Maximum number of optimization steps.

  • **kwargs – Additional keyword arguments forwarded to the parent initializer.

class ParameterApplier(*args, **kwargs)[source]

Bases: Protocol

Protocol for a callable that applies parameters to an ASE calculator.

__call__(atoms: Atoms, params: dict[str, Any]) None[source]

Applies a parameter dictionary to atoms.calc in-place.

__init__(*args, **kwargs)
class PathAtomsFactory(path: Path, index: int | None = None)[source]

Bases: AtomsFactory

Atoms factory that reads a single structure from a filesystem path.

__call__() Atoms[source]

Create an atoms object.

__init__(path: Path, index: int | None = None) None[source]

Initialize the factory.

Parameters:
  • path – Path to a structure file readable by ASE.

  • index – Optional ASE index selecting which image to read. The selection must resolve to a single Atoms object.

class QuantityProcessor(*args, **kwargs)[source]

Bases: Protocol

Protocol for a callable that extracts quantities from an ASE evaluation.

A quantity processor is called after the calculator has evaluated an Atoms object. It receives the calculator and atoms pair and returns a dictionary of quantities to include in the final output.

__call__(calc: Calculator, atoms: Atoms) dict[str, Any][source]

Extract quantities from an evaluated calculator and atoms pair.

Parameters:
  • calc – Calculator that has already evaluated atoms.

  • atoms – Evaluated atoms object.

Returns:

A dictionary of extracted quantities.

__init__(*args, **kwargs)
class SinglePointASEComputer(calc_factory: CalculatorFactory, param_applier: ParameterApplier, atoms_factory: AtomsFactory, atoms_post_processor: AtomsPostProcessor | None = None, quantity_processors: list[QuantityProcessor] | None = None, tag: str | None = None)[source]

Bases: QuantityComputer

ASE-based quantity computer for single-point evaluations.

This class evaluates quantities for a parameterized ASE calculation using an atoms factory, optional atoms post-processing, a calculator factory, a parameter applier, and one or more quantity processors.

__init__(calc_factory: CalculatorFactory, param_applier: ParameterApplier, atoms_factory: AtomsFactory, atoms_post_processor: AtomsPostProcessor | None = None, quantity_processors: list[QuantityProcessor] | None = None, tag: str | None = None) None[source]

Initialize the computer.

Parameters:
  • calc_factory – Callable that attaches a calculator to an Atoms object.

  • param_applier – Callable that applies a parameter dictionary to the calculator attached to an Atoms object.

  • atoms_factory – Callable that creates the base Atoms object.

  • atoms_post_processor – Optional callable that modifies the base atoms object before it is cached and copied for evaluation.

  • quantity_processors – Optional list of callables that extract quantities from the evaluated calculator and atoms pair. If None, a DefaultQuantityProcessor is used.

  • tag – Optional label for this computer. If None, "tag_None" is used.

prepare_ctx(parameters: dict[str, Any], ctx: EvaluateContext)[source]

Prepare the evaluation context for a single-point calculation.

This method lazily creates and caches a base atoms object using atoms_factory. If provided, atoms_post_processor is applied once to that base object before it is cached. For each evaluation, the cached atoms object is copied into ctx.temp.atoms, a fresh calculator is attached, and the provided parameters are applied.

Parameters:
  • parameters – Parameter dictionary for the current evaluation.

  • ctx – Evaluation context to populate.
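The caching scheme described for prepare_ctx() can be sketched without ASE. In the example below, LazyAtomsPreparer is a hypothetical class, the atoms and calculator objects are SimpleNamespace stand-ins, and copy.deepcopy replaces whatever copy mechanism the library uses.

```python
import copy
from types import SimpleNamespace


class LazyAtomsPreparer:
    """Hypothetical sketch of the build-once, copy-per-evaluation scheme."""

    def __init__(self, atoms_factory, calc_factory, param_applier,
                 atoms_post_processor=None):
        self.atoms_factory = atoms_factory
        self.calc_factory = calc_factory
        self.param_applier = param_applier
        self.atoms_post_processor = atoms_post_processor
        self._base_atoms = None

    def prepare_ctx(self, parameters, ctx):
        if self._base_atoms is None:
            # Build and post-process the base object only once.
            atoms = self.atoms_factory()
            if self.atoms_post_processor is not None:
                self.atoms_post_processor(atoms)
            self._base_atoms = atoms
        # Every evaluation works on its own copy with a fresh calculator.
        ctx.temp.atoms = copy.deepcopy(self._base_atoms)
        self.calc_factory(ctx.temp.atoms)
        self.param_applier(ctx.temp.atoms, parameters)


calls = {"factory": 0}


def make_atoms():
    calls["factory"] += 1
    return SimpleNamespace(symbols=["H", "H"], calc=None, info={})


def mark_processed(atoms):
    atoms.info["processed"] = True


def attach_calc(atoms):
    atoms.calc = SimpleNamespace(parameters={})


def apply_params(atoms, params):
    atoms.calc.parameters.update(params)


preparer = LazyAtomsPreparer(make_atoms, attach_calc, apply_params, mark_processed)
ctx_a = SimpleNamespace(temp=SimpleNamespace())
ctx_b = SimpleNamespace(temp=SimpleNamespace())
preparer.prepare_ctx({"epsilon": 1.0}, ctx_a)
preparer.prepare_ctx({"epsilon": 2.0}, ctx_b)
```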

chemfit.async_helpers module

async async_eval_many(obj: Callable[[dict[str, Any], EvaluateContext], float], params_list: Iterable[dict[str, Any]], ctxs: Iterable[EvaluateContext])[source]

Evaluate multiple objective calls concurrently.

This helper schedules a batch of evaluations of the same objective using asyncio.gather. Each evaluation receives its own distinct EvaluateContext. All evaluations run concurrently via asyncio.to_thread and therefore do not block the event loop.

Parameters:
  • obj (Callable[[dict[str, Any], EvaluateContext], float]) – Objective-like callable. Must be compatible with concurrent calls when provided separate contexts.

  • params_list (Iterable[dict[str, Any]]) – Iterable of parameter dictionaries. One evaluation is performed per entry.

  • ctxs (Iterable[EvaluateContext]) – Iterable of contexts. Must have the same length as params_list. Each context is populated independently.

Returns:

A list of scalar loss values in the same order as params_list.

Return type:

list[float]

Raises:

ValueError – If params_list and ctxs have different lengths. The two iterables are zipped with strict=True.

Notes

  • The caller is responsible for allocating exactly one EvaluateContext per evaluation.

  • This function does not modify shared state on obj; if the objective stores per-evaluation data in self instead of in ctx, it cannot be used safely.

async async_eval_one(obj: Callable[[dict[str, Any], EvaluateContext], float], params: dict[str, Any], ctx: EvaluateContext)[source]

Evaluate one objective call asynchronously.

This helper runs a single evaluation of an objective function in a background thread using asyncio.to_thread. It is intended for concurrent or parallel evaluation of stateless objectives that accept a (params, ctx) signature.

Parameters:
  • obj (Callable[[dict[str, Any], EvaluateContext], float]) – Objective-like callable. Typically an ObjectiveFunctor or a wrapper around one. Must be synchronous and thread-safe when provided distinct EvaluateContext instances.

  • params (dict[str, Any]) – Parameter dictionary for this evaluation.

  • ctx (EvaluateContext) – Context that will be populated during evaluation.

Returns:

The scalar loss computed by obj.

Return type:

float

Notes

  • This function does not create the context; callers must supply one context per evaluation.

  • async_eval_one does not modify obj. If obj stores per-evaluation state internally, it is not safe to use with this function.

chemfit.combined_objective_function module

class Aggregator(*args, **kwargs)[source]

Bases: Protocol

__call__(terms: list[float], quantities: list[dict[str, Any]], ctx: EvaluateContext) float[source]

Call self as a function.

__init__(*args, **kwargs)
class CombinedObjectiveFunction(objective_functions: Sequence[Callable[[dict[str, Any]], float]], weights: Sequence[float] | None = None, child_context_configurator: ChildContextConfigurator | None = None, reduction: Reducer | Aggregator = sum_reducer, exception_handler: ExceptionHandler = raising_exception_handler)[source]

Bases: ObjectiveFunctor

classmethod add_flat(combined_objective_functions_list: Sequence[Self], weights: Sequence[float] | None = None) Self[source]

Flatten multiple combined objectives into a single instance.

The objective functions from all input instances are concatenated into one flat list. The weights of each input instance are scaled by the corresponding entry in weights before concatenation.

Warning

This method does not preserve execution policy from the input combined objectives. The resulting instance uses the default reduction, exception_handler, and child_context_configurator unless they are explicitly set afterward.

Parameters:
  • combined_objective_functions_list – Combined objective instances to flatten.

  • weights – Optional non-negative scaling weights, one per input combined objective. If None, all scaling weights default to 1.0.

Returns:

A new combined objective containing all flattened terms and scaled weights.

Raises:

AssertionError – If the number of scaling weights does not match the number of combined objectives, or if any scaling weight is negative.
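The weight arithmetic behind flattening can be sketched on plain lists. Each input group below is a (terms, weights) pair standing in for one combined objective; flatten_weighted is a hypothetical helper, not the classmethod itself.

```python
def flatten_weighted(groups, scales=None):
    """groups: list of (terms, weights) pairs; scales: one factor per group."""
    if scales is None:
        scales = [1.0] * len(groups)
    assert len(scales) == len(groups), "one scaling weight per group"
    assert all(s >= 0 for s in scales), "scaling weights must be non-negative"
    flat_terms, flat_weights = [], []
    for (terms, weights), scale in zip(groups, scales):
        flat_terms.extend(terms)
        # Each group's own weights are multiplied by the group's scale.
        flat_weights.extend(scale * w for w in weights)
    return flat_terms, flat_weights


group_a = (["energy_1", "energy_2"], [1.0, 2.0])
group_b = (["force_1"], [4.0])
terms, weights = flatten_weighted([group_a, group_b], scales=[0.5, 2.0])
```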

__call__(parameters: dict[str, Any], ctx: EvaluateContext | None = None) float[source]

Evaluate the combined objective.

Each objective term is evaluated in its own child context using the same parameter dictionary. The weighted term values are combined using self.reduction. After evaluation, child metadata is collected into the parent context and the reduced loss is stored in ctx.loss.

Parameters:
  • parameters – Parameter dictionary for the evaluation.

  • ctx – Optional parent evaluation context. If None, a new EvaluateContext is created.

Returns:

The reduced scalar loss computed from the evaluated terms.

Side Effects:
  • Populates ctx.parameters.

  • Spawns child contexts in ctx.

  • Collects child metadata into ctx.meta["children"].

  • Stores the final reduced loss in ctx.loss.

__init__(objective_functions: Sequence[Callable[[dict[str, Any]], float]], weights: Sequence[float] | None = None, child_context_configurator: ChildContextConfigurator | None = None, reduction: Reducer | Aggregator = sum_reducer, exception_handler: ExceptionHandler = raising_exception_handler) None[source]

Initialize a combined objective from multiple weighted terms.

Each objective term is evaluated independently in its own child context. The resulting term values are multiplied by their corresponding weights, optionally filtered through exception_handler if evaluation fails, and then combined using reduction.

Generic callables are automatically wrapped as ObjectiveFunctor instances.

Parameters:
  • objective_functions – Sequence of objective functors or compatible callables.

  • weights – Optional non-negative weight for each objective term. If None, all weights default to 1.0.

  • child_context_configurator – Optional callable used to configure each spawned child context before term evaluation.

  • reduction – Callable used to reduce the list of weighted term values to a single scalar loss. Can be either a simple reducer, or the more advanced Aggregator, which can make use of the full context and the quantities.

  • exception_handler – Callable used to handle exceptions raised during term evaluation. It may return a replacement value or None to skip the term entirely.

Raises:

AssertionError – If the number of weights does not match the number of objective functions, or if any weight is negative.
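The evaluation flow described above (weight each term, route failures through a handler, drop skipped terms, reduce) can be sketched in a few lines. evaluate_combined is a hypothetical function; child contexts are omitted, the handler receives None in place of a context, and using the handler's replacement value without applying the weight is an assumption.

```python
def evaluate_combined(objectives, weights, parameters,
                      reduction=sum, exception_handler=None):
    """Weighted evaluation with optional per-term exception handling."""
    terms = []
    for idx, (objective, weight) in enumerate(zip(objectives, weights)):
        try:
            value = weight * objective(parameters)
        except Exception as exc:
            if exception_handler is None:
                raise
            # Handler signature follows ExceptionHandler: (exception, ctx, idx).
            value = exception_handler(exc, None, idx)
        if value is not None:  # None means "skip this term"
            terms.append(value)
    return reduction(terms)


def well_behaved(parameters):
    return parameters["x"] ** 2


def failing(parameters):
    raise ZeroDivisionError("singular configuration")


def skip_on_error(exception, ctx, idx):
    return None


loss = evaluate_combined(
    [well_behaved, failing, well_behaved],
    [1.0, 1.0, 3.0],
    {"x": 2.0},
    exception_handler=skip_on_error,
)
```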

add(obj_funcs: Sequence[Callable[[dict[str, Any]], float]] | Callable[[dict[str, Any]], float], weights: Sequence[float] | float = 1.0) Self[source]

Add one or more objective terms to the combined objective.

Each added callable is converted to an ObjectiveFunctor if needed and appended to the existing term list. The corresponding weights are appended in the same order.

Parameters:
  • obj_funcs – A single objective callable or a sequence of objective callables to add.

  • weights – Either a single non-negative weight applied to every added callable, or a sequence of non-negative weights whose length matches the number of added callables.

Returns:

The current instance.

Raises:

AssertionError – If a sequence of weights is provided with a length that does not match the number of added callables, or if any provided weight is negative.

apply_reduction(terms: Sequence[float], ctx: EvaluateContext)[source]
evaluate_term(parameters: dict[str, Any], idx: int, ctx: EvaluateContext) float | None[source]

Evaluate a single weighted objective term.

The selected objective function is evaluated with the provided parameters and child context, then multiplied by its corresponding weight. If evaluation raises an exception, the configured exception_handler is called.

Parameters:
  • parameters – Parameter dictionary for the current evaluation.

  • idx – Absolute index of the objective term to evaluate.

  • ctx – Child evaluation context for this term.

Returns:

The weighted term value, or None if the exception handler chooses to skip the term.

evaluate_terms(parameters: dict[str, Any], ctx: EvaluateContext) list[float][source]

Evaluate the objective terms.

This method prepares child contexts, evaluates each selected term in its own context, and drops any terms for which evaluate_term() returns None.

Parameters:
  • parameters – Parameter dictionary for the current evaluation.

  • ctx – Parent evaluation context.

Returns:

List of weighted term values that were successfully evaluated and not skipped by the exception handler.

filter_terms(terms: list[float | None], ctx: EvaluateContext) list[float][source]

Filter out terms that are None, while recording the skipped terms in ctx.meta["skipped_indices"].

Side Effects:
  • Writes to ctx.meta["skipped_indices"].
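A minimal sketch of this filtering step is shown below, assuming that the recorded value is the list of positions whose term was None. The function is a free-standing stand-in for the method, and SimpleNamespace replaces EvaluateContext.

```python
from types import SimpleNamespace


def filter_terms(terms, ctx):
    """Drop None entries and remember where they were."""
    ctx.meta["skipped_indices"] = [i for i, t in enumerate(terms) if t is None]
    return [t for t in terms if t is not None]


ctx = SimpleNamespace(meta={})
kept = filter_terms([0.5, None, 2.0, None], ctx)
```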

n_terms() int[source]

Return the number of objective terms.

class ExceptionHandler(*args, **kwargs)[source]

Bases: Protocol

__call__(exception: Exception, ctx: EvaluateContext, idx: int) float | None[source]

Call self as a function.

__init__(*args, **kwargs)
class Reducer(*args, **kwargs)[source]

Bases: Protocol

__call__(terms: list[float]) float[source]

Call self as a function.

__init__(*args, **kwargs)
class WrappedReducer(reducer: Reducer)[source]

Bases: Aggregator

__call__(terms: Sequence[float], quantities: Sequence[dict[str, Any]], ctx: EvaluateContext) float[source]

Call self as a function.

__init__(reducer: Reducer) None[source]

A reducer wrapped so that it can be used as an Aggregator.
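This is an adapter between the two call signatures: a Reducer takes only terms, while an Aggregator takes terms, quantities, and ctx. A sketch of such an adapter follows. ReducerAsAggregator and mean_of_terms are hypothetical names, and ignoring the extra arguments is an assumption about the wrapper's behavior.

```python
class ReducerAsAggregator:
    """Expose a terms-only reducer through the three-argument signature."""

    def __init__(self, reducer):
        self._reducer = reducer

    def __call__(self, terms, quantities, ctx):
        # quantities and ctx are accepted for interface compatibility only.
        return self._reducer(list(terms))

    def to_reducer(self):
        return self._reducer


def mean_of_terms(terms):
    return sum(terms) / len(terms)


aggregator = ReducerAsAggregator(mean_of_terms)
value = aggregator([1.0, 2.0, 6.0], quantities=[{}, {}, {}], ctx=None)
```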

to_reducer() Reducer[source]
mean_reducer(terms: list[float]) float[source]
nan_exception_handler(exception: Exception, ctx: EvaluateContext, idx: int) float | None[source]
raising_exception_handler(exception: Exception, ctx: EvaluateContext, idx: int) float | None[source]
root_mean_reducer(terms: list[float]) float[source]
skip_exception_handler(exception: Exception, ctx: EvaluateContext, idx: int) float | None[source]
sum_reducer(terms: list[float]) float[source]
transform_generic_callables(list_of_callables: Sequence[Callable[[dict[str, Any]], float]]) list[ObjectiveFunctor][source]

chemfit.data_utils module

process_csv(paths_to_csv: Path | Sequence[Path], index: slice | Sequence[slice] = slice(None, None, None)) tuple[list[Path], list[str], list[float]][source]

Load a dataset CSV and extract file paths, tags, and reference energies.

If a list of paths is passed, each path is forwarded to process_single_csv in turn and the results are collected.

Parameters:
  • paths_to_csv (Union[Path, Sequence[Path]]) – Either a single path to a CSV or a list of paths.

  • index (Union[slice, Sequence[slice]]) – Either a single slice or a list of slices applied to the data read from the CSVs.

Returns:

  • paths: List of resolved Path objects to each data file.

  • tags: List of dataset tag strings.

  • energies: List of reference energies as floats.

Return type:

tuple[list[Path], list[str], list[float]]

process_single_csv(path_to_csv: Path, index: slice = slice(None, None, None)) tuple[list[Path], list[str], list[float]][source]

Load a dataset CSV and extract file paths, tags, and reference energies.

The CSV must include the following columns:
  • Either path or file:
    • If path is present, each entry may be absolute or relative to the current working directory.

    • Otherwise, file entries are taken as relative to the CSV’s parent directory.

    • If both are present, path takes precedence.

  • tag: A short string label for each dataset.

  • reference_energy: A numeric reference energy for each dataset.

Additional columns are permitted and ignored.

Parameters:
  • path_to_csv (Path) – Path to the CSV file describing the datasets.

  • index (slice) – A slice applied to the data read from the CSV. Defaults to slice(None, None, None).

Returns:

  • paths: List of resolved Path objects to each data file.

  • tags: List of dataset tag strings.

  • energies: List of reference energies as floats.

Return type:

tuple[list[Path], list[str], list[float]]

Raises:
  • FileNotFoundError – If the CSV file does not exist.

  • KeyError – If neither a path nor a file column is present, or if the tag or reference_energy column is missing.

  • ValueError – If any reference_energy value cannot be converted to float.
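The column rules above can be sketched with the standard csv module. read_dataset_csv is a hypothetical stand-in for process_single_csv: it follows the documented precedence of path over file, but how the library itself parses the CSV is not stated here.

```python
import csv
import tempfile
from pathlib import Path


def read_dataset_csv(path_to_csv, index=slice(None)):
    """Return (paths, tags, energies) following the column rules above."""
    path_to_csv = Path(path_to_csv)
    with path_to_csv.open(newline="") as handle:  # FileNotFoundError if missing
        rows = list(csv.DictReader(handle))[index]
    paths, tags, energies = [], [], []
    for row in rows:
        if "path" in row:
            # Relative entries resolve against the current working directory.
            paths.append(Path(row["path"]).resolve())
        else:
            # "file" entries are relative to the CSV's parent directory.
            paths.append((path_to_csv.parent / row["file"]).resolve())
        tags.append(row["tag"])
        energies.append(float(row["reference_energy"]))  # ValueError if not numeric
    return paths, tags, energies


with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "data.csv"
    csv_path.write_text(
        "file,tag,reference_energy,comment\n"
        "a.xyz,dimer,-1.5,ok\n"
        "b.xyz,trimer,-3.0,ok\n"
    )
    paths, tags, energies = read_dataset_csv(csv_path, index=slice(0, 1))
```

The extra comment column is ignored, and the slice keeps only the first data row.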

chemfit.debug_utils module

log_all_methods(obj: LoggedObject, log_func: Callable[[str], None], *args, **kwargs) LoggedObject[source]

Return a proxy object that logs method calls and delegates everything to obj.
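A logging proxy of this kind is commonly built on __getattr__, which Python calls only for attributes not found on the proxy itself. The sketch below shows the pattern under that assumption. LoggingProxy is a hypothetical class, and the message format is invented for the example.

```python
class LoggingProxy:
    """Delegate attribute access to obj and log every method call."""

    def __init__(self, obj, log_func):
        self._obj = obj
        self._log = log_func

    def __getattr__(self, name):
        attr = getattr(self._obj, name)
        if not callable(attr):
            return attr  # plain attributes are passed through unchanged

        def logged(*args, **kwargs):
            self._log(f"calling {name} args={args} kwargs={kwargs}")
            result = attr(*args, **kwargs)
            self._log(f"{name} returned {result!r}")
            return result

        return logged


messages = []
proxy = LoggingProxy([3, 1, 2], messages.append)
proxy.sort()
n_ones = proxy.count(1)
```

Special methods looked up on the type, such as len(proxy), bypass __getattr__ and would need explicit forwarding.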

log_invocation(func: Callable[[Any], T], log_func: Callable[[str], None], log_args: bool = True, log_res: bool = True) Callable[[Any], T][source]

chemfit.executor_utils module

class AttachContextAsReturnValue(func: Callable[[...], T_co])[source]

Bases: Generic[T_co]

Wrap a callable so it also returns serialized context state.

This helper is primarily used when executing functions through an executor that may run in a separate process. In that case, mutations to an EvaluateContext made inside the worker are not reflected in the caller’s original context object. This wrapper makes those side effects explicit by returning the function result together with ctx.__getstate__().

The wrapped callable is expected to receive an EvaluateContext as its final positional argument.

__call__(*args: Any) tuple[T_co, dict[str, Any]][source]

Call the wrapped function and return its result with context state.

Parameters:

*args – Positional arguments forwarded to the wrapped callable. The final argument must be an EvaluateContext.

Returns:

A tuple containing:

  • The return value of the wrapped callable.

  • The serialized state of the provided EvaluateContext.

Return type:

tuple[T_co, dict[str, Any]]

Raises:

AssertionError – If the final positional argument is not an EvaluateContext.

__init__(func: Callable[[...], T_co]) None[source]

Initialize the wrapper.

Parameters:

func – Callable whose final positional argument must be an EvaluateContext.

map_with_context(executor: ExecutorLike, fn: Callable[..., T_co], *iterables: Iterable[Any], ctxs: Iterable[EvaluateContext], timeout: float | None = None, chunksize: int = 1) list[T_co][source]

Map a function over iterables and propagate context side effects.

This helper behaves like executor.map(...) for callables whose final positional argument is an EvaluateContext. Each worker returns both the function result and the serialized state of its context. After execution, the input context objects are updated in place via EvaluateContext.__setstate__() so that caller-visible context state reflects mutations performed inside the executor.

This is especially useful for executors that may run work in separate processes, where in-worker mutations to context objects would otherwise not be visible to the caller.

Parameters:
  • executor – Executor used to evaluate the mapped calls.

  • fn – Callable to evaluate. Its final positional argument must be an EvaluateContext.

  • *iterables – Iterables supplying the non-context positional arguments for each mapped call.

  • ctxs – Iterable of evaluation contexts, one per mapped call.

  • timeout – Maximum number of seconds to wait for results. If None, wait indefinitely.

  • chunksize – Approximate chunk size passed through to executor.map(...).

Returns:

A list of function return values in executor map order.

Side Effects:

Updates each context in ctxs in place using the serialized state returned from the corresponding worker execution.
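The mechanism described above can be sketched with the standard library alone. This is only an illustration of the idea, not the ChemFit implementation: DemoContext, attach_state, and scale are hypothetical stand-ins for EvaluateContext, AttachContextAsReturnValue, and a user function.

```python
from concurrent.futures import ThreadPoolExecutor


class DemoContext:
    """Hypothetical stand-in for EvaluateContext (not the ChemFit class)."""

    def __init__(self):
        self.loss = None

    def __getstate__(self):
        return dict(self.__dict__)

    def __setstate__(self, state):
        self.__dict__.update(state)


def attach_state(func):
    """Return a callable that yields (result, serialized context state)."""

    def wrapper(*args):
        ctx = args[-1]  # the context is the final positional argument
        result = func(*args)
        return result, ctx.__getstate__()

    return wrapper


def scale(x, ctx):
    ctx.loss = 2.0 * x  # side effect recorded on the worker-side context
    return ctx.loss


inputs = [1.0, 2.0, 3.0]
ctxs = [DemoContext() for _ in inputs]

with ThreadPoolExecutor(max_workers=2) as pool:
    pairs = list(pool.map(attach_state(scale), inputs, ctxs))

results = []
for ctx, (result, state) in zip(ctxs, pairs):
    ctx.__setstate__(state)  # copy the worker-side state back to the caller
    results.append(result)
```

With a thread pool the worker already mutates the caller's object, so the copy-back is redundant here. It becomes essential once the executor pickles the context into another process.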

chemfit.executor_wrapper_cob module

class ExecutorWrapperCOB(cob: CombinedObjectiveFunction, executor: ExecutorLike | None = None)[source]

Bases: ObjectiveFunctor

__call__(parameters: dict[str, Any], ctx: EvaluateContext | None = None) float[source]

Evaluate the wrapped combined objective using an executor.

This method prepares one child context per objective term, evaluates the terms through the configured executor, filters out any skipped terms, and reduces the remaining weighted term values using the wrapped combined objective’s reduction function.

Executor selection follows this order:
  1. ctx.executor, if set

  2. self.executor, if set

  3. a lazily created ThreadPoolExecutor

Parameters:
  • parameters – Parameter dictionary for the evaluation.

  • ctx – Optional parent evaluation context. If None, a new EvaluateContext is created.

Returns:

The reduced scalar loss computed from the evaluated terms.

Side Effects:
  • Initializes the parent context through self.cob.prepare_evaluation(...).

  • Spawns one child context per objective term.

  • Evaluates terms through the selected executor.

  • Collects child metadata into ctx.meta["children"].

  • Stores the final reduced loss in ctx.loss.
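The executor selection order listed above can be sketched as follows. The function select_executor and the attribute name fallback are hypothetical; where the real wrapper caches its lazily created pool is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace


def select_executor(ctx, wrapper):
    """Pick an executor in the documented order of precedence."""
    if ctx.executor is not None:
        return ctx.executor  # 1. executor attached to the context
    if wrapper.executor is not None:
        return wrapper.executor  # 2. default executor of the wrapper
    if wrapper.fallback is None:
        # 3. thread pool created only on first use (assumed to be cached)
        wrapper.fallback = ThreadPoolExecutor()
    return wrapper.fallback


# Plain strings stand in for executor objects in the first two cases.
wrapper = SimpleNamespace(executor="wrapper-executor", fallback=None)
from_ctx = select_executor(SimpleNamespace(executor="ctx-executor"), wrapper)
from_wrapper = select_executor(SimpleNamespace(executor=None), wrapper)

bare = SimpleNamespace(executor=None, fallback=None)
lazy = select_executor(SimpleNamespace(executor=None), bare)
lazy_again = select_executor(SimpleNamespace(executor=None), bare)
lazy.shutdown()
```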

__init__(cob: CombinedObjectiveFunction, executor: ExecutorLike | None = None)[source]

Initialize a concurrent wrapper for a combined objective.

This wrapper evaluates the terms of a CombinedObjectiveFunction through an ExecutorLike instance. Each term is evaluated in its own child EvaluateContext, and the resulting term values are reduced using the wrapped combined objective’s reduction function.

At call time, ctx.executor takes precedence if it is set; otherwise the executor provided here is used. If neither is available, a ThreadPoolExecutor is created lazily.

Parameters:
  • cob – Combined objective function whose terms will be evaluated concurrently.

  • executor – Optional default executor used when ctx.executor is not set.

chemfit.file_based_computer module

class FileBasedQuantityComputer(output_files: list[Path | str], output_parsers: list[OutputParser] | OutputParser, base_working_directory: Path | str, executable_cmd: Callable[[dict[str, Any], Path], list[str]] | None = None, presubmit_hook: PreSubmitHook | None = None, wait_timeout: float | None = 500.0, poll_interval: float = 1, subprocess_run_args: dict | None = None, delete_temp_workdirs: bool = True, write_dump_file_after_crash: bool = True, keep_temp_workdir_after_crash: bool = True)[source]

Bases: QuantityComputer

__init__(output_files: list[Path | str], output_parsers: list[OutputParser] | OutputParser, base_working_directory: Path | str, executable_cmd: Callable[[dict[str, Any], Path], list[str]] | None = None, presubmit_hook: PreSubmitHook | None = None, wait_timeout: float | None = 500.0, poll_interval: float = 1, subprocess_run_args: dict | None = None, delete_temp_workdirs: bool = True, write_dump_file_after_crash: bool = True, keep_temp_workdir_after_crash: bool = True)[source]

Initialize a file-based quantity computer.

This quantity computer evaluates parameters by creating a temporary working directory, executing an external command, waiting for the expected output files to appear, and parsing those files into a quantity dictionary.

Parameters:
  • output_files (list[Path | str]) – Paths to output files that are expected to be created by the external command. These paths must be relative to the working directory; absolute paths are not allowed.

  • executable_cmd (Callable[[dict[str, Any], Path], list[str]]) – Callable that constructs the command to execute. It receives the parameter dictionary and the temporary working directory, and must return a list of strings suitable for subprocess.run.

  • output_parsers (list[OutputParser] | OutputParser) – One or more output parsers called after the external command completes and the output files exist. Each parser receives the list of output file paths and returns a dictionary of quantities. The results of all parsers are merged.

  • base_working_directory (Path | str) – Base directory under which temporary working directories will be created, one per evaluation.

  • presubmit_hook (PreSubmitHook | None, optional) – Optional hook executed before the external command is run. It can be used to prepare input files, templates, etc.

  • wait_timeout (float, optional) – Maximum time in seconds to wait for all output files to appear. Defaults to 500.0 seconds.

  • poll_interval (float, optional) – Interval in seconds between checks for output file creation. Defaults to 1 second.

  • subprocess_run_args (dict | None, optional) – Additional keyword arguments forwarded to subprocess.run (e.g. capture_output=True). Defaults to None.

  • delete_temp_workdirs (bool, optional) – Whether to delete temporary working directories after each evaluation. Defaults to True.

  • write_dump_file_after_crash (bool, optional) – Whether to write a dump file with subprocess output when command execution fails. Defaults to True.

  • keep_temp_workdir_after_crash (bool, optional) – Whether to keep the temporary working directory for inspection after a failed evaluation. Defaults to True.

Raises:

Exception – If any path in output_files is absolute rather than relative.
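The evaluation steps described above (temporary directory, external command, polling for output files, parsing) can be sketched with the standard library. This is a simplified illustration under stated assumptions, not the class itself: run_file_based, build_cmd, and parse are hypothetical helpers, and the toy command is a Python one-liner.

```python
import subprocess
import sys
import tempfile
import time
from pathlib import Path


def run_file_based(cmd_builder, parameters, output_files, parser,
                   base_dir, wait_timeout=500.0, poll_interval=1.0):
    """Hypothetical helper mirroring the documented evaluation steps."""
    workdir = Path(tempfile.mkdtemp(dir=base_dir))  # one directory per evaluation
    subprocess.run(cmd_builder(parameters, workdir), cwd=workdir, check=True)

    # Poll until every expected output file exists or the timeout expires.
    expected = [workdir / name for name in output_files]
    deadline = time.monotonic() + wait_timeout
    while not all(path.exists() for path in expected):
        if time.monotonic() > deadline:
            raise TimeoutError(f"output files missing in {workdir}")
        time.sleep(poll_interval)

    return parser(expected)


def build_cmd(parameters, workdir):
    # Toy command: write the parameter value to out.txt in the working directory.
    code = f"open('out.txt', 'w').write('{parameters['x']}')"
    return [sys.executable, "-c", code]


def parse(paths):
    return {"x_squared": float(paths[0].read_text()) ** 2}


with tempfile.TemporaryDirectory() as base:
    quantities = run_file_based(build_cmd, {"x": 3.0}, ["out.txt"], parse, base,
                                wait_timeout=10.0, poll_interval=0.05)
```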

build_cmd(parameters: dict[str, Any], ctx: EvaluateContext) list[str][source]

Build the external command for the current evaluation.

Parameters:
  • parameters – Parameter dictionary for the current evaluation.

  • ctx – Evaluation context whose temporary working directory is used when constructing the command.

Returns:

Command to execute, formatted for subprocess.run.

create_temp_workdir() Path[source]

Create and return a fresh temporary working directory.

Returns:

Path to the newly created working directory.

with_cmd(executable_cmd: Callable[[...], list[str]], /, **kwargs: Any) Self[source]

Return a copy of this computer with a bound command function.

The provided executable_cmd may accept additional keyword arguments beyond (parameters, workdir). These are bound via kwargs and the resulting callable is stored as the command builder.

This is a convenience wrapper around functools.partial that avoids requiring users to manually construct partial functions.

Parameters:
  • executable_cmd – Callable used to construct the command. Must accept (parameters: dict[str, Any], workdir: Path, ...) where any additional arguments are keyword-only.

  • **kwargs – Keyword arguments to bind to executable_cmd.

Returns:

A new FileBasedQuantityComputer instance with the updated command function.

Example

>>> from chemfit.file_based_computer import FileBasedQuantityComputer
>>> computer = FileBasedQuantityComputer(
...    output_files=["out.txt"],
...    output_parsers=[],
...    base_working_directory="workdir"
... )
>>> def build_cmd(parameters, workdir, *, executable):
...     return [executable]
>>> computer2 = computer.with_cmd(
...     build_cmd,
...     executable="my_program",
... )

Note

Additional arguments must be keyword-only in executable_cmd.

with_presubmit(presubmit: Callable[[...], None], /, **kwargs: Any) Self[source]

Return a copy of this computer with a bound presubmit hook.

The provided presubmit callable may accept additional keyword arguments beyond (parameters, workdir). These are bound via kwargs and the resulting callable is stored as the presubmit hook.

This is a convenience wrapper around functools.partial that avoids requiring users to manually construct partial functions.

Parameters:
  • presubmit – Callable executed before the command is run. Must accept (parameters: dict[str, Any], workdir: Path, ...) where any additional arguments are keyword-only.

  • **kwargs – Keyword arguments to bind to presubmit.

Returns:

A new FileBasedQuantityComputer instance with the updated presubmit hook.

Example

>>> from chemfit.file_based_computer import FileBasedQuantityComputer
>>> computer = FileBasedQuantityComputer(
...    output_files=["out.txt"],
...    output_parsers=[],
...    base_working_directory="workdir"
... )
>>> def write_input(parameters, workdir, *, template_path):
...     ...
>>> computer2 = computer.with_presubmit(
...     write_input,
...     template_path="INCAR.template",
... )

Note

Additional arguments must be keyword-only in presubmit.

class OutputParser(*args, **kwargs)[source]

Bases: Protocol

Protocol for parsing output files into a quantity dictionary.

__call__(output_files: list[Path]) dict[str, Any][source]

Parse the output files and retrieve the quantities.

Parameters:

output_files (list[Path]) – List of paths to output files. These are typically located in the working directory of a single evaluation.

Returns:

Dictionary of parsed quantities.

Return type:

dict[str, Any]
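A minimal sketch of a callable that satisfies this protocol, assuming output files made of "name = value" lines. The file format and the function name parse_key_value are hypothetical and chosen only for illustration.

```python
import tempfile
from pathlib import Path
from typing import Any


def parse_key_value(output_files: list[Path]) -> dict[str, Any]:
    """Collect 'name = value' lines from every output file into one dict."""
    quantities: dict[str, Any] = {}
    for path in output_files:
        for line in path.read_text().splitlines():
            if "=" not in line:
                continue  # skip lines that carry no quantity
            name, value = line.split("=", 1)
            quantities[name.strip()] = float(value)
    return quantities


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "out.txt"
    out.write_text("energy = -1.5\n# comment\nvolume = 20.0\n")
    parsed = parse_key_value([out])
```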

__init__(*args, **kwargs)
class PreSubmitHook(*args, **kwargs)[source]

Bases: Protocol

Protocol for running things before the command is submitted.

__call__(parameters: dict[str, Any], workdir: Path) None[source]

Run pre-submit actions.

Parameters:
  • parameters (dict[str, Any]) – Parameter dictionary for the evaluation.

  • workdir (Path) – Temporary working directory for this evaluation.

__init__(*args, **kwargs)

chemfit.fitter module

class Fitter(objective_function: Callable[[dict[str, Any]], float] | ObjectiveFunctor, initial_params: dict[str, Any], bounds: dict[str, Any] | None = None, near_bound_tol: float | None = None, value_bad_params: float = 100000.0, swallow_exceptions: bool = False, log_exceptions: bool = True)[source]

Bases: object

__init__(objective_function: Callable[[dict[str, Any]], float] | ObjectiveFunctor, initial_params: dict[str, Any], bounds: dict[str, Any] | None = None, near_bound_tol: float | None = None, value_bad_params: float = 100000.0, swallow_exceptions: bool = False, log_exceptions: bool = True) None[source]

Driver class for parameter optimization.

A Fitter wraps an objective (either a plain callable or an ObjectiveFunctor) in a FitterObjectiveFunctor and exposes convenience methods for running optimizations with nevergrad and SciPy.

Parameters:
  • objective_function (Callable | ObjectiveFunctor) – Objective to be minimized. If a plain callable is provided, it is converted to an ObjectiveFunctor using to_objective_functor.

  • initial_params (dict[str, Any]) – Initial parameter values.

  • bounds (dict[str, Any] | None, optional) – Bounds for each parameter. The structure must mirror initial_params, but bounds may be omitted for individual parameters. Defaults to None.

  • near_bound_tol (float | None, optional) – If provided, parameters whose optimized values lie within this relative distance of their bounds will trigger a warning in _hook_post_fit(). Defaults to None.

  • value_bad_params (float, optional) – Penalty value used to represent invalid or numerically unstable parameter regions. Defaults to 1e5.

  • swallow_exceptions (bool, optional) – If True, exceptions raised by the objective are converted into a penalized objective value instead of being re-raised. Defaults to False.

  • log_exceptions (bool, optional) – If True, exceptions raised by the objective are logged. Defaults to True.

fit_nevergrad(budget: int, optimizer_str: str = 'NgIohTuned', num_workers: int = 1, contexts: list[FitterEvaluateContext] | None = None, executor: ExecutorLike | None = None, initial_observations: Iterable[tuple[dict[str, Any], float | None]] | None = None) dict[str, Any][source]

Optimize parameters using a nevergrad optimizer.

This method drives nevergrad’s ask/tell interface and can evaluate multiple candidate points in parallel through an ExecutorLike instance. One FitterEvaluateContext is used per worker so that evaluation-side state can be tracked independently.

Parameters:
  • budget – Total number of objective evaluations to allow.

  • optimizer_str – Name of the nevergrad optimizer to use. Must be a key in ng.optimizers.registry.

  • num_workers – Number of points to evaluate in parallel per ask/tell step.

  • contexts – Optional list of per-worker fitter contexts. If provided, its length must equal num_workers.

  • executor – Optional executor used for parallel evaluation when num_workers > 1. If None, a ThreadPoolExecutor is created.

  • initial_observations – Optional iterable of previously evaluated (parameters, loss) pairs used to seed the optimizer. These observations are replayed into the optimizer before the main optimization loop begins. This allows approximate continuation of a previous run or warm-starting a new optimization. If any parameter set violates the bounds, it is skipped. These observations do not consume evaluations from the main budget and do not trigger callbacks. This does not restore the exact internal state of the optimizer. Only the provided observations are injected.

Returns:

Dictionary of optimized parameter values.

Raises:
  • KeyError – If optimizer_str is not found in the nevergrad optimizer registry.

  • AssertionError – If contexts is provided and its length does not equal num_workers.

Side Effects:
  • Initializes fitter bookkeeping via _hook_pre_fit().

  • Populates self.contexts with one context per worker.

  • Invokes registered callbacks during optimization.

  • Runs post-fit checks via _hook_post_fit().

fit_scipy(method: str = 'L-BFGS-B', ctx: FitterEvaluateContext | None = None, **kwargs) dict[str, Any][source]

Optimize parameters using scipy.optimize.minimize.

The parameter dictionary is flattened into a vector representation for SciPy and reconstructed on each objective evaluation. Because SciPy’s minimize interface is synchronous, a single FitterEvaluateContext is used for the full optimization run.

Parameters:
  • method – Optimization method passed to scipy.optimize.minimize.

  • ctx – Optional fitter evaluation context to reuse during the fit. If None, a new one is created.

  • **kwargs – Additional keyword arguments forwarded to scipy.optimize.minimize.

Returns:

Dictionary of optimized parameter values.

Warning

If the optimizer does not converge, a warning is logged.

Side Effects:
  • Initializes fitter bookkeeping via _hook_pre_fit().

  • Populates self.contexts with a single context.

  • Invokes registered callbacks during optimization.

  • Runs post-fit checks via _hook_post_fit().
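The flattening step mentioned above can be illustrated as follows. This is a sketch under assumptions: the helper names flatten and unflatten and the dotted key separator are hypothetical, and the actual ChemFit representation may differ.

```python
from typing import Any


def flatten(params: dict[str, Any], prefix: str = "") -> dict[str, float]:
    """Flatten a nested parameter dict into dotted keys."""
    flat: dict[str, float] = {}
    for key, value in params.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = float(value)
    return flat


def unflatten(keys: list[str], vector: list[float]) -> dict[str, Any]:
    """Rebuild the nested dict from the key order and a value vector."""
    nested: dict[str, Any] = {}
    for key, value in zip(keys, vector):
        *parents, leaf = key.split(".")
        node = nested
        for part in parents:
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested


params = {"a": 1.0, "pair": {"eps": 0.5, "sigma": 3.0}}
flat = flatten(params)
keys, x0 = list(flat), list(flat.values())  # x0 is what the optimizer sees
restored = unflatten(keys, x0)
```

The key order is fixed once, so every vector proposed by the optimizer can be mapped back to the same dictionary structure before the objective is called.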

register_callback(func: Callable[[int, list[FitterEvaluateContext]], None], n_steps: int)[source]

Register a callback to be executed during optimization.

The callback is invoked every n_steps iterations (or nevergrad/SciPy “steps”, depending on the backend), and receives the current step index and the list of FitterEvaluateContext instances used by the fitter.

Parameters:
  • func (Callable[[int, list[FitterEvaluateContext]], None]) – Callback function of the form func(step, contexts).

  • n_steps (int) – Interval (in steps) at which the callback is invoked.
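A sketch of a callback of the form func(step, contexts) and of interval-based invocation. The dispatch loop below is a hypothetical stand-in for the fitter; in particular, firing when step % n_steps == 0 is an assumption about the exact rule.

```python
calls = []


def record(step, contexts):
    """Example callback: remember the step and how many contexts were passed."""
    calls.append((step, len(contexts)))


registered = [(record, 5)]  # (callback, n_steps) pairs


def run_callbacks(step, contexts):
    for func, n_steps in registered:
        if step % n_steps == 0:  # assumed firing rule
            func(step, contexts)


for step in range(1, 13):
    run_callbacks(step, contexts=["ctx0", "ctx1"])
```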

class FitterEvaluateContext[source]

Bases: EvaluateContext

__init__()[source]

Initialize fitter-specific evaluation state.

This context extends EvaluateContext with optimization-specific tracking fields that record the number of evaluations performed and the best loss, parameters, and metadata observed so far during a fit.

class FitterObjectiveFunctor(wrap_me: ObjectiveFunctor, swallow_exceptions: bool = False, log_exceptions: bool = True, value_bad_params: float = 100000.0)[source]

Bases: ObjectiveFunctor

__call__(parameters: dict[str, Any], ctx: FitterEvaluateContext | None = None) float[source]

Evaluate the objective function.

Implementations should compute a scalar loss from the given parameter dictionary. All per-evaluation state must be written into the provided ctx. If no context is supplied, a new one should be created internally.

Parameters:
  • parameters (dict[str, Any]) – Mapping of parameter names to float values.

  • ctx (EvaluateContext | None) – Optional evaluation context. If None, a new EvaluateContext should be created.

Returns:

The computed scalar loss.

Return type:

float

Notes

  • Implementations should avoid mutating self during the call. All per-evaluation information should be placed in ctx instead.

  • This method is synchronous. For concurrent or asynchronous evaluation, use one EvaluateContext per call and invoke this method in multiple threads/tasks.

__init__(wrap_me: ObjectiveFunctor, swallow_exceptions: bool = False, log_exceptions: bool = True, value_bad_params: float = 100000.0)[source]

Initialize a fitter-specific objective wrapper.

This wrapper sits between a raw objective and an optimizer. It adds basic robustness and tracking behavior on top of the wrapped objective:

  • exceptions may be logged and optionally swallowed

  • non-scalar or NaN return values are replaced by a large penalty

  • the attached FitterEvaluateContext is updated with the number of evaluations and the best loss/parameters seen so far

Parameters:
  • wrap_me – Underlying objective functor to evaluate.

  • swallow_exceptions – If True, exceptions raised by the wrapped objective are converted into a penalized objective value instead of being re-raised.

  • log_exceptions – If True, exceptions raised by the wrapped objective are logged.

  • value_bad_params (float, optional) – Penalty value used to represent invalid or numerically unstable parameter regions. Defaults to 1e5.
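The three behaviours listed above can be sketched in a few lines. This is not the ChemFit class: make_robust is a hypothetical helper, and the context attribute names n_evals, best_loss, and best_params are assumptions made for the example.

```python
import math
from types import SimpleNamespace


def make_robust(objective, value_bad_params=1e5, swallow_exceptions=False):
    """Hypothetical wrapper mirroring the behaviours listed above."""

    def wrapped(parameters, ctx):
        try:
            value = objective(parameters)
        except Exception:
            if not swallow_exceptions:
                raise
            value = None  # handled like an invalid return value below

        # Non-numeric or NaN values are replaced by the penalty.
        if not isinstance(value, (int, float)) or math.isnan(value):
            value = value_bad_params

        ctx.n_evals += 1
        if value < ctx.best_loss:
            ctx.best_loss, ctx.best_params = value, dict(parameters)
        return float(value)

    return wrapped


def objective(p):
    if p["x"] < 0:
        raise ValueError("negative x")
    return float("nan") if p["x"] == 4 else math.sqrt(p["x"])


ctx = SimpleNamespace(n_evals=0, best_loss=math.inf, best_params=None)
robust = make_robust(objective, swallow_exceptions=True)
losses = [robust({"x": x}, ctx) for x in (9.0, -1.0, 4.0, 1.0)]
```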

post_process_return_value(parameters: dict[str, Any], value: float | None, ctx: FitterEvaluateContext) float[source]

chemfit.fitter_callbacks module

Predefined callback utilities for the ChemFit fitter.

These callbacks provide common functionality such as logging optimization progress and persisting evaluation metadata during optimization runs.

class CheckpointBestParameters(path: Path | str)[source]

Bases: object

Callback that checkpoints the best parameters observed during fitting.

Whenever a new best loss is detected across the provided FitterEvaluateContext instances, the corresponding parameters and metadata are written to disk. The file is overwritten whenever a better solution is found.

This callback is useful for long-running optimizations, as it allows recovery of the best solution even if the optimization process crashes or is interrupted.

__call__(step: int, ctxs: list[FitterEvaluateContext])[source]

Call self as a function.

__init__(path: Path | str)[source]

Initialize the checkpoint callback.

Parameters:

path – File path where the best parameters will be written. The file is overwritten whenever a better loss is found.

class NumpyEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

default(o: Any)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
class SaveMetaData(output_folder: Path | str)[source]

Bases: object

__call__(step: int, ctxs: list[FitterEvaluateContext])[source]

Call self as a function.

__init__(output_folder: Path | str)[source]

Initialize a metadata-saving callback.

This callback writes the metadata of each evaluation context to JSON files during optimization.

Parameters:

output_folder – Directory where metadata files will be written. The directory is created if it does not already exist.

log_progress(step: int, ctxs: list[FitterEvaluateContext])[source]

Log optimization progress.

This callback prints a summary of the current optimization state for each evaluation context, including the current loss, parameters, and the best loss/parameters observed so far.

It also reports the best loss and parameter set across all contexts.

Parameters:
  • step – Current optimizer step index.

  • ctxs – List of FitterEvaluateContext instances used by the optimizer. Each context corresponds to one evaluation worker.

chemfit.kabsch module

apply_transform(P: ndarray[tuple[Any, ...], dtype[float64]], R: ndarray[tuple[Any, ...], dtype[float64]], t: ndarray[tuple[Any, ...], dtype[float64]]) ndarray[tuple[Any, ...], dtype[float64]][source]

Apply affine transform defined by rotation R and translation t to points P.

kabsch(P: ndarray[tuple[Any, ...], dtype[float64]], Q: ndarray[tuple[Any, ...], dtype[float64]], weights: ndarray[tuple[Any, ...], dtype[float64]] | None = None, allow_reflection: bool = False) tuple[ndarray[tuple[Any, ...], dtype[float64]], ndarray[tuple[Any, ...], dtype[float64]]][source]

Compute the optimal rigid transformation that aligns P onto Q using the Kabsch algorithm.

This implementation assumes row-vector points of shape (N, D) and solves for rotation R and translation t in the mapping:

Q ≈ P @ R + t

The solution minimizes the root-mean-square deviation (RMSD) between P transformed and Q, optionally with per-point weights.

The algorithm:

  1. Compute centroids of P and Q (weighted if weights provided).

  2. Subtract centroids to get centered coordinates P0, Q0.

  3. Compute the cross-covariance matrix:

    C = P0.T @ Q0          # (D, D) for row-vector convention
    
  4. Perform singular value decomposition:

    U, S, Vt = np.linalg.svd(C)
    
  5. Compute rotation:

    R = U @ Vt
    

    If allow_reflection is False and det(R) < 0, flip the sign of the last row of Vt before recomputing R to ensure a proper rotation (det(R) = +1).

  6. Compute translation:

    t = cQ - cP @ R
    
Parameters:
  • P (ndarray of shape (N, D)) – Source point coordinates.

  • Q (ndarray of shape (N, D)) – Target point coordinates, corresponding 1-to-1 with P.

  • weights (ndarray of shape (N,), optional) – Nonnegative weights for each correspondence. If provided, centroids and covariance are computed with these weights.

  • allow_reflection (bool, default=False) – If False, the solution is constrained to a proper rotation with det(R) = +1. If True, improper rotations (reflections) are allowed.

Returns:

  • R (ndarray of shape (D, D)): Optimal rotation matrix.

  • t (ndarray of shape (D,)): Translation vector.

Return type:

Tuple[ndarray, ndarray]

Raises:

ValueError – If P and Q have mismatched shapes, fewer than D points are provided, or if weights are invalid (negative, wrong shape, or zero sum).

Notes

  • Works for any dimensionality D >= 2.

  • For column-vector convention (R @ P + t), the covariance and multiplication order must be adjusted.

  • The returned transform is optimal in the least-squares sense and preserves distances (no scaling or shearing).
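The algorithm can be checked numerically with a short NumPy sketch. It is an unweighted illustration, not the library function: kabsch_sketch is a hypothetical name, and the rotation is formed as U @ Vt, which is the product that satisfies Q ≈ P @ R + t for row-vector points with C = P0.T @ Q0.

```python
import numpy as np


def kabsch_sketch(P, Q):
    """Unweighted Kabsch alignment for row-vector points, Q ~ P @ R + t."""
    cP, cQ = P.mean(axis=0), Q.mean(axis=0)
    P0, Q0 = P - cP, Q - cQ
    C = P0.T @ Q0                  # (D, D) cross-covariance
    U, S, Vt = np.linalg.svd(C)
    if np.linalg.det(U @ Vt) < 0:  # reflection detected
        Vt[-1, :] *= -1            # flip the last row of Vt
    R = U @ Vt                     # rotation acting on row vectors
    t = cQ - cP @ R
    return R, t


# Build a target set from a known rotation about z and a known translation.
rng = np.random.default_rng(0)
P = rng.normal(size=(6, 3))
theta = 0.7
R_true = np.array([[np.cos(theta), np.sin(theta), 0.0],
                   [-np.sin(theta), np.cos(theta), 0.0],
                   [0.0, 0.0, 1.0]])
t_true = np.array([1.0, -2.0, 0.5])
Q = P @ R_true + t_true

R, t = kabsch_sketch(P, Q)
rmsd_after = float(np.sqrt(np.mean(np.sum((P @ R + t - Q) ** 2, axis=1))))
```

For noise-free input the recovered R and t reproduce the transform used to build Q, so the residual RMSD is zero up to floating-point error.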

rmsd(A: ndarray[tuple[Any, ...], dtype[float64]], B: ndarray[tuple[Any, ...], dtype[float64]], weights: ndarray[tuple[Any, ...], dtype[float64]] | None = None) float[source]

Root mean square deviation between two point sets A and B.

chemfit.mpi_wrapper_cob module

class MPIWrapperCOB(cob: CombinedObjectiveFunction, comm: Any | None = None, mpi_debug_log: bool = False)[source]

Bases: ObjectiveFunctor

MPI-based wrapper for CombinedObjectiveFunction.

This wrapper distributes the terms of a combined objective across MPI ranks. Rank 0 acts as the driver rank: it broadcasts the evaluation context to all worker ranks, evaluates its own local slice of terms, gathers the worker results, re-raises any worker exceptions, collects child metadata, and applies the wrapped combined objective’s reduction.

Worker ranks do not call __call__ directly. Instead, they run worker_loop(), which waits for broadcast evaluation requests from rank 0 and processes the local slice assigned to that rank.

__call__(params: dict[str, Any], ctx: EvaluateContext | None = None) float[source]

Evaluate the combined objective on rank 0 using MPI.

Rank 0 broadcasts the evaluation context to all worker ranks, evaluates its own assigned slice locally, gathers the worker term values, re-raises any worker exceptions, gathers child metadata from all ranks, and reduces the full list of term values using the wrapped combined objective’s reduction function.

Parameters:
  • params – Parameter dictionary for the current evaluation.

  • ctx – Optional parent evaluation context. If None, a new EvaluateContext is created.

Returns:

Reduced scalar loss value.

Raises:
  • RuntimeError – If called on a nonzero rank.

  • Exception – Re-raises any exception returned from a worker rank.

Side Effects:
  • Stores params in ctx.parameters.

  • Broadcasts the evaluation context to all worker ranks.

  • Collects child metadata into ctx.meta["children"].

  • Stores the final reduced loss in ctx.loss.

__init__(cob: CombinedObjectiveFunction, comm: Any | None = None, mpi_debug_log: bool = False) None[source]

Initialize an MPI wrapper for a combined objective.

Parameters:
  • cob – Combined objective function whose terms are distributed across MPI ranks.

  • comm – MPI communicator to use. If None, a duplicate of MPI.COMM_WORLD is created.

  • mpi_debug_log – If True, wrap the communicator so that MPI method calls are logged for debugging.

Notes

Each rank is assigned a contiguous slice of objective terms at initialization time.

evaluate_slice(params: dict[str, Any], ctx: EvaluateContext) list[float | None][source]
gather_meta_data(ctx: EvaluateContext)[source]

Collect child metadata from all ranks into the parent context.

This method must be called only on rank 0. It collects the local child metadata from rank 0 together with the metadata gathered from worker ranks and stores the flattened result in ctx.meta["children"].

Parameters:

ctx – Parent evaluation context on rank 0.

Raises:

RuntimeError – If called on a nonzero rank.

Notes

The collected metadata is flattened across ranks. The resulting metadata may therefore contain child entries for ranks whose original child contexts are not present in ctx._children on rank 0.

release_workers()[source]
shifted_child_context_configurator(idx_child_ctx: int, child_ctx: EvaluateContext, num_children: int, parent_ctx: EvaluateContext)[source]

Invoke the parent's child context configurator while accounting for the slicing.

The goal of this function is to reproduce the behaviour of the original combined objective function. The child_context_configurator therefore has to “see” the absolute index of the current child rather than its index within the current slice. For the same reason, the number of children is overwritten.

Parameters:
  • idx_child_ctx (int) – The index of the current child context within the slice

  • child_ctx (EvaluateContext) – The child context

  • num_children (int) – The number of children within the current slice

  • parent_ctx (EvaluateContext) – The parent context.

worker_gather_meta_data(ctx: EvaluateContext)[source]
worker_loop()[source]

Run the worker-side MPI evaluation loop.

This method must be called only on nonzero ranks. Each worker rank waits for broadcast messages from rank 0. On receiving an EvaluateContext, it evaluates its assigned slice of the combined objective and gathers both term values and child metadata back to rank 0. On receiving Signal.ABORT, the loop exits.

Raises:

RuntimeError – If called on rank 0.

worker_process_params(params: dict[str, Any], ctx: EvaluateContext)[source]
class Signal(*values)[source]

Bases: Enum

ABORT = -1
slice_up_range(n: int, n_ranks: int)[source]

Split a range of length n into contiguous rank-local chunks.

The chunks are distributed as evenly as possible across n_ranks by using a ceiling-based chunk size.

Parameters:
  • n – Total number of items to distribute.

  • n_ranks – Number of MPI ranks.

Yields:

Tuples (start, end) defining half-open index ranges for each rank.
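A sketch of ceiling-based chunking as described above. The function name is hypothetical, and clamping trailing ranks to empty ranges (start == end) is an assumption about how leftover ranks are handled.

```python
def slice_up_range_sketch(n, n_ranks):
    """Yield half-open (start, end) ranges, one per rank."""
    chunk = -(-n // n_ranks)  # ceiling division without floats
    for rank in range(n_ranks):
        start = min(rank * chunk, n)
        end = min(start + chunk, n)
        yield start, end


even = list(slice_up_range_sketch(10, 4))
sparse = list(slice_up_range_sketch(5, 4))
```

With 10 items on 4 ranks the chunk size is 3, so the last rank receives a single item. With 5 items on 4 ranks the chunk size is 2 and the last rank receives nothing.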

chemfit.plot_utils module

plot_energies(energy_ref: Sequence[float], energy_fit: Sequence[float], n_atoms: Sequence[int], tags: Sequence[str], output_folder: Path) None[source]
plot_progress_curve(progress: list[float], outpath: Path) None[source]

Save a semi-log plot of the objective values (progress) versus iteration index.

tags_as_ticks(ax: Axes, tags: Sequence[str], **kwargs)[source]

chemfit.scme_factories module

class SCMECalculatorFactory(default_scme_params: dict[str, Any], path_to_scme_expansions: Path | None, parametrization_key: str | None)[source]

Bases: object

__call__(atoms: Atoms) Any[source]

Call self as a function.

__init__(default_scme_params: dict[str, Any], path_to_scme_expansions: Path | None, parametrization_key: str | None) None[source]

Create an SCME calculator.

class SCMEParameterApplier[source]

Bases: object

__call__(atoms: Atoms, params: dict[str, Any]) None[source]

Assign SCME parameter values to the attached calculator.

chemfit.scme_setup module

arrange_water_in_ohh_order(atoms: Atoms) Atoms[source]

Reorder atoms so each water molecule appears as O, H, H.

Parameters:

atoms (Atoms) – Original Atoms object containing water molecules.

Returns:

New Atoms object with OHH ordering and no constraints.

Return type:

Atoms

Raises:

ValueError – If atom counts or ratios are inconsistent with water.

check_water_is_in_ohh_order(atoms: Atoms, oh_distance_tol: float = 2.0) bool[source]

Validate that each water molecule is ordered O, H, H and within tolerance.

Parameters:
  • atoms (Atoms) – Atoms object to validate.

  • oh_distance_tol (float, optional) – Maximum allowed O-H distance (default is 2.0 Å).

Raises:

ValueError – If ordering or distances violate water OHH assumptions.

setup_calculator(atoms: Atoms, params: dict[str, Any], path_to_scme_expansions: Path | None, parametrization_key: str | None) pyscme.scme_calculator.SCMECalculator[source]
setup_expansions(calc: pyscme.scme_calculator.SCMECalculator, parametrization_key: str, path_to_scme_expansions: Path)[source]

chemfit.utils module

class ExtendedJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

default(o: Any)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
check_params_near_bounds(params: dict[str, Any], bounds: dict[str, Any], relative_tol: float) list[tuple[str, float, float, float]][source]

Check if any of the parameters are near or beyond the bounds.

The criteria checked are

  1. param_value < lower + relative_tol * (upper - lower)

  2. param_value > upper - relative_tol * (upper - lower)

Parameters:
  • params (dict) – the dict of params to check

  • bounds (dict) – the dict of bounds to check

  • relative_tol (float) – The tolerance, relative to the span of the bounds. Positive values impose a stricter bound, zero imposes the exact bound, and negative values impose a looser bound.

Returns:

A list of tuples describing the parameters that violate the constraint. Each tuple contains

  • A string identifying the parameter in a flattened dict

  • The value of the parameter

  • The lower bound

  • The upper bound
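The two criteria can be illustrated for a flat dictionary. This is a sketch, not the library function: near_bounds is a hypothetical name, and storing bounds as (lower, upper) tuples is an assumption.

```python
def near_bounds(params, bounds, relative_tol):
    """Flat-dict sketch of the two criteria above."""
    flagged = []
    for name, value in params.items():
        if name not in bounds:
            continue  # parameters without bounds are never flagged
        lower, upper = bounds[name]
        margin = relative_tol * (upper - lower)
        if value < lower + margin or value > upper - margin:
            flagged.append((name, value, lower, upper))
    return flagged


params = {"eps": 0.105, "sigma": 3.0}
bounds = {"eps": (0.1, 1.1), "sigma": (2.0, 4.0)}

strict = near_bounds(params, bounds, relative_tol=0.01)
exact = near_bounds(params, bounds, relative_tol=0.0)
```

With a tolerance of 0.01, eps lies within 1% of the span above its lower bound and is reported. With a tolerance of zero, only values outside the bounds would be reported, so the list is empty.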

check_protocol(obj: Any | None, prot: Any)[source]
dump_dict_to_file(file: Path, dictionary: dict) None[source]

Write dictionary as JSON to file (with indent=4).

next_free_folder(base: Path) Path[source]

If ‘path/to/base’ does not exist, return ‘path/to/base’. Otherwise attempt ‘path/to/base_0’, ‘path/to/base_1’, etc. until finding a non-existent Path, then return that.
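The search described above can be sketched with pathlib. This is a minimal sketch under the assumption that the function only returns a path and does not create it; the name next_free_path is hypothetical and the code is not taken from ChemFit.

```python
import tempfile
from pathlib import Path


def next_free_path(base: Path) -> Path:
    """Return base if it does not exist, else the first free base_0, base_1, ..."""
    if not base.exists():
        return base
    index = 0
    while True:
        # Append the numeric suffix to the final path component
        candidate = base.with_name(f"{base.name}_{index}")
        if not candidate.exists():
            return candidate
        index += 1


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "run"
    first = next_free_path(base)   # "run" does not exist yet
    base.mkdir()
    second = next_free_path(base)  # "run" exists, so "run_0"
    second.mkdir()
    third = next_free_path(base)   # "run" and "run_0" exist, so "run_1"
    names = [first.name, second.name, third.name]

print(names)  # ['run', 'run_0', 'run_1']
```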

chemfit.wrap_funcs module

class WrappedObjectiveFunctor(func: Callable[[dict[str, Any]], float] | Callable[[dict[str, Any], EvaluateContext], float], pass_ctx: bool = False, func_args: tuple[Any] | None = None, func_kwargs: dict[str, Any] | None = None)[source]

Bases: ObjectiveFunctor

__call__(parameters: dict[str, Any], ctx: EvaluateContext | None = None) float[source]

Evaluate the wrapped callable as an objective functor.

If pass_ctx is True, the wrapped callable receives both the parameter dictionary and the evaluation context. Otherwise, it receives only the parameter dictionary.

Parameters:
  • parameters – Parameter dictionary for the current evaluation.

  • ctx – Optional evaluation context. If None, a new EvaluateContext is created.

Returns:

Scalar loss value returned by the wrapped callable.

Side Effects:
  • Stores parameters in ctx.parameters.

  • Stores the returned loss in ctx.loss.
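The dispatch on pass_ctx and the documented side effects can be sketched with stand-in classes. SketchContext and SketchObjective are hypothetical names used only for illustration; they are not ChemFit classes, and the order in which the real implementation writes to the context is an assumption.

```python
from types import SimpleNamespace
from typing import Any, Callable, Optional


class SketchContext(SimpleNamespace):
    """Hypothetical stand-in for EvaluateContext; it only holds attributes."""


class SketchObjective:
    def __init__(self, func: Callable[..., float], pass_ctx: bool = False):
        self.func = func
        self.pass_ctx = pass_ctx

    def __call__(
        self, parameters: dict[str, Any], ctx: Optional[SketchContext] = None
    ) -> float:
        if ctx is None:
            # A fresh context is created when the caller supplies none
            ctx = SketchContext()
        ctx.parameters = parameters
        if self.pass_ctx:
            loss = self.func(parameters, ctx)
        else:
            loss = self.func(parameters)
        ctx.loss = loss
        return loss


def quadratic(parameters: dict[str, Any]) -> float:
    return (parameters["x"] - 2.0) ** 2


ctx = SketchContext()
loss = SketchObjective(quadratic)({"x": 5.0}, ctx)
print(loss, ctx.loss, ctx.parameters)  # 9.0 9.0 {'x': 5.0}
```

Writing the results into the context rather than onto the functor keeps the functor itself free of per-call state.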

__init__(func: Callable[[dict[str, Any]], float] | Callable[[dict[str, Any], EvaluateContext], float], pass_ctx: bool = False, func_args: tuple[Any] | None = None, func_kwargs: dict[str, Any] | None = None)[source]

Initialize a wrapped objective functor.

Parameters:
  • func – Callable to wrap as an ObjectiveFunctor. The callable may either accept only parameters or accept both parameters and ctx.

  • pass_ctx – If True, call func(parameters, ctx). If False, call func(parameters).

bind(*args: Any, **kwargs: Any) WrappedObjectiveFunctor[source]

Return a new objective functor with extra arguments bound.

The bound arguments are passed to the wrapped function in addition to the usual ChemFit arguments.

Parameters:
  • *args – Positional arguments to bind after parameters (and after ctx as well if pass_ctx=True).

  • **kwargs – Keyword arguments to bind.

Returns:

A new wrapped objective functor with the requested arguments pre-applied.

class WrappedQuantityComputer(func: Callable[[...], dict[str, Any]], pass_ctx: bool = False, func_args: tuple[Any] | None = None, func_kwargs: dict[str, Any] | None = None)[source]

Bases: QuantityComputer

__init__(func: Callable[[...], dict[str, Any]], pass_ctx: bool = False, func_args: tuple[Any] | None = None, func_kwargs: dict[str, Any] | None = None)[source]

Initialize a wrapped quantity computer.

Parameters:
  • func – Callable to wrap as a QuantityComputer. The callable may either accept only parameters or accept both parameters and ctx.

  • pass_ctx – If True, call func(parameters, ctx). If False, call func(parameters).

bind(*args: Any, **kwargs: Any) WrappedQuantityComputer[source]

Return a new quantity computer with extra arguments bound.

The bound arguments are passed to the wrapped function in addition to the usual ChemFit arguments.

Parameters:
  • *args – Positional arguments to bind after parameters (and after ctx as well if pass_ctx=True).

  • **kwargs – Keyword arguments to bind.

Returns:

A new wrapped quantity computer with the requested arguments pre-applied.
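The binding behavior can be sketched with a small stand-in class. SketchBindable is a hypothetical name, the pass_ctx=False case is shown for brevity, and the assumption that repeated bind calls accumulate arguments is made only for this example.

```python
from typing import Any, Callable, Optional


class SketchBindable:
    """Hypothetical stand-in showing how bound arguments reach the wrapped function."""

    def __init__(
        self,
        func: Callable[..., Any],
        func_args: tuple = (),
        func_kwargs: Optional[dict] = None,
    ):
        self.func = func
        self.func_args = tuple(func_args)
        self.func_kwargs = dict(func_kwargs or {})

    def bind(self, *args: Any, **kwargs: Any) -> "SketchBindable":
        # Return a new object; the original stays unchanged
        return SketchBindable(
            self.func,
            self.func_args + args,
            {**self.func_kwargs, **kwargs},
        )

    def __call__(self, parameters: dict) -> Any:
        # Bound positional arguments follow the parameters
        return self.func(parameters, *self.func_args, **self.func_kwargs)


def scaled_offset(parameters: dict, target: float, scale: float = 1.0) -> float:
    return scale * abs(parameters["x"] - target)


base = SketchBindable(scaled_offset)
bound = base.bind(3.0, scale=2.0)
value = bound({"x": 1.0})
print(value)  # 4.0
```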

to_objective_functor(pass_ctx: bool = False)[source]

Create a decorator that wraps a callable as an objective functor.

Parameters:

pass_ctx – If True, the decorated callable is expected to accept (parameters, ctx). Otherwise, it is expected to accept only (parameters).

Returns:

Decorator that converts a compatible callable into a WrappedObjectiveFunctor.

to_quantity_computer(pass_ctx: bool = False)[source]

Create a decorator that wraps a callable as a quantity computer.

Parameters:

pass_ctx – If True, the decorated callable is expected to accept (parameters, ctx). Otherwise, it is expected to accept only (parameters).

Returns:

Decorator that converts a compatible callable into a WrappedQuantityComputer.
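Both functions above are decorator factories: they take pass_ctx and return the decorator that does the wrapping. The following minimal sketch shows that shape with hypothetical stand-ins (SketchFunctor, to_sketch_functor); it does not import ChemFit and is not its implementation.

```python
from types import SimpleNamespace
from typing import Any, Callable


class SketchFunctor:
    """Hypothetical stand-in for the wrapped functor classes."""

    def __init__(self, func: Callable[..., Any], pass_ctx: bool = False):
        self.func = func
        self.pass_ctx = pass_ctx

    def __call__(self, parameters: dict, ctx: Any = None) -> Any:
        if self.pass_ctx:
            return self.func(parameters, ctx)
        return self.func(parameters)


def to_sketch_functor(pass_ctx: bool = False):
    # The outer call fixes pass_ctx; the inner function is the actual decorator
    def decorator(func: Callable[..., Any]) -> SketchFunctor:
        return SketchFunctor(func, pass_ctx=pass_ctx)

    return decorator


@to_sketch_functor()
def loss_without_ctx(parameters: dict) -> float:
    return parameters["x"] ** 2


@to_sketch_functor(pass_ctx=True)
def loss_with_ctx(parameters: dict, ctx: SimpleNamespace) -> float:
    ctx.seen = True
    return parameters["x"] + 1.0


ctx = SimpleNamespace()
result_a = loss_without_ctx({"x": 3.0})
result_b = loss_with_ctx({"x": 3.0}, ctx)
print(result_a, result_b, ctx.seen)  # 9.0 4.0 True
```

Because the factory must be called to obtain the decorator, the sketch applies it with parentheses even when the default pass_ctx=False is used.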

Module contents