Image dataloading
Benchmark the performance of loading images from the local file system to GPUs.
Given a list of image files to process, this script spawns subprocesses, each of which loads images and sends them to its corresponding GPU, and then collects the runtime statistics.
A file list can be created, for example, by:
cd /data/users/moto/imagenet/
find train -name '*.JPEG' > ~/imagenet.train.flist
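The same kind of file list can also be produced from Python. The following is a minimal sketch and not part of the benchmark script: `write_flist` is a hypothetical helper that assumes the images live under `root / subdir` and writes one path per line, relative to `root`. Unlike `find`, it sorts the paths so the output is reproducible.

```python
from pathlib import Path


def write_flist(root: Path, subdir: str, out: Path, pattern: str = "*.JPEG") -> int:
    """Write paths under ``root/subdir`` matching ``pattern``, relative to ``root``.

    Hypothetical helper for illustration. Returns the number of paths written.
    """
    # Sorted for a reproducible order; `find` makes no ordering guarantee.
    paths = sorted(
        p.relative_to(root).as_posix()
        for p in (root / subdir).rglob(pattern)
        if p.is_file()
    )
    out.write_text("".join(f"{p}\n" for p in paths))
    return len(paths)
```

For the directory layout used above, a call could look like `write_flist(Path("/data/users/moto/imagenet"), "train", Path.home() / "imagenet.train.flist")`.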
To run the benchmark, pass the file list to the script as follows.
python image_dataloading.py \
    --input-flist ~/imagenet.train.flist \
    --prefix /data/users/moto/imagenet/ \
    --num-workers 8 # The number of GPUs
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Benchmark the performance of loading images from local file system to GPUs.

Given a list of image files to process, this script spawns subprocesses,
each of which loads images and sends them to the corresponding GPU, then
collects the runtime statistics.

.. include:: ../plots/image_dataloading_chart.txt

A file list can be created, for example, by:

.. code-block:: bash

   cd /data/users/moto/imagenet/
   find train -name '*.JPEG' > ~/imagenet.train.flist

To run the benchmark, pass it to the script like the following.

.. code-block::

   python image_dataloading.py
       --input-flist ~/imagenet.train.flist
       --prefix /data/users/moto/imagenet/
       --num-workers 8 # The number of GPUs
"""

# pyre-strict

import argparse
import logging
import signal
import time
from argparse import Namespace
from collections.abc import Iterator
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from threading import Event

import spdl.io
import spdl.io.utils
import torch
from spdl.io import CUDAConfig
from spdl.pipeline import Pipeline, PipelineBuilder
from torch import Tensor

_LG: logging.Logger = logging.getLogger(__name__)

__all__ = [
    "entrypoint",
    "worker_entrypoint",
    "benchmark",
    "source",
    "batch_decode",
    "get_pipeline",
    "PerfResult",
]


def _parse_args(args: list[str]) -> Namespace:
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--input-flist", type=Path, required=True)
    parser.add_argument("--max-samples", type=int)
    parser.add_argument("--prefix", required=True)
    parser.add_argument("--batch-size", type=int, default=32)
    parser.add_argument("--trace", type=Path)
    parser.add_argument("--buffer-size", type=int, default=16)
    parser.add_argument("--num-threads", type=int, default=16)
    parser.add_argument("--worker-id", type=int, required=True)
    parser.add_argument("--num-workers", type=int, required=True)
    ns = parser.parse_args(args)
    if ns.trace:
        ns.max_samples = ns.batch_size * 40
    return ns


def source(
    path: Path,
    prefix: str = "",
    split_size: int = 1,
    split_id: int = 0,
) -> Iterator[str]:
    """Iterate a file containing a list of paths, while optionally skipping some.

    Args:
        path: Path to the file containing list of file paths.
        prefix: Prepended to the paths in the list.
        split_size: Split the paths into this number of subsets.
        split_id: The index of this split.
            Paths at ``line_number % split_size == split_id`` are returned.

    Yields:
        str: The paths of the specified split.
    """
    with open(path) as f:
        for i, line in enumerate(f):
            if i % split_size == split_id:
                if line := line.strip():
                    yield prefix + line


def batch_decode(
    srcs: list[str],
    device_config: spdl.io.CUDAConfig,
    width: int = 224,
    height: int = 224,
) -> Tensor:
    """Given image paths, decode, resize, batch and optionally send them to GPU.

    Args:
        srcs: List of image paths.
        width, height: The size of the images to batch.
        device_config: When provided, the data are sent to the specified GPU.

    Returns:
        The batch tensor.
    """
    buffer = spdl.io.load_image_batch(
        srcs,
        width=width,
        height=height,
        pix_fmt="rgb24",
        device_config=device_config,
        strict=False,
    )
    return spdl.io.to_torch(buffer)


def get_pipeline(
    src: Iterator[str],
    batch_size: int,
    device_config: CUDAConfig,
    buffer_size: int,
    num_threads: int,
) -> Pipeline:
    """Build image data loading pipeline.

    The pipeline uses :py:func:`batch_decode` for decoding images concurrently
    and sends the resulting data to the GPU.

    Args:
        src: Pipeline source. Generator that yields image paths.
            See :py:func:`source`.
        batch_size: The number of images in a batch.
        device_config: The configuration of target CUDA device.
        buffer_size: The size of buffer for the resulting batch image Tensor.
        num_threads: The number of threads in the pipeline.

    Returns:
        The pipeline that performs batch image decoding and device transfer.
    """
    decode = partial(batch_decode, device_config=device_config)

    pipeline = (
        PipelineBuilder()
        .add_source(src)
        .aggregate(batch_size)
        .pipe(decode, concurrency=num_threads)
        .add_sink(buffer_size)
        .build(num_threads=num_threads, report_stats_interval=15)
    )
    return pipeline


def _get_pipeline(args: Namespace) -> Pipeline:
    return get_pipeline(
        source(args.input_flist, args.prefix, args.num_workers, args.worker_id),
        args.batch_size,
        device_config=(
            spdl.io.cuda_config(
                device_index=args.worker_id,
                allocator=(
                    torch.cuda.caching_allocator_alloc,
                    torch.cuda.caching_allocator_delete,
                ),
            )
        ),
        buffer_size=args.buffer_size,
        num_threads=args.num_threads,
    )


@dataclass
class PerfResult:
    """Used to report the worker performance to the main process."""

    elapsed: float
    """The time it took to process all the inputs."""

    num_batches: int
    """The number of batches processed."""

    num_frames: int
    """The number of frames processed."""


def worker_entrypoint(args_: list[str]) -> PerfResult:
    """Entrypoint for worker process. Load images to a GPU and measure its performance.

    It builds a :py:class:`~spdl.pipeline.Pipeline` object using :py:func:`get_pipeline`
    function and runs it with :py:func:`benchmark` function.
    """
    args = _parse_args(args_)
    _init(args.debug, args.worker_id)

    _LG.info(args)

    pipeline = _get_pipeline(args)
    print(pipeline)

    device = torch.device(f"cuda:{args.worker_id}")

    ev: Event = Event()

    def handler_stop_signals(_signum, _frame) -> None:
        ev.set()

    signal.signal(signal.SIGTERM, handler_stop_signals)

    # Warm up
    torch.zeros([1, 1], device=device)

    trace_path = f"{args.trace}.{args.worker_id}"
    with (
        pipeline.auto_stop(),
        spdl.io.utils.tracing(trace_path, enable=args.trace is not None),
    ):
        return benchmark(pipeline.get_iterator(), ev)


def benchmark(loader: Iterator[Tensor], stop_requested: Event) -> PerfResult:
    """The main loop that measures the performance of dataloading.

    Args:
        loader: The dataloader to benchmark.
        stop_requested: Used to interrupt the benchmark loop.

    Returns:
        The performance result.
    """
    t0 = time.monotonic()
    num_frames = num_batches = 0
    try:
        for batch in loader:
            num_frames += batch.shape[0]
            num_batches += 1

            if stop_requested.is_set():
                break

    finally:
        elapsed = time.monotonic() - t0

    return PerfResult(elapsed, num_batches, num_frames)


def _init_logging(debug: bool = False, worker_id: int | None = None) -> None:
    fmt = "%(asctime)s [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s"
    if worker_id is not None:
        fmt = f"[{worker_id}:%(thread)d] {fmt}"
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(format=fmt, level=level)


def _init(debug: bool, worker_id: int) -> None:
    _init_logging(debug, worker_id)


def _parse_process_args(args: list[str] | None) -> tuple[Namespace, list[str]]:
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--num-workers", type=int, default=8)
    return parser.parse_known_args(args)


def entrypoint(args: list[str] | None = None) -> None:
    """CLI entrypoint. Launch the worker processes,
    each of which loads images and sends them to GPU."""
    ns, args = _parse_process_args(args)

    args_set = [
        [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]

    from multiprocessing import Pool

    with Pool(processes=ns.num_workers) as pool:
        _init_logging()
        _LG.info("Spawned: %d workers", ns.num_workers)

        vals = pool.map(worker_entrypoint, args_set)

    ave_time = sum(v.elapsed for v in vals) / len(vals)
    total_frames = sum(v.num_frames for v in vals)
    total_batches = sum(v.num_batches for v in vals)

    _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")

    FPS = total_frames / ave_time
    BPS = total_batches / ave_time
    _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")


if __name__ == "__main__":
    entrypoint()
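The aggregation at the end of the entrypoint divides the summed frame and batch counts by the average per-worker elapsed time. The sketch below isolates that arithmetic so it can be checked by hand. `aggregate_throughput` is a hypothetical name, the `PerfResult` class is a local copy of the dataclass from the listing, and the numbers are made up for illustration, not measurements.

```python
from dataclasses import dataclass


@dataclass
class PerfResult:
    elapsed: float
    num_batches: int
    num_frames: int


def aggregate_throughput(vals: list[PerfResult]) -> tuple[float, float]:
    """Return (frames per second, batches per second) across all workers."""
    # Workers run concurrently, so elapsed times are averaged rather than summed.
    ave_time = sum(v.elapsed for v in vals) / len(vals)
    fps = sum(v.num_frames for v in vals) / ave_time
    bps = sum(v.num_batches for v in vals) / ave_time
    return fps, bps


# Illustrative numbers only: two workers, 100 batches of 32 images each.
fps, bps = aggregate_throughput(
    [PerfResult(10.0, 100, 3200), PerfResult(12.0, 100, 3200)]
)
print(f"{fps=:.2f}, {bps=:.2f}")
```

Here the average elapsed time is 11 seconds, so the aggregate is 6400 / 11 frames per second and 200 / 11 batches per second.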
Functions
- entrypoint(args: list[str] | None = None) -> None
CLI entrypoint. Launches the worker processes, each of which loads images and sends them to a GPU.
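How the per-worker argument lists are derived can be sketched in isolation. This is a simplified illustration that uses only `argparse`: `build_worker_args` is a hypothetical name, the file names in the usage lines are placeholders, and process spawning is omitted.

```python
import argparse


def build_worker_args(argv: list[str]) -> list[list[str]]:
    """Return one argument list per worker (hypothetical helper)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-workers", type=int, default=8)
    # parse_known_args consumes --num-workers and returns everything else untouched.
    ns, rest = parser.parse_known_args(argv)
    # Each worker gets the shared options plus its own id and the worker count.
    return [
        [*rest, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]


worker_args = build_worker_args(
    ["--input-flist", "flist.txt", "--prefix", "/data/", "--num-workers", "2"]
)
for a in worker_args:
    print(a)
```

Because the worker id doubles as the CUDA device index and as the split index in the source listing, the number of workers should not exceed the number of GPUs available.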
- worker_entrypoint(args_: list[str]) -> PerfResult
Entrypoint for the worker process. Loads images to a GPU and measures the performance.
It builds a Pipeline object using the get_pipeline() function and runs it with the benchmark() function.
- benchmark(loader: Iterator[Tensor], stop_requested: Event) -> PerfResult
The main loop that measures the performance of dataloading.
- Parameters:
loader – The dataloader to benchmark.
stop_requested – Used to interrupt the benchmark loop.
- Returns:
The performance result.
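The measurement loop needs nothing SPDL-specific to be exercised. The sketch below is a simplified stand-in, not the function itself: it counts items with `len()` instead of `batch.shape[0]`, and `measure` and `Result` are hypothetical names that mirror `benchmark()` and `PerfResult`.

```python
import time
from collections.abc import Iterable, Sized
from dataclasses import dataclass
from threading import Event


@dataclass
class Result:
    elapsed: float
    num_batches: int
    num_frames: int


def measure(loader: Iterable[Sized], stop_requested: Event) -> Result:
    """Consume ``loader`` and report elapsed time and item counts."""
    t0 = time.monotonic()
    num_frames = num_batches = 0
    for batch in loader:
        num_frames += len(batch)
        num_batches += 1
        # Checked after counting, so the batch already received is included.
        if stop_requested.is_set():
            break
    return Result(time.monotonic() - t0, num_batches, num_frames)


# Ten fake batches of 32 items each stand in for image tensors.
result = measure(([0] * 32 for _ in range(10)), Event())
print(f"{result.num_frames / max(result.elapsed, 1e-9):.1f} frames/s")
```

Using `time.monotonic()` rather than wall-clock time keeps the measurement unaffected by system clock adjustments.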
- source(path: Path, prefix: str = '', split_size: int = 1, split_id: int = 0) -> Iterator[str]
Iterate over a file containing a list of paths, optionally skipping some.
- Parameters:
path – Path to the file containing the list of file paths.
prefix – Prepended to each path in the list.
split_size – Split the paths into this number of subsets.
split_id – The index of this split. Paths at line_number % split_size == split_id are returned.
- Yields:
str – The paths of the specified split.
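The round-robin split can be tried without a file on disk. The following sketch applies the same selection logic to an in-memory list of lines; `split_lines` is a hypothetical name and the file names are placeholders.

```python
from collections.abc import Iterable, Iterator


def split_lines(
    lines: Iterable[str],
    prefix: str = "",
    split_size: int = 1,
    split_id: int = 0,
) -> Iterator[str]:
    """Yield the non-empty lines whose index falls in the given split."""
    for i, line in enumerate(lines):
        if i % split_size == split_id:
            # Blank lines are skipped, but they still advance the line index.
            if line := line.strip():
                yield prefix + line


lines = ["a.JPEG\n", "b.JPEG\n", "\n", "c.JPEG\n", "d.JPEG\n"]
shards = [
    list(split_lines(lines, "/data/", split_size=2, split_id=k)) for k in range(2)
]
print(shards)
```

Two details follow from the logic: the splits are disjoint and together cover every non-empty line, and the prefix is concatenated as-is, so a directory prefix needs its trailing slash.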
- batch_decode(srcs: list[str], device_config: CUDAConfig, width: int = 224, height: int = 224) -> Tensor
Given image paths, decode, resize, batch and optionally send them to GPU.
- Parameters:
srcs – List of image paths.
width – The width of the images in the batch.
height – The height of the images in the batch.
device_config – When provided, the data are sent to the specified GPU.
- Returns:
The batch tensor.
- get_pipeline(src: Iterator[str], batch_size: int, device_config: CUDAConfig, buffer_size: int, num_threads: int) -> Pipeline
Build the image data loading pipeline.
The pipeline uses batch_decode() to decode images concurrently and sends the resulting data to the GPU.
- Parameters:
src – Pipeline source. Generator that yields image paths. See source().
batch_size – The number of images in a batch.
device_config – The configuration of the target CUDA device.
buffer_size – The size of the buffer for the resulting batch image Tensor.
num_threads – The number of threads in the pipeline.
- Returns:
The pipeline that performs batch image decoding and device transfer.
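The shape of the pipeline (aggregate paths into batches, then process batches concurrently) can be approximated with the standard library. The sketch below is a rough analog for intuition only and is not how SPDL implements it: `aggregate` and `run` are hypothetical names, `len` stands in for the decode function, and `Executor.map` reads the whole source eagerly, so there is no bounded buffer like the one `buffer_size` configures.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def aggregate(src: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    """Group items into lists of ``batch_size``; the last one may be shorter."""
    it = iter(src)
    while batch := list(islice(it, batch_size)):
        yield batch


def run(
    src: Iterable[T],
    batch_size: int,
    fn: Callable[[list[T]], U],
    num_threads: int,
) -> Iterator[U]:
    """Apply ``fn`` to each batch using a thread pool."""
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # Executor.map yields results in input order; no claim is made here
        # about the order in which SPDL delivers batches.
        yield from pool.map(fn, aggregate(src, batch_size))


# `len` is a placeholder for a real decode function.
out = list(run((f"img{i}.jpg" for i in range(5)), 2, len, 4))
print(out)
```

Threads suit this workload when the per-batch function releases the GIL, as native decoding code typically does; that is an assumption about the decoder rather than something this sketch demonstrates.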
Classes