API Reference#

class exca.TaskInfra(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_pickle_size_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None, permissions: int | str | None = 511, version: str = '0', mode: Literal['cached', 'retry', 'force', 'read-only'] = 'cached', keep_in_ram: bool = False)[source]#

Processing/caching infrastructure ready to be applied to a pydantic.BaseModel method. To use it, the configuration must be set as an attribute of a pydantic BaseModel, then @infra.apply must be set on the parameter-free method to process/cache. This will effectively replace the function with a cached/remotely-computed version of itself.

Parameters:
  • folder (optional Path or str) – Path to directory for dumping/loading the cache on disk, if provided

  • keep_in_ram (bool) – if True, adds a cache in RAM of the data once loaded (similar to LRU cache)

  • mode (str) –

    One of the following:
    • "cached": cache is returned if available (error or not), otherwise computed (and cached)

    • "retry": cache is returned if available except if it’s an error, otherwise (re)computed (and cached)

    • "force": cache is ignored, and result are (re)computed (and cached)

    • "read-only": never compute anything

  • Slurm/submitit parameters – see exca.slurm.SubmititMixin

Usage#

class MyTask(pydantic.BaseModel):
    x: int
    infra: exca.TaskInfra = exca.TaskInfra()

    @infra.apply
    def compute(self) -> int:
        return 2 * self.x

cfg = MyTask(x=12, infra={"folder": "tmp", "cluster": "slurm"})
assert cfg.compute() == 24  # "compute" runs on slurm and is cached

apply(arg: C, /) C[source]#
apply(exclude_from_cache_uid: Iterable[str] | Callable[[Any], Iterable[str]] = ()) Callable[[C], C]

Applies the infra on a method taking no parameter (except self)

Parameters:
  • method (callable) – a parameter-free method of a pydantic.BaseModel to process/cache (only self is allowed as argument)

  • exclude_from_cache_uid (iterable of str / method / method name) – fields that must be removed from the uid of the cache (in addition to the ones already removed from the class uid)

Usage#

Either decorate with @infra.apply or @infra.apply(exclude_from_cache_uid=<whatever>)

clear_job() None[source]#

Clears and possibly cancels this task’s job so that the computation is rerun at the next call

clone_obj(*args: dict[str, Any], **kwargs: Any) Any[source]#

Create a new decorated object by applying a diff config to the underlying object
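
A minimal sketch of how this may be used, reusing the MyTask example above:

cfg = MyTask(x=12, infra={"folder": "tmp"})
cfg2 = cfg.infra.clone_obj({"x": 13})  # new MyTask instance with the diff applied
assert cfg2.x == 13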

config(uid: bool = True, exclude_defaults: bool = False) ConfDict[source]#

Exports the task configuration as a ConfDict. ConfDicts are dicts which split on "." with extra flatten, to_uid and to_yaml features

Parameters:
  • uid (bool) – if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields

  • exclude_defaults (bool) – if True, values that are set to defaults are not included
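
For instance, a small sketch (reusing the MyTask example above; the exact keys depend on your model):

conf = cfg.infra.config(uid=True, exclude_defaults=True)
print(conf.to_uid())   # uid string derived from the non-default fields
print(conf.to_yaml())  # yaml export of the same ConfDict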

iter_cached() Iterable[BaseModel][source]#

Iterate over similar tasks in the cache folder

job() Job[Any] | LocalJob[source]#

Creates or reloads the job corresponding to the task

job_array(max_workers: int = 256, allow_empty: bool = False, allow_repeated_tasks: bool = False) Iterator[list[Any]][source]#

Creates a list object to populate. The tasks in the list will be sent as a job array when exiting the context.

Parameters:
  • max_workers (int) – maximum number of jobs in the array that can be running at a given time

  • allow_empty (bool) – if False, an exception will be raised when exiting the context if the array is still empty

  • allow_repeated_tasks (bool) – if False (default), a same task should not be repeated twice in the array
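
A hedged sketch of the intended pattern (populating the array with clones through clone_obj is an assumption for illustration):

with cfg.infra.job_array(max_workers=16) as tasks:
    for x in range(4):
        tasks.append(cfg.infra.clone_obj({"x": x}))
# when exiting the context, the 4 tasks are sent as one job array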

obj_infras() Mapping[str, BaseInfra][source]#

Returns a dictionary of all infras part of the current model/config (including sub-configs)

status() Literal['not submitted', 'running', 'completed', 'failed'][source]#

Provides the status of the job. This can be one of "not submitted", "running", "completed" or "failed".

uid() str[source]#

Returns the unique uid of the task

uid_folder(create: bool = False) Path | None[source]#

Folder where this task instance is stored

class exca.MapInfra(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug', 'threadpool', 'processpool'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_pickle_size_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None, permissions: int | str | None = 511, version: str = '0', keep_in_ram: bool = True, max_jobs: int | None = 128, min_samples_per_job: int = 1, forbid_single_item_computation: bool = False, mode: Literal['cached', 'force', 'read-only'] = 'cached')[source]#

Processing/caching infrastructure ready to be applied to a pydantic.BaseModel method. To use it, the configuration can be set as an attribute of a pydantic BaseModel, then @infra.apply(item_uid) must be set on the method to process/cache. This will effectively replace the function with a cached/remotely-computed version of itself.

Parameters:
  • folder (optional Path or str) – Path to directory for dumping/loading the cache on disk, if provided

  • keep_in_ram (bool) – if True, adds a cache in RAM of the data once loaded (similar to LRU cache)

  • mode (str) –

    One of the following:
    • "cached": cache is returned if available (error or not), otherwise computed (and cached)

    • "force": cache is ignored, and result are (re)computed (and cached)

    • "read-only": never compute anything

  • cluster (optional str) –

    Where to run the computation, one of:
    • None: runs in the current thread

    • "debug": submitit debug executor (runs in the current process with ipdb)

    • "local": submitit local executor (runs in a dedicated subprocess)

    • "slurm": submitit slurm executor (runs in a slurm cluster)

    • "auto": submitit auto executor (uses slurm if available, otherwise local)

    • "processpool": runs locally in a concurrent.future.ProcessPoolExecutor

    • "threadpool": runs locally in a concurrent.future.ThreadPoolExecutor

  • max_jobs (optional int) – maximum number of submitit jobs or process/thread workers to submit for running all the map processing

  • min_samples_per_job (optional int) – minimum number of samples to compute within each job

  • forbid_single_item_computation (bool) – raises if a single item needs to be computed. This can help detect issues (and overloading the cluster) when all items are supposed to have been precomputed.

  • Slurm/submitit parameters – see exca.slurm.SubmititMixin

Note

  • the decorated method must take as input an iterable of items of a type X, and yield one output of a type Y for each input.

apply(*, item_uid: Callable[[Any], str], item_uid_max_length: int | None = 256, exclude_from_cache_uid: Iterable[str] | Callable[[Any], Iterable[str]] = (), cache_type: str | None = None) Callable[[C], C][source]#

Applies the infra on a method taking an iterable of items as input

Parameters:
  • method (callable) – a method of a pydantic.BaseModel taking as input an iterable of items of a type X, and yielding one output of a type Y for each input item.

  • item_uid (callable from item to str) – function returning a uid from the item of a map

  • item_uid_max_length (int or None) – maximum length of the item_uid output before it gets shortened for efficiency (as all uids need to be read at first call). Use None to deactivate shortening.

  • exclude_from_cache_uid (iterable of str / method / method name) – fields that must be removed from the uid of the cache (in addition to the ones already removed from the class uid)

  • cache_type (str) –

    name of the cache class to use (inferred by default); this can for instance be used to enforce e.g. a memmap instead of loading arrays. The available options include:
    • NumpyArray: stores numpy arrays as npy files (default for np.ndarray)

    • NumpyMemmapArray: similar to NumpyArray but reloads arrays as memmaps

    • MemmapArrayFile: stores multiple np.ndarray into a unique memmap file (strongly advised in case of many arrays)

    • PandasDataFrame: stores pandas dataframes as csv (default for dataframes)

    • ParquetPandasDataFrame: stores pandas dataframes as parquet files

    • TorchTensor: stores torch.Tensor as .pt file (default for tensors)

    • Pickle: stores object as pickle file (fallback default)

Usage#

Decorate the method with @infra.apply(item_uid=<function>), as well as any additional parameter you want to set.
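
A minimal sketch of a decorated map method (class and method names are illustrative):

import typing as tp
import exca
import pydantic

class MyMap(pydantic.BaseModel):
    coeff: int = 12
    infra: exca.MapInfra = exca.MapInfra()

    @infra.apply(item_uid=str)
    def process(self, items: tp.Iterable[int]) -> tp.Iterator[int]:
        for item in items:
            yield item * self.coeff

mapper = MyMap(infra={"folder": "tmp"})
out = list(mapper.process([1, 2]))  # computed once, then served from cache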

clone_obj(*args: dict[str, Any], **kwargs: Any) Any[source]#

Create a new decorated object by applying a diff config to the underlying object

config(uid: bool = True, exclude_defaults: bool = False) ConfDict[source]#

Exports the task configuration as a ConfDict. ConfDicts are dicts which split on "." with extra flatten, to_uid and to_yaml features

Parameters:
  • uid (bool) – if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields

  • exclude_defaults (bool) – if True, values that are set to defaults are not included

iter_cached() Iterable[BaseModel][source]#

Iterate over similar objects in the cache folder

obj_infras() Mapping[str, BaseInfra][source]#

Returns a dictionary of all infras part of the current model/config (including sub-configs)

uid() str[source]#

Returns the unique uid of the task

uid_folder(create: bool = False) Path | None[source]#

Folder where this task instance is stored

class exca.SubmitInfra(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_pickle_size_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None, permissions: int | str | None = 511, version: str = '0')#

[Experimental] Processing infrastructure ready to be applied to a pydantic.BaseModel method. To use it, the configuration must be set as an attribute of a pydantic BaseModel, then @infra.apply must be set on the method to process. This will effectively replace the function with a remotely-computed version of itself. Contrary to TaskInfra, outputs of the method are not cached, and the method can take arguments.

Parameters:
  • folder (optional Path or str) – Path to directory for dumping/loading the cache on disk, if provided

  • Slurm/submitit parameters – see exca.slurm.SubmititMixin

Usage#

class MyTask(pydantic.BaseModel):
    x: int
    infra: exca.SubmitInfra = exca.SubmitInfra()

    @infra.apply
    def compute(self, y: int) -> int:
        return 2 * self.x + y

cfg = MyTask(x=12, infra={"folder": "tmp", "cluster": "slurm"})
assert cfg.compute(y=1) == 25  # "compute" runs on slurm but is not cached
job = cfg.infra.submit(y=1)  # runs the computation asynchronously
assert job.result() == 25

Note

  • The decorated method can be a staticmethod to avoid pickling the owner object along with the other parameters.

  • This is an experimental infra that is still evolving

apply(method: C) C#

Applies the infra on a method of a pydantic.BaseModel. Contrary to TaskInfra, the method may take parameters.

Parameters:
  • method (callable) – the method to decorate

Usage#

Decorate the method with @infra.apply

batch(max_workers: int = 256) Iterator[None]#

Context for batching submissions through infra.submit into a unique array job

Parameters:
  • max_workers (int) – maximum number of jobs in the array that can be running at a given time

Usage#

with cfg.infra.batch():
    job1 = cfg.infra.submit(y=1)
    job2 = cfg.infra.submit(y=2)
# job1 and job2 are submitted together as a job array

submit(*args: Any, **kwargs: Any) Job[Any] | LocalJob#

Submits an asynchronous job. This call is non-blocking and returns a Job instance whose result() method waits for the computation to be over.

Parameters:
  • *args (parameters of the decorated method)

  • **kwargs (parameters of the decorated method)

Note

As a reminder, outputs are not cached, so submitting several times will run as many jobs.

uid() str#

Returns the unique uid of the task

Associated classes and functions#

class exca.slurm.SubmititMixin(*, folder: Path | str | None = None, cluster: Literal[None, 'auto', 'local', 'slurm', 'debug'] = None, logs: Path | str = '{folder}/logs/{user}/%j', job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = 1, tasks_per_node: int | None = 1, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_pickle_size_gb: float | None = None, slurm_constraint: str | None = None, slurm_partition: str | None = None, slurm_account: str | None = None, slurm_qos: str | None = None, slurm_use_srun: bool = False, slurm_additional_parameters: dict[str, int | str | float | bool] | None = None, conda_env: Path | str | None = None, workdir: None | WorkDir = None)[source]#

Mixin class for creating a submitit runner infra

Parameters:
  • folder (optional Path or str) – Path to directory for dumping/loading the cache on disk, if provided

  • cluster (optional str) –

    Where to run the computation, one of:
    • None: runs in the current thread

    • "debug": submitit debug executor (runs in the current process with ipdb)

    • "local": submitit local executor (runs in a dedicated subprocess)

    • "slurm": submitit slurm executor (runs in a slurm cluster)

    • "auto": submitit auto executor (uses slurm if available, otherwise local)

  • logs (Path or str) – path to the logs for slurm/local jobs. One can use {folder} in the string to define logs as a subfolder of the storage folder, {user} for the user name and %j (slurm syntax) for the job id

  • workdir (optional exca.workdir.WorkDir) – pydantic config defining whether and how to copy the current workspace to a directory specific for the job and avoid interferences when working on the code. See exca.workdir.WorkDir for details.

  • job_name (optional str) – name of the job

  • timeout_min (optional int) – timeout for slurm/local jobs

  • nodes (optional int) – number of nodes for slurm jobs

  • tasks_per_node (optional int) – number of tasks per node for slurm jobs

  • cpus_per_task (optional int) – number of cpus per task for slurm jobs

  • gpus_per_node (optional int) – number of gpus per node for slurm jobs

  • mem_gb (float) – RAM memory to be used in GB

  • slurm_constraint (optional str) – node constraint for the job

  • slurm_account (optional str) – account to use for the job

  • slurm_qos (optional str) – qos to use for the job

  • slurm_partition (optional str) – partition for the slurm job

  • slurm_use_srun (bool) – use srun in the sbatch file. This is the default in submitit, but not advised for jobs triggering more jobs.

  • slurm_additional_parameters (optional dict) – additional parameters for slurm that are not first class parameters of this config

  • conda_env (optional str/path) – path or name of a conda environment to use in the job. Note that as submitit uses a pickle that needs to be loaded in the job with the new conda env, the pickle needs to be compatible. This mostly means that if the env has a different pydantic version, the job may fail to reload it. Additionally, to allow for different python versions, the job is dumped with pickle and not cloudpickle, so inline functions (defined in main or in a notebook) will not be supported.
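
Since the infra classes inherit these fields, slurm options can be set directly on the infra; a sketch with illustrative values:

infra = exca.TaskInfra(
    folder="tmp",
    cluster="slurm",
    timeout_min=60,
    cpus_per_task=4,
    gpus_per_node=1,
    slurm_partition="gpu",  # hypothetical partition name
)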

class exca.workdir.WorkDir(*, copied: Sequence[str | Path] = [], folder: str | Path | None = None, log_commit: bool = False, includes: Sequence[str] = ('*.py',), excludes: Sequence[str] = ('__pycache__', '.git'))[source]#

Custom working directory configuration

Parameters:
  • copied (Sequence[str]) – list/tuple of names of files, or folders, or packages installed in editable mode to copy to the new working directory folder. Relative paths will be moved to the relative equivalent in the new folder, while for absolute paths, the folder/file pointed to by the path will be moved directly to the new folder.

  • folder (Path/str) – folder to use as working directory, if not specified, infra will create one automatically <infra_uid_folder>/code/<date>-<random_uid>/. The folder is logged so you should be able to see what happened in your stderr/stdout. This parameter can be used in particular to store the code in a specific location or reuse workdir from a previous run.

  • includes (sequence of str) – file name patterns that must be included (recursively). Folders are always included unless explicitly excluded, e.g.: ["*.py"] to include only python files

  • excludes (sequence of str) – file/folder name patterns that must be excluded

  • log_commit (bool) – if True, raises if the current working directory is in a git repository with uncommitted changes, and logs the commit otherwise

Notes

  • Since python privileges the current working directory over installed packages, the copied packages should be the ones running in the job (be careful, there can be a few gotchas, e.g. with the debug cluster or with no cluster, the import cannot be reloaded so the current working directory will be used, but that should not make a difference in these cases)

  • The change of working directory (and possibly the copy) only happens when the infra is called for submitting the decorated function. Depending on your code, this may not be at the very beginning of your execution.

  • The context is reentrant: upon the second entrance, nothing will be updated.
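
A hedged sketch of enabling a workdir through an infra configuration (the package name is hypothetical):

infra = exca.TaskInfra(
    folder="tmp",
    cluster="slurm",
    # copy the editable-mode package "my_package" and log the current commit
    workdir={"copied": ["my_package"], "log_commit": True},
)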

class exca.ConfDict(mapping: MutableMapping[str, Any] | Iterable[tuple[str, Any]] | None = None, **kwargs: Any)[source]#

Dictionary which breaks into sub-dictionaries on "." as in a config (see example). The data can be specified either through "." keywords, or directly through sub-dicts, or a mixture of both. Lists of dictionaries are processed as lists of ConfDict. It also has yaml export capabilities as well as uid computation.

Example

ConDict({"training.optim.lr": 0.01}) == {"training": {"optim": {"lr": 0.01}}}

Note

  • This is designed for configurations, so it probably does not scale well to 100k+ keys

  • dicts are merged except if they contain the key "=replace=", in which case they replace the content. On the other hand, non-dicts always replace the content.

copy() a shallow copy of D[source]#
flat() dict[str, Any][source]#

Returns a flat dictionary such as {"training.dataloader.lr": 0.01, "training.optim.name": "Ada"}

classmethod from_args(args: list[str]) Self[source]#

Parses a list of bash-style arguments (e.g., --key=value) into a ConfDict, typically used as MyConfig(**ConfDict.from_args(sys.argv[1:])). This method supports sub-arguments, e.g.: --optimizer.lr=0.01
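
For instance, a small sketch (how values are typed from the strings is an assumption):

conf = ConfDict.from_args(["--training.optim.lr=0.01", "--name=test"])
# yields the nested structure {"training": {"optim": {"lr": ...}}, "name": ...}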

classmethod from_model(model: BaseModel, uid: bool = False, exclude_defaults: bool = False) ConfDict[source]#

Creates a ConfDict based on a pydantic model

Parameters:
  • model (pydantic.BaseModel) – the model to convert into a dictionary

  • uid (bool) – if True, uses the _exclude_from_cls_uid field/method to filter in and out some fields

  • exclude_defaults (bool) – if True, values that are set to defaults are not included

Note

_exclude_from_cls_uid needs to be a list/tuple/set (or a classmethod returning one) with either or both of the fields:
  • exclude: tuple of field names to be excluded

  • force_include: tuple of fields to include in all cases (even if excluded or set to defaults)

classmethod from_yaml(yaml: str | Path | IO[str] | IO[bytes]) ConfDict[source]#

Loads a ConfDict from a yaml string/filepath/file handle.

get(key: str, default: Any = None) Any[source]#

Return the value for key if key is in the dictionary, else default.

pop(k[, d]) v, remove specified key and return the corresponding value.[source]#

If the key is not found, return the default if given; otherwise, raise a KeyError.

to_uid(version: None | int = None) str[source]#

Provides a unique string for the config

to_yaml(filepath: Path | str | None = None) str[source]#

Exports the ConfDict to a yaml string, and optionally to a file if a filepath is provided

update(mapping: MutableMapping[str, Any] | Iterable[tuple[str, Any]] | None = None, **kwargs: Any) None[source]#

Updates recursively the keys of the confdict. No key is removed unless a sub-dictionary contains "=replace=": True, in this case the existing keys in the sub-dictionary are wiped
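
A hedged sketch of the merge vs replace behavior described above (assuming the "=replace=" marker is consumed during the update):

conf = ConfDict({"training.optim.lr": 0.01})
conf.update({"training.optim.name": "Ada"})
# merged: {"training": {"optim": {"lr": 0.01, "name": "Ada"}}}
conf.update({"training": {"optim": {"=replace=": True, "name": "SGD"}}})
# the "optim" sub-dictionary is wiped first: {"training": {"optim": {"name": "SGD"}}}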

class exca.cachedict.CacheDict(folder: Path | str | None, keep_in_ram: bool = False, cache_type: None | str = None, permissions: int | None = 511)#

Dictionary-like object that caches and loads data on disk and ram.

Parameters:
  • folder (optional Path or str) – Path to directory for dumping/loading the cache on disk

  • keep_in_ram (bool) – if True, adds a cache in RAM of the data once loaded (similar to LRU cache)

  • cache_type (str or None) –

    type of cache dumper to use (see the dumperloader.py file for all existing options); these include:

    • "NumpyArray": one .npy file for each array, loaded in ram

    • "NumpyMemmapArray": one .npy file for each array, loaded as a memmap

    • "MemmapArrayFile": one bytes file per worker, loaded as a memmap, and keeping an internal cache of the open memmap file (EXCA_MEMMAP_ARRAY_FILE_MAX_CACHE env variable can be set to reset the cache at a given number of open files, defaults to 100 000)

    • "TorchTensor": one .pt file per tensor

    • "PandasDataframe": one .csv file per pandas dataframe

    • "ParquetPandasDataframe": one .parquet file per pandas dataframe (faster to dump and read)

    • "DataDict": a dict for which (first-level) fields are dumped using the default dumper. This is particularly useful to store dict of arrays which would be then loaded as dict of memmaps.

    If None, the type will be deduced automatically (Json for JSON-serializable values, or a type-specific handler for numpy arrays, tensors, etc.). Loading is handled using the cache_type specified in info files.

  • permissions (optional int) – permissions for generated files; use os.chmod / path.chmod compatible numbers, or None to deactivate, e.g.: 0o777 for all rights to all users

Usage#

mydict = CacheDict(folder, keep_in_ram=True)
mydict.keys()  # empty if folder was empty
with mydict.write():
    mydict["whatever"] = np.array([0, 1])
# stored in both memory cache, and disk :)
mydict2 = CacheDict(folder, keep_in_ram=True)
# since mydict and mydict2 share the same folder, the
# key "whatever" will be in mydict2
assert "whatever" in mydict2

Note

  • Dicts write to .jsonl files to hold keys and how to read the corresponding item. Different threads write to different jsonl files to avoid interferences.

  • checking repeatedly for content can be slow if unavailable, as this will repeatedly reload all jsonl files

frozen_cache_folder() Iterator[None]#

Considers the cache folder as frozen, to prevent reloading key/json files more than once from now on. This is useful to speed up __contains__ statements with many missing items, which could trigger thousands of file rereads.
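
A minimal sketch (candidate_keys is a hypothetical iterable of keys to probe):

with mydict.frozen_cache_folder():
    # key/json files are read at most once within this block
    missing = [k for k in candidate_keys if k not in mydict]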

keys() Iterator[str]#

Returns the keys in the dictionary (triggers a cache folder reading if folder is not None)

write() Iterator[CacheDict[X]]#

Context manager for writing items to the cache.

writer() Iterator[CacheDict[X]]#

Deprecated: use write() instead.

class exca.helpers.DiscriminatedModel[source]#

Preserves the types of child class instances passed in pydantic models during serialization and de-serialization. This is achieved by injecting a key upon serialization.

By default the key is "type" but this can be customized through inheritance (e.g.: class SubNamedModel(NamedModel, discriminator_key="name"))

Subclasses can be instantiated from any ancestor by passing the discriminator key:

class Base(DiscriminatedModel, discriminator_key="name"):
    ...
class Child(Base):
    value: int = 0

obj = Base(name="Child", value=3)  # returns a Child instance
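
A hedged sketch of the round-trip through serialization (the exact dump format is an assumption):

data = obj.model_dump()  # expected to contain the injected {"name": "Child"} key
restored = Base.model_validate(data)
assert isinstance(restored, Child)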

Note

experimental feature

model_config = {'extra': 'forbid', 'validation_error_cause': True}[source]#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

exca.helpers.find_slurm_job(*, job_id: str, folder: str | Path | None = None) InfraSlurmJob | None[source]#

Attempts to instantiate a submitit.SlurmJob instance from a cache folder and a job_id, looking for it recursively. This is based on the default configuration of the log folder position (<cache folder>/logs/<username>/<job_id>), and on some additional heuristics that may be invalid in other pipelines (skipping logs/wandb folders), so this can fail with other configurations and may need adaptations, but should answer 95% of cases.

Parameters:
  • job_id (str) – the job id

  • folder (str, Path or None) – the path of the cache folder. If None, scontrol will be called to try and identify it automatically (will fail for non-running jobs)

Notes

  • a submitit.Job instance has:
    • job.paths.stderr/stdout: pathlib.Path of the logs

    • job.stderr()/stdout(): string of the logs

    • job.result(): output of the job (waits if not completed, raises if error)

    • job.done(): True if job is completed

  • On top of it, the returned job has attributes:
    • config: the full configuration of the job

    • uid_config: the non default uid configuration of the job

  • The search assumes there is only one "logs" folder in the path (as we assume the default configuration of the logs path) and will probably fail if the cache folder contains /logs/ in it. It also assumes there is no /code/ in it.

  • Get the stderr lines using: out = job.stderr().split("\n")

Example

job = find_slurm_job(job_id=job_id, folder=my_folder)
print(job.uid_config)  # see uid (= simplified) config for this job
print(job.stdout())  # print stdout of the job

class exca.helpers.with_infra(**kwargs: Any)[source]#

Decorator for adding an infra to a function

Usage#

@with_infra(folder="whatever")
def my_func():
    ...

or directly my_func = with_infra(folder="whatever")(my_func); the function will then always use this infra.