Benchmark numpy

This example benchmarks the speed of loading data in different formats.

See Case Studies / Data Format for details on how the data format and the loading function affect the performance of the training pipeline.

Example

$ numactl --membind 0 --cpubind 0 python benchmark_numpy.py --output results.csv
# Plot results
$ python benchmark_numpy_plot.py --input results.csv --output plot.png

Source

Source

Click here to see the source.
  1#!/usr/bin/env python3
  2# Copyright (c) Meta Platforms, Inc. and affiliates.
  3# All rights reserved.
  4#
  5# This source code is licensed under the BSD-style license found in the
  6# LICENSE file in the root directory of this source tree.
  7
  8"""This example benchmarks the speed of loading data in different formats.
  9
 10See `Case Studies / Data Format <../case_studies/data_format.html>`_ for
 11the details of how the data format and the loading function affect
 12the performance of the training pipeline.
 13
 14**Example**
 15
 16.. code-block:: shell
 17
 18   $ numactl --membind 0 --cpubind 0 python benchmark_numpy.py --output results.csv
 19   # Plot results
 20   $ python benchmark_numpy_plot.py --input results.csv --output plot.png
 21
 22"""
 23
# Names exported by a star-import of this module.
# NOTE(review): the rendered API reference lists the members in this same
# order -- presumably the docs build reads ``__all__``; confirm before
# reordering or removing entries.
__all__ = [
    "main",
    "get_mock_data",
    "load_npy",
    "load_npy_spdl",
    "load_npz",
    "load_npz_spdl",
    "load_torch",
    "BenchmarkConfig",
]
 34
 35# pyre-strict
 36
 37import argparse
 38import os
 39from collections.abc import Callable
 40from dataclasses import dataclass
 41from functools import partial
 42from io import BytesIO
 43
 44import numpy as np
 45import spdl.io
 46import torch
 47from numpy.typing import NDArray
 48
 49try:
 50    from examples.benchmark_utils import (  # pyre-ignore[21]
 51        BenchmarkResult,
 52        BenchmarkRunner,
 53        ExecutorType,
 54        get_default_result_path,
 55        save_results_to_csv,
 56    )
 57except ImportError:
 58    from spdl.examples.benchmark_utils import (
 59        BenchmarkResult,
 60        BenchmarkRunner,
 61        ExecutorType,
 62        get_default_result_path,
 63        save_results_to_csv,
 64    )
 65
 66
# Default value of the ``--output`` CLI option, computed by
# ``get_default_result_path`` from this file's path.
DEFAULT_RESULT_PATH: str = get_default_result_path(__file__)
 68
 69
 70def load_npy(items: list[bytes]) -> list[NDArray]:
 71    """Load arrays from serialized NPY binary strings using :py:func:`numpy.load`."""
 72    return [np.load(BytesIO(item), allow_pickle=False) for item in items]
 73
 74
 75def load_npy_spdl(items: list[bytes]) -> list[NDArray]:
 76    """Load arrays from serialized NPY binary strings using :py:func:`spdl.io.load_npy`."""
 77    return [spdl.io.load_npy(item) for item in items]
 78
 79
 80def load_npz(item: bytes) -> list[NDArray]:
 81    """Load arrays from a serialized NPZ binary string using :py:func:`numpy.load`."""
 82    data = np.load(BytesIO(item))
 83    return list(data.values())
 84
 85
 86def load_npz_spdl(item: bytes) -> list[NDArray]:
 87    """Load arrays from serialized NPZ binary strings using :py:func:`spdl.io.load_npz`."""
 88    data = spdl.io.load_npz(item)
 89    return list(data.values())
 90
 91
 92def load_torch(item: bytes) -> list[NDArray]:
 93    """Load arrays from a serialized PyTorch state dict."""
 94    return list(torch.load(BytesIO(item)).values())
 95
 96
 97def _get_load_fn(
 98    data_format: str, impl: str
 99) -> Callable[[list[bytes]], list[NDArray]] | Callable[[bytes], list[NDArray]]:
100    match data_format:
101        case "torch":
102            return load_torch
103        case "npy":
104            if impl == "spdl":
105                return load_npy_spdl
106            return load_npy
107        case "npz":
108            if impl == "spdl":
109                return load_npz_spdl
110            return load_npz
111        case _:
112            raise ValueError(f"Unexpected data format: {data_format}")
113
114
115def _dump_np(arr: NDArray | dict[str, NDArray], compressed: bool = False) -> bytes:
116    with BytesIO() as buf:
117        if isinstance(arr, dict):
118            if compressed:
119                np.savez_compressed(buf, allow_pickle=False, **arr)
120            else:
121                np.savez(buf, allow_pickle=False, **arr)
122        else:
123            np.save(buf, arr, allow_pickle=False)
124        buf.seek(0)
125        return buf.read()
126
127
def _dump_torch(arr: dict[str, NDArray]) -> bytes:
    """Serialize a dict of NumPy arrays as a dict of tensors via :py:func:`torch.save`.

    Args:
        arr: Name-to-array mapping. Each array is converted with
            :py:func:`torch.from_numpy` before saving.

    Returns:
        The serialized bytes.
    """
    tensors = {name: torch.from_numpy(array) for name, array in arr.items()}
    with BytesIO() as buffer:
        torch.save(tensors, buffer)
        return buffer.getvalue()
133
134
135def get_mock_data(format: str, compressed: bool = False) -> tuple[bytes, bytes] | bytes:
136    """Generate a single sample in the given format.
137
138    The mock data resemboles an RGB image and its segmentation labels.
139
140    Args:
141        format: One of ``"npz"``, ``"npy"`` or ``"torch"``.
142        compressed: If ``True``, NPZ file is compressed.
143            (i.e. :py:func:`numpy.savez_compressed` is used.)
144
145    Returns:
146        Serialized mock arrays. If ``"npy"`` then arrays are serialized
147        separately. Otherwise arrays are bundled together.
148    """
149    # pyrefly: ignore [no-matching-overload]
150    img = np.random.randint(256, size=(3, 640, 480), dtype=np.uint8)
151    # pyrefly: ignore [no-matching-overload]
152    lbl = np.random.randint(256, size=(640, 480), dtype=np.uint8)
153
154    match format:
155        case "npz":
156            return _dump_np({"img": img, "lbl": lbl}, compressed=compressed)
157        case "npy":
158            return _dump_np(img), _dump_np(lbl)
159        case "torch":
160            return _dump_torch({"img": img, "lbl": lbl})
161        case _:
162            raise ValueError(f"Unexpected `format`: {format}")
163
164
# NOTE(review): the first docstring line ("BenchmarkConfig()") looks like a
# signature override for the docs build -- confirm before changing it.
@dataclass
class BenchmarkConfig:
    """BenchmarkConfig()

    Configuration for a single benchmark run.

    One instance is created per ``runner.run`` call in :py:func:`main` and
    records which data format, compression setting, loader implementation
    and worker count the measurement belongs to."""

    data_format: str
    """Data format (``"npy"``, ``"npz"``, or ``"torch"``)"""

    compressed: bool
    """Whether NPZ file is compressed"""

    impl: str
    """Implementation (``"np"``, ``"spdl"``, or ``"torch"``)"""

    num_workers: int
    """Number of concurrent workers"""
182
183
def _parse_args() -> argparse.Namespace:
    """Parse command line arguments.

    Returns:
        Parsed arguments. ``output`` is the path the results are written
        to, passed through :py:func:`os.path.realpath`; it defaults to
        ``DEFAULT_RESULT_PATH``.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark data format loading performance"
    )
    parser.add_argument(
        "--output",
        # ``os.path.realpath`` is already a str -> str callable; no wrapper needed.
        type=os.path.realpath,
        default=DEFAULT_RESULT_PATH,
        help="Output path for the results",
    )
    return parser.parse_args()
200
201
def main() -> None:
    """The entrypoint from CLI.

    Runs every data configuration for every combination of worker count
    and executor type, prints one comma-separated line per measurement
    (format, compressed, impl, executor, workers, QPS), saves all results
    to the ``--output`` path, and prints the command for plotting them.
    """
    args = _parse_args()

    # Define explicit configuration lists
    worker_counts = [32, 16, 8, 4, 2, 1]
    executor_types = [ExecutorType.PROCESS, ExecutorType.THREAD]

    # Define benchmark configurations
    # (data_format, compressed, impl)
    data_configs = [
        ("torch", False, "torch"),
        ("npy", False, "np"),
        ("npy", False, "spdl"),
        ("npz", False, "np"),
        ("npz", True, "np"),
        ("npz", False, "spdl"),
        ("npz", True, "spdl"),
    ]

    results: list[BenchmarkResult[BenchmarkConfig]] = []
    iterations = 1000
    num_runs = 5

    for num_workers in worker_counts:
        for executor_type in executor_types:
            # One runner is shared by all data configurations of this
            # (worker count, executor type) combination.
            with BenchmarkRunner(
                executor_type=executor_type,
                num_workers=num_workers,
                warmup_iterations=30 * num_workers,
            ) as runner:
                for data_format, compressed, impl in data_configs:
                    data = get_mock_data(data_format, compressed)

                    load_fn = _get_load_fn(data_format, impl)

                    result, _ = runner.run(
                        BenchmarkConfig(
                            data_format=data_format,
                            compressed=compressed,
                            impl=impl,
                            num_workers=num_workers,
                        ),
                        partial(load_fn, data),
                        iterations,
                        num_runs=num_runs,
                    )

                    results.append(result)
                    print(
                        f"{data_format},{compressed},{impl},{executor_type.value},{num_workers},{result.qps:.1f}"
                    )

    save_results_to_csv(results, args.output)
    # Swap only the final extension. A plain ``str.replace(".csv", ".png")``
    # would also rewrite ".csv" inside directory names and would leave the
    # path untouched (plot output == CSV input) when the output file does
    # not end in ".csv".
    plot_output = os.path.splitext(args.output)[0] + ".png"
    print(
        f"\nBenchmark complete. To generate plots, run:\n"
        f"python benchmark_numpy_plot.py --input {args.output} --output {plot_output}"
    )
261
262
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()

API Reference

Functions

main() None[source]

The entrypoint from CLI.

get_mock_data(format: str, compressed: bool = False) tuple[bytes, bytes] | bytes[source]

Generate a single sample in the given format.

The mock data resembles an RGB image and its segmentation labels.

Parameters:
  • format – One of "npz", "npy" or "torch".

  • compressed – If True, NPZ file is compressed. (i.e. numpy.savez_compressed() is used.)

Returns:

Serialized mock arrays. If "npy" then arrays are serialized separately. Otherwise arrays are bundled together.

load_npy(items: list[bytes]) list[NDArray][source]

Load arrays from serialized NPY binary strings using numpy.load().

load_npy_spdl(items: list[bytes]) list[NDArray][source]

Load arrays from serialized NPY binary strings using spdl.io.load_npy().

load_npz(item: bytes) list[NDArray][source]

Load arrays from a serialized NPZ binary string using numpy.load().

load_npz_spdl(item: bytes) list[NDArray][source]

Load arrays from a serialized NPZ binary string using spdl.io.load_npz().

load_torch(item: bytes) list[NDArray][source]

Load arrays from a serialized PyTorch state dict.

Classes

class BenchmarkConfig[source]

Configuration for a single benchmark run.

compressed: bool

Whether NPZ file is compressed

data_format: str

Data format ("npy", "npz", or "torch")

impl: str

Implementation ("np", "spdl", or "torch")

num_workers: int

Number of concurrent workers