API Reference

Note

The Step API is part of Exca core but its public surface is still evolving — expect breaking changes between minor versions. See Step for context.

Core classes

class exca.steps.Step(*, infra: Backend | None = None)

Base class for pipeline steps.

Override _run() to implement computation:

class Generator(Step):
    def _run(self):
        return load_data()


class Transformer(Step):
    coeff: float = 1.0

    def _run(self, data):
        return data * self.coeff

Override _resolve_step() to decompose into a chain of steps:

class Pipeline(Step):
    transforms: list[Step] = []

    def _run(self, data):
        return expensive_computation(data)

    def _resolve_step(self):
        if not self.transforms:
            return self
        stripped = self.model_copy(update={"transforms": []})
        return Chain(steps=[stripped] + self.transforms)

Note

When Step is used as a pydantic field type, a list/tuple is auto-converted to a Chain (and a dict is dispatched on the discriminator key, "type" by default). Configs typically pass dicts rather than instances so they round-trip through YAML/JSON:

class Config(pydantic.BaseModel):
    pipeline: Step

Config(pipeline=[
    {"type": "Mult", "coeff": 2},
    {"type": "Mult", "coeff": 3},
])  # pipeline is a Chain
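
The dispatch described in this note can be pictured with a plain-Python sketch. It is not exca's or pydantic's implementation: `REGISTRY`, `build`, and the stand-in `Mult` and `Chain` dataclasses are hypothetical names, used only to illustrate how a dict might be routed by its "type" key and how a list might become a chain.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Mult:
    """Stand-in step (hypothetical): holds a multiplication coefficient."""
    coeff: float = 1.0


@dataclass
class Chain:
    """Stand-in chain (hypothetical): holds sub-steps in order."""
    steps: list = field(default_factory=list)


# Hypothetical registry mapping discriminator values to classes.
REGISTRY = {"Mult": Mult, "Chain": Chain}


def build(spec: Any, key: str = "type") -> Any:
    """Turn a dict or list spec into stand-in step objects."""
    if isinstance(spec, (list, tuple)):
        # A list/tuple becomes a chain of its built elements.
        return Chain(steps=[build(s, key) for s in spec])
    if isinstance(spec, dict):
        params = dict(spec)  # copy so the caller's dict is not mutated
        cls = REGISTRY[params.pop(key)]
        return cls(**params)
    return spec  # already an instance: pass through unchanged


pipeline = build([{"type": "Mult", "coeff": 2}, {"type": "Mult", "coeff": 3}])
```

Because the specs are plain dicts and lists, the same structure can be loaded from YAML or JSON before being handed to the builder.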

clone(*args: dict[str, Any], **kwargs: Any) -> Self

Create a fresh Step config, optionally updated with params.

item_uid(value: Any) -> str | None

Custom cache uid for value, or None for default keying.
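
As an illustration of the contract only (return a string to key the cache yourself, or None to defer to default keying), here is a plain-Python sketch. The keying rule, the `cache_key` helper, and the hash-of-repr fallback are all assumptions made for the example. exca's actual default keying is not shown here.

```python
import hashlib
from pathlib import Path
from typing import Any, Optional


def item_uid(value: Any) -> Optional[str]:
    """Hypothetical rule: key Path inputs by file name, else defer to the default."""
    if isinstance(value, Path):
        return value.name
    return None


def cache_key(value: Any) -> str:
    """Use the custom uid when given, else a stand-in default (short hash of repr)."""
    uid = item_uid(value)
    if uid is not None:
        return uid
    return hashlib.sha256(repr(value).encode()).hexdigest()[:12]
```

With this rule, two paths that share a file name map to the same key, which shows why a custom uid should be chosen carefully.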

lookup(value: Any = <exca.steps.identity.NoValue object>, *, _upstream: Sequence[Step] = (), _uid: str | None = None) -> LookupHandle

Return a LookupHandle for inspecting or clearing the cache.

Parameters:

value – The input value to look up. Omit for no-input steps.

Returns:

Handle to inspect, retrieve, or clear the cached result.

Return type:

LookupHandle

run(value: Any = <exca.steps.identity.NoValue object>) -> Any

Execute the step, using the cache and backend when configured.

Parameters:

value – Input to the step. Omit for no-input steps.

Returns:

Cached or freshly computed result.

Return type:

Any

class exca.steps.Chain(*, infra: Backend | None = None, steps: Sequence[Step] | OrderedDict[str, Step])

Bases: Step

Compose multiple steps sequentially.

Example:

chain = Chain(
    steps=[LoadData(path="x.csv"), Train(epochs=10)],
    infra={"backend": "Cached", "folder": "/cache"},
)
result = chain.run()

Chain has the same public API as Step; the constructor adds a steps field (a Sequence[Step] or OrderedDict[str, Step]). chain[i] and len(chain) index into the sub-steps; slicing returns a new Chain.
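
The indexing behavior can be sketched with a small stand-in class. `MiniChain` is hypothetical and uses strings in place of steps. It only illustrates the convention that an integer index returns a sub-step while a slice returns a new chain, and is not exca's implementation.

```python
from typing import Sequence, Union


class MiniChain:
    """Stand-in for a chain of steps (hypothetical, not exca's Chain)."""

    def __init__(self, steps: Sequence[str]) -> None:
        self.steps = list(steps)

    def __len__(self) -> int:
        return len(self.steps)

    def __getitem__(self, index: Union[int, slice]):
        if isinstance(index, slice):
            # Slicing returns a new chain over the selected sub-steps.
            return MiniChain(self.steps[index])
        # Integer indexing returns the sub-step itself.
        return self.steps[index]


chain = MiniChain(["load", "clean", "train"])
head = chain[:2]  # a MiniChain holding "load" and "clean"
```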

Batched execution

class exca.steps.Items(values: Iterable[Any] | None = None)

Batch wrapper: run the same step over many inputs.

step.run(Items([v1, v2, ...])) produces a streaming iterator yielding one result per input, in order, with one cache entry per (step, input) pair. Items() with no arguments is the no-input form for generator steps.

Example:

step = Multiply(coeff=2.0, infra={"backend": "Cached", "folder": cache})
for r in step.run(Items([1.0, 2.0, 3.0])):
    print(r)                                    # 2.0, then 4.0, then 6.0

See Items — batched compute for batched semantics, custom item_uid, and _run_batch.
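
The "one cache entry per (step, input) pair" behavior can be pictured with a plain-Python generator. This is a sketch under assumptions, not exca's code: `run_items`, the in-memory dict cache, and the `repr`-based key are hypothetical stand-ins.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List


def run_items(
    func: Callable[[Any], Any],
    values: Iterable[Any],
    cache: Dict[str, Any],
) -> Iterator[Any]:
    """Yield func(v) for each v, in input order, reusing cached entries."""
    for v in values:
        key = repr(v)  # hypothetical per-item cache key
        if key not in cache:
            cache[key] = func(v)  # computed once, then stored per item
        yield cache[key]


calls: List[float] = []


def double(x: float) -> float:
    calls.append(x)  # record which inputs were actually computed
    return 2.0 * x


cache: Dict[str, Any] = {}
first = list(run_items(double, [1.0, 2.0, 3.0], cache))
second = list(run_items(double, [2.0, 4.0], cache))  # 2.0 comes from the cache
```

Because results are yielded one at a time, a consumer can start processing early results before later inputs have been computed.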

exception exca.steps.items.BatchProtocolError

Raised when _run_batch does not yield one result per consumed input.
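
A minimal sketch of the kind of check this exception implies, assuming a batch function that takes an iterator of inputs and yields results. That signature, the `checked_batch` wrapper, and the local `BatchProtocolError` class are assumptions for illustration. The real `_run_batch` contract is described in the Items guide.

```python
from typing import Any, Callable, Iterable, Iterator


class BatchProtocolError(RuntimeError):
    """Local stand-in for the exception of the same name."""


def checked_batch(
    run_batch: Callable[[Iterator[Any]], Iterator[Any]],
    values: Iterable[Any],
) -> Iterator[Any]:
    """Stream results while counting consumed inputs and produced results."""
    consumed = 0

    def counting() -> Iterator[Any]:
        nonlocal consumed
        for v in values:
            consumed += 1
            yield v

    produced = 0
    for result in run_batch(counting()):
        produced += 1
        yield result
    if produced != consumed:
        raise BatchProtocolError(
            f"got {produced} results for {consumed} consumed inputs"
        )


def good(items: Iterator[float]) -> Iterator[float]:
    for x in items:
        yield 2 * x


def drops_odd(items: Iterator[int]) -> Iterator[int]:
    for x in items:
        if x % 2 == 0:  # bug: odd inputs produce no result
            yield x
```

Here `good` passes the check, while `drops_odd` consumes three inputs but yields one result, so the wrapper raises once the stream is exhausted.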

Cache lookup

class exca.steps.backends.LookupHandle(paths: StepPaths | None = None, cache_dict: CacheDict[Any] | None = None, backend: Backend | None = None, uid: str = '')

Cache handle for a (step, value) pair.

Returned by Step.lookup(). Provides access to the cache entry (status, result, and on-disk paths) and can delete it with clear_cache().

property cache_dict: CacheDict[Any]

CacheDict for this entry.

cached() -> bool

True iff there is a cached success or error.

clear_cache(recursive: bool = True) -> None

Delete the cached result and associated files.

Parameters:

recursive – Also clear sub-step caches (e.g. inside a Chain).

job() -> Job[Any] | None

Return the live in-flight job, or the latest submitit job recorded for logs.

property paths: StepPaths

On-disk path layout (StepPaths) for this entry.

result() -> Any

Return the cached value, or re-raise a cached error.

property status: Literal['success', 'error', 'running', None]

Entry status: "success", "error", "running", or None.

LookupHandle.paths exposes a StepPaths dataclass with two relevant attributes: step_folder (logs, metadata) and cache_folder (cache entries).

Backends

All backends share the fields declared on the base Backend:

class exca.steps.backends.Backend(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)

Base class for execution backends with integrated caching.
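
This page does not spell out what the four mode values do. The sketch below shows one plausible reading, inferred from the names alone: "cached" reuses any stored entry, "retry" recomputes stored errors, "force" always recomputes, and "read-only" never computes. Treat these semantics, the `run_with_mode` helper, and the in-memory dict cache as assumptions to check against the backend documentation.

```python
from typing import Any, Callable, Dict, Tuple

# Each cache entry is ("success", value) or ("error", exception).
Entry = Tuple[str, Any]


def run_with_mode(
    compute: Callable[[], Any],
    cache: Dict[str, Entry],
    key: str,
    mode: str = "cached",
) -> Any:
    """Resolve one result under the assumed mode semantics."""
    entry = cache.get(key)
    reuse = entry is not None and mode != "force"
    if reuse and mode == "retry" and entry[0] == "error":
        reuse = False  # assumed: "retry" recomputes cached failures
    if not reuse:
        if mode == "read-only":
            # assumed: "read-only" never triggers a computation
            raise RuntimeError(f"no cache entry for {key!r} in read-only mode")
        try:
            entry = ("success", compute())
        except Exception as exc:
            entry = ("error", exc)  # errors are cached too
        cache[key] = entry
    status, payload = entry
    if status == "error":
        raise payload  # re-raise a stored (or fresh) error
    return payload
```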

Submitit-based backends — LocalProcess, SubmititDebug, Slurm, Auto — additionally accept:

  • job_name: str | None

  • timeout_min: int | None

  • nodes, tasks_per_node, cpus_per_task, gpus_per_node (int | None), mem_gb: float | None

  • max_jobs: int = 128 — maximum number of array sub-jobs.

  • min_items_per_job: int = 1 — combine items into one job below this count.

Pool backends — ProcessPool, ThreadPool — additionally accept:

  • max_jobs: int | None = 128 — maximum pool size.

class exca.steps.backends.Cached(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)

Bases: Backend

Inline execution + caching.

class exca.steps.backends.LocalProcess(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)

Bases: _SubmititBackend

Subprocess execution + caching.

class exca.steps.backends.SubmititDebug(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)

Bases: _SubmititBackend

Debug executor (inline but simulates submitit).

class exca.steps.backends.Slurm(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)

Bases: _SubmititBackend

Slurm cluster execution + caching. Fails on non-slurm machines.

class exca.steps.backends.Auto(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)

Bases: Slurm

Auto-detect executor (local or Slurm). Slurm fields only apply on slurm.

class exca.steps.backends.ProcessPool(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, max_jobs: Annotated[int | None, Gt(gt=0)] = 128)

Bases: _PoolBackend

Process pool execution + caching.

class exca.steps.backends.ThreadPool(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, max_jobs: Annotated[int | None, Gt(gt=0)] = 128)

Bases: _PoolBackend

Thread pool execution + caching.

Helpers

class exca.steps.helpers.Func(*, infra: Backend | None = None, function: Annotated[Callable[[...], Any], ImportString], input_param: str | None = None, **kwargs: Any)

Wrap a plain function as a Step.

Parameters:
  • function (Callable[[...], Any]) – Callable (or dotted import path) to wrap.

  • input_param (str | None) – Name of the parameter that receives pipeline input at runtime. None (default) = auto-detect from signature (the single parameter without a default; generator if all have defaults; errors if 2+ required).

All other keyword arguments are passed as fixed configuration to the function and are validated against its signature.

Examples

>>> def scale(x: float, factor: float = 2.0) -> float:
...     return x * factor
>>> Func(function=scale, factor=3.0).run(5.0)
15.0
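
The auto-detection rule for input_param (the single parameter without a default, generator if all have defaults, error if two or more are required) can be sketched with the standard `inspect` module. `detect_input_param` is a hypothetical helper written for illustration, not the code Func uses. In this sketch, a None return value means "no input", that is, a generator.

```python
import inspect
from typing import Any, Callable, Optional


def detect_input_param(function: Callable[..., Any]) -> Optional[str]:
    """Return the single required parameter name, or None for a generator."""
    required = [
        name
        for name, p in inspect.signature(function).parameters.items()
        if p.default is inspect.Parameter.empty
        and p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
    ]
    if len(required) > 1:
        raise ValueError(f"ambiguous input parameter, candidates: {required}")
    return required[0] if required else None


def scale(x: float, factor: float = 2.0) -> float:
    return x * factor  # one required parameter: x receives the input


def constant(value: float = 1.0) -> float:
    return value  # all parameters have defaults: behaves as a generator


def add(a: float, b: float) -> float:
    return a + b  # two required parameters: ambiguous
```

For `add`, the ambiguity would have to be resolved by naming the input explicitly through input_param and fixing the other argument as configuration.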