Pipeline definitions
Comprehensive example defining a complex pipeline with spdl.pipeline.defs.
This example showcases the usage of all configuration classes available in the
spdl.pipeline.defs module, including:

- SourceConfig: Configures data sources for pipelines
- PipeConfig: Configures individual processing stages (via factory functions)
- SinkConfig: Configures output buffering for pipelines
- PipelineConfig: Top-level pipeline configuration combining all components
- Merge: Merges outputs from multiple pipelines into a single stream
The example also demonstrates the factory functions:

- Pipe(): Creates pipe configurations for general processing
- Aggregate(): Creates configurations for batching/grouping data
- Disaggregate(): Creates configurations for splitting batched data
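The batching and flattening behavior that Aggregate() and Disaggregate() configure can be sketched in plain Python. This is a minimal stand-in to illustrate the semantics, not the spdl implementation (which runs inside the pipeline's thread pool):

```python
from collections.abc import Iterable, Iterator
from typing import Any


def aggregate(
    items: Iterable[Any], batch_size: int, drop_last: bool = False
) -> Iterator[list[Any]]:
    """Group consecutive items into lists of at most ``batch_size``."""
    batch: list[Any] = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch and not drop_last:
        yield batch  # emit the short final batch


def disaggregate(batches: Iterable[list[Any]]) -> Iterator[Any]:
    """Flatten batches back into individual items."""
    for batch in batches:
        yield from batch


print(list(aggregate([0, 1, 4, 9, 16], 2)))  # [[0, 1], [4, 9], [16]]
print(list(disaggregate([[0, 1], [4, 9], [16]])))  # [0, 1, 4, 9, 16]
```

With drop_last=False (as in this example), the final short batch `[16]` is kept rather than discarded.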
The pipeline structure created by this example is illustrated below:
Note: The pipeline defined here uses the merge mechanism, which is not supported by PipelineBuilder.
The data flow:

1. Pipeline 1: [0, 1, 2, 3, 4] → square → [[0, 1], [4, 9], [16]]
2. Pipeline 2: [10, 11, 12, 13, 14] → add_100 → [110, 111, 112, 113, 114]
3. Merge combines outputs: [[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]
4. Normalize handles mixed data types: [[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]
5. Disaggregate flattens: [0, 1, 4, 9, 16, 110, 111, 112, 113, 114]
6. Finally, multiply by 10: [0, 10, 40, 90, 160, 1100, 1110, 1120, 1130, 1140]
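The numbered steps above can be reproduced with ordinary Python as a sequential sketch. Note this is only an illustration of the data transformations: the real pipeline runs stages concurrently, and Merge may interleave the two streams rather than concatenate them.

```python
def square(x: int) -> int:
    return x * x


def add_100(x: int) -> int:
    return x + 100


def aggregate(items: list, n: int) -> list:
    # Batch consecutive items into lists of at most n (drop_last=False).
    return [items[i : i + n] for i in range(0, len(items), n)]


# Pipeline 1: range(5) -> square -> aggregate(2)
out1 = aggregate([square(x) for x in range(5)], 2)  # [[0, 1], [4, 9], [16]]

# Pipeline 2: range(10, 15) -> add_100
out2 = [add_100(x) for x in range(10, 15)]  # [110, 111, 112, 113, 114]

# Merge, normalize mixed items to lists, disaggregate, multiply by 10
merged = out1 + out2
normalized = [x if isinstance(x, list) else [x] for x in merged]
flat = [x for batch in normalized for x in batch]
result = [x * 10 for x in flat]
print(result)  # [0, 10, 40, 90, 160, 1100, 1110, 1120, 1130, 1140]
```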
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Comprehensive example defining a complex pipeline with :py:mod:`spdl.pipeline.defs`.

This example showcases the usage of all configuration classes available in the
:py:mod:`spdl.pipeline.defs` module, including:

.. py:currentmodule:: spdl.pipeline.defs

- :py:class:`SourceConfig`: Configures data sources for pipelines
- :py:class:`PipeConfig`: Configures individual processing stages (via factory functions)
- :py:class:`SinkConfig`: Configures output buffering for pipelines
- :py:class:`PipelineConfig`: Top-level pipeline configuration combining all components
- :py:class:`Merge`: Merges outputs from multiple pipelines into a single stream

The example also demonstrates the factory functions:

- :py:func:`Pipe`: Creates pipe configurations for general processing
- :py:func:`Aggregate`: Creates configurations for batching/grouping data
- :py:func:`Disaggregate`: Creates configurations for splitting batched data

The pipeline structure created by this example is illustrated below:

.. note::

   The pipeline defined here uses the merge mechanism, which is not supported by
   :py:class:`~spdl.pipeline.PipelineBuilder`.

.. mermaid::

   graph TD
       subgraph "Pipeline 1"
           S1[Source: range#40;5#41;] --> P1[Pipe: square]
           P1 --> AGG1[Aggregate: batch_size=2]
           AGG1 --> SNK1[Sink 1]
       end

       subgraph "Pipeline 2"
           S2[Source: range#40;10,15#41;] --> P2[Pipe: add_100]
           P2 --> SNK2[Sink 2]
       end

       subgraph "Main Pipeline"
           SNK1 --> M[Merge]
           SNK2 --> M
           M --> NORM[Pipe: normalize_to_lists]
           NORM --> DISAGG[Disaggregate]
           DISAGG --> P3[Pipe: multiply_by_10]
           P3 --> FINAL_SINK[Final Sink]
       end

The data flow:

1. Pipeline 1:
   ``[0, 1, 2, 3, 4]`` → ``square`` → ``[[0, 1], [4, 9], [16]]``
2. Pipeline 2:
   ``[10, 11, 12, 13, 14]`` → ``add_100`` → ``[110, 111, 112, 113, 114]``
3. Merge combines outputs:
   ``[[0, 1], [4, 9], [16], 110, 111, 112, 113, 114]``
4. Normalize handles mixed data types:
   ``[[0, 1], [4, 9], [16], [110], [111], [112], [113], [114]]``
5. Disaggregate flattens:
   ``[0, 1, 4, 9, 16, 110, 111, 112, 113, 114]``
6. Finally, multiply by 10:
   ``[0, 10, 40, 90, 160, 1100, 1110, 1120, 1130, 1140]``
"""

__all__ = [
    "main",
    "create_sub_pipeline_1",
    "create_sub_pipeline_2",
    "create_main_pipeline",
    "square",
    "add_100",
    "multiply_by_10",
    "normalize_to_lists",
    "run_pipeline_example",
]

import logging
from typing import Any

from spdl.pipeline import build_pipeline
from spdl.pipeline.defs import (
    Aggregate,
    Disaggregate,
    Merge,
    Pipe,
    PipelineConfig,
    SinkConfig,
    SourceConfig,
)

# pyre-strict

_LG: logging.Logger = logging.getLogger(__name__)


def square(x: int) -> int:
    """Square the input number."""
    return x * x


def add_100(x: int) -> int:
    """Add 100 to the input number."""
    return x + 100


def multiply_by_10(x: int) -> int:
    """Multiply the input by 10."""
    return x * 10


def create_sub_pipeline_1() -> PipelineConfig[int, list[int]]:
    """Create a sub-pipeline that squares numbers and aggregates them.

    .. code-block:: text

       range(5)
       → square
       → aggregate(2)
       → [[squared_pairs], [remaining]]

    Returns:
        Configuration for a pipeline that processes
        ``[0,1,2,3,4]`` into batches of squared values.
    """
    source_config = SourceConfig(range(5))
    square_pipe = Pipe(square)
    aggregate_pipe = Aggregate(2, drop_last=False)
    sink_config = SinkConfig(buffer_size=10)
    return PipelineConfig(
        src=source_config,
        pipes=[square_pipe, aggregate_pipe],
        sink=sink_config,
    )


def create_sub_pipeline_2() -> PipelineConfig[int, int]:
    """Create a sub-pipeline that adds 100 to numbers.

    .. code-block:: text

       range(10,15)
       → add_100
       → individual_values

    Returns:
        Configuration for a pipeline that processes
        ``[10,11,12,13,14]`` by adding ``100``.
    """
    source_config = SourceConfig(range(10, 15))
    add_pipe = Pipe(add_100, concurrency=2)
    sink_config = SinkConfig(buffer_size=5)

    return PipelineConfig(
        src=source_config,
        pipes=[add_pipe],
        sink=sink_config,
    )


def normalize_to_lists(item: Any) -> list[Any]:
    """Flatten lists or wrap individual items in a list for uniform handling."""
    if isinstance(item, list):
        return item
    else:
        return [item]


def create_main_pipeline(
    sub_pipeline_1: PipelineConfig[int, list[int]],
    sub_pipeline_2: PipelineConfig[int, int],
) -> PipelineConfig[Any, int]:
    """Create the main pipeline that merges outputs from sub-pipelines.

    .. code-block:: text

       Merge([sub1, sub2])
       → normalize_to_lists
       → disaggregate
       → multiply_by_10

    Args:
        sub_pipeline_1: First sub-pipeline configuration
        sub_pipeline_2: Second sub-pipeline configuration

    Returns:
        Main pipeline configuration that merges and processes the sub-pipeline outputs.
    """
    merge_config = Merge([sub_pipeline_1, sub_pipeline_2])
    normalize_pipe = Pipe(normalize_to_lists)
    disaggregate_pipe = Disaggregate()
    multiply_pipe = Pipe(
        multiply_by_10,
        concurrency=3,
        output_order="input",  # Maintain input order
    )

    sink_config = SinkConfig(buffer_size=20)

    return PipelineConfig(
        src=merge_config,
        pipes=[normalize_pipe, disaggregate_pipe, multiply_pipe],
        sink=sink_config,
    )


def run_pipeline_example() -> list[int]:
    """Execute the complete pipeline example and return results.

    Returns:
        List of processed integers from the merged pipeline execution.
    """
    _LG.info("Creating sub-pipeline configurations...")

    sub_pipeline_1 = create_sub_pipeline_1()
    sub_pipeline_2 = create_sub_pipeline_2()

    _LG.info("Sub-pipeline 1: %s", sub_pipeline_1)
    _LG.info("Sub-pipeline 2: %s", sub_pipeline_2)

    main_pipeline_config = create_main_pipeline(sub_pipeline_1, sub_pipeline_2)

    _LG.info("Main pipeline config: %s", main_pipeline_config)

    _LG.info("Building the pipeline.")
    pipeline = build_pipeline(main_pipeline_config, num_threads=4)

    _LG.info("Executing the pipeline.")
    results = []
    with pipeline.auto_stop():
        for item in pipeline:
            results.append(item)

    return results


def run() -> None:
    """Run the example pipeline and check the result."""
    results = run_pipeline_example()

    _LG.info("Final results: %s", results)
    _LG.info("Number of items processed: %d", len(results))

    # Verify expected data flow
    expected_squared = [0, 1, 4, 9, 16]  # squares of 0-4
    expected_added = [110, 111, 112, 113, 114]  # 10-14 + 100
    expected_combined_count = len(expected_squared) + len(expected_added)

    if len(results) != expected_combined_count:
        raise RuntimeError(
            f"✗ Unexpected number of items: got {len(results)}, expected {expected_combined_count}"
        )
    _LG.info("✓ Pipeline processed expected number of items")


def main() -> None:
    """Main entry point demonstrating all pipeline configuration classes."""
    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s"
    )

    run()


if __name__ == "__main__":
    main()
Functions
create_sub_pipeline_1() → PipelineConfig[int, list[int]]

    Create a sub-pipeline that squares numbers and aggregates them.

    range(5) → square → aggregate(2) → [[squared_pairs], [remaining]]

    Returns:
        Configuration for a pipeline that processes [0,1,2,3,4] into batches of squared values.
create_sub_pipeline_2() → PipelineConfig[int, int]

    Create a sub-pipeline that adds 100 to numbers.

    range(10,15) → add_100 → individual_values

    Returns:
        Configuration for a pipeline that processes [10,11,12,13,14] by adding 100.
create_main_pipeline(sub_pipeline_1: PipelineConfig[int, list[int]], sub_pipeline_2: PipelineConfig[int, int]) → PipelineConfig[Any, int]

    Create the main pipeline that merges outputs from sub-pipelines.

    Merge([sub1, sub2]) → normalize_to_lists → disaggregate → multiply_by_10

    Parameters:
        sub_pipeline_1 – First sub-pipeline configuration
        sub_pipeline_2 – Second sub-pipeline configuration

    Returns:
        Main pipeline configuration that merges and processes the sub-pipeline outputs.
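Because Pipeline 1 emits lists while Pipeline 2 emits bare integers, the merged stream holds mixed types; normalize_to_lists wraps the bare integers so that Disaggregate can treat every item uniformly. The stage's behavior can be sketched in plain Python (a sequential illustration, not the concurrent spdl execution):

```python
from typing import Any


def normalize_to_lists(item: Any) -> list[Any]:
    # Pass lists through unchanged; wrap bare items in a one-element list.
    return item if isinstance(item, list) else [item]


merged = [[0, 1], [4, 9], [16], 110, 111]  # mixed lists and ints from Merge
normalized = [normalize_to_lists(x) for x in merged]
print(normalized)  # [[0, 1], [4, 9], [16], [110], [111]]

# Disaggregate then yields the elements of each list one at a time
flat = [x for batch in normalized for x in batch]
print(flat)  # [0, 1, 4, 9, 16, 110, 111]
```

Without the normalize step, Disaggregate would encounter plain integers where it expects sequences, so the wrapping is what makes the heterogeneous merge safe to flatten.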