API Reference
Note
The Step API is part of Exca core but its public surface is still evolving — expect breaking changes between minor versions. See Step for context.
Core classes
- class exca.steps.Step(*, infra: Backend | None = None)
Base class for pipeline steps.
Override _run() to implement computation:

    from exca.steps import Step

    class Generator(Step):
        def _run(self):
            return load_data()  # placeholder for your own loader

    class Transformer(Step):
        coeff: float = 1.0

        def _run(self, data):
            return data * self.coeff

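The split between the public run() and the overridable _run() can be pictured with a toy model in plain Python. Everything here (ToyStep, NO_VALUE, the list data) is hypothetical and leaves out caching, backends, and pydantic validation; it only shows how a sentinel default lets one entry point serve both generators and transformers.

```python
from typing import Any


class _NoValue:
    """Toy sentinel meaning "no input was given"."""


NO_VALUE = _NoValue()


class ToyStep:
    """Hypothetical stand-in for Step: callers use run(), subclasses override _run()."""

    def run(self, value: Any = NO_VALUE) -> Any:
        # No-input steps (generators) get no argument; others receive the input.
        if value is NO_VALUE:
            return self._run()
        return self._run(value)

    def _run(self, *args: Any) -> Any:
        raise NotImplementedError("subclasses implement _run()")


class Generator(ToyStep):
    def _run(self) -> list:
        return [1, 2, 3]  # stands in for load_data()


class Transformer(ToyStep):
    def __init__(self, coeff: float = 1.0) -> None:
        self.coeff = coeff

    def _run(self, data: list) -> list:
        return [x * self.coeff for x in data]
```

With these definitions, Transformer(coeff=2.0).run(Generator().run()) returns [2.0, 4.0, 6.0].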
Override _resolve_step() to decompose into a chain of steps:

    from exca.steps import Chain, Step

    class Pipeline(Step):
        transforms: list[Step] = []

        def _run(self, data):
            return expensive_computation(data)  # placeholder

        def _resolve_step(self):
            if not self.transforms:
                return self
            # copy of self without transforms, followed by the transforms
            stripped = self.model_copy(update={"transforms": []})
            return Chain(steps=[stripped] + self.transforms)

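The decomposition performed by _resolve_step() can be mimicked with frozen dataclasses, where dataclasses.replace plays the role of model_copy(update=...). ToyStep, resolve(), and run_chain() are hypothetical names for this sketch only.

```python
from __future__ import annotations

import dataclasses
from typing import Any, Callable


@dataclasses.dataclass(frozen=True)
class ToyStep:
    """Hypothetical step wrapping a function; resolve() mirrors _resolve_step()."""

    fn: Callable[[Any], Any]
    transforms: tuple[ToyStep, ...] = ()

    def resolve(self) -> list[ToyStep]:
        # Nothing to decompose: the step stands alone.
        if not self.transforms:
            return [self]
        # Copy of self without transforms, then each transform as its own link.
        stripped = dataclasses.replace(self, transforms=())
        return [stripped, *self.transforms]


def run_chain(steps: list[ToyStep], value: Any) -> Any:
    # Sequential composition: each link's output feeds the next link.
    for step in steps:
        value = step.fn(value)
    return value
```

One plausible benefit, inferred from the example rather than documented above, is that each link can then be cached separately, so changing the transforms need not recompute the expensive part.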
Note
When Step is used as a pydantic field type, a list or tuple is auto-converted to a Chain, and a dict is dispatched on the discriminator key ("type" by default). Configs typically pass dicts rather than instances so they round-trip through YAML/JSON:

    import pydantic
    from exca.steps import Step

    class Config(pydantic.BaseModel):
        pipeline: Step

    # "Mult" stands for a user-defined Step subclass with a coeff field
    config = Config(pipeline=[
        {"type": "Mult", "coeff": 2},
        {"type": "Mult", "coeff": 3},
    ])  # config.pipeline is a Chain

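The conversion described in the note can be mimicked with a small plain-Python builder, which makes the list-to-chain and dict-dispatch cases easy to see. REGISTRY, register, mult, and build are all hypothetical; in exca the conversion happens during pydantic validation of Step fields, and its real registry and error handling are not modelled here.

```python
from typing import Any, Callable, Dict

StepFn = Callable[[Any], Any]

# Hypothetical registry mapping a "type" name to a step factory.
REGISTRY: Dict[str, Callable[..., StepFn]] = {}


def register(name: str) -> Callable[[Callable[..., StepFn]], Callable[..., StepFn]]:
    def decorator(factory: Callable[..., StepFn]) -> Callable[..., StepFn]:
        REGISTRY[name] = factory
        return factory
    return decorator


@register("Mult")
def mult(coeff: float) -> StepFn:
    return lambda x: x * coeff


def build(config: Any) -> StepFn:
    """Toy validator: list/tuple -> chain, dict -> dispatch on "type"."""
    if isinstance(config, (list, tuple)):
        steps = [build(item) for item in config]

        def chain(value: Any) -> Any:
            for step in steps:
                value = step(value)
            return value

        return chain
    if isinstance(config, dict):
        params = dict(config)  # copy so the caller's dict is left untouched
        factory = REGISTRY[params.pop("type")]
        return factory(**params)
    raise TypeError(f"cannot build a step from {config!r}")
```

Here build([{"type": "Mult", "coeff": 2}, {"type": "Mult", "coeff": 3}]) produces a two-link chain that maps 1 to 6.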
- clone(*args: dict[str, Any], **kwargs: Any) → Self
Create a fresh Step config, optionally updated with params.
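The signature suggests that clone() merges positional dicts and keyword arguments into a new config. The toy below assumes that later values take precedence and that the original stays unchanged; both are assumptions about exca, and ToyConfig is a hypothetical frozen dataclass, not a Step.

```python
import dataclasses
from typing import Any, Dict


@dataclasses.dataclass(frozen=True)
class ToyConfig:
    """Hypothetical stand-in for a Step config with two fields."""

    coeff: float = 1.0
    offset: float = 0.0

    def clone(self, *args: Dict[str, Any], **kwargs: Any) -> "ToyConfig":
        # Merge positional dicts left to right, then keyword overrides
        # (assumed precedence: later values win).
        updates: Dict[str, Any] = {}
        for arg in args:
            updates.update(arg)
        updates.update(kwargs)
        # replace() builds a new instance and leaves self unchanged.
        return dataclasses.replace(self, **updates)
```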
- item_uid(value: Any) → str | None
Custom cache uid for value, or None for default keying.
Pure generators can return a non-None uid for NoValue to use attributes as the item dimension (colocation). Attributes used this way should usually be excluded from the class uid via _exclude_from_cls_uid.
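To illustrate what "custom uid, or None for default keying" buys, the toy below derives a cache key from a value. default_uid, cache_key, and recording_uid are hypothetical, and the SHA-256-of-JSON fallback is an illustrative choice, not how exca computes its default keys.

```python
import hashlib
import json
from typing import Any, Callable, Optional


def default_uid(value: Any) -> str:
    # Hypothetical fallback key: hash of a canonical JSON dump of the value.
    payload = json.dumps(value, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()[:16]


def cache_key(value: Any, item_uid: Callable[[Any], Optional[str]]) -> str:
    """Use the custom uid when it returns a string, else the fallback."""
    uid = item_uid(value)
    return uid if uid is not None else default_uid(value)


def recording_uid(value: Any) -> Optional[str]:
    # Hypothetical custom uid: key records by their "id" field only,
    # ignoring other metadata; defer to default keying otherwise.
    if isinstance(value, dict) and "id" in value:
        return f"rec-{value['id']}"
    return None
```

Two records that differ only in metadata then share one cache entry, while values without an "id" fall back to content hashing.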
- lookup(value: Any = <exca.steps.identity.NoValue object>, *, _upstream: Sequence[Step] = (), _uid: str | None = None) → LookupHandle
Return a LookupHandle for inspecting or clearing the cache.
- Parameters:
value – The input value to look up. Omit for no-input steps.
- Returns:
Handle to inspect, retrieve, or clear the cached result.
- Return type:
LookupHandle
- run(value: Any = <exca.steps.identity.NoValue object>) → Any
Execute the step on a single input, using cache/backend when set.
- Parameters:
value – Input to the step. Omit for no-input steps.
- Returns:
Cached or freshly computed result.
- Return type:
Any
- class exca.steps.Chain(*, infra: Backend | None = None, steps: Sequence[Step] | OrderedDict[str, Step])
Bases: Step
Compose multiple steps sequentially.
Example:

    from exca.steps import Chain

    # LoadData and Train stand for user-defined Step subclasses
    chain = Chain(
        steps=[LoadData(path="x.csv"), Train(epochs=10)],
        infra={"backend": "Cached", "folder": "/cache"},
    )
    result = chain.run()

Chain has the same public API as Step; the constructor
adds a steps field (a Sequence[Step] or
OrderedDict[str, Step]). chain[i] returns the i-th sub-step,
len(chain) gives the number of sub-steps, and slicing
returns a new Chain.
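The sequence behaviour (integer indexing returns a sub-step, slicing returns a new chain) follows the usual __len__/__getitem__ protocol. ToyChain is a hypothetical stand-in that only models the Sequence[Step] form; indexing an OrderedDict-based chain by name is not covered.

```python
from typing import Any, Callable, List, Sequence, Union

StepFn = Callable[[Any], Any]


class ToyChain:
    """Hypothetical stand-in for Chain's sequence behaviour."""

    def __init__(self, steps: Sequence[StepFn]) -> None:
        self.steps: List[StepFn] = list(steps)

    def __len__(self) -> int:
        return len(self.steps)

    def __getitem__(self, index: Union[int, slice]) -> Any:
        # A slice yields a new chain; an integer yields one sub-step.
        if isinstance(index, slice):
            return ToyChain(self.steps[index])
        return self.steps[index]
```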
Batched execution
- class exca.steps.items.StepItems(*, source: _Source, uids: tp.Sequence[str] | None = None, upstream: tp.Sequence[Step] = (), pending: tp.Sequence[Step] = (), mode: identity.ModeType = 'cached')
Pipeline carrier for inline computation between cached boundaries.
For dict sources, uids default to the dict keys (insertion order). For CacheDict sources, explicit uids are required (the CacheDict may contain keys from other runs).
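The uid rule can be written out as a small function. resolve_uids and its from_cache flag are hypothetical (StepItems receives the source itself and tells dicts and CacheDicts apart on its own); the sketch only encodes the two cases stated above.

```python
from typing import Any, List, Mapping, Optional, Sequence


def resolve_uids(
    source: Mapping[str, Any],
    uids: Optional[Sequence[str]] = None,
    *,
    from_cache: bool = False,
) -> List[str]:
    """Toy version of the documented uid defaults (hypothetical helper)."""
    if uids is not None:
        return list(uids)
    if from_cache:
        # A cache may hold entries from other runs, so its keys are not a safe default.
        raise ValueError("explicit uids are required for cache-backed sources")
    # Plain dicts: keys in insertion order.
    return list(source)
```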
Cache lookup
- class exca.steps.backends.LookupHandle(paths: StepPaths | None = None, cache_dict: CacheDict[Any] | None = None, backend: Backend | None = None, uid: str = '')
Cache handle for a (step, value) pair.
Returned by Step.lookup(). Provides read-only access to the cache entry and its on-disk paths, plus clear_cache() to delete the entry.
- clear_cache(recursive: bool = True) → None
Delete the cached result and associated files.
- Parameters:
recursive – Also clear sub-step caches (e.g. inside a
Chain).
LookupHandle.paths exposes a StepPaths dataclass with two
relevant attributes: step_folder (logs, metadata) and
cache_folder (cache entries).
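The effect of the recursive flag on clear_cache() is easiest to see with a toy handle tree on disk. ToyPaths mirrors the two StepPaths attributes named above; ToyHandle, its one-file-per-uid layout, and the entry()/exists() helpers are hypothetical and do not reflect exca's actual on-disk format.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import List


@dataclass
class ToyPaths:
    """Mirrors the two StepPaths attributes described above."""

    step_folder: Path   # logs, metadata
    cache_folder: Path  # cache entries


@dataclass
class ToyHandle:
    """Hypothetical handle over one cached entry, with sub-step handles."""

    paths: ToyPaths
    uid: str
    children: List["ToyHandle"] = field(default_factory=list)

    def entry(self) -> Path:
        return self.paths.cache_folder / f"{self.uid}.txt"

    def exists(self) -> bool:
        return self.entry().exists()

    def clear_cache(self, recursive: bool = True) -> None:
        # Delete this entry, then (optionally) the entries of sub-steps.
        self.entry().unlink(missing_ok=True)
        if recursive:
            for child in self.children:
                child.clear_cache(recursive=True)
```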
Backends
All backends share the fields declared on the base
Backend:
- class exca.steps.backends.Backend(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)
Base class for execution backends with integrated caching.
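The mode field selects how an existing cache entry is treated. The sketch below assumes the natural reading of three of the modes ('cached' reuses and computes on a miss, 'force' always recomputes, 'read-only' never computes and fails on a miss) and leaves 'retry' out because its semantics are not described here. run_with_mode is a hypothetical function over an in-memory dict, not exca code.

```python
from typing import Any, Callable, Dict


def run_with_mode(
    cache: Dict[str, Any], key: str, compute: Callable[[], Any], mode: str = "cached"
) -> Any:
    """Toy dispatch on a cache mode (assumed semantics, see above)."""
    if mode == "cached":
        # Reuse an existing entry; compute only on a miss.
        if key not in cache:
            cache[key] = compute()
        return cache[key]
    if mode == "force":
        # Always recompute and overwrite the entry.
        cache[key] = compute()
        return cache[key]
    if mode == "read-only":
        # Never compute: a miss is an error.
        if key not in cache:
            raise KeyError(f"no cached result for {key!r}")
        return cache[key]
    raise ValueError(f"mode {mode!r} is not modelled in this sketch")
```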
Submitit-based backends (LocalProcess, SubmititDebug,
Slurm, Auto) additionally accept:
- job_name: str | None
- timeout_min: int | None
- nodes, tasks_per_node, cpus_per_task, gpus_per_node: int | None
- mem_gb: float | None
- max_jobs: int = 128 (must be > 0): maximum number of array sub-jobs.
- min_items_per_job: int = 1 (must be > 0): minimum number of items per job; below this count, items are combined into a single job.
Pool backends (ProcessPool, ThreadPool) additionally
accept:
- max_jobs: int | None = 128: maximum pool size.
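How max_jobs and min_items_per_job interact can be illustrated with one grouping rule that satisfies both field descriptions: never more than max_jobs groups, every group holding at least min_items_per_job items when there are enough items, and a single group otherwise. split_into_jobs is hypothetical; the grouping exca or submitit actually performs may differ.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_jobs(
    items: Sequence[T], max_jobs: int = 128, min_items_per_job: int = 1
) -> List[List[T]]:
    """Hypothetical grouping rule consistent with the two field descriptions."""
    if max_jobs <= 0 or min_items_per_job <= 0:
        raise ValueError("max_jobs and min_items_per_job must be > 0")
    n = len(items)
    if n == 0:
        return []
    # Largest job count that keeps every job at >= min_items_per_job items,
    # capped at max_jobs; below the minimum, everything goes into one job.
    n_jobs = max(1, min(max_jobs, n // min_items_per_job))
    # Spread items evenly: group sizes differ by at most one.
    base, extra = divmod(n, n_jobs)
    groups: List[List[T]] = []
    start = 0
    for j in range(n_jobs):
        size = base + (1 if j < extra else 0)
        groups.append(list(items[start : start + size]))
        start += size
    return groups
```

For example, 10 items with min_items_per_job=3 give three jobs of sizes 4, 3, and 3, while 2 items with min_items_per_job=5 give a single job.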
- class exca.steps.backends.Cached(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)
Bases: Backend
Inline execution + caching.
- class exca.steps.backends.LocalProcess(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)
Bases: _SubmititBackend
Subprocess execution + caching.
- class exca.steps.backends.SubmititDebug(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)
Bases: _SubmititBackend
Debug executor (runs inline but simulates submitit).
- class exca.steps.backends.Slurm(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)
Bases: _SubmititBackend
Slurm cluster execution + caching. Fails on machines without Slurm.
- class exca.steps.backends.Auto(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)
Bases: Slurm
Auto-detect executor (local or Slurm). The Slurm-specific fields (constraint, partition, account, qos, additional_parameters, use_srun) only apply when running on Slurm.
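The "Slurm fields only apply on Slurm" behaviour can be sketched as a detection step followed by a filter over the config. Both the detection heuristic (looking for srun on PATH) and the helper names are assumptions made for illustration; this is not how Auto is documented to decide. The Slurm-only field names are taken from the signatures above.

```python
import shutil
from typing import Any, Callable, Dict, Optional

# Fields that only the Slurm backend declares (from the signatures above).
SLURM_ONLY = frozenset(
    {"constraint", "partition", "account", "qos", "additional_parameters", "use_srun"}
)


def detect_cluster(which: Callable[[str], Optional[str]] = shutil.which) -> str:
    """Hypothetical heuristic: treat the machine as Slurm if `srun` is on PATH."""
    return "slurm" if which("srun") is not None else "local"


def effective_params(params: Dict[str, Any], cluster: str) -> Dict[str, Any]:
    # On a local machine the Slurm-only fields are simply not applied.
    if cluster == "slurm":
        return dict(params)
    return {k: v for k, v in params.items() if k not in SLURM_ONLY}
```

Injecting which keeps the heuristic testable without a real cluster.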
Helpers#
- class exca.steps.helpers.Func(*, infra: Backend | None = None, function: Annotated[Callable[[...], Any], ImportString], input_param: str | None = None, **kwargs: Any)
Wrap a plain function as a Step.
- Parameters:
function (Callable[[...], Any]) – Callable (or dotted import path) to wrap.
input_param (str | None) – Name of the parameter that receives pipeline input at runtime. None (default) auto-detects it from the signature: the single parameter without a default is used; if all parameters have defaults, the function is treated as a generator; two or more required parameters raise an error.
**kwargs – All other keyword arguments are passed as fixed configuration to the function and are validated against its signature.
Examples

    >>> from exca.steps.helpers import Func
    >>> def scale(x: float, factor: float = 2.0) -> float:
    ...     return x * factor
    >>> Func(function=scale, factor=3.0).run(5.0)
    15.0
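The auto-detection rule for input_param and the signature check on fixed keyword arguments can both be expressed with the standard inspect module. detect_input_param and check_config are hypothetical helpers, and ignoring *args/**kwargs when counting required parameters is an assumption; Func's own implementation may differ.

```python
import inspect
from typing import Any, Callable, Dict, Optional

# Parameter kinds that count as named parameters (excludes *args / **kwargs).
_NAMED = (
    inspect.Parameter.POSITIONAL_ONLY,
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
    inspect.Parameter.KEYWORD_ONLY,
)


def detect_input_param(func: Callable[..., Any]) -> Optional[str]:
    """Apply the documented rule; None means the function acts as a generator."""
    required = [
        name
        for name, p in inspect.signature(func).parameters.items()
        if p.kind in _NAMED and p.default is inspect.Parameter.empty
    ]
    if len(required) > 1:
        raise TypeError(f"ambiguous input among {required}; pass input_param")
    return required[0] if required else None


def check_config(func: Callable[..., Any], config: Dict[str, Any]) -> None:
    # bind_partial raises TypeError for names the function does not accept.
    inspect.signature(func).bind_partial(**config)
```

Applied to the scale function from the example, detect_input_param returns "x", and check_config(scale, {"factor": 3.0}) passes.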