Benchmark utils¶
Common utilities for benchmark scripts.
This module provides a standardized framework for running benchmarks with:
- Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection
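As a quick orientation, here is a minimal sketch of how these pieces fit together. The Config dataclass and the workload function are made-up placeholders, and the import path is an assumption about where this module lives in your checkout:

from dataclasses import dataclass

from benchmark_utils import (  # hypothetical module path; adjust to your layout
    BenchmarkRunner,
    ExecutorType,
    get_default_result_path,
    save_results_to_csv,
)


@dataclass
class Config:  # placeholder per-benchmark configuration
    num_workers: int


def workload() -> int:  # placeholder function to benchmark (takes no arguments)
    return sum(range(100_000))


if __name__ == "__main__":
    with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
        result, _ = runner.run(Config(num_workers=4), workload, iterations=200)
    save_results_to_csv([result], get_default_result_path(__file__))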
Source¶
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    python_version: str
    """Python version used for the benchmark"""

    free_threaded: bool
    """Whether Python is running with free-threaded ABI."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """

def _get_python_info() -> tuple[str, bool]:
    """Get Python version and free-threaded ABI information.

    Returns:
        Tuple of (``python_version``, ``is_free_threaded``)
    """
    python_version = (
        f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    )
    try:
        is_free_threaded = not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        is_free_threaded = False
    return python_version, is_free_threaded


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T:
    """Warmup the executor by running the function multiple times.

    Args:
        executor: The executor to warmup
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    return last_output  # pyre-ignore[7]

class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        return qps_samples, cpu_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        python_version, free_threaded = _get_python_info()
        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            python_version=python_version,
            free_threaded=free_threaded,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output

def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    python_version, free_threaded = _get_python_info()
    version_suffix = (
        f"_{'.'.join(python_version.split('.')[:2])}{'t' if free_threaded else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")

def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # Convert bool to int to make the raw CSV slightly easier to read
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
            "cpu_percent": result.cpu_percent,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line
        # Note: The marker is split so that linters do not treat this file as generated
        csvfile.write("# @")
        csvfile.write("generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")

def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown field: default to treating it as part of the config
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower() in TRUES,
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results
API Reference¶
Functions
- get_default_result_path(path: str, ext: str = '.csv') → str¶
Get the default result path with Python version appended.
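For illustration, assuming the function is called on CPython 3.13 with a script named benchmark_wav.py (both the path and the version suffix below are examples, not fixed values):

get_default_result_path("/repo/benchmarks/benchmark_wav.py")
# -> "/repo/benchmarks/data/benchmark_wav_3.13.csv"
# On a free-threaded build the minor-version suffix gains a trailing "t", e.g. "_3.14t".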
- load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]]¶
Load benchmark results from a CSV file.
Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.
- Parameters:
input_file – Input CSV file path
config_type – The dataclass type to use for the config field
- Returns:
List of BenchmarkResult instances with parsed config objects
- Raises:
FileNotFoundError – If input_file does not exist
ValueError – If CSV format is invalid or config_type is not a dataclass
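A minimal loading sketch, assuming a hypothetical Config dataclass whose fields match the configuration columns that save_results_to_csv() wrote (the file path is just an example):

from dataclasses import dataclass


@dataclass
class Config:  # hypothetical; must mirror the config columns in the CSV
    fmt: str
    num_workers: int


results = load_results_from_csv("data/benchmark_wav_3.13.csv", Config)
for r in results:
    print(r.config.fmt, r.executor_type, f"{r.qps:.1f} qps")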
- save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None¶
Save benchmark results to a CSV file.
Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.
- Parameters:
results – List of BenchmarkResult instances
output_file – Output file path for the CSV file
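A usage sketch, assuming results already holds BenchmarkResult objects produced by BenchmarkRunner.run(). The written file starts with the split "# @generated" marker line, followed by a header with the metadata columns, then the config fields, then the metrics:

save_results_to_csv(results, get_default_result_path(__file__))
# Header looks roughly like:
# date,python_version,free_threaded,(config fields...),executor_type,qps,ci_lower,ci_upper,cpu_percent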
Classes
- class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)¶
Runner for executing benchmarks with configurable executors.
This class provides a standardized way to run benchmarks with:
- Warmup phase to exclude executor initialization overhead
- Multiple runs for statistical confidence intervals
- Support for different executor types
The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.
- Parameters:
executor_type – Type of executor to use ("thread", "process", or "interpreter")
num_workers – Number of concurrent workers
warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
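Constructing the runner creates the pool and drains the warmup tasks (one-second sleeps) before returning, so it takes roughly two seconds with the default settings; using it as a context manager shuts the pool down afterwards. A sketch with a process pool:

with BenchmarkRunner(ExecutorType.PROCESS, num_workers=8) as runner:
    ...  # call runner.run(...) here; the executor is shut down on exit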
- property executor_type: ExecutorType¶
Get the executor type.
- run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T]¶
Run benchmark and return results with configuration.
- Parameters:
config – Benchmark-specific configuration
func – Function to benchmark (takes no arguments)
iterations – Number of iterations per run
num_runs – Number of benchmark runs for confidence interval calculation (default: 5)
confidence_level – Confidence level for interval calculation (default: 0.95)
- Returns:
Tuple of (BenchmarkResult, last output from function)
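A measurement sketch; Config and decode_one_file are placeholders for whatever the benchmark script defines, and functools.partial turns the workload into the required zero-argument callable:

from functools import partial

result, last_output = runner.run(
    Config(fmt="wav", num_workers=8),        # hypothetical config dataclass
    partial(decode_one_file, "sample.wav"),  # placeholder workload
    iterations=200,
    num_runs=5,
)
print(f"{result.qps:.1f} qps (95% CI: {result.ci_lower:.1f}-{result.ci_upper:.1f})")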
- class BenchmarkResult¶
Generic benchmark result containing configuration and performance metrics.
This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.
- config: ConfigT¶
Benchmark-specific configuration (e.g., data format, file size, etc.)
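Each benchmark script supplies its own config dataclass to fill this slot; a hypothetical pairing:

from dataclasses import dataclass


@dataclass
class WavConfig:  # hypothetical per-benchmark configuration
    sample_rate: int
    num_channels: int


def report(result: BenchmarkResult[WavConfig]) -> None:
    # config fields and metrics travel together in one object
    print(result.config.sample_rate, result.qps, result.cpu_percent)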
- class ExecutorType¶
Supported executor types for concurrent execution.
- INTERPRETER = 'interpreter'¶
Use InterpreterPoolExecutor.
Requires Python 3.14+.
- PROCESS = 'process'¶
Use ProcessPoolExecutor.
- THREAD = 'thread'¶
Use ThreadPoolExecutor.
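Because the enum values are plain strings, a command-line flag can be mapped straight to a member; the argparse wiring below is an illustration, not part of this module:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--executor", choices=[e.value for e in ExecutorType], default="thread")
parser.add_argument("--num-workers", type=int, default=4)
args = parser.parse_args()

# ExecutorType("thread") resolves to ExecutorType.THREAD, and so on.
runner = BenchmarkRunner(ExecutorType(args.executor), num_workers=args.num_workers)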