multi_thread_preprocessing

This example shows how to run PyTorch transforms in SPDL Pipeline and compares its performance against PyTorch DataLoader.

Each pipeline reads images from the ImageNet dataset, applies resizing, batching, and pixel normalization, and then transfers the data to the GPU.

In the PyTorch and TorchVision native solution, the images are decoded and resized using Pillow, batched with torch.utils.data.default_collate(), normalized with torchvision.transforms.Normalize, and transferred to the GPU with torch.Tensor.cuda().
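Concretely, the pixel normalization step first scales each 8-bit value to [0, 1] (the `batch.float() / 255` line in the source) and then applies `Normalize` with the per-channel ImageNet mean and standard deviation used throughout this example:

```latex
x'_{n,c,h,w} = \frac{x_{n,c,h,w} / 255 - \mu_c}{\sigma_c},
\qquad
\mu = (0.485,\ 0.456,\ 0.406),
\quad
\sigma = (0.229,\ 0.224,\ 0.225)
```

Here c indexes the R, G, and B channels. For example, a fully saturated red value (x = 255) becomes (1 - 0.485) / 0.229 ≈ 2.249.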

With torch.utils.data.DataLoader, each batch is created and normalized in a subprocess and transferred to the main process before being sent to the GPU.

The following diagram illustrates this.

    flowchart TB
        subgraph SP[Subprocess]
            subgraph TV[TorchVision/Pillow]
                Source --Decode--> F0[YUV] --Convert--> F1[RGB] --Resize--> F2[RGB] --To Contiguous--> T1[Tensor]
                T2[Tensor]
            end
            T1 --Copy--> B1[Batch] --Pixel Normalization--> B2[Batch]
            T2 --Copy--> B1
        end
        Pipe
        subgraph MP[Main Process]
            B3[Batch] --GPU Transfer--> B4[Batch]
        end
        SP --Serialize--> Pipe --Deserialize--> MP
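The subprocess-to-main-process hand-off can be modeled with the standard library alone. This is a simplified sketch, not DataLoader's actual implementation: a hypothetical `worker` function builds batches in a subprocess and sends them through a `multiprocessing.Queue`, which pickles each object on `put()` and unpickles it on `get()`. It assumes the `fork` start method, which is available on Linux and macOS but not on Windows.

```python
import multiprocessing as mp


def worker(queue, samples, batch_size):
    # Runs in the subprocess: group samples into batches (the "Copy" step)
    # and send each batch to the main process. Queue.put() pickles the
    # batch here; Queue.get() unpickles it in the main process.
    for start in range(0, len(samples), batch_size):
        queue.put(samples[start:start + batch_size])
    queue.put(None)  # Sentinel: no more batches.


def load_in_subprocess(samples, batch_size):
    # Assumption: "fork" keeps the sketch short; it is unavailable on Windows.
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    proc = ctx.Process(target=worker, args=(queue, samples, batch_size))
    proc.start()
    batches = []
    # Drain the queue before join() so the child never blocks on a full pipe.
    while (batch := queue.get()) is not None:
        batches.append(batch)
    proc.join()
    return batches
```

Every batch pays a serialization and deserialization cost on its way to the main process, which is the overhead the SPDL design avoids by staying in one process.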

In contrast, SPDL Pipeline executes the transforms in the main process, using a pool of threads. It uses its own implementations for decoding, resizing, and batching image data.

    flowchart LR
        subgraph MP[Main Process]
            direction LR
            subgraph Decode[SPDL I/O]
                Source --Decode--> F0[YUV] --Resize--> F1[YUV] --Convert--> F2[RGB] --Copy--> B1[Batch]
                F3[RGB] --Copy--> B1
            end
            B1 --Pixel Normalization--> B2[Batch] --Transfer--> GPU
        end
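Because every stage runs on threads of the main process, decoded data never crosses a process boundary. The sketch below models the `.pipe(..., concurrency=...)` and `.aggregate(...)` stages of the example with `concurrent.futures.ThreadPoolExecutor`. The helpers `threaded_pipe` and `aggregate` are hypothetical stand-ins, not SPDL's API; in particular, `Executor.map` submits all inputs up front, whereas a real pipeline would bound the number of in-flight items.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def threaded_pipe(
    src: Iterable[T], fn: Callable[[T], U], concurrency: int
) -> Iterator[U]:
    """Apply ``fn`` to every item using ``concurrency`` threads of this process.

    ``Executor.map`` yields results in input order. It submits all inputs
    up front, so this sketch is only suitable for small inputs.
    """
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        yield from executor.map(fn, src)


def aggregate(src: Iterable[T], size: int) -> Iterator[list[T]]:
    """Group consecutive items into lists of ``size``; the last may be shorter."""
    buffer: list[T] = []
    for item in src:
        buffer.append(item)
        if len(buffer) == size:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


if __name__ == "__main__":
    # Square numbers on 4 threads, then batch them in groups of 4.
    batches = list(aggregate(threaded_pipe(range(10), lambda x: x * x, 4), 4))
    print(batches)  # [[0, 1, 4, 9], [16, 25, 36, 49], [64, 81]]
```

In the example script, `fn` is `dataset.__getitem__` (decode and resize), and the batch is then converted into a single tensor by `convert`.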

This script runs the pipeline in the configurations described below while varying the number of workers.

  1. Image decoding and resizing

  2. Image decoding, resizing, and batching

  3. Image decoding, resizing, batching, and normalization

  4. Image decoding, resizing, batching, normalization, and transfer to GPU

The following result was obtained.

The following observations can be made.

  • In both implementations, the throughput peaks at around 16 workers and then decreases as the number of workers increases further.

  • The throughput increases when batching images, then decreases as additional processing is added.

  • The improvement from batching is significantly larger in SPDL than in PyTorch (more than 2x at 16 workers).

  • The peak throughput of SPDL is almost 2.7x that of PyTorch.
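Figures such as the peak worker count and the peak-throughput ratio can be read off the dictionary that `run_test` returns, which maps each function name (`exp_torch_`, `exp_spdl_`) to a pair of worker counts and measured FPS values. The helpers `summarize_peaks` and `peak_ratio` below are hypothetical additions, not part of the script.

```python
def summarize_peaks(
    data: dict[str, tuple[list[int], list[float]]],
) -> dict[str, tuple[int, float]]:
    """Return ``(num_workers, fps)`` at the peak FPS for each implementation.

    ``data`` has the shape returned by ``run_test``:
    ``{func_name: (num_workers_list, fps_list)}``.
    """
    summary = {}
    for name, (num_workers, fps) in data.items():
        best = max(range(len(fps)), key=fps.__getitem__)  # Index of the highest FPS.
        summary[name] = (num_workers[best], fps[best])
    return summary


def peak_ratio(
    data: dict[str, tuple[list[int], list[float]]],
    numerator: str = "exp_spdl_",
    denominator: str = "exp_torch_",
) -> float:
    """Ratio of peak FPS between two implementations (SPDL over PyTorch by default)."""
    peaks = summarize_peaks(data)
    return peaks[numerator][1] / peaks[denominator][1]
```

Applied to the `data` of a single configuration, `peak_ratio(data)` yields the kind of SPDL-to-PyTorch comparison quoted above.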

Source

    #!/usr/bin/env python3
    # Copyright (c) Meta Platforms, Inc. and affiliates.
    # All rights reserved.
    #
    # This source code is licensed under the BSD-style license found in the
    # LICENSE file in the root directory of this source tree.

    """This example shows how to run PyTorch transforms in SPDL Pipeline
    and compares its performance against PyTorch DataLoader.

    Each pipeline reads images from the ImageNet dataset, applies resizing,
    batching, and pixel normalization, and then transfers the data to the GPU.

    In the PyTorch and TorchVision native solution, the images are decoded
    and resized using Pillow, batched with :py:func:`torch.utils.data.default_collate`,
    normalized with :py:class:`torchvision.transforms.Normalize`,
    and transferred to the GPU with :py:func:`torch.Tensor.cuda`.

    With :py:class:`torch.utils.data.DataLoader`, each batch is created and
    normalized in a subprocess and transferred to the main process before
    being sent to the GPU.

    The following diagram illustrates this.

    .. include:: ../plots/multi_thread_preprocessing_chart_torch.txt

    In contrast, SPDL Pipeline executes the transforms in the main process,
    using a pool of threads. It uses its own implementations for decoding,
    resizing, and batching image data.

    .. include:: ../plots/multi_thread_preprocessing_chart_spdl.txt

    This script runs the pipeline in the configurations described below while
    varying the number of workers.

    1. Image decoding and resizing
    2. Image decoding, resizing, and batching
    3. Image decoding, resizing, batching, and normalization
    4. Image decoding, resizing, batching, normalization, and transfer to GPU

    The following result was obtained.

    .. include:: ../plots/multi_thread_preprocessing_plot.txt

    The following observations can be made.

    - In both implementations, the throughput peaks at around 16 workers
      and then decreases as the number of workers increases further.
    - The throughput increases when batching images, then decreases
      as additional processing is added.
    - The improvement from batching is significantly larger in SPDL
      than in PyTorch (more than 2x at 16 workers).
    - The peak throughput of SPDL is almost 2.7x that of PyTorch.
    """

    import logging
    import multiprocessing
    import time
    from collections.abc import Iterable
    from multiprocessing import Process, Queue

    import spdl.io
    import torch
    from spdl.dataloader import PipelineBuilder
    from torchvision.datasets import ImageNet
    from torchvision.transforms import Compose, Normalize, PILToTensor, Resize

    __all__ = [
        "entrypoint",
        "exp_torch",
        "exp_spdl",
        "run_dataloader",
    ]


    logging.getLogger().setLevel(logging.ERROR)


    def run_dataloader(
        dataloader: Iterable,
        max_items: int,
    ) -> tuple[int, float]:
        """Run the given dataloader and measure its performance.

        Args:
            dataloader: The dataloader to benchmark.
            max_items: The maximum number of items (images or batches) to iterate over.

        Returns:
            The number of images processed and the elapsed time in seconds.
        """
        num_items = 0
        t0 = time.monotonic()
        try:
            for i, (data, _) in enumerate(dataloader, start=1):
                # A 3-D item is one (C, H, W) image; otherwise it is an (N, C, H, W) batch.
                num_items += 1 if data.ndim == 3 else len(data)
                if i >= max_items:
                    break
        finally:
            elapsed = time.monotonic() - t0
        return num_items, elapsed


    def exp_torch(
        *,
        root_dir: str,
        split: str,
        num_workers: int,
        max_items: int,
        batch_size: int | None = None,
        normalize: bool = False,
        transfer: bool = False,
    ) -> tuple[int, float]:
        """Load data with PyTorch native operations using PyTorch DataLoader.

        This is the baseline for comparison.

        Args:
            root_dir: The root directory of the ImageNet dataset.
            split: The dataset split, such as "train" and "val".
            num_workers: The number of worker processes to use.
            max_items: The maximum number of items (images or batches) to process.
            batch_size: The batch size. If ``None``, images are not batched.
            normalize: Whether to normalize the data. Only applicable when
                ``batch_size`` is given.
            transfer: Whether to transfer the data to GPU.

        Returns:
            The number of items processed and the elapsed time in seconds.
        """
        # Decoding (Pillow) and resizing run in the DataLoader worker processes.
        dataset = ImageNet(
            root=root_dir,
            split=split,
            transform=Compose([Resize((224, 224)), PILToTensor()]),
        )

        normalize_transform = Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        )

        # Batching and (optionally) normalization also run in the worker processes.
        def collate(item):
            batch, cls = torch.utils.data.default_collate(item)
            if normalize:
                batch = batch.float() / 255
                batch = normalize_transform(batch)
            return batch, cls

        dataloader = torch.utils.data.DataLoader(
            dataset,
            batch_size=batch_size,
            num_workers=num_workers,
            collate_fn=None if batch_size is None else collate,
            prefetch_factor=1,
            multiprocessing_context="fork",
        )

        if transfer:
            # The GPU transfer happens in the main process.
            def with_transfer(dataloader):
                for tensor, cls in dataloader:
                    tensor = tensor.cuda()
                    yield tensor, cls

            dataloader = with_transfer(dataloader)

        with torch.no_grad():
            return run_dataloader(dataloader, max_items)


    def exp_spdl(
        *,
        root_dir: str,
        split: str,
        num_workers: int,
        max_items: int,
        batch_size: int | None = None,
        normalize: bool = False,
        transfer: bool = False,
    ) -> tuple[int, float]:
        """Load data with SPDL operations using SPDL Pipeline.

        Args:
            root_dir: The root directory of the ImageNet dataset.
            split: The dataset split, such as "train" and "val".
            num_workers: The number of threads to use.
            max_items: The maximum number of items (images or batches) to process.
            batch_size: The batch size. If ``None``, each image is passed through
                the pipeline as a batch of one.
            normalize: Whether to normalize the data.
            transfer: Whether to transfer the data to GPU.

        Returns:
            The number of items processed and the elapsed time in seconds.
        """
        # Filter description that resizes frames to 224x224 during decoding.
        filter_desc = spdl.io.get_video_filter_desc(
            scale_width=224,
            scale_height=224,
        )

        def decode_image(path):
            packets = spdl.io.demux_image(path)
            return spdl.io.decode_packets(packets, filter_desc=filter_desc)

        dataset = ImageNet(
            root=root_dir,
            split=split,
            loader=decode_image,
        )

        # Copy the decoded frames into one buffer and view it as an NCHW tensor.
        def convert(items):
            frames, cls = list(zip(*items))
            buffer = spdl.io.convert_frames(frames)
            tensor = spdl.io.to_torch(buffer).permute(0, 3, 1, 2)
            return tensor, cls

        builder = (
            PipelineBuilder()
            .add_source(range(len(dataset)))
            # Decode and resize concurrently on threads of the main process.
            .pipe(dataset.__getitem__, concurrency=num_workers)
            # Without batching, each image becomes a batch of one.
            .aggregate(batch_size or 1)
            .pipe(convert)
        )

        if normalize:
            transform = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

            # Named so that it does not shadow the ``normalize`` argument.
            def normalize_batch(item):
                tensor, cls = item
                tensor = tensor.float() / 255
                tensor = transform(tensor)
                return tensor, cls

            builder = builder.pipe(normalize_batch)

        if transfer:
            builder = builder.pipe(lambda item: (item[0].cuda(), item[1]))

        builder = builder.add_sink(num_workers)
        pipeline = builder.build(num_threads=num_workers)

        with torch.no_grad(), pipeline.auto_stop():
            return run_dataloader(pipeline, max_items)


    ##############################################################################
    # Execute each test function in a subprocess, so as to isolate the runs
    ##############################################################################
    def exp_torch_(queue, **kwargs):
        queue.put(exp_torch(**kwargs))


    def exp_spdl_(queue, **kwargs):
        queue.put(exp_spdl(**kwargs))


    def run_in_process(func, **kwargs):
        queue = Queue()
        # start() launches ``func`` in a new process; run() would execute it
        # in the current process and defeat the isolation.
        process = Process(target=func, args=[queue], kwargs=kwargs)
        process.start()
        # Fetch the result before join() so the child is not blocked on the queue.
        result = queue.get()
        process.join()
        return result


    def run_test(**kwargs):
        data = {}
        num_workers_ = [1, 2, 4, 8, 16, 32]
        for func in [exp_torch_, exp_spdl_]:
            print(func.__name__)
            print("\tnum_workers\tFPS")
            y = []
            for num_workers in num_workers_:
                num_images, elapsed = run_in_process(
                    func, num_workers=num_workers, **kwargs
                )
                qps = num_images / elapsed
                y.append(qps)
                print(f"\t{num_workers}\t{qps:8.2f} ({num_images} / {elapsed:5.2f})")

            data[func.__name__] = (num_workers_, y)

        return data


    def _print(data):
        for i, (x, y) in enumerate(data.values()):
            if i == 0:
                print("\t".join(str(v) for v in x))
            print("\t".join(f"{v:.2f}" for v in y))


    def entrypoint(
        root_dir: str,
        split: str,
        batch_size: int,
        max_items: int,
    ):
        """The main entrypoint for CLI.

        Args:
            root_dir: The root directory of the ImageNet dataset.
            split: Dataset split, such as "train" and "val".
            batch_size: The batch size to use.
            max_items: The maximum number of items to process.
        """
        multiprocessing.set_start_method("spawn")

        # The four configurations described in the module docstring.
        argset = (
            {"batch_size": None},
            {"batch_size": batch_size},
            {"batch_size": batch_size, "normalize": True},
            {"batch_size": batch_size, "normalize": True, "transfer": True},
        )

        for kwargs in argset:
            print(kwargs)
            data = run_test(root_dir=root_dir, split=split, max_items=max_items, **kwargs)
            _print(data)


    def _parse_args():
        import argparse

        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--root-dir",
            help="Directory where the ImageNet dataset is stored.",
            default="/home/moto/local/imagenet/",
        )
        parser.add_argument(
            "--batch-size", default=32, type=int, help="The batch size to use."
        )
        parser.add_argument(
            "--max-items",
            type=int,
            help="The maximum number of items (images or batches) to process.",
            default=100,
        )
        parser.add_argument(
            "--split",
            default="val",
            help='Dataset split, such as "train" and "val".',
        )
        return parser.parse_args()


    if __name__ == "__main__":
        _args = _parse_args()
        entrypoint(
            _args.root_dir,
            _args.split,
            _args.batch_size,
            _args.max_items,
        )

Functions

entrypoint(root_dir: str, split: str, batch_size: int, max_items: int)

The main entrypoint for CLI.

Parameters:
  • root_dir – The root directory of the ImageNet dataset.

  • split – Dataset split, such as “train” and “val”.

  • batch_size – The batch size to use.

  • max_items – The maximum number of items to process.

exp_torch(*, root_dir: str, split: str, num_workers: int, max_items: int, batch_size: int | None = None, normalize: bool = False, transfer: bool = False) -> tuple[int, float]

Load data with PyTorch native operations using PyTorch DataLoader.

This is the baseline for comparison.

Parameters:
  • root_dir – The root directory of the ImageNet dataset.

  • split – The dataset split, such as “train” and “val”.

  • num_workers – The number of worker processes to use.

  • max_items – The maximum number of items (images or batches) to process.

  • batch_size – The batch size. If None, images are not batched.

  • normalize – Whether to normalize the data. Only applicable when batch_size is given.

  • transfer – Whether to transfer the data to GPU.

Returns:

The number of items processed and the elapsed time in seconds.

exp_spdl(*, root_dir: str, split: str, num_workers: int, max_items: int, batch_size: int | None = None, normalize: bool = False, transfer: bool = False) -> tuple[int, float]

Load data with SPDL operations using SPDL Pipeline.

Parameters:
  • root_dir – The root directory of the ImageNet dataset.

  • split – The dataset split, such as “train” and “val”.

  • num_workers – The number of threads to use.

  • max_items – The maximum number of items (images or batches) to process.

  • batch_size – The batch size. If None, each image is passed through the pipeline as a batch of one.

  • normalize – Whether to normalize the data.

  • transfer – Whether to transfer the data to GPU.

Returns:

The number of items processed and the elapsed time in seconds.

run_dataloader(dataloader: Iterable, max_items: int) -> tuple[int, float]

Run the given dataloader and measure its performance.

Parameters:
  • dataloader – The dataloader to benchmark.

  • max_items – The maximum number of items (images or batches) to iterate over.

Returns:

The number of images processed and the elapsed time in seconds.
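In `run_dataloader`, `max_items` bounds the number of iterations (images when unbatched, batches otherwise), while the returned count is always in images. With the CLI defaults (`--batch-size 32`, `--max-items 100`), a batched run therefore counts up to 3,200 images. The sketch below copies the counting loop and drives it with a hypothetical `FakeTensor` class that only provides `ndim` and `len()`, so it runs without PyTorch or a dataset.

```python
import time
from collections.abc import Iterable


class FakeTensor:
    """Hypothetical stand-in for a tensor: exposes only ``ndim`` and ``len()``."""

    def __init__(self, shape: tuple[int, ...]):
        self.shape = shape
        self.ndim = len(shape)

    def __len__(self) -> int:
        return self.shape[0]


def run_dataloader(dataloader: Iterable, max_items: int) -> tuple[int, float]:
    # Same counting logic as the example's run_dataloader.
    num_items = 0
    t0 = time.monotonic()
    try:
        for i, (data, _) in enumerate(dataloader, start=1):
            # A 3-D item is one (C, H, W) image; otherwise it is an (N, C, H, W) batch.
            num_items += 1 if data.ndim == 3 else len(data)
            if i >= max_items:
                break
    finally:
        elapsed = time.monotonic() - t0
    return num_items, elapsed


# 100 iterations over batches of 32 count 3,200 images.
batches = ((FakeTensor((32, 3, 224, 224)), None) for _ in range(1000))
num_images, _ = run_dataloader(batches, max_items=100)
print(num_images)  # 3200
```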