spdl.pipeline.defs.PathVariants¶
- PathVariants(router: Callable[[Any], int] | Callable[[Any], Awaitable[int]], paths: Sequence[Sequence[PipeConfig[Any, Any] | AggregateConfig[Any] | DisaggregateConfig[Any] | PathVariantsConfig[Any]]], name: str | None = None) PathVariantsConfig[Any][source]¶
Create a
PathVariantsConfigfor variant path routing.See also
- Example: Pipeline definitions
Illustrates how to build a complex pipeline.
Routes each incoming item to one of several processing paths based on a router function. Each path is an independent chain of pipe configs. The outputs of all paths are merged back into a single stream.
This is useful when items need different processing depending on runtime conditions — e.g., routing cached items to a fast cache-read path while uncached items go through full data loading, or splitting between local and remote processing.
- Parameters:
router – A callable that takes an item and returns an int index selecting which path the item should be routed to. The returned index must be in range
[0, len(paths)).paths – A sequence of paths. Each path is a sequence of pipe configs (
PipeConfig,AggregateConfig,DisaggregateConfig, or nestedPathVariantsConfig).SourceConfigandSinkConfigare not allowed.name – Optional name for the stage.
- Returns:
The config object.
- Raises:
ValueError – If
routeris not callable,pathsis empty, or a path containsSourceConfigorSinkConfig.
Example:
from spdl.pipeline.defs import PathVariants, Pipe, PipelineConfig, SourceConfig, SinkConfig config = PipelineConfig( src=SourceConfig(items), pipes=[ PathVariants( router=lambda item: 0 if item in cache else 1, paths=[ [Pipe(load_from_cache)], # path 0: cache hit [Pipe(load_from_source)], # path 1: cache miss ], ), ], sink=SinkConfig(buffer_size=10), )