API Reference
Note
The Step API is part of Exca core but its public surface is still evolving — expect breaking changes between minor versions. See Step for context.
Core classes
- class exca.steps.Step(*, infra: Backend | None = None)
Base class for pipeline steps.
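Override _run() to implement computation:

```python
from exca.steps import Step


class Generator(Step):
    def _run(self):  # no input: a generator step
        return load_data()  # load_data is a placeholder for your own loader


class Transformer(Step):
    coeff: float = 1.0

    def _run(self, data):
        return data * self.coeff
```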
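Override _resolve_step() to decompose into a chain of steps:

```python
from exca.steps import Chain, Step


class Pipeline(Step):
    transforms: list[Step] = []

    def _run(self, data):
        return expensive_computation(data)  # placeholder for the costly part

    def _resolve_step(self):
        if not self.transforms:
            return self
        # Run the expensive part without the transforms, then apply each
        # transform as its own step.
        stripped = self.model_copy(update={"transforms": []})
        return Chain(steps=[stripped] + self.transforms)
```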
Note
When Step is used as a pydantic field type, a list/tuple is auto-converted to a Chain (and a dict is dispatched on the discriminator key, "type" by default). Configs typically pass dicts rather than instances so they round-trip through YAML/JSON:

```python
import pydantic

from exca.steps import Step


class Config(pydantic.BaseModel):
    pipeline: Step


# Assumes a Step subclass named Mult with a coeff field exists.
Config(pipeline=[
    {"type": "Mult", "coeff": 2},
    {"type": "Mult", "coeff": 3},
])  # pipeline is a Chain
```
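The coercion rule can be modeled in a few lines of plain Python. This is a toy, not exca's implementation: exca performs the conversion during pydantic validation, whereas here a hypothetical REGISTRY keyed on class name and a hypothetical coerce_step() function stand in for it, and Mult and ToyChain are illustrative stand-ins for real steps.

```python
from dataclasses import dataclass, field
from typing import Any

# Hypothetical registry: maps the discriminator value to a step class.
REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass
class Mult:
    coeff: float = 1.0

    def run(self, value: float) -> float:
        return value * self.coeff


@dataclass
class ToyChain:
    steps: list[Any] = field(default_factory=list)

    def run(self, value: Any) -> Any:
        # Feed each step's output into the next step.
        for step in self.steps:
            value = step.run(value)
        return value


def coerce_step(spec: Any, key: str = "type") -> Any:
    """Turn a list/tuple into a ToyChain and a dict into the class named by spec[key]."""
    if isinstance(spec, (list, tuple)):
        return ToyChain(steps=[coerce_step(s, key) for s in spec])
    if isinstance(spec, dict):
        params = dict(spec)  # copy so the caller's dict keeps its "type" entry
        cls = REGISTRY[params.pop(key)]
        return cls(**params)
    return spec  # already a step instance


pipeline = coerce_step([{"type": "Mult", "coeff": 2}, {"type": "Mult", "coeff": 3}])
print(type(pipeline).__name__, pipeline.run(1.0))  # ToyChain 6.0
```

Recursing on list elements means nested lists would become nested chains in this toy; whether exca does the same is not stated here.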
- clone(*args: dict[str, Any], **kwargs: Any) -> Self
Create a fresh Step config, optionally updated with params.
- item_uid(value: Any) -> str | None
Custom cache uid for value, or None for default keying.
- lookup(value: Any = <exca.steps.identity.NoValue object>, *, _upstream: Sequence[Step] = (), _uid: str | None = None) -> LookupHandle
Return a LookupHandle for inspecting or clearing the cache.
- Parameters:
value – The input value to look up. Omit for no-input steps.
- Returns:
Handle to inspect, retrieve, or clear the cached result.
- Return type:
LookupHandle
- run(value: Any = <exca.steps.identity.NoValue object>) -> Any
Execute the step, using the cache and backend when configured.
- Parameters:
value – Input to the step. Omit for no-input steps.
- Returns:
Cached or freshly computed result.
- Return type:
Any
- class exca.steps.Chain(*, infra: Backend | None = None, steps: Sequence[Step] | OrderedDict[str, Step])
Bases: Step
Compose multiple steps sequentially.
Example:

```python
from exca.steps import Chain

# LoadData and Train stand for your own Step subclasses.
chain = Chain(
    steps=[LoadData(path="x.csv"), Train(epochs=10)],
    infra={"backend": "Cached", "folder": "/cache"},
)
result = chain.run()
```
Chain has the same public API as Step; the constructor adds a steps field (a Sequence[Step] or OrderedDict[str, Step]). chain[i] indexes into the sub-steps, len(chain) counts them, and slicing returns a new Chain.
Batched execution
- class exca.steps.Items(values: Iterable[Any] | None = None)
Batch wrapper: run the same step over many inputs.
step.run(Items([v1, v2, ...])) produces a streaming iterator yielding one result per input, in order, with one cache entry per (step, input) pair. Items() with no arguments is the no-input form for generator steps.
Example:

```python
from exca.steps import Items

# Multiply is a Step subclass with a coeff field; cache is a cache folder path.
step = Multiply(coeff=2.0, infra={"backend": "Cached", "folder": cache})
for r in step.run(Items([1.0, 2.0, 3.0])):
    print(r)  # 2.0, then 4.0, then 6.0
```
See Items — batched compute for batched semantics, custom item_uid, and _run_batch.
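The streaming and per-input caching semantics of Items can be pictured with a generator over a dict. This is a toy under stated assumptions, not exca's code: run_items() is hypothetical, it keys the cache on the raw value (exca would use item_uid or its default keying), and it computes items one by one rather than dispatching them to a backend.

```python
from typing import Any, Callable, Iterable, Iterator


def run_items(
    fn: Callable[[Any], Any],
    values: Iterable[Any],
    cache: dict[tuple[str, Any], Any],
    step_uid: str,
) -> Iterator[Any]:
    """Lazily yield fn(value) for each value, in input order, caching per (step, input)."""
    for value in values:
        key = (step_uid, value)  # one cache entry per (step, input) pair
        if key not in cache:
            cache[key] = fn(value)
        yield cache[key]


calls: list[float] = []


def multiply(x: float) -> float:
    calls.append(x)  # record which inputs were actually computed
    return 2.0 * x


cache: dict[tuple[str, Any], Any] = {}
uid = "Multiply(coeff=2.0)"
print(list(run_items(multiply, [1.0, 2.0, 3.0], cache, uid)))  # [2.0, 4.0, 6.0]
print(list(run_items(multiply, [2.0, 3.0, 4.0], cache, uid)))  # [4.0, 6.0, 8.0]
print(calls)  # [1.0, 2.0, 3.0, 4.0]: only 4.0 was new on the second run
```

Because each input has its own entry, overlapping batches reuse earlier results instead of recomputing the whole batch.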
Cache lookup
- class exca.steps.backends.LookupHandle(paths: StepPaths | None = None, cache_dict: CacheDict[Any] | None = None, backend: Backend | None = None, uid: str = '')
Cache handle for a (step, value) pair. Returned by Step.lookup(). Provides access to the cache entry and its on-disk paths; clear_cache() deletes them.
- clear_cache(recursive: bool = True) -> None
Delete the cached result and associated files.
- Parameters:
recursive – Also clear sub-step caches (e.g. inside a Chain).
LookupHandle.paths exposes a StepPaths dataclass with two relevant attributes: step_folder (logs, metadata) and cache_folder (cache entries).
Backends
All backends share the fields declared on the base Backend:
- class exca.steps.backends.Backend(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)
Base class for execution backends with integrated caching.
Submitit-based backends (LocalProcess, SubmititDebug, Slurm, Auto) additionally accept:
- job_name: str | None
- timeout_min: int | None
- nodes, tasks_per_node, cpus_per_task, gpus_per_node: int | None
- mem_gb: float | None
- max_jobs: int = 128 (must be > 0) – maximum number of array sub-jobs.
- min_items_per_job: int = 1 (must be > 0) – combine items into one job below this count.
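To see how max_jobs and min_items_per_job can interact, here is one plausible grouping scheme in plain Python. It is an assumption for illustration only; exca's actual splitting of items into array sub-jobs may differ. plan_jobs() is hypothetical: it never creates more than max_jobs groups, never makes a group smaller than min_items_per_job unless there are fewer items than that in total, and keeps items in input order.

```python
from typing import Any, Sequence


def plan_jobs(
    items: Sequence[Any], max_jobs: int = 128, min_items_per_job: int = 1
) -> list[list[Any]]:
    """Hypothetical grouping of items into array sub-jobs."""
    if max_jobs < 1 or min_items_per_job < 1:
        raise ValueError("max_jobs and min_items_per_job must be > 0")
    if not items:
        return []
    # Cap the job count so no job falls below min_items_per_job,
    # but always keep at least one job.
    n_jobs = min(max_jobs, max(1, len(items) // min_items_per_job))
    # Spread items evenly: job sizes differ by at most one.
    base, extra = divmod(len(items), n_jobs)
    groups, start = [], 0
    for j in range(n_jobs):
        size = base + (1 if j < extra else 0)
        groups.append(list(items[start:start + size]))
        start += size
    return groups


print([len(g) for g in plan_jobs(list(range(10)), min_items_per_job=4)])  # [5, 5]
print(plan_jobs([0, 1, 2], min_items_per_job=4))  # [[0, 1, 2]]
```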
Pool backends (ProcessPool, ThreadPool) additionally accept:
- max_jobs: int | None = 128 – maximum pool size.
- class exca.steps.backends.Cached(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)
Bases: Backend
Inline execution + caching.
- class exca.steps.backends.LocalProcess(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)
Bases: _SubmititBackend
Subprocess execution + caching.
- class exca.steps.backends.SubmititDebug(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)
Bases: _SubmititBackend
Debug executor (inline but simulates submitit).
- class exca.steps.backends.Slurm(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)
Bases: _SubmititBackend
Slurm cluster execution + caching. Fails on non-Slurm machines.
- class exca.steps.backends.Auto(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)
Bases: Slurm
Auto-detect executor (local or Slurm). The Slurm-specific fields (constraint, partition, account, qos, additional_parameters, use_srun) only apply when running on Slurm.
Helpers
- class exca.steps.helpers.Func(*, infra: Backend | None = None, function: Annotated[Callable[..., Any], ImportString], input_param: str | None = None, **kwargs: Any)
Wrap a plain function as a Step.
- Parameters:
function (Callable[..., Any]) – Callable (or dotted import path) to wrap.
input_param (str | None) – Name of the parameter that receives pipeline input at runtime. None (default) auto-detects it from the signature: it is the single parameter without a default; if all parameters have defaults, the step is a generator; if two or more parameters are required, an error is raised.
**kwargs (Any) – All other keyword arguments are passed as fixed configuration to the function and are validated against its signature.
Examples
```python
>>> from exca.steps.helpers import Func
>>> def scale(x: float, factor: float = 2.0) -> float:
...     return x * factor
>>> Func(function=scale, factor=3.0).run(5.0)
15.0
```