multi_thread_preprocessing¶
This example shows how to run PyTorch tarnsform in SPDL Pipeline, and compares its performance against PyTorch DataLoader.
Each pipeline reads images from the ImageNet dataset, and applies resize, batching, and pixel normalization then the data is transferred to GPU.
In the PyTorch and TorchVision native solution, the images are decoded
and resized using Pillow, batched with torch.utils.data.default_collate()
,
pixel normalization is applied with torchvision.transforms.Normalize
,
and data are transferred to GPU with torch.Tensor.cuda()
.
Using torch.utils.data.DataLoader
, the batch is created and
normalized in subprocess and transferred to the main process before they are
sent to GPU.
The following diagram illustrates this.
On the other hand, SPDL Pipeline executes the transforms in the main process. SPDL pipeline uses its own implementation for decode, resize and batching image data.
This script runs the pipeline with different configurations described bellow while changing the number of workers.
Image decoding and resizing
Image decoding, resizing, and batching
Image decoding, resizing, batching, and normalization
Image decoding, resizing, batching, normalization, and transfer to GPU
The following result was obtained.
The following observations can be made.
In both implementations, the throughput peaks around 16 workers, and then decreases as the number of workers.
The throughput increases when batching images, then decreases as additional processing is added.
The degree of improvement from batching in SPDL is significantly higher than in PyTorch. (more than 2x at 16 workers.)
The peak througput is almost 2.7x in SPDL than in PyTorch.
Source¶
Source
Click here to see the source.
1#!/usr/bin/env python3
2# Copyright (c) Meta Platforms, Inc. and affiliates.
3# All rights reserved.
4#
5# This source code is licensed under the BSD-style license found in the
6# LICENSE file in the root directory of this source tree.
7
8"""This example shows how to run PyTorch tarnsform in SPDL Pipeline,
9and compares its performance against PyTorch DataLoader.
10
11Each pipeline reads images from the ImageNet dataset, and applies
12resize, batching, and pixel normalization then the data is transferred
13to GPU.
14
15In the PyTorch and TorchVision native solution, the images are decoded
16and resized using Pillow, batched with :py:func:`torch.utils.data.default_collate`,
17pixel normalization is applied with :py:class:`torchvision.transforms.Normalize`,
18and data are transferred to GPU with :py:func:`torch.Tensor.cuda`.
19
20Using :py:class:`torch.utils.data.DataLoader`, the batch is created and
21normalized in subprocess and transferred to the main process before they are
22sent to GPU.
23
24The following diagram illustrates this.
25
26.. include:: ../plots/multi_thread_preprocessing_chart_torch.txt
27
28On the other hand, SPDL Pipeline executes the transforms in the main process.
29SPDL pipeline uses its own implementation for decode, resize and batching image data.
30
31.. include:: ../plots/multi_thread_preprocessing_chart_spdl.txt
32
33This script runs the pipeline with different configurations described bellow while
34changing the number of workers.
35
361. Image decoding and resizing
372. Image decoding, resizing, and batching
383. Image decoding, resizing, batching, and normalization
394. Image decoding, resizing, batching, normalization, and transfer to GPU
40
41The following result was obtained.
42
43.. include:: ../plots/multi_thread_preprocessing_plot.txt
44
45The following observations can be made.
46
47- In both implementations, the throughput peaks around 16 workers,
48 and then decreases as the number of workers.
49- The throughput increases when batching images, then decreases
50 as additional processing is added.
51- The degree of improvement from batching in SPDL is significantly
52 higher than in PyTorch. (more than 2x at 16 workers.)
53- The peak througput is almost 2.7x in SPDL than in PyTorch.
54"""
55
56import logging
57import multiprocessing
58import time
59from collections.abc import Iterable
60from multiprocessing import Process, Queue
61
62import spdl.io
63import torch
64from spdl.pipeline import PipelineBuilder
65from torchvision.datasets import ImageNet
66from torchvision.transforms import Compose, Normalize, PILToTensor, Resize
67
68__all__ = [
69 "entrypoint",
70 "exp_torch",
71 "exp_spdl",
72 "run_dataloader",
73]
74
75
76logging.getLogger().setLevel(logging.ERROR)
77
78
79def run_dataloader(
80 dataloader: Iterable,
81 max_items: int,
82) -> tuple[int, float]:
83 """Run the given dataloader and measure its performance.
84
85 Args:
86 dataloader: The dataloader to benchmark.
87 max_items: The maximum number of items to process.
88
89 Returns:
90 The number of items processed and the elapsed time in seconds.
91 """
92 num_items = 0
93 t0 = time.monotonic()
94 try:
95 for i, (data, _) in enumerate(dataloader, start=1):
96 num_items += 1 if data.ndim == 3 else len(data)
97 if i >= max_items:
98 break
99 finally:
100 elapsed = time.monotonic() - t0
101 return num_items, elapsed
102
103
104def exp_torch(
105 *,
106 root_dir: str,
107 split: str,
108 num_workers: int,
109 max_items: int,
110 batch_size: int | None = None,
111 normalize: bool = False,
112 transfer: bool = False,
113) -> tuple[int, float]:
114 """Load data with PyTorch native operation using PyTorch DataLoader.
115
116 This is the baseline for comparison.
117
118 Args:
119 root_dir: The root directory of the ImageNet dataset.
120 split: The dataset split, such as "train" and "val".
121 num_workers: The number of workers to use.
122 max_items: The maximum number of items to process.
123 batch: Whether to batch the data.
124 normalize: Whether to normalize the data. Only applicable when ``batch`` is True.
125 transfer: Whether to transfer the data to GPU.
126
127 Returns:
128 The number of items processed and the elapsed time in seconds.
129 """
130 dataset = ImageNet(
131 root=root_dir,
132 split=split,
133 transform=Compose([Resize((224, 224)), PILToTensor()]),
134 )
135
136 normalize_transform = Normalize(
137 mean=[0.485, 0.456, 0.406],
138 std=[0.229, 0.224, 0.225],
139 )
140
141 def collate(item):
142 batch, cls = torch.utils.data.default_collate(item)
143 if normalize:
144 batch = batch.float() / 255
145 batch = normalize_transform(batch)
146 return batch, cls
147
148 dataloader = torch.utils.data.DataLoader(
149 dataset,
150 batch_size=batch_size,
151 num_workers=num_workers,
152 collate_fn=None if batch_size is None else collate,
153 prefetch_factor=1,
154 multiprocessing_context="fork",
155 )
156
157 if transfer:
158
159 def with_transfer(dataloader):
160 for tensor, cls in dataloader:
161 tensor = tensor.cuda()
162 yield tensor, cls
163
164 dataloader = with_transfer(dataloader)
165
166 with torch.no_grad():
167 return run_dataloader(dataloader, max_items)
168
169
170def exp_spdl(
171 *,
172 root_dir: str,
173 split: str,
174 num_workers: int,
175 max_items: int,
176 batch_size: int | None = None,
177 normalize: bool = False,
178 transfer: bool = False,
179) -> tuple[int, float]:
180 """Load data with SPDL operation using SPDL Pipeline.
181
182 Args:
183 root_dir: The root directory of the ImageNet dataset.
184 split: The dataset split, such as "train" and "val".
185 num_workers: The number of workers to use.
186 max_items: The maximum number of items to process.
187 batch: Whether to batch the data.
188 normalize: Whether to normalize the data. Only applicable when ``batch`` is True.
189 transfer: Whether to transfer the data to GPU.
190
191 Returns:
192 The number of items processed and the elapsed time in seconds.
193 """
194 filter_desc = spdl.io.get_video_filter_desc(
195 scale_width=224,
196 scale_height=224,
197 )
198
199 def decode_image(path):
200 packets = spdl.io.demux_image(path)
201 return spdl.io.decode_packets(packets, filter_desc=filter_desc)
202
203 dataset = ImageNet(
204 root=root_dir,
205 split=split,
206 loader=decode_image,
207 )
208
209 def convert(items):
210 frames, cls = list(zip(*items))
211 buffer = spdl.io.convert_frames(frames)
212 tensor = spdl.io.to_torch(buffer).permute(0, 3, 1, 2)
213 return tensor, cls
214
215 builder = (
216 PipelineBuilder()
217 .add_source(range(len(dataset)))
218 .pipe(dataset.__getitem__, concurrency=num_workers)
219 .aggregate(batch_size or 1)
220 .pipe(convert)
221 )
222
223 if normalize:
224 transform = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
225
226 def normalize(item):
227 tensor, cls = item
228 tensor = tensor.float() / 255
229 tensor = transform(tensor)
230 return tensor, cls
231
232 builder = builder.pipe(normalize)
233
234 if transfer:
235 builder = builder.pipe(lambda item: (item[0].cuda(), item[1]))
236
237 builder = builder.add_sink(num_workers)
238 pipeline = builder.build(num_threads=num_workers)
239
240 with torch.no_grad(), pipeline.auto_stop():
241 return run_dataloader(pipeline, max_items)
242
243
244##############################################################################
245# Execute the test function in subprocess, so as to isolate them
246##############################################################################
247def exp_torch_(queue, **kwargs):
248 queue.put(exp_torch(**kwargs))
249
250
251def exp_spdl_(queue, **kwargs):
252 queue.put(exp_spdl(**kwargs))
253
254
255def run_in_process(func, **kwargs):
256 queue = Queue()
257 Process(target=func, args=[queue], kwargs=kwargs).run()
258 return queue.get()
259
260
261def run_test(**kwargs):
262 data = {}
263 num_workers_ = [1, 2, 4, 8, 16, 32]
264 for func in [exp_torch_, exp_spdl_]: # exp_torch_thread, exp_spdl]:
265 print(func.__name__)
266 print("\tnum_workers\tFPS")
267 y = []
268 for num_workers in num_workers_:
269 num_images, elapsed = run_in_process(
270 func, num_workers=num_workers, **kwargs
271 )
272 qps = num_images / elapsed
273 y.append(qps)
274 print(f"\t{num_workers}\t{qps:8.2f} ({num_images} / {elapsed:5.2f})")
275
276 data[func.__name__] = (num_workers_, y)
277
278 return data
279
280
281def _print(data):
282 for i, (x, y) in enumerate(data.values()):
283 if i == 0:
284 print("\t".join(str(v) for v in x))
285 print("\t".join(f"{v:.2f}" for v in y))
286
287
288def entrypoint(
289 root_dir: str,
290 split: str,
291 batch_size: int,
292 max_items: int,
293):
294 """The main entrypoint for CLI.
295
296 Args:
297 root_dir: The root directory of the ImageNet dataset.
298 split: Dataset split, such as "train" and "val".
299 batch_size: The batch size to use.
300 max_items: The maximum number of items to process.
301 """
302 multiprocessing.set_start_method("spawn")
303
304 argset = (
305 {"batch_size": None},
306 {"batch_size": batch_size},
307 {"batch_size": batch_size, "normalize": True},
308 {"batch_size": batch_size, "normalize": True, "transfer": True},
309 )
310
311 for kwargs in argset:
312 print(kwargs)
313 data = run_test(root_dir=root_dir, split=split, max_items=max_items, **kwargs)
314 _print(data)
315
316
317def _parse_args():
318 import argparse
319
320 parser = argparse.ArgumentParser()
321 parser.add_argument(
322 "--root-dir",
323 help="Directory where the ImageNet dataset is stored.",
324 default="/home/moto/local/imagenet/",
325 )
326 parser.add_argument("--batch-size", default=32, type=int)
327 parser.add_argument(
328 "--max-items",
329 type=int,
330 help="The maximum number of items (images or batches) to process.",
331 default=100,
332 )
333 parser.add_argument(
334 "--split",
335 default="val",
336 )
337 return parser.parse_args()
338
339
340if __name__ == "__main__":
341 _args = _parse_args()
342 entrypoint(
343 _args.root_dir,
344 _args.split,
345 _args.batch_size,
346 _args.max_items,
347 )
Functions¶
Functions
- entrypoint(root_dir: str, split: str, batch_size: int, max_items: int)[source]¶
The main entrypoint for CLI.
- Parameters:
root_dir – The root directory of the ImageNet dataset.
split – Dataset split, such as “train” and “val”.
batch_size – The batch size to use.
max_items – The maximum number of items to process.
- exp_torch(*, root_dir: str, split: str, num_workers: int, max_items: int, batch_size: int | None = None, normalize: bool = False, transfer: bool = False) tuple[int, float] [source]¶
Load data with PyTorch native operation using PyTorch DataLoader.
This is the baseline for comparison.
- Parameters:
root_dir – The root directory of the ImageNet dataset.
split – The dataset split, such as “train” and “val”.
num_workers – The number of workers to use.
max_items – The maximum number of items to process.
batch – Whether to batch the data.
normalize – Whether to normalize the data. Only applicable when
batch
is True.transfer – Whether to transfer the data to GPU.
- Returns:
The number of items processed and the elapsed time in seconds.
- exp_spdl(*, root_dir: str, split: str, num_workers: int, max_items: int, batch_size: int | None = None, normalize: bool = False, transfer: bool = False) tuple[int, float] [source]¶
Load data with SPDL operation using SPDL Pipeline.
- Parameters:
root_dir – The root directory of the ImageNet dataset.
split – The dataset split, such as “train” and “val”.
num_workers – The number of workers to use.
max_items – The maximum number of items to process.
batch – Whether to batch the data.
normalize – Whether to normalize the data. Only applicable when
batch
is True.transfer – Whether to transfer the data to GPU.
- Returns:
The number of items processed and the elapsed time in seconds.
- run_dataloader(dataloader: Iterable, max_items: int) tuple[int, float] [source]¶
Run the given dataloader and measure its performance.
- Parameters:
dataloader – The dataloader to benchmark.
max_items – The maximum number of items to process.
- Returns:
The number of items processed and the elapsed time in seconds.