Benchmark utils
Common utilities for benchmark scripts.
This module provides a standardized framework for running benchmarks with:
- Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from sys import version_info
from types import TracebackType
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


def _is_free_threaded() -> bool:
    """Check if Python is running with free-threaded ABI."""
    try:
        return not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        return False


_PYTHON_VERSION: str = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
_FREE_THREADED: bool = _is_free_threaded()


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""

    python_version: str = field(default=_PYTHON_VERSION)
    """Python version used for the benchmark"""

    free_threaded: bool = field(default=_FREE_THREADED)
    """Whether Python is running with free-threaded ABI."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
        ValueError: If the executor is neither thread-based nor process-based
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T | None:
    """Warm up the executor by running the function multiple times.

    Args:
        executor: The executor to warm up
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last completed warmup iteration. This is ``None``
        when ``func`` returns ``None`` (as ``time.sleep`` does) or when
        ``num_iterations`` is zero.
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    # No assertion on the output: the warmup callable may legitimately return None.
    return last_output


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        assert last_output is not None
        return qps_samples, cpu_samples, last_output

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            # pyrefly: ignore [bad-argument-type]
            ci_lower=float(confidence_interval[0]),
            # pyrefly: ignore [bad-argument-type]
            ci_upper=float(confidence_interval[1]),
            date=date,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    version_suffix = (
        f"_{'.'.join(_PYTHON_VERSION.split('.')[:2])}{'t' if _FREE_THREADED else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")


def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file

    Raises:
        ValueError: If ``results`` is empty
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # Convert bool to int to make the raw CSV file slightly easier to read
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
            "cpu_percent": result.cpu_percent,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    # Use the resolved path so that a bare file name (empty dirname) also works
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line
        # Note: The marker is split in two so that linters do not treat this
        # source file as a generated file
        csvfile.write("# @")
        csvfile.write("generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")


def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        # Skip comment lines such as the generated marker
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown column: keep it with the config values.
                    # It is ignored below, because only fields declared on
                    # the config dataclass are converted.
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    # pyrefly: ignore [unsupported-operation]
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    # pyrefly: ignore [unsupported-operation]
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower() in TRUES,
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results
API Reference
Functions
- get_default_result_path(path: str, ext: str = '.csv') -> str
Get the default result path with Python version appended.
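The naming scheme can be illustrated with a small standalone sketch. The helper name `sketch_result_path` and the example script path are hypothetical, and the version and free-threaded flag are passed in explicitly here, whereas the module reads them from the running interpreter and also resolves the path with `os.path.realpath`.

```python
import os


def sketch_result_path(
    script_path: str, python_version: str, free_threaded: bool, ext: str = ".csv"
) -> str:
    """Hypothetical stand-in that mirrors the naming logic of get_default_result_path."""
    base, _ = os.path.splitext(script_path)
    # Results go into a "data" directory next to the script
    dirname = os.path.join(os.path.dirname(base), "data")
    # Keep only major.minor and append "t" for free-threaded builds
    major_minor = ".".join(python_version.split(".")[:2])
    suffix = f"_{major_minor}{'t' if free_threaded else ''}"
    return os.path.join(dirname, f"{os.path.basename(base)}{suffix}{ext}")


path = sketch_result_path(
    os.path.join("examples", "benchmark_wav.py"), "3.14.0", free_threaded=True
)
print(path)
```

With these inputs the file name is `benchmark_wav_3.14t.csv`, placed in a `data` directory beside the script.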
- load_results_from_csv(input_file: str, config_type: type[ConfigT]) -> list[BenchmarkResult[ConfigT]]
Load benchmark results from a CSV file.
Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.
- Parameters:
input_file – Input CSV file path
config_type – The dataclass type to use for the config field
- Returns:
List of BenchmarkResult instances with parsed config objects
- Raises:
FileNotFoundError – If input_file does not exist
ValueError – If CSV format is invalid or config_type is not a dataclass
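The parsing approach (skip comment lines, then convert each string cell according to the dataclass field type) can be sketched on an in-memory CSV. `DemoConfig`, `parse_rows`, and the sample data are hypothetical, and the sketch handles only `int` and `bool` fields rather than the full set of columns the module expects.

```python
import csv
from dataclasses import dataclass, fields


@dataclass
class DemoConfig:  # hypothetical config type, for illustration only
    num_workers: int
    compressed: bool


CSV_TEXT = """# leading comment line, skipped by the parser
date,num_workers,compressed,executor_type,qps
2025-01-01T00:00:00+00:00,4,1,thread,123.5
"""

TRUES = ("true", "1", "yes")


def parse_rows(text: str) -> list[tuple[DemoConfig, dict[str, str]]]:
    # Drop comment lines before handing the rest to DictReader
    data_lines = [ln for ln in text.splitlines() if not ln.strip().startswith("#")]
    parsed = []
    for row in csv.DictReader(data_lines):
        kwargs = {}
        for f in fields(DemoConfig):
            raw = row[f.name]
            if f.type is bool:
                kwargs[f.name] = raw.lower() in TRUES
            elif f.type is int:
                kwargs[f.name] = int(raw)
            else:
                kwargs[f.name] = raw
        # Everything that is not a config field is treated as a metric column
        metrics = {k: v for k, v in row.items() if k not in kwargs}
        parsed.append((DemoConfig(**kwargs), metrics))
    return parsed


rows = parse_rows(CSV_TEXT)
print(rows[0][0], rows[0][1]["qps"])
```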
- save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) -> None
Save benchmark results to a CSV file.
Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.
- Parameters:
results – List of BenchmarkResult instances
output_file – Output file path for the CSV file
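A minimal sketch of the flattening step, assuming a hypothetical `DemoConfig` dataclass and a reduced set of metric columns. It writes to an in-memory buffer instead of a file so that it has no side effects.

```python
import csv
import io
from dataclasses import asdict, dataclass


@dataclass
class DemoConfig:  # hypothetical config type, for illustration only
    num_workers: int
    compressed: bool


def flatten(config: DemoConfig, metrics: dict) -> dict:
    # Booleans are written as 0/1, matching the convention in the module
    config_dict = {
        k: (int(v) if isinstance(v, bool) else v) for k, v in asdict(config).items()
    }
    return {**config_dict, **metrics}


rows = [
    flatten(DemoConfig(4, True), {"executor_type": "thread", "qps": 123.5}),
    flatten(DemoConfig(8, False), {"executor_type": "process", "qps": 98.0}),
]

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)
print(buffer.getvalue())
```

Taking the column order from the first row keeps the header stable as long as every result shares the same config type.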
Classes
- class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)
Runner for executing benchmarks with configurable executors.
This class provides a standardized way to run benchmarks with:
- Warmup phase to exclude executor initialization overhead
- Multiple runs for statistical confidence intervals
- Support for different executor types
The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.
- Parameters:
executor_type – Type of executor to use ("thread", "process", or "interpreter")
num_workers – Number of concurrent workers
warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
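The warmup idea can be sketched with a plain thread pool: submit more tasks than there are workers so that every worker is started before timing begins. The worker count and sleep durations below are illustrative assumptions, and `_threads` is a private CPython attribute (the module relies on it as well), so this is not a stable API.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

NUM_WORKERS = 4  # illustrative value

with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    # Warmup: 2 * NUM_WORKERS short tasks force the pool to start all workers
    warmup = [
        executor.submit(partial(time.sleep, 0.05)) for _ in range(2 * NUM_WORKERS)
    ]
    for future in as_completed(warmup):
        future.result()

    # Private attribute, used here only to confirm the pool is fully populated
    spawned = len(executor._threads)

    # Timed section: worker startup cost is no longer part of the measurement
    t0 = time.perf_counter()
    timed = [executor.submit(partial(time.sleep, 0.01)) for _ in range(NUM_WORKERS)]
    for future in as_completed(timed):
        future.result()
    elapsed = time.perf_counter() - t0

print(f"workers spawned: {spawned}, timed batch: {elapsed:.3f}s")
```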
- property executor_type: ExecutorType
Get the executor type.
- run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) -> tuple[BenchmarkResult[ConfigT], T]
Run benchmark and return results with configuration.
- Parameters:
config – Benchmark-specific configuration
func – Function to benchmark (takes no arguments)
iterations – Number of iterations per run
num_runs – Number of benchmark runs for confidence interval calculation (default: 5)
confidence_level – Confidence level for interval calculation (default: 0.95)
- Returns:
Tuple of (BenchmarkResult, last output from function)
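The confidence interval is a Student's t interval around the mean QPS, with the standard error computed from the sample standard deviation. The following standard-library sketch reproduces the arithmetic for five made-up samples. The hard-coded critical value 2.776 (two-sided 95%, 4 degrees of freedom) is a table value used here as an assumption; the module obtains the interval from scipy.stats.t.interval instead.

```python
import math
import statistics

# Hypothetical QPS samples from num_runs = 5 runs
qps_samples = [100.0, 102.0, 98.0, 101.0, 99.0]

# Two-sided 95% critical value of Student's t for 4 degrees of freedom
# (standard table value, hard-coded for this sketch)
T_CRIT_95_DF4 = 2.776

n = len(qps_samples)
mean = statistics.fmean(qps_samples)
std = statistics.stdev(qps_samples)  # sample standard deviation (ddof=1)
sem = std / math.sqrt(n)  # standard error of the mean

ci_lower = mean - T_CRIT_95_DF4 * sem
ci_upper = mean + T_CRIT_95_DF4 * sem
print(f"qps={mean:.1f}, 95% CI=({ci_lower:.2f}, {ci_upper:.2f})")
```

With only a handful of runs the t distribution gives a noticeably wider interval than a normal approximation would, which is why the degrees of freedom are set to `num_runs - 1`.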
- class BenchmarkResult
Generic benchmark result containing configuration and performance metrics.
This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.
- config: ConfigT
Benchmark-specific configuration (e.g., data format, file size, etc.)
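How a result type can be parameterized by a per-benchmark config dataclass is sketched below. `WavConfig` and the reduced `Result` class are hypothetical stand-ins, not the module's definitions.

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

ConfigT = TypeVar("ConfigT")


@dataclass
class WavConfig:  # hypothetical benchmark-specific configuration
    sample_rate: int
    num_channels: int


@dataclass
class Result(Generic[ConfigT]):  # simplified stand-in for BenchmarkResult
    config: ConfigT
    qps: float


# The annotation lets a type checker know that result.config is a WavConfig
result: Result[WavConfig] = Result(config=WavConfig(16000, 2), qps=250.0)
print(result.config.sample_rate, result.qps)
```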
- class ExecutorType
Supported executor types for concurrent execution.
- INTERPRETER = 'interpreter'
Use InterpreterPoolExecutor.
Requires Python 3.14+.
- PROCESS = 'process'
Use ProcessPoolExecutor.
- THREAD = 'thread'
Use ThreadPoolExecutor.
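A sketch of how an enum value can select the executor class. `Kind` and `make_executor` are hypothetical names, and the explicit version check is an addition for this example so that it fails with a clear message on interpreters older than 3.14.

```python
import sys
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from enum import Enum


class Kind(Enum):  # hypothetical stand-in for ExecutorType
    THREAD = "thread"
    PROCESS = "process"
    INTERPRETER = "interpreter"


def make_executor(kind: Kind, max_workers: int) -> Executor:
    if kind is Kind.THREAD:
        return ThreadPoolExecutor(max_workers=max_workers)
    if kind is Kind.PROCESS:
        return ProcessPoolExecutor(max_workers=max_workers)
    if sys.version_info < (3, 14):
        raise RuntimeError("InterpreterPoolExecutor requires Python 3.14+")
    # Imported lazily because the class does not exist on older versions
    from concurrent.futures import InterpreterPoolExecutor

    return InterpreterPoolExecutor(max_workers=max_workers)


# Enum lookup by value, e.g. from a command-line argument
kind = Kind("thread")
with make_executor(kind, max_workers=2) as executor:
    total = sum(executor.map(abs, [-1, 2, -3]))
print(kind, total)
```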