API Reference

class exca.TaskInfra(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: Dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None, permissions: int | str | None = 511, version: str = '0', mode: Literal['cached', 'retry', 'force', 'read-only'] = 'cached', keep_in_ram: bool = False)[source]

Processing/caching infrastructure ready to be applied to a pydantic.BaseModel method. To use it, the configuration can be set as an attribute of a pydantic BaseModel, then @infra.apply must be set on the method to process/cache. This effectively replaces the function with a cached/remotely-computed version of itself.

Parameters

folder: optional Path or str

Path to directory for dumping/loading the cache on disk, if provided

keep_in_ram: bool

if True, adds a cache in RAM of the data once loaded (similar to LRU cache)

mode: str
One of the following:
  • "cached": cache is returned if available (error or not), otherwise computed (and cached)

  • "retry": cache is returned if available except if it’s an error, otherwise (re)computed (and cached)

  • "force": cache is ignored, and result are (re)computed (and cached)

  • "read-only": never compute anything

Slurm/submitit parameters

Check out exca.slurm.SubmititMixin

Note

  • the decorated method must take no parameter (except self).
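For instance, a minimal sketch of the intended pattern (class name, field values and folder path are illustrative):

import pydantic
import exca

class MyTask(pydantic.BaseModel):
    param: int = 12
    # the infra is a regular pydantic field holding caching/submission options
    infra: exca.TaskInfra = exca.TaskInfra(version="1")

    @infra.apply  # replaces the method with a cached/remotely-computed version
    def process(self) -> float:
        return self.param * 3.0

task = MyTask(param=1, infra={"folder": "/tmp/exca_cache"})
out = task.process()  # computed on the first call, loaded from cache afterwards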

apply(arg: C, /) C[source]
apply(exclude_from_cache_uid: Iterable[str] | Callable[[Any], Iterable[str]] = ()) Callable[[C], C]

Applies the infra on a method taking no parameter (except self)

Parameters

method: callable

a method of a pydantic.BaseModel taking no parameter (except self).

exclude_from_cache_uid: iterable of str / method / method name

fields that must be removed from the uid of the cache (in addition to the ones already removed from the class uid)

Usage

either decorate with @infra.apply or @infra.apply(exclude_from_cache_uid=<whatever>)
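For example, a sketch (field names are illustrative) excluding a field that does not change the result from the cache uid:

import pydantic
import exca

class MyTask(pydantic.BaseModel):
    param: int = 12
    verbose: bool = False  # does not affect the output, so exclude it from the uid
    infra: exca.TaskInfra = exca.TaskInfra(version="1")

    @infra.apply(exclude_from_cache_uid=("verbose",))
    def process(self) -> float:
        return self.param * 3.0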

clear_job() None[source]

Clears and possibly cancels this task’s job so that the computation is rerun at the next call

clone_obj(*args: Dict[str, Any], **kwargs: Any) Any[source]

Create a new decorated object by applying a diff config to the underlying object
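A hedged usage sketch, building on the MyTask example above:

task = MyTask(param=1)
other = task.infra.clone_obj({"param": 2})  # dict-style diff
same = task.infra.clone_obj(param=2)        # keyword-style diff
assert other.param == same.param == 2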

config(uid: bool = True, exclude_defaults: bool = False) ConfDict[source]

Exports the task configuration as a ConfDict. ConfDict is a dict subclass which splits keys on “.”, with extra flatten, to_uid and to_yaml features.

Parameters

uid: bool

if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields

exclude_defaults: bool

if True, values that are set to defaults are not included
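A brief sketch of how this can be used, building on the MyTask example above (the printed content depends on the model's fields and defaults):

task = MyTask(param=1, infra={"folder": "/tmp/exca_cache"})
cfg = task.infra.config(uid=True, exclude_defaults=True)
print(cfg)           # e.g. {"param": 1}
print(cfg.to_uid())  # compact string used for the cache folder name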

iter_cached() Iterable[BaseModel][source]

Iterate over similar tasks in the cache folder

job() Job[Any] | LocalJob[source]

Creates or reloads the job corresponding to the task

job_array(max_workers: int = 256) Iterator[List[Any]][source]

Creates a list object to populate. The tasks in the list will be sent as a job array when exiting the context.
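A sketch of the intended pattern, building on the MyTask example above and assuming cloned model instances are appended to the list; exact submission behavior depends on the cluster configuration:

task = MyTask(infra={"folder": "/tmp/exca_cache", "cluster": "local"})
with task.infra.job_array() as tasks:
    # populate the list; the corresponding jobs are submitted as a job array
    # when the context exits
    tasks.extend(task.infra.clone_obj({"param": p}) for p in range(4))
# once the jobs complete, results can be read back from the cache, e.g. tasks[0].process()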

obj_infras() Mapping[str, BaseInfra][source]

Returns a dictionary of all infras that are part of the current model/config (including sub-configs)

status() Literal['not submitted', 'running', 'completed', 'failed'][source]

Provides the status of the job. This can be one of “not submitted”, “running”, “completed” or “failed”.

uid() str[source]

Returns the unique uid of the task

uid_folder(create: bool = False) Path | None[source]

Folder where this task instance is stored

class exca.MapInfra(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug', 'threadpool', 'processpool'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: Dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None, permissions: int | str | None = 511, version: str = '0', keep_in_ram: bool = True, max_jobs: int | None = 128, min_samples_per_job: int = 2048, forbid_single_item_computation: bool = False, mode: Literal['cached', 'force', 'read-only'] = 'cached')[source]

Processing/caching infrastructure ready to be applied to a pydantic.BaseModel method. To use it, the configuration can be set as an attribute of a pydantic BaseModel, then @infra.apply(item_uid) must be set on the method to process/cache. This effectively replaces the function with a cached/remotely-computed version of itself.

Parameters

folder: optional Path or str

Path to directory for dumping/loading the cache on disk, if provided

keep_in_ram: bool

if True, adds a cache in RAM of the data once loaded (similar to LRU cache)

mode: str
One of the following:
  • "cached": cache is returned if available (error or not), otherwise computed (and cached)

  • "force": cache is ignored, and result are (re)computed (and cached)

  • "read-only": never compute anything

cluster: optional str
Where to run the computation, one of:
  • None: runs in the current thread

  • "debug": submitit debug executor (runs in the current process with ipdb)

  • "local": submitit local executor (runs in a dedicated subprocess)

  • "slurm": submitit slurm executor (runs in a slurm cluster)

  • "auto": submitit auto executor (uses slurm if available, otherwise local)

  • "processpool": runs locally in a concurrent.future.ProcessPoolExecutor

  • "threadpool": runs locally in a concurrent.future.ThreadPoolExecutor

max_jobs: optional int

maximum number of submitit jobs or process/thread workers to submit for running all the map processing

min_samples_per_job: optional int

minimum number of samples to compute within each job

forbid_single_item_computation: bool

raises an error if a single item needs to be computed. This can help detect issues (and avoid overloading the cluster) when all items are supposed to have been precomputed.

Slurm/submitit parameters

Check out exca.slurm.SubmititMixin

Note

  • the decorated method must take as input an iterable of items of a type X, and yield one output of a type Y for each input.
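For instance, a minimal sketch of the intended pattern (class name, field values and folder path are illustrative):

import typing as tp
import pydantic
import exca

class MyMap(pydantic.BaseModel):
    coeff: float = 2.0
    infra: exca.MapInfra = exca.MapInfra(version="1")

    @infra.apply(item_uid=str)  # item_uid maps each item to its cache key
    def process(self, items: tp.Iterable[int]) -> tp.Iterator[float]:
        for item in items:
            yield item * self.coeff

mapper = MyMap(infra={"folder": "/tmp/exca_cache"})
out = list(mapper.process([1, 2, 3]))  # only items missing from the cache are computed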

apply(*, item_uid: Callable[[Any], str], exclude_from_cache_uid: Iterable[str] | Callable[[Any], Iterable[str]] = (), cache_type: str | None = None) Callable[[C], C][source]

Applies the infra on a method taking an iterable of items as input

Parameters

method: callable

a method of a pydantic.BaseModel taking as input an iterable of items of a type X, and yielding one output of a type Y for each input item.

item_uid: callable from item to str

function returning a uid from the item of a map

exclude_from_cache_uid: iterable of str / method / method name

fields that must be removed from the uid of the cache (in addition to the ones already removed from the class uid)

cache_type: str

name of the cache class to use (inferred by default). This can for instance be used to enforce the use of a memmap instead of loading full arrays.

Usage

Decorate the method with @infra.apply(item_uid=<function>) (as well as any additional parameter you want to set)

clone_obj(*args: Dict[str, Any], **kwargs: Any) Any[source]

Create a new decorated object by applying a diff config to the underlying object

config(uid: bool = True, exclude_defaults: bool = False) ConfDict[source]

Exports the task configuration as a ConfDict. ConfDict is a dict subclass which splits keys on “.”, with extra flatten, to_uid and to_yaml features.

Parameters

uid: bool

if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields

exclude_defaults: bool

if True, values that are set to defaults are not included

iter_cached() Iterable[BaseModel][source]

Iterate over similar objects in the cache folder

obj_infras() Mapping[str, BaseInfra][source]

Returns a dictionary of all infras that are part of the current model/config (including sub-configs)

uid() str[source]

Returns the unique uid of the task

uid_folder(create: bool = False) Path | None[source]

Folder where this task instance is stored

Associated classes and functions

class exca.slurm.SubmititMixin(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: Dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None)[source]

Mixin class for creating a submitit runner infra

Parameters

folder: optional Path or str

Path to directory for dumping/loading the cache on disk, if provided

cluster: optional str
Where to run the computation, one of:
  • None: runs in the current thread

  • "debug": submitit debug executor (runs in the current process with ipdb)

  • "local": submitit local executor (runs in a dedicated subprocess)

  • "slurm": submitit slurm executor (runs in a slurm cluster)

  • "auto": submitit auto executor (uses slurm if available, otherwise local)

logs: Path or str

path to the logs for slurm/local jobs. One can use {folder} in the string to define logs as a subfolder of the storage folder, {user} for the user name and %j (slurm syntax) for the job id

workdir: optional exca.workdir.WorkDir

pydantic config defining whether and how to copy the current workspace to a directory specific to the job, to avoid interference when working on the code. See exca.workdir.WorkDir for details.

job_name: optional str

name of the job

timeout_min: optional int

timeout for slurm/local jobs

nodes: optional int

number of nodes for slurm jobs

tasks_per_node: optional int

number of tasks per node for slurm jobs

cpus_per_task: optional int

number of cpus per task for slurm jobs

gpus_per_node: optional int

number of gpus per node for slurm jobs

mem_gb: float

RAM memory to be used in GB

slurm_constraint: optional str

node constraint for the job

slurm_account: optional str

account to use for the job

slurm_qos: optional str

qos to use for the job

slurm_partition: optional str

partition for the slurm job

slurm_use_srun: bool

use srun in the sbatch file. This is the default in submitit, but not advised for jobs triggering more jobs.

slurm_additional_parameters: optional dict

additional parameters for slurm that are not first class parameters of this config

conda_env: optional str/path

path or name of a conda environment to use in the job
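These parameters are typically set directly on the infra field of the model (or through its config dict). A hedged sketch; the folder and partition name are placeholders:

import exca

infra = exca.TaskInfra(
    folder="/checkpoint/me/cache",   # placeholder cache folder
    cluster="slurm",
    slurm_partition="my_partition",  # placeholder partition name
    gpus_per_node=1,
    cpus_per_task=10,
    timeout_min=120,
)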

class exca.workdir.WorkDir(*, copied: Sequence[str | Path] = [], folder: str | Path | None = None, log_commit: bool = False, includes: Sequence[str] = (), excludes: Sequence[str] = ('__pycache__', '.git'))[source]

Custom working directory configuration

Parameters

copied: Sequence[str]

list/tuple of names of files, or folders, or packages installed in editable mode to copy to the new working directory folder.

folder: Path/str

folder to use as working directory. If not specified, the infra will create one automatically at <infra_uid_folder>/code/<date>-<random_uid>/. The folder is logged, so you should be able to see what happened in your stderr/stdout. This parameter can be used in particular to store the code in a specific location, or to reuse the workdir from a previous run.

includes: sequence of str

file name patterns that must be included (recursively). Folders are always included unless explicitly excluded. E.g.: ["*.py"] to include only python files.

excludes: sequence of str

file/folder name patterns that must be excluded

log_commit: bool

if True, raises an error if the current working directory is in a git repository with uncommitted changes, and logs the commit otherwise

Notes

  • Since python privileges the current working directory over installed packages, the copied packages should be the ones running in the job (be careful, there can be a few gotchas, e.g. for the debug cluster or with no cluster the imports cannot be reloaded, so the current working directory will be used, but that should not make a difference in these cases)

  • The change of working directory (and possibly the copy) only happens when the infra is called for submitting the decorated function. Depending on your code, this may not be at the very beginning of your execution.
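A hedged configuration sketch; the package name is a placeholder for a locally edited package:

import exca

infra = exca.TaskInfra(
    folder="/tmp/exca_cache",
    cluster="slurm",
    # copy the editable package "my_package" into a job-specific working
    # directory, and log the current git commit (raising on uncommitted changes)
    workdir={"copied": ["my_package"], "log_commit": True},
)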

class exca.ConfDict(mapping: MutableMapping[str, Any] | Iterable[Tuple[str, Any]] | None = None, **kwargs: Any)[source]

Dictionary which breaks into sub-dictionaries on “.” as in a config (see example). The data can be specified either through “.” keywords, directly through sub-dicts, or a mixture of both. Lists of dictionaries are processed as lists of ConfDict. It also has yaml export capabilities as well as uid computation.

Example

ConDict({"training.optim.lr": 0.01}) == {"training": {"optim": {"lr": 0.01}}}

Note

This is designed for configurations, so it probably does not scale well to 100k+ keys

flat() Dict[str, Any][source]

Returns a flat dictionary such as {“training.dataloader.lr”: 0.01, “training.optim.name”: “Ada”}

classmethod from_args(args: list[str]) ConfDict[source]

Parses a list of Bash-style arguments (e.g., --key=value) into a ConfDict. Typically used as MyConfig(**ConfDict.from_args(sys.argv[1:])). This method supports sub-arguments, e.g.: --optimizer.lr=0.01
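A brief sketch of the parsing behavior (the precise typing of parsed values is not shown here):

from exca import ConfDict

cfg = ConfDict.from_args(["--training.optim.lr=0.01", "--model.name=mlp"])
print(cfg)         # nested: {"training": {"optim": {"lr": ...}}, "model": {"name": "mlp"}}
print(cfg.flat())  # flat: {"training.optim.lr": ..., "model.name": "mlp"}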

classmethod from_model(model: BaseModel, uid: bool = False, exclude_defaults: bool = False) ConfDict[source]

Creates a ConfDict based on a pydantic model

Parameters

model: pydantic.BaseModel

the model to convert into a dictionary

uid: bool

if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields

exclude_defaults: bool

if True, values that are set to defaults are not included

Note

_exclude_from_cls_uid needs to be a list/tuple/set (or a classmethod returning one) with either or both of the fields:
  • exclude: tuple of field names to be excluded
  • force_include: tuple of fields to include in all cases (even if excluded or set to defaults)

classmethod from_yaml(yaml: str | Path | IO[str] | IO[bytes]) ConfDict[source]

Loads a ConfDict from a yaml string/filepath/file handle.

get(key: str, default: Any | None = None) Any[source]

Return the value for key if key is in the dictionary, else default.

pop(k[, d]) v, remove specified key and return the corresponding value.[source]

If the key is not found, return the default if given; otherwise, raise a KeyError.

to_uid() str[source]

Provides a unique string for the config

to_yaml(filepath: Path | str | None = None) str[source]

Exports the ConfDict to a yaml string, and optionally to a file if a filepath is provided
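A short sketch combining both export features:

from exca import ConfDict

cfg = ConfDict({"training.optim.lr": 0.01})
print(cfg.to_yaml())  # yaml string; also written to disk if a filepath is provided
print(cfg.to_uid())   # compact string usable e.g. in cache folder names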

update(mapping: MutableMapping[str, Any] | Iterable[Tuple[str, Any]] | None = None, **kwargs: Any) None[source]

Updates recursively the keys of the confdict. No key is removed unless a sub-dictionary contains "=replace=": True, in this case the existing keys in the sub-dictionary are wiped
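A hedged sketch of the replace behavior described above (the exact handling of the "=replace=" marker is assumed from this description):

from exca import ConfDict

cfg = ConfDict({"optim": {"lr": 0.01, "name": "Adam"}})
cfg.update({"optim.lr": 0.1})  # recursive update: "name" is kept
cfg.update({"optim": {"=replace=": True, "lr": 1.0}})  # wipes the existing "optim" keys
print(cfg)  # expected: {"optim": {"lr": 1.0}}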

class exca.cachedict.CacheDict(folder: Path | str | None, keep_in_ram: bool = False, cache_type: str | None = None, permissions: int | None = 511)[source]

Dictionary-like object that caches and loads data on disk and in RAM.

Parameters

folder: optional Path or str

Path to directory for dumping/loading the cache on disk

keep_in_ram: bool

if True, adds a cache in RAM of the data once loaded (similar to LRU cache)

cache_type: str or None

type of cache dumper/loader to use (see the dumperloader.py file for existing options; these include “NumpyArray”, “NumpyMemmapArray”, “TorchTensor”, “PandasDataframe”). If None, the type will be deduced automatically and by default use a standard pickle dump.

permissions: optional int

permissions for generated files. Use os.chmod / Path.chmod compatible numbers, or None to deactivate. E.g.: 0o777 for all rights to all users.

Usage

import numpy as np
from exca.cachedict import CacheDict

folder = "/tmp/cache"  # any writable directory
mydict = CacheDict(folder, keep_in_ram=True)
mydict.keys()  # empty if folder was empty
mydict["whatever"] = np.array([0, 1])
# stored in both the memory cache and on disk :)
mydict2 = CacheDict(folder, keep_in_ram=True)
# since mydict and mydict2 share the same folder, the
# key "whatever" will be in mydict2
assert "whatever" in mydict2

Note

Each item is cached as 1 file, with an additional .key file with the same name holding the actual key for the item (which can differ from the file name)

exca.helpers.find_slurm_job(*, job_id: str, folder: Path | str | None = None) InfraSlurmJob | None[source]

Attempts to instantiate a submitit.SlurmJob instance from a cache folder and a job_id, looking for it recursively. This is based on the default configuration of the log folder position (<cache folder>/logs/<username>/<job_id>), and some additional heuristics that may be invalid in other pipelines (skipping logs/wandb folders), so this can fail with other configurations and may need adaptations, but should answer 95% of cases.

Parameters

job_id: str

the job id

folder: str, Path or None

the path of the cache folder. If None, scontrol will be called to try and identify it automatically (will fail for non-running jobs)

Notes

  • a submitit.Job instance has:
    • job.paths.stderr/stdout: pathlib.Path of the logs

    • job.stderr()/stdout(): string of the logs

    • job.result(): output of the job (waits if not completed, raises if error)

    • job.done(): True if job is completed

  • On top of it, the returned job has attributes:
    • config: the full configuration of the job

    • uid_config: the non default uid configuration of the job

  • The search assumes there is only one “logs” folder in the path (as we assume the default configuration of the logs path) and will probably fail if the cache folder contains /logs/ in it. It also assumes there is no /code/ in it.

  • Get the stderr lines using: out = job.stderr().split("\n")

Example

job = find_slurm_job(job_id=job_id, folder=my_folder)
print(job.uid_config)  # see uid (= simplified) config for this job
print(job.stdout())  # print stdout of the job

class exca.helpers.with_infra(**kwargs: Any)[source]

Decorator for adding an infra to a function

Usage

@with_infra(folder="whatever")
def my_func(...):
    ...

or directly my_func = with_infra(folder="whatever")(my_func). The function will then always use this infra.