Pipeline profiling

Example demonstrating the use of spdl.pipeline.profile_pipeline().

The profile_pipeline() function allows you to benchmark your pipeline stages independently across different concurrency levels to identify optimal performance settings. This is particularly useful when tuning pipeline performance before deploying to production.

This example shows how to:

  1. Create a simple pipeline with multiple processing stages

  2. Use profile_pipeline() to benchmark each stage

  3. Analyze the profiling results to identify performance bottlenecks

  4. Use a custom callback to process results as they are generated

  5. Visualize performance results with a plot

This example generates a figure like the following.

../_static/data/profile_pipeline_example.png

If a function spends its time waiting, for example on network I/O, then as long as the GIL is released, performance improves with more threads ("scalable_op" and "scalable_op2").

A function might also be constrained by other factors, such as CPU resources, so its performance can peak at some concurrency level ("scalable_op2").

If a function holds the GIL completely, performance peaks at a concurrency of one and degrades as more threads are added ("op_with_contention").
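These three scaling behaviours can be reproduced with the standard library. The sketch below compares a GIL-releasing operation (`time.sleep`, a C-level wait) against a GIL-holding one (pure-Python arithmetic) at several thread counts; the function names are illustrative, not part of spdl:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def measure_qps(fn, num_inputs: int, concurrency: int) -> float:
    """Throughput of `fn` over `num_inputs` items using `concurrency` threads."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        list(pool.map(fn, range(num_inputs)))
    return num_inputs / (time.perf_counter() - start)


def releases_gil(x: int) -> int:
    time.sleep(0.005)  # C-level wait: the GIL is released while sleeping
    return x * 2


def holds_gil(x: int) -> int:
    # Pure-Python arithmetic: the GIL is held throughout, so threads serialize.
    return sum(i * i for i in range(50_000)) + x


for fn in (releases_gil, holds_gil):
    for c in (1, 2, 4):
        print(f"{fn.__name__:14s} threads={c} QPS={measure_qps(fn, 32, c):8.1f}")
```

On a standard (GIL-enabled) CPython build, `releases_gil` scales with thread count while `holds_gil` does not, which is the pattern the profiling plot makes visible.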

Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Example demonstrating the use of :py:func:`spdl.pipeline.profile_pipeline`.

The :py:func:`~spdl.pipeline.profile_pipeline` function allows you to benchmark your pipeline
stages independently across different concurrency levels to identify optimal
performance settings. This is particularly useful when tuning pipeline performance
before deploying to production.

This example shows how to:

1. Create a simple pipeline with multiple processing stages
2. Use :py:func:`~spdl.pipeline.profile_pipeline` to benchmark each stage
3. Analyze the profiling results to identify performance bottlenecks
4. Use a custom callback to process results as they are generated
5. Visualize performance results with a plot

This example generates a figure like the following.

.. image:: ../../_static/data/profile_pipeline_example.png

If a function spends its time waiting, for example on network I/O, then as long
as the GIL is released, performance improves with more threads
(``"scalable_op"`` and ``"scalable_op2"``).

A function might also be constrained by other factors, such as CPU resources,
so its performance can peak at some concurrency level (``"scalable_op2"``).

If a function holds the GIL completely, performance peaks at a concurrency of
one and degrades as more threads are added (``"op_with_contention"``).
"""

import argparse
import logging
import time
from pathlib import Path

from spdl.pipeline import profile_pipeline, ProfileResult
from spdl.pipeline.defs import Pipe, PipelineConfig, SinkConfig, SourceConfig

__all__ = [
    "parse_args",
    "main",
    "scalable_op",
    "scalable_op2",
    "op_with_contention",
    "create_pipeline",
    "print_profile_result",
    "plot_profile_results",
    "run_profiling_example",
]

# pyre-strict

_LG: logging.Logger = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--num-inputs",
        type=int,
        default=500,
        help="Number of inputs to use for profiling each stage",
    )
    parser.add_argument(
        "--plot-output",
        type=Path,
        help="Path to save the performance plot (e.g., profile_results.png)",
    )
    return parser.parse_args()


def scalable_op(x: int) -> int:
    """Simulate an operation which releases the GIL most of the time.

    Args:
        x: Input integer

    Returns:
        The input value multiplied by 2
    """
    time.sleep(0.01)
    return x * 2


def scalable_op2(x: int) -> int:
    """Simulate an operation which releases the GIL some of the time.

    Args:
        x: Input integer

    Returns:
        The input value plus 100
    """
    time.sleep(0.003)
    return x + 100


def op_with_contention(x: int) -> int:
    """Simulate an operation that holds the GIL.

    Args:
        x: Input integer

    Returns:
        The input value squared
    """
    return x**2


def create_pipeline(num_sources: int = 1000) -> PipelineConfig[int, int]:
    """Create a pipeline configuration with multiple stages.

    Args:
        num_sources: Number of source items to generate

    Returns:
        Pipeline configuration with three processing stages
    """
    return PipelineConfig(
        src=SourceConfig(range(num_sources)),
        pipes=[
            Pipe(scalable_op),
            Pipe(scalable_op2),
            Pipe(op_with_contention),
        ],
        sink=SinkConfig(buffer_size=10),
    )


def print_profile_result(result: ProfileResult) -> None:
    """Print profiling result in a formatted way.

    This is a callback function that will be called after each stage is profiled.

    Args:
        result: Profiling result for a single stage
    """
    _LG.info("=" * 60)
    _LG.info("Stage: %s", result.name)
    _LG.info("-" * 60)

    for stat in result.stats:
        _LG.info(
            "Concurrency %2d: QPS=%8.2f, Occupancy=%5.1f%%",
            stat.concurrency,
            stat.qps,
            stat.occupancy_rate * 100,
        )

    best_stat = max(result.stats, key=lambda s: s.qps)
    _LG.info("-" * 60)
    _LG.info(
        "Best Performance: Concurrency=%d, QPS=%.2f",
        best_stat.concurrency,
        best_stat.qps,
    )
    _LG.info("=" * 60)


def plot_profile_results(
    results: list[ProfileResult], output_path: Path | None = None
) -> None:
    """Plot profiling results showing QPS vs concurrency for each stage.

    Args:
        results: List of profiling results for each pipeline stage
        output_path: Optional path to save the plot. If None, displays the plot.
    """
    import matplotlib.pyplot as plt

    plt.figure(figsize=(12, 6))

    all_concurrencies = set()
    for result in results:
        concurrencies = [stat.concurrency for stat in result.stats]
        qps_values = [stat.qps for stat in result.stats]
        plt.plot(concurrencies, qps_values, marker="o", linewidth=2, label=result.name)
        all_concurrencies.update(concurrencies)

    sorted_concurrencies = sorted(all_concurrencies, reverse=True)
    plt.xticks(sorted_concurrencies, [str(c) for c in sorted_concurrencies])

    plt.xlabel("Number of Threads (Concurrency)", fontsize=12)
    plt.ylabel("Throughput (QPS)", fontsize=12)
    plt.title(
        "Pipeline Stage Performance vs Concurrency", fontsize=14, fontweight="bold"
    )
    plt.legend(loc="best", fontsize=10)
    plt.grid(True, alpha=0.3, linestyle="--")
    plt.tight_layout()

    if output_path:
        plt.savefig(output_path, dpi=150, bbox_inches="tight")
        _LG.info("Plot saved to: %s", output_path)
    else:
        plt.show()


def run_profiling_example(num_inputs: int = 500) -> list[ProfileResult]:
    """Run the profiling example.

    Args:
        num_inputs: Number of inputs to use for profiling

    Returns:
        List of profiling results for each stage
    """
    _LG.info("Creating pipeline configuration...")
    pipeline_config = create_pipeline(num_sources=num_inputs * 2)

    _LG.info("Starting pipeline profiling with %d inputs...", num_inputs)
    _LG.info("This will benchmark each stage at different concurrency levels.")

    results = profile_pipeline(
        pipeline_config,
        num_inputs=num_inputs,
        callback=print_profile_result,
    )

    _LG.info("Profiling complete!")
    _LG.info("Total stages profiled: %d", len(results))

    return results


def main() -> None:
    """Main entry point demonstrating profile_pipeline usage."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    args = parse_args()

    _LG.info("Profile Pipeline Example")
    _LG.info("=" * 60)

    results = run_profiling_example(num_inputs=args.num_inputs)

    _LG.info("\nSummary of Best Performance per Stage:")
    _LG.info("=" * 60)
    for result in results:
        best_stat = max(result.stats, key=lambda s: s.qps)
        _LG.info(
            "%-20s: Best at concurrency=%2d (QPS=%.2f)",
            result.name,
            best_stat.concurrency,
            best_stat.qps,
        )

    _LG.info("\nGenerating performance plot...")
    plot_profile_results(results, args.plot_output)


if __name__ == "__main__":
    main()

Functions

parse_args() → Namespace

Parse command line arguments.

main() → None

Main entry point demonstrating profile_pipeline usage.

scalable_op(x: int) → int

Simulate an operation which releases the GIL most of the time.

Parameters:

x – Input integer

Returns:

The input value multiplied by 2

scalable_op2(x: int) → int

Simulate an operation which releases the GIL some of the time.

Parameters:

x – Input integer

Returns:

The input value plus 100

op_with_contention(x: int) → int

Simulate an operation that holds the GIL.

Parameters:

x – Input integer

Returns:

The input value squared

create_pipeline(num_sources: int = 1000) → PipelineConfig[int, int]

Create a pipeline configuration with multiple stages.

Parameters:

num_sources – Number of source items to generate

Returns:

Pipeline configuration with three processing stages

print_profile_result(result: ProfileResult) → None

Print profiling result in a formatted way.

This is a callback function that will be called after each stage is profiled.

Parameters:

result – Profiling result for a single stage

plot_profile_results(results: list[ProfileResult], output_path: Path | None = None) → None

Plot profiling results showing QPS vs concurrency for each stage.

Parameters:
  • results – List of profiling results for each pipeline stage

  • output_path – Optional path to save the plot. If None, displays the plot.

run_profiling_example(num_inputs: int = 500) → list[ProfileResult]

Run the profiling example.

Parameters:

num_inputs – Number of inputs to use for profiling

Returns:

List of profiling results for each stage