image_dataloading

Benchmark the performance of loading images from the local file system to GPUs.

Given a list of image files to process, this script spawns subprocesses, each of which loads images and sends them to its corresponding GPU. The main process then collects the runtime statistics.

flowchart
    A[Main Process]
    subgraph P1[Worker Process 1]
        subgraph TP1[Thread Pool]
            t11[Thread]
            t12[Thread]
        end
    end
    G1[GPU 1]
    subgraph P3[Worker Process N]
        subgraph TP3[Thread Pool]
            t31[Thread]
            t32[Thread]
        end
    end
    G3[GPU N]
    A --> P1
    A --> P3
    t11 --> G1
    t12 --> G1
    t31 --> G3
    t32 --> G3

A file list can be created, for example, by:

cd /data/users/moto/imagenet/
find train -name '*.JPEG' > ~/imagenet.train.flist
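
Where `find` is not available, a similar list can be produced from Python. The following is a minimal sketch and not part of the benchmark script; the helper name `write_flist` and its parameters are hypothetical. Unlike `find`, it sorts the paths and skips directories.

```python
from pathlib import Path


def write_flist(root: Path, subdir: str, output: Path, pattern: str = "*.JPEG") -> int:
    """Write the files under ``root/subdir`` that match ``pattern`` to ``output``.

    Paths are written relative to ``root``, one per line, so that the
    benchmark's ``--prefix`` option can point at ``root``.
    Returns the number of paths written.
    """
    paths = sorted(
        p.relative_to(root).as_posix()
        for p in (root / subdir).rglob(pattern)
        if p.is_file()
    )
    output.write_text("".join(f"{p}\n" for p in paths))
    return len(paths)
```

For the layout used above, this would be called as `write_flist(Path("/data/users/moto/imagenet"), "train", Path.home() / "imagenet.train.flist")`.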

To run the benchmark, pass the file list to the script as follows:

python image_dataloading.py \
    --input-flist ~/imagenet.train.flist \
    --prefix /data/users/moto/imagenet/ \
    --num-workers 8 # The number of GPUs

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Benchmark the performance of loading images from local file system to GPUs.

Given a list of image files to process, this script spawns subprocesses,
each of which loads images and sends them to its corresponding GPU, then
collects the runtime statistics.

.. include:: ../plots/image_dataloading_chart.txt

A file list can be created, for example, by:

.. code-block:: bash

   cd /data/users/moto/imagenet/
   find train -name '*.JPEG' > ~/imagenet.train.flist

To run the benchmark, pass the file list to the script as follows.

.. code-block::

   python image_dataloading.py
       --input-flist ~/imagenet.train.flist
       --prefix /data/users/moto/imagenet/
       --num-workers 8 # The number of GPUs
"""

# pyre-ignore-all-errors

import logging
import signal
import time
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from threading import Event

import spdl.io
import spdl.utils
import torch
from spdl.io import CUDAConfig
from spdl.pipeline import Pipeline, PipelineBuilder
from torch import Tensor

_LG = logging.getLogger(__name__)

__all__ = [
    "entrypoint",
    "worker_entrypoint",
    "benchmark",
    "source",
    "batch_decode",
    "get_pipeline",
    "PerfResult",
]


def _parse_args(args):
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--input-flist", type=Path, required=True)
    parser.add_argument("--max-samples", type=int)
    # Default to an empty string so that `prefix + line` in `source` works
    # when the option is omitted.
    parser.add_argument("--prefix", default="")
    parser.add_argument("--batch-size", type=int, default=32)
    parser.add_argument("--trace", type=Path)
    parser.add_argument("--buffer-size", type=int, default=16)
    parser.add_argument("--num-threads", type=int, default=16)
    parser.add_argument("--worker-id", type=int, required=True)
    parser.add_argument("--num-workers", type=int, required=True)
    args = parser.parse_args(args)
    if args.trace:
        args.max_samples = args.batch_size * 40
    return args


def source(
    path: Path,
    prefix: str = "",
    split_size: int = 1,
    split_id: int = 0,
) -> Iterator[str]:
    """Iterate a file containing a list of paths, while optionally skipping some.

    Args:
        path: Path to the file containing list of file paths.
        prefix: Prepended to the paths in the list.
        split_size: Split the paths into this number of subsets.
        split_id: The index of this split.
            Paths at ``line_number % split_size == split_id`` are returned.

    Yields:
        str: The paths of the specified split.
    """
    with open(path) as f:
        for i, line in enumerate(f):
            if i % split_size == split_id:
                # Skip blank lines.
                if line := line.strip():
                    yield prefix + line


async def batch_decode(
    srcs: list[str],
    width: int = 224,
    height: int = 224,
    device_config: spdl.io.CUDAConfig | None = None,
) -> Tensor:
    """Given image paths, decode, resize, batch and optionally send them to GPU.

    Args:
        srcs: List of image paths.
        width, height: The size of the images to batch.
        device_config: When provided, the data are sent to the specified GPU.

    Returns:
        The batch tensor.
    """
    buffer = await spdl.io.async_load_image_batch(
        srcs,
        width=width,
        height=height,
        pix_fmt="rgb24",
        device_config=device_config,
        strict=False,
    )
    return spdl.io.to_torch(buffer)


def get_pipeline(
    src: Iterator[str],
    batch_size: int,
    device_config: CUDAConfig,
    buffer_size: int,
    num_threads: int,
) -> Pipeline:
    """Build image data loading pipeline.

    The pipeline uses :py:func:`batch_decode` for decoding images concurrently
    and sends the resulting data to GPU.

    Args:
        src: Pipeline source. Generator that yields image paths.
            See :py:func:`source`.
        batch_size: The number of images in a batch.
        device_config: The configuration of target CUDA device.
        buffer_size: The size of buffer for the resulting batch image Tensor.
        num_threads: The number of threads in the pipeline.

    Returns:
        The pipeline that performs batch image decoding and device transfer.
    """

    async def _batch_decode(srcs):
        return await batch_decode(srcs, device_config=device_config)

    pipeline = (
        PipelineBuilder()
        .add_source(src)
        .aggregate(batch_size)
        .pipe(_batch_decode, concurrency=num_threads, report_stats_interval=15)
        .add_sink(buffer_size)
        .build(num_threads=num_threads)
    )
    return pipeline


def _get_pipeline(args):
    return get_pipeline(
        # Each worker reads its own round-robin split of the file list.
        source(args.input_flist, args.prefix, args.num_workers, args.worker_id),
        args.batch_size,
        device_config=(
            None
            if args.worker_id is None
            else spdl.io.cuda_config(
                device_index=args.worker_id,
                allocator=(
                    torch.cuda.caching_allocator_alloc,
                    torch.cuda.caching_allocator_delete,
                ),
            )
        ),
        buffer_size=args.buffer_size,
        num_threads=args.num_threads,
    )


@dataclass
class PerfResult:
    """Used to report the worker performance to the main process."""

    elapsed: float
    """The time it took to process all the inputs."""

    num_batches: int
    """The number of batches processed."""

    num_frames: int
    """The number of frames processed."""


def worker_entrypoint(args: list[str]) -> PerfResult:
    """Entrypoint for worker process. Load images to a GPU and measure its performance.

    It builds a :py:class:`~spdl.pipeline.Pipeline` object using the
    :py:func:`get_pipeline` function and runs it with the :py:func:`benchmark` function.
    """
    args = _parse_args(args)
    _init(args.debug, args.worker_id)

    _LG.info(args)

    pipeline = _get_pipeline(args)
    print(pipeline)

    device = torch.device(f"cuda:{args.worker_id}")

    ev = Event()

    # Request a graceful stop of the benchmark loop on SIGTERM.
    def handler_stop_signals(_signum, _frame):
        ev.set()

    signal.signal(signal.SIGTERM, handler_stop_signals)

    # Warm up
    torch.zeros([1, 1], device=device)

    trace_path = f"{args.trace}.{args.worker_id}"
    with (
        pipeline.auto_stop(),
        spdl.utils.tracing(trace_path, enable=args.trace is not None),
    ):
        return benchmark(pipeline.get_iterator(), ev)


def benchmark(loader: Iterator[Tensor], stop_requested: Event) -> PerfResult:
    """The main loop that measures the performance of dataloading.

    Args:
        loader: The dataloader to benchmark.
        stop_requested: Used to interrupt the benchmark loop.

    Returns:
        The performance result.
    """
    t0 = time.monotonic()
    num_frames = num_batches = 0
    try:
        for batch in loader:
            num_frames += batch.shape[0]
            num_batches += 1

            if stop_requested.is_set():
                break

    finally:
        elapsed = time.monotonic() - t0

    return PerfResult(elapsed, num_batches, num_frames)


def _init_logging(debug=False, worker_id=None):
    fmt = "%(asctime)s [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s"
    if worker_id is not None:
        fmt = f"[{worker_id}:%(thread)d] {fmt}"
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(format=fmt, level=level)


def _init(debug, worker_id):
    _init_logging(debug, worker_id)


def _parse_process_args(args):
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--num-workers", type=int, default=8)
    # Options not recognized here are forwarded to the workers.
    return parser.parse_known_args(args)


def entrypoint(args: list[str] | None = None):
    """CLI entrypoint. Launch the worker processes,
    each of which loads images and sends them to a GPU."""
    ns, args = _parse_process_args(args)

    args_set = [
        [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]

    from multiprocessing import Pool

    with Pool(processes=ns.num_workers) as pool:
        _init_logging()
        _LG.info("Spawned: %d workers", ns.num_workers)

        vals = pool.map(worker_entrypoint, args_set)

    ave_time = sum(v.elapsed for v in vals) / len(vals)
    total_frames = sum(v.num_frames for v in vals)
    total_batches = sum(v.num_batches for v in vals)

    _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")

    FPS = total_frames / ave_time
    BPS = total_batches / ave_time
    _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")


if __name__ == "__main__":
    entrypoint()

Functions

entrypoint(args: list[str] | None = None)

CLI entrypoint. Launches the worker processes, each of which loads images and sends them to a GPU.
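
The way the main process derives one argument list per worker can be sketched as follows. This mirrors the logic of `entrypoint()` in the source listing but stands alone; the helper names `split_args` and `build_worker_args` are hypothetical, and the sample arguments are made up.

```python
import argparse


def split_args(argv: list[str]) -> tuple[int, list[str]]:
    """Extract --num-workers and return it with the remaining arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-workers", type=int, default=8)
    # Unknown options are returned untouched so they can be forwarded.
    ns, rest = parser.parse_known_args(argv)
    return ns.num_workers, rest


def build_worker_args(rest: list[str], num_workers: int) -> list[list[str]]:
    """Append a distinct --worker-id (and the shared --num-workers) per worker."""
    return [
        [*rest, f"--worker-id={i}", f"--num-workers={num_workers}"]
        for i in range(num_workers)
    ]


num_workers, rest = split_args(["--input-flist", "flist.txt", "--num-workers", "2"])
for worker_args in build_worker_args(rest, num_workers):
    print(worker_args)
```

Because the worker ID doubles as the CUDA device index in the script, worker `i` sends its data to GPU `i`.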

worker_entrypoint(args: list[str]) → PerfResult

Entrypoint for a worker process. Loads images to a GPU and measures the performance.

It builds a Pipeline object using the get_pipeline() function and runs it with the benchmark() function.

benchmark(loader: Iterator[Tensor], stop_requested: Event) → PerfResult

The main loop that measures the performance of dataloading.

Parameters:
  • loader – The dataloader to benchmark.

  • stop_requested – Used to interrupt the benchmark loop.

Returns:

The performance result.
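
The measurement loop does not depend on the GPU, so its behavior can be tried without SPDL or PyTorch. The following sketch reproduces the loop from the source listing with a stand-in batch type; `FakeBatch`, `run_loop`, and the shape values are illustrative assumptions, not part of the script.

```python
import time
from dataclasses import dataclass
from threading import Event


@dataclass
class FakeBatch:
    """Stand-in for a batch tensor; the loop only reads ``shape[0]``."""

    shape: tuple[int, ...]


def run_loop(loader, stop_requested: Event) -> tuple[float, int, int]:
    """Count batches and frames until the loader ends or a stop is requested."""
    t0 = time.monotonic()
    num_frames = num_batches = 0
    for batch in loader:
        num_frames += batch.shape[0]
        num_batches += 1
        # The flag is checked after counting, so the current batch is included.
        if stop_requested.is_set():
            break
    return time.monotonic() - t0, num_batches, num_frames


batches = [FakeBatch((32, 224, 224, 3))] * 3 + [FakeBatch((4, 224, 224, 3))]
elapsed, num_batches, num_frames = run_loop(iter(batches), Event())
print(num_batches, num_frames)  # 4 batches, 100 frames
```

If the event is already set when the loop starts, exactly one batch is counted before the loop exits.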

source(path: Path, prefix: str = '', split_size: int = 1, split_id: int = 0) → Iterator[str]

Iterate a file containing a list of paths, while optionally skipping some.

Parameters:
  • path – Path to the file containing the list of file paths.

  • prefix – Prepended to the paths in the list.

  • split_size – Split the paths into this number of subsets.

  • split_id – The index of this split. Paths at line_number % split_size == split_id are returned.

Yields:

str – The paths of the specified split.
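
To illustrate the round-robin split, here is a self-contained sketch that mirrors the logic of `source()` from the source listing and applies it to a temporary file list. The names `iter_split` and `demo`, the file names, and the `/data/` prefix are illustrative only.

```python
import tempfile
from collections.abc import Iterator
from pathlib import Path


def iter_split(
    path: Path, prefix: str = "", split_size: int = 1, split_id: int = 0
) -> Iterator[str]:
    """Yield non-empty lines where ``line_number % split_size == split_id``."""
    with open(path) as f:
        for i, line in enumerate(f):
            if i % split_size == split_id:
                if line := line.strip():
                    yield prefix + line


def demo() -> list[list[str]]:
    """Split a five-line file list between two workers."""
    with tempfile.TemporaryDirectory() as d:
        flist = Path(d) / "flist.txt"
        flist.write_text("".join(f"train/img{i}.JPEG\n" for i in range(5)))
        return [list(iter_split(flist, "/data/", 2, k)) for k in range(2)]


for split_id, paths in enumerate(demo()):
    print(split_id, paths)
```

Split 0 receives lines 0, 2, and 4, and split 1 receives lines 1 and 3, so the subsets are disjoint and together cover the whole list.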

async batch_decode(srcs: list[str], width: int = 224, height: int = 224, device_config: CUDAConfig | None = None) → Tensor

Given image paths, decode, resize, batch and optionally send them to GPU.

Parameters:
  • srcs – List of image paths.

  • width – The width of the images in the batch after resizing.

  • height – The height of the images in the batch after resizing.

  • device_config – When provided, the data are sent to the specified GPU.

Returns:

The batch tensor.

get_pipeline(src: Iterator[str], batch_size: int, device_config: CUDAConfig, buffer_size: int, num_threads: int) → Pipeline

Build image data loading pipeline.

The pipeline uses batch_decode() for decoding images concurrently and sends the resulting data to GPU.

Parameters:
  • src – Pipeline source. Generator that yields image paths. See source().

  • batch_size – The number of images in a batch.

  • device_config – The configuration of target CUDA device.

  • buffer_size – The size of buffer for the resulting batch image Tensor.

  • num_threads – The number of threads in the pipeline.

Returns:

The pipeline that performs batch image decoding and device transfer.
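
The shape of the pipeline (group paths into batches, then process batches concurrently) can be approximated with the standard library alone. The sketch below is a conceptual stand-in and does not use or reproduce the SPDL API; `aggregate`, `run_stages`, and the `decode` callback are hypothetical names. It keeps a trailing partial batch, which is an assumption of this sketch, and it lacks the bounded output buffer that the real pipeline's sink provides.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def aggregate(items: Iterable[str], batch_size: int) -> Iterator[list[str]]:
    """Group consecutive items into lists of at most ``batch_size`` elements."""
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch


def run_stages(
    paths: Iterable[str],
    batch_size: int,
    num_threads: int,
    decode: Callable[[list[str]], object],
) -> list[object]:
    """Apply ``decode`` to each batch using a thread pool, preserving order."""
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(decode, aggregate(paths, batch_size)))


paths = [f"train/img{i}.JPEG" for i in range(5)]
# ``len`` stands in for a real decode function here.
print(run_stages(paths, batch_size=2, num_threads=4, decode=len))
```

In the actual script, the decode step is the asynchronous batch_decode() function and the number of concurrent decode tasks is set by num_threads.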

Classes

class PerfResult(elapsed: float, num_batches: int, num_frames: int)

Used to report the worker performance to the main process.

elapsed: float

The time, in seconds, it took to process all the inputs.

num_batches: int

The number of batches processed.

num_frames: int

The number of frames processed.
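
The main process combines the per-worker results into aggregate throughput figures. The sketch below restates that arithmetic from `entrypoint()` in the source listing in a standalone form; the `aggregate_results` helper and the sample numbers are made up for illustration and are not measured results.

```python
from dataclasses import dataclass


@dataclass
class PerfResult:
    """Local copy of the result type so that the sketch is self-contained."""

    elapsed: float
    num_batches: int
    num_frames: int


def aggregate_results(results: list[PerfResult]) -> tuple[float, float]:
    """Return (frames per second, batches per second) across all workers.

    Workers run in parallel, so the totals are divided by the average
    elapsed time of the workers rather than by the sum.
    """
    ave_time = sum(r.elapsed for r in results) / len(results)
    fps = sum(r.num_frames for r in results) / ave_time
    bps = sum(r.num_batches for r in results) / ave_time
    return fps, bps


# Hypothetical results from two workers.
results = [PerfResult(8.0, 100, 3200), PerfResult(12.0, 100, 3200)]
fps, bps = aggregate_results(results)
print(f"{fps=:.2f}, {bps=:.2f}")
```

With these made-up inputs the average time is 10 seconds, giving 640 frames per second and 20 batches per second.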