Benchmark utils¶
Common utilities for benchmark scripts.
This module provides a standardized framework for running benchmarks with:
- Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection
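The typical flow is: define a small config dataclass, construct a BenchmarkRunner, call run() with a zero-argument callable, then persist the results. A minimal sketch, assuming the module is importable as benchmark_utils (the import path, MyConfig, and work are illustrative names, not part of this module):

from dataclasses import dataclass
from functools import partial

# Hypothetical import path; adjust to wherever this module lives in your tree.
from benchmark_utils import (
    BenchmarkRunner,
    ExecutorType,
    get_default_result_path,
    save_results_to_csv,
)


@dataclass
class MyConfig:
    # Any dataclass works as the config type; its fields become CSV columns.
    payload_size: int


def work(size: int) -> int:
    # The benchmarked function; bound to a zero-argument callable below.
    return sum(range(size))


# The constructor creates and warms up the executor, so executor start-up
# cost is excluded from the measurements.
with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
    result, last_output = runner.run(
        config=MyConfig(payload_size=10_000),
        func=partial(work, 10_000),  # run() expects a callable taking no arguments
        iterations=100,
        num_runs=5,
    )

save_results_to_csv([result], get_default_result_path(__file__))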
Source¶
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from sys import version_info
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


def _is_free_threaded() -> bool:
    """Check if Python is running with free-threaded ABI."""
    try:
        return not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        return False


_PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
_FREE_THREADED = _is_free_threaded()


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""

    python_version: str = field(default=_PYTHON_VERSION)
    """Python version used for the benchmark"""

    free_threaded: bool = field(default=_FREE_THREADED)
    """Whether Python is running with free-threaded ABI."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T:
    """Warmup the executor by running the function multiple times.

    Args:
        executor: The executor to warmup
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    return last_output  # pyre-ignore[7]


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        return qps_samples, cpu_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    version_suffix = (
        f"_{'.'.join(_PYTHON_VERSION.split('.')[:2])}{'t' if _FREE_THREADED else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")


def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # convert bool to int for slight readability improvement of raw CSV file
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
            "cpu_percent": result.cpu_percent,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line
        # Note: Splitting the marker so as to avoid linter consider this file as generated file
        csvfile.write("# @")
        csvfile.write("generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")


def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown field - could be from config or result
                    # Try to infer based on whether it matches a config field name
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower()
                in ("true", "1", "yes"),
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results
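The confidence interval reported by BenchmarkRunner.run() is a standard Student's t interval over the per-run QPS samples. The following standalone snippet mirrors that calculation on illustrative sample values, so the ci_lower/ci_upper columns in the CSV can be sanity-checked by hand:

import numpy as np
import scipy.stats

qps_samples = [101.0, 98.5, 102.3, 99.7, 100.9]  # illustrative: one QPS value per run (num_runs=5)
mean = np.mean(qps_samples)
sem = np.std(qps_samples, ddof=1) / np.sqrt(len(qps_samples))  # standard error of the mean
ci_lower, ci_upper = scipy.stats.t.interval(0.95, len(qps_samples) - 1, loc=mean, scale=sem)
print(f"qps={mean:.1f}, 95% CI=[{ci_lower:.1f}, {ci_upper:.1f}]")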
API Reference¶
Functions
- get_default_result_path(path: str, ext: str = '.csv') → str[source]¶
Get the default result path with Python version appended.
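Given a benchmark script path, the helper places the result file in a data/ sub-directory next to the script and appends the interpreter's major.minor version, plus a t suffix on free-threaded builds. Illustrative paths, not output captured from a real run:

from benchmark_utils import get_default_result_path  # hypothetical import path

get_default_result_path("/repo/benchmarks/benchmark_wav.py")
# -> "/repo/benchmarks/data/benchmark_wav_3.14t.csv"  on free-threaded CPython 3.14
# -> "/repo/benchmarks/data/benchmark_wav_3.12.csv"   on regular CPython 3.12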
- load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]][source]¶
Load benchmark results from a CSV file.
Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.
- Parameters:
input_file – Input CSV file path
config_type – The dataclass type to use for the config field
- Returns:
List of BenchmarkResult instances with parsed config objects
- Raises:
FileNotFoundError – If input_file does not exist
ValueError – If CSV format is invalid or config_type is not a dataclass
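A usage sketch, assuming the CSV was produced by save_results_to_csv() and that MyConfig matches the config dataclass used when the results were written (the import path, file name, and MyConfig are illustrative):

from dataclasses import dataclass

from benchmark_utils import load_results_from_csv  # hypothetical import path


@dataclass
class MyConfig:
    payload_size: int


# Config columns are coerced back to the dataclass field types (int/float/bool);
# anything else is kept as a string.
results = load_results_from_csv("data/benchmark_wav_3.14t.csv", MyConfig)
for r in results:
    print(r.config.payload_size, r.executor_type, f"{r.qps:.1f} QPS")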
- save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None[source]¶
Save benchmark results to a CSV file.
Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.
- Parameters:
results – List of BenchmarkResult instances
output_file – Output file path for the CSV file
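The resulting file layout is fixed by the implementation: a generated-file marker line, then a header row starting with date, python_version, and free_threaded, followed by the config dataclass fields and finally the performance metrics. A sketch for a hypothetical config with a single payload_size field:

save_results_to_csv([result], "data/benchmark_wav_3.14t.csv")
# First line: a generated-file marker comment.
# Header row: date,python_version,free_threaded,payload_size,executor_type,qps,ci_lower,ci_upper,cpu_percent
# Then one row per BenchmarkResult, with booleans written as 0/1.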
Classes
- class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)[source]¶
Runner for executing benchmarks with configurable executors.
This class provides a standardized way to run benchmarks with:
- Warmup phase to exclude executor initialization overhead
- Multiple runs for statistical confidence intervals
- Support for different executor types
The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.
- Parameters:
executor_type – Type of executor to use ("thread", "process", or "interpreter")
num_workers – Number of concurrent workers
warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
- property executor_type: ExecutorType[source]¶
Get the executor type.
- run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T][source]¶
Run benchmark and return results with configuration.
- Parameters:
config – Benchmark-specific configuration
func – Function to benchmark (takes no arguments)
iterations – Number of iterations per run
num_runs – Number of benchmark runs for confidence interval calculation (default: 5)
confidence_level – Confidence level for interval calculation (default: 0.95)
- Returns:
Tuple of (BenchmarkResult, last output from function)
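A sketch comparing executor types on the same workload; MyConfig and work are the illustrative names from the example near the top of this page, and the printed numbers depend entirely on your machine:

from functools import partial

from benchmark_utils import BenchmarkRunner, ExecutorType  # hypothetical import path

for executor_type in (ExecutorType.THREAD, ExecutorType.PROCESS):
    # For the process executor, the benchmarked function (and its partial
    # arguments) must be picklable, i.e. defined at module level.
    with BenchmarkRunner(executor_type, num_workers=8) as runner:
        result, _ = runner.run(
            config=MyConfig(payload_size=1_000_000),
            func=partial(work, 1_000_000),
            iterations=200,
        )
    print(
        f"{executor_type.value}: {result.qps:.1f} QPS "
        f"[{result.ci_lower:.1f}, {result.ci_upper:.1f}], "
        f"cpu_percent={result.cpu_percent:.1f}"
    )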
- class BenchmarkResult[source]¶
Generic benchmark result containing configuration and performance metrics.
This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.
- config: ConfigT¶
Benchmark-specific configuration (e.g., data format, file size, etc.)
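Besides config, every result carries the shared metric and metadata fields shown in the source above. A quick sketch of reading them off a result produced by BenchmarkRunner.run() (result and MyConfig are illustrative names):

# result: BenchmarkResult[MyConfig], e.g. as returned by BenchmarkRunner.run()
print(result.executor_type)                          # "thread", "process", or "interpreter"
print(result.qps, result.ci_lower, result.ci_upper)  # mean QPS and its 95% CI bounds
print(result.cpu_percent)                            # average CPU utilization during execution
print(result.python_version, result.free_threaded)   # interpreter metadata
print(result.date)                                   # ISO 8601 timestamp (UTC)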
- class ExecutorType[source]¶
Supported executor types for concurrent execution.
- INTERPRETER = 'interpreter'¶
Use InterpreterPoolExecutor.
Requires Python 3.14+.
- PROCESS = 'process'¶
Use ProcessPoolExecutor.
- THREAD = 'thread'¶
Use ThreadPoolExecutor.
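Because the enum values are plain strings, a command-line option can map directly onto them. One possible wiring with argparse, sketched below (the actual benchmark scripts may parse arguments differently):

import argparse

from benchmark_utils import ExecutorType  # hypothetical import path

parser = argparse.ArgumentParser()
parser.add_argument(
    "--executor-type",
    type=ExecutorType,  # looks the enum up by value, e.g. "thread" -> ExecutorType.THREAD
    choices=list(ExecutorType),
    default=ExecutorType.THREAD,
    help="thread, process, or interpreter (interpreter needs Python 3.14+)",
)
args = parser.parse_args(["--executor-type", "process"])
assert args.executor_type is ExecutorType.PROCESS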