Benchmark utils

Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection
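The typical flow is: construct a runner, benchmark a no-argument callable, then persist the results. Below is a minimal sketch, assuming the module is importable as benchmark_utils (a hypothetical name; adapt the import to wherever this module lives in your tree):

# Minimal usage sketch; `benchmark_utils` is a hypothetical import name.
from dataclasses import dataclass

from benchmark_utils import BenchmarkRunner, ExecutorType, save_results_to_csv


@dataclass
class Config:
    size: int  # benchmark-specific knobs; each script defines its own


def work() -> int:
    # The benchmarked callable takes no arguments; bind inputs with
    # functools.partial or a closure.
    return sum(range(1_000_000))


with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
    result, _ = runner.run(Config(size=1_000_000), work, iterations=100)

print(f"{result.qps:.1f} QPS, 95% CI [{result.ci_lower:.1f}, {result.ci_upper:.1f}]")
save_results_to_csv([result], "results.csv")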
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Common utilities for benchmark scripts.

This module provides a standardized framework for running benchmarks with:

- Configurable executor types (
  :py:class:`~concurrent.futures.ThreadPoolExecutor`,
  :py:class:`~concurrent.futures.ProcessPoolExecutor`,
  :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
- Warmup phase to exclude executor initialization overhead
- Statistical analysis with confidence intervals
- CSV export functionality
- Python version and free-threaded ABI detection

.. seealso::

   - :doc:`./benchmark_tarfile`
   - :doc:`./benchmark_wav`
   - :doc:`./benchmark_numpy`

"""

__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]

import csv
import os
import sys
import time
from collections.abc import Callable
from concurrent.futures import (
    as_completed,
    Executor,
    ProcessPoolExecutor,
    ThreadPoolExecutor,
)
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from functools import partial
from sys import version_info
from typing import Any, Generic, TypeVar

import numpy as np
import psutil
import scipy.stats

T = TypeVar("T")
ConfigT = TypeVar("ConfigT")


def _is_free_threaded() -> bool:
    """Check if Python is running with free-threaded ABI."""
    try:
        return not sys._is_gil_enabled()  # pyre-ignore[16]
    except AttributeError:
        return False


_PYTHON_VERSION = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
_FREE_THREADED = _is_free_threaded()


@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    Generic benchmark result containing configuration and performance metrics.

    This class holds both the benchmark-specific configuration and the
    common performance statistics. It is parameterized by the config type,
    which allows each benchmark script to define its own configuration dataclass.
    """

    config: ConfigT
    """Benchmark-specific configuration (e.g., data format, file size, etc.)"""

    executor_type: str
    """Type of executor used (thread, process, or interpreter)"""

    qps: float
    """Queries per second (mean)"""

    ci_lower: float
    """Lower bound of 95% confidence interval for QPS"""

    ci_upper: float
    """Upper bound of 95% confidence interval for QPS"""

    date: str
    """When benchmark was run. ISO 8601 format."""

    cpu_percent: float
    """Average CPU utilization percentage during benchmark execution."""

    python_version: str = field(default=_PYTHON_VERSION)
    """Python version used for the benchmark"""

    free_threaded: bool = field(default=_FREE_THREADED)
    """Whether Python is running with free-threaded ABI."""


class ExecutorType(Enum):
    """ExecutorType()

    Supported executor types for concurrent execution."""

    THREAD = "thread"
    """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """


def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Create an executor of the specified type.

    Args:
        executor_type: Type of executor to create
        max_workers: Maximum number of workers

    Returns:
        Executor instance

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    match executor_type:
        case ExecutorType.THREAD:
            return ThreadPoolExecutor(max_workers=max_workers)
        case ExecutorType.PROCESS:
            return ProcessPoolExecutor(max_workers=max_workers)
        case ExecutorType.INTERPRETER:
            from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

            return InterpreterPoolExecutor(max_workers=max_workers)
        case _:
            raise ValueError(f"Unsupported executor type: {executor_type}")


def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Verify that the executor has created the expected number of workers.

    Args:
        executor: The executor to verify
        expected_workers: Expected number of workers

    Raises:
        RuntimeError: If the number of workers doesn't match expected
    """
    match executor:
        case ThreadPoolExecutor():
            actual_workers = len(executor._threads)
        case ProcessPoolExecutor():
            actual_workers = len(executor._processes)
        case _:
            raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers != expected_workers:
        raise RuntimeError(
            f"Expected {expected_workers} workers, but executor has {actual_workers}"
        )


def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> T | None:
    """Warmup the executor by running the function multiple times.

    Args:
        executor: The executor to warmup
        func: Function to run for warmup
        num_iterations: Number of warmup iterations

    Returns:
        Output from the last warmup iteration, or ``None`` if ``func``
        returns ``None`` or ``num_iterations`` is zero.
    """
    futures = [executor.submit(func) for _ in range(num_iterations)]
    last_output: T | None = None
    for future in as_completed(futures):
        last_output = future.result()
    # No non-None assertion here: the warmup callable may legitimately return
    # None (BenchmarkRunner warms up with ``time.sleep``).
    return last_output


class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements.

    Args:
        executor_type: Type of executor to use
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
    """

    def __init__(
        self,
        executor_type: ExecutorType,
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        self._executor_type: ExecutorType = executor_type

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(executor_type, num_workers)

        _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
        _verify_workers(self._executor, num_workers)

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)
        """
        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            cpu_samples.append(cpu_percent / iterations)

        assert last_output is not None
        return qps_samples, cpu_samples, last_output

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``)
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = np.mean(qps_samples)
        qps_std = np.std(qps_samples, ddof=1)
        degrees_freedom = num_runs - 1
        confidence_interval = scipy.stats.t.interval(
            confidence_level,
            degrees_freedom,
            loc=qps_mean,
            scale=qps_std / np.sqrt(num_runs),
        )

        cpu_mean = np.mean(cpu_samples)

        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=float(qps_mean),
            ci_lower=float(confidence_interval[0]),
            ci_upper=float(confidence_interval[1]),
            date=date,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output


def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Get the default result path with Python version appended."""
    base, _ = os.path.splitext(os.path.realpath(path))
    dirname = os.path.join(os.path.dirname(base), "data")
    filename = os.path.basename(base)
    version_suffix = (
        f"_{'.'.join(_PYTHON_VERSION.split('.')[:2])}{'t' if _FREE_THREADED else ''}"
    )
    return os.path.join(dirname, f"{filename}{version_suffix}{ext}")


def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        config_dict = asdict(result.config)
        # convert bool to int for slight readability improvement of raw CSV file
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
        }
        flattened = {
            "date": result.date,
            "python_version": result.python_version,
            "free_threaded": int(result.free_threaded),
            **config_dict,
            "executor_type": result.executor_type,
            "qps": result.qps,
            "ci_lower": result.ci_lower,
            "ci_upper": result.ci_upper,
            "cpu_percent": result.cpu_percent,
        }
        flattened_results.append(flattened)

    # Get all field names from the first result
    fieldnames = list(flattened_results[0].keys())

    output_path = os.path.realpath(output_file)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line.
        # Note: the marker is split so that linters do not treat this file
        # itself as a generated file.
        csvfile.write("# @")
        csvfile.write("generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for result_dict in flattened_results:
            writer.writerow(result_dict)

    print(f"Results saved to {output_file}")


def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Get the field names from the config dataclass
    config_fields = set(fields.keys())

    # Performance metric fields that are part of BenchmarkResult
    result_fields = {
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
        "cpu_percent",
    }

    results: list[BenchmarkResult[ConfigT]] = []

    TRUES = ("true", "1", "yes")

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))

        for row in reader:
            # Split row into config fields and result fields
            config_dict = {}
            result_dict = {}

            for key, value in row.items():
                if key in config_fields:
                    config_dict[key] = value
                elif key in result_fields:
                    result_dict[key] = value
                else:
                    # Unknown field - could be from config or result
                    # Try to infer based on whether it matches a config field name
                    config_dict[key] = value

            # Convert string values to appropriate types for config
            typed_config_dict = {}
            for field_name, field_info in fields.items():
                if field_name not in config_dict:
                    continue

                value = config_dict[field_name]
                field_type = field_info.type

                # Handle type conversions
                if field_type is int or field_type == "int":
                    typed_config_dict[field_name] = int(value)
                elif field_type is float or field_type == "float":
                    typed_config_dict[field_name] = float(value)
                elif field_type is bool or field_type == "bool":
                    typed_config_dict[field_name] = value.lower() in TRUES
                else:
                    # Keep as string or use the value as-is
                    typed_config_dict[field_name] = value

            result = BenchmarkResult(
                config=config_type(**typed_config_dict),
                executor_type=result_dict["executor_type"],
                qps=float(result_dict["qps"]),
                ci_lower=float(result_dict["ci_lower"]),
                ci_upper=float(result_dict["ci_upper"]),
                date=result_dict["date"],
                python_version=result_dict["python_version"],
                free_threaded=result_dict["free_threaded"].lower() in TRUES,
                cpu_percent=float(result_dict.get("cpu_percent", 0.0)),
            )

            results.append(result)

    return results
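
The confidence interval computed in run() is a standard Student's t interval over the per-run QPS samples: mean ± t(1−α/2, n−1) · s/√n. A worked example with made-up samples:

# Worked example of the CI math used in run(), with illustrative numbers.
import numpy as np
import scipy.stats

qps_samples = [98.0, 102.0, 101.0, 99.0, 100.0]  # five runs (made up)
n = len(qps_samples)
mean = np.mean(qps_samples)        # 100.0
std = np.std(qps_samples, ddof=1)  # ~1.58 (sample standard deviation)

# mean +/- t(0.975, df=4) * std / sqrt(5)
lo, hi = scipy.stats.t.interval(0.95, n - 1, loc=mean, scale=std / np.sqrt(n))
print(f"{mean:.1f} QPS, 95% CI [{lo:.2f}, {hi:.2f}]")  # ~[98.04, 101.96]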
API Reference

Functions

- get_default_result_path(path: str, ext: str = '.csv') → str
  Get the default result path with Python version appended.
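  For example (illustrative paths; the version suffix depends on the interpreter running the script):

  # On CPython 3.13 with the free-threaded ABI:
  get_default_result_path("/repo/benchmarks/benchmark_wav.py")
  # -> '/repo/benchmarks/data/benchmark_wav_3.13t.csv'
  # On a regular (GIL-enabled) 3.12 build the suffix would be '_3.12'.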
- load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]]
  Load benchmark results from a CSV file.
  Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.
  Parameters:
    - input_file – Input CSV file path
    - config_type – The dataclass type to use for the config field
  Returns:
    List of BenchmarkResult instances with parsed config objects
  Raises:
    - FileNotFoundError – If input_file does not exist
    - ValueError – If CSV format is invalid or config_type is not a dataclass
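  A loading sketch, assuming the same config dataclass that was used when saving (field names must match the CSV columns; the import name benchmark_utils and the file path are hypothetical):

  from dataclasses import dataclass

  from benchmark_utils import load_results_from_csv


  @dataclass
  class Config:
      format: str
      size: int


  for r in load_results_from_csv("data/results_3.13t.csv", Config):
      print(r.config.format, r.config.size, r.executor_type, f"{r.qps:.1f}")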
- save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None
  Save benchmark results to a CSV file.
  Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.
  Parameters:
    - results – List of BenchmarkResult instances
    - output_file – Output file path for the CSV file
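  For a hypothetical Config with fields format and size, each row interleaves run metadata, config fields, and metrics, in that order (all values below are made up):

  # @generated
  date,python_version,free_threaded,format,size,executor_type,qps,ci_lower,ci_upper,cpu_percent
  2025-01-01T00:00:00+00:00,3.13.1,1,wav,1024,thread,512.3,498.7,525.9,3.7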
Classes

- class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)
  Runner for executing benchmarks with configurable executors.
  This class provides a standardized way to run benchmarks with:
    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types
  The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements. See the usage sketch after the parameter list.
  Parameters:
    - executor_type – Type of executor to use ("thread", "process", or "interpreter")
    - num_workers – Number of concurrent workers
    - warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
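  Construction is not free: per the source above, the warmup submits 2 * num_workers one-second sleep tasks by default, so expect roughly two seconds of startup before the first measurement. Typical use is as a context manager so the pool is shut down deterministically:

  with BenchmarkRunner(ExecutorType.PROCESS, num_workers=8) as runner:
      ...  # all runner.run(...) calls reuse the already-warm pool
  # the pool is shut down (wait=True) on context exit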
  - property executor_type: ExecutorType
    Get the executor type.

  - run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T]
    Run benchmark and return results with configuration.
    Parameters:
      - config – Benchmark-specific configuration
      - func – Function to benchmark (takes no arguments)
      - iterations – Number of iterations per run
      - num_runs – Number of benchmark runs for confidence interval calculation (default: 5)
      - confidence_level – Confidence level for interval calculation (default: 0.95)
    Returns:
      Tuple of (BenchmarkResult, last output from function)
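    The second tuple element is the last output of func, which is handy for sanity-checking that the benchmarked work produced the expected value (runner, config, work, and expected are placeholders):

    result, output = runner.run(config, work, iterations=100)
    assert output == expected  # `expected` is whatever your workload should yield
    print(result.qps, result.cpu_percent)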
- class BenchmarkResult
  Generic benchmark result containing configuration and performance metrics.
  This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.
  - config: ConfigT
    Benchmark-specific configuration (e.g., data format, file size, etc.)
- class ExecutorType
  Supported executor types for concurrent execution.
  - INTERPRETER = 'interpreter'
    Use InterpreterPoolExecutor. Requires Python 3.14+.
  - PROCESS = 'process'
    Use ProcessPoolExecutor.
  - THREAD = 'thread'
    Use ThreadPoolExecutor.
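  Benchmark scripts typically expose the executor type as a command-line flag; one way to wire it up (the argument name is illustrative):

  import argparse

  parser = argparse.ArgumentParser()
  parser.add_argument(
      "--executor-type",
      choices=[e.value for e in ExecutorType],
      default=ExecutorType.THREAD.value,
  )
  args = parser.parse_args()
  executor_type = ExecutorType(args.executor_type)  # e.g. ExecutorType.THREAD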