Performance simulation¶
Simulation script to demonstrate pipeline bottleneck scenarios.
This script creates a configurable multi-stage pipeline and runs it with custom stage configurations to demonstrate how bottlenecks in different stages affect performance metrics.
See also
- Understanding the performance statistics
Uses this script to illustrate how pipeline configuration affects the performance statistics.
The script accepts flexible stage configurations via CLI arguments, where each stage can have:
- Custom processing time (sleep duration in seconds)
- Custom concurrency level (number of parallel tasks)
- Optional aggregation (batch size for grouping items; see the example after this list)
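For example, parse_stage_configs (defined in the source below) maps each semicolon-separated entry to a (sleep_duration, concurrency, batch_size) tuple, with batch_size defaulting to 1 when omitted:

parse_stage_configs("0.050,1;0.0,2,4")
# -> [(0.05, 1, 1), (0.0, 2, 4)]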
Performance statistics are collected at configurable intervals and saved to a SQLite database for analysis. The foreground thread simulates a consumer (e.g., training loop) with configurable sleep duration.
Example usage:
# Single stage with 50ms processing
python performance_simulation.py --db-path output.db --stage-configs "0.050,1"
# Three stages: fast (10ms) -> medium (25ms) -> slow (40ms)
python performance_simulation.py --db-path output.db --stage-configs "0.010,1;0.025,1;0.040,1"
# Stage with concurrency and aggregation
python performance_simulation.py --db-path output.db --stage-configs "0.006,1,1;0.0,1,4"
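The resulting database can be inspected with the standard sqlite3 module. The exact schema is defined by the sqlite_stats_logger example, so the sketch below stays schema-agnostic and simply lists the tables the writer created along with their row counts:

import sqlite3

conn = sqlite3.connect("output.db")
# Enumerate the tables written by SQLiteStatsWriter
tables = [r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")]
for table in tables:
    (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    print(f"{table}: {count} rows")
conn.close()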
Source¶
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

10"""Simulation script to demonstrate pipeline bottleneck scenarios.
11
12This script creates a configurable multi-stage pipeline and runs with custom
13stage configurations to demonstrate how bottlenecks in different stages affect
14performance metrics.
15
16.. seealso::
17
18 :doc:`../optimization_guide/stats`
19 Uses this script to illustrates how pipeline configuration affects
20 the performance statistics.
21
22The script accepts flexible stage configurations via CLI arguments, where each
23stage can have:
24- Custom processing time (sleep duration in seconds)
25- Custom concurrency level (number of parallel tasks)
26- Optional aggregation (batch size for grouping items)
27
28Performance statistics are collected at configurable intervals and saved to
29a SQLite database for analysis. The foreground thread simulates a consumer
30(e.g., training loop) with configurable sleep duration.
31
32Example usage:
33
34.. code-block::
35
36 # Single stage with 50ms processing
37 python performance_simulation.py --db-path output.db --stage-configs "0.050,1"
38
39 # Three stages: fast (10ms) -> medium (25ms) -> slow (40ms)
40 python performance_simulation.py --db-path output.db --stage-configs "0.010,1;0.025,1;0.040,1"
41
42 # Stage with concurrency and aggregation
43 python performance_simulation.py --db-path output.db --stage-configs "0.006,1,1;0.0,1,4"
44"""

import argparse
import asyncio
import logging
import random
import time
from collections.abc import Iterable
from functools import partial
from pathlib import Path
from queue import Queue
from typing import Any, TypeVar

from spdl.pipeline import Pipeline, PipelineBuilder

try:
    from examples.sqlite_stats_logger import (  # pyre-ignore[21]
        EventLogEntry,
        log_stats_summary,
        QueueStatsLogEntry,
        SQLiteStatsWriter,
        TaskStatsLogEntry,
    )
except ImportError:
    from spdl.examples.sqlite_stats_logger import (
        EventLogEntry,
        log_stats_summary,
        QueueStatsLogEntry,
        SQLiteStatsWriter,
        TaskStatsLogEntry,
    )

try:
    from examples.performance_analysis import (  # pyre-ignore[21]
        StatsQueueWithLogging,
        TaskStatsHookWithLogging,
    )
except ImportError:
    from spdl.examples.performance_analysis import (
        StatsQueueWithLogging,
        TaskStatsHookWithLogging,
    )


__all__ = [
    "parse_args",
    "main",
    "build_pipeline",
    "SimulatedStage",
]

_LG: logging.Logger = logging.getLogger(__name__)

T = TypeVar("T")

class SimulatedStage:
    """Callable stage with configurable sleep duration.

    This class simulates a processing stage with a specified average
    sleep duration. Each invocation applies ±10% random jitter to the
    sleep duration to simulate processing time variance.

    Args:
        sleep_duration: Average sleep duration in seconds (e.g., 0.050 for 50ms).
            Actual duration will vary by ±10% on each call.
    """

    def __init__(self, sleep_duration: float) -> None:
        self.sleep_duration = sleep_duration

    async def __call__(self, item: T) -> T:
        """Process an item with simulated delay including random jitter.

        Applies the configured sleep duration with ±10% random jitter to simulate
        realistic processing time variance.

        Args:
            item: The item to process. (The data is passed through unmodified.)

        Returns:
            The same item (unmodified) after simulated processing delay.
        """
        if self.sleep_duration > 0:
            # Add ±10% jitter to make the simulation more realistic
            jitter = random.uniform(-0.1, 0.1)
            actual_duration = self.sleep_duration * (1 + jitter)
            await asyncio.sleep(actual_duration)
        return item

def build_pipeline(
    source: Iterable[int],
    log_interval: float,
    buffer: Queue[TaskStatsLogEntry | QueueStatsLogEntry | EventLogEntry],
    stage_configs: list[tuple[float, int, int]],
) -> Pipeline:
    """Build a pipeline with configurable stage functions and aggregation.

    Creates a pipeline with one or more stages, each having configurable processing
    time, concurrency, and optional aggregation. Stage names are automatically
    generated based on their configuration (e.g., "25ms", "pass", "50ms_c2").

    Args:
        source: A data source (iterable) containing integers.
        log_interval: The interval in seconds at which performance statistics are logged.
        buffer: Shared queue for collecting :py:class:`TaskStatsLogEntry`,
            :py:class:`QueueStatsLogEntry`, and :py:class:`EventLogEntry` objects.
        stage_configs: List of tuples (sleep_duration, concurrency, batch_size) for each stage.

            - ``sleep_duration``: Processing time in seconds (0 for passthrough)
            - ``concurrency``: Number of parallel tasks (>= 1)
            - ``batch_size``: Number of items to aggregate (1 for no aggregation)

            Example: ``[(0.050, 1, 1), (0.025, 2, 1), (0.0, 1, 4)]`` creates three stages
            where the last stage aggregates 4 items into batches.

    Returns:
        A configured Pipeline instance ready for execution with performance monitoring.
    """

    def hook_factory(name: str) -> list[Any]:
        return [
            TaskStatsHookWithLogging(
                name=name,
                buffer=buffer,
                interval=log_interval,
            )
        ]

    builder = PipelineBuilder().add_source(source=source)

    # Calculate total num_threads as the sum of all stage concurrencies
    total_threads = sum(concurrency for _, concurrency, _ in stage_configs)

    # Add stages based on configuration with descriptive names
    for sleep_duration, concurrency, batch_size in stage_configs:
        stage = SimulatedStage(sleep_duration)

        # Create short, descriptive name based on configuration
        if sleep_duration == 0:
            name = "pass"
        else:
            sleep_ms = int(sleep_duration * 1000)
            name = f"{sleep_ms}ms"

        # Add concurrency suffix if > 1
        if concurrency > 1:
            name = f"{name}_c{concurrency}"

        builder = builder.pipe(stage, concurrency=concurrency, name=name)

        # If batch_size > 1, insert aggregation after the processing stage
        if batch_size > 1:
            builder = builder.aggregate(batch_size)

    return builder.add_sink().build(
        num_threads=total_threads,
        queue_class=partial(  # pyre-ignore[6]
            StatsQueueWithLogging,
            buffer=buffer,
            interval=log_interval,
        ),
        task_hook_factory=hook_factory,
    )

def infinite_source() -> Iterable[int]:
    """Generate an infinite stream of integers."""
    counter = 0
    while True:
        yield counter
        counter += 1

def run_configuration(
    db_path: Path,
    stage_configs: list[tuple[float, int, int]],
    log_interval: float,
    run_duration: float,
    foreground_sleep: float,
) -> None:
    """Run a single pipeline configuration and collect performance statistics.

    Builds and runs a pipeline with the specified configuration, simulating a
    foreground consumer (e.g., training loop) that processes items with a
    configurable sleep duration. Performance statistics are collected and saved
    to a SQLite database.

    Args:
        db_path: Path where the SQLite database file will be saved.
        stage_configs: List of tuples (sleep_duration, concurrency, batch_size) defining
            each pipeline stage's configuration.
        log_interval: Interval in seconds at which performance statistics are logged
            to the database.
        run_duration: Total duration in seconds to run the pipeline before stopping.
        foreground_sleep: Sleep duration in seconds for the foreground consumer thread
            (e.g., 0.030 for 30ms).
    """
    print(f"\n{'=' * 80}")
    print("🎯 Running Pipeline Simulation")
    print(f"{'=' * 80}")
    print(f"  Database: {db_path}")
    print(f"  Stage configs: {stage_configs}")
    print(f"  Log interval: {log_interval}s")
    print(f"  Run duration: {run_duration}s")
    print(f"  Foreground sleep: {foreground_sleep * 1000:.0f}ms")
    print()

    # Create shared buffer and writer
    buffer: Queue[TaskStatsLogEntry | QueueStatsLogEntry | EventLogEntry] = Queue()
    writer = SQLiteStatsWriter(
        str(db_path),
        buffer,
        flush_interval=max(0.1, log_interval - 1),
    )
    writer.start()

    # Build pipeline
    pipeline = build_pipeline(
        source=infinite_source(),
        log_interval=log_interval,
        buffer=buffer,
        stage_configs=stage_configs,
    )

    try:
        start_time = time.monotonic()
        item_count = 0

        with pipeline.auto_stop():
            for _ in pipeline:
                item_count += 1

                # Simulate foreground work (e.g., training loop)
                time.sleep(foreground_sleep)

                # Check if we've run long enough
                elapsed = time.monotonic() - start_time
                if elapsed >= run_duration:
                    break

        elapsed = time.monotonic() - start_time
        print(f"\n✅ Configuration completed in {elapsed:.2f} seconds")
        print(f"  Items processed: {item_count}")
        print(f"  Average throughput: {item_count / elapsed:.2f} items/sec")

    finally:
        # Ensure all stats are flushed to database
        writer.shutdown()

    # Log stats summary
    log_stats_summary(db_path)

def parse_args() -> argparse.Namespace:
    """Parse command line arguments for pipeline simulation configuration.

    Returns:
        Parsed arguments including database path, stage configurations,
        log interval, run duration, and foreground sleep time.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--db-path",
        type=Path,
        required=True,
        help="Path to save the SQLite database file",
    )
    parser.add_argument(
        "--stage-configs",
        type=str,
        required=True,
        help=(
            "Stage configurations as "
            '"sleep1,concurrency1[,batch_size1];sleep2,concurrency2[,batch_size2];..." '
            '(e.g., "0.050,1;0.030,1;0.010,1")'
        ),
    )
    parser.add_argument(
        "--log-interval",
        type=float,
        default=1.0,
        help="Interval in seconds for logging stats to database (default: 1)",
    )
    parser.add_argument(
        "--run-duration",
        type=float,
        default=10.0,
        help="Duration in seconds to run the configuration (default: 10)",
    )
    parser.add_argument(
        "--foreground-sleep",
        type=float,
        default=0.030,
        help="Sleep duration in foreground thread in seconds (default: 0.030 = 30ms)",
    )
    return parser.parse_args()

def parse_stage_configs(config_str: str) -> list[tuple[float, int, int]]:
    """Parse stage configuration string into a list of tuples.

    Args:
        config_str: Configuration string like "0.050,1,1;0.030,1,4;0.010,1,1" where
            each stage has the format "sleep,concurrency,batch_size". batch_size is
            optional (defaults to 1).
            Examples:
            - "0.050,1;0.030,1" - two stages with batch_size=1 (no aggregation)
            - "0.050,1,1;0.030,1,4" - the second stage aggregates 4 items into one batch

    Returns:
        List of (sleep_duration, concurrency, batch_size) tuples

    Raises:
        ValueError: If the configuration string is invalid
    """
    try:
        stages = []
        for stage_str in config_str.split(";"):
            stage_str = stage_str.strip()
            if not stage_str:
                continue
            parts = stage_str.split(",")
            if len(parts) < 2 or len(parts) > 3:
                raise ValueError(
                    f"Invalid stage format '{stage_str}'. Expected 'sleep,concurrency[,batch_size]'"
                )
            sleep_duration = float(parts[0])
            concurrency = int(parts[1])
            batch_size = int(parts[2]) if len(parts) == 3 else 1

            if sleep_duration < 0:
                raise ValueError(
                    f"Sleep duration must be non-negative: {sleep_duration}"
                )
            if concurrency < 1:
                raise ValueError(f"Concurrency must be at least 1: {concurrency}")
            if batch_size < 1:
                raise ValueError(f"Batch size must be at least 1: {batch_size}")

            stages.append((sleep_duration, concurrency, batch_size))
        if not stages:
            raise ValueError("At least one stage configuration is required")
        return stages
    except (ValueError, IndexError) as e:
        raise ValueError(
            f"Invalid stage configuration '{config_str}': {e}. "
            "Expected format: 'sleep1,concurrency1[,batch_size1];sleep2,concurrency2[,batch_size2];...'"
        ) from e

def main() -> None:
    """Main entry point for the pipeline performance simulation.

    Parses command line arguments, builds a pipeline with the specified
    configuration, runs the simulation, and saves performance statistics
    to a SQLite database.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname).1s]: %(message)s",
    )

    args = parse_args()

    # Parse stage configurations from CLI
    try:
        stage_configs = parse_stage_configs(args.stage_configs)
    except ValueError as e:
        print(f"Error: {e}")
        return

    # Ensure output directory for database exists
    args.db_path.parent.mkdir(parents=True, exist_ok=True)

    print("\n🚀 Pipeline Bottleneck Simulation")
    print(f"  Database file: {args.db_path}")
    print(f"  Stage configs: {stage_configs}")
    print()

    # Run the configuration
    run_configuration(
        db_path=args.db_path,
        stage_configs=stage_configs,
        log_interval=args.log_interval,
        run_duration=args.run_duration,
        foreground_sleep=args.foreground_sleep,
    )

    print("\n" + "=" * 80)
    print("🎉 Simulation completed!")
    print("=" * 80)
    print(f"\nDatabase file created: {args.db_path}")
    print()


if __name__ == "__main__":
    main()
API Reference¶
Functions
- parse_args() → Namespace¶
Parse command line arguments for pipeline simulation configuration.
- Returns:
Parsed arguments including database path, stage configurations, log interval, run duration, and foreground sleep time.
- main() → None¶
Main entry point for the pipeline performance simulation.
Parses command line arguments, builds a pipeline with the specified configuration, runs the simulation, and saves performance statistics to a SQLite database.
- build_pipeline(source: Iterable[int], log_interval: float, buffer: Queue[TaskStatsLogEntry | QueueStatsLogEntry | EventLogEntry], stage_configs: list[tuple[float, int, int]]) → Pipeline¶
Build a pipeline with configurable stage functions and aggregation.
Creates a pipeline with one or more stages, each having configurable processing time, concurrency, and optional aggregation. Stage names are automatically generated based on their configuration (e.g., “25ms”, “pass”, “50ms_c2”).
- Parameters:
source – A data source (iterable) containing integers.
log_interval – The interval in seconds at which performance statistics are logged.
buffer – Shared queue for collecting TaskStatsLogEntry, QueueStatsLogEntry, and EventLogEntry objects.
stage_configs – List of tuples (sleep_duration, concurrency, batch_size) for each stage.
- sleep_duration: Processing time in seconds (0 for passthrough)
- concurrency: Number of parallel tasks (>= 1)
- batch_size: Number of items to aggregate (1 for no aggregation)
Example: [(0.050, 1, 1), (0.025, 2, 1), (0.0, 1, 4)] creates three stages where the last stage aggregates 4 items into batches.
- Returns:
A configured Pipeline instance ready for execution with performance monitoring.
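A minimal usage sketch, modeled on how run_configuration in the listing above drives the pipeline (here nothing drains the buffer, so stats entries simply accumulate in memory):

from queue import Queue

buffer = Queue()  # collects stats entries; a SQLiteStatsWriter would normally drain it
pipeline = build_pipeline(
    source=range(1000),
    log_interval=1.0,
    buffer=buffer,
    stage_configs=[(0.025, 1, 1), (0.0, 1, 4)],  # 25ms stage, then 4-item aggregation
)
with pipeline.auto_stop():
    for batch in pipeline:
        pass  # consume items here, as the foreground loop does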
Classes
- class SimulatedStage(sleep_duration: float)¶
Callable stage with configurable sleep duration.
This class simulates a processing stage with a specified average sleep duration. Each invocation applies ±10% random jitter to the sleep duration to simulate processing time variance.
- Parameters:
sleep_duration – Average sleep duration in seconds (e.g., 0.050 for 50ms). Actual duration will vary by ±10% on each call.
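Since the stage is an async callable that passes items through unchanged, it can be exercised on its own; a minimal sketch:

import asyncio

stage = SimulatedStage(0.050)  # ~50ms average sleep, ±10% jitter per call
result = asyncio.run(stage("item"))
assert result == "item"  # the item is returned unmodified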