Benchmark numpy

This example benchmarks the speed of loading data in different formats.

See Case Studies / Data Format for details on how the data format and the loading function affect the performance of the training pipeline.

Example

$ numactl --membind 0 --cpubind 0 python benchmark_numpy.py --output results.csv
# Plot results
$ python benchmark_numpy_plot.py --input results.csv --output plot.png

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example benchmarks the speed of loading data in different formats.

See `Case Studies / Data Format <../case_studies/data_format.html>`_ for
details on how the data format and the loading function affect
the performance of the training pipeline.

**Example**

.. code-block:: shell

   $ numactl --membind 0 --cpubind 0 python benchmark_numpy.py --output results.csv
   # Plot results
   $ python benchmark_numpy_plot.py --input results.csv --output plot.png

"""

__all__ = [
    "main",
    "get_mock_data",
    "load_npy",
    "load_npy_spdl",
    "load_npz",
    "load_npz_spdl",
    "load_torch",
    "BenchmarkConfig",
]

# pyre-strict

import argparse
import os
from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
from io import BytesIO

import numpy as np
import spdl.io
import torch
from numpy.typing import NDArray

try:
    from examples.benchmark_utils import (  # pyre-ignore[21]
        BenchmarkResult,
        BenchmarkRunner,
        ExecutorType,
        get_default_result_path,
        save_results_to_csv,
    )
except ImportError:
    from spdl.examples.benchmark_utils import (
        BenchmarkResult,
        BenchmarkRunner,
        ExecutorType,
        get_default_result_path,
        save_results_to_csv,
    )


DEFAULT_RESULT_PATH: str = get_default_result_path(__file__)


def load_npy(items: list[bytes]) -> list[NDArray]:
    """Load arrays from serialized NPY binary strings using :py:func:`numpy.load`."""
    return [np.load(BytesIO(item), allow_pickle=False) for item in items]


def load_npy_spdl(items: list[bytes]) -> list[NDArray]:
    """Load arrays from serialized NPY binary strings using :py:func:`spdl.io.load_npy`."""
    return [spdl.io.load_npy(item) for item in items]


def load_npz(item: bytes) -> list[NDArray]:
    """Load arrays from a serialized NPZ binary string using :py:func:`numpy.load`."""
    data = np.load(BytesIO(item))
    return list(data.values())


def load_npz_spdl(item: bytes) -> list[NDArray]:
    """Load arrays from a serialized NPZ binary string using :py:func:`spdl.io.load_npz`."""
    data = spdl.io.load_npz(item)
    return list(data.values())


def load_torch(item: bytes) -> list[NDArray]:
    """Load arrays from a serialized PyTorch state dict."""
    return list(torch.load(BytesIO(item)).values())


def _get_load_fn(
    data_format: str, impl: str
) -> Callable[[list[bytes]], list[NDArray]] | Callable[[bytes], list[NDArray]]:
    # Map (data format, implementation) to the corresponding loader defined above.
    match data_format:
        case "torch":
            return load_torch
        case "npy":
            if impl == "spdl":
                return load_npy_spdl
            return load_npy
        case "npz":
            if impl == "spdl":
                return load_npz_spdl
            return load_npz
        case _:
            raise ValueError(f"Unexpected data format: {data_format}")


def _dump_np(arr: NDArray | dict[str, NDArray], compressed: bool = False) -> bytes:
    # Serialize an array (NPY) or a dict of arrays (NPZ) into an in-memory buffer.
    with BytesIO() as buf:
        if isinstance(arr, dict):
            if compressed:
                np.savez_compressed(buf, allow_pickle=False, **arr)
            else:
                np.savez(buf, allow_pickle=False, **arr)
        else:
            np.save(buf, arr, allow_pickle=False)
        buf.seek(0)
        return buf.read()


def _dump_torch(arr: dict[str, NDArray]) -> bytes:
    # Serialize a dict of arrays as a PyTorch state dict held in memory.
    with BytesIO() as buf:
        torch.save({k: torch.from_numpy(v) for k, v in arr.items()}, buf)
        buf.seek(0)
        return buf.read()


def get_mock_data(format: str, compressed: bool = False) -> tuple[bytes, bytes] | bytes:
    """Generate a single sample in the given format.

    The mock data resembles an RGB image and its segmentation labels.

    Args:
        format: One of ``"npz"``, ``"npy"`` or ``"torch"``.
        compressed: If ``True``, the NPZ file is compressed.
            (i.e. :py:func:`numpy.savez_compressed` is used.)

    Returns:
        Serialized mock arrays. If ``"npy"``, the arrays are serialized
        separately. Otherwise the arrays are bundled together.
    """
    img = np.random.randint(256, size=(3, 640, 480), dtype=np.uint8)
    lbl = np.random.randint(256, size=(640, 480), dtype=np.uint8)

    match format:
        case "npz":
            return _dump_np({"img": img, "lbl": lbl}, compressed=compressed)
        case "npy":
            return _dump_np(img), _dump_np(lbl)
        case "torch":
            return _dump_torch({"img": img, "lbl": lbl})
        case _:
            raise ValueError(f"Unexpected `format`: {format}")


@dataclass
class BenchmarkConfig:
    """BenchmarkConfig()

    Configuration for a single benchmark run."""

    data_format: str
    """Data format (``"npy"``, ``"npz"``, or ``"torch"``)"""

    compressed: bool
    """Whether the NPZ file is compressed"""

    impl: str
    """Implementation (``"np"``, ``"spdl"``, or ``"torch"``)"""

    num_workers: int
    """Number of concurrent workers"""


def _parse_args() -> argparse.Namespace:
    """Parse command line arguments.

    Returns:
        Parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark data format loading performance"
    )
    parser.add_argument(
        "--output",
        type=lambda p: os.path.realpath(p),
        default=DEFAULT_RESULT_PATH,
        help="Output path for the results",
    )
    return parser.parse_args()


def main() -> None:
    """The entrypoint from CLI."""
    args = _parse_args()

    # Define explicit configuration lists
    worker_counts = [32, 16, 8, 4, 2, 1]
    executor_types = [ExecutorType.PROCESS, ExecutorType.THREAD]

    # Define benchmark configurations
    # (data_format, compressed, impl)
    data_configs = [
        ("torch", False, "torch"),
        ("npy", False, "np"),
        ("npy", False, "spdl"),
        ("npz", False, "np"),
        ("npz", True, "np"),
        ("npz", False, "spdl"),
        ("npz", True, "spdl"),
    ]

    results: list[BenchmarkResult[BenchmarkConfig]] = []
    iterations = 1000
    num_runs = 5

    for num_workers in worker_counts:
        for executor_type in executor_types:
            with BenchmarkRunner(
                executor_type=executor_type,
                num_workers=num_workers,
                warmup_iterations=30 * num_workers,
            ) as runner:
                for data_format, compressed, impl in data_configs:
                    data = get_mock_data(data_format, compressed)

                    load_fn = _get_load_fn(data_format, impl)

                    result, _ = runner.run(
                        BenchmarkConfig(
                            data_format=data_format,
                            compressed=compressed,
                            impl=impl,
                            num_workers=num_workers,
                        ),
                        partial(load_fn, data),
                        iterations,
                        num_runs=num_runs,
                    )

                    results.append(result)
                    print(
                        f"{data_format},{compressed},{impl},{executor_type.value},{num_workers},{result.qps:.1f}"
                    )

    save_results_to_csv(results, args.output)
    plot_output = args.output.replace(".csv", ".png")
    print(
        f"\nBenchmark complete. To generate plots, run:\n"
        f"python benchmark_numpy_plot.py --input {args.output} --output {plot_output}"
    )


if __name__ == "__main__":
    main()

API Reference

Functions

main() → None

The entrypoint from CLI.

get_mock_data(format: str, compressed: bool = False) → tuple[bytes, bytes] | bytes

Generate a single sample in the given format.

The mock data resembles an RGB image and its segmentation labels.

Parameters:
  • format – One of "npz", "npy" or "torch".

  • compressed – If True, the NPZ file is compressed (i.e. numpy.savez_compressed() is used).

Returns:

Serialized mock arrays. If "npy", the arrays are serialized separately; otherwise they are bundled together.
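
A minimal sketch of the two return shapes (assuming this example module is importable as spdl.examples.benchmark_numpy; adjust the import to your setup):

   from spdl.examples.benchmark_numpy import get_mock_data

   npz_bytes = get_mock_data("npz", compressed=True)   # one NPZ archive as a single byte string
   img_bytes, lbl_bytes = get_mock_data("npy")          # one NPY byte string per array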

load_npy(items: list[bytes]) → list[NDArray]

Load arrays from serialized NPY binary strings using numpy.load().

load_npy_spdl(items: list[bytes]) → list[NDArray]

Load arrays from serialized NPY binary strings using spdl.io.load_npy().
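
The two NPY loaders are interchangeable; a short sketch (same hypothetical import path as above):

   from spdl.examples.benchmark_numpy import get_mock_data, load_npy, load_npy_spdl

   items = list(get_mock_data("npy"))   # serialized image and label arrays
   img, lbl = load_npy(items)           # decoded with numpy.load
   img, lbl = load_npy_spdl(items)      # the same arrays decoded with spdl.io.load_npy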

load_npz(item: bytes) → list[NDArray]

Load arrays from a serialized NPZ binary string using numpy.load().

load_npz_spdl(item: bytes) → list[NDArray]

Load arrays from a serialized NPZ binary string using spdl.io.load_npz().
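
Likewise for the NPZ pair, sketched under the same import assumption:

   from spdl.examples.benchmark_numpy import get_mock_data, load_npz, load_npz_spdl

   npz_bytes = get_mock_data("npz")
   arrays = load_npz(npz_bytes)        # [img, lbl] via numpy.load on an in-memory buffer
   arrays = load_npz_spdl(npz_bytes)   # the same arrays via spdl.io.load_npz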

load_torch(item: bytes) → list[NDArray]

Load arrays from a serialized PyTorch state dict.
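
A short sketch (same import assumption as above):

   from spdl.examples.benchmark_numpy import get_mock_data, load_torch

   pt_bytes = get_mock_data("torch")    # a dict of tensors serialized with torch.save
   img, lbl = load_torch(pt_bytes)      # the values of the loaded dict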

Classes

class BenchmarkConfig

Configuration for a single benchmark run.

compressed: bool

Whether the NPZ file is compressed

data_format: str

Data format ("npy", "npz", or "torch")

impl: str

Implementation ("np", "spdl", or "torch")

num_workers: int

Number of concurrent workers
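
In main(), one such config is created per combination and handed to the runner from benchmark_utils. A condensed sketch of that loop, using the names imported in the source listing above:

   config = BenchmarkConfig(data_format="npz", compressed=False, impl="spdl", num_workers=8)
   data = get_mock_data(config.data_format, config.compressed)
   with BenchmarkRunner(
       executor_type=ExecutorType.THREAD,
       num_workers=config.num_workers,
       warmup_iterations=30 * config.num_workers,
   ) as runner:
       # same iterations (1000) and num_runs (5) as the benchmark loop above
       result, _ = runner.run(config, partial(load_npz_spdl, data), 1000, num_runs=5)
   print(result.qps)   # throughput (queries per second) for this configuration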