API Reference#

Note

The Step API is part of Exca core but its public surface is still evolving — expect breaking changes between minor versions. See Step for context.

Core classes#

class exca.steps.Step(*, infra: Backend | None = None)#

Base class for pipeline steps.

Override _run() to implement computation:

class Generator(Step):
    def _run(self):
        return load_data()


class Transformer(Step):
    coeff: float = 1.0

    def _run(self, data):
        return data * self.coeff

Override _resolve_step() to decompose into a chain of steps:

class Pipeline(Step):
    transforms: list[Step] = []

    def _run(self, data):
        return expensive_computation(data)

    def _resolve_step(self):
        if not self.transforms:
            return self
        stripped = self.model_copy(update={"transforms": []})
        return Chain(steps=[stripped] + self.transforms)

Note

When Step is used as a pydantic field type, a list/tuple is auto-converted to a Chain (and a dict is dispatched on the discriminator key, "type" by default). Configs typically pass dicts rather than instances so they round-trip through YAML/JSON:

class Config(pydantic.BaseModel):
    pipeline: Step

Config(pipeline=[
    {"type": "Mult", "coeff": 2},
    {"type": "Mult", "coeff": 3},
])  # pipeline is a Chain
clone(*args: dict[str, Any], **kwargs: Any) Self#

Create a fresh Step config, optionally updated with params.

item_uid(value: Any) str | None#

Custom cache uid for value, or None for default keying.

Pure generators can return non-None for NoValue to use attributes as the item dimension (colocation). Such fields should usually be excluded via _exclude_from_cls_uid.

lookup(value: Any = <exca.steps.identity.NoValue object>, *, _upstream: Sequence[Step] = (), _uid: str | None = None) LookupHandle#

Return a LookupHandle for inspecting or clearing the cache.

Parameters:

value – The input value to look up. Omit for no-input steps.

Returns:

Handle to inspect, retrieve, or clear the cached result.

Return type:

backends.LookupHandle

run(value: Any = <exca.steps.identity.NoValue object>) Any#

Execute the step on a single input, using cache/backend when set.

Parameters:

value – Input to the step. Omit for no-input steps.

Returns:

Cached or freshly computed result.

Return type:

Any

run_many(values: Iterable[Any]) StepItems#

Execute the step over many inputs, one cache entry per input.

Parameters:

values – Inputs to run; one result is produced per input, in order.

Returns:

Iterator yielding one result per input, in input order.

Return type:

StepItems

class exca.steps.Chain(*, infra: Backend | None = None, steps: Sequence[Step] | OrderedDict[str, Step])#

Bases: Step

Compose multiple steps sequentially.

Example:

chain = Chain(
    steps=[LoadData(path="x.csv"), Train(epochs=10)],
    infra={"backend": "Cached", "folder": "/cache"},
)
result = chain.run()

Chain has the same public API as Step; the constructor adds a steps field (a Sequence[Step] or OrderedDict[str, Step]). chain[i] and len(chain) index into the sub-steps; slicing returns a new Chain.

Batched execution#

class exca.steps.items.StepItems(*, source: _Source, uids: tp.Sequence[str] | None = None, upstream: tp.Sequence[Step] = (), pending: tp.Sequence[Step] = (), mode: identity.ModeType = 'cached')[source]#

Pipeline carrier for inline computation between cached boundaries.

For dict sources, uids default to the dict keys (insertion order). For CacheDict sources, explicit uids are required (the CacheDict may contain keys from other runs).

exception exca.steps.items.BatchProtocolError[source]#

Raised when _run_batch does not yield one result per consumed input.

Cache lookup#

class exca.steps.backends.LookupHandle(paths: StepPaths | None = None, cache_dict: CacheDict[Any] | None = None, backend: Backend | None = None, uid: str = '')[source]#

Cache handle for a (step, value) pair.

Returned by Step.lookup(). Provides read-only access to the cache entry and its on-disk paths.

property cache_dict: CacheDict[Any][source]#

CacheDict for this entry.

cached() bool[source]#

True iff there is a cached success or error.

clear_cache(recursive: bool = True) None[source]#

Delete the cached result and associated files.

Parameters:

recursive – Also clear sub-step caches (e.g. inside a Chain).

job() Job[Any] | None[source]#

Return the live inflight job, or latest submitit job recorded for logs.

property paths: StepPaths[source]#

On-disk path layout (StepPaths) for this entry.

result() Any[source]#

Return the cached value, or re-raise a cached error.

property status: Literal['success', 'error', 'running', None][source]#

"success", "error", "running", or None.

Type:

Entry status

LookupHandle.paths exposes a StepPaths dataclass with two relevant attributes: step_folder (logs, metadata) and cache_folder (cache entries).

Backends#

All backends share the fields declared on the base Backend:

class exca.steps.backends.Backend(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)[source]#

Base class for execution backends with integrated caching.

Submitit-based backends — LocalProcess, SubmititDebug, Slurm, Auto — additionally accept:

  • job_name: str | None

  • timeout_min: int | None

  • nodes, tasks_per_node, cpus_per_task, gpus_per_node (int | None), mem_gb: float | None

  • max_jobs: int = 128 — maximum number of array sub-jobs.

  • min_items_per_job: int = 1 — combine items into one job below this count.

Pool backends — ProcessPool, ThreadPool — additionally accept:

  • max_jobs: int | None = 128 — maximum pool size.

class exca.steps.backends.Cached(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False)[source]#

Bases: Backend

Inline execution + caching.

class exca.steps.backends.LocalProcess(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)[source]#

Bases: _SubmititBackend

Subprocess execution + caching.

class exca.steps.backends.SubmititDebug(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1)[source]#

Bases: _SubmititBackend

Debug executor (inline but simulates submitit).

class exca.steps.backends.Slurm(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)[source]#

Bases: _SubmititBackend

Slurm cluster execution + caching. Fails on non-slurm machines.

class exca.steps.backends.Auto(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, job_name: str | None = None, timeout_min: int | None = None, nodes: int | None = None, tasks_per_node: int | None = None, cpus_per_task: int | None = None, gpus_per_node: int | None = None, mem_gb: float | None = None, max_jobs: Annotated[int, Gt(gt=0)] = 128, min_items_per_job: Annotated[int, Gt(gt=0)] = 1, constraint: str | None = None, partition: str | None = None, account: str | None = None, qos: str | None = None, additional_parameters: dict[str, int | str | float | bool] | None = None, use_srun: bool = False)[source]#

Bases: Slurm

Auto-detect executor (local or Slurm). Slurm fields only apply on slurm.

class exca.steps.backends.ProcessPool(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, max_jobs: Annotated[int | None, Gt(gt=0)] = 128)[source]#

Bases: _PoolBackend

Process pool execution + caching.

class exca.steps.backends.ThreadPool(*, folder: Path | None = None, mode: Literal['cached', 'force', 'read-only', 'retry'] = 'cached', keep_in_ram: bool = False, max_jobs: Annotated[int | None, Gt(gt=0)] = 128)[source]#

Bases: _PoolBackend

Thread pool execution + caching.

Helpers#

class exca.steps.helpers.Func(*, infra: Backend | None = None, function: Annotated[Callable[[...], Any], ImportString], input_param: str | None = None, **kwargs: Any)[source]#

Wrap a plain function as a Step.

Parameters:
  • function (Callable[[...], Any]) – Callable (or dotted import path) to wrap.

  • input_param (str | None) – Name of the parameter that receives pipeline input at runtime. None (default) = auto-detect from signature (the single parameter without a default; generator if all have defaults; errors if 2+ required).

  • the (All other keyword arguments are passed as fixed configuration to)

  • signature. (function and are validated against its)

Examples

>>> def scale(x: float, factor: float = 2.0) -> float:
...     return x * factor
>>> Func(function=scale, factor=3.0).run(5.0)
15.0