Benchmark utils¶
Common utilities for benchmark scripts.
This module provides a standardized framework for running benchmarks with:
Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)
Warmup phase to exclude executor initialization overhead
Statistical analysis with confidence intervals
CSV export functionality
Python version and free-threaded ABI detection
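Taken together, a typical benchmark script defines a config dataclass, runs a no-argument workload through BenchmarkRunner, and writes the results to CSV. The following is a minimal sketch, not taken from the repository: it assumes the module is importable as benchmark_utils, and MyConfig and work() are hypothetical placeholders for a real benchmark's configuration and workload.

from dataclasses import dataclass

from benchmark_utils import (  # assumed import path
    BenchmarkRunner,
    ExecutorType,
    get_default_result_path,
    save_results_to_csv,
)


@dataclass
class MyConfig:
    # Hypothetical benchmark-specific configuration.
    payload_size: int


def work() -> int:
    # The workload under test; benchmark functions take no arguments.
    return sum(range(10_000))


if __name__ == "__main__":
    config = MyConfig(payload_size=10_000)
    with BenchmarkRunner(ExecutorType.THREAD, num_workers=4) as runner:
        result, _ = runner.run(config, work, iterations=100)
    # Writes e.g. <script dir>/data/<script name>_3.13.csv
    # (or _3.13t.csv on a free-threaded build).
    save_results_to_csv([result], get_default_result_path(__file__))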
Source¶
1#!/usr/bin/env python3
2# Copyright (c) Meta Platforms, Inc. and affiliates.
3# All rights reserved.
4#
5# This source code is licensed under the BSD-style license found in the
6# LICENSE file in the root directory of this source tree.
7
8"""Common utilities for benchmark scripts.
9
10This module provides a standardized framework for running benchmarks with:
11
12- Configurable executor types (
13 :py:class:`~concurrent.futures.ThreadPoolExecutor`,
14 :py:class:`~concurrent.futures.ProcessPoolExecutor`,
15 :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
16- Warmup phase to exclude executor initialization overhead
17- Statistical analysis with confidence intervals
18- CSV export functionality
19- Python version and free-threaded ABI detection
20
21.. seealso::
22
23 - :doc:`./benchmark_tarfile`
24 - :doc:`./benchmark_wav`
25 - :doc:`./benchmark_numpy`
26
27"""
28
29__all__ = [
30 "BenchmarkRunner",
31 "BenchmarkResult",
32 "ExecutorType",
33 "get_default_result_path",
34 "load_results_from_csv",
35 "save_results_to_csv",
36]
37
38import csv
39import os
40import sys
41import time
42from collections.abc import Callable
43from concurrent.futures import (
44 as_completed,
45 Executor,
46 ProcessPoolExecutor,
47 ThreadPoolExecutor,
48)
49from dataclasses import asdict, dataclass
50from datetime import datetime, timezone
51from enum import Enum
52from functools import partial
53from typing import Any, Generic, TypeVar
54
55import numpy as np
56import scipy.stats
57
58T = TypeVar("T")
59ConfigT = TypeVar("ConfigT")
60
61
62@dataclass
63class BenchmarkResult(Generic[ConfigT]):
64 """BenchmarkResult()
65
66 Generic benchmark result containing configuration and performance metrics.
67
68 This class holds both the benchmark-specific configuration and the
69 common performance statistics. It is parameterized by the config type,
70 which allows each benchmark script to define its own configuration dataclass.
71 """
72
73 config: ConfigT
74 """Benchmark-specific configuration (e.g., data format, file size, etc.)"""
75
76 executor_type: str
77 """Type of executor used (thread, process, or interpreter)"""
78
79 qps: float
80 """Queries per second (mean)"""
81
82 ci_lower: float
83 """Lower bound of 95% confidence interval for QPS"""
84
85 ci_upper: float
86 """Upper bound of 95% confidence interval for QPS"""
87
88 date: str
89 """When benchmark was run. ISO 8601 format."""
90
91 python_version: str
92 """Python version used for the benchmark"""
93
94 free_threaded: bool
95 """Whether Python is running with free-threaded ABI."""
96
97
98class ExecutorType(Enum):
99 """ExecutorType()
100
101 Supported executor types for concurrent execution."""
102
103 THREAD = "thread"
104 """Use :py:class:`~concurrent.futures.ThreadPoolExecutor`."""
105
106 PROCESS = "process"
107 """Use :py:class:`~concurrent.futures.ProcessPoolExecutor`."""
108
109 INTERPRETER = "interpreter"
110 """Use :py:class:`~concurrent.futures.InterpreterPoolExecutor`.
111
112 Requires Python 3.14+.
113 """
114
115
116def _get_python_info() -> tuple[str, bool]:
117 """Get Python version and free-threaded ABI information.
118
119 Returns:
120 Tuple of (``python_version``, ``is_free_threaded``)
121 """
122 python_version = (
123 f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
124 )
125 try:
126 is_free_threaded = not sys._is_gil_enabled() # pyre-ignore[16]
127 except AttributeError:
128 is_free_threaded = False
129 return python_version, is_free_threaded
130
131
132def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
133 """Create an executor of the specified type.
134
135 Args:
136 executor_type: Type of executor to create
137 max_workers: Maximum number of workers
138
139 Returns:
140 Executor instance
141
142 Raises:
143 ValueError: If ``executor_type`` is not supported
144 """
145 match executor_type:
146 case ExecutorType.THREAD:
147 return ThreadPoolExecutor(max_workers=max_workers)
148 case ExecutorType.PROCESS:
149 return ProcessPoolExecutor(max_workers=max_workers)
150 case ExecutorType.INTERPRETER:
151 from concurrent.futures import InterpreterPoolExecutor # pyre-ignore[21]
152
153 return InterpreterPoolExecutor(max_workers=max_workers)
154 case _:
155 raise ValueError(f"Unsupported executor type: {executor_type}")
156
157
158def _verify_workers(executor: Executor, expected_workers: int) -> None:
159 """Verify that the executor has created the expected number of workers.
160
161 Args:
162 executor: The executor to verify
163 expected_workers: Expected number of workers
164
165 Raises:
166 RuntimeError: If the number of workers doesn't match expected
167 """
168 match executor:
169 case ThreadPoolExecutor():
170 actual_workers = len(executor._threads)
171 case ProcessPoolExecutor():
172 actual_workers = len(executor._processes)
173 case _:
174 raise ValueError(f"Unexpected executor type {type(executor)}")
175
176 if actual_workers != expected_workers:
177 raise RuntimeError(
178 f"Expected {expected_workers} workers, but executor has {actual_workers}"
179 )
180
181
182def _warmup_executor(
183 executor: Executor, func: Callable[[], T], num_iterations: int
184) -> T:
185 """Warmup the executor by running the function multiple times.
186
187 Args:
188 executor: The executor to warmup
189 func: Function to run for warmup
190 num_iterations: Number of warmup iterations
191
192 Returns:
193 Output from the last warmup iteration
194 """
195 futures = [executor.submit(func) for _ in range(num_iterations)]
196 last_output: T | None = None
197 for future in as_completed(futures):
198 last_output = future.result()
199 return last_output # pyre-ignore[7]
200
201
202class BenchmarkRunner:
203 """Runner for executing benchmarks with configurable executors.
204
205 This class provides a standardized way to run benchmarks with:
206
207 - Warmup phase to exclude executor initialization overhead
208 - Multiple runs for statistical confidence intervals
209 - Support for different executor types
210
211 The executor is initialized and warmed up in the constructor to exclude
212 initialization overhead from benchmark measurements.
213
214 Args:
215 executor_type: Type of executor to use
216 (``"thread"``, ``"process"``, or ``"interpreter"``)
217 num_workers: Number of concurrent workers
218 warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)
219 """
220
221 def __init__(
222 self,
223 executor_type: ExecutorType,
224 num_workers: int,
225 warmup_iterations: int | None = None,
226 ) -> None:
227 self._executor_type: ExecutorType = executor_type
228
229 warmup_iters = (
230 warmup_iterations if warmup_iterations is not None else 2 * num_workers
231 )
232
233 self._executor: Executor = _create_executor(executor_type, num_workers)
234
235 _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
236 _verify_workers(self._executor, num_workers)
237
238 @property
239 def executor_type(self) -> ExecutorType:
240 """Get the executor type."""
241 return self._executor_type
242
243 def __enter__(self) -> "BenchmarkRunner":
244 """Enter context manager."""
245 return self
246
247 def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
248 """Exit context manager and shutdown executor."""
249 self._executor.shutdown(wait=True)
250
251 def _run_iterations(
252 self,
253 func: Callable[[], T],
254 iterations: int,
255 num_runs: int,
256 ) -> tuple[list[float], T]:
257 """Run benchmark iterations and collect QPS samples.
258
259 Args:
260 func: Function to benchmark (takes no arguments)
261 iterations: Number of iterations per run
262 num_runs: Number of benchmark runs
263
264 Returns:
265 Tuple of (list of QPS samples from each run, last function output)
266 """
267 qps_samples: list[float] = []
268 last_output: T | None = None
269
270 for _ in range(num_runs):
271 t0 = time.perf_counter()
272 futures = [self._executor.submit(func) for _ in range(iterations)]
273 for future in as_completed(futures):
274 last_output = future.result()
275 elapsed = time.perf_counter() - t0
276 qps_samples.append(iterations / elapsed)
277
278 return qps_samples, last_output # pyre-ignore[7]
279
280 def run(
281 self,
282 config: ConfigT,
283 func: Callable[[], T],
284 iterations: int,
285 num_runs: int = 5,
286 confidence_level: float = 0.95,
287 ) -> tuple[BenchmarkResult[ConfigT], T]:
288 """Run benchmark and return results with configuration.
289
290 Args:
291 config: Benchmark-specific configuration
292 func: Function to benchmark (takes no arguments)
293 iterations: Number of iterations per run
294 num_runs: Number of benchmark runs for confidence interval calculation
295 (default: ``5``)
296 confidence_level: Confidence level for interval calculation (default: ``0.95``)
297
298 Returns:
299 Tuple of (``BenchmarkResult``, last output from function)
300 """
301 qps_samples, last_output = self._run_iterations(func, iterations, num_runs)
302
303 qps_mean = np.mean(qps_samples)
304 qps_std = np.std(qps_samples, ddof=1)
305 degrees_freedom = num_runs - 1
306 confidence_interval = scipy.stats.t.interval(
307 confidence_level,
308 degrees_freedom,
309 loc=qps_mean,
310 scale=qps_std / np.sqrt(num_runs),
311 )
312
313 python_version, free_threaded = _get_python_info()
314 date = datetime.now(timezone.utc).isoformat()
315
316 result = BenchmarkResult(
317 config=config,
318 executor_type=self.executor_type.value,
319 qps=float(qps_mean),
320 ci_lower=float(confidence_interval[0]),
321 ci_upper=float(confidence_interval[1]),
322 date=date,
323 python_version=python_version,
324 free_threaded=free_threaded,
325 )
326
327 return result, last_output
328
329
330def get_default_result_path(path: str, ext: str = ".csv") -> str:
331 """Get the default result path with Python version appended."""
332 base, _ = os.path.splitext(os.path.realpath(path))
333 dirname = os.path.join(os.path.dirname(base), "data")
334 filename = os.path.basename(base)
335 python_version, free_threaded = _get_python_info()
336 version_suffix = (
337 f"_{'.'.join(python_version.split('.')[:2])}{'t' if free_threaded else ''}"
338 )
339 return os.path.join(dirname, f"{filename}{version_suffix}{ext}")
340
341
342def save_results_to_csv(
343 results: list[BenchmarkResult[Any]],
344 output_file: str,
345) -> None:
346 """Save benchmark results to a CSV file.
347
348 Flattens the nested BenchmarkResult structure (config + performance metrics)
349 into a flat CSV format. Each row contains both the benchmark configuration
350 fields and the performance metrics.
351
352 Args:
353 results: List of BenchmarkResult instances
354 output_file: Output file path for the CSV file
355 """
356 if not results:
357 raise ValueError("No results to save")
358
359 flattened_results = []
360 for result in results:
361 config_dict = asdict(result.config)
362 # convert bool to int for slight readability improvement of raw CSV file
363 config_dict = {
364 k: (int(v) if isinstance(v, bool) else v) for k, v in config_dict.items()
365 }
366 flattened = {
367 "date": result.date,
368 "python_version": result.python_version,
369 "free_threaded": int(result.free_threaded),
370 **config_dict,
371 "executor_type": result.executor_type,
372 "qps": result.qps,
373 "ci_lower": result.ci_lower,
374 "ci_upper": result.ci_upper,
375 }
376 flattened_results.append(flattened)
377
378 # Get all field names from the first result
379 fieldnames = list(flattened_results[0].keys())
380
381 output_path = os.path.realpath(output_file)
382 os.makedirs(os.path.dirname(output_file), exist_ok=True)
383 with open(output_path, "w", newline="") as csvfile:
384 # Write generated marker as first line
385 # Note: Splitting the marker so as to avoid linter consider this file as generated file
386 csvfile.write("# @" "generated\n")
387
388 writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
389 writer.writeheader()
390 for result_dict in flattened_results:
391 writer.writerow(result_dict)
392
393 print(f"Results saved to {output_file}")
394
395
396def load_results_from_csv(
397 input_file: str,
398 config_type: type[ConfigT],
399) -> list[BenchmarkResult[ConfigT]]:
400 """Load benchmark results from a CSV file.
401
402 Reconstructs BenchmarkResult objects from the flattened CSV format created
403 by :py:func:`save_results_to_csv`.
404 Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
405 with the appropriate config type.
406
407 Args:
408 input_file: Input CSV file path
409 config_type: The dataclass type to use for the config field
410
411 Returns:
412 List of BenchmarkResult instances with parsed config objects
413
414 Raises:
415 FileNotFoundError: If input_file does not exist
416 ValueError: If CSV format is invalid or ``config_type`` is not a dataclass
417 """
418 if not hasattr(config_type, "__dataclass_fields__"):
419 raise ValueError(f"config_type must be a dataclass, got {config_type}")
420 fields: dict[str, Any] = config_type.__dataclass_fields__ # pyre-ignore[16]
421
422 # Normalize input path and resolve symbolic links
423 input_file = os.path.realpath(input_file)
424
425 # Get the field names from the config dataclass
426 config_fields = set(fields.keys())
427
428 # Performance metric fields that are part of BenchmarkResult
429 result_fields = {
430 "executor_type",
431 "qps",
432 "ci_lower",
433 "ci_upper",
434 "date",
435 "python_version",
436 "free_threaded",
437 }
438
439 results: list[BenchmarkResult[ConfigT]] = []
440
441 TRUES = ("true", "1", "yes")
442
443 with open(input_file, newline="") as csvfile:
444 reader = csv.DictReader((v for v in csvfile if not v.strip().startswith("#")))
445
446 for row in reader:
447 # Split row into config fields and result fields
448 config_dict = {}
449 result_dict = {}
450
451 for key, value in row.items():
452 if key in config_fields:
453 config_dict[key] = value
454 elif key in result_fields:
455 result_dict[key] = value
456 else:
457 # Unknown field - could be from config or result
458 # Try to infer based on whether it matches a config field name
459 config_dict[key] = value
460
461 # Convert string values to appropriate types for config
462 typed_config_dict = {}
463 for field_name, field_info in fields.items():
464 if field_name not in config_dict:
465 continue
466
467 value = config_dict[field_name]
468 field_type = field_info.type
469
470 # Handle type conversions
471 if field_type is int or field_type == "int":
472 typed_config_dict[field_name] = int(value)
473 elif field_type is float or field_type == "float":
474 typed_config_dict[field_name] = float(value)
475 elif field_type is bool or field_type == "bool":
476 typed_config_dict[field_name] = value.lower() in TRUES
477 else:
478 # Keep as string or use the value as-is
479 typed_config_dict[field_name] = value
480
481 result = BenchmarkResult(
482 config=config_type(**typed_config_dict),
483 executor_type=result_dict["executor_type"],
484 qps=float(result_dict["qps"]),
485 ci_lower=float(result_dict["ci_lower"]),
486 ci_upper=float(result_dict["ci_upper"]),
487 date=result_dict["date"],
488 python_version=result_dict["python_version"],
489 free_threaded=result_dict["free_threaded"].lower()
490 in ("true", "1", "yes"),
491 )
492
493 results.append(result)
494
495 return results
Functions¶
- get_default_result_path(path: str, ext: str = '.csv') → str[source]¶
Get the default result path with Python version appended.
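As a rough illustration (the exact suffix depends on the interpreter that runs it): for a script at /repo/benchmarks/benchmark_wav.py running under CPython 3.13 with the free-threaded ABI, the function would return /repo/benchmarks/data/benchmark_wav_3.13t.csv. The path above and the import below are assumptions, not taken from the repository.

from benchmark_utils import get_default_result_path  # assumed import path

# Typically called with the benchmark script's own path.
print(get_default_result_path(__file__))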
- load_results_from_csv(input_file: str, config_type: type[ConfigT]) → list[BenchmarkResult[ConfigT]][source]¶
Load benchmark results from a CSV file.
Reconstructs BenchmarkResult objects from the flattened CSV format created by save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.
- Parameters:
input_file – Input CSV file path
config_type – The dataclass type to use for the config field
- Returns:
List of BenchmarkResult instances with parsed config objects
- Raises:
FileNotFoundError – If input_file does not exist
ValueError – If CSV format is invalid or config_type is not a dataclass
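A hedged round-trip sketch: WavConfig is a hypothetical config dataclass, and the BenchmarkResult is built by hand purely so the example is self-contained (in practice results come from BenchmarkRunner.run()). The import path and file location are assumptions.

import os
import tempfile
from dataclasses import dataclass

from benchmark_utils import (  # assumed import path
    BenchmarkResult,
    load_results_from_csv,
    save_results_to_csv,
)


@dataclass
class WavConfig:
    num_channels: int
    duration: float


result = BenchmarkResult(
    config=WavConfig(num_channels=2, duration=1.5),
    executor_type="thread",
    qps=120.5,
    ci_lower=118.2,
    ci_upper=122.8,
    date="2025-01-01T00:00:00+00:00",
    python_version="3.13.1",
    free_threaded=False,
)

path = os.path.join(tempfile.mkdtemp(), "demo.csv")
save_results_to_csv([result], path)

# Column names in the CSV must match the config dataclass fields; values are
# converted back to int/float/bool based on the dataclass annotations.
loaded = load_results_from_csv(path, WavConfig)
assert loaded[0].config == WavConfig(num_channels=2, duration=1.5)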
- save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) → None[source]¶
Save benchmark results to a CSV file.
Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.
- Parameters:
results – List of BenchmarkResult instances
output_file – Output file path for the CSV file
Classes¶
- class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)[source]¶
Runner for executing benchmarks with configurable executors.
This class provides a standardized way to run benchmarks with:
Warmup phase to exclude executor initialization overhead
Multiple runs for statistical confidence intervals
Support for different executor types
The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.
- Parameters:
executor_type – Type of executor to use ("thread", "process", or "interpreter")
num_workers – Number of concurrent workers
warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
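One practical note, shown as a small sketch (import path assumed): the warmup phase submits time.sleep(1) tasks, so with the default of 2 * num_workers warmup iterations the constructor blocks for roughly two seconds before the first measurement, in addition to any process or interpreter start-up cost.

from benchmark_utils import BenchmarkRunner, ExecutorType  # assumed import path

# One warmup round (warmup_iterations == num_workers) instead of the default two.
with BenchmarkRunner(ExecutorType.THREAD, num_workers=16, warmup_iterations=16) as runner:
    ...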
- property executor_type: ExecutorType[source]¶
Get the executor type.
- run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) → tuple[BenchmarkResult[ConfigT], T][source]¶
Run benchmark and return results with configuration.
- Parameters:
config – Benchmark-specific configuration
func – Function to benchmark (takes no arguments)
iterations – Number of iterations per run
num_runs – Number of benchmark runs for confidence interval calculation (default: 5)
confidence_level – Confidence level for interval calculation (default: 0.95)
- Returns:
Tuple of (BenchmarkResult, last output from function)
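A minimal sketch of a single measurement, assuming the module imports as benchmark_utils and using a hypothetical FactorialConfig. Because func takes no arguments, functools.partial is a convenient way to bind inputs; with the process executor the callable must also be picklable, so bind a module-level function rather than a lambda. The reported interval is a Student's t confidence interval computed from the num_runs QPS samples.

import math
from dataclasses import dataclass
from functools import partial

from benchmark_utils import BenchmarkRunner, ExecutorType  # assumed import path


@dataclass
class FactorialConfig:
    # Hypothetical config describing the workload.
    n: int


if __name__ == "__main__":
    cfg = FactorialConfig(n=2_000)
    with BenchmarkRunner(ExecutorType.PROCESS, num_workers=4) as runner:
        result, _ = runner.run(cfg, partial(math.factorial, cfg.n), iterations=50)
    print(f"{result.qps:.1f} QPS, 95% CI [{result.ci_lower:.1f}, {result.ci_upper:.1f}]")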
- class BenchmarkResult[source]¶
Generic benchmark result containing configuration and performance metrics.
This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.
- config: ConfigT¶
Benchmark-specific configuration (e.g., data format, file size, etc.)
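Because the class is generic in its config type, downstream code can annotate results precisely. A brief sketch with a hypothetical WavConfig (import path assumed):

from dataclasses import dataclass

from benchmark_utils import BenchmarkResult  # assumed import path


@dataclass
class WavConfig:
    sample_rate: int


def best_qps(results: list[BenchmarkResult[WavConfig]]) -> float:
    # Highest mean QPS across a set of runs; ci_lower/ci_upper give the
    # 95% confidence bounds around each mean.
    return max(r.qps for r in results)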
- class ExecutorType[source]¶
Supported executor types for concurrent execution.
- INTERPRETER = 'interpreter'¶
Use InterpreterPoolExecutor.
Requires Python 3.14+.
- PROCESS = 'process'¶
Use ProcessPoolExecutor.
- THREAD = 'thread'¶
Use ThreadPoolExecutor.
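Since ExecutorType is a plain Enum keyed by short strings, a command-line flag maps onto it directly. A small sketch (import path assumed), keeping in mind that "interpreter" additionally requires Python 3.14+:

import argparse

from benchmark_utils import ExecutorType  # assumed import path

parser = argparse.ArgumentParser()
parser.add_argument(
    "--executor",
    type=ExecutorType,          # "thread" -> ExecutorType.THREAD, etc.
    choices=list(ExecutorType),
    default=ExecutorType.THREAD,
)
args = parser.parse_args(["--executor", "process"])
assert args.executor is ExecutorType.PROCESS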