Benchmark utils

Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

  • Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)

  • Warmup phase to exclude executor initialization overhead

  • Statistical analysis with confidence intervals

  • CSV export functionality

  • Python version and free-threaded ABI detection

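Example

A typical flow constructs a runner, benchmarks a no-argument callable, and saves the results. A minimal sketch, assuming the module is importable as benchmark_utils; the Config dataclass and the decode_one workload are hypothetical stand-ins for a real benchmark:

from dataclasses import dataclass

from benchmark_utils import BenchmarkRunner, ExecutorType, save_results_to_csv


@dataclass
class Config:
    # Hypothetical benchmark-specific configuration.
    num_files: int
    compressed: bool


def decode_one() -> int:
    # Hypothetical workload; substitute the operation to benchmark.
    return sum(range(10_000))


with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
    result, _ = runner.run(
        Config(num_files=1, compressed=False), decode_one, iterations=100
    )

print(f"{result.qps:.1f} QPS [{result.ci_lower:.1f}, {result.ci_upper:.1f}]")
save_results_to_csv([result], "data/example.csv")
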
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from sys import version_info
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


def _is_free_threaded() -> bool:
    """Check if Python is running with free-threaded ABI."""
    try:
        return not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        return False


_PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
_FREE_THREADED = _is_free_threaded()


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""

    python_version: str = field(default=_PYTHON_VERSION)
    """Python version used for the benchmark"""

    free_threaded: bool = field(default=_FREE_THREADED)
    """Whether Python is running with free-threaded ABI."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        ValueError: If the executor type is not supported
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T:
    """Warm up the executor by running the function multiple times.

    Args:
        executor: The executor to warm up
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    assert last_output is not None
    return last_output


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shut down the executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            # cpu_percent() reports utilization since the previous call, so
            # this first call only resets the baseline for the run.
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            # Utilization accumulated over the run, normalized per iteration.
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        assert last_output is not None
        return qps_samples, cpu_samples, last_output

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    version_suffix = (
        f"_{'.'.join(_PYTHON_VERSION.split('.')[:2])}{'t' if _FREE_THREADED else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")


def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # convert bool to int for slight readability improvement of raw CSV file
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
            "cpu_percent": result.cpu_percent,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line.
        # Note: the marker is split so that linters do not treat this file as generated.
        csvfile.write("# @")
        csvfile.write("generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")


def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown field: not a declared result field,
                    # so treat it as part of the config
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower() in TRUES,
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results

API Reference

Functions

get_default_result_path(path: str, ext: str = '.csv') → str

Get the default result path with Python version appended.
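
The mapping is deterministic: the result lands in a data/ directory next to the (resolved) script path, with the interpreter's major.minor version appended, plus a t suffix on free-threaded builds. An illustrative input and output, assuming CPython 3.13 and a hypothetical script path:

get_default_result_path("/repo/benchmarks/benchmark_wav.py")
# -> "/repo/benchmarks/data/benchmark_wav_3.13.csv"   (regular build)
# -> "/repo/benchmarks/data/benchmark_wav_3.13t.csv"  (free-threaded build)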

load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]]

Load benchmark results from a CSV file.

Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.

Parameters:
  • input_file – Input CSV file path

  • config_type – The dataclass type to use for the config field

Returns:

List of BenchmarkResult instances with parsed config objects

Raises:
  • FileNotFoundError – If input_file does not exist

  • ValueError – If CSV format is invalid or config_type is not a dataclass

save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None

Save benchmark results to a CSV file.

Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.

Parameters:
  • results – List of BenchmarkResult instances

  • output_file – Output file path for the CSV file
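
Because the config is flattened into the same rows, reading the file back requires the matching dataclass. A minimal round-trip sketch with a hypothetical Config (field types other than int, float, and bool are kept as strings on load):

from dataclasses import dataclass

from benchmark_utils import BenchmarkResult, load_results_from_csv, save_results_to_csv


@dataclass
class Config:
    num_files: int    # restored via int(...)
    compressed: bool  # written as 0/1, restored via a truthy-string check


result = BenchmarkResult(
    config=Config(num_files=8, compressed=True),
    executor_type="thread",
    qps=1234.5,
    ci_lower=1200.0,
    ci_upper=1270.0,
    date="2024-01-01T00:00:00+00:00",
    cpu_percent=350.0,
)
save_results_to_csv([result], "data/example.csv")
loaded = load_results_from_csv("data/example.csv", Config)
assert loaded[0].config == result.config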

Classes

class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)

Runner for executing benchmarks with configurable executors.

This class provides a standardized way to run benchmarks with:

  • Warmup phase to exclude executor initialization overhead

  • Multiple runs for statistical confidence intervals

  • Support for different executor types

The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.

Parameters:
  • executor_type – Type of executor to use ("thread", "process", or "interpreter")

  • num_workers – Number of concurrent workers

  • warmup_iterations – Number of warmup iterations (default: 2 * num_workers)

property executor_type: ExecutorType

Get the executor type.

run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T]

Run benchmark and return results with configuration.

Parameters:
  • config – Benchmark-specific configuration

  • func – Function to benchmark (takes no arguments)

  • iterations – Number of iterations per run

  • num_runs – Number of benchmark runs for confidence interval calculation (default: 5)

  • confidence_level – Confidence level for interval calculation (default: 0.95)

Returns:

Tuple of (BenchmarkResult, last output from function)
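
The returned qps is the mean of num_runs per-run throughput samples, and the confidence interval is a Student-t interval over those same samples, matching the computation inside run(). With made-up sample values:

import numpy as np
import scipy.stats

qps_samples = [980.0, 1010.0, 995.0, 1005.0, 990.0]  # one sample per run
mean = np.mean(qps_samples)
sem = np.std(qps_samples, ddof=1) / np.sqrt(len(qps_samples))
# 95% two-sided Student-t interval with df = num_runs - 1
ci_lower, ci_upper = scipy.stats.t.interval(0.95, len(qps_samples) - 1, loc=mean, scale=sem)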

class BenchmarkResult

Generic benchmark result containing configuration and performance metrics.

This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.

ci_lower: float

Lower bound of 95% confidence interval for QPS

ci_upper: float

Upper bound of 95% confidence interval for QPS

config: ConfigT

Benchmark-specific configuration (e.g., data format, file size, etc.)

cpu_percent: float

Average CPU utilization percentage during benchmark execution.

date: str

When benchmark was run. ISO 8601 format.

executor_type: str

Type of executor used (thread, process, or interpreter)

free_threaded: bool = False

Whether Python is running with free-threaded ABI.

python_version: str = '3.12.13'

Python version used for the benchmark

qps: float

Queries per second (mean)

class ExecutorType

Supported executor types for concurrent execution.

INTERPRETER = 'interpreter'

Use InterpreterPoolExecutor.

Requires Python 3.14+.

PROCESS = 'process'

Use ProcessPoolExecutor.

THREAD = 'thread'

Use ThreadPoolExecutor.
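
Because members carry CLI-friendly string values, ExecutorType("process") resolves a flag value directly to a member. A small, illustrative argparse wiring (the flag name is hypothetical):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--executor-type",
    type=ExecutorType,  # "thread" / "process" / "interpreter" -> enum member
    choices=list(ExecutorType),
    default=ExecutorType.THREAD,
)
args = parser.parse_args(["--executor-type", "process"])
assert args.executor_type is ExecutorType.PROCESS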