Pipeline definitions

Comprehensive example defining a complex pipeline with spdl.pipeline.defs.

This example showcases the usage of all configuration classes available in the spdl.pipeline.defs module, including:

  • SourceConfig: Configures data sources for pipelines

  • PipeConfig: Configures individual processing stages (via factory functions)

  • SinkConfig: Configures output buffering for pipelines

  • PipelineConfig: Top-level pipeline configuration combining all components

  • Merge: Merges outputs from multiple pipelines into a single stream

The example also demonstrates the factory functions:

  • Pipe(): Creates pipe configurations for general processing

  • Aggregate(): Creates configurations for batching/grouping data

  • Disaggregate(): Creates configurations for splitting batched data
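
The batching and flattening semantics of `Aggregate` / `Disaggregate` can be sketched in plain Python. This is a simplified stand-in for illustration, not the SPDL implementation:

```python
from typing import Any, Iterable, Iterator


def aggregate(items: Iterable[Any], batch_size: int, drop_last: bool = False) -> Iterator[list[Any]]:
    """Group a stream into lists of at most ``batch_size`` items."""
    batch: list[Any] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch and not drop_last:
        yield batch  # emit the trailing partial batch


def disaggregate(batches: Iterable[list[Any]]) -> Iterator[Any]:
    """Flatten a stream of lists back into individual items."""
    for batch in batches:
        yield from batch


# Batching with drop_last=False keeps the odd trailing item:
print(list(aggregate([0, 1, 4, 9, 16], 2)))   # [[0, 1], [4, 9], [16]]
# Disaggregation undoes the batching:
print(list(disaggregate([[0, 1], [4, 9], [16]])))  # [0, 1, 4, 9, 16]
```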

The pipeline structure created by this example is illustrated below:

Note

The pipeline defined here uses the merge mechanism, which is not supported by PipelineBuilder.

graph TD
    subgraph "Pipeline 1"
        S1[Source: range#40;5#41;] --> P1[Pipe: square]
        P1 --> AGG1[Aggregate: batch_size=2]
        AGG1 --> SNK1[Sink 1]
    end

    subgraph "Pipeline 2"
        S2[Source: range#40;10,15#41;] --> P2[Pipe: add_100]
        P2 --> SNK2[Sink 2]
    end

    subgraph "Main Pipeline"
        SNK1 --> M[Merge]
        SNK2 --> M
        M --> NORM[Pipe: normalize_to_lists]
        NORM --> DISAGG[Disaggregate]
        DISAGG --> P3[Pipe: multiply_by_10]
        P3 --> FINAL_SINK[Final Sink]
    end

The data flow:

  1. Pipeline 1: [0, 1, 2, 3, 4] → square → [0, 1, 4, 9, 16] → aggregate(2) → [[0, 1], [4, 9], [16]]

  2. Pipeline 2: [10, 11, 12, 13, 14] → add_100 → [110, 111, 112, 113, 114]

  3. Merge combines outputs: [[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]

  4. Normalize handles mixed data types: [[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]

  5. Disaggregate flattens: [0, 1, 4, 9, 16, 110, 111, 112, 113, 114]

  6. Finally, multiply by 10: [0, 10, 40, 90, 160, 1100, 1110, 1120, 1130, 1140]
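
The arithmetic of the flow above can be checked with a plain-Python simulation (no SPDL needed). For simplicity this assumes Merge emits Pipeline 1's output before Pipeline 2's; in an actual run the two streams may interleave:

```python
# Plain-Python walk-through of the documented data flow.
pipeline_1 = [x * x for x in range(5)]         # square: [0, 1, 4, 9, 16]
batched = [pipeline_1[i:i + 2] for i in range(0, len(pipeline_1), 2)]  # aggregate(2)
pipeline_2 = [x + 100 for x in range(10, 15)]  # add_100: [110, ..., 114]

merged = batched + pipeline_2                  # Merge (order assumed sequential here)
normalized = [x if isinstance(x, list) else [x] for x in merged]  # normalize_to_lists
flattened = [x for batch in normalized for x in batch]            # Disaggregate
final = [x * 10 for x in flattened]            # multiply_by_10

print(final)  # [0, 10, 40, 90, 160, 1100, 1110, 1120, 1130, 1140]
```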

Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Comprehensive example defining a complex pipeline with :py:mod:`spdl.pipeline.defs`.

This example showcases the usage of all configuration classes available in the
:py:mod:`spdl.pipeline.defs` module, including:

.. py:currentmodule:: spdl.pipeline.defs

- :py:class:`SourceConfig`: Configures data sources for pipelines
- :py:class:`PipeConfig`: Configures individual processing stages (via factory functions)
- :py:class:`SinkConfig`: Configures output buffering for pipelines
- :py:class:`PipelineConfig`: Top-level pipeline configuration combining all components
- :py:class:`Merge`: Merges outputs from multiple pipelines into a single stream

The example also demonstrates the factory functions:

- :py:func:`Pipe`: Creates pipe configurations for general processing
- :py:func:`Aggregate`: Creates configurations for batching/grouping data
- :py:func:`Disaggregate`: Creates configurations for splitting batched data

The pipeline structure created by this example is illustrated below:

.. note::

   The pipeline defined here uses the merge mechanism, which is not supported by
   :py:class:`~spdl.pipeline.PipelineBuilder`.

.. mermaid::

   graph TD
       subgraph "Pipeline 1"
           S1[Source: range#40;5#41;] --> P1[Pipe: square]
           P1 --> AGG1[Aggregate: batch_size=2]
           AGG1 --> SNK1[Sink 1]
       end

       subgraph "Pipeline 2"
           S2[Source: range#40;10,15#41;] --> P2[Pipe: add_100]
           P2 --> SNK2[Sink 2]
       end

       subgraph "Main Pipeline"
           SNK1 --> M[Merge]
           SNK2 --> M
           M --> NORM[Pipe: normalize_to_lists]
           NORM --> DISAGG[Disaggregate]
           DISAGG --> P3[Pipe: multiply_by_10]
           P3 --> FINAL_SINK[Final Sink]
       end

The data flow:

1. Pipeline 1:
   ``[0, 1, 2, 3, 4]`` → ``square`` → ``[0, 1, 4, 9, 16]`` → ``aggregate(2)`` → ``[[0, 1], [4, 9], [16]]``
2. Pipeline 2:
   ``[10, 11, 12, 13, 14]`` → ``add_100`` → ``[110, 111, 112, 113, 114]``
3. Merge combines outputs:
   ``[[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]``
4. Normalize handles mixed data types:
   ``[[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]``
5. Disaggregate flattens:
   ``[0, 1, 4, 9, 16, 110, 111, 112, 113, 114]``
6. Finally, multiply by 10:
   ``[0, 10, 40, 90, 160, 1100, 1110, 1120, 1130, 1140]``
"""

__all__ = [
    "main",
    "create_sub_pipeline_1",
    "create_sub_pipeline_2",
    "create_main_pipeline",
    "square",
    "add_100",
    "multiply_by_10",
    "normalize_to_lists",
    "run_pipeline_example",
]

import logging
from typing import Any

from spdl.pipeline import build_pipeline
from spdl.pipeline.defs import (
    Aggregate,
    Disaggregate,
    Merge,
    Pipe,
    PipelineConfig,
    SinkConfig,
    SourceConfig,
)

# pyre-strict

_LG: logging.Logger = logging.getLogger(__name__)


def square(x: int) -> int:
    """Square the input number."""
    return x * x


def add_100(x: int) -> int:
    """Add 100 to the input number."""
    return x + 100


def multiply_by_10(x: int) -> int:
    """Multiply the input by 10."""
    return x * 10


def create_sub_pipeline_1() -> PipelineConfig[int, list[int]]:
    """Create a sub-pipeline that squares numbers and aggregates them.

    .. code-block:: text

       range(5)
        → square
        → aggregate(2)
        → [[squared_pairs], [remaining]]

    Returns:
        Configuration for a pipeline that processes
        ``[0,1,2,3,4]`` into batches of squared values.
    """
    source_config = SourceConfig(range(5))
    square_pipe = Pipe(square)
    aggregate_pipe = Aggregate(2, drop_last=False)
    sink_config = SinkConfig(buffer_size=10)
    return PipelineConfig(
        src=source_config,
        pipes=[square_pipe, aggregate_pipe],
        sink=sink_config,
    )


def create_sub_pipeline_2() -> PipelineConfig[int, int]:
    """Create a sub-pipeline that adds 100 to numbers.

    .. code-block:: text

       range(10,15)
        → add_100
        → individual_values

    Returns:
        Configuration for a pipeline that processes
        ``[10,11,12,13,14]`` by adding ``100``.
    """
    source_config = SourceConfig(range(10, 15))
    add_pipe = Pipe(add_100, concurrency=2)
    sink_config = SinkConfig(buffer_size=5)

    return PipelineConfig(
        src=source_config,
        pipes=[add_pipe],
        sink=sink_config,
    )


def normalize_to_lists(item: Any) -> list[Any]:
    """Flatten lists or wrap individual items in a list for uniform handling."""
    if isinstance(item, list):
        return item
    else:
        return [item]


def create_main_pipeline(
    sub_pipeline_1: PipelineConfig[int, list[int]],
    sub_pipeline_2: PipelineConfig[int, int],
) -> PipelineConfig[Any, int]:
    """Create the main pipeline that merges outputs from sub-pipelines.

    .. code-block:: text

       Merge([sub1, sub2])
         → normalize_to_lists
         → disaggregate
         → multiply_by_10

    Args:
        sub_pipeline_1: First sub-pipeline configuration
        sub_pipeline_2: Second sub-pipeline configuration

    Returns:
        Main pipeline configuration that merges and processes the sub-pipeline outputs.
    """
    merge_config = Merge([sub_pipeline_1, sub_pipeline_2])
    normalize_pipe = Pipe(normalize_to_lists)
    disaggregate_pipe = Disaggregate()
    multiply_pipe = Pipe(
        multiply_by_10,
        concurrency=3,
        output_order="input",  # Maintain input order
    )

    sink_config = SinkConfig(buffer_size=20)

    return PipelineConfig(
        src=merge_config,
        pipes=[normalize_pipe, disaggregate_pipe, multiply_pipe],
        sink=sink_config,
    )


def run_pipeline_example() -> list[int]:
    """Execute the complete pipeline example and return results.

    Returns:
        List of processed integers from the merged pipeline execution.
    """
    _LG.info("Creating sub-pipeline configurations...")

    sub_pipeline_1 = create_sub_pipeline_1()
    sub_pipeline_2 = create_sub_pipeline_2()

    _LG.info("Sub-pipeline 1: %s", sub_pipeline_1)
    _LG.info("Sub-pipeline 2: %s", sub_pipeline_2)

    main_pipeline_config = create_main_pipeline(sub_pipeline_1, sub_pipeline_2)

    _LG.info("Main pipeline config: %s", main_pipeline_config)

    _LG.info("Building the pipeline.")
    pipeline = build_pipeline(main_pipeline_config, num_threads=4)

    _LG.info("Executing the pipeline.")
    results = []
    with pipeline.auto_stop():
        for item in pipeline:
            results.append(item)

    return results


def run() -> None:
    """Run the example pipeline and check the result."""
    results = run_pipeline_example()

    _LG.info("Final results: %s", results)
    _LG.info("Number of items processed: %d", len(results))

    # Verify expected data flow
    expected_squared = [0, 1, 4, 9, 16]  # squares of 0-4
    expected_added = [110, 111, 112, 113, 114]  # 10-14 + 100
    expected_combined_count = len(expected_squared) + len(expected_added)

    if len(results) != expected_combined_count:
        raise RuntimeError(
            f"✗ Unexpected number of items: got {len(results)}, expected {expected_combined_count}"
        )
    _LG.info("✓ Pipeline processed expected number of items")


def main() -> None:
    """Main entry point demonstrating all pipeline configuration classes."""
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
    )

    run()


if __name__ == "__main__":
    main()

Functions

main() → None

Main entry point demonstrating all pipeline configuration classes.

create_sub_pipeline_1() → PipelineConfig[int, list[int]]

Create a sub-pipeline that squares numbers and aggregates them.

range(5)
 → square
 → aggregate(2)
 → [[squared_pairs], [remaining]]
Returns:

Configuration for a pipeline that processes [0,1,2,3,4] into batches of squared values.

create_sub_pipeline_2() → PipelineConfig[int, int]

Create a sub-pipeline that adds 100 to numbers.

range(10,15)
 → add_100
 → individual_values
Returns:

Configuration for a pipeline that processes [10,11,12,13,14] by adding 100.

create_main_pipeline(sub_pipeline_1: PipelineConfig[int, list[int]], sub_pipeline_2: PipelineConfig[int, int]) → PipelineConfig[Any, int]

Create the main pipeline that merges outputs from sub-pipelines.

Merge([sub1, sub2])
  → normalize_to_lists
  → disaggregate
  → multiply_by_10
Parameters:
  • sub_pipeline_1 – First sub-pipeline configuration

  • sub_pipeline_2 – Second sub-pipeline configuration

Returns:

Main pipeline configuration that merges and processes the sub-pipeline outputs.

square(x: int) → int

Square the input number.

add_100(x: int) → int

Add 100 to the input number.

multiply_by_10(x: int) → int

Multiply the input by 10.

normalize_to_lists(item: Any) → list[Any]

Flatten lists or wrap individual items in a list for uniform handling.

run_pipeline_example() → list[int]

Execute the complete pipeline example and return results.

Returns:

List of processed integers from the merged pipeline execution.