Benchmark utils¶
Common utilities for benchmark scripts.
This module provides a standardized framework for running benchmarks with:
Configurable executor types (ThreadPoolExecutor, ProcessPoolExecutor, InterpreterPoolExecutor)
Warmup phase to exclude executor initialization overhead
Statistical analysis with confidence intervals
CSV export functionality
Python version and free-threaded ABI detection
Source¶
Source
Click here to see the source.
1#!/usr/bin/env python3
2# Copyright (c) Meta Platforms, Inc. and affiliates.
3# All rights reserved.
4#
5# This source code is licensed under the BSD-style license found in the
6# LICENSE file in the root directory of this source tree.
7
8# pyre-strict
9
10"""Common utilities for benchmark scripts.
11
12This module provides a standardized framework for running benchmarks with:
13
14- Configurable executor types (
15 :py:class:`~concurrent.futures.ThreadPoolExecutor`,
16 :py:class:`~concurrent.futures.ProcessPoolExecutor`,
17 :py:class:`~concurrent.futures.InterpreterPoolExecutor`)
18- Warmup phase to exclude executor initialization overhead
19- Statistical analysis with confidence intervals
20- CSV export functionality
21- Python version and free-threaded ABI detection
22
23.. seealso::
24
25 - :doc:`./benchmark_tarfile`
26 - :doc:`./benchmark_wav`
27 - :doc:`./benchmark_numpy`
28
29"""
30
# Names exported as this module's public API.
__all__ = [
    "BenchmarkRunner",
    "BenchmarkResult",
    "ExecutorType",
    "get_default_result_path",
    "load_results_from_csv",
    "save_results_to_csv",
]
39
40import csv
41import os
42import sys
43import time
44from collections.abc import Callable
45from concurrent.futures import (
46 as_completed,
47 Executor,
48 ProcessPoolExecutor,
49 ThreadPoolExecutor,
50)
51from dataclasses import asdict, dataclass, field
52from datetime import datetime, timezone
53from enum import Enum
54from functools import partial
55from sys import version_info
56from types import TracebackType
57from typing import Any, Generic, TypeVar
58
59import numpy as np
60import psutil
61import scipy.stats
62
# ``T`` is the return type of a benchmarked callable; ``ConfigT`` is the
# benchmark-specific configuration type carried by ``BenchmarkResult``.
T = TypeVar("T")
ConfigT = TypeVar("ConfigT")
65
66
def _is_free_threaded() -> bool:
    """Report whether the interpreter is currently running without the GIL.

    Returns:
        ``True`` when ``sys._is_gil_enabled`` exists and reports that the GIL
        is disabled; ``False`` otherwise, including on interpreters that do
        not provide ``sys._is_gil_enabled`` at all.
    """
    gil_probe = getattr(sys, "_is_gil_enabled", None)
    if gil_probe is None:
        # The attribute is absent on this interpreter.
        return False
    return not gil_probe()
73
74
# Interpreter details, computed once at import time. They are the default
# values of ``BenchmarkResult.python_version`` / ``BenchmarkResult.free_threaded``
# and are used by ``get_default_result_path`` to build the file name suffix.
_PYTHON_VERSION: str = f"{version_info.major}.{version_info.minor}.{version_info.micro}"
_FREE_THREADED: bool = _is_free_threaded()
77
78
@dataclass
class BenchmarkResult(Generic[ConfigT]):
    """BenchmarkResult()

    One benchmark measurement: the configuration it ran with plus its metrics.

    The class is generic over the configuration type, so every benchmark
    script can supply its own configuration dataclass while sharing the same
    set of performance fields.
    """

    config: ConfigT
    """Configuration specific to the benchmark (e.g., data format, file size)."""

    executor_type: str
    """Name of the executor used (thread, process, or interpreter)."""

    qps: float
    """Mean queries per second."""

    ci_lower: float
    """Lower bound of the confidence interval for QPS."""

    ci_upper: float
    """Upper bound of the confidence interval for QPS."""

    date: str
    """Time at which the benchmark was run, in ISO 8601 format."""

    cpu_percent: float
    """Average CPU utilization percentage recorded during the benchmark."""

    python_version: str = _PYTHON_VERSION
    """Python version used for the benchmark (defaults to the running one)."""

    free_threaded: bool = _FREE_THREADED
    """Whether Python was running with the free-threaded ABI."""
116
117
class ExecutorType(Enum):
    """ExecutorType()

    The kinds of executor a benchmark can run on."""

    THREAD = "thread"
    """Run on :py:class:`~concurrent.futures.ThreadPoolExecutor`."""

    PROCESS = "process"
    """Run on :py:class:`~concurrent.futures.ProcessPoolExecutor`."""

    INTERPRETER = "interpreter"
    """Run on :py:class:`~concurrent.futures.InterpreterPoolExecutor`.

    Requires Python 3.14+.
    """
134
135
def _create_executor(executor_type: ExecutorType, max_workers: int) -> Executor:
    """Build the executor that corresponds to ``executor_type``.

    Args:
        executor_type: Which kind of executor to build
        max_workers: Upper limit on the number of workers

    Returns:
        A newly constructed executor

    Raises:
        ValueError: If ``executor_type`` is not supported
    """
    if executor_type == ExecutorType.THREAD:
        return ThreadPoolExecutor(max_workers=max_workers)

    if executor_type == ExecutorType.PROCESS:
        return ProcessPoolExecutor(max_workers=max_workers)

    if executor_type == ExecutorType.INTERPRETER:
        # Imported only when requested, so the other executor types do not
        # depend on this class being importable.
        from concurrent.futures import InterpreterPoolExecutor  # pyre-ignore[21]

        return InterpreterPoolExecutor(max_workers=max_workers)

    raise ValueError(f"Unsupported executor type: {executor_type}")
160
161
def _verify_workers(executor: Executor, expected_workers: int) -> None:
    """Check that the executor currently holds exactly ``expected_workers`` workers.

    The count is read from the executor's private bookkeeping
    (``_threads`` for thread pools, ``_processes`` for process pools).

    Args:
        executor: The executor to inspect
        expected_workers: Number of workers the executor should hold

    Raises:
        ValueError: If ``executor`` is neither a thread pool nor a process pool
        RuntimeError: If the worker count differs from ``expected_workers``
    """
    if isinstance(executor, ThreadPoolExecutor):
        actual_workers = len(executor._threads)
    elif isinstance(executor, ProcessPoolExecutor):
        actual_workers = len(executor._processes)
    else:
        raise ValueError(f"Unexpected executor type {type(executor)}")

    if actual_workers == expected_workers:
        return

    raise RuntimeError(
        f"Expected {expected_workers} workers, but executor has {actual_workers}"
    )
184
185
def _warmup_executor(
    executor: Executor, func: Callable[[], T], num_iterations: int
) -> None:
    """Submit ``func`` to the executor ``num_iterations`` times and wait for all.

    Return values are thrown away on purpose: the only goal is to get the
    worker threads/processes started before any measurement is taken. An
    exception raised by ``func`` is re-raised here.

    Args:
        executor: The executor to warm up
        func: Callable submitted for each warmup iteration
        num_iterations: How many times ``func`` is submitted
    """
    pending = []
    for _ in range(num_iterations):
        pending.append(executor.submit(func))

    for finished in as_completed(pending):
        finished.result()
202
203
class BenchmarkRunner:
    """Runner for executing benchmarks with configurable executors.

    This class provides a standardized way to run benchmarks with:

    - Warmup phase to exclude executor initialization overhead
    - Multiple runs for statistical confidence intervals
    - Support for different executor types

    The executor is initialized and warmed up in the constructor to exclude
    initialization overhead from benchmark measurements. If warmup or worker
    verification fails, the executor is shut down before the error propagates.

    Args:
        executor_type: Type of executor to use. Either an
            :py:class:`ExecutorType` member or its string value
            (``"thread"``, ``"process"``, or ``"interpreter"``)
        num_workers: Number of concurrent workers
        warmup_iterations: Number of warmup iterations (default: ``2 * num_workers``)

    Raises:
        ValueError: If ``executor_type`` is not a supported executor type
        RuntimeError: If the executor does not hold ``num_workers`` workers
            after the warmup
    """

    def __init__(
        self,
        executor_type: "ExecutorType | str",
        num_workers: int,
        warmup_iterations: int | None = None,
    ) -> None:
        # ``ExecutorType(x)`` returns ``x`` itself for a member and looks a
        # string up by value, raising ValueError for anything unknown. This
        # makes the string form advertised in the docstring actually work.
        self._executor_type: ExecutorType = ExecutorType(executor_type)

        warmup_iters = (
            warmup_iterations if warmup_iterations is not None else 2 * num_workers
        )

        self._executor: Executor = _create_executor(self._executor_type, num_workers)

        try:
            # NOTE(review): each warmup task sleeps for one second, presumably
            # so that tasks overlap and the pool starts all of its workers
            # instead of reusing an idle one -- confirm before shortening.
            _warmup_executor(self._executor, partial(time.sleep, 1), warmup_iters)
            _verify_workers(self._executor, num_workers)
        except BaseException:
            # Construction failed, so the caller never gets an object it could
            # shut down; release the workers here instead of leaking them.
            self._executor.shutdown(wait=True, cancel_futures=True)
            raise

    @property
    def executor_type(self) -> ExecutorType:
        """Get the executor type."""
        return self._executor_type

    def __enter__(self) -> "BenchmarkRunner":
        """Enter context manager."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager and shutdown executor."""
        self._executor.shutdown(wait=True)

    def _run_iterations(
        self,
        func: Callable[[], T],
        iterations: int,
        num_runs: int,
    ) -> tuple[list[float], list[float], T]:
        """Run benchmark iterations and collect QPS and CPU utilization samples.

        Args:
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run (must be at least 1)
            num_runs: Number of benchmark runs (must be at least 1)

        Returns:
            Tuple of (list of QPS samples, list of CPU percent samples, last function output)

        Raises:
            ValueError: If ``iterations`` or ``num_runs`` is less than 1
        """
        if iterations < 1:
            raise ValueError(f"iterations must be at least 1, got {iterations}")
        if num_runs < 1:
            raise ValueError(f"num_runs must be at least 1, got {num_runs}")

        qps_samples: list[float] = []
        cpu_samples: list[float] = []
        last_output: T | None = None

        process = psutil.Process()

        for _ in range(num_runs):
            # The return value is discarded: this call only marks the start of
            # the interval that the second ``cpu_percent()`` call reports on.
            process.cpu_percent()
            t0 = time.perf_counter()
            futures = [self._executor.submit(func) for _ in range(iterations)]
            for future in as_completed(futures):
                last_output = future.result()
            elapsed = time.perf_counter() - t0
            cpu_percent = process.cpu_percent()
            qps_samples.append(iterations / elapsed)
            # NOTE(review): the utilization reading is divided by the number of
            # iterations, which does not obviously yield an "average CPU
            # utilization percentage". ``psutil.Process()`` also refers to the
            # current process only, so work done in process-pool workers is
            # presumably not included. Kept as-is so results stay comparable
            # with previously saved data -- confirm the intended metric.
            cpu_samples.append(cpu_percent / iterations)

        # At least one task has completed here (both counts are >= 1), so
        # ``last_output`` holds a real result. It may legitimately be ``None``
        # when ``func`` returns ``None``, which is why it is not asserted on.
        return qps_samples, cpu_samples, last_output  # pyre-ignore[7]

    def run(
        self,
        config: ConfigT,
        func: Callable[[], T],
        iterations: int,
        num_runs: int = 5,
        confidence_level: float = 0.95,
    ) -> tuple[BenchmarkResult[ConfigT], T]:
        """Run benchmark and return results with configuration.

        Args:
            config: Benchmark-specific configuration
            func: Function to benchmark (takes no arguments)
            iterations: Number of iterations per run
            num_runs: Number of benchmark runs for confidence interval calculation
                (default: ``5``). With a single run the interval is undefined
                and both bounds are reported as NaN.
            confidence_level: Confidence level for interval calculation (default: ``0.95``)

        Returns:
            Tuple of (``BenchmarkResult``, last output from function)

        Raises:
            ValueError: If ``iterations`` or ``num_runs`` is less than 1
        """
        qps_samples, cpu_samples, last_output = self._run_iterations(
            func, iterations, num_runs
        )

        qps_mean = float(np.mean(qps_samples))

        if num_runs < 2:
            # The sample standard deviation needs at least two samples.
            ci_lower = ci_upper = float("nan")
        else:
            std_err = float(np.std(qps_samples, ddof=1)) / float(np.sqrt(num_runs))
            if std_err == 0:
                # All samples are identical: the interval collapses onto the
                # mean. scipy would return NaN for a zero scale.
                ci_lower = ci_upper = qps_mean
            else:
                confidence_interval = scipy.stats.t.interval(
                    confidence_level,
                    num_runs - 1,
                    loc=qps_mean,
                    scale=std_err,
                )
                # pyrefly: ignore [bad-argument-type]
                ci_lower = float(confidence_interval[0])
                # pyrefly: ignore [bad-argument-type]
                ci_upper = float(confidence_interval[1])

        cpu_mean = np.mean(cpu_samples)

        date = datetime.now(timezone.utc).isoformat()

        result = BenchmarkResult(
            config=config,
            executor_type=self.executor_type.value,
            qps=qps_mean,
            ci_lower=ci_lower,
            ci_upper=ci_upper,
            date=date,
            cpu_percent=float(cpu_mean),
        )

        return result, last_output
346
347
def get_default_result_path(path: str, ext: str = ".csv") -> str:
    """Build the default result file path for ``path``.

    ``path`` is resolved with :py:func:`os.path.realpath` and its extension is
    dropped. The result lives in a ``data`` directory next to the resolved
    file and is named ``<stem>_<major>.<minor>[t]<ext>``, where ``t`` is added
    when running with the free-threaded ABI.

    Args:
        path: Path the result file name is derived from
        ext: Extension of the result file (default: ``".csv"``)

    Returns:
        The derived result file path
    """
    resolved_stem = os.path.splitext(os.path.realpath(path))[0]
    parent_dir, stem = os.path.split(resolved_stem)

    major_minor = ".".join(_PYTHON_VERSION.split(".")[:2])
    abi_flag = "t" if _FREE_THREADED else ""

    return os.path.join(parent_dir, "data", f"{stem}_{major_minor}{abi_flag}{ext}")
357
358
def save_results_to_csv(
    results: list[BenchmarkResult[Any]],
    output_file: str,
) -> None:
    """Save benchmark results to a CSV file.

    Flattens the nested BenchmarkResult structure (config + performance metrics)
    into a flat CSV format. Each row contains both the benchmark configuration
    fields and the performance metrics. Boolean values are written as ``0``/``1``.

    The parent directory of ``output_file`` is created when missing, and the
    file starts with a ``#``-prefixed marker line ahead of the CSV header.
    The column set is taken from the first result.

    Args:
        results: List of BenchmarkResult instances
        output_file: Output file path for the CSV file

    Raises:
        ValueError: If ``results`` is empty
    """
    if not results:
        raise ValueError("No results to save")

    flattened_results = []
    for result in results:
        # convert bool to int for slight readability improvement of raw CSV file
        config_dict = {
            k: (int(v) if isinstance(v, bool) else v)
            for k, v in asdict(result.config).items()
        }
        flattened_results.append(
            {
                "date": result.date,
                "python_version": result.python_version,
                "free_threaded": int(result.free_threaded),
                **config_dict,
                "executor_type": result.executor_type,
                "qps": result.qps,
                "ci_lower": result.ci_lower,
                "ci_upper": result.ci_upper,
                "cpu_percent": result.cpu_percent,
            }
        )

    # Get all field names from the first result
    fieldnames = list(flattened_results[0])

    output_path = os.path.realpath(output_file)
    # Derive the directory from the resolved path: for a bare file name,
    # ``os.path.dirname(output_file)`` is empty and ``os.makedirs("")`` raises.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", newline="") as csvfile:
        # Write generated marker as first line
        # Note: Splitting the marker so as to avoid linter consider this file as generated file
        csvfile.write("# @")
        csvfile.write("generated\n")

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(flattened_results)

    print(f"Results saved to {output_file}")
413
414
def load_results_from_csv(
    input_file: str,
    config_type: type[ConfigT],
) -> list[BenchmarkResult[ConfigT]]:
    """Load benchmark results from a CSV file.

    Reconstructs BenchmarkResult objects from the flattened CSV format created
    by :py:func:`save_results_to_csv`.
    Each row in the CSV is parsed into a :py:class:`BenchmarkResult`
    with the appropriate config type.

    Lines starting with ``#`` are skipped. Columns that belong neither to the
    config dataclass nor to the result metrics are ignored. Config fields
    annotated as ``int``, ``float`` or ``bool`` are converted from their
    string form; all other fields are passed through as strings. A missing
    ``cpu_percent`` column is read as ``0.0``.

    Args:
        input_file: Input CSV file path
        config_type: The dataclass type to use for the config field

    Returns:
        List of BenchmarkResult instances with parsed config objects

    Raises:
        FileNotFoundError: If input_file does not exist
        ValueError: If CSV format is invalid (a required column is missing or
            a value cannot be converted) or ``config_type`` is not a dataclass
    """
    if not hasattr(config_type, "__dataclass_fields__"):
        raise ValueError(f"config_type must be a dataclass, got {config_type}")
    fields: dict[str, Any] = config_type.__dataclass_fields__  # pyre-ignore[16]

    # Normalize input path and resolve symbolic links
    input_file = os.path.realpath(input_file)

    # Result columns every row must provide (``cpu_percent`` is optional).
    required_columns = (
        "executor_type",
        "qps",
        "ci_lower",
        "ci_upper",
        "date",
        "python_version",
        "free_threaded",
    )

    # Spellings (lower-cased) that are read as boolean true.
    truthy = frozenset(("true", "1", "yes"))

    def _coerce(field_type: Any, raw: Any) -> Any:
        # Annotations may be real types or, with postponed evaluation, strings.
        if field_type is int or field_type == "int":
            return int(raw)
        if field_type is float or field_type == "float":
            return float(raw)
        if field_type is bool or field_type == "bool":
            return raw.lower() in truthy
        # Keep as string or use the value as-is
        return raw

    results: list[BenchmarkResult[ConfigT]] = []

    with open(input_file, newline="") as csvfile:
        reader = csv.DictReader(v for v in csvfile if not v.strip().startswith("#"))

        for row in reader:
            try:
                metrics = {name: row[name] for name in required_columns}
            except KeyError as err:
                # Honor the documented contract: a malformed CSV is a ValueError.
                raise ValueError(
                    f"Missing required column {err} in {input_file}"
                ) from err

            # Only columns that are fields of the config dataclass are used;
            # anything else in the row is ignored.
            typed_config_dict = {
                field_name: _coerce(field_info.type, row[field_name])
                for field_name, field_info in fields.items()
                if field_name in row
            }

            results.append(
                BenchmarkResult(
                    config=config_type(**typed_config_dict),
                    executor_type=metrics["executor_type"],
                    qps=float(metrics["qps"]),
                    ci_lower=float(metrics["ci_lower"]),
                    ci_upper=float(metrics["ci_upper"]),
                    date=metrics["date"],
                    python_version=metrics["python_version"],
                    free_threaded=metrics["free_threaded"].lower() in truthy,
                    cpu_percent=float(row.get("cpu_percent", 0.0)),
                )
            )

    return results
API Reference¶
Functions
- get_default_result_path(path: str, ext: str = '.csv') str[source]¶
Get the default result path with Python version appended.
- load_results_from_csv(input_file: str, config_type: type[ConfigT]) list[BenchmarkResult[ConfigT]][source]¶
Load benchmark results from a CSV file.
Reconstructs BenchmarkResult objects from the flattened CSV format created by
save_results_to_csv(). Each row in the CSV is parsed into a BenchmarkResult with the appropriate config type.
- Parameters:
input_file – Input CSV file path
config_type – The dataclass type to use for the config field
- Returns:
List of BenchmarkResult instances with parsed config objects
- Raises:
FileNotFoundError – If input_file does not exist
ValueError – If CSV format is invalid or config_type is not a dataclass
- save_results_to_csv(results: list[BenchmarkResult[Any]], output_file: str) None[source]¶
Save benchmark results to a CSV file.
Flattens the nested BenchmarkResult structure (config + performance metrics) into a flat CSV format. Each row contains both the benchmark configuration fields and the performance metrics.
- Parameters:
results – List of BenchmarkResult instances
output_file – Output file path for the CSV file
Classes
- class BenchmarkRunner(executor_type: ExecutorType, num_workers: int, warmup_iterations: int | None = None)[source]¶
Runner for executing benchmarks with configurable executors.
This class provides a standardized way to run benchmarks with:
Warmup phase to exclude executor initialization overhead
Multiple runs for statistical confidence intervals
Support for different executor types
The executor is initialized and warmed up in the constructor to exclude initialization overhead from benchmark measurements.
- Parameters:
executor_type – Type of executor to use ("thread", "process", or "interpreter")
num_workers – Number of concurrent workers
warmup_iterations – Number of warmup iterations (default: 2 * num_workers)
- property executor_type: ExecutorType[source]¶
Get the executor type.
- run(config: ConfigT, func: Callable[[], T], iterations: int, num_runs: int = 5, confidence_level: float = 0.95) tuple[BenchmarkResult[ConfigT], T][source]¶
Run benchmark and return results with configuration.
- Parameters:
config – Benchmark-specific configuration
func – Function to benchmark (takes no arguments)
iterations – Number of iterations per run
num_runs – Number of benchmark runs for confidence interval calculation (default: 5)
confidence_level – Confidence level for interval calculation (default: 0.95)
- Returns:
Tuple of (BenchmarkResult, last output from function)
- class BenchmarkResult[source]¶
Generic benchmark result containing configuration and performance metrics.
This class holds both the benchmark-specific configuration and the common performance statistics. It is parameterized by the config type, which allows each benchmark script to define its own configuration dataclass.
- config: ConfigT¶
Benchmark-specific configuration (e.g., data format, file size, etc.)
- class ExecutorType[source]¶
Supported executor types for concurrent execution.
- INTERPRETER = 'interpreter'¶
Use InterpreterPoolExecutor.
Requires Python 3.14+.
- PROCESS = 'process'¶
Use ProcessPoolExecutor.
- THREAD = 'thread'¶
Use ThreadPoolExecutor.