Benchmark numpy

This example benchmarks the speed of loading data in different formats.

See Case Studies / Data Format for details on how the data format and the loading function affect the performance of the training pipeline.
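At its core, the benchmark serializes one mock sample per format and then measures how fast the matching load function can deserialize it, repeatedly, across thread and process pools. The sketch below shows that pattern in isolation; the import path spdl.examples.benchmark_numpy is an assumption for illustration, and the worker pools, warmup, and repeated runs handled by BenchmarkRunner in the full script are omitted.

import time

# Assumed import path for illustration; adjust to wherever the listing below lives.
from spdl.examples.benchmark_numpy import get_mock_data, load_npz

payload = get_mock_data("npz")   # serialize an RGB image + label mask once
iterations = 1000                # same iteration count the script uses

start = time.monotonic()
for _ in range(iterations):
    arrays = load_npz(payload)   # deserialize the NPZ payload
elapsed = time.monotonic() - start

print(f"npz (uncompressed, numpy.load): {iterations / elapsed:.1f} samples/sec")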

Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example benchmarks the speed of loading data in different formats.

See `Case Studies / Data Format <../case_studies/data_format.html>`_ for
details on how the data format and the loading function affect
the performance of the training pipeline.
"""

__all__ = [
    "main",
    "get_mock_data",
    "load_npy",
    "load_npy_spdl",
    "load_npz",
    "load_npz_spdl",
    "load_torch",
    "BenchmarkConfig",
]

# pyre-strict

import argparse
import os
from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
from io import BytesIO

import numpy as np
import spdl.io
import torch
from numpy.typing import NDArray

try:
    from examples.benchmark_utils import (  # pyre-ignore[21]
        BenchmarkResult,
        BenchmarkRunner,
        ExecutorType,
        get_default_result_path,
        save_results_to_csv,
    )
except ImportError:
    from spdl.examples.benchmark_utils import (
        BenchmarkResult,
        BenchmarkRunner,
        ExecutorType,
        get_default_result_path,
        save_results_to_csv,
    )


DEFAULT_RESULT_PATH: str = get_default_result_path(__file__)


def load_npy(items: list[bytes]) -> list[NDArray]:
    """Load arrays from serialized NPY binary strings using :py:func:`numpy.load`."""
    return [np.load(BytesIO(item), allow_pickle=False) for item in items]


def load_npy_spdl(items: list[bytes]) -> list[NDArray]:
    """Load arrays from serialized NPY binary strings using :py:func:`spdl.io.load_npy`."""
    return [spdl.io.load_npy(item) for item in items]


def load_npz(item: bytes) -> list[NDArray]:
    """Load arrays from a serialized NPZ binary string using :py:func:`numpy.load`."""
    data = np.load(BytesIO(item))
    return list(data.values())


def load_npz_spdl(item: bytes) -> list[NDArray]:
    """Load arrays from a serialized NPZ binary string using :py:func:`spdl.io.load_npz`."""
    data = spdl.io.load_npz(item)
    return list(data.values())


def load_torch(item: bytes) -> list[NDArray]:
    """Load arrays from a serialized PyTorch state dict."""
    return list(torch.load(BytesIO(item)).values())


def _get_load_fn(
    data_format: str, impl: str
) -> Callable[[list[bytes]], list[NDArray]] | Callable[[bytes], list[NDArray]]:
    match data_format:
        case "torch":
            return load_torch
        case "npy":
            if impl == "spdl":
                return load_npy_spdl
            return load_npy
        case "npz":
            if impl == "spdl":
                return load_npz_spdl
            return load_npz
        case _:
            raise ValueError(f"Unexpected data format: {data_format}")


def _dump_np(arr: NDArray | dict[str, NDArray], compressed: bool = False) -> bytes:
    with BytesIO() as buf:
        if isinstance(arr, dict):
            if compressed:
                np.savez_compressed(buf, allow_pickle=False, **arr)
            else:
                np.savez(buf, allow_pickle=False, **arr)
        else:
            np.save(buf, arr, allow_pickle=False)
        buf.seek(0)
        return buf.read()


def _dump_torch(arr: dict[str, NDArray]) -> bytes:
    with BytesIO() as buf:
        torch.save({k: torch.from_numpy(v) for k, v in arr.items()}, buf)
        buf.seek(0)
        return buf.read()


def get_mock_data(format: str, compressed: bool = False) -> tuple[bytes, bytes] | bytes:
    """Generate a single sample in the given format.

    The mock data resembles an RGB image and its segmentation labels.

    Args:
        format: One of ``"npz"``, ``"npy"`` or ``"torch"``.
        compressed: If ``True``, the NPZ file is compressed.
            (i.e. :py:func:`numpy.savez_compressed` is used.)

    Returns:
        Serialized mock arrays. If ``"npy"``, the arrays are serialized
        separately. Otherwise they are bundled together.
    """
    img = np.random.randint(256, size=(3, 640, 480), dtype=np.uint8)
    lbl = np.random.randint(256, size=(640, 480), dtype=np.uint8)

    match format:
        case "npz":
            return _dump_np({"img": img, "lbl": lbl}, compressed=compressed)
        case "npy":
            return _dump_np(img), _dump_np(lbl)
        case "torch":
            return _dump_torch({"img": img, "lbl": lbl})
        case _:
            raise ValueError(f"Unexpected `format`: {format}")


@dataclass
class BenchmarkConfig:
    """Configuration for a single benchmark run."""

    data_format: str
    compressed: bool
    impl: str
    num_workers: int


def _parse_args() -> argparse.Namespace:
    """Parse command line arguments.

    Returns:
        Parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description="Benchmark data format loading performance"
    )
    parser.add_argument(
        "--output",
        type=lambda p: os.path.realpath(p),
        default=DEFAULT_RESULT_PATH,
        help="Output path for the results",
    )
    return parser.parse_args()


def main() -> None:
    """The entry point for the CLI."""
    args = _parse_args()

    # Define explicit configuration lists
    worker_counts = [32, 16, 8, 4, 2, 1]
    executor_types = [ExecutorType.PROCESS, ExecutorType.THREAD]

    # Define benchmark configurations
    # (data_format, compressed, impl)
    data_configs = [
        ("torch", False, "torch"),
        ("npy", False, "np"),
        ("npy", False, "spdl"),
        ("npz", False, "np"),
        ("npz", True, "np"),
        ("npz", False, "spdl"),
        ("npz", True, "spdl"),
    ]

    results: list[BenchmarkResult[BenchmarkConfig]] = []
    iterations = 1000
    num_runs = 5

    for num_workers in worker_counts:
        for executor_type in executor_types:
            with BenchmarkRunner(
                executor_type=executor_type,
                num_workers=num_workers,
                warmup_iterations=30 * num_workers,
            ) as runner:
                for data_format, compressed, impl in data_configs:
                    data = get_mock_data(data_format, compressed)

                    load_fn = _get_load_fn(data_format, impl)

                    result, _ = runner.run(
                        BenchmarkConfig(
                            data_format=data_format,
                            compressed=compressed,
                            impl=impl,
                            num_workers=num_workers,
                        ),
                        partial(load_fn, data),
                        iterations,
                        num_runs=num_runs,
                    )

                    results.append(result)
                    print(
                        f"{data_format},{compressed},{impl},{executor_type.value},{num_workers},{result.qps:.1f}"
                    )

    save_results_to_csv(results, args.output)
    plot_output = args.output.replace(".csv", ".png")
    print(
        f"\nBenchmark complete. To generate plots, run:\n"
        f"python benchmark_numpy_plot.py --input {args.output} --output {plot_output}"
    )


if __name__ == "__main__":
    main()

Functions

main() -> None

The entry point for the CLI.

get_mock_data(format: str, compressed: bool = False) -> tuple[bytes, bytes] | bytes

Generate a single sample in the given format.

The mock data resembles an RGB image and its segmentation labels.

Parameters:
  • format – One of "npz", "npy" or "torch".

  • compressed – If True, the NPZ file is compressed (i.e. numpy.savez_compressed() is used).

Returns:

Serialized mock arrays. If "npy", the arrays are serialized separately; otherwise they are bundled together.
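For example, the return shape depends on the requested format (a small sketch, using the same import assumption as above):

npz_payload = get_mock_data("npz")            # one bytes object bundling "img" and "lbl"
img_bytes, lbl_bytes = get_mock_data("npy")   # a tuple of two bytes objects, one per array
pt_payload = get_mock_data("torch")           # one bytes object produced by torch.save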

load_npy(items: list[bytes]) -> list[NDArray]

Load arrays from serialized NPY binary strings using numpy.load().

load_npy_spdl(items: list[bytes]) -> list[NDArray]

Load arrays from serialized NPY binary strings using spdl.io.load_npy().

load_npz(item: bytes) -> list[NDArray]

Load arrays from a serialized NPZ binary string using numpy.load().

load_npz_spdl(item: bytes) -> list[NDArray]

Load arrays from a serialized NPZ binary string using spdl.io.load_npz().

load_torch(item: bytes) -> list[NDArray]

Load arrays from a serialized PyTorch state dict.
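Each loader expects bytes produced in the matching format. A quick round-trip sketch using the mock data generator (same import assumption as above; the shapes and dtype follow from how get_mock_data builds the arrays):

arrays = load_npz(get_mock_data("npz"))
for a in arrays:
    print(a.shape, a.dtype)   # (3, 640, 480) uint8 for the image, (640, 480) uint8 for the labels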

Classes

class BenchmarkConfig(data_format: str, compressed: bool, impl: str, num_workers: int)

Configuration for a single benchmark run.

compressed: bool
data_format: str
impl: str
num_workers: int
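In main(), one BenchmarkConfig is created per measured combination and passed to BenchmarkRunner.run together with the load function; the paired configurations and results are then written to CSV. Constructing one directly is a plain dataclass call, for example:

config = BenchmarkConfig(
    data_format="npz",   # one of "npy", "npz", or "torch"
    compressed=True,     # only meaningful for the "npz" format
    impl="np",           # "np", "spdl", or "torch"
    num_workers=8,
)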