Benchmark utils¶
Common utilities for benchmark scripts.
This module provides a standardized framework for running benchmarks with:
- Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection
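Putting these pieces together, a typical benchmark script looks roughly like the sketch below. The import path benchmark_utils, the Config dataclass, and the work() callable are invented for illustration; only BenchmarkRunner, ExecutorType, get_default_result_path, and save_results_to_csv come from this module:

    from dataclasses import dataclass
    from functools import partial

    from benchmark_utils import (  # assumed import path for this module
        BenchmarkRunner,
        ExecutorType,
        get_default_result_path,
        save_results_to_csv,
    )


    @dataclass
    class Config:
        """Hypothetical benchmark-specific configuration."""

        payload_size: int


    def work(payload_size: int) -> int:
        """Hypothetical CPU-bound workload."""
        return sum(i * i for i in range(payload_size))


    def main() -> None:
        config = Config(payload_size=100_000)
        with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
            # The benchmarked function takes no arguments, so bind config values with partial.
            result, _ = runner.run(config, partial(work, config.payload_size), iterations=100)
        print(f"{result.qps:.1f} QPS [{result.ci_lower:.1f}, {result.ci_upper:.1f}]")
        save_results_to_csv([result], get_default_result_path(__file__))


    if __name__ == "__main__":
        main()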
Source¶
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    python_version: str
    """Python version used for the benchmark"""

    free_threaded: bool
    """Whether Python is running with free-threaded ABI."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _get_python_info() -> tuple[str, bool]:
    """Get Python version and free-threaded ABI information.

    Returns:
        Tuple of (``python_version``, ``is_free_threaded``)
    """
    python_version = (
        f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    )
    try:
        is_free_threaded = not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        is_free_threaded = False
    return python_version, is_free_threaded


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T:
    """Warmup the executor by running the function multiple times.

    Args:
        executor: The executor to warmup
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    return last_output  # pyre-ignore[7]


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        return qps_samples, cpu_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        python_version, free_threaded = _get_python_info()
        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            python_version=python_version,
            free_threaded=free_threaded,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    python_version, free_threaded = _get_python_info()
    version_suffix = (
        f"_{'.'.join(python_version.split('.')[:2])}{'t' if free_threaded else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")


def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # convert bool to int for slight readability improvement of raw CSV file
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
            "cpu_percent": result.cpu_percent,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line
        # Note: the marker is split so that linters do not treat this file itself as generated
        csvfile.write("# @" "generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")


def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown field - could be from config or result
                    # Try to infer based on whether it matches a config field name
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower()
                in ("true", "1", "yes"),
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results
API Reference¶
Functions
- get_default_result_path(path: str, ext: str = '.csv') → str¶
Get the default result path with Python version appended.
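As an illustration (the script path is hypothetical), for a benchmark script at /home/user/benchmarks/benchmark_wav.py running on a regular CPython 3.13 build, the function would return a path under a sibling data/ directory:

    >>> get_default_result_path("/home/user/benchmarks/benchmark_wav.py")
    '/home/user/benchmarks/data/benchmark_wav_3.13.csv'
    >>> # On a free-threaded build the version suffix gains a trailing "t", e.g. "_3.13t.csv".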
- load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]]¶
Load benchmark results from a CSV file.
Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.
- Parameters:
input_file – Input CSV file path
config_type – The dataclass type to use for the config field
- Returns:
List of BenchmarkResult instances with parsed config objects
- Raises:
FileNotFoundError – If input_file does not exist
ValueError – If CSV format is invalid or config_type is not a dataclass
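A minimal loading sketch, assuming the CSV was produced by a script whose config dataclass had the fields below (WavConfig, its fields, and the file path are invented for illustration):

    from dataclasses import dataclass

    @dataclass
    class WavConfig:
        sample_rate: int  # stored as an integer column
        compressed: bool  # stored as 0/1 and parsed back to bool

    results = load_results_from_csv("data/benchmark_wav_3.13.csv", WavConfig)
    for r in results:
        print(r.python_version, r.executor_type, f"{r.qps:.1f} qps")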
- save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None¶
Save benchmark results to a CSV file.
Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.
- Parameters:
results – List of BenchmarkResult instances
output_file – Output file path for the CSV file
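A sketch of the resulting layout, assuming the same hypothetical WavConfig fields as above; columns are ordered as common metadata, then the config fields, then the metrics:

    save_results_to_csv(results, "data/benchmark_wav_3.13.csv")
    # The file begins with a "# @generated" marker line, followed by a header such as:
    # date,python_version,free_threaded,sample_rate,compressed,executor_type,qps,ci_lower,ci_upper,cpu_percent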
Classes
- class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)¶
Runner for executing benchmarks with configurable executors.
This class provides a standardized way to run benchmarks with:
- Warmup phase to exclude executor initialization overhead
- Multiple runs for statistical confidence intervals
- Support for different executor types
The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.
- Parameters:
executor_type – Type of executor to use ("thread", "process", or "interpreter")
num_workers – Number of concurrent workers
warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
- property executor_type: ExecutorType¶
Get the executor type.
- run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T]¶
Run benchmark and return results with configuration.
- Parameters:
config – Benchmark-specific configuration
func – Function to benchmark (takes no arguments)
iterations – Number of iterations per run
num_runs – Number of benchmark runs for confidence interval calculation (default: 5)
confidence_level – Confidence level for interval calculation (default: 0.95)
- Returns:
Tuple of (BenchmarkResult, last output from function)
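The reported interval is a Student's t confidence interval over the per-run QPS samples. A minimal sketch of the same computation (the sample values here are hypothetical):

    import numpy as np
    import scipy.stats

    qps_samples = [980.0, 1010.0, 995.0, 1002.0, 988.0]  # one QPS value per run
    mean = np.mean(qps_samples)
    sem = np.std(qps_samples, ddof=1) / np.sqrt(len(qps_samples))
    ci_lower, ci_upper = scipy.stats.t.interval(
        0.95, len(qps_samples) - 1, loc=mean, scale=sem
    )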
- class BenchmarkResult¶
Generic benchmark result containing configuration and performance metrics.
This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.
- config: ConfigT¶
Benchmark-specific configuration (e.g., data format, file size, etc.)
- class ExecutorType¶
Supported executor types for concurrent execution.
- INTERPRETER = 'interpreter'¶
Use InterpreterPoolExecutor. Requires Python 3.14+.
- PROCESS = 'process'¶
Use ProcessPoolExecutor.
- THREAD = 'thread'¶
Use ThreadPoolExecutor.
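Benchmark scripts typically select the executor on the command line; a hypothetical argparse wiring (the string values map directly onto the enum members):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--executor-type",
        type=ExecutorType,           # accepts "thread", "process", or "interpreter"
        choices=list(ExecutorType),
        default=ExecutorType.THREAD,
    )
    args = parser.parse_args(["--executor-type", "process"])
    runner = BenchmarkRunner(args.executor_type, num_workers=8)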