Benchmark utils

Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

  • Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)

  • Warmup phase to exclude executor initialization overhead

  • Statistical analysis with confidence intervals

  • CSV export functionality

  • Python version and free-threaded ABI detection
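
A minimal end-to-end sketch of the intended flow is shown below. The import path benchmark_utils, the Config dataclass, and the workload function are illustrative assumptions, not part of this module; real benchmark scripts such as benchmark_wav define their own configuration and task.

    import time
    from dataclasses import dataclass

    from benchmark_utils import (  # assumed import path for this module
        BenchmarkRunner,
        ExecutorType,
        get_default_result_path,
        save_results_to_csv,
    )


    @dataclass
    class Config:
        """Placeholder benchmark configuration."""

        payload_size: int


    def workload() -> int:
        """Placeholder no-argument task to measure."""
        time.sleep(0.01)
        return 42


    if __name__ == "__main__":
        # Construction warms up the pool; run() measures 100 submissions per run.
        with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
            result, _ = runner.run(Config(payload_size=1024), workload, iterations=100)
        # Results land next to the script, e.g. data/<script>_3.13.csv.
        save_results_to_csv([result], get_default_result_path(__file__))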

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from sys import version_info
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


def _is_free_threaded() -> bool:
    """Check if Python is running with free-threaded ABI."""
    try:
        return not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        return False


_PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
_FREE_THREADED = _is_free_threaded()


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""

    python_version: str = field(default=_PYTHON_VERSION)
    """Python version used for the benchmark"""

    free_threaded: bool = field(default=_FREE_THREADED)
    """Whether Python is running with free-threaded ABI."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T:
    """Warmup the executor by running the function multiple times.

    Args:
        executor: The executor to warmup
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    return last_output  # pyre-ignore[7]


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        return qps_samples, cpu_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
340    """Get the default result path with Python version appended."""
341    base, _ = os.path.splitext(os.path.realpath(path))
342    dirname = os.path.join(os.path.dirname(base), "data")
343    filename = os.path.basename(base)
344    version_suffix = (
345        f"_{'.'.join(_PYTHON_VERSION.split('.')[:2])}{'t' if _FREE_THREADED else ''}"
346    )
347    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")
348
349
350def save_results_to_csv(
351    results: list[BenchmarkResult[Any]],
352    output_file: str,
353) -> None:
354    """Save benchmark results to a CSV file.
355
356    Flattens the nested BenchmarkResult structure (config + performance metrics)
357    into a flat CSV format. Each row contains both the benchmark configuration
358    fields and the performance metrics.
359
360    Args:
361        results: List of BenchmarkResult instances
362        output_file: Output file path for the CSV file
363    """
364    if not results:
365        raise ValueError("No results to save")
366
367    flattened_results = []
368    for result in results:
369        config_dict = asdict(result.config)
370        # convert bool to int for slight readability improvement of raw CSV file
371        config_dict = {
372            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
373        }
374        flattened = {
375            "date": result.date,
376            "python_version": result.python_version,
377            "free_threaded": int(result.free_threaded),
378            **config_dict,
379            "executor_type": result.executor_type,
380            "qps": result.qps,
381            "ci_lower": result.ci_lower,
382            "ci_upper": result.ci_upper,
383            "cpu_percent": result.cpu_percent,
384        }
385        flattened_results.append(flattened)
386
387    # Get all field names from the first result
388    fieldnames = list(flattened_results[0].keys())
389
390    output_path = os.path.realpath(output_file)
391    os.makedirs(os.path.dirname(output_file), exist_ok=True)
392    with open(output_path, "w", newline="") as csvfile:
393        # Write generated marker as first line
394        # Note: Splitting the marker so as to avoid linter consider this file as generated file
395        csvfile.write("# @")
396        csvfile.write("generated\n")
397
398        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
399        writer.writeheader()
400        for result_dict in flattened_results:
401            writer.writerow(result_dict)
402
403    print(f"Results saved to {output_file}")
404
405
406def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown field - could be from config or result
                    # Try to infer based on whether it matches a config field name
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower()
                in ("true", "1", "yes"),
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results

API Reference

Functions

get_default_result_path(path: str, ext: str = '.csv') → str

Get the default result path with Python version appended.
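
For illustration, assuming a benchmark script located at /repo/benchmarks/benchmark_wav.py (a hypothetical path), the returned path points into a sibling data directory and encodes the interpreter that produced the results:

    get_default_result_path("/repo/benchmarks/benchmark_wav.py")
    # -> "/repo/benchmarks/data/benchmark_wav_3.13.csv"   on CPython 3.13
    # -> "/repo/benchmarks/data/benchmark_wav_3.14t.csv"  on free-threaded CPython 3.14

    # A typical call site passes the running script's own path:
    output_file = get_default_result_path(__file__)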

load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]]

Load benchmark results from a CSV file.

Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.

Parameters:
  • input_file – Input CSV file path

  • config_type – The dataclass type to use for the config field

Returns:

List of BenchmarkResult instances with parsed config objects

Raises:
  • FileNotFoundError – If input_file does not exist

  • ValueError – If CSV format is invalid or config_type is not a dataclass
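
A small usage sketch; WavConfig and the CSV path are hypothetical, and the dataclass field names must match the config columns that save_results_to_csv() wrote:

    from dataclasses import dataclass

    @dataclass
    class WavConfig:
        num_files: int
        sample_rate: int
        compressed: bool  # stored as 0/1 in the CSV, parsed back to bool

    results = load_results_from_csv("data/benchmark_wav_3.13.csv", WavConfig)
    for r in results:
        print(r.config.sample_rate, r.executor_type, f"{r.qps:.1f} qps")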

save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None

Save benchmark results to a CSV file.

Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.

Parameters:
  • results – List of BenchmarkResult instances

  • output_file – Output file path for the CSV file
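
The flattened layout places run metadata first, then the config fields, then the performance metrics, with booleans written as 0/1 and a split "# @generated" marker on the first line. For a hypothetical config with fields num_files and sample_rate, the file starts like this (each BenchmarkResult becomes one data row):

    # @generated
    date,python_version,free_threaded,num_files,sample_rate,executor_type,qps,ci_lower,ci_upper,cpu_percent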

Classes

class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)

Runner for executing benchmarks with configurable executors.

This class provides a standardized way to run benchmarks with:

  • Warmup phase to exclude executor initialization overhead

  • Multiple runs for statistical confidence intervals

  • Support for different executor types

The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.

Parameters:
  • executor_type – Type of executor to use ("thread", "process", or "interpreter")

  • num_workers – Number of concurrent workers

  • warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
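
Constructing a runner creates the pool and runs the warmup (each warmup task is time.sleep(1)), so construction itself takes a couple of seconds; using the class as a context manager shuts the pool down afterwards. A sketch of comparing executor types for one workload, where config and task are placeholders defined by the benchmark script:

    results = []
    for executor_type in (ExecutorType.THREAD, ExecutorType.PROCESS):
        with BenchmarkRunner(executor_type, num_workers=4) as runner:
            result, _ = runner.run(config, task, iterations=50)
        results.append(result)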

property executor_type: ExecutorType

Get the executor type.

run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T]

Run benchmark and return results with configuration.

Parameters:
  • config – Benchmark-specific configuration

  • func – Function to benchmark (takes no arguments)

  • iterations – Number of iterations per run

  • num_runs – Number of benchmark runs for confidence interval calculation (default: 5)

  • confidence_level – Confidence level for interval calculation (default: 0.95)

Returns:

Tuple of (BenchmarkResult, last output from function)
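
For example, a caller might report the mean and interval like this (runner, config, and task are placeholders as in the sketch above):

    result, _ = runner.run(config, task, iterations=100, num_runs=5)
    print(
        f"{result.executor_type}: {result.qps:.1f} QPS "
        f"(95% CI {result.ci_lower:.1f}-{result.ci_upper:.1f}), "
        f"CPU {result.cpu_percent:.1f}%"
    )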

class BenchmarkResult

Generic benchmark result containing configuration and performance metrics.

This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.

ci_lower: float

Lower bound of 95% confidence interval for QPS

ci_upper: float

Upper bound of 95% confidence interval for QPS

config: ConfigT

Benchmark-specific configuration (e.g., data format, file size, etc.)

cpu_percent: float

Average CPU utilization percentage during benchmark execution.

date: str

When benchmark was run. ISO 8601 format.

executor_type: str

Type of executor used (thread, process, or interpreter)

free_threaded: bool = False

Whether Python is running with free-threaded ABI.

python_version: str = '3.12.12'

Python version used for the benchmark

qps: float

Queries per second (mean)

class ExecutorType

Supported executor types for concurrent execution.

INTERPRETER = 'interpreter'

Use InterpreterPoolExecutor.

Requires Python 3.14+.

PROCESS = 'process'

Use ProcessPoolExecutor.

THREAD = 'thread'

Use ThreadPoolExecutor.
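
Because each member carries a plain string value, the enum maps directly onto a command-line option. The --executor-type flag below is a hypothetical example, not something this module defines:

    import argparse

    from benchmark_utils import ExecutorType  # assumed import path for this module

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--executor-type",
        type=ExecutorType,  # argparse converts "thread"/"process"/"interpreter"
        choices=list(ExecutorType),
        default=ExecutorType.THREAD,
    )
    args = parser.parse_args(["--executor-type", "process"])
    assert args.executor_type is ExecutorType.PROCESS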