Pipeline profiling
Example demonstrating the use of spdl.pipeline.profile_pipeline().

The profile_pipeline() function allows you to benchmark your pipeline stages independently across different concurrency levels to identify optimal performance settings. This is particularly useful when tuning pipeline performance before deploying to production.
This example shows how to:

1. Create a simple pipeline with multiple processing stages
2. Use profile_pipeline() to benchmark each stage
3. Analyze the profiling results to identify performance bottlenecks
4. Use a custom callback to process results as they are generated
5. Visualize performance results with a plot
This example generates a figure like the following.

If a function is time-consuming (for example, a networking call), then as long as the GIL is released, performance improves as more threads are added ("scalable_op" and "scalable_op2").

The function might also be constrained by other factors, such as CPU resources, so its performance can peak at a certain concurrency level ("scalable_op2").

If the function holds the GIL completely, performance peaks at a concurrency of one and degrades as more threads are added ("op_with_contention").
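The behavior above can be reproduced with a minimal sketch of the full source below: one stage that releases the GIL (simulated with time.sleep) and one that holds it. The function names io_bound and cpu_bound are illustrative; the API calls mirror the example source.

import time

from spdl.pipeline import profile_pipeline
from spdl.pipeline.defs import Pipe, PipelineConfig, SinkConfig, SourceConfig


def io_bound(x: int) -> int:
    time.sleep(0.01)  # sleeping releases the GIL, so more threads help
    return x * 2


def cpu_bound(x: int) -> int:
    return x**2  # pure-Python arithmetic holds the GIL, so threads contend


config = PipelineConfig(
    src=SourceConfig(range(1000)),
    pipes=[Pipe(io_bound), Pipe(cpu_bound)],
    sink=SinkConfig(buffer_size=10),
)

# Benchmark each stage independently at several concurrency levels.
for result in profile_pipeline(config, num_inputs=500):
    for stat in result.stats:
        print(result.name, stat.concurrency, stat.qps, stat.occupancy_rate)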
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Example demonstrating the use of :py:func:`spdl.pipeline.profile_pipeline`.

The :py:func:`~spdl.pipeline.profile_pipeline` function allows you to benchmark your pipeline
stages independently across different concurrency levels to identify optimal
performance settings. This is particularly useful when tuning pipeline performance
before deploying to production.

This example shows how to:

1. Create a simple pipeline with multiple processing stages
2. Use :py:func:`~spdl.pipeline.profile_pipeline` to benchmark each stage
3. Analyze the profiling results to identify performance bottlenecks
4. Use a custom callback to process results as they are generated
5. Visualize performance results with a plot

This example generates a figure like the following.

.. image:: ../../_static/data/profile_pipeline_example.png

If a function is time-consuming (for example, a networking call), then as long as
the GIL is released, performance improves as more threads are added
(``"scalable_op"`` and ``"scalable_op2"``).

The function might also be constrained by other factors, such as CPU resources,
so its performance can peak at a certain concurrency level (``"scalable_op2"``).

If the function holds the GIL completely, performance peaks at a concurrency of
one and degrades as more threads are added (``"op_with_contention"``).
"""

import argparse
import logging
import time
from pathlib import Path

from spdl.pipeline import profile_pipeline, ProfileResult
from spdl.pipeline.defs import Pipe, PipelineConfig, SinkConfig, SourceConfig

__all__ = [
    "parse_args",
    "main",
    "scalable_op",
    "scalable_op2",
    "op_with_contention",
    "create_pipeline",
    "print_profile_result",
    "plot_profile_results",
    "run_profiling_example",
]

# pyre-strict

_LG: logging.Logger = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--num-inputs",
        type=int,
        default=500,
        help="Number of inputs to use for profiling each stage",
    )
    parser.add_argument(
        "--plot-output",
        type=Path,
        help="Path to save the performance plot (e.g., profile_results.png)",
    )
    return parser.parse_args()


def scalable_op(x: int) -> int:
    """Simulate an operation which releases the GIL most of the time.

    Args:
        x: Input integer

    Returns:
        The input value multiplied by 2
    """
    time.sleep(0.01)
    return x * 2


def scalable_op2(x: int) -> int:
    """Simulate an operation which releases the GIL some of the time.

    Args:
        x: Input integer

    Returns:
        The input value plus 100
    """
    time.sleep(0.003)
    return x + 100


def op_with_contention(x: int) -> int:
    """Simulate an operation that holds the GIL.

    Args:
        x: Input integer

    Returns:
        The input value squared
    """
    return x**2


def create_pipeline(num_sources: int = 1000) -> PipelineConfig[int, int]:
    """Create a pipeline configuration with multiple stages.

    Args:
        num_sources: Number of source items to generate

    Returns:
        Pipeline configuration with three processing stages
    """
    return PipelineConfig(
        src=SourceConfig(range(num_sources)),
        pipes=[
            Pipe(scalable_op),
            Pipe(scalable_op2),
            Pipe(op_with_contention),
        ],
        sink=SinkConfig(buffer_size=10),
    )


def print_profile_result(result: ProfileResult) -> None:
    """Print profiling result in a formatted way.

    This is a callback function that will be called after each stage is profiled.

    Args:
        result: Profiling result for a single stage
    """
    _LG.info("=" * 60)
    _LG.info("Stage: %s", result.name)
    _LG.info("-" * 60)

    for stat in result.stats:
        _LG.info(
            "Concurrency %2d: QPS=%8.2f, Occupancy=%5.1f%%",
            stat.concurrency,
            stat.qps,
            stat.occupancy_rate * 100,
        )

    best_stat = max(result.stats, key=lambda s: s.qps)
    _LG.info("-" * 60)
    _LG.info(
        "Best Performance: Concurrency=%d, QPS=%.2f",
        best_stat.concurrency,
        best_stat.qps,
    )
    _LG.info("=" * 60)


def plot_profile_results(
    results: list[ProfileResult], output_path: Path | None = None
) -> None:
    """Plot profiling results showing QPS vs concurrency for each stage.

    Args:
        results: List of profiling results for each pipeline stage
        output_path: Optional path to save the plot. If None, displays the plot.
    """
    import matplotlib.pyplot as plt

    plt.figure(figsize=(12, 6))

    all_concurrencies = set()
    for result in results:
        concurrencies = [stat.concurrency for stat in result.stats]
        qps_values = [stat.qps for stat in result.stats]
        plt.plot(concurrencies, qps_values, marker="o", linewidth=2, label=result.name)
        all_concurrencies.update(concurrencies)

    sorted_concurrencies = sorted(all_concurrencies, reverse=True)
    plt.xticks(sorted_concurrencies, [str(c) for c in sorted_concurrencies])

    plt.xlabel("Number of Threads (Concurrency)", fontsize=12)
    plt.ylabel("Throughput (QPS)", fontsize=12)
    plt.title(
        "Pipeline Stage Performance vs Concurrency", fontsize=14, fontweight="bold"
    )
    plt.legend(loc="best", fontsize=10)
    plt.grid(True, alpha=0.3, linestyle="--")
    plt.tight_layout()

    if output_path:
        plt.savefig(output_path, dpi=150, bbox_inches="tight")
        _LG.info("Plot saved to: %s", output_path)
    else:
        plt.show()


def run_profiling_example(num_inputs: int = 500) -> list[ProfileResult]:
    """Run the profiling example.

    Args:
        num_inputs: Number of inputs to use for profiling

    Returns:
        List of profiling results for each stage
    """
    _LG.info("Creating pipeline configuration...")
    pipeline_config = create_pipeline(num_sources=num_inputs * 2)

    _LG.info("Starting pipeline profiling with %d inputs...", num_inputs)
    _LG.info("This will benchmark each stage at different concurrency levels.")

    results = profile_pipeline(
        pipeline_config,
        num_inputs=num_inputs,
        callback=print_profile_result,
    )

    _LG.info("Profiling complete!")
    _LG.info("Total stages profiled: %d", len(results))

    return results


def main() -> None:
    """Main entry point demonstrating profile_pipeline usage."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    args = parse_args()

    _LG.info("Profile Pipeline Example")
    _LG.info("=" * 60)

    results = run_profiling_example(num_inputs=args.num_inputs)

    _LG.info("\nSummary of Best Performance per Stage:")
    _LG.info("=" * 60)
    for result in results:
        best_stat = max(result.stats, key=lambda s: s.qps)
        _LG.info(
            "%-20s: Best at concurrency=%2d (QPS=%.2f)",
            result.name,
            best_stat.concurrency,
            best_stat.qps,
        )

    _LG.info("\nGenerating performance plot...")
    plot_profile_results(results, args.plot_output)


if __name__ == "__main__":
    main()
Functions
- scalable_op(x: int) → int
Simulate an operation which releases the GIL most of the time.
- Parameters:
x – Input integer
- Returns:
The input value multiplied by 2
- scalable_op2(x: int) → int
Simulate an operation which releases the GIL some of the time.
- Parameters:
x – Input integer
- Returns:
The input value plus 100
- op_with_contention(x: int) → int
Simulate an operation that holds the GIL.
- Parameters:
x – Input integer
- Returns:
The input value squared
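Called directly, the three simulated stages are deterministic, so a quick sanity check looks like this (the timings are the sleeps configured in the source above):

scalable_op(5)          # sleeps ~10 ms with the GIL released, returns 10
scalable_op2(5)         # sleeps ~3 ms with the GIL released, returns 105
op_with_contention(5)   # pure-Python arithmetic holding the GIL, returns 25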
- create_pipeline(num_sources: int = 1000) → PipelineConfig[int, int]
Create a pipeline configuration with multiple stages.
- Parameters:
num_sources – Number of source items to generate
- Returns:
Pipeline configuration with three processing stages
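A minimal usage sketch, mirroring run_profiling_example below (the argument values are illustrative):

config = create_pipeline(num_sources=1000)          # three Pipe stages, sink buffer of 10
results = profile_pipeline(config, num_inputs=500)  # one ProfileResult per stage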
- print_profile_result(result: ProfileResult) → None
Print profiling result in a formatted way.
This is a callback function that will be called after each stage is profiled.
- Parameters:
result – Profiling result for a single stage
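It is intended to be passed as the callback argument of profile_pipeline() so each stage's numbers are logged as soon as that stage finishes, for example (assuming config came from create_pipeline and logging is enabled at INFO level):

logging.basicConfig(level=logging.INFO)  # the callback logs via the module logger
results = profile_pipeline(config, num_inputs=500, callback=print_profile_result)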
- plot_profile_results(results: list[ProfileResult], output_path: Path | None = None) → None
Plot profiling results showing QPS vs concurrency for each stage.
- Parameters:
results – List of profiling results for each pipeline stage
output_path – Optional path to save the plot. If None, displays the plot.
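A short usage sketch (requires matplotlib; the output file name is illustrative):

plot_profile_results(results, Path("profile_results.png"))  # save the figure to disk
plot_profile_results(results)                                # or display it interactively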
- run_profiling_example(num_inputs: int = 500) → list[ProfileResult]
Run the profiling example.
- Parameters:
num_inputs – Number of inputs to use for profiling
- Returns:
List of profiling results for each stage
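For example, a smaller programmatic run can be summarized the same way main() does (the input count is illustrative):

results = run_profiling_example(num_inputs=200)
best = {r.name: max(r.stats, key=lambda s: s.qps).concurrency for r in results}
print(best)  # best-performing concurrency for each stage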