spdl.pipeline.defs.PathVariantsConfig

class PathVariantsConfig(name: str, router: Callable[[Any], int] | Callable[[Any], Awaitable[int]], paths: tuple[tuple[PipeConfig[Any, Any] | AggregateConfig[Any] | DisaggregateConfig[Any] | PathVariantsConfig[Any], ...], ...])

Configuration for variant path routing.

Use PathVariants() to create a config.

Routes each incoming item to one of several processing paths based on a router function. All paths produce the same output type and their outputs are merged back into a single stream.
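
As a rough mental model, routing and merging can be pictured with the plain-Python sketch below. It is not SPDL's implementation: the helper `route_and_merge` and the toy stage functions are hypothetical, and the sketch processes items one at a time, so it makes no claim about how the real pipeline orders or schedules outputs.

```python
from typing import Any, Callable, Iterable, Iterator, Sequence


def route_and_merge(
    items: Iterable[Any],
    router: Callable[[Any], int],
    paths: Sequence[Sequence[Callable[[Any], Any]]],
) -> Iterator[Any]:
    """Hypothetical helper: send each item down one path, merge into one stream."""
    for item in items:
        index = router(item)        # the router picks the path for this item
        for stage in paths[index]:  # apply that path's stages in order
            item = stage(item)
        yield item                  # every path feeds the same output stream


# Two toy paths that both produce strings (same output type).
paths = (
    (str,),                  # path 0: a single stage
    (lambda x: x * 2, str),  # path 1: two stages
)
merged = list(route_and_merge([1, 2, 3, 4], lambda x: x % 2, paths))
```

Here odd numbers take path 1 and even numbers take path 0, and both kinds of result end up in the single list `merged`.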

This is useful when items need different processing depending on runtime conditions. For example:

  • Caching: route cached items to a fast cache-read path while uncached items go through full data loading.

  • Hybrid processing: split items between local and remote processing based on size or availability.
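
For the hybrid-processing case, a router might look like the sketch below. The threshold value and the name `size_router` are illustrative assumptions, not part of SPDL; the only requirement taken from this page is that the router returns the index of the path to use.

```python
SIZE_THRESHOLD = 1_000_000  # bytes; hypothetical cut-off for "small" items


def size_router(item: bytes) -> int:
    """Return 0 (local path) for small payloads, 1 (remote path) otherwise."""
    return 0 if len(item) < SIZE_THRESHOLD else 1
```

With such a router, path 0 would hold the local-processing stages and path 1 the remote-processing stages.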

See also

Example: Pipeline definitions

Illustrates how to build a complex pipeline.

Example:

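from spdl.pipeline import PipelineBuilder
from spdl.pipeline.defs import Pipe

# `cache`, `items`, `load_from_cache`, and `load_from_source` are assumed
# to be defined elsewhere.

def cache_router(item):
    # Return the index of the path that should process this item.
    return 0 if item in cache else 1

pipeline = (
    PipelineBuilder()
    .add_source(items)
    .path_variants(
        router=cache_router,
        paths=[
            [Pipe(load_from_cache)],   # path 0: cache hit
            [Pipe(load_from_source)],  # path 1: cache miss
        ],
    )
    .add_sink(buffer_size=10)
    .build()
)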

Note

Paths can only contain pipe stages (PipeConfig, AggregateConfig, DisaggregateConfig, or nested PathVariantsConfig). SourceConfig and SinkConfig are not allowed in paths.

Attributes

name

Name of the path variants stage.

router

A function that takes an item and returns the index of the path in paths that should process it.

paths

Alternative processing paths.

name: str

Name of the path variants stage.

paths: tuple[tuple[PipeConfig[Any, Any] | AggregateConfig[Any] | DisaggregateConfig[Any] | PathVariantsConfig[Any], ...], ...]

Alternative processing paths. Each path is a tuple of pipe configs.

router: Callable[[Any], int] | Callable[[Any], Awaitable[int]]

A function that takes an item and returns the index of the path in paths that should process it.

Can be a regular function or an async function/callable.
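
An async router could be sketched as follows. The names `CACHED_KEYS` and `async_cache_router` are hypothetical, and `asyncio.sleep(0)` merely stands in for a real awaitable lookup (for example, a query to a remote cache).

```python
import asyncio

CACHED_KEYS = {"a", "b"}  # hypothetical stand-in for a cache index


async def async_cache_router(key: str) -> int:
    """Hypothetical async router: 0 for a cache hit, 1 for a miss."""
    await asyncio.sleep(0)  # placeholder for a real awaitable lookup
    return 0 if key in CACHED_KEYS else 1


# Outside a pipeline, the coroutine can be exercised directly.
indices = [asyncio.run(async_cache_router(k)) for k in ("a", "z")]
```

Such a function matches the Callable[[Any], Awaitable[int]] form of the router annotation, whereas a plain `def` matches Callable[[Any], int].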