spdl.pipeline.defs.Merge¶
- Merge(pipeline_configs: Sequence[PipelineConfig[Any]], op: Callable[[str, Sequence[Queue], Queue], Awaitable[None]] | None = None) MergeConfig[source]¶
Create a
MergeConfig.Merge multiple pipelines into one output queue.
- Parameters:
pipeline_configs – A list of pipeline configs.
op –
Optional custom merge operation. If provided, this custom operation will be used to merge items from the input queues. The operation should be an async function with signature:
(name: str, input_queues: Sequence[Queue], output_queue: Queue) -> NoneIf not provided, the default merge operation will be used, which passes items from all input queues to the output queue in the order they become available.
- Returns:
The config object.
Example
Custom round-robin merge operation that checks queues one by one.
import asyncio from collections.abc import Sequence from spdl.pipeline import ( build_pipeline, is_eof, PipelineConfig, Merge, SourceConfig, SinkConfig, ) async def round_robin_merge( name: str, input_queues: Sequence[asyncio.Queue], output_queue: asyncio.Queue, ) -> None: '''Merge that polls queues in round-robin order.''' active_queues = list(input_queues) while active_queues: for queue in list(active_queues): item = await queue.get() if is_eof(item): active_queues.remove(queue) else: await output_queue.put(item) # Use the custom merge operation plc1 = PipelineConfig( src=SourceConfig([1, 2, 3]), pipes=[], sink=SinkConfig(10)) plc2 = PipelineConfig( src=SourceConfig([4, 5, 6]), pipes=[], sink=SinkConfig(10)) pipeline_config = PipelineConfig( src=Merge([plc1, plc2], op=round_robin_merge), pipes=[], sink=SinkConfig(10), ) pipeline = build_pipeline(pipeline_config)
See also
- Example: Pipeline definitions
Illustrates how to build a complex pipeline.