API Reference
- class exca.TaskInfra(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: Dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None, permissions: int | str | None = 511, version: str = '0', mode: Literal['cached', 'retry', 'force', 'read-only'] = 'cached', keep_in_ram: bool = False)[source]
Processing/caching infrastructure ready to be applied to a pydantic.BaseModel method. To use it, set the configuration as an attribute of a pydantic BaseModel, then decorate the method to process/cache with @infra.apply. This effectively replaces the method with a cached/remotely-computed version of itself.
Parameters
- folder: optional Path or str
Path to directory for dumping/loading the cache on disk, if provided
- keep_in_ram: bool
if True, adds a cache in RAM of the data once loaded (similar to LRU cache)
- mode: str
- One of the following:
  - "cached": cache is returned if available (error or not), otherwise computed (and cached)
  - "retry": cache is returned if available except if it is an error, otherwise (re)computed (and cached)
  - "force": cache is ignored, and results are (re)computed (and cached)
  - "read-only": never compute anything
Slurm/submitit parameters
Check out
exca.slurm.SubmititMixin
Note
the decorated method must take no parameter (except self); for methods processing an iterable of items, see MapInfra.
- apply(arg: C, /) C [source]
- apply(exclude_from_cache_uid: Iterable[str] | Callable[[Any], Iterable[str]] = ()) Callable[[C], C]
Applies the infra on a method taking no parameter (except self)
Parameters
- method: callable
a method of a pydantic.BaseModel taking no parameter (except self)
- exclude_from_cache_uid: iterable of str / method / method name
fields that must be removed from the uid of the cache (in addition to the ones already removed from the class uid)
Usage
either decorate with @infra.apply or @infra.apply(exclude_from_cache_uid=<whatever>)
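A minimal sketch of this pattern, assuming a tutorial-style model (MyTask, param and the cache folder are illustrative, not part of the API):

```python
import numpy as np
import pydantic
import exca


class MyTask(pydantic.BaseModel):
    param: int = 12
    infra: exca.TaskInfra = exca.TaskInfra(version="1")

    @infra.apply
    def process(self) -> float:
        # computed once, then cached on disk (if infra.folder is set) and/or in RAM
        return self.param * np.random.rand()


task = MyTask(param=3, infra={"folder": "/tmp/excache"})
out = task.process()
assert out == task.process()  # the second call returns the cached value
```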
- clear_job() None [source]
Clears and possibly cancels this task’s job so that the computation is rerun at the next call
- clone_obj(*args: Dict[str, Any], **kwargs: Any) Any [source]
Create a new decorated object by applying a diff config to the underlying object
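Continuing the illustrative MyTask sketch above, clone_obj could be used as follows:

```python
# derive a new object with a modified config; its cache uid differs accordingly
task13 = task.infra.clone_obj({"param": 13})
# the keyword form is equivalent:
task13 = task.infra.clone_obj(param=13)
```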
- config(uid: bool = True, exclude_defaults: bool = False) ConfDict [source]
Exports the task configuration as a ConfDict. ConfDicts are dicts which split keys on "." with extra flatten, to_uid and to_yaml features
Parameters
- uid: bool
if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields
- exclude_defaults: bool
if True, values that are set to defaults are not included
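For example (still on the MyTask sketch; the printed output is indicative only):

```python
cfg = task.infra.config(uid=True, exclude_defaults=True)
print(cfg)           # e.g. {'param': 3}: non-default, uid-relevant fields only
print(cfg.to_uid())  # a string uid derived from that config
```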
- job_array(max_workers: int = 256) Iterator[List[Any]] [source]
Creates a list object to populate. The tasks in the list will be sent as a job array when exiting the context
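A sketch of the context-manager usage, again on the illustrative MyTask model:

```python
with task.infra.job_array(max_workers=16) as tasks:
    for p in range(10):
        tasks.append(task.infra.clone_obj({"param": p}))
# on exiting the context, pending tasks are submitted as a single job array
results = [t.process() for t in tasks]
```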
- obj_infras() Mapping[str, BaseInfra] [source]
Returns a dictionary of all infras that are part of the current model/config (including sub-configs)
- class exca.MapInfra(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug', 'threadpool', 'processpool'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: Dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None, permissions: int | str | None = 511, version: str = '0', keep_in_ram: bool = True, max_jobs: int | None = 128, min_samples_per_job: int = 2048, forbid_single_item_computation: bool = False, mode: Literal['cached', 'force', 'read-only'] = 'cached')[source]
Processing/caching infrastructure ready to be applied to a pydantic.BaseModel method. To use it, set the configuration as an attribute of a pydantic BaseModel, then decorate the method to process/cache with @infra.apply(item_uid). This effectively replaces the method with a cached/remotely-computed version of itself.
Parameters
- folder: optional Path or str
Path to directory for dumping/loading the cache on disk, if provided
- keep_in_ram: bool
if True, adds a cache in RAM of the data once loaded (similar to LRU cache)
- mode: str
- One of the following:
  - "cached": cache is returned if available (error or not), otherwise computed (and cached)
  - "force": cache is ignored, and results are (re)computed (and cached)
  - "read-only": never compute anything
- cluster: optional str
- Where to run the computation, one of:
  - None: runs in the current thread
  - "debug": submitit debug executor (runs in the current process with ipdb)
  - "local": submitit local executor (runs in a dedicated subprocess)
  - "slurm": submitit slurm executor (runs on a slurm cluster)
  - "auto": submitit auto executor (uses slurm if available, otherwise local)
  - "processpool": runs locally in a concurrent.futures.ProcessPoolExecutor
  - "threadpool": runs locally in a concurrent.futures.ThreadPoolExecutor
- max_jobs: optional int
maximum number of submitit jobs or process/thread workers to submit for running all the map processing
- min_samples_per_job: optional int
minimum number of samples to compute within each job
- forbid_single_item_computation: bool
raises if a single item needs to be computed. This can help detect issues (and overloading the cluster) when all items are supposed to have been precomputed.
Slurm/submitit parameters
Check out
exca.slurm.SubmititMixin
Note
the decorated method must take as input an iterable of items of a type X, and yield one output of a type Y for each input.
- apply(*, item_uid: Callable[[Any], str], exclude_from_cache_uid: Iterable[str] | Callable[[Any], Iterable[str]] = (), cache_type: str | None = None) Callable[[C], C] [source]
Applies the infra on a method taking an iterable of items as input
Parameters
- method: callable
a method of a pydantic.BaseModel taking as input an iterable of items of a type X, and yielding one output of a type Y for each input item.
- item_uid: callable from item to str
function returning a uid from the item of a map
- exclude_from_cache_uid: iterable of str / method / method name
fields that must be removed from the uid of the cache (in addition to the ones already removed from the class uid)
- cache_type: str
name of the cache class to use (inferred by default); this can for instance be used to enforce a memmap instead of loading full arrays in memory
Usage
Decorate the method with @infra.apply(item_uid=<function>) (as well as any additional parameter you want to set)
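A minimal sketch mirroring the TaskInfra example above (class and field names are illustrative):

```python
import typing as tp

import numpy as np
import pydantic
import exca


class MyMap(pydantic.BaseModel):
    coeff: int = 12
    infra: exca.MapInfra = exca.MapInfra(version="1")

    @infra.apply(item_uid=str)  # each item is keyed in the cache by str(item)
    def process(self, items: tp.Iterable[int]) -> tp.Iterator[float]:
        for item in items:
            yield item * self.coeff * np.random.rand()


mapper = MyMap(infra={"folder": "/tmp/excache", "cluster": "threadpool"})
outputs = list(mapper.process([1, 2, 3]))  # only uncached items are recomputed
```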
- clone_obj(*args: Dict[str, Any], **kwargs: Any) Any [source]
Create a new decorated object by applying a diff config to the underlying object
- config(uid: bool = True, exclude_defaults: bool = False) ConfDict [source]
Exports the task configuration as a ConfDict. ConfDicts are dicts which split keys on "." with extra flatten, to_uid and to_yaml features
Parameters
- uid: bool
if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields
- exclude_defaults: bool
if True, values that are set to defaults are not included
Associated classes and functions
- class exca.slurm.SubmititMixin(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: Dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None)[source]
Mixin class for creating a submitit runner infra
Parameters
- folder: optional Path or str
Path to directory for dumping/loading the cache on disk, if provided
- cluster: optional str
- Where to run the computation, one of:
  - None: runs in the current thread
  - "debug": submitit debug executor (runs in the current process with ipdb)
  - "local": submitit local executor (runs in a dedicated subprocess)
  - "slurm": submitit slurm executor (runs on a slurm cluster)
  - "auto": submitit auto executor (uses slurm if available, otherwise local)
- logs: Path or str
  path to the logs for slurm/local jobs. One can use {folder} in the string to define logs as a subfolder of the storage folder, {user} for the user name, and %j (slurm syntax) for the job id
- workdir: optional exca.workdir.WorkDir
  pydantic config defining whether and how to copy the current workspace to a directory specific to the job, to avoid interference when working on the code. See exca.workdir.WorkDir for details.
- job_name: optional str
  name of the job
- timeout_min: optional int
timeout for slurm/local jobs
- nodes: optional int
number of nodes for slurm jobs
- tasks_per_node: optional int
number of tasks per node for slurm jobs
- cpus_per_task: optional int
number of cpus per task for slurm jobs
- gpus_per_node: optional int
number of gpus per node for slurm jobs
- mem_gb: float
RAM memory to be used in GB
- slurm_constraint: optional str
node constraint for the job
- slurm_account: optional str
account to use for the job
- slurm_qos: optional str
qos to use for the job
- slurm_partition: optional str
partition for the slurm job
- slurm_use_srun: bool
use srun in the sbatch file. This is the default in submitit, but it is not advised for jobs that trigger more jobs.
- slurm_additional_parameters: optional dict
additional parameters for slurm that are not first class parameters of this config
- conda_env: optional str/path
path or name of a conda environment to use in the job
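TaskInfra and MapInfra inherit these fields, so slurm execution is configured directly on the infra instance; a hedged sketch (the folder, partition and resources are placeholders):

```python
import exca

infra = exca.TaskInfra(
    folder="/checkpoint/me/cache",   # placeholder cache folder
    cluster="slurm",
    slurm_partition="my_partition",  # placeholder partition name
    gpus_per_node=1,
    cpus_per_task=10,
    timeout_min=120,
    logs="{folder}/logs/{user}/%j",  # default pattern: logs under the cache folder
)
```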
- class exca.workdir.WorkDir(*, copied: Sequence[str | Path] = [], folder: str | Path | None = None, log_commit: bool = False, includes: Sequence[str] = (), excludes: Sequence[str] = ('__pycache__', '.git'))[source]
Custom working directory configuration
Parameters
- copied: Sequence[str]
  list/tuple of names of files, folders, or packages installed in editable mode, to copy to the new working directory folder.
- folder: Path/str
  folder to use as the working directory; if not specified, infra will create one automatically under <infra_uid_folder>/code/<date>-<random_uid>/. The folder is logged, so you should be able to see what happened in your stderr/stdout. This parameter can be used in particular to store the code in a specific location or to reuse the workdir from a previous run.
- includes: sequence of str
  file name patterns that must be included (recursively); folders are always included unless explicitly excluded, e.g.: ["*.py"] to include only python files
- excludes: sequence of str
  file/folder name patterns that must be excluded
- log_commit: bool
  if True, raises if the current working directory is in a git repository with uncommitted changes, and logs the commit otherwise
Notes
Since python privileges the current working directory over installed packages, the copied packages should be the ones running in the job (be careful, there can be a few gotchas, e.g. with the debug cluster or with no cluster, imports cannot be reloaded so the current working directory will be used, but that should not make a difference in these cases).
The change of working directory (and possibly the copy) only happens when the infra is called for submitting the decorated function. Depending on your code, this may not be at the very beginning of your execution.
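A sketch of providing a WorkDir to an infra (file and package names are illustrative):

```python
import exca
from exca.workdir import WorkDir

workdir = WorkDir(
    copied=["my_package", "train.py"],  # placeholder editable package and script
    includes=["*.py"],                  # only copy python files
    log_commit=False,
)
infra = exca.TaskInfra(cluster="slurm", folder="/tmp/excache", workdir=workdir)
```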
- class exca.ConfDict(mapping: MutableMapping[str, Any] | Iterable[Tuple[str, Any]] | None = None, **kwargs: Any)[source]
Dictionary which breaks into sub-dictionaries on "." as in a config (see example). The data can be specified either through "." keywords, directly through sub-dicts, or through a mixture of both. Lists of dictionaries are processed as lists of ConfDict. It also has yaml export capabilities as well as uid computation.
Example
ConDict({"training.optim.lr": 0.01}) == {"training": {"optim": {"lr": 0.01}}}
Note
This is designed for configurations, so it probably does not scale well to 100k+ keys
- flat() Dict[str, Any] [source]
Returns a flat dictionary such as {"training.dataloader.lr": 0.01, "training.optim.name": "Ada"}
- classmethod from_args(args: list[str]) ConfDict [source]
Parses a list of bash-style arguments (e.g. --key=value) into a ConfDict. Typically used as MyConfig(**ConfDict.from_args(sys.argv[1:])). This method supports sub-arguments, e.g.: --optimizer.lr=0.01
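For instance, a short sketch (the MyConfig model mentioned in the comment is a placeholder):

```python
import sys
from exca import ConfDict

# e.g. invoked as: python train.py --training.optim.lr=0.01
overrides = ConfDict.from_args(sys.argv[1:])
# --training.optim.lr=0.01 becomes {"training": {"optim": {"lr": 0.01}}}
# typically unpacked into a pydantic model: MyConfig(**overrides)
```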
- classmethod from_model(model: BaseModel, uid: bool = False, exclude_defaults: bool = False) ConfDict [source]
Creates a ConfDict based on a pydantic model
Parameters
- model: pydantic.BaseModel
the model to convert into a dictionary
- uid: bool
if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields
- exclude_defaults: bool
if True, values that are set to defaults are not included
Note
_exclude_from_cls_uid needs to be a list/tuple/set (or a classmethod returning one) with either or both of the fields:
- exclude: tuple of field names to be excluded
- force_include: tuple of fields to include in all cases (even if excluded or set to defaults)
- classmethod from_yaml(yaml: str | Path | IO[str] | IO[bytes]) ConfDict [source]
Loads a ConfDict from a yaml string/filepath/file handle.
- get(key: str, default: Any | None = None) Any [source]
Return the value for key if key is in the dictionary, else default.
- pop(k[, d]) → v [source]
Remove the specified key and return the corresponding value. If the key is not found, return the default if given; otherwise, raise a KeyError.
- class exca.cachedict.CacheDict(folder: Path | str | None, keep_in_ram: bool = False, cache_type: str | None = None, permissions: int | None = 511)[source]
Dictionary-like object that caches and loads data on disk and ram.
Parameters
- folder: optional Path or str
Path to directory for dumping/loading the cache on disk
- keep_in_ram: bool
if True, adds a cache in RAM of the data once loaded (similar to LRU cache)
- cache_type: str or None
type of cache dumper/loader to use (see the dumperloader.py file for existing options; these include "NumpyArray", "NumpyMemmapArray", "TorchTensor", "PandasDataframe"). If None, the type will be deduced automatically, defaulting to a standard pickle dump.
- permissions: optional int
permissions for generated files; use os.chmod / Path.chmod compatible numbers, or None to deactivate, e.g.: 0o777 for all rights to all users
Usage
```python
mydict = CacheDict(folder, keep_in_ram=True)
mydict.keys()  # empty if folder was empty
mydict["whatever"] = np.array([0, 1])  # stored in both the memory cache and on disk :)
mydict2 = CacheDict(folder, keep_in_ram=True)
# since mydict and mydict2 share the same folder,
# the key "whatever" will be in mydict2
assert "whatever" in mydict2
```
Note
Each item is cached as 1 file, with an additional .key file with the same name holding the actual key for the item (which can differ from the file name)
- exca.helpers.find_slurm_job(*, job_id: str, folder: Path | str | None = None) InfraSlurmJob | None [source]
Attempts to instantiate a submitit.SlurmJob instance from a cache folder and a job_id, looking for it recursively. This is based on the default configuration of the log folder position (<cache folder>/logs/<username>/<job_id>) and on some additional heuristics that may be invalid in other pipelines (skipping logs/wandb folders), so this can fail with other configurations and may need adaptations, but it should answer 95% of cases.
Parameters
- job_id: str
the job id
- folder: str, Path or None
the path of the cache folder. If None, scontrol will be called to try to identify it automatically (this will fail for non-running jobs)
Notes
- a submitit.Job instance has:
  - job.paths.stderr/stdout: pathlib.Path of the logs
  - job.stderr()/stdout(): string of the logs
  - job.result(): output of the job (waits if not completed, raises if the job errored)
  - job.done(): True if the job is completed
- on top of it, the returned job has attributes:
  - config: the full configuration of the job
  - uid_config: the non-default uid configuration of the job
- the search assumes there is only one "logs" folder in the path (as we assume the default configuration of the logs path) and will probably fail if the cache folder contains /logs/ in it. It also assumes there is no /code/ in it.
Get the stderr using this line:
```python
out = job.stderr().split("\n")
```
Example
```python
job = find_slurm_job(job_id=job_id, folder=my_folder)
print(job.uid_config)  # see the uid (= simplified) config for this job
print(job.stdout())  # print the stdout of the job
```