spdl.pipeline.defs.PathVariantsConfig
- class PathVariantsConfig(name: str, router: Callable[[Any], int] | Callable[[Any], Awaitable[int]], paths: tuple[tuple[PipeConfig[Any, Any] | AggregateConfig[Any] | DisaggregateConfig[Any] | PathVariantsConfig[Any], ...], ...])
Configuration for variant path routing.
Use PathVariants() to create a config.
Routes each incoming item to one of several processing paths based on a router function. All paths produce the same output type and their outputs are merged back into a single stream.
This is useful when items need different processing depending on runtime conditions. For example:
- Caching: route cached items to a fast cache-read path while uncached items go through full data loading.
- Hybrid processing: split items between local and remote processing based on size or availability.
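The routing behaviour described above can be sketched in plain Python. This is only an illustration of the semantics (the router picks an index, the item runs through that path's stages, and the results end up in one stream); it is not SPDL's implementation and ignores concurrency, buffering, and output ordering. The helper `route_items` and the toy loaders are hypothetical names introduced for this sketch.

```python
from typing import Any, Callable, Iterable, Iterator, Sequence

# A stage is modelled here as a plain one-argument function (an assumption
# made for this sketch; real paths hold pipe configs, not bare functions).
Stage = Callable[[Any], Any]


def route_items(
    items: Iterable[Any],
    router: Callable[[Any], int],
    paths: Sequence[Sequence[Stage]],
) -> Iterator[Any]:
    """Send each item down the path chosen by ``router`` and yield the result."""
    for item in items:
        index = router(item)
        if not 0 <= index < len(paths):
            raise IndexError(
                f"router returned {index}, but only {len(paths)} paths exist"
            )
        # Apply the stages of the selected path in order.
        for stage in paths[index]:
            item = stage(item)
        # Every path yields into the same output stream.
        yield item


# Toy data standing in for a cache and a slow loader.
cache = {"a": "A (cached)"}


def cache_router(key: str) -> int:
    return 0 if key in cache else 1


def load_from_cache(key: str) -> str:
    return cache[key]


def load_from_source(key: str) -> str:
    return f"{key.upper()} (loaded)"


results = list(
    route_items(
        ["a", "b"],
        cache_router,
        [[load_from_cache], [load_from_source]],
    )
)
print(results)
```

Because both toy paths return strings, downstream code can treat the merged stream uniformly, which mirrors the requirement that all paths produce the same output type.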
See also
- Example: Pipeline definitions
Illustrates how to build a complex pipeline.
Example:
    from spdl.pipeline import PipelineBuilder
    from spdl.pipeline.defs import Pipe

    # `items`, `cache`, `load_from_cache` and `load_from_source`
    # are assumed to be defined elsewhere.
    def cache_router(item):
        return 0 if item in cache else 1

    pipeline = (
        PipelineBuilder()
        .add_source(items)
        .path_variants(
            router=cache_router,
            paths=[
                [Pipe(load_from_cache)],   # path 0: cache hit
                [Pipe(load_from_source)],  # path 1: cache miss
            ],
        )
        .add_sink(buffer_size=10)
        .build()
    )
Note
Paths can only contain pipe stages (PipeConfig, AggregateConfig, DisaggregateConfig, or nested PathVariantsConfig). SourceConfig and SinkConfig are not allowed in paths.
Attributes
- name: Name of the path variants stage.
- router: A function that takes an item and returns an index into the paths list.
- paths: Alternative processing paths.
- paths: tuple[tuple[PipeConfig[Any, Any] | AggregateConfig[Any] | DisaggregateConfig[Any] | PathVariantsConfig[Any], ...], ...]
Alternative processing paths. Each path is a tuple of pipe configs.
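The signature allows the router to be either a regular function or an async function, both returning an int index. How SPDL dispatches between the two is not described here; the following is a minimal sketch of one way a caller could accept both forms, using only the standard library. `resolve_path_index`, `by_size`, and `by_availability` are hypothetical names for illustration.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Union

# Mirrors the two router shapes in the class signature.
Router = Union[Callable[[Any], int], Callable[[Any], Awaitable[int]]]


async def resolve_path_index(router: Router, item: Any) -> int:
    """Call ``router`` and await its result only if it is awaitable."""
    result = router(item)
    if inspect.isawaitable(result):
        result = await result
    return result


def by_size(item: bytes) -> int:
    # Synchronous router: small payloads take path 0, larger ones path 1.
    return 0 if len(item) < 4 else 1


async def by_availability(item: bytes) -> int:
    # Async router: the sleep stands in for a non-blocking lookup.
    await asyncio.sleep(0)
    return 1 if item.startswith(b"remote:") else 0


async def main() -> list:
    return [
        await resolve_path_index(by_size, b"abc"),
        await resolve_path_index(by_size, b"abcdef"),
        await resolve_path_index(by_availability, b"remote:x"),
    ]


indices = asyncio.run(main())
print(indices)
```

An async router is a natural fit when the routing decision itself needs I/O, such as checking whether an item exists in a remote cache.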