image_dataloading
Benchmark the performance of loading images from the local file system to GPUs.
Given a list of image files to process, this script spawns subprocesses, each of which loads images and sends them to the corresponding GPU, then collects the runtime statistics.
A file list can be created, for example, by:
cd /data/users/moto/imagenet/
find train -name '*.JPEG' > ~/imagenet.train.flist
To run the benchmark, pass the file list to the script as follows.
python image_dataloading.py
--input-flist ~/imagenet.train.flist
--prefix /data/users/moto/imagenet/
--num-workers 8 # The number of GPUs
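The numbers reported at the end are aggregated across workers: per-worker elapsed times are averaged, and the total frame and batch counts are divided by that average. A minimal, self-contained sketch of the arithmetic (the sample values are illustrative; the logic mirrors entrypoint() in the source below):

    from dataclasses import dataclass

    @dataclass
    class PerfResult:
        elapsed: float      # seconds the worker spent on its share
        num_batches: int
        num_frames: int

    # Illustrative results from two workers:
    vals = [PerfResult(10.0, 100, 3200), PerfResult(12.0, 100, 3200)]
    ave_time = sum(v.elapsed for v in vals) / len(vals)  # 11.0
    FPS = sum(v.num_frames for v in vals) / ave_time     # ~581.8 frames/sec
    BPS = sum(v.num_batches for v in vals) / ave_time    # ~18.2 batches/sec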
Source
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""Benchmark the performance of loading images from local file system to GPUs.

Given a list of image files to process, this script spawns subprocesses,
each of which loads images and sends them to the corresponding GPU, then
collects the runtime statistics.

.. include:: ../plots/image_dataloading_chart.txt

A file list can be created, for example, by:

.. code-block:: bash

    cd /data/users/moto/imagenet/
    find train -name '*.JPEG' > ~/imagenet.train.flist

To run the benchmark, pass the file list to the script as follows.

.. code-block::

    python image_dataloading.py
        --input-flist ~/imagenet.train.flist
        --prefix /data/users/moto/imagenet/
        --num-workers 8  # The number of GPUs
"""

# pyre-ignore-all-errors

import logging
import signal
import time
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from threading import Event

import spdl.io
import spdl.utils
import torch
from spdl.io import CUDAConfig
from spdl.pipeline import Pipeline, PipelineBuilder
from torch import Tensor

_LG = logging.getLogger(__name__)

__all__ = [
    "entrypoint",
    "worker_entrypoint",
    "benchmark",
    "source",
    "batch_decode",
    "get_pipeline",
    "PerfResult",
]


def _parse_args(args):
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--input-flist", type=Path, required=True)
    parser.add_argument("--max-samples", type=int)
    parser.add_argument("--prefix")
    parser.add_argument("--batch-size", type=int, default=32)
    parser.add_argument("--trace", type=Path)
    parser.add_argument("--buffer-size", type=int, default=16)
    parser.add_argument("--num-threads", type=int, default=16)
    parser.add_argument("--worker-id", type=int, required=True)
    parser.add_argument("--num-workers", type=int, required=True)
    args = parser.parse_args(args)
    if args.trace:
        args.max_samples = args.batch_size * 40
    return args


def source(
    path: Path,
    prefix: str = "",
    split_size: int = 1,
    split_id: int = 0,
) -> Iterator[str]:
    """Iterate a file containing a list of paths, while optionally skipping some.

    Args:
        path: Path to the file containing the list of file paths.
        prefix: Prepended to the paths in the list.
        split_size: Split the paths into this number of subsets.
        split_id: The index of this split.
            Paths at ``line_number % split_size == split_id`` are returned.

    Yields:
        str: The paths of the specified split.
    """
    with open(path) as f:
        for i, line in enumerate(f):
            if i % split_size == split_id:
                if line := line.strip():
                    yield prefix + line


async def batch_decode(
    srcs: list[str],
    width: int = 224,
    height: int = 224,
    device_config: spdl.io.CUDAConfig | None = None,
) -> Tensor:
    """Given image paths, decode, resize, batch and optionally send them to GPU.

    Args:
        srcs: List of image paths.
        width, height: The size of the images to batch.
        device_config: When provided, the data are sent to the specified GPU.

    Returns:
        The batch tensor.
    """
    buffer = await spdl.io.async_load_image_batch(
        srcs,
        width=width,
        height=height,
        pix_fmt="rgb24",
        device_config=device_config,
        strict=False,
    )
    return spdl.io.to_torch(buffer)


def get_pipeline(
    src: Iterator[str],
    batch_size: int,
    device_config: CUDAConfig,
    buffer_size: int,
    num_threads: int,
) -> Pipeline:
    """Build image data loading pipeline.

    The pipeline uses :py:func:`batch_decode` for decoding images concurrently
    and sends the resulting data to the GPU.

    Args:
        src: Pipeline source. Generator that yields image paths.
            See :py:func:`source`.
        batch_size: The number of images in a batch.
        device_config: The configuration of target CUDA device.
        buffer_size: The size of buffer for the resulting batch image Tensor.
        num_threads: The number of threads in the pipeline.

    Returns:
        The pipeline that performs batch image decoding and device transfer.
    """

    async def _batch_decode(srcs):
        return await batch_decode(srcs, device_config=device_config)

    pipeline = (
        PipelineBuilder()
        .add_source(src)
        .aggregate(batch_size)
        .pipe(_batch_decode, concurrency=num_threads, report_stats_interval=15)
        .add_sink(buffer_size)
        .build(num_threads=num_threads)
    )
    return pipeline


def _get_pipeline(args):
    return get_pipeline(
        source(args.input_flist, args.prefix, args.num_workers, args.worker_id),
        args.batch_size,
        device_config=(
            None
            if args.worker_id is None
            else spdl.io.cuda_config(
                device_index=args.worker_id,
                allocator=(
                    torch.cuda.caching_allocator_alloc,
                    torch.cuda.caching_allocator_delete,
                ),
            )
        ),
        buffer_size=args.buffer_size,
        num_threads=args.num_threads,
    )


@dataclass
class PerfResult:
    """Used to report the worker performance to the main process."""

    elapsed: float
    """The time it took to process all the inputs."""

    num_batches: int
    """The number of batches processed."""

    num_frames: int
    """The number of frames processed."""


def worker_entrypoint(args: list[str]) -> PerfResult:
    """Entrypoint for worker process. Load images to a GPU and measure the performance.

    It builds a :py:class:`~spdl.pipeline.Pipeline` object using the
    :py:func:`get_pipeline` function and runs it with the :py:func:`benchmark`
    function.
    """
    args = _parse_args(args)
    _init(args.debug, args.worker_id)

    _LG.info(args)

    pipeline = _get_pipeline(args)
    print(pipeline)

    device = torch.device(f"cuda:{args.worker_id}")

    ev = Event()

    def handler_stop_signals(_signum, _frame):
        ev.set()

    signal.signal(signal.SIGTERM, handler_stop_signals)

    # Warm up
    torch.zeros([1, 1], device=device)

    trace_path = f"{args.trace}.{args.worker_id}"
    with (
        pipeline.auto_stop(),
        spdl.utils.tracing(trace_path, enable=args.trace is not None),
    ):
        return benchmark(pipeline.get_iterator(), ev)


def benchmark(loader: Iterator[Tensor], stop_requested: Event) -> PerfResult:
    """The main loop that measures the performance of dataloading.

    Args:
        loader: The dataloader to benchmark.
        stop_requested: Used to interrupt the benchmark loop.

    Returns:
        The performance result.
    """
    t0 = time.monotonic()
    num_frames = num_batches = 0
    try:
        for batch in loader:
            num_frames += batch.shape[0]
            num_batches += 1

            if stop_requested.is_set():
                break

    finally:
        elapsed = time.monotonic() - t0

    return PerfResult(elapsed, num_batches, num_frames)


def _init_logging(debug=False, worker_id=None):
    fmt = "%(asctime)s [%(filename)s:%(lineno)d] [%(levelname)s] %(message)s"
    if worker_id is not None:
        fmt = f"[{worker_id}:%(thread)d] {fmt}"
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(format=fmt, level=level)


def _init(debug, worker_id):
    _init_logging(debug, worker_id)


def _parse_process_args(args):
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--num-workers", type=int, default=8)
    return parser.parse_known_args(args)


def entrypoint(args: list[str] | None = None):
    """CLI entrypoint. Launch the worker processes,
    each of which loads images and sends them to the GPU."""
    ns, args = _parse_process_args(args)

    args_set = [
        [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]

    from multiprocessing import Pool

    with Pool(processes=ns.num_workers) as pool:
        _init_logging()
        _LG.info("Spawned: %d workers", ns.num_workers)

        vals = pool.map(worker_entrypoint, args_set)

    ave_time = sum(v.elapsed for v in vals) / len(vals)
    total_frames = sum(v.num_frames for v in vals)
    total_batches = sum(v.num_batches for v in vals)

    _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")

    FPS = total_frames / ave_time
    BPS = total_batches / ave_time
    _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")


if __name__ == "__main__":
    entrypoint()
Functions
- entrypoint(args: list[str] | None = None)
CLI entrypoint. Launch the worker processes, each of which loads images and sends them to the GPU.
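For illustration, the same run can be launched programmatically. A sketch, assuming two visible CUDA devices, hypothetical paths, and that the script is importable as image_dataloading:

    from image_dataloading import entrypoint

    entrypoint([
        "--num-workers", "2",
        "--input-flist", "/data/imagenet.train.flist",  # hypothetical path
        "--prefix", "/data/imagenet/",
    ])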
- worker_entrypoint(args: list[str]) → PerfResult
Entrypoint for worker process. Load images to a GPU and measure the performance.
It builds a Pipeline object using the get_pipeline() function and runs it with the benchmark() function.
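A single worker can also be exercised in-process. A sketch, assuming one visible CUDA device and hypothetical paths:

    result = worker_entrypoint([
        "--input-flist", "/data/imagenet.train.flist",  # hypothetical path
        "--prefix", "/data/imagenet/",
        "--worker-id", "0",
        "--num-workers", "1",
    ])
    print(result.elapsed, result.num_batches, result.num_frames)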
- benchmark(loader: Iterator[Tensor], stop_requested: Event) → PerfResult
The main loop that measures the performance of dataloading.
- Parameters:
loader – The dataloader to benchmark.
stop_requested – Used to interrupt the benchmark loop.
- Returns:
The performance result.
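Because the loop only needs an iterable of tensors and a threading.Event, it can be tried with a dummy loader. A minimal sketch (the tensor shape is arbitrary; assumes benchmark is imported from this module):

    import torch
    from threading import Event

    def dummy_loader():
        for _ in range(10):
            yield torch.zeros(32, 3, 224, 224)  # fake "batches" of 32 frames

    result = benchmark(dummy_loader(), Event())
    print(f"{result.num_frames / result.elapsed:.1f} frames/sec")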
- source(path: Path, prefix: str = '', split_size: int = 1, split_id: int = 0) → Iterator[str]
Iterate a file containing a list of paths, while optionally skipping some.
- Parameters:
path – Path to the file containing the list of file paths.
prefix – Prepended to the paths in the list.
split_size – Split the paths into this number of subsets.
split_id – The index of this split. Paths at line_number % split_size == split_id are returned.
- Yields:
str – The paths of the specified split.
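For example, given a hypothetical flist.txt containing a.jpg, b.jpg and c.jpg on separate lines, splitting it across two workers:

    list(source("flist.txt", prefix="/data/", split_size=2, split_id=0))
    # -> ["/data/a.jpg", "/data/c.jpg"]  (lines 0, 2, ...)
    list(source("flist.txt", prefix="/data/", split_size=2, split_id=1))
    # -> ["/data/b.jpg"]                 (lines 1, 3, ...)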
- async batch_decode(srcs: list[str], width: int = 224, height: int = 224, device_config: CUDAConfig | None = None) → Tensor
Given image paths, decode, resize, batch and optionally send them to GPU.
- Parameters:
srcs – List of image paths.
width, height – The size of the images to batch.
device_config – When provided, the data are sent to the specified GPU.
- Returns:
The batch tensor.
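Being a coroutine, it can also be driven directly with asyncio outside the pipeline. A sketch that decodes on CPU (no device_config), with hypothetical paths:

    import asyncio

    batch = asyncio.run(batch_decode(["/data/a.jpg", "/data/b.jpg"]))
    print(batch.shape)  # a batch of two 224x224 RGB images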
- get_pipeline(src: Iterator[str], batch_size: int, device_config: CUDAConfig, buffer_size: int, num_threads: int) → Pipeline
Build image data loading pipeline.
The pipeline uses batch_decode() for decoding images concurrently and sends the resulting data to the GPU.
- Parameters:
src – Pipeline source. Generator that yields image paths. See source().
batch_size – The number of images in a batch.
device_config – The configuration of target CUDA device.
buffer_size – The size of buffer for the resulting batch image Tensor.
num_threads – The number of threads in the pipeline.
- Returns:
The pipeline that performs batch image decoding and device transfer.
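A sketch of building and consuming the pipeline without a GPU (passing device_config=None keeps the decoded batches on CPU), using a hypothetical file list; auto_stop() and get_iterator() are used the same way in worker_entrypoint above:

    from pathlib import Path

    pipeline = get_pipeline(
        source(Path("/data/imagenet.train.flist")),  # hypothetical path
        batch_size=32,
        device_config=None,
        buffer_size=16,
        num_threads=8,
    )
    with pipeline.auto_stop():
        for batch in pipeline.get_iterator():
            print(batch.shape)
            break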
Classes