Benchmark utils

Common utilities for benchmark scripts.

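This module provides a standardized framework for running benchmarks with:

  • Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)

  • Warmup phase to exclude executor initialization overhead

  • Statistical analysis with confidence intervals

  • CSV export functionality

  • Python version and free-threaded ABI detection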

Source

Click here to see the source.
  1#!/usr/bin/env python3
  2# Copyright (c) Meta Platforms, Inc. and affiliates.
  3# All rights reserved.
  4#
  5# This source code is licensed under the BSD-style license found in the
  6# LICENSE file in the root directory of this source tree.
  7
  8# pyre-strict
  9
 10"""Common utilities for benchmark scripts.
 11
 12This module provides a standardized framework for running benchmarks with:
 13
 14- Configurable executor types (
 15  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
 16  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
 17  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
 18- Warmup phase to exclude executor initialization overhead
 19- Statistical analysis with confidence intervals
 20- CSV export functionality
 21- Python version and free-threaded ABI detection
 22
 23.. seealso::
 24
 25   - :doc:`./benchmark_tarfile`
 26   - :doc:`./benchmark_wav`
 27   - :doc:`./benchmark_numpy`
 28
 29"""
 30
# Public API of this module. The underscore-prefixed helpers defined below are
# not listed here.
__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]
 39
 40import csv
 41import os
 42import sys
 43import time
 44from collections.abc import Callable
 45from concurrent.futures import (
 46    as_completed,
 47    Executor,
 48    ProcessPoolExecutor,
 49    ThreadPoolExecutor,
 50)
 51from dataclasses import asdict, dataclass, field
 52from datetime import datetime, timezone
 53from enum import Enum
 54from functools import partial
 55from sys import version_info
 56from types import TracebackType
 57from typing import Any, Generic, TypeVar
 58
 59import numpy as np
 60import psutil
 61import scipy.stats
 62
# Return type of the callable being warmed up or benchmarked.
T = TypeVar("T")
# Type of the benchmark-specific configuration carried by ``BenchmarkResult``.
ConfigT = TypeVar("ConfigT")
 65
 66
 67def _is_free_threaded() -> bool:
 68    """Check if Python is running with free-threaded ABI."""
 69    try:
 70        return not sys._is_gil_enabled()  # pyre-ignore[16]
 71    except AttributeError:
 72        return False
 73
 74
 75_PYTHON_VERSION: str = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
 76_FREE_THREADED: bool = _is_free_threaded()
 77
 78
 79@dataclass
 80class BenchmarkResult(Generic[ConfigT]):
 81    """BenchmarkResult()
 82
 83    Generic benchmark result containing configuration and performance metrics.
 84
 85    This class holds both the benchmark-specific configuration and the
 86    common performance statistics. It is parameterized by the config type,
 87    which allows each benchmark script to define its own configuration dataclass.
 88    """
 89
 90    config: ConfigT
 91    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""
 92
 93    executor_type: str
 94    """Type of executor used (thread, process, or interpreter)"""
 95
 96    qps: float
 97    """Queries per second (mean)"""
 98
 99    ci_lower: float
100    """Lower bound of 95% confidence interval for QPS"""
101
102    ci_upper: float
103    """Upper bound of 95% confidence interval for QPS"""
104
105    date: str
106    """When benchmark was run. ISO 8601 format."""
107
108    cpu_percent: float
109    """Average CPU utilization percentage during benchmark execution."""
110
111    python_version: str = field(default=_PYTHON_VERSION)
112    """Python version used for the benchmark"""
113
114    free_threaded: bool = field(default=_FREE_THREADED)
115    """Whether Python is running with free-threaded ABI."""
116
117
class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution.

    Each member's value is the lowercase name of the executor kind. It is the
    string that ``BenchmarkRunner.run`` stores in
    ``BenchmarkResult.executor_type``.
    """

    # Thread-based pool.
    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    # Process-based pool.
    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    # Sub-interpreter-based pool; the executor class is imported lazily by
    # ``_create_executor`` because it only exists on newer Python versions.
    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """
134
135
def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Instantiate the executor that corresponds to ``executor_type``.

    Args:
        executor_type: Which kind of executor to build
        max_workers: Upper bound on the number of workers in the pool

    Returns:
        A newly constructed executor

    Raises:
        ValueError: If ``executor_type`` is not one of the supported members
        ImportError: If ``INTERPRETER`` is requested on an interpreter whose
            ``concurrent.futures`` does not provide ``InterpreterPoolExecutor``
    """
    if executor_type == ExecutorType.THREAD:
        return ThreadPoolExecutor(max_workers=max_workers)

    if executor_type == ExecutorType.PROCESS:
        return ProcessPoolExecutor(max_workers=max_workers)

    if executor_type == ExecutorType.INTERPRETER:
        # Imported lazily so the module still loads where the class is absent.
        from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

        return InterpreterPoolExecutor(max_workers=max_workers)

    raise ValueError(f"Unsupported executor type: {executor_type}")
160
161
162def _verify_workers(executor: Executor, expected_workers: int) -> None:
163    """Verify that the executor has created the expected number of workers.
164
165    Args:
166        executor: The executor to verify
167        expected_workers: Expected number of workers
168
169    Raises:
170        RuntimeError: If the number of workers doesn't match expected
171    """
172    match executor:
173        case ThreadPoolExecutor():
174            actual_workers = len(executor._threads)
175        case ProcessPoolExecutor():
176            actual_workers = len(executor._processes)
177        case _:
178            raise ValueError(f"Unexpected executor type {type(executor)}")
179
180    if actual_workers != expected_workers:
181        raise RuntimeError(
182            f"Expected {expected_workers} workers, but executor has {actual_workers}"
183        )
184
185
186def _warmup_executor(
187    executor: Executor, func: Callable[[], T], num_iterations: int
188) -> T:
189    """Warmup the executor by running the function multiple times.
190
191    Args:
192        executor: The executor to warmup
193        func: Function to run for warmup
194        num_iterations: Number of warmup iterations
195
196    Returns:
197        Output from the last warmup iteration
198    """
199    futures = [executor.submit(func) for _ in range(num_iterations)]
200    last_output: T | None = None
201    for future in as_completed(futures):
202        last_output = future.result()
203    assert last_output is not None
204    return last_output
205
206
class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements. If warmup or worker
    verification fails, the executor is shut down before the error propagates.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        try:
            # Each warmup task sleeps for one second -- presumably so that the
            # tasks overlap and the pool has to start every worker.
            _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
            _verify_workers(self._executor, num_workers)
        except BaseException:
            # The caller never receives the instance when the constructor
            # raises, so nobody else could release the pool.
            self._executor.shutdown(wait=True, cancel_futures=True)
            raise

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments). It may return
                any value, including ``None``.
            iterations: Number of iterations per run; must be at least 1
            num_runs: Number of benchmark runs; must be at least 1

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)

        Raises:
            ValueError: If ``iterations`` or ``num_runs`` is less than 1
        """
        if iterations < 1:
            raise ValueError(f"iterations must be at least 1, got {iterations}")
        if num_runs < 1:
            raise ValueError(f"num_runs must be at least 1, got {num_runs}")

        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        # CPU usage is sampled on the current process.
        process = psutil.Process()

        for _ in range(num_runs):
            # The first call resets the measurement window; the value returned
            # by the second call covers the time in between.
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            # NOTE(review): the sample is divided by the iteration count, so
            # the stored value is not a plain utilization percentage --
            # confirm this scaling is intended.
            cpu_samples.append(cpu_percent / iterations)

        # ``None`` is a valid output of ``func``; the validation above already
        # guarantees that at least one task ran.
        return qps_samples, cpu_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``). With a single run no interval can be
                estimated and both bounds are reported as NaN.
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)

        Raises:
            ValueError: If ``iterations`` or ``num_runs`` is less than 1
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = float(np.mean(qps_samples))

        if num_runs < 2:
            # A sample standard deviation needs at least two samples.
            ci_lower = ci_upper = float("nan")
        else:
            qps_std = np.std(qps_samples, ddof=1)
            confidence_interval = scipy.stats.t.interval(
                confidence_level,
                num_runs - 1,
                loc=qps_mean,
                scale=qps_std / np.sqrt(num_runs),
            )
            # pyrefly: ignore [bad-argument-type]
            ci_lower = float(confidence_interval[0])
            # pyrefly: ignore [bad-argument-type]
            ci_upper = float(confidence_interval[1])

        cpu_mean = np.mean(cpu_samples)

        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=qps_mean,
            ci_lower=ci_lower,
            ci_upper=ci_upper,
            date=date,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output
349
350
def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Build the default result path for ``path``, tagged with the Python version.

    The result lives in a ``data`` directory next to the resolved ``path``.
    Its file name is the original name without extension, followed by
    ``_<major>.<minor>``, a ``t`` when running free-threaded, and ``ext``.

    Args:
        path: Path the result file name is derived from
        ext: Extension of the result file (default: ``".csv"``)

    Returns:
        The result file path
    """
    parent_dir, file_name = os.path.split(os.path.realpath(path))
    stem = os.path.splitext(file_name)[0]

    major_minor = ".".join(_PYTHON_VERSION.split(".")[:2])
    abi_tag = "t" if _FREE_THREADED else ""

    return os.path.join(parent_dir, "data", f"{stem}_{major_minor}{abi_tag}{ext}")
360
361
def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics. The column set is taken from the first
    result. The file starts with a ``#``-prefixed marker line, and missing
    parent directories are created.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file. A bare file name
            (no directory component) is written to the current directory.

    Raises:
        ValueError: If ``results`` is empty
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        # convert bool to int for slight readability improvement of raw CSV file
        config_columns = {
            key: (int(value) if isinstance(value, bool) else value)
            for key, value in asdict(result.config).items()
        }
        flattened_results.append(
            {
                "date": result.date,
                "python_version": result.python_version,
                "free_threaded": int(result.free_threaded),
                **config_columns,
                "executor_type": result.executor_type,
                "qps": result.qps,
                "ci_lower": result.ci_lower,
                "ci_upper": result.ci_upper,
                "cpu_percent": result.cpu_percent,
            }
        )

    # Get all field names from the first result
    fieldnames = list(flattened_results[0])

    output_path = os.path.realpath(output_file)
    # Use the resolved path: ``os.path.dirname`` of a bare file name is "",
    # which ``os.makedirs`` rejects.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line
        # Note: Splitting the marker so as to avoid linter consider this file as generated file
        csvfile.write("# @")
        csvfile.write("generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(flattened_results)

    print(f"Results saved to {output_file}")
416
417
def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Lines starting with ``#`` are skipped. Columns that belong neither to the
    config dataclass nor to the result metrics are ignored. Config fields
    annotated as ``int``, ``float`` or ``bool`` are converted; every other
    field keeps the raw string. A missing ``cpu_percent`` column is read as
    ``0.0``.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid (a required column is missing or
            a value cannot be converted) or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Result columns that every data row must provide
    required_columns = (
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
    )

    TRUES = ("true", "1", "yes")

    results: list[BenchmarkResult[ConfigT]] = []

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader(
            line for line in csvfile if not line.strip().startswith("#")
        )

        # ``fieldnames`` is ``None`` when the file has no header line at all.
        header = reader.fieldnames or ()
        missing = [name for name in required_columns if name not in header]

        for row in reader:
            if missing:
                # Reported only once a data row is seen, so a file without
                # rows still yields an empty list.
                raise ValueError(
                    f"CSV file {input_file} is missing required column(s): "
                    f"{', '.join(missing)}"
                )

            # Convert string values to appropriate types for config
            typed_config_dict: dict[str, Any] = {}
            for field_name, field_info in fields.items():
                if field_name not in row:
                    continue

                value = row[field_name]
                field_type = field_info.type

                # Annotations may be real types or, with postponed evaluation,
                # their string names.
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=row["executor_type"],
                qps=float(row["qps"]),
                ci_lower=float(row["ci_lower"]),
                ci_upper=float(row["ci_upper"]),
                date=row["date"],
                python_version=row["python_version"],
                free_threaded=row["free_threaded"].lower() in TRUES,
                cpu_percent=float(row.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results

API Reference

Functions

get_default_result_path(path: str, ext: str = '.csv') str[source]

Get the default result path with Python version appended.

load_results_from_csv(input_file: str, config_type: type[ConfigT]) list[BenchmarkResult[ConfigT]][source]

Load benchmark results from a CSV file.

Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.

Parameters:
  • input_file – Input CSV file path

  • config_type – The dataclass type to use for the config field

Returns:

List of BenchmarkResult instances with parsed config objects

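Raises:
  • FileNotFoundError – If input_file does not exist

  • ValueError – If CSV format is invalid or config_type is not a dataclass
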
save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) None[source]

Save benchmark results to a CSV file.

Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.

Parameters:
  • results – List of BenchmarkResult instances

  • output_file – Output file path for the CSV file

Classes

class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)[source]

Runner for executing benchmarks with configurable executors.

This class provides a standardized way to run benchmarks with:

  • Warmup phase to exclude executor initialization overhead

  • Multiple runs for statistical confidence intervals

  • Support for different executor types

The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.

Parameters:
  • executor_type – Type of executor to use ("thread", "process", or "interpreter")

  • num_workers – Number of concurrent workers

  • warmup_iterations – Number of warmup iterations (default: 2 * num_workers)

property executor_type: ExecutorType[source]

Get the executor type.

run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) tuple[BenchmarkResult[ConfigT], T][source]

Run benchmark and return results with configuration.

Parameters:
  • config – Benchmark-specific configuration

  • func – Function to benchmark (takes no arguments)

  • iterations – Number of iterations per run

  • num_runs – Number of benchmark runs for confidence interval calculation (default: 5)

  • confidence_level – Confidence level for interval calculation (default: 0.95)

Returns:

Tuple of (BenchmarkResult, last output from function)

class BenchmarkResult[source]

Generic benchmark result containing configuration and performance metrics.

This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.

ci_lower: float

Lower bound of 95% confidence interval for QPS

ci_upper: float

Upper bound of 95% confidence interval for QPS

config: ConfigT

Benchmark-specific configuration (e.g., data format, file size, etc.)

cpu_percent: float

Average CPU utilization percentage during benchmark execution.

date: str

When benchmark was run. ISO 8601 format.

executor_type: str

Type of executor used (thread, process, or interpreter)

free_threaded: bool = False

Whether Python is running with free-threaded ABI.

python_version: str = '3.12.13'

Python version used for the benchmark

qps: float

Queries per second (mean)

class ExecutorType[source]

Supported executor types for concurrent execution.

INTERPRETER = 'interpreter'

Use InterpreterPoolExecutor.

Requires Python 3.14+.

PROCESS = 'process'

Use ProcessPoolExecutor.

THREAD = 'thread'

Use ThreadPoolExecutor.