Image dataloading

Benchmark the performance of loading images from the local file system to GPUs.

Given a list of image files to process, this script spawns subprocesses, each of which loads images and sends them to the corresponding GPU; the main process then collects the runtime statistics.

flowchart
   A[Main Process]
   subgraph P1[Worker Process 1]
      subgraph TP1[Thread Pool]
         t11[Thread]
         t12[Thread]
      end
   end
   G1[GPU 1]
   subgraph P3[Worker Process N]
      subgraph TP3[Thread Pool]
         t31[Thread]
         t32[Thread]
      end
   end
   G3[GPU N]
   A --> P1
   A --> P3
   t11 --> G1
   t12 --> G1
   t31 --> G3
   t32 --> G3

A file list can be created, for example, by:

cd /data/users/moto/imagenet/
find train -name '*.JPEG' > ~/imagenet.train.flist

To run the benchmark, pass the file list to the script as follows.

python image_dataloading.py \
    --input-flist ~/imagenet.train.flist \
    --prefix /data/users/moto/imagenet/ \
    --num-workers 8  # The number of GPUs

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Benchmark the performance of loading images from the local file system to GPUs.

Given a list of image files to process, this script spawns subprocesses,
each of which loads images and sends them to the corresponding GPU;
the main process then collects the runtime statistics.

.. include:: ../plots/image_dataloading_chart.txt

A file list can be created, for example, by:

.. code-block:: bash

   cd /data/users/moto/imagenet/
   find train -name '*.JPEG' > ~/imagenet.train.flist

To run the benchmark, pass the file list to the script as follows.

.. code-block::

   python image_dataloading.py \\
       --input-flist ~/imagenet.train.flist \\
       --prefix /data/users/moto/imagenet/ \\
       --num-workers 8  # The number of GPUs
"""

# pyre-strict

import argparse
import logging
import signal
import time
from argparse import Namespace
from collections.abc import Iterator
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from threading import Event

import spdl.io
import spdl.io.utils
import torch
from spdl.io import CUDAConfig
from spdl.pipeline import Pipeline, PipelineBuilder
from torch import Tensor

_LG: logging.Logger = logging.getLogger(__name__)

__all__ = [
    "entrypoint",
    "worker_entrypoint",
    "benchmark",
    "source",
    "batch_decode",
    "get_pipeline",
    "PerfResult",
]


def _parse_args(args: list[str]) -> Namespace:
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--input-flist", type=Path, required=True)
    parser.add_argument("--max-samples", type=int)
    parser.add_argument("--prefix", required=True)
    parser.add_argument("--batch-size", type=int, default=32)
    parser.add_argument("--trace", type=Path)
    parser.add_argument("--buffer-size", type=int, default=16)
    parser.add_argument("--num-threads", type=int, default=16)
    parser.add_argument("--worker-id", type=int, required=True)
    parser.add_argument("--num-workers", type=int, required=True)
    ns = parser.parse_args(args)
    if ns.trace:
        ns.max_samples = ns.batch_size * 40
    return ns


def source(
    path: Path,
    prefix: str = "",
    split_size: int = 1,
    split_id: int = 0,
) -> Iterator[str]:
    """Iterate over a file containing a list of paths, optionally skipping some.

    Args:
        path: Path to the file containing the list of file paths.
        prefix: Prepended to the paths in the list.
        split_size: Split the paths into this number of subsets.
        split_id: The index of this split.
            Paths at ``line_number % split_size == split_id`` are returned.

    Yields:
        Path: The paths of the specified split.
    """
    with open(path) as f:
        for i, line in enumerate(f):
            if i % split_size == split_id:
                if line := line.strip():
                    yield prefix + line


def batch_decode(
    srcs: list[str],
    device_config: spdl.io.CUDAConfig,
    width: int = 224,
    height: int = 224,
) -> Tensor:
    """Given image paths, decode, resize, and batch them, and optionally send them to the GPU.

    Args:
        srcs: List of image paths.
        width, height: The size of the batched images.
        device_config: When provided, the data are sent to the specified GPU.

    Returns:
        The batch tensor.
    """
    buffer = spdl.io.load_image_batch(
        srcs,
        width=width,
        height=height,
        pix_fmt="rgb24",
        device_config=device_config,
        strict=False,
    )
    return spdl.io.to_torch(buffer)


def get_pipeline(
    src: Iterator[str],
    batch_size: int,
    device_config: CUDAConfig,
    buffer_size: int,
    num_threads: int,
) -> Pipeline:
    """Build the image data loading pipeline.

    The pipeline uses :py:func:`batch_decode` to decode images concurrently
    and send the resulting data to the GPU.

    Args:
        src: Pipeline source. Generator that yields image paths.
            See :py:func:`source`.
        batch_size: The number of images in a batch.
        device_config: The configuration of the target CUDA device.
        buffer_size: The size of the buffer for the resulting batch image Tensor.
        num_threads: The number of threads in the pipeline.

    Returns:
        The pipeline that performs batch image decoding and device transfer.
    """
    decode = partial(batch_decode, device_config=device_config)

    pipeline = (
        PipelineBuilder()
        .add_source(src)
        .aggregate(batch_size)
        .pipe(decode, concurrency=num_threads)
        .add_sink(buffer_size)
        .build(num_threads=num_threads, report_stats_interval=15)
    )
    return pipeline


def _get_pipeline(args: Namespace) -> Pipeline:
    return get_pipeline(
        source(args.input_flist, args.prefix, args.num_workers, args.worker_id),
        args.batch_size,
        device_config=(
            spdl.io.cuda_config(
                device_index=args.worker_id,
                allocator=(
                    torch.cuda.caching_allocator_alloc,
                    torch.cuda.caching_allocator_delete,
                ),
            )
        ),
        buffer_size=args.buffer_size,
        num_threads=args.num_threads,
    )


@dataclass
class PerfResult:
    """Used to report the worker performance to the main process."""

    elapsed: float
    """The time it took to process all the inputs."""

    num_batches: int
    """The number of batches processed."""

    num_frames: int
    """The number of frames processed."""


def worker_entrypoint(args_: list[str]) -> PerfResult:
    """Entrypoint for a worker process. Load images to a GPU and measure the performance.

    It builds a :py:class:`~spdl.pipeline.Pipeline` object using the
    :py:func:`get_pipeline` function and runs it with the :py:func:`benchmark` function.
    """
    args = _parse_args(args_)
    _init(args.debug, args.worker_id)

    _LG.info(args)

    pipeline = _get_pipeline(args)
    print(pipeline)

    device = torch.device(f"cuda:{args.worker_id}")

    ev: Event = Event()

    def handler_stop_signals(_signum, _frame) -> None:
        ev.set()

    signal.signal(signal.SIGTERM, handler_stop_signals)

    # Warm up
    torch.zeros([1, 1], device=device)

    trace_path = f"{args.trace}.{args.worker_id}"
    with (
        pipeline.auto_stop(),
        spdl.io.utils.tracing(trace_path, enable=args.trace is not None),
    ):
        return benchmark(pipeline.get_iterator(), ev)


def benchmark(loader: Iterator[Tensor], stop_requested: Event) -> PerfResult:
    """The main loop that measures the performance of dataloading.

    Args:
        loader: The dataloader to benchmark.
        stop_requested: Used to interrupt the benchmark loop.

    Returns:
        The performance result.
    """
    t0 = time.monotonic()
    num_frames = num_batches = 0
    try:
        for batch in loader:
            num_frames += batch.shape[0]
            num_batches += 1

            if stop_requested.is_set():
                break

    finally:
        elapsed = time.monotonic() - t0

    return PerfResult(elapsed, num_batches, num_frames)


def _init_logging(debug: bool = False, worker_id: int | None = None) -> None:
    fmt = "%(asctime)s [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s"
    if worker_id is not None:
        fmt = f"[{worker_id}:%(thread)d] {fmt}"
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(format=fmt, level=level)


def _init(debug: bool, worker_id: int) -> None:
    _init_logging(debug, worker_id)


def _parse_process_args(args: list[str] | None) -> tuple[Namespace, list[str]]:
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--num-workers", type=int, default=8)
    return parser.parse_known_args(args)


def entrypoint(args: list[str] | None = None) -> None:
    """CLI entrypoint. Launch the worker processes,
    each of which loads images and sends them to a GPU."""
    ns, args = _parse_process_args(args)

    args_set = [
        [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]

    from multiprocessing import Pool

    with Pool(processes=ns.num_workers) as pool:
        _init_logging()
        _LG.info("Spawned: %d workers", ns.num_workers)

        vals = pool.map(worker_entrypoint, args_set)

    ave_time = sum(v.elapsed for v in vals) / len(vals)
    total_frames = sum(v.num_frames for v in vals)
    total_batches = sum(v.num_batches for v in vals)

    _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")

    FPS = total_frames / ave_time
    BPS = total_batches / ave_time
    _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")


if __name__ == "__main__":
    entrypoint()

Functions

entrypoint(args: list[str] | None = None) -> None

CLI entrypoint. Launch the worker processes, each of which loads images and sends them to a GPU.

worker_entrypoint(args_: list[str]) -> PerfResult

Entrypoint for a worker process. Load images to a GPU and measure the performance.

It builds a Pipeline object using the get_pipeline() function and runs it with the benchmark() function.
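
A worker can also be invoked programmatically for a quick single-GPU run. A minimal sketch, assuming a CUDA-capable machine, that the script is importable as image_dataloading, and an illustrative file-list path:

from image_dataloading import worker_entrypoint

# Runs the full pipeline on GPU 0 and returns a PerfResult.
# The flags mirror the CLI shown at the top of this page.
result = worker_entrypoint([
    "--input-flist", "/path/to/imagenet.train.flist",  # hypothetical path
    "--prefix", "/data/users/moto/imagenet/",
    "--worker-id", "0",
    "--num-workers", "1",
])
print(f"{result.num_frames} frames in {result.elapsed:.1f}s")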

benchmark(loader: Iterator[Tensor], stop_requested: Event) -> PerfResult

The main loop that measures the performance of dataloading.

Parameters:
  • loader – The dataloader to benchmark.

  • stop_requested – Used to interrupt the benchmark loop.

Returns:

The performance result.
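
Because benchmark() needs only an iterator of Tensors and an Event, it can be exercised without a GPU or a pipeline. A minimal sketch, assuming the script is importable as image_dataloading:

from threading import Event

import torch
from image_dataloading import benchmark

# Three dummy batches of 32 "frames" each; the Event is never set,
# so the loop runs to completion.
dummy_batches = (torch.zeros(32, 3, 224, 224) for _ in range(3))
result = benchmark(dummy_batches, Event())
assert result.num_batches == 3 and result.num_frames == 96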

source(path: Path, prefix: str = '', split_size: int = 1, split_id: int = 0) -> Iterator[str]

Iterate over a file containing a list of paths, optionally skipping some.

Parameters:
  • path – Path to the file containing the list of file paths.

  • prefix – Prepended to the paths in the list.

  • split_size – Split the paths into this number of subsets.

  • split_id – The index of this split. Paths at line_number % split_size == split_id are returned.

Yields:

Path – The paths of the specified split.
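
For illustration, this is how two workers would split a four-line file list (a hypothetical temporary file; same import assumption as above):

from pathlib import Path

from image_dataloading import source

flist = Path("flist.txt")  # hypothetical file list
flist.write_text("a.JPEG\nb.JPEG\nc.JPEG\nd.JPEG\n")

# Worker 0 gets lines 0 and 2; worker 1 gets lines 1 and 3.
print(list(source(flist, prefix="/data/", split_size=2, split_id=0)))
# ['/data/a.JPEG', '/data/c.JPEG']
print(list(source(flist, prefix="/data/", split_size=2, split_id=1)))
# ['/data/b.JPEG', '/data/d.JPEG']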

batch_decode(srcs: list[str], device_config: CUDAConfig, width: int = 224, height: int = 224) -> Tensor

Given image paths, decode, resize, and batch them, and optionally send them to the GPU.

Parameters:
  • srcs – List of image paths.

  • width – The width of the batched images.

  • height – The height of the batched images.

  • device_config – When provided, the data are sent to the specified GPU.

Returns:

The batch tensor.
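
A sketch of calling batch_decode() directly, assuming a CUDA-capable machine and illustrative image paths, with the same import assumption as above:

import spdl.io

from image_dataloading import batch_decode

cuda = spdl.io.cuda_config(device_index=0)
paths = ["/data/images/0001.JPEG", "/data/images/0002.JPEG"]  # hypothetical
batch = batch_decode(paths, device_config=cuda)
# A batch of two 224x224 RGB images, resident on cuda:0.
print(batch.shape, batch.device)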

get_pipeline(src: Iterator[str], batch_size: int, device_config: CUDAConfig, buffer_size: int, num_threads: int) -> Pipeline

Build the image data loading pipeline.

The pipeline uses batch_decode() to decode images concurrently and send the resulting data to the GPU.

Parameters:
  • src – Pipeline source. Generator that yields image paths. See source().

  • batch_size – The number of images in a batch.

  • device_config – The configuration of the target CUDA device.

  • buffer_size – The size of the buffer for the resulting batch image Tensor.

  • num_threads – The number of threads in the pipeline.

Returns:

The pipeline that performs batch image decoding and device transfer.
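
Putting source() and get_pipeline() together, a single-GPU consumption loop might look like the following sketch (hypothetical file list; auto_stop() and get_iterator() are used the same way in worker_entrypoint()):

from pathlib import Path

import spdl.io
from image_dataloading import get_pipeline, source

pipeline = get_pipeline(
    source(Path("flist.txt"), prefix="/data/"),  # hypothetical file list
    batch_size=32,
    device_config=spdl.io.cuda_config(device_index=0),
    buffer_size=16,
    num_threads=16,
)
# auto_stop() starts the background threads and shuts them down on exit.
with pipeline.auto_stop():
    for batch in pipeline.get_iterator():
        ...  # consume batches on cuda:0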

Classes

class PerfResult(elapsed: float, num_batches: int, num_frames: int)

Used to report the worker performance to the main process.

elapsed: float

The time it took to process all the inputs.

num_batches: int

The number of batches processed.

num_frames: int

The number of frames processed.
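
entrypoint() aggregates these fields across workers to report frames and batches per second; the same numbers can be derived from a single result:

from image_dataloading import PerfResult

r = PerfResult(elapsed=10.0, num_batches=40, num_frames=1280)  # made-up numbers
fps = r.num_frames / r.elapsed   # 128.0 frames per second
bps = r.num_batches / r.elapsed  # 4.0 batches per second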