Pipeline definitions

Comprehensive example defining a complex pipeline with spdl.pipeline.defs.

This example showcases the usage of all configuration classes available in the spdl.pipeline.defs module, including:

  • SourceConfig: Configures data sources for pipelines

  • PipeConfig: Configures individual processing stages (via factory functions)

  • SinkConfig: Configures output buffering for pipelines

  • PipelineConfig: Top-level pipeline configuration combining all components

  • Merge: Merges outputs from multiple pipelines into a single stream

  • PathVariantsConfig: Routes items to different processing paths

The example also demonstrates the factory functions:

  • Pipe(): Creates pipe configurations for general processing

  • Aggregate(): Creates configurations for batching/grouping data

  • Disaggregate(): Creates configurations for splitting batched data

  • PathVariants(): Creates variant path routing configurations

Note

This pipeline uses the merge mechanism, which is not supported by PipelineBuilder.

graph TD
    subgraph "Pipeline 1"
        S1[Source: range(5)] --> P1[Pipe: square]
        P1 --> AGG1[Aggregate: batch_size=2]
        AGG1 --> SNK1[Sink 1]
    end

    subgraph "Pipeline 2"
        S2[Source: range(10,15)] --> P2[Pipe: add_100]
        P2 --> SNK2[Sink 2]
    end

    subgraph "Main Pipeline"
        SNK1 --> M[Merge]
        SNK2 --> M
        M --> NORM[Pipe: normalize_to_lists]
        NORM --> DISAGG[Disaggregate]
        DISAGG --> R[Router: cache_router]
        R -->|cache hit| CACHE[Pipe: load_from_cache]
        R -->|cache miss| MUL[Pipe: multiply_by_10]
        MUL --> STORE[Pipe: store_in_cache]
        CACHE --> PV_MERGE[PathVariants merge]
        STORE --> PV_MERGE
        PV_MERGE --> FINAL_SINK[Final Sink]
    end

The data flow:

  1. Pipeline 1: [0, 1, 2, 3, 4] → square → [[0, 1], [4, 9], [16]]

  2. Pipeline 2: [10, 11, 12, 13, 14] → add_100 → [110, 111, 112, 113, 114]

  3. Merge combines outputs: [[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]

  4. Normalize handles mixed data types: [[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]

  5. Disaggregate flattens: [0, 1, 4, 9, 16, 110, 111, 112, 113, 114]

  6. PathVariants routes each item based on cache membership:

    • Cache hits (pre-populated values) → load_from_cache (1 stage)

    • Cache misses → multiply_by_10 → store_in_cache (2 stages: compute ×10, then store the result for future lookups)
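Steps 1–5 above can be reproduced in plain Python (no SPDL required) as a sanity check of the intermediate values. Note that a real Merge interleaves the two streams nondeterministically; the simple concatenation below is just one possible ordering:

```python
# Plain-Python walk-through of the data flow (illustrative only; the real
# pipeline runs these stages concurrently via the SPDL runtime).

def square(x: int) -> int:
    return x * x

def add_100(x: int) -> int:
    return x + 100

def aggregate(items: list, batch_size: int) -> list:
    # Group items into lists of `batch_size`; the last batch may be shorter.
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]

def normalize_to_lists(item):
    # Wrap bare items so every element is a list.
    return item if isinstance(item, list) else [item]

pipeline_1 = aggregate([square(x) for x in range(5)], 2)
pipeline_2 = [add_100(x) for x in range(10, 15)]

merged = pipeline_1 + pipeline_2  # one possible merge order
normalized = [normalize_to_lists(x) for x in merged]
disaggregated = [x for batch in normalized for x in batch]

print(pipeline_1)     # [[0, 1], [4, 9], [16]]
print(disaggregated)  # [0, 1, 4, 9, 16, 110, 111, 112, 113, 114]
```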

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Comprehensive example defining a complex pipeline with :py:mod:`spdl.pipeline.defs`.

This example showcases the usage of all configuration classes available in the
:py:mod:`spdl.pipeline.defs` module, including:

.. py:currentmodule:: spdl.pipeline.defs

- :py:class:`SourceConfig`: Configures data sources for pipelines
- :py:class:`PipeConfig`: Configures individual processing stages (via factory functions)
- :py:class:`SinkConfig`: Configures output buffering for pipelines
- :py:class:`PipelineConfig`: Top-level pipeline configuration combining all components
- :py:class:`Merge`: Merges outputs from multiple pipelines into a single stream
- :py:class:`PathVariantsConfig`: Routes items to different processing paths

The example also demonstrates the factory functions:

- :py:func:`Pipe`: Creates pipe configurations for general processing
- :py:func:`Aggregate`: Creates configurations for batching/grouping data
- :py:func:`Disaggregate`: Creates configurations for splitting batched data
- :py:func:`PathVariants`: Creates variant path routing configurations

.. note::

   This pipeline uses the merge mechanism, which is not supported by
   :py:class:`~spdl.pipeline.PipelineBuilder`.

.. mermaid::

   graph TD
       subgraph "Pipeline 1"
           S1[Source: range#40;5#41;] --> P1[Pipe: square]
           P1 --> AGG1[Aggregate: batch_size=2]
           AGG1 --> SNK1[Sink 1]
       end

       subgraph "Pipeline 2"
           S2[Source: range#40;10,15#41;] --> P2[Pipe: add_100]
           P2 --> SNK2[Sink 2]
       end

       subgraph "Main Pipeline"
           SNK1 --> M[Merge]
           SNK2 --> M
           M --> NORM[Pipe: normalize_to_lists]
           NORM --> DISAGG[Disaggregate]
           DISAGG --> R[Router: cache_router]
           R -->|cache hit| CACHE[Pipe: load_from_cache]
           R -->|cache miss| MUL[Pipe: multiply_by_10]
           MUL --> STORE[Pipe: store_in_cache]
           CACHE --> PV_MERGE[PathVariants merge]
           STORE --> PV_MERGE
           PV_MERGE --> FINAL_SINK[Final Sink]
       end

The data flow:

1. Pipeline 1:
   ``[0, 1, 2, 3, 4]`` → ``square`` → ``[[0, 1], [4, 9], [16]]``
2. Pipeline 2:
   ``[10, 11, 12, 13, 14]`` → ``add_100`` → ``[110, 111, 112, 113, 114]``
3. Merge combines outputs:
   ``[[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]``
4. Normalize handles mixed data types:
   ``[[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]``
5. Disaggregate flattens:
   ``[0, 1, 4, 9, 16, 110, 111, 112, 113, 114]``
6. PathVariants routes each item based on cache membership:

   - Cache hits (pre-populated values) → ``load_from_cache`` (1 stage)
   - Cache misses → ``multiply_by_10`` → ``store_in_cache`` (2 stages:
     compute ``×10``, then store the result for future lookups)
"""

__all__ = [
    "main",
    "create_sub_pipeline_1",
    "create_sub_pipeline_2",
    "create_main_pipeline",
    "square",
    "add_100",
    "multiply_by_10",
    "store_in_cache",
    "normalize_to_lists",
    "cache_router",
    "load_from_cache",
    "run_pipeline_example",
]

import logging
from typing import Any

from spdl.pipeline import build_pipeline
from spdl.pipeline.defs import (
    Aggregate,
    Disaggregate,
    Merge,
    PathVariants,
    Pipe,
    PipelineConfig,
    SinkConfig,
    SourceConfig,
)

# pyre-strict

_LG: logging.Logger = logging.getLogger(__name__)


def square(x: int) -> int:
    """Square the input number."""
    return x * x


def add_100(x: int) -> int:
    """Add 100 to the input number."""
    return x + 100


################################################################################
# Cache-based routing with PathVariants
################################################################################

# Cache of pre-computed results.  In a real pipeline this might be backed by
# a key-value store or an on-disk cache; here we use a plain dict.
_CACHE: dict[int, int] = {0: 0, 1: 10, 4: 40}  # pre-populated with a few ×10 values


def cache_router(item: int) -> int:
    """Route items based on cache membership.

    Returns 0 for cache hits (fast path) and 1 for cache misses (slow path).
    """
    return 0 if item in _CACHE else 1


def load_from_cache(item: int) -> int:
    """Retrieve a previously computed result from the cache (fast path)."""
    return _CACHE[item]


def multiply_by_10(item: int) -> tuple[int, int]:
    """Multiply the input by 10 (slow path, first stage).

    Returns a (key, result) tuple so the next stage can store it in the cache.
    """
    return (item, item * 10)


def store_in_cache(item: tuple[int, int]) -> int:
    """Store the computed result in the cache and return it (slow path, second stage)."""
    key, value = item
    _CACHE[key] = value
    return value


def create_sub_pipeline_1() -> PipelineConfig[list[int]]:
    """Create a sub-pipeline that squares numbers and aggregates them.

    .. code-block:: text

       range(5)
        → square
        → aggregate(2)
        → [[squared_pairs], [remaining]]

    Returns:
        Configuration for a pipeline that processes
        ``[0,1,2,3,4]`` into batches of squared values.
    """
    source_config = SourceConfig(range(5))
    square_pipe = Pipe(square)
    aggregate_pipe = Aggregate(2, drop_last=False)
    sink_config = SinkConfig(buffer_size=10)
    return PipelineConfig(
        src=source_config,
        pipes=[square_pipe, aggregate_pipe],
        sink=sink_config,
    )


def create_sub_pipeline_2() -> PipelineConfig[int]:
    """Create a sub-pipeline that adds 100 to numbers.

    .. code-block:: text

       range(10,15)
        → add_100
        → individual_values

    Returns:
        Configuration for a pipeline that processes
        ``[10,11,12,13,14]`` by adding ``100``.
    """
    source_config = SourceConfig(range(10, 15))
    add_pipe = Pipe(add_100, concurrency=2)
    sink_config = SinkConfig(buffer_size=5)

    return PipelineConfig(
        src=source_config,
        pipes=[add_pipe],
        sink=sink_config,
    )


def normalize_to_lists(item: Any) -> list[Any]:
    """Flatten lists or wrap individual items in a list for uniform handling."""
    if isinstance(item, list):
        return item
    else:
        return [item]


def create_main_pipeline(
    sub_pipeline_1: PipelineConfig[list[int]],
    sub_pipeline_2: PipelineConfig[int],
) -> PipelineConfig[int]:
    """Create the main pipeline that merges outputs from sub-pipelines.

    After merging, normalising and disaggregating, the pipeline uses
    :py:func:`PathVariants` to route each item through a cache-aware
    multiply-by-10 stage:

    - **Cache hit** → ``load_from_cache`` (1 pipe stage)
    - **Cache miss** → ``multiply_by_10`` → ``store_in_cache`` (2 pipe stages)

    Note that the two paths have different numbers of pipe stages,
    which PathVariants supports.

    .. code-block:: text

       Merge([sub1, sub2])
         → normalize_to_lists
         → disaggregate
         → PathVariants(cache_router)
             path 0 (hit):  load_from_cache
             path 1 (miss): multiply_by_10 → store_in_cache

    Args:
        sub_pipeline_1: First sub-pipeline configuration
        sub_pipeline_2: Second sub-pipeline configuration

    Returns:
        Main pipeline configuration that merges and processes the sub-pipeline outputs.
    """
    merge_config = Merge([sub_pipeline_1, sub_pipeline_2])
    normalize_pipe = Pipe(normalize_to_lists)
    disaggregate_pipe = Disaggregate()

    # Instead of a plain Pipe(multiply_by_10), use PathVariants to
    # demonstrate cache-based routing: items already in the cache are
    # served instantly, while cache misses compute the result and
    # populate the cache for future lookups.
    cached_multiply = PathVariants(
        router=cache_router,
        paths=[
            [Pipe(load_from_cache)],  # path 0: 1-stage (fast)
            [
                Pipe(multiply_by_10),
                Pipe(store_in_cache),
            ],  # path 1: 2-stage (compute + cache)
        ],
    )

    sink_config = SinkConfig(buffer_size=20)

    return PipelineConfig(
        src=merge_config,
        pipes=[normalize_pipe, disaggregate_pipe, cached_multiply],
        sink=sink_config,
    )


def run_pipeline_example() -> list[int]:
    """Execute the complete pipeline example and return results.

    Returns:
        List of processed integers from the merged pipeline execution.
    """
    _LG.info("Creating sub-pipeline configurations...")

    sub_pipeline_1 = create_sub_pipeline_1()
    sub_pipeline_2 = create_sub_pipeline_2()

    _LG.info("Sub-pipeline 1: %s", sub_pipeline_1)
    _LG.info("Sub-pipeline 2: %s", sub_pipeline_2)

    main_pipeline_config = create_main_pipeline(sub_pipeline_1, sub_pipeline_2)

    _LG.info("Main pipeline config: %s", main_pipeline_config)

    _LG.info("Building the pipeline.")
    pipeline = build_pipeline(main_pipeline_config, num_threads=4)

    _LG.info("Executing the pipeline.")
    results = []
    with pipeline.auto_stop():
        for item in pipeline:
            results.append(item)

    return results


def run() -> None:
    """Run example pipeline and check the result."""
    results = run_pipeline_example()

    _LG.info("Final results: %s", results)
    _LG.info("Number of items processed: %d", len(results))

    # Verify expected data flow
    expected_squared = [0, 1, 4, 9, 16]  # squares of 0-4
    expected_added = [110, 111, 112, 113, 114]  # 10-14 + 100
    expected_combined_count = len(expected_squared) + len(expected_added)

    if len(results) != expected_combined_count:
        raise RuntimeError(
            f"✗ Unexpected number of items: got {len(results)}, expected {expected_combined_count}"
        )
    _LG.info("✓ Pipeline processed expected number of items")


def main() -> None:
    """Main entry point demonstrating all pipeline configuration classes."""
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
    )

    run()


if __name__ == "__main__":
    main()

API Reference

Functions

main() → None

Main entry point demonstrating all pipeline configuration classes.

create_sub_pipeline_1() → PipelineConfig[list[int]]

Create a sub-pipeline that squares numbers and aggregates them.

range(5)
 → square
 → aggregate(2)
 → [[squared_pairs], [remaining]]
Returns:

Configuration for a pipeline that processes [0,1,2,3,4] into batches of squared values.

create_sub_pipeline_2() → PipelineConfig[int]

Create a sub-pipeline that adds 100 to numbers.

range(10,15)
 → add_100
 → individual_values
Returns:

Configuration for a pipeline that processes [10,11,12,13,14] by adding 100.

create_main_pipeline(sub_pipeline_1: PipelineConfig[list[int]], sub_pipeline_2: PipelineConfig[int]) → PipelineConfig[int]

Create the main pipeline that merges outputs from sub-pipelines.

After merging, normalising and disaggregating, the pipeline uses PathVariants() to route each item through a cache-aware multiply-by-10 stage:

  • Cache hit → load_from_cache (1 pipe stage)

  • Cache miss → multiply_by_10 → store_in_cache (2 pipe stages)

Note that the two paths have different numbers of pipe stages, which PathVariants supports.

Merge([sub1, sub2])
  → normalize_to_lists
  → disaggregate
  → PathVariants(cache_router)
      path 0 (hit):  load_from_cache
      path 1 (miss): multiply_by_10 → store_in_cache
Parameters:
  • sub_pipeline_1 – First sub-pipeline configuration

  • sub_pipeline_2 – Second sub-pipeline configuration

Returns:

Main pipeline configuration that merges and processes the sub-pipeline outputs.

square(x: int) → int

Square the input number.

add_100(x: int) → int

Add 100 to the input number.

multiply_by_10(item: int) → tuple[int, int]

Multiply the input by 10 (slow path, first stage).

Returns a (key, result) tuple so the next stage can store it in the cache.

store_in_cache(item: tuple[int, int]) → int

Store the computed result in the cache and return it (slow path, second stage).

normalize_to_lists(item: Any) → list[Any]

Flatten lists or wrap individual items in a list for uniform handling.

cache_router(item: int) → int

Route items based on cache membership.

Returns 0 for cache hits (fast path) and 1 for cache misses (slow path).

load_from_cache(item: int) → int

Retrieve a previously computed result from the cache (fast path).

run_pipeline_example() → list[int]

Execute the complete pipeline example and return results.

Returns:

List of processed integers from the merged pipeline execution.