spdl.pipeline.defs.Merge

Merge(pipeline_configs: Sequence[PipelineConfig[Any]], op: Callable[[str, Sequence[Queue], Queue], Awaitable[None]] | None = None) -> MergeConfig

Create a MergeConfig.

Merge multiple pipelines into one output queue.

Parameters:
  • pipeline_configs – The configs of the pipelines whose outputs are merged.

  • op

    Optional custom merge operation. If provided, it is used to merge items from the input queues. It must be an async function with the signature (name: str, input_queues: Sequence[Queue], output_queue: Queue) -> None.

    If not provided, the default merge operation is used. It passes items from all input queues to the output queue in the order they become available.

    Changed in version 0.1.7: Custom merge operations can now exit early without hanging the pipeline. The upstream stages are automatically cleaned up when the merge operation returns.
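The two behaviours described for op can be sketched with plain asyncio queues. This is a minimal illustration and not SPDL's implementation: the _EOF sentinel and the helpers forward_as_available, take_first_three, _feed, demo_forward and demo_early_exit are hypothetical names introduced here, and a real merge operation would detect the end of a stream with is_eof rather than an identity check. The first function forwards items in the order they become available; the second returns early, which is the case addressed by the 0.1.7 change.

```python
import asyncio
from collections.abc import Sequence

# Hypothetical end-of-stream marker; real SPDL code would test items with is_eof().
_EOF = object()


async def forward_as_available(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
    """Forward each item as soon as its input queue produces it."""

    async def drain(queue: asyncio.Queue) -> None:
        # One task per input, so a slow input never holds back a fast one.
        while (item := await queue.get()) is not _EOF:
            await output_queue.put(item)

    await asyncio.gather(*(drain(q) for q in input_queues))


async def take_first_three(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
    """Early-exit variant: forward at most three items from the first input."""
    for _ in range(3):
        item = await input_queues[0].get()
        if item is _EOF:
            return
        await output_queue.put(item)
    # Returning here leaves the remaining items unread in the input queue.


async def _feed(queue: asyncio.Queue, items: list, delay: float) -> None:
    # Simulated upstream pipeline: emit items at a fixed pace, then the sentinel.
    for item in items:
        await asyncio.sleep(delay)
        await queue.put(item)
    await queue.put(_EOF)


async def demo_forward() -> list:
    fast, slow, out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(
        _feed(fast, ["f1", "f2"], 0.01),
        _feed(slow, ["s1"], 0.2),
        forward_as_available("merge", [fast, slow], out),
    )
    return [out.get_nowait() for _ in range(out.qsize())]


async def demo_early_exit() -> list:
    src, out = asyncio.Queue(), asyncio.Queue()
    for item in [1, 2, 3, 4, 5, _EOF]:
        src.put_nowait(item)
    await take_first_three("merge", [src], out)
    return [out.get_nowait() for _ in range(out.qsize())]
```

Running asyncio.run(demo_forward()) yields both items of the fast input before the single item of the slow one, and asyncio.run(demo_early_exit()) yields only the first three items. The sketch only shows the merge function returning early; the cleanup of upstream stages is done by the pipeline itself and is not reproduced here.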

Returns:

The config object.

Example

Custom round-robin merge operation that takes one item from each input queue in turn.

import asyncio
from collections.abc import Sequence

from spdl.pipeline import (
    build_pipeline,
    is_eof,
    PipelineConfig,
    Merge,
    SourceConfig,
    SinkConfig,
)

async def round_robin_merge(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
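    '''Merge that takes one item from each queue in round-robin order.'''
    active_queues = list(input_queues)

    while active_queues:
        # Iterate over a copy so exhausted queues can be removed mid-loop.
        for queue in list(active_queues):
            # Waits until this queue yields an item, so a slow input
            # delays the inputs that come after it.
            item = await queue.get()

            if is_eof(item):
                # This input is exhausted; stop reading from it.
                active_queues.remove(queue)
            else:
                await output_queue.put(item)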

# Use the custom merge operation
plc1 = PipelineConfig(
    src=SourceConfig([1, 2, 3]), pipes=[], sink=SinkConfig(10))
plc2 = PipelineConfig(
    src=SourceConfig([4, 5, 6]), pipes=[], sink=SinkConfig(10))

pipeline_config = PipelineConfig(
    src=Merge([plc1, plc2], op=round_robin_merge),
    pipes=[],
    sink=SinkConfig(10),
)

pipeline = build_pipeline(pipeline_config)
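
The ordering produced by the round-robin operation above can be checked without building a pipeline. The following is a sketch under stated assumptions: it drives a copy of the same function with plain asyncio queues, and _EOF, is_eof_stub and demo_round_robin are hypothetical stand-ins introduced here (is_eof_stub replaces is_eof from spdl.pipeline so the snippet runs on its own).

```python
import asyncio
from collections.abc import Sequence

_EOF = object()  # hypothetical end-of-stream marker for this sketch


def is_eof_stub(item: object) -> bool:
    # Stand-in for is_eof from spdl.pipeline; an assumption, not the library's check.
    return item is _EOF


async def round_robin_merge(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
    """Same logic as the example above, using the stand-in EOF check."""
    active_queues = list(input_queues)
    while active_queues:
        for queue in list(active_queues):
            item = await queue.get()
            if is_eof_stub(item):
                active_queues.remove(queue)
            else:
                await output_queue.put(item)


async def demo_round_robin() -> list:
    q1, q2, out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    # Pre-fill both inputs so the merge never has to wait.
    for item in [1, 2, 3, _EOF]:
        q1.put_nowait(item)
    for item in [4, 5, 6, _EOF]:
        q2.put_nowait(item)
    await round_robin_merge("merge", [q1, q2], out)
    return [out.get_nowait() for _ in range(out.qsize())]


print(asyncio.run(demo_round_robin()))  # prints [1, 4, 2, 5, 3, 6]
```

The items alternate between the two inputs. The trade-off is that each queue.get() waits for its own queue, so one stalled input stalls the whole merge, whereas the default operation forwards whatever arrives first.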

See also

Example: Pipeline definitions

Illustrates how to build a complex pipeline.