Performance simulation

Simulation script to demonstrate pipeline bottleneck scenarios.

This script creates a configurable multi-stage pipeline and runs it with custom stage configurations to demonstrate how bottlenecks in different stages affect performance metrics.

See also

Understanding the performance statistics

Uses this script to illustrate how pipeline configuration affects the performance statistics.

The script accepts flexible stage configurations via CLI arguments, where each stage can have:

  • Custom processing time (sleep duration in seconds)

  • Custom concurrency level (number of parallel tasks)

  • Optional aggregation (batch size for grouping items)

Performance statistics are collected at configurable intervals and saved to a SQLite database for analysis. The foreground thread simulates a consumer (e.g., training loop) with configurable sleep duration.

Example usage:

# Single stage with 50ms processing
python performance_simulation.py --db-path output.db --stage-configs "0.050,1"

# Three stages: fast (10ms) -> medium (25ms) -> slow (40ms)
python performance_simulation.py --db-path output.db --stage-configs "0.010,1;0.025,1;0.040,1"

# Stage with concurrency and aggregation
python performance_simulation.py --db-path output.db --stage-configs "0.006,1,1;0.0,1,4"
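
The resulting file is a plain SQLite database. Its table layout is defined by SQLiteStatsWriter in sqlite_stats_logger and is not spelled out in this script, so one way to discover it after a run is to list the tables with the standard library (the output.db path matches the --db-path used above):

import sqlite3

con = sqlite3.connect("output.db")
# sqlite_master is SQLite's built-in catalog of schema objects
for (name,) in con.execute("SELECT name FROM sqlite_master WHERE type='table'"):
    print(name)
con.close()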

Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

 10"""Simulation script to demonstrate pipeline bottleneck scenarios.
 11
 12This script creates a configurable multi-stage pipeline and runs with custom
 13stage configurations to demonstrate how bottlenecks in different stages affect
 14performance metrics.
 15
 16.. seealso::
 17
 18   :doc:`../optimization_guide/stats`
 19      Uses this script to illustrates how pipeline configuration affects
 20      the performance statistics.
 21
 22The script accepts flexible stage configurations via CLI arguments, where each
 23stage can have:
 24- Custom processing time (sleep duration in seconds)
 25- Custom concurrency level (number of parallel tasks)
 26- Optional aggregation (batch size for grouping items)
 27
 28Performance statistics are collected at configurable intervals and saved to
 29a SQLite database for analysis. The foreground thread simulates a consumer
 30(e.g., training loop) with configurable sleep duration.
 31
 32Example usage:
 33
 34.. code-block::
 35
 36   # Single stage with 50ms processing
 37   python performance_simulation.py --db-path output.db --stage-configs "0.050,1"
 38
 39   # Three stages: fast (10ms) -> medium (25ms) -> slow (40ms)
 40   python performance_simulation.py --db-path output.db --stage-configs "0.010,1;0.025,1;0.040,1"
 41
 42   # Stage with concurrency and aggregation
 43   python performance_simulation.py --db-path output.db --stage-configs "0.006,1,1;0.0,1,4"
 44"""

import argparse
import asyncio
import logging
import random
import time
from collections.abc import Iterable
from functools import partial
from pathlib import Path
from queue import Queue
from typing import Any, TypeVar

from spdl.pipeline import Pipeline, PipelineBuilder

try:
    from examples.sqlite_stats_logger import (  # pyre-ignore[21]
        EventLogEntry,
        log_stats_summary,
        QueueStatsLogEntry,
        SQLiteStatsWriter,
        TaskStatsLogEntry,
    )
except ImportError:
    from spdl.examples.sqlite_stats_logger import (
        EventLogEntry,
        log_stats_summary,
        QueueStatsLogEntry,
        SQLiteStatsWriter,
        TaskStatsLogEntry,
    )

try:
    from examples.performance_analysis import (  # pyre-ignore[21]
        StatsQueueWithLogging,
        TaskStatsHookWithLogging,
    )
except ImportError:
    from spdl.examples.performance_analysis import (
        StatsQueueWithLogging,
        TaskStatsHookWithLogging,
    )


__all__ = [
    "parse_args",
    "main",
    "build_pipeline",
    "SimulatedStage",
]

_LG: logging.Logger = logging.getLogger(__name__)

T = TypeVar("T")


class SimulatedStage:
    """Callable stage with configurable sleep duration.

    This class simulates a processing stage with a specified average
    sleep duration. Each invocation applies ±10% random jitter to the
    sleep duration to simulate processing time variance.

    Args:
        sleep_duration: Average sleep duration in seconds (e.g., 0.050 for 50ms).
            Actual duration will vary by ±10% on each call.
    """

    def __init__(self, sleep_duration: float) -> None:
        self.sleep_duration = sleep_duration

    async def __call__(self, item: T) -> T:
        """Process an item with simulated delay including random jitter.

        Applies the configured sleep duration with ±10% random jitter to simulate
        realistic processing time variance.

        Args:
            item: The item to process. (The data is passed through unmodified.)

        Returns:
            The same item (unmodified) after simulated processing delay.
        """
        if self.sleep_duration > 0:
            # Add ±10% jitter to make simulation more realistic
            jitter = random.uniform(-0.1, 0.1)
            actual_duration = self.sleep_duration * (1 + jitter)
            await asyncio.sleep(actual_duration)
        return item


def build_pipeline(
    source: Iterable[int],
    log_interval: float,
    buffer: Queue[TaskStatsLogEntry | QueueStatsLogEntry | EventLogEntry],
    stage_configs: list[tuple[float, int, int]],
) -> Pipeline:
    """Build a pipeline with configurable stage functions and aggregation.

    Creates a pipeline with one or more stages, each having configurable processing
    time, concurrency, and optional aggregation. Stage names are automatically
    generated based on their configuration (e.g., "25ms", "pass", "50ms_c2").

    Args:
        source: A data source (iterable) containing integers.
        log_interval: The interval in seconds at which performance statistics are logged.
        buffer: Shared queue for collecting :py:class:`TaskStatsLogEntry`, :py:class:`QueueStatsLogEntry`,
            and :py:class:`EventLogEntry` objects.
        stage_configs: List of tuples (sleep_duration, concurrency, batch_size) for each stage.

            - ``sleep_duration``: Processing time in seconds (0 for passthrough)
            - ``concurrency``: Number of parallel tasks (>= 1)
            - ``batch_size``: Number of items to aggregate (1 for no aggregation)

            Example: ``[(0.050, 1, 1), (0.025, 2, 1), (0.0, 1, 4)]`` creates three stages
            where the last stage aggregates 4 items into batches.

    Returns:
        A configured Pipeline instance ready for execution with performance monitoring.
    """

    def hook_factory(name: str) -> list[Any]:
        return [
            TaskStatsHookWithLogging(
                name=name,
                buffer=buffer,
                interval=log_interval,
            )
        ]

    builder = PipelineBuilder().add_source(source=source)

    # Calculate total num_threads as the sum of all stage concurrencies
    total_threads = sum(concurrency for _, concurrency, _ in stage_configs)

    # Add stages based on configuration with descriptive names
    for sleep_duration, concurrency, batch_size in stage_configs:
        stage = SimulatedStage(sleep_duration)

        # Create short, descriptive name based on configuration
        if sleep_duration == 0:
            name = "pass"
        else:
            sleep_ms = int(sleep_duration * 1000)
            name = f"{sleep_ms}ms"

        # Add concurrency suffix if > 1
        if concurrency > 1:
            name = f"{name}_c{concurrency}"

        builder = builder.pipe(stage, concurrency=concurrency, name=name)

        # If batch_size > 1, insert aggregation after the processing stage
        if batch_size > 1:
            builder = builder.aggregate(batch_size)

    return builder.add_sink().build(
        num_threads=total_threads,
        queue_class=partial(  # pyre-ignore[6]
            StatsQueueWithLogging,
            buffer=buffer,
            interval=log_interval,
        ),
        task_hook_factory=hook_factory,
    )


def infinite_source() -> Iterable[int]:
    """Generate an infinite stream of integers."""
    counter = 0
    while True:
        yield counter
        counter += 1


def run_configuration(
    db_path: Path,
    stage_configs: list[tuple[float, int, int]],
    log_interval: float,
    run_duration: float,
    foreground_sleep: float,
) -> None:
    """Run a single pipeline configuration and collect performance statistics.

    Builds and runs a pipeline with the specified configuration, simulating a
    foreground consumer (e.g., training loop) that processes items with a
    configurable sleep duration. Performance statistics are collected and saved
    to a SQLite database.

    Args:
        db_path: Path where the SQLite database file will be saved.
        stage_configs: List of tuples (sleep_duration, concurrency, batch_size) defining
            each pipeline stage's configuration.
        log_interval: Interval in seconds at which performance statistics are logged
            to the database.
        run_duration: Total duration in seconds to run the pipeline before stopping.
        foreground_sleep: Sleep duration in seconds for the foreground consumer thread
            (e.g., 0.030 for 30ms).
    """
    print(f"\n{'=' * 80}")
    print("🎯 Running Pipeline Simulation")
    print(f"{'=' * 80}")
    print(f"   Database: {db_path}")
    print(f"   Stage configs: {stage_configs}")
    print(f"   Log interval: {log_interval}s")
    print(f"   Run duration: {run_duration}s")
    print(f"   Foreground sleep: {foreground_sleep * 1000:.0f}ms")
    print()

    # Create shared buffer and writer
    buffer: Queue[TaskStatsLogEntry | QueueStatsLogEntry | EventLogEntry] = Queue()
    writer = SQLiteStatsWriter(
        str(db_path),
        buffer,
        flush_interval=max(0.1, log_interval - 1),
    )
    writer.start()

    # Build pipeline
    pipeline = build_pipeline(
        source=infinite_source(),
        log_interval=log_interval,
        buffer=buffer,
        stage_configs=stage_configs,
    )

    try:
        start_time = time.monotonic()
        item_count = 0

        with pipeline.auto_stop():
            for _ in pipeline:
                item_count += 1

                # Simulate foreground work (e.g., training loop)
                time.sleep(foreground_sleep)

                # Check if we've run long enough
                elapsed = time.monotonic() - start_time
                if elapsed >= run_duration:
                    break

        elapsed = time.monotonic() - start_time
        print(f"\n✅ Configuration completed in {elapsed:.2f} seconds")
        print(f"   Items processed: {item_count}")
        print(f"   Average throughput: {item_count / elapsed:.2f} items/sec")

    finally:
        # Ensure all stats are flushed to database
        writer.shutdown()

        # Log stats summary
        log_stats_summary(db_path)


def parse_args() -> argparse.Namespace:
    """Parse command line arguments for pipeline simulation configuration.

    Returns:
        Parsed arguments including database path, stage configurations,
        log interval, run duration, and foreground sleep time.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--db-path",
        type=Path,
        required=True,
        help="Path to save the SQLite database file",
    )
    parser.add_argument(
        "--stage-configs",
        type=str,
        required=True,
        help=(
            'Stage configurations as "sleep1,concurrency1[,batch_size1];'
            'sleep2,concurrency2[,batch_size2];..." (e.g., "0.050,1;0.030,1;0.010,1")'
        ),
    )
    parser.add_argument(
        "--log-interval",
        type=float,
        default=1.0,
        help="Interval in seconds for logging stats to database (default: 1)",
    )
    parser.add_argument(
        "--run-duration",
        type=float,
        default=10.0,
        help="Duration in seconds to run the configuration (default: 10)",
    )
    parser.add_argument(
        "--foreground-sleep",
        type=float,
        default=0.030,
        help="Sleep duration in foreground thread in seconds (default: 0.030 = 30ms)",
    )
    return parser.parse_args()


def parse_stage_configs(config_str: str) -> list[tuple[float, int, int]]:
    """Parse stage configuration string into list of tuples.

    Args:
        config_str: Configuration string like "0.050,1,1;0.030,1,4;0.010,1,1" where
            each stage has the format "sleep,concurrency,batch_size". The batch_size
            is optional and defaults to 1. Examples:

            - "0.050,1;0.030,1" - two stages with batch_size=1 (no aggregation)
            - "0.050,1,1;0.030,1,4" - the second stage aggregates 4 items into 1

    Returns:
        List of (sleep_duration, concurrency, batch_size) tuples

    Raises:
        ValueError: If the configuration string is invalid
    """
    try:
        stages = []
        for stage_str in config_str.split(";"):
            stage_str = stage_str.strip()
            if not stage_str:
                continue
            parts = stage_str.split(",")
            if len(parts) < 2 or len(parts) > 3:
                raise ValueError(
                    f"Invalid stage format '{stage_str}'. Expected 'sleep,concurrency[,batch_size]'"
                )
            sleep_duration = float(parts[0])
            concurrency = int(parts[1])
            batch_size = int(parts[2]) if len(parts) == 3 else 1

            if sleep_duration < 0:
                raise ValueError(
                    f"Sleep duration must be non-negative: {sleep_duration}"
                )
            if concurrency < 1:
                raise ValueError(f"Concurrency must be at least 1: {concurrency}")
            if batch_size < 1:
                raise ValueError(f"Batch size must be at least 1: {batch_size}")

            stages.append((sleep_duration, concurrency, batch_size))
        if not stages:
            raise ValueError("At least one stage configuration is required")
        return stages
    except (ValueError, IndexError) as e:
        raise ValueError(
            f"Invalid stage configuration '{config_str}': {e}. "
            "Expected format: 'sleep1,concurrency1[,batch_size1];sleep2,concurrency2[,batch_size2];...'"
        ) from e


def main() -> None:
    """Main entry point for the pipeline performance simulation.

    Parses command line arguments, builds a pipeline with the specified
    configuration, runs the simulation, and saves performance statistics
    to a SQLite database.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname).1s]: %(message)s",
    )

    args = parse_args()

    # Parse stage configurations from CLI
    try:
        stage_configs = parse_stage_configs(args.stage_configs)
    except ValueError as e:
        print(f"Error: {e}")
        return

    # Ensure output directory for database exists
    args.db_path.parent.mkdir(parents=True, exist_ok=True)

    print("\n🚀 Pipeline Bottleneck Simulation")
    print(f"   Database file: {args.db_path}")
    print(f"   Stage configs: {stage_configs}")
    print()

    # Run the configuration
    run_configuration(
        db_path=args.db_path,
        stage_configs=stage_configs,
        log_interval=args.log_interval,
        run_duration=args.run_duration,
        foreground_sleep=args.foreground_sleep,
    )

    print("\n" + "=" * 80)
    print("🎉 Simulation completed!")
    print("=" * 80)
    print(f"\nDatabase file created: {args.db_path}")
    print()


if __name__ == "__main__":
    main()

API Reference

Functions

parse_args() → Namespace

Parse command line arguments for pipeline simulation configuration.

Returns:

Parsed arguments including database path, stage configurations, log interval, run duration, and foreground sleep time.

main() → None

Main entry point for the pipeline performance simulation.

Parses command line arguments, builds a pipeline with the specified configuration, runs the simulation, and saves performance statistics to a SQLite database.

build_pipeline(source: Iterable[int], log_interval: float, buffer: Queue[TaskStatsLogEntry | QueueStatsLogEntry | EventLogEntry], stage_configs: list[tuple[float, int, int]]) → Pipeline

Build a pipeline with configurable stage functions and aggregation.

Creates a pipeline with one or more stages, each having configurable processing time, concurrency, and optional aggregation. Stage names are automatically generated based on their configuration (e.g., “25ms”, “pass”, “50ms_c2”).

Parameters:
  • source – A data source (iterable) containing integers.

  • log_interval – The interval in seconds at which performance statistics are logged.

  • buffer – Shared queue for collecting TaskStatsLogEntry, QueueStatsLogEntry, and EventLogEntry objects.

  • stage_configs

    List of tuples (sleep_duration, concurrency, batch_size) for each stage.

    • sleep_duration: Processing time in seconds (0 for passthrough)

    • concurrency: Number of parallel tasks (>= 1)

    • batch_size: Number of items to aggregate (1 for no aggregation)

    Example: [(0.050, 1, 1), (0.025, 2, 1), (0.0, 1, 4)] creates three stages where the last stage aggregates 4 items into batches.

Returns:

A configured Pipeline instance ready for execution with performance monitoring.
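
For orientation, here is a minimal driver condensed from run_configuration in the source above; the output.db path, flush interval, 1-second log interval, 30 ms foreground sleep, and 5-second cutoff are arbitrary values chosen for this sketch (infinite_source also comes from this script):

import time
from queue import Queue

buffer = Queue()
writer = SQLiteStatsWriter("output.db", buffer, flush_interval=0.5)
writer.start()

# Two stages: a 50 ms stage (auto-named "50ms") and a 25 ms stage with two
# parallel tasks (auto-named "25ms_c2"); batch_size=1 means no aggregation.
pipeline = build_pipeline(
    source=infinite_source(),
    log_interval=1.0,
    buffer=buffer,
    stage_configs=[(0.050, 1, 1), (0.025, 2, 1)],
)

start = time.monotonic()
with pipeline.auto_stop():
    for _ in pipeline:
        time.sleep(0.030)  # simulated foreground consumer (e.g., a training step)
        if time.monotonic() - start >= 5.0:
            break

writer.shutdown()  # flush remaining stats to the database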

Classes

class SimulatedStage(sleep_duration: float)

Callable stage with configurable sleep duration.

This class simulates a processing stage with a specified average sleep duration. Each invocation applies ±10% random jitter to the sleep duration to simulate processing time variance.

Parameters:

sleep_duration – Average sleep duration in seconds (e.g., 0.050 for 50ms). Actual duration will vary by ±10% on each call.
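
A minimal standalone sketch of the stage's behavior; driving the coroutine with asyncio.run here is purely illustrative, since inside a pipeline SPDL schedules the call itself:

import asyncio

stage = SimulatedStage(0.050)    # ~50 ms average sleep per call
result = asyncio.run(stage(42))  # sleeps roughly 45-55 ms (±10% jitter)
assert result == 42              # the item passes through unchanged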