Source code for chemfit.file_based_computer

from __future__ import annotations

import copy
import functools
import shutil
import subprocess
import threading
import time
import uuid
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast, runtime_checkable

from typing_extensions import Self

from chemfit.abstract_objective_function import EvaluateContext, QuantityComputer
from chemfit.utils import check_protocol

if TYPE_CHECKING:
    from collections.abc import Iterable

import logging
from pathlib import Path

logger = logging.getLogger(__name__)


@runtime_checkable
class OutputParser(Protocol):
    """Structural type for callables that turn output files into quantities."""

    def __call__(self, output_files: list[Path]) -> dict[str, Any]:
        """
        Read the given output files and extract the quantities they contain.

        Args:
            output_files (list[Path]): Paths of the output files to parse.
                They are typically located in the working directory of a
                single evaluation.

        Returns:
            dict[str, Any]: The parsed quantities, keyed by name.

        """
        ...
@runtime_checkable
class PreSubmitHook(Protocol):
    """Structural type for callables executed before the command is submitted."""

    def __call__(self, parameters: dict[str, Any], workdir: Path) -> None:
        """
        Perform the pre-submit actions for one evaluation.

        Args:
            parameters (dict[str, Any]): Parameter dictionary of the evaluation.
            workdir (Path): Temporary working directory of the evaluation.

        """
        ...
# Signature of a command builder: it receives the parameter dictionary and the
# temporary working directory of an evaluation and returns the command as a
# list of strings (the form that is handed to ``subprocess.run``).
CommandType = Callable[[dict[str, Any], Path], list[str]]
class FileBasedQuantityComputer(QuantityComputer):
    """
    Quantity computer that runs an external command and parses its output files.

    Every evaluation gets its own temporary working directory below
    ``base_working_directory``. The command is executed inside that directory,
    the configured output files are awaited, and the output parsers turn them
    into a quantity dictionary.
    """

    def __init__(
        self,
        output_files: list[Path | str],
        output_parsers: list[OutputParser] | OutputParser,
        base_working_directory: Path | str,
        executable_cmd: CommandType | None = None,
        presubmit_hook: PreSubmitHook | None = None,
        wait_timeout: float | None = 500.0,
        poll_interval: float = 1,
        subprocess_run_args: dict | None = None,
        delete_temp_workdirs: bool = True,
        write_dump_file_after_crash: bool = True,
        keep_temp_workdir_after_crash: bool = True,
    ):
        """
        Initialize a file-based quantity computer.

        This quantity computer evaluates parameters by creating a temporary
        working directory, executing an external command, waiting for the
        expected output files to appear, and parsing those files into a
        quantity dictionary.

        Args:
            output_files (list[Path | str]): Paths to output files that are
                expected to be created by the external command. These paths
                must be **relative** to the working directory; absolute paths
                are not allowed.
            output_parsers (list[OutputParser] | OutputParser): One or more
                output parsers called after the external command completes and
                the output files exist. Each parser receives the list of output
                file paths and returns a dictionary of quantities. The results
                of all parsers are merged.
            base_working_directory (Path | str): Base directory under which
                temporary working directories will be created, one per
                evaluation.
            executable_cmd (Callable[[dict[str, Any], Path], list[str]] | None, optional):
                Callable that constructs the command to execute. It receives
                the parameter dictionary and the temporary working directory,
                and must return a list of strings suitable for
                ``subprocess.run``. May be omitted here and attached later via
                ``with_cmd``.
            presubmit_hook (PreSubmitHook | None, optional): Optional hook
                executed before the external command is run. It can be used to
                prepare input files, templates, etc.
            wait_timeout (float | None, optional): Maximum time in seconds to
                wait for all output files to appear. ``None`` waits without a
                time limit. Defaults to 500.0 seconds.
            poll_interval (float, optional): Interval in seconds between checks
                for output file creation. Defaults to 1 second.
            subprocess_run_args (dict | None, optional): Additional keyword
                arguments forwarded to ``subprocess.run``. Defaults to
                ``{"capture_output": True}`` when None. ``check`` and ``cwd``
                are always set by this class and must not be supplied here.
            delete_temp_workdirs (bool, optional): Whether to delete temporary
                working directories after each evaluation. Defaults to True.
            write_dump_file_after_crash (bool, optional): Whether to write a
                dump file with subprocess output when command execution fails.
            keep_temp_workdir_after_crash (bool, optional): Whether to keep the
                temporary working directory for inspection after a failed
                evaluation.

        Raises:
            Exception: If any path in `output_files` is absolute rather than relative.

        """
        super().__init__()

        self.output_files = [Path(f) for f in output_files]
        self.base_working_directory = Path(base_working_directory)

        self.write_dump_file_after_crash = write_dump_file_after_crash
        self.keep_temp_workdir_after_crash = keep_temp_workdir_after_crash

        # None of the output files may be absolute. To facilitate multiple
        # concurrent evaluations, every evaluation runs in its own temporary
        # working directory so that concurrent runs do not interfere with each
        # other's outputs. The relative paths are then resolved against that
        # temporary working directory.
        if any(of.is_absolute() for of in self.output_files):
            msg = "One of the output files is an absolute path. All output paths need to be relative to the working directory."
            raise Exception(msg)

        if subprocess_run_args is None:
            self.subprocess_run_args = {"capture_output": True}
        else:
            self.subprocess_run_args = subprocess_run_args

        self.executable_cmd: CommandType | None = executable_cmd

        # If a single OutputParser has been passed, turn it into a list with one element
        if isinstance(output_parsers, OutputParser):
            self.output_parsers = [output_parsers]
        else:
            self.output_parsers = output_parsers

        for o in self.output_parsers:
            check_protocol(o, OutputParser)

        self.presubmit_hook = presubmit_hook
        check_protocol(self.presubmit_hook, PreSubmitHook)

        self.wait_timeout = wait_timeout
        self.poll_interval = poll_interval
        self.delete_temp_workdirs = delete_temp_workdirs

        # Number of silent parsing attempts made before the final attempt,
        # whose exception is allowed to propagate.
        self.retries_output_parsing = 1

    def create_temp_workdir(self) -> Path:
        """
        Create and return a fresh temporary working directory.

        The directory is named with a random UUID and is created below
        ``base_working_directory`` (missing parents are created as well).

        Returns:
            Path to the newly created working directory.

        """
        name = str(uuid.uuid4())
        temp_workdir = self.base_working_directory / name
        temp_workdir.mkdir(exist_ok=False, parents=True)
        return temp_workdir

    def with_presubmit(self, presubmit: Callable[..., None], /, **kwargs: Any) -> Self:
        """
        Return a copy of this computer with a bound presubmit hook.

        The provided ``presubmit`` callable may accept additional keyword
        arguments beyond ``(parameters, workdir)``. These are bound via
        ``kwargs`` and the resulting callable is stored as the presubmit hook.

        This is a convenience wrapper around ``functools.partial`` that avoids
        requiring users to manually construct partial functions. The copy is
        shallow (``copy.copy``), so all other attributes are shared with the
        original instance.

        Args:
            presubmit: Callable executed before the command is run. Must accept
                ``(parameters: dict[str, Any], workdir: Path, ...)`` where any
                additional arguments are keyword-only.
            **kwargs: Keyword arguments to bind to ``presubmit``.

        Returns:
            A new ``FileBasedQuantityComputer`` instance with the updated
            presubmit hook.

        Example:
            >>> from chemfit.file_based_computer import FileBasedQuantityComputer
            >>> computer = FileBasedQuantityComputer(
            ...     output_files=["out.txt"],
            ...     output_parsers=[],
            ...     base_working_directory="workdir"
            ... )
            >>> def write_input(parameters, workdir, *, template_path):
            ...     ...
            >>> computer2 = computer.with_presubmit(
            ...     write_input,
            ...     template_path="INCAR.template",
            ... )

        Note:
            Additional arguments must be keyword-only in ``presubmit``.

        """
        new = copy.copy(self)
        new.presubmit_hook = functools.partial(presubmit, **kwargs)
        return new

    def with_cmd(
        self,
        executable_cmd: Callable[..., list[str]],
        /,
        **kwargs: Any,
    ) -> Self:
        """
        Return a copy of this computer with a bound command function.

        The provided ``executable_cmd`` may accept additional keyword arguments
        beyond ``(parameters, workdir)``. These are bound via ``kwargs`` and
        the resulting callable is stored as the command builder.

        This is a convenience wrapper around ``functools.partial`` that avoids
        requiring users to manually construct partial functions. The copy is
        shallow (``copy.copy``), so all other attributes are shared with the
        original instance.

        Args:
            executable_cmd: Callable used to construct the command. Must accept
                ``(parameters: dict[str, Any], workdir: Path, ...)`` where any
                additional arguments are keyword-only.
            **kwargs: Keyword arguments to bind to ``executable_cmd``.

        Returns:
            A new ``FileBasedQuantityComputer`` instance with the updated
            command function.

        Example:
            >>> from chemfit.file_based_computer import FileBasedQuantityComputer
            >>> computer = FileBasedQuantityComputer(
            ...     output_files=["out.txt"],
            ...     output_parsers=[],
            ...     base_working_directory="workdir"
            ... )
            >>> def build_command(parameters, workdir, *, executable):
            ...     return [executable, str(workdir / "input.txt")]
            >>> computer2 = computer.with_cmd(
            ...     build_command,
            ...     executable="my_program",
            ... )

        Note:
            Additional arguments must be keyword-only in ``executable_cmd``.

        """
        new = copy.copy(self)
        new.executable_cmd = functools.partial(executable_cmd, **kwargs)
        return new

    def build_cmd(self, parameters: dict[str, Any], ctx: EvaluateContext) -> list[str]:
        """
        Build the external command for the current evaluation.

        Args:
            parameters: Parameter dictionary for the current evaluation.
            ctx: Evaluation context whose temporary working directory
                (``ctx.temp.workdir``) is used when constructing the command.

        Returns:
            Command to execute, formatted for ``subprocess.run``.

        """
        # The cast only narrows the type for static checkers; callers are
        # expected to have attached a command builder beforehand.
        cmd_builder = cast("CommandType", self.executable_cmd)
        return cmd_builder(parameters, ctx.temp.workdir)

    def _compute(
        self,
        parameters: dict[str, Any],
        ctx: EvaluateContext,
    ) -> dict[str, Any]:
        """
        Execute the external command and parse its output files.

        This method implements the core logic:

        1. Create a temporary working directory.
        2. Optionally run a pre-submit hook.
        3. Build and execute the external command via ``subprocess.run``.
        4. Wait until all configured output files exist (or timeout).
        5. Run the configured output parsers and merge the resulting
           quantity dictionaries.
        6. Optionally delete the temporary working directory.

        Args:
            parameters: Parameter dictionary for this evaluation.
            ctx: Evaluation context for this call. The temporary working
                directory, resolved output file paths, and executed command
                are stored in ``ctx.temp``.

        Returns:
            Dictionary of parsed quantities.

        Side Effects:
            - Creates and stores ``ctx.temp.workdir``.
            - Stores the resolved output file paths in ``ctx.temp.output_files``.
            - Stores the executed command in ``ctx.temp.cmd``.
            - Creates and optionally deletes a temporary working directory.
            - Runs an external subprocess.

        Raises:
            Exception: If no executable command has been attached, or if
                anything fails after the temporary working directory has been
                created (pre-submit hook, subprocess execution, waiting for
                the output files, output parsing). In the latter case the
                original exception is chained as ``__cause__``. In particular,
                a timeout while waiting for the output files surfaces as an
                ``Exception`` whose ``__cause__`` is a ``TimeoutError``.

        ----------------------
        IMPORTANT WARNINGS
        ----------------------

        **1. Use with sbatch / job schedulers**

        Commands like ``sbatch`` (SLURM), ``qsub`` (Torque/PBS), ``bsub`` (LSF),
        or any *queueing* submission command typically return **immediately**
        from ``subprocess.run``. The actual compute job may start minutes or
        hours later.

        - The `wait_timeout` starts counting **as soon as `subprocess.run`
          returns**, *not* when the job begins executing.
        - This almost always causes a timeout if users submit through a
          scheduler.

        **Recommended workaround:**

        - Modify your job script to create a **completion flag file**, e.g.:

          .. code-block:: bash

              # at end of your SLURM job script
              touch task.done

        - Then configure `output_files=[Path("task.done")]` **in addition to**
          your real output files.

        This ensures `FileBasedQuantityComputer` waits for job completion
        rather than the output file prematurely appearing or remaining absent.

        **2. Timeout awareness**

        The `wait_timeout` applies to the *combined* waiting time after the
        external command returns.

        - Use very large timeouts (or None) or a reliable completion flag for
          scheduler-based workloads.
        - If timeout is too small, the evaluation fails with a `TimeoutError`
          (chained as described under *Raises*).

        **3. Programs that stream output**

        Many scientific programs create output files early and append to them
        as they run. Because this class only checks **existence**, not
        completeness:

        - An output file may exist while the job is still running.
        - Parsers may read incomplete or partially written files.

        **Solutions:**

        - Use a completion marker file (as above).
        - Or implement a parser that verifies file completeness (checksum,
          fixed-size, closing footer, etc.).

        **4. Crash diagnostics and dump files**

        If the external command fails (i.e. ``subprocess.run`` raises
        ``subprocess.CalledProcessError``), this class can optionally write a
        diagnostic dump file to help with debugging.

        When ``write_dump_file_after_crash=True``, a dump file is written to
        the base working directory using the name of the temporary work
        directory with the suffix ``.dump``. For example::

            base_working_directory/
                7f4d9c1a-8cbb-4b6f-b88c-8a1c53eae6c3/
                7f4d9c1a-8cbb-4b6f-b88c-8a1c53eae6c3.dump

        The dump file contains diagnostic information including:

        - the captured ``stderr`` of the external program (if available)
        - the captured ``stdout`` of the external program (if available)
        - the contents of ``ctx.temp`` at the time of failure

        This information often provides enough context to diagnose failures
        without needing to reproduce the run manually.

        The dump file is especially useful when:

        - evaluations run inside large optimization loops
        - runs are executed on remote clusters
        - temporary working directories are automatically deleted

        If ``keep_temp_workdir_after_crash=True``, the temporary working
        directory is preserved for manual inspection. Otherwise it may be
        deleted depending on the configuration.

        Keeping both the dump file and the working directory can make it much
        easier to reproduce and debug failed evaluations.

        """
        if self.executable_cmd is None:
            msg = "No executable command has been attached. Supply it either in the constructor or use the `with_cmd` method."
            raise Exception(msg)

        # Create a temporary working directory
        ctx.temp.workdir = self.create_temp_workdir()

        try:
            ctx.temp.output_files = [ctx.temp.workdir / o for o in self.output_files]

            if self.presubmit_hook is not None:
                self.presubmit_hook(parameters, ctx.temp.workdir)

            cmd = self.build_cmd(parameters, ctx)
            ctx.temp.cmd = cmd

            # Spin up the watcher BEFORE running the command to avoid race conditions
            ready = threading.Event()
            stop = threading.Event()
            watcher = threading.Thread(
                target=self._file_watch_loop,
                args=(ctx.temp.output_files, ready, stop),
                daemon=True,
            )
            watcher.start()

            try:
                self._run_command(cmd, ctx)

                # Block here until the files appear (or timeout).
                # The main reason for this extra check is to eventually support
                # remote execution, e.g. on clusters. A script submitted with
                # `sbatch`, for example, returns immediately from
                # `subprocess.run`, but the output files are not present until
                # the job has actually run on a compute node.
                # Output files that get continuously appended to can still fool
                # us: they may exist before the program has completed its run.
                if not all(o.exists() for o in ctx.temp.output_files):
                    appeared = ready.wait(timeout=self.wait_timeout)
                    stop.set()
                    watcher.join(timeout=1)

                    # The watcher only looks every `poll_interval` seconds, so
                    # do one last check on the main thread before giving up.
                    if not appeared and not all(
                        o.exists() for o in ctx.temp.output_files
                    ):
                        err_message = f"Timed out waiting for {ctx.temp.output_files}"
                        raise TimeoutError(err_message)
            finally:
                # Always tell the watcher to stop, also when the command
                # failed, so that no polling thread is left running.
                stop.set()

            res = self._parse_outputs(ctx.temp.output_files)

        except Exception as e:
            msg = (
                "Exception in `_compute` of FileBasedQuantityComputer.\n"
                f"    ctx.temp = {ctx.temp}"
            )

            if self.delete_temp_workdirs and not self.keep_temp_workdir_after_crash:
                # Best effort: a failing cleanup must not hide the real error.
                shutil.rmtree(ctx.temp.workdir, ignore_errors=True)
            else:
                msg += f"\nKeeping temporary workdir '{ctx.temp.workdir}' for inspection"

            raise Exception(msg) from e
        else:
            if self.delete_temp_workdirs:
                shutil.rmtree(ctx.temp.workdir)

        return res

    @staticmethod
    def _decode_stream(stream: bytes | str | None) -> str | None:
        """Return captured subprocess output as text (None if nothing was captured)."""
        if stream is None:
            return None
        if isinstance(stream, bytes):
            # Undecodable bytes are replaced instead of raising, so that
            # diagnostics never fail because of the program's output encoding.
            return stream.decode("utf-8", errors="replace")
        # With `text=True` (or an `encoding`) subprocess already yields `str`.
        return str(stream)

    def _run_command(self, cmd: list[str], ctx: EvaluateContext) -> None:
        """Run the command in the temporary workdir; raise with diagnostics on non-zero exit."""
        try:
            # Run the external program (raises on non-zero exit)
            subprocess.run(  # noqa: S603
                cmd,  # type: ignore
                check=True,
                cwd=ctx.temp.workdir,
                **self.subprocess_run_args,
            )  # type: ignore
        except subprocess.CalledProcessError as e:
            lines = [
                "Exception in `subprocess.run` of FileBasedQuantityComputer.",
                f"    ctx.temp = {ctx.temp}",
            ]

            stderr_text = self._decode_stream(e.stderr)
            if stderr_text is not None:
                lines.append(f"    stderr (if captured) = {stderr_text}")

            # Try to write a dump file
            if self.write_dump_file_after_crash:
                lines.append(self._write_crash_dump(e, ctx))

            raise Exception("\n".join(lines)) from e

    def _write_crash_dump(
        self, error: subprocess.CalledProcessError, ctx: EvaluateContext
    ) -> str:
        """Write the crash dump file and return a one-line status message about it."""
        dump_path = (
            self.base_working_directory / ctx.temp.workdir.name
        ).with_suffix(".dump")

        try:
            with dump_path.open("w", encoding="utf-8") as f:
                for label, stream in (
                    ("Stderr", error.stderr),
                    ("Stdout", error.stdout),
                ):
                    text = self._decode_stream(stream)
                    if text is None:
                        continue
                    f.write(f"{label}:\n")
                    f.write(text)
                    # Keep the next section header on its own line
                    if not text.endswith("\n"):
                        f.write("\n")
                f.write("ctx.temp:\n")
                f.write(f"{ctx.temp}")
        except Exception as exc_dump:
            # Writing the dump is best effort; report the problem in the message
            return f"Could not write dump file to `{dump_path}`, because of {exc_dump}."

        return f"Wrote dump file to `{dump_path}`."

    def _parse_outputs(self, output_files: list[Path]) -> dict[str, Any]:
        """Run all output parsers (with retries) and merge their results."""
        res: dict[str, Any] = {}

        for parser in self.output_parsers:
            # The first `retries_output_parsing` attempts swallow exceptions
            for attempt in range(self.retries_output_parsing):
                try:
                    res.update(parser(output_files))
                except Exception:
                    logger.debug(
                        "Output parser %r failed on attempt %d; retrying.",
                        parser,
                        attempt + 1,
                        exc_info=True,
                    )
                    continue
                break
            else:
                # No attempt succeeded so far: the (retries + 1)th (aka the
                # last) attempt is made without a try block, so that the actual
                # exception reaches the caller.
                res.update(parser(output_files))

        return res

    def _file_watch_loop(
        self,
        output_files: Iterable[Path],
        ready: threading.Event,
        stop: threading.Event,
    ) -> None:
        """
        Poll until all output files exist or a stop is requested.

        Args:
            output_files: Paths whose existence is checked on every iteration.
            ready: Event that is set once all files exist.
            stop: Event that makes the loop terminate when it is set.

        """
        while not stop.is_set():
            files_created = all(o.exists() for o in output_files)
            if files_created:
                ready.set()
                return
            time.sleep(self.poll_interval)