Benchmark utils

Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

  • Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)

  • Warmup phase to exclude executor initialization overhead

  • Statistical analysis with confidence intervals

  • CSV export functionality

  • Python version and free-threaded ABI detection
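
As a quick orientation, the sketch below shows how these pieces are typically combined. It is illustrative only: the import path (benchmark_utils), the workload function, and the MyConfig dataclass are assumptions, not part of this module.

# Illustrative quickstart; only BenchmarkRunner, ExecutorType, and
# save_results_to_csv come from this module. The import path, MyConfig,
# and workload() are hypothetical.
from dataclasses import dataclass

from benchmark_utils import BenchmarkRunner, ExecutorType, save_results_to_csv


@dataclass
class MyConfig:
    payload_size: int  # each benchmark script defines its own config fields


def workload() -> int:
    # Stand-in for the operation being measured.
    return sum(range(10_000))


def main() -> None:
    config = MyConfig(payload_size=10_000)
    # Construction creates and warms up the executor; measurement starts in run().
    with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
        result, _ = runner.run(config, workload, iterations=100, num_runs=5)
    print(f"{result.qps:.1f} QPS (95% CI {result.ci_lower:.1f}-{result.ci_upper:.1f})")
    save_results_to_csv([result], "data/quickstart.csv")


if __name__ == "__main__":
    main()
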
Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    python_version: str
    """Python version used for the benchmark"""

    free_threaded: bool
    """Whether Python is running with free-threaded ABI."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _get_python_info() -> tuple[str, bool]:
    """Get Python version and free-threaded ABI information.

    Returns:
        Tuple of (``python_version``, ``is_free_threaded``)
    """
    python_version = (
        f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    )
    try:
        is_free_threaded = not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        is_free_threaded = False
    return python_version, is_free_threaded


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T:
    """Warmup the executor by running the function multiple times.

    Args:
        executor: The executor to warmup
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    return last_output  # pyre-ignore[7]


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        return qps_samples, cpu_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        python_version, free_threaded = _get_python_info()
        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            python_version=python_version,
            free_threaded=free_threaded,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    python_version, free_threaded = _get_python_info()
    version_suffix = (
        f"_{'.'.join(python_version.split('.')[:2])}{'t' if free_threaded else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")


def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # convert bool to int for slight readability improvement of raw CSV file
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
            "cpu_percent": result.cpu_percent,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line
        # Note: Splitting the marker so as to avoid linter consider this file as generated file
        csvfile.write("# @" "generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")


def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown field - could be from config or result
                    # Try to infer based on whether it matches a config field name
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower()
                in ("true", "1", "yes"),
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results

API Reference

Functions

get_default_result_path(path: str, ext: str = '.csv') → str

Get the default result path with Python version appended.
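
For example (the paths are illustrative; the suffix depends on the interpreter that runs the script):

# Assuming a hypothetical script located at /repo/benchmarks/benchmark_wav.py,
# running under a free-threaded CPython 3.13 build:
get_default_result_path("/repo/benchmarks/benchmark_wav.py")
# -> "/repo/benchmarks/data/benchmark_wav_3.13t.csv"
# Under a regular (GIL) 3.12 build the same call yields ".../data/benchmark_wav_3.12.csv".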

load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]]

Load benchmark results from a CSV file.

Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.

Parameters:
  • input_file – Input CSV file path

  • config_type – The dataclass type to use for the config field

Returns:

List of BenchmarkResult instances with parsed config objects

Raises:
  • FileNotFoundError – If input_file does not exist

  • ValueError – If CSV format is invalid or config_type is not a dataclass
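
A sketch of reloading results, assuming MyConfig is the same (hypothetical) dataclass that was used when the CSV was written:

# Reload previously saved results; MyConfig is hypothetical and its fields must
# match the config columns in the CSV (int/float/bool fields are re-parsed).
from dataclasses import dataclass

from benchmark_utils import load_results_from_csv


@dataclass
class MyConfig:
    payload_size: int


results = load_results_from_csv("data/quickstart.csv", MyConfig)
for r in results:
    print(r.config.payload_size, r.executor_type, f"{r.qps:.1f} QPS")
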
save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None

Save benchmark results to a CSV file.

Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.

Parameters:
  • results – List of BenchmarkResult instances

  • output_file – Output file path for the CSV file
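
The resulting file starts with a # @generated marker line, then a header row and one row per result; boolean values (including free_threaded) are written as 0/1. For a hypothetical config with a single payload_size field, the layout would be:

# @generated
date,python_version,free_threaded,payload_size,executor_type,qps,ci_lower,ci_upper,cpu_percent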

Classes

class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)

Runner for executing benchmarks with configurable executors.

This class provides a standardized way to run benchmarks with:

  • Warmup phase to exclude executor initialization overhead

  • Multiple runs for statistical confidence intervals

  • Support for different executor types

The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.

Parameters:
  • executor_type – Type of executor to use ("thread", "process", or "interpreter")

  • num_workers – Number of concurrent workers

  • warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
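
A sketch of constructing the runner from a command-line flag (the flag names are hypothetical, but the mapping onto the enum mirrors how the executor types are defined):

# Build a runner from CLI arguments; flag names are illustrative.
import argparse

from benchmark_utils import BenchmarkRunner, ExecutorType

parser = argparse.ArgumentParser()
parser.add_argument("--executor-type", choices=[e.value for e in ExecutorType], default="thread")
parser.add_argument("--num-workers", type=int, default=8)
args = parser.parse_args()

# Note: the default warmup submits 2 * num_workers calls of time.sleep(1),
# so construction takes a couple of seconds before any measurement starts.
with BenchmarkRunner(ExecutorType(args.executor_type), args.num_workers) as runner:
    ...  # call runner.run(...) here
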

property executor_type: ExecutorType

Get the executor type.

run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T]

Run benchmark and return results with configuration.

Parameters:
  • config – Benchmark-specific configuration

  • func – Function to benchmark (takes no arguments)

  • iterations – Number of iterations per run

  • num_runs – Number of benchmark runs for confidence interval calculation (default: 5)

  • confidence_level – Confidence level for interval calculation (default: 0.95)

Returns:

Tuple of (BenchmarkResult, last output from function)
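
The reported interval is a standard Student's t confidence interval over the per-run QPS samples. The computation in the source above is equivalent to this sketch (the sample values are placeholders, not real measurements):

import numpy as np
import scipy.stats

qps_samples = [1180.0, 1210.0, 1195.0, 1225.0, 1202.0]  # placeholder per-run QPS values
mean = np.mean(qps_samples)
sem = np.std(qps_samples, ddof=1) / np.sqrt(len(qps_samples))
ci_lower, ci_upper = scipy.stats.t.interval(0.95, len(qps_samples) - 1, loc=mean, scale=sem)
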

class BenchmarkResult

Generic benchmark result containing configuration and performance metrics.

This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.

ci_lower: float

Lower bound of 95% confidence interval for QPS

ci_upper: float

Upper bound of 95% confidence interval for QPS

config: ConfigT

Benchmark-specific configuration (e.g., data format, file size, etc.)

cpu_percent: float

Average CPU utilization percentage during benchmark execution.

date: str

When benchmark was run. ISO 8601 format.

executor_type: str

Type of executor used (thread, process, or interpreter)

free_threaded: bool

Whether Python is running with free-threaded ABI.

python_version: str

Python version used for the benchmark

qps: float

Queries per second (mean)
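
Because the result is a plain dataclass, downstream reporting can filter and sort records directly; a small sketch, assuming results is a list of BenchmarkResult records (e.g. loaded via load_results_from_csv):

# Summarize thread-executor runs per Python build; `results` is assumed loaded elsewhere.
threaded = [r for r in results if r.executor_type == "thread"]
for r in sorted(threaded, key=lambda r: (r.python_version, r.free_threaded)):
    label = f"{r.python_version}{'t' if r.free_threaded else ''}"
    print(f"{label}: {r.qps:.1f} QPS (95% CI {r.ci_lower:.1f}-{r.ci_upper:.1f}), CPU {r.cpu_percent:.0f}%")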

class ExecutorType

Supported executor types for concurrent execution.

INTERPRETER = 'interpreter'

Use InterpreterPoolExecutor.

Requires Python 3.14+.

PROCESS = 'process'

Use ProcessPoolExecutor.

THREAD = 'thread'

Use ThreadPoolExecutor.
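
Since the interpreter executor requires Python 3.14+, a benchmark script may want a version guard before offering it; a minimal sketch:

import sys

from benchmark_utils import ExecutorType

executor_types = [ExecutorType.THREAD, ExecutorType.PROCESS]
if sys.version_info >= (3, 14):
    # concurrent.futures.InterpreterPoolExecutor is only available on 3.14+.
    executor_types.append(ExecutorType.INTERPRETER)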