spdl.pipeline.defs.Merge

Merge(pipeline_configs: Sequence[PipelineConfig[Any]], op: Callable[[str, Sequence[Queue], Queue], Awaitable[None]] | None = None) -> MergeConfig

Create a MergeConfig.

Merge the outputs of multiple pipelines into a single output queue.

Parameters:
  • pipeline_configs – A sequence of pipeline configs whose outputs are merged.

  • op

    Optional custom merge operation. If provided, it is used to merge items from the input queues. It must be an async function with the signature: (name: str, input_queues: Sequence[Queue], output_queue: Queue) -> None

    If not provided, the default merge operation is used. It forwards items from all input queues to the output queue in the order they become available, so items from different pipelines may be interleaved.

Returns:

The config object.
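The default "order they become available" behavior can be pictured as one forwarding task per input queue, all writing into the shared output queue. The following is a minimal sketch in plain asyncio, not SPDL's actual implementation. The EOF sentinel, the local is_eof helper, and the names first_available_merge and demo are hypothetical stand-ins; in a real pipeline, SPDL supplies the queues and its own end-of-stream marker (checked with spdl.pipeline.is_eof).

```python
import asyncio
from collections.abc import Sequence

# Hypothetical end-of-stream sentinel; SPDL uses its own marker checked by is_eof().
EOF = object()


def is_eof(item: object) -> bool:
    return item is EOF


async def first_available_merge(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
    """Forward items from every input queue as soon as each one arrives."""

    async def _forward(queue: asyncio.Queue) -> None:
        # Drain one input until it signals end-of-stream.
        while not is_eof(item := await queue.get()):
            await output_queue.put(item)

    # One forwarding task per input; their outputs interleave in arrival order.
    await asyncio.gather(*(_forward(q) for q in input_queues))


async def demo() -> list[int]:
    q1: asyncio.Queue = asyncio.Queue()
    q2: asyncio.Queue = asyncio.Queue()
    out: asyncio.Queue = asyncio.Queue()

    async def produce(queue: asyncio.Queue, items: list[int], delay: float) -> None:
        # Emit items at a fixed pace, then mark the end of the stream.
        for x in items:
            await asyncio.sleep(delay)
            await queue.put(x)
        await queue.put(EOF)

    await asyncio.gather(
        produce(q1, [1, 2, 3], 0.03),
        produce(q2, [10, 20, 30], 0.05),
        first_available_merge("merge", [q1, q2], out),
    )
    # All producers and the merge have finished; collect everything forwarded.
    return [out.get_nowait() for _ in range(out.qsize())]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The exact interleaving depends on timing, but items from each individual input keep their relative order.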

Example

A custom round-robin merge operation that takes one item from each input queue in turn.

import asyncio
from collections.abc import Sequence

from spdl.pipeline import (
    build_pipeline,
    is_eof,
    PipelineConfig,
    Merge,
    SourceConfig,
    SinkConfig,
)

async def round_robin_merge(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
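    '''Merge that takes items from the queues in round-robin order.'''
    active_queues = list(input_queues)

    while active_queues:
        # Iterate over a copy so exhausted queues can be removed mid-loop.
        for queue in list(active_queues):
            # Waits on this queue before moving on to the next one.
            item = await queue.get()

            if is_eof(item):
                # This input is exhausted; stop reading from it.
                active_queues.remove(queue)
            else:
                await output_queue.put(item)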

# Use the custom merge operation
plc1 = PipelineConfig(
    src=SourceConfig([1, 2, 3]), pipes=[], sink=SinkConfig(10))
plc2 = PipelineConfig(
    src=SourceConfig([4, 5, 6]), pipes=[], sink=SinkConfig(10))

pipeline_config = PipelineConfig(
    src=Merge([plc1, plc2], op=round_robin_merge),
    pipes=[],
    sink=SinkConfig(10),
)

pipeline = build_pipeline(pipeline_config)
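To see the ordering that round_robin_merge produces without building a full pipeline, the function can be driven directly with pre-filled asyncio queues. This standalone sketch copies the merge logic above but replaces spdl.pipeline.is_eof with a hypothetical local EOF sentinel so it runs without SPDL installed; run_demo is likewise a hypothetical helper. Inside a real pipeline, SPDL supplies the queues and the end-of-stream marker.

```python
import asyncio
from collections.abc import Sequence

# Hypothetical stand-in for SPDL's end-of-stream marker.
EOF = object()


def is_eof(item: object) -> bool:
    return item is EOF


async def round_robin_merge(
    name: str,
    input_queues: Sequence[asyncio.Queue],
    output_queue: asyncio.Queue,
) -> None:
    """Take one item from each active queue in turn until all reach EOF."""
    active_queues = list(input_queues)
    while active_queues:
        # Iterate over a copy so exhausted queues can be removed safely.
        for queue in list(active_queues):
            item = await queue.get()
            if is_eof(item):
                active_queues.remove(queue)
            else:
                await output_queue.put(item)


async def run_demo() -> list[int]:
    # Build two input queues, each ending with the EOF sentinel.
    inputs = []
    for values in ([1, 2, 3], [4, 5, 6]):
        queue: asyncio.Queue = asyncio.Queue()
        for v in values:
            queue.put_nowait(v)
        queue.put_nowait(EOF)
        inputs.append(queue)

    output: asyncio.Queue = asyncio.Queue()
    await round_robin_merge("round_robin", inputs, output)
    return [output.get_nowait() for _ in range(output.qsize())]


if __name__ == "__main__":
    print(asyncio.run(run_demo()))  # [1, 4, 2, 5, 3, 6]
```

Because each await queue.get() waits on one queue at a time, a slow input stalls the others. That is the trade-off for a strict alternating order, whereas the default merge forwards whichever item arrives first.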

See also

Example: Pipeline definitions

Illustrates how to build a complex pipeline.