Benchmark utils

Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

  • Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)

  • Warmup phase to exclude executor initialization overhead

  • Statistical analysis with confidence intervals

  • CSV export functionality

  • Python version and free-threaded ABI detection
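
A minimal end-to-end sketch follows (the import path, the WavConfig dataclass, and the decode_one workload are hypothetical placeholders, not part of this module):

    from dataclasses import dataclass

    # Adjust the import to wherever this module lives in your checkout.
    from benchmark_utils import (
        BenchmarkRunner,
        ExecutorType,
        get_default_result_path,
        save_results_to_csv,
    )

    @dataclass
    class WavConfig:  # hypothetical per-benchmark configuration
        num_channels: int
        sample_rate: int

    def decode_one() -> int:  # hypothetical workload; must take no arguments
        return 0

    config = WavConfig(num_channels=2, sample_rate=16_000)
    with BenchmarkRunner(ExecutorType.THREAD, num_workers=8) as runner:
        result, _ = runner.run(config, decode_one, iterations=1_000)

    print(f"{result.qps:.1f} QPS [{result.ci_lower:.1f}, {result.ci_upper:.1f}]")
    save_results_to_csv([result], get_default_result_path(__file__))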

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from typing import Any, Generic, TypeVar

import numpy as np
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    python_version: str
    """Python version used for the benchmark"""

    free_threaded: bool
    """Whether Python is running with free-threaded ABI."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _get_python_info() -> tuple[str, bool]:
    """Get Python version and free-threaded ABI information.

    Returns:
        Tuple of (``python_version``, ``is_free_threaded``)
    """
    python_version = (
        f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    )
    try:
        is_free_threaded = not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        is_free_threaded = False
    return python_version, is_free_threaded


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T:
    """Warmup the executor by running the function multiple times.

    Args:
        executor: The executor to warmup
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    return last_output  # pyre-ignore[7]


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], T]:
        """Run benchmark iterations and collect QPS samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples from each run, last function output)
        """
        qps_samples: list[float] = []
        last_output: T | None = None

        for _ in range(num_runs):
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            qps_samples.append(iterations / elapsed)

        return qps_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, last_output = self._run_iterations(func, iterations, num_runs)

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        python_version, free_threaded = _get_python_info()
        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            python_version=python_version,
            free_threaded=free_threaded,
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    python_version, free_threaded = _get_python_info()
    version_suffix = (
        f"_{'.'.join(python_version.split('.')[:2])}{'t' if free_threaded else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")


def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # convert bool to int for slight readability improvement of raw CSV file
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write the generated marker as the first line.
        # Note: the marker string is split so that linters do not treat this
        # source file itself as generated.
        csvfile.write("# @" "generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")


def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown column: keep it with the config values; it is
                    # dropped below since only declared config fields are converted.
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower() in TRUES,
            )

            results.append(result)

    return results

Functions

get_default_result_path(path: str, ext: str = '.csv') → str

Get the default result path with Python version appended.
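
The result path is placed in a data subdirectory next to the given script, with the running Python version (plus a t suffix on free-threaded builds) appended. A sketch, assuming a hypothetical script path and a regular CPython 3.13 build:

    # "/home/user/benchmarks/benchmark_wav.py"
    #   -> "/home/user/benchmarks/data/benchmark_wav_3.13.csv"
    # On a free-threaded 3.14 build the suffix would be "_3.14t" instead.
    path = get_default_result_path("/home/user/benchmarks/benchmark_wav.py")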

load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]]

Load benchmark results from a CSV file.

Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.

Parameters:
  • input_file – Input CSV file path

  • config_type – The dataclass type to use for the config field

Returns:

List of BenchmarkResult instances with parsed config objects

Raises:
  • FileNotFoundError – If input_file does not exist

  • ValueError – If CSV format is invalid or config_type is not a dataclass
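
A short usage sketch, assuming the CSV was produced by save_results_to_csv() and that WavConfig is the (hypothetical) config dataclass the benchmark used; the file path is just an example:

    from dataclasses import dataclass

    @dataclass
    class WavConfig:  # hypothetical; field names must match the CSV columns
        num_channels: int
        sample_rate: int

    results = load_results_from_csv("data/benchmark_wav_3.13.csv", WavConfig)
    for r in results:
        print(r.config.sample_rate, r.executor_type, round(r.qps, 1))
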
save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None

Save benchmark results to a CSV file.

Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.

Parameters:
  • results – List of BenchmarkResult instances

  • output_file – Output file path for the CSV file
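
A hedged sketch of what gets written (config and work stand for the hypothetical configuration and workload from the module-level sketch above; the output path is just an example):

    with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
        result, _ = runner.run(config, work, iterations=500)

    # The first line of the file is a "# @generated" marker, which
    # load_results_from_csv() skips when reading the file back.
    # Each row then flattens one BenchmarkResult into the columns:
    #   date, python_version, free_threaded, <config fields...>,
    #   executor_type, qps, ci_lower, ci_upper
    save_results_to_csv([result], "data/benchmark_wav_3.13.csv")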

Classes

class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)

Runner for executing benchmarks with configurable executors.

This class provides a standardized way to run benchmarks with:

  • Warmup phase to exclude executor initialization overhead

  • Multiple runs for statistical confidence intervals

  • Support for different executor types

The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.

Parameters:
  • executor_type – Type of executor to use ("thread", "process", or "interpreter")

  • num_workers – Number of concurrent workers

  • warmup_iterations – Number of warmup iterations (default: 2 * num_workers)

property executor_type: ExecutorType

Get the executor type.

run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T]

Run benchmark and return results with configuration.

Parameters:
  • config – Benchmark-specific configuration

  • func – Function to benchmark (takes no arguments)

  • iterations – Number of iterations per run

  • num_runs – Number of benchmark runs for confidence interval calculation (default: 5)

  • confidence_level – Confidence level for interval calculation (default: 0.95)

Returns:

Tuple of (BenchmarkResult, last output from function)
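
The confidence interval is a Student's t interval over the per-run QPS samples. A sketch of the equivalent computation, mirroring the source above (the sample values are hypothetical placeholders):

    import numpy as np
    import scipy.stats

    qps_samples = [980.0, 1010.0, 995.0, 1002.0, 990.0]  # one QPS value per run
    mean = np.mean(qps_samples)
    sem = np.std(qps_samples, ddof=1) / np.sqrt(len(qps_samples))
    ci_lower, ci_upper = scipy.stats.t.interval(
        0.95, len(qps_samples) - 1, loc=mean, scale=sem
    )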

class BenchmarkResult

Generic benchmark result containing configuration and performance metrics.

This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.

ci_lower: float

Lower bound of 95% confidence interval for QPS

ci_upper: float

Upper bound of 95% confidence interval for QPS

config: ConfigT

Benchmark-specific configuration (e.g., data format, file size, etc.)

date: str

When benchmark was run. ISO 8601 format.

executor_type: str

Type of executor used (thread, process, or interpreter)

free_threaded: bool

Whether Python is running with free-threaded ABI.

python_version: str

Python version used for the benchmark

qps: float

Queries per second (mean)
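
Because BenchmarkResult is a plain dataclass, instances can also be built directly, e.g. in tests or when synthesizing rows for plotting. A sketch with placeholder values and the hypothetical WavConfig dataclass from earlier:

    result = BenchmarkResult(
        config=WavConfig(num_channels=2, sample_rate=16_000),
        executor_type="thread",
        qps=1000.0,
        ci_lower=980.0,
        ci_upper=1020.0,
        date="2025-01-01T00:00:00+00:00",
        python_version="3.13.1",
        free_threaded=False,
    )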

class ExecutorType

Supported executor types for concurrent execution.

INTERPRETER = 'interpreter'

Use InterpreterPoolExecutor.

Requires Python 3.14+.

PROCESS = 'process'

Use ProcessPoolExecutor.

THREAD = 'thread'

Use ThreadPoolExecutor.
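
Because the enum values are plain strings, a command-line flag can be mapped straight to a member by value. A small sketch (the --executor flag name is just an example, not part of this module):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--executor",
        choices=[e.value for e in ExecutorType],
        default=ExecutorType.THREAD.value,
    )
    args = parser.parse_args()
    executor_type = ExecutorType(args.executor)  # ExecutorType("thread") -> THREAD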