Pipeline definitions
Comprehensive example defining a complex pipeline with spdl.pipeline.defs.
This example showcases the usage of all configuration classes available in the
spdl.pipeline.defs module, including:
- SourceConfig: Configures data sources for pipelines
- PipeConfig: Configures individual processing stages (via factory functions)
- SinkConfig: Configures output buffering for pipelines
- PipelineConfig: Top-level pipeline configuration combining all components
- Merge: Merges outputs from multiple pipelines into a single stream
- PathVariantsConfig: Routes items to different processing paths
The example also demonstrates the factory functions:
- Pipe(): Creates pipe configurations for general processing
- Aggregate(): Creates configurations for batching/grouping data
- Disaggregate(): Creates configurations for splitting batched data
- PathVariants(): Creates variant path routing configurations
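As a rough illustration of what the batching stages do to a stream, here is a plain-Python sketch of the Aggregate/Disaggregate semantics (an illustrative stand-in, not the spdl implementation):

```python
def aggregate(items, batch_size, drop_last=False):
    """Group a stream into fixed-size batches, mimicking Aggregate()."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch and not drop_last:
        yield batch  # trailing partial batch kept when drop_last=False


def disaggregate(batches):
    """Flatten batches back into individual items, mimicking Disaggregate()."""
    for batch in batches:
        yield from batch


squares = [x * x for x in range(5)]      # [0, 1, 4, 9, 16]
batched = list(aggregate(squares, 2))    # [[0, 1], [4, 9], [16]]
flat = list(disaggregate(batched))       # [0, 1, 4, 9, 16]
```

With `drop_last=False`, the trailing `[16]` survives as a partial batch, which is why Pipeline 1 below emits `[[0, 1], [4, 9], [16]]`.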
Note
This pipeline uses the merge mechanism, which is not supported by
PipelineBuilder.
The data flow:
1. Pipeline 1:
   [0, 1, 2, 3, 4] → square → aggregate(2) → [[0, 1], [4, 9], [16]]
2. Pipeline 2:
   [10, 11, 12, 13, 14] → add_100 → [110, 111, 112, 113, 114]
3. Merge combines outputs:
   [[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]
4. Normalize handles mixed data types:
   [[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]
5. Disaggregate flattens:
   [0, 1, 4, 9, 16, 110, 111, 112, 113, 114]
6. PathVariants routes each item based on cache membership:
   - Cache hits (pre-populated values) → load_from_cache (1 stage)
   - Cache misses → multiply_by_10 → store_in_cache (2 stages: compute ×10, then store the result for future lookups)
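Putting those steps together, the flow can be simulated in plain Python (a sketch, not spdl itself; it assumes a fixed merge order, whereas the real Merge interleaves outputs as the sub-pipelines produce them):

```python
from typing import Any

# Pre-populated cache, matching the example's _CACHE.
_CACHE: dict[int, int] = {0: 0, 1: 10, 4: 40}


def normalize_to_lists(item: Any) -> list[Any]:
    # Wrap bare values so batches and single items look the same downstream.
    return item if isinstance(item, list) else [item]


def route(item: int) -> int:
    # Stand-in for PathVariants: path 0 on a cache hit,
    # path 1 (compute ×10, then store) on a miss.
    if item in _CACHE:
        return _CACHE[item]
    result = item * 10
    _CACHE[item] = result
    return result


# Stand-in for Merge; a real pipeline interleaves these nondeterministically.
merged = [[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]
normalized = [normalize_to_lists(x) for x in merged]
flattened = [x for batch in normalized for x in batch]
results = [route(x) for x in flattened]
# flattened == [0, 1, 4, 9, 16, 110, 111, 112, 113, 114]
```

Only `0`, `1`, and `4` take the fast path here; every miss both computes its ×10 result and caches it for later items.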
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Comprehensive example defining a complex pipeline with :py:mod:`spdl.pipeline.defs`.

This example showcases the usage of all configuration classes available in the
:py:mod:`spdl.pipeline.defs` module, including:

.. py:currentmodule:: spdl.pipeline.defs

- :py:class:`SourceConfig`: Configures data sources for pipelines
- :py:class:`PipeConfig`: Configures individual processing stages (via factory functions)
- :py:class:`SinkConfig`: Configures output buffering for pipelines
- :py:class:`PipelineConfig`: Top-level pipeline configuration combining all components
- :py:class:`Merge`: Merges outputs from multiple pipelines into a single stream
- :py:class:`PathVariantsConfig`: Routes items to different processing paths

The example also demonstrates the factory functions:

- :py:func:`Pipe`: Creates pipe configurations for general processing
- :py:func:`Aggregate`: Creates configurations for batching/grouping data
- :py:func:`Disaggregate`: Creates configurations for splitting batched data
- :py:func:`PathVariants`: Creates variant path routing configurations

.. note::

   This pipeline uses the merge mechanism, which is not supported by
   :py:class:`~spdl.pipeline.PipelineBuilder`.

.. mermaid::

   graph TD
       subgraph "Pipeline 1"
           S1[Source: range#40;5#41;] --> P1[Pipe: square]
           P1 --> AGG1[Aggregate: batch_size=2]
           AGG1 --> SNK1[Sink 1]
       end

       subgraph "Pipeline 2"
           S2[Source: range#40;10,15#41;] --> P2[Pipe: add_100]
           P2 --> SNK2[Sink 2]
       end

       subgraph "Main Pipeline"
           SNK1 --> M[Merge]
           SNK2 --> M
           M --> NORM[Pipe: normalize_to_lists]
           NORM --> DISAGG[Disaggregate]
           DISAGG --> R[Router: cache_router]
           R -->|cache hit| CACHE[Pipe: load_from_cache]
           R -->|cache miss| MUL[Pipe: multiply_by_10]
           MUL --> STORE[Pipe: store_in_cache]
           CACHE --> PV_MERGE[PathVariants merge]
           STORE --> PV_MERGE
           PV_MERGE --> FINAL_SINK[Final Sink]
       end

The data flow:

1. Pipeline 1:
   ``[0, 1, 2, 3, 4]`` → ``square`` → ``[[0, 1], [4, 9], [16]]``
2. Pipeline 2:
   ``[10, 11, 12, 13, 14]`` → ``add_100`` → ``[110, 111, 112, 113, 114]``
3. Merge combines outputs:
   ``[[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]``
4. Normalize handles mixed data types:
   ``[[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]``
5. Disaggregate flattens:
   ``[0, 1, 4, 9, 16, 110, 111, 112, 113, 114]``
6. PathVariants routes each item based on cache membership:

   - Cache hits (pre-populated values) → ``load_from_cache`` (1 stage)
   - Cache misses → ``multiply_by_10`` → ``store_in_cache`` (2 stages:
     compute ``×10``, then store the result for future lookups)
"""

__all__ = [
    "main",
    "create_sub_pipeline_1",
    "create_sub_pipeline_2",
    "create_main_pipeline",
    "square",
    "add_100",
    "multiply_by_10",
    "store_in_cache",
    "normalize_to_lists",
    "cache_router",
    "load_from_cache",
    "run_pipeline_example",
]

import logging
from typing import Any

from spdl.pipeline import build_pipeline
from spdl.pipeline.defs import (
    Aggregate,
    Disaggregate,
    Merge,
    PathVariants,
    Pipe,
    PipelineConfig,
    SinkConfig,
    SourceConfig,
)

# pyre-strict

_LG: logging.Logger = logging.getLogger(__name__)


def square(x: int) -> int:
    """Square the input number."""
    return x * x


def add_100(x: int) -> int:
    """Add 100 to the input number."""
    return x + 100


################################################################################
# Cache-based routing with PathVariants
################################################################################

# Cache of pre-computed results. In a real pipeline this might be backed by
# a key-value store or an on-disk cache; here we use a plain dict.
_CACHE: dict[int, int] = {0: 0, 1: 10, 4: 40}  # pre-populated with a few ×10 values


def cache_router(item: int) -> int:
    """Route items based on cache membership.

    Returns 0 for cache hits (fast path) and 1 for cache misses (slow path).
    """
    return 0 if item in _CACHE else 1


def load_from_cache(item: int) -> int:
    """Retrieve a previously computed result from the cache (fast path)."""
    return _CACHE[item]


def multiply_by_10(item: int) -> tuple[int, int]:
    """Multiply the input by 10 (slow path, first stage).

    Returns a (key, result) tuple so the next stage can store it in the cache.
    """
    return (item, item * 10)


def store_in_cache(item: tuple[int, int]) -> int:
    """Store the computed result in the cache and return it (slow path, second stage)."""
    key, value = item
    _CACHE[key] = value
    return value


def create_sub_pipeline_1() -> PipelineConfig[list[int]]:
    """Create a sub-pipeline that squares numbers and aggregates them.

    .. code-block:: text

       range(5)
         → square
         → aggregate(2)
         → [[squared_pairs], [remaining]]

    Returns:
        Configuration for a pipeline that processes
        ``[0,1,2,3,4]`` into batches of squared values.
    """
    source_config = SourceConfig(range(5))
    square_pipe = Pipe(square)
    aggregate_pipe = Aggregate(2, drop_last=False)
    sink_config = SinkConfig(buffer_size=10)
    return PipelineConfig(
        src=source_config,
        pipes=[square_pipe, aggregate_pipe],
        sink=sink_config,
    )


def create_sub_pipeline_2() -> PipelineConfig[int]:
    """Create a sub-pipeline that adds 100 to numbers.

    .. code-block:: text

       range(10,15)
         → add_100
         → individual_values

    Returns:
        Configuration for a pipeline that processes
        ``[10,11,12,13,14]`` by adding ``100``.
    """
    source_config = SourceConfig(range(10, 15))
    add_pipe = Pipe(add_100, concurrency=2)
    sink_config = SinkConfig(buffer_size=5)

    return PipelineConfig(
        src=source_config,
        pipes=[add_pipe],
        sink=sink_config,
    )


def normalize_to_lists(item: Any) -> list[Any]:
    """Flatten lists or wrap individual items in a list for uniform handling."""
    if isinstance(item, list):
        return item
    else:
        return [item]


def create_main_pipeline(
    sub_pipeline_1: PipelineConfig[list[int]],
    sub_pipeline_2: PipelineConfig[int],
) -> PipelineConfig[int]:
    """Create the main pipeline that merges outputs from sub-pipelines.

    After merging, normalising and disaggregating, the pipeline uses
    :py:func:`PathVariants` to route each item through a cache-aware
    multiply-by-10 stage:

    - **Cache hit** → ``load_from_cache`` (1 pipe stage)
    - **Cache miss** → ``multiply_by_10`` → ``store_in_cache`` (2 pipe stages)

    Note that the two paths have different numbers of pipe stages,
    which PathVariants supports.

    .. code-block:: text

       Merge([sub1, sub2])
         → normalize_to_lists
         → disaggregate
         → PathVariants(cache_router)
             path 0 (hit):  load_from_cache
             path 1 (miss): multiply_by_10 → store_in_cache

    Args:
        sub_pipeline_1: First sub-pipeline configuration
        sub_pipeline_2: Second sub-pipeline configuration

    Returns:
        Main pipeline configuration that merges and processes the sub-pipeline outputs.
    """
    merge_config = Merge([sub_pipeline_1, sub_pipeline_2])
    normalize_pipe = Pipe(normalize_to_lists)
    disaggregate_pipe = Disaggregate()

    # Instead of a plain Pipe(multiply_by_10), use PathVariants to
    # demonstrate cache-based routing: items already in the cache are
    # served instantly, while cache misses compute the result and
    # populate the cache for future lookups.
    cached_multiply = PathVariants(
        router=cache_router,
        paths=[
            [Pipe(load_from_cache)],  # path 0: 1-stage (fast)
            [
                Pipe(multiply_by_10),
                Pipe(store_in_cache),
            ],  # path 1: 2-stage (compute + cache)
        ],
    )

    sink_config = SinkConfig(buffer_size=20)

    return PipelineConfig(
        src=merge_config,
        pipes=[normalize_pipe, disaggregate_pipe, cached_multiply],
        sink=sink_config,
    )


def run_pipeline_example() -> list[int]:
    """Execute the complete pipeline example and return results.

    Returns:
        List of processed integers from the merged pipeline execution.
    """
    _LG.info("Creating sub-pipeline configurations...")

    sub_pipeline_1 = create_sub_pipeline_1()
    sub_pipeline_2 = create_sub_pipeline_2()

    _LG.info("Sub-pipeline 1: %s", sub_pipeline_1)
    _LG.info("Sub-pipeline 2: %s", sub_pipeline_2)

    main_pipeline_config = create_main_pipeline(sub_pipeline_1, sub_pipeline_2)

    _LG.info("Main pipeline config: %s", main_pipeline_config)

    _LG.info("Building the pipeline.")
    pipeline = build_pipeline(main_pipeline_config, num_threads=4)

    _LG.info("Executing the pipeline.")
    results = []
    with pipeline.auto_stop():
        for item in pipeline:
            results.append(item)

    return results


def run() -> None:
    """Run example pipeline and check the result."""
    results = run_pipeline_example()

    _LG.info("Final results: %s", results)
    _LG.info("Number of items processed: %d", len(results))

    # Verify expected data flow
    expected_squared = [0, 1, 4, 9, 16]  # squares of 0-4
    expected_added = [110, 111, 112, 113, 114]  # 10-14 + 100
    expected_combined_count = len(expected_squared) + len(expected_added)

    if len(results) != expected_combined_count:
        raise RuntimeError(
            f"✗ Unexpected number of items: got {len(results)}, expected {expected_combined_count}"
        )
    _LG.info("✓ Pipeline processed expected number of items")


def main() -> None:
    """Main entry point demonstrating all pipeline configuration classes."""
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
    )

    run()


if __name__ == "__main__":
    main()
API Reference

Functions

create_sub_pipeline_1() → PipelineConfig[list[int]]
    Create a sub-pipeline that squares numbers and aggregates them.

        range(5) → square → aggregate(2) → [[squared_pairs], [remaining]]

    Returns:
        Configuration for a pipeline that processes [0, 1, 2, 3, 4] into batches of squared values.

create_sub_pipeline_2() → PipelineConfig[int]
    Create a sub-pipeline that adds 100 to numbers.

        range(10, 15) → add_100 → individual_values

    Returns:
        Configuration for a pipeline that processes [10, 11, 12, 13, 14] by adding 100.

create_main_pipeline(sub_pipeline_1: PipelineConfig[list[int]], sub_pipeline_2: PipelineConfig[int]) → PipelineConfig[int]
    Create the main pipeline that merges outputs from sub-pipelines.

    After merging, normalising and disaggregating, the pipeline uses PathVariants() to route each item through a cache-aware multiply-by-10 stage:

    - Cache hit → load_from_cache (1 pipe stage)
    - Cache miss → multiply_by_10 → store_in_cache (2 pipe stages)

    Note that the two paths have different numbers of pipe stages, which PathVariants supports.

        Merge([sub1, sub2])
          → normalize_to_lists
          → disaggregate
          → PathVariants(cache_router)
              path 0 (hit):  load_from_cache
              path 1 (miss): multiply_by_10 → store_in_cache

    Parameters:
        sub_pipeline_1 – First sub-pipeline configuration
        sub_pipeline_2 – Second sub-pipeline configuration

    Returns:
        Main pipeline configuration that merges and processes the sub-pipeline outputs.

multiply_by_10(item: int) → tuple[int, int]
    Multiply the input by 10 (slow path, first stage).

    Returns a (key, result) tuple so the next stage can store it in the cache.

store_in_cache(item: tuple[int, int]) → int
    Store the computed result in the cache and return it (slow path, second stage).

normalize_to_lists(item: Any) → list[Any]
    Flatten lists or wrap individual items in a list for uniform handling.

cache_router(item: int) → int
    Route items based on cache membership.

    Returns 0 for cache hits (fast path) and 1 for cache misses (slow path).