spdl.pipeline.defs.PathVariants

PathVariants(router: Callable[[Any], int] | Callable[[Any], Awaitable[int]], paths: Sequence[Sequence[PipeConfig[Any, Any] | AggregateConfig[Any] | DisaggregateConfig[Any] | PathVariantsConfig[Any]]], name: str | None = None) → PathVariantsConfig[Any]

Create a PathVariantsConfig for variant path routing.

See also

Example: Pipeline definitions

Illustrates how to build a complex pipeline.

Routes each incoming item to one of several processing paths based on a router function. Each path is an independent chain of pipe configs. The outputs of all paths are merged back into a single stream.

This is useful when items need different processing depending on runtime conditions. For example, cached items can be routed to a fast cache-read path while uncached items go through full data loading, or work can be split between local and remote processing.
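The routing behavior described above can be pictured with a small, library-free sketch. It is a conceptual model only and does not reflect how SPDL implements path variants; in particular, it makes no claim about concurrency or the order of the merged output. The names `route_and_merge`, `Stage`, and the toy `cache` are hypothetical and exist only for this illustration.

```python
from typing import Any, Callable, Iterable, Iterator, Sequence

# Hypothetical alias: one processing step in a path.
Stage = Callable[[Any], Any]


def route_and_merge(
    items: Iterable[Any],
    router: Callable[[Any], int],
    paths: Sequence[Sequence[Stage]],
) -> Iterator[Any]:
    """Conceptual model: pick one path per item, run its stages, merge outputs."""
    for item in items:
        index = router(item)       # the router selects a path by index
        for stage in paths[index]:  # each path is an independent chain
            item = stage(item)
        yield item                 # all paths feed the same output stream


# Toy data standing in for a real cache and loader.
cache = {"a": "A (cached)"}
paths = [
    [cache.__getitem__],                     # path 0: cache hit
    [str.upper, lambda s: s + " (loaded)"],  # path 1: cache miss
]

results = list(
    route_and_merge(["a", "b"], lambda x: 0 if x in cache else 1, paths)
)
print(results)
```

Here the router returns 0 for `"a"` (present in the toy cache) and 1 for `"b"`, so each item passes through exactly one chain before reaching the shared output.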

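Parameters:

router – A callable that takes an item and returns an int selecting the path for that item. It may return the int directly or return an awaitable that resolves to it. In the example below, the returned value is the index of the chosen path in paths.

paths – A sequence of paths. Each path is a sequence of PipeConfig, AggregateConfig, DisaggregateConfig, or nested PathVariantsConfig objects.

name – An optional name (str or None). Defaults to None.

Returns: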

The config object.

Raises:

ValueError – If router is not callable, paths is empty, or a path contains SourceConfig or SinkConfig.

Example:

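from spdl.pipeline.defs import PathVariants, Pipe, PipelineConfig, SourceConfig, SinkConfig

# Placeholder data and functions for illustration; substitute your own.
cache = {"a": "cached value of a"}
items = ["a", "b"]

def load_from_cache(item):
    return cache[item]

def load_from_source(item):
    return f"loaded value of {item}"  # stand-in for full data loading

config = PipelineConfig(
    src=SourceConfig(items),
    pipes=[
        PathVariants(
            # The router returns the index of the path to use for each item.
            router=lambda item: 0 if item in cache else 1,
            paths=[
                [Pipe(load_from_cache)],   # path 0: cache hit
                [Pipe(load_from_source)],  # path 1: cache miss
            ],
        ),
    ],
    sink=SinkConfig(buffer_size=10),
)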
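
The signature also accepts a router that returns an awaitable, which may help when the routing decision itself needs I/O, such as a remote cache lookup. The following is a minimal sketch of such a router using only the standard library. `CACHED_KEYS` and `is_cached` are hypothetical stand-ins, not part of SPDL.

```python
import asyncio

# Hypothetical set of keys that are already cached.
CACHED_KEYS = {"a", "c"}


async def is_cached(key: str) -> bool:
    """Stand-in for an asynchronous cache lookup."""
    await asyncio.sleep(0)  # placeholder for real I/O
    return key in CACHED_KEYS


async def router(item: str) -> int:
    """Return 0 for a cache hit and 1 for a cache miss."""
    return 0 if await is_cached(item) else 1


async def _demo() -> list[int]:
    # Call the router directly to show the indices it produces.
    return [await router(key) for key in ["a", "b", "c"]]


indices = asyncio.run(_demo())
print(indices)
```

A coroutine function like this could be passed as `router=` in place of the lambda in the example above.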