spdl.pipeline.defs.Merge
- Merge(pipeline_configs: Sequence[PipelineConfig[Any]], op: Callable[[str, Sequence[Queue], Queue], Awaitable[None]] | None = None) -> MergeConfig
Create a MergeConfig.
Merge multiple pipelines into one output queue.
- Parameters:
pipeline_configs – A list of pipeline configs.
op –
Optional custom merge operation. If provided, this custom operation will be used to merge items from the input queues. The operation should be an async function with signature:
(name: str, input_queues: Sequence[Queue], output_queue: Queue) -> None
If not provided, the default merge operation will be used, which passes items from all input queues to the output queue in the order they become available.
Changed in version 0.1.7: Custom merge operations can now exit early without hanging the pipeline. The upstream stages are automatically cleaned up when the merge operation returns.
- Returns:
The config object.
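To make "in the order they become available" concrete, the following is a minimal sketch of that behavior written with plain asyncio. It is not the library's implementation. The _EOF sentinel and the merge_as_available name are hypothetical stand-ins chosen so the snippet runs without spdl; a real merge operation would test items with is_eof, as in the example below.

```python
import asyncio
from collections.abc import Sequence

# Hypothetical end-of-stream marker (assumption for this sketch only);
# a real merge operation would use spdl.pipeline.is_eof instead.
_EOF = object()


async def merge_as_available(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
    """Forward items from every input queue as soon as each one arrives."""

    async def _drain(queue: asyncio.Queue) -> None:
        # One forwarding coroutine per input queue, so a slow queue
        # does not hold back items from the others.
        while (item := await queue.get()) is not _EOF:
            await output_queue.put(item)

    # Return once every input queue has delivered its end-of-stream marker.
    await asyncio.gather(*(_drain(q) for q in input_queues))


async def _demo() -> list[int]:
    q1, q2, out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    for item in (1, 2, _EOF):
        q1.put_nowait(item)
    for item in (3, _EOF):
        q2.put_nowait(item)
    await merge_as_available("merge", [q1, q2], out)
    # Arrival order across queues is not guaranteed, so sort for display.
    return sorted(out.get_nowait() for _ in range(out.qsize()))


merged = asyncio.run(_demo())
print(merged)
```

The sketch does not model how end-of-stream is signaled downstream; it only shows the forwarding pattern that the op signature allows.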
Example
Custom round-robin merge operation that checks queues one by one.
import asyncio
from collections.abc import Sequence

from spdl.pipeline import (
    build_pipeline,
    is_eof,
    PipelineConfig,
    Merge,
    SourceConfig,
    SinkConfig,
)


async def round_robin_merge(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
    '''Merge that polls queues in round-robin order.'''
    active_queues = list(input_queues)
    while active_queues:
        for queue in list(active_queues):
            item = await queue.get()
            if is_eof(item):
                active_queues.remove(queue)
            else:
                await output_queue.put(item)


# Use the custom merge operation
plc1 = PipelineConfig(
    src=SourceConfig([1, 2, 3]),
    pipes=[],
    sink=SinkConfig(10))

plc2 = PipelineConfig(
    src=SourceConfig([4, 5, 6]),
    pipes=[],
    sink=SinkConfig(10))

pipeline_config = PipelineConfig(
    src=Merge([plc1, plc2], op=round_robin_merge),
    pipes=[],
    sink=SinkConfig(10),
)
pipeline = build_pipeline(pipeline_config)
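The version 0.1.7 note says a custom merge operation may return before its inputs are exhausted. A merge operation of that kind might look like the sketch below, which stops after forwarding a fixed number of items. The make_take_first factory and the _EOF sentinel are hypothetical and exist only so the snippet runs with plain asyncio; with spdl the end-of-stream check would be is_eof, and cleanup of the upstream stages is left to the library as described above.

```python
import asyncio
from collections.abc import Sequence

# Hypothetical end-of-stream marker (assumption for this sketch only).
_EOF = object()


def make_take_first(limit: int):
    """Build a merge operation that returns after forwarding `limit` items."""

    async def take_first(
        name: str,
        input_queues: Sequence[asyncio.Queue],
        output_queue: asyncio.Queue,
    ) -> None:
        forwarded = 0
        active = list(input_queues)
        while active and forwarded < limit:
            for queue in list(active):
                item = await queue.get()
                if item is _EOF:
                    active.remove(queue)
                    continue
                await output_queue.put(item)
                forwarded += 1
                if forwarded == limit:
                    # Early exit: the remaining upstream items are left unread.
                    return

    return take_first


async def _demo() -> list[int]:
    q1, q2, out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    for item in (1, 2, 3, _EOF):
        q1.put_nowait(item)
    for item in (4, 5, 6, _EOF):
        q2.put_nowait(item)
    await make_take_first(3)("merge", [q1, q2], out)
    return [out.get_nowait() for _ in range(out.qsize())]


forwarded_items = asyncio.run(_demo())
print(forwarded_items)  # [1, 4, 2]
```

Because the operation's signature is fixed, the limit is bound through a closure. Assuming the end-of-stream check is swapped for is_eof, the resulting function would be passed as op=make_take_first(3) in the Merge call above.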
See also
- Example: Pipeline definitions
Illustrates how to build a complex pipeline.