Video dataloading

This example uses SPDL to decode and batch video frames, then send them to GPU.

The structure of the pipeline is identical to that of image_dataloading.

Basic Usage

Running this example requires a dataset consisting of videos.

For example, to run this example with the Kinetics dataset:

  1. Download the Kinetics dataset. https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.

  2. Create a list containing the downloaded videos.

    cd /data/users/moto/kinetics-dataset/k400/
    find train -name '*.mp4' > ~/kinetics400.train.flist
    
  3. Run the script.

    python examples/video_dataloading.py \
      --input-flist ~/kinetics400.train.flist \
      --prefix /data/users/moto/kinetics-dataset/k400/ \
      --num-threads 8
    

Using GPU video decoder

When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC, providing the --nvdec option switches the video decoder to NVDEC, using spdl.io.decode_packets_nvdec(). When using this option, adjust the number of threads (the number of concurrent decoding jobs) to match the number of hardware video decoders available on the GPUs. For details, please refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new
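For example, the invocation from Basic Usage becomes the following (the thread count of 4 is only illustrative; match it to the number of NVDEC engines on your GPUs):

```shell
python examples/video_dataloading.py \
  --input-flist ~/kinetics400.train.flist \
  --prefix /data/users/moto/kinetics-dataset/k400/ \
  --num-threads 4 \
  --nvdec
```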

Note

This example decodes videos from beginning to end, so using NVDEC speeds up the overall decoding. But in cases where frames are sampled, CPU decoding with higher concurrency often yields higher throughput.

Source

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example uses SPDL to decode and batch video frames, then send them to GPU.

The structure of the pipeline is identical to that of
:py:mod:`image_dataloading`.

Basic Usage
-----------

Running this example requires a dataset consisting of videos.

For example, to run this example with the Kinetics dataset:

1. Download the Kinetics dataset.
   https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.
2. Create a list containing the downloaded videos.

   .. code-block::

      cd /data/users/moto/kinetics-dataset/k400/
      find train -name '*.mp4' > ~/kinetics400.train.flist

3. Run the script.

   .. code-block:: shell

      python examples/video_dataloading.py \\
        --input-flist ~/kinetics400.train.flist \\
        --prefix /data/users/moto/kinetics-dataset/k400/ \\
        --num-threads 8

Using GPU video decoder
-----------------------

When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC,
providing the ``--nvdec`` option switches the video decoder to NVDEC, using
:py:func:`spdl.io.decode_packets_nvdec`. When using this option, adjust the
number of threads (the number of concurrent decoding jobs) to match
the number of hardware video decoders available on the GPUs.
For details, please refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new

.. note::

   This example decodes videos from beginning to end, so using NVDEC
   speeds up the overall decoding. But in cases where frames are sampled,
   CPU decoding with higher concurrency often yields higher throughput.
"""

# pyre-strict

import argparse
import logging
import signal
import time
from argparse import Namespace
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from pathlib import Path
from threading import Event
from types import FrameType

import spdl.io
import spdl.io.utils
import torch
from spdl.pipeline import Pipeline, PipelineBuilder
from torch import Tensor

_LG: logging.Logger = logging.getLogger(__name__)

__all__ = [
    "entrypoint",
    "worker_entrypoint",
    "benchmark",
    "source",
    "decode_video",
    "decode_video_nvdec",
    "get_pipeline",
    "PerfResult",
]


def _parse_args(args: list[str]) -> Namespace:
    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--input-flist", type=Path, required=True)
    parser.add_argument("--max-samples", type=int, default=float("inf"))
    parser.add_argument("--prefix", default="")
    parser.add_argument("--trace", type=Path)
    parser.add_argument("--queue-size", type=int, default=16)
    parser.add_argument("--num-threads", type=int, required=True)
    parser.add_argument("--worker-id", type=int, required=True)
    parser.add_argument("--num-workers", type=int, required=True)
    parser.add_argument("--nvdec", action="store_true")
    ns = parser.parse_args(args)
    if ns.trace:
        ns.max_samples = 320
    return ns


def source(
    input_flist: str,
    prefix: str,
    max_samples: int,
    split_size: int = 1,
    split_id: int = 0,
) -> Iterable[str]:
    """Iterate a file containing a list of paths, while optionally skipping some.

    Args:
        input_flist: A file containing a list of video paths.
        prefix: Prepended to the paths in the list.
        max_samples: The maximum number of items to yield.
        split_size: Split the paths into this number of subsets.
        split_id: The index of this split. Paths at ``line_number % split_size == split_id`` are returned.

    Yields:
        The paths of the specified split.
    """
    with open(input_flist, "r") as f:
        num_yielded = 0
        for i, line in enumerate(f):
            if i % split_size != split_id:
                continue
            if line := line.strip():
                yield prefix + line

                if (num_yielded := num_yielded + 1) >= max_samples:
                    return


def decode_video(
    src: str | bytes,
    width: int,
    height: int,
    device_index: int,
) -> Tensor:
    """Decode video and send decoded frames to GPU.

    Args:
        src: Data source. Passed to :py:func:`spdl.io.demux_video`.
        width, height: The target resolution.
        device_index: The index of the target GPU.

    Returns:
        A GPU tensor representing the decoded video frames.
        The dtype is uint8, the shape is ``[N, C, H, W]``, where ``N`` is the number
        of frames in the video, ``C`` is RGB channels.
    """
    packets = spdl.io.demux_video(src)
    frames = spdl.io.decode_packets(
        packets,
        filter_desc=spdl.io.get_filter_desc(
            packets,
            scale_width=width,
            scale_height=height,
            pix_fmt="rgb24",
        ),
    )
    buffer = spdl.io.convert_frames(frames)
    buffer = spdl.io.transfer_buffer(
        buffer,
        device_config=spdl.io.cuda_config(
            device_index=device_index,
            allocator=(
                torch.cuda.caching_allocator_alloc,
                torch.cuda.caching_allocator_delete,
            ),
        ),
    )
    return spdl.io.to_torch(buffer).permute(0, 2, 3, 1)


def decode_video_nvdec(
    src: str,
    device_index: int,
    width: int,
    height: int,
) -> Tensor:
    """Decode video using NVDEC.

    Args:
        src: Data source. Passed to :py:func:`spdl.io.demux_video`.
        device_index: The index of the target GPU.
        width, height: The target resolution.

    Returns:
        A GPU tensor representing the decoded video frames.
        The dtype is uint8, the shape is ``[N, C, H, W]``, where ``N`` is the number
        of frames in the video, ``C`` is RGB channels.
    """
    packets = spdl.io.demux_video(src)
    buffer = spdl.io.decode_packets_nvdec(
        packets,
        device_config=spdl.io.cuda_config(
            device_index=device_index,
            allocator=(
                torch.cuda.caching_allocator_alloc,
                torch.cuda.caching_allocator_delete,
            ),
        ),
        scale_width=width,
        scale_height=height,
        pix_fmt="rgb",
    )
    return spdl.io.to_torch(buffer)[..., :3].permute(0, 2, 3, 1)


def _get_decode_fn(
    device_index: int, use_nvdec: bool, width: int = 222, height: int = 222
) -> Callable[[str], Tensor]:
    if use_nvdec:

        def _decode_func(src: str) -> Tensor:
            return decode_video_nvdec(src, device_index, width, height)

    else:

        def _decode_func(src: str) -> Tensor:
            return decode_video(src, width, height, device_index)

    return _decode_func


def get_pipeline(
    src: Iterable[str],
    decode_fn: Callable[[str], Tensor],
    decode_concurrency: int,
    num_threads: int,
    buffer_size: int = 3,
) -> Pipeline:
    """Construct the video loading pipeline.

    Args:
        src: Pipeline source. Generator that yields video paths. See :py:func:`source`.
        decode_fn: Function that decodes the given video and sends the decoded frames to GPU.
        decode_concurrency: The maximum number of decoding jobs scheduled concurrently.
        num_threads: The number of threads in the pipeline.
        buffer_size: The size of the buffer for the resulting batch Tensor.
    """
    return (
        PipelineBuilder()
        .add_source(src)
        .pipe(decode_fn, concurrency=decode_concurrency)
        .add_sink(buffer_size)
        .build(num_threads=num_threads, report_stats_interval=15)
    )


def _get_pipeline(args: Namespace) -> Pipeline:
    src = source(
        input_flist=args.input_flist,
        prefix=args.prefix,
        max_samples=args.max_samples,
        split_id=args.worker_id,
        split_size=args.num_workers,
    )

    decode_fn = _get_decode_fn(args.worker_id, args.nvdec)
    pipeline = get_pipeline(
        src,
        decode_fn,
        decode_concurrency=args.num_threads,
        num_threads=args.num_threads + 3,
        buffer_size=args.queue_size,
    )
    print(pipeline)
    return pipeline


@dataclass
class PerfResult:
    """Used to report the worker performance to the main process."""

    elapsed: float
    """The time it took to process all the inputs."""

    num_batches: int
    """The number of batches processed."""

    num_frames: int
    """The number of frames processed."""


def benchmark(
    dataloader: Iterable[Tensor],
    stop_requested: Event,
) -> PerfResult:
    """The main loop that measures the performance of dataloading.

    Args:
        dataloader: The dataloader to benchmark.
        stop_requested: Used to interrupt the benchmark loop.

    Returns:
        The performance result.
    """
    t0 = time.monotonic()
    num_frames = num_batches = 0
    try:
        for batches in dataloader:
            for batch in batches:
                num_frames += batch.shape[0]
                num_batches += 1

            if stop_requested.is_set():
                break

    finally:
        elapsed = time.monotonic() - t0
        fps = num_frames / elapsed
        _LG.info(f"FPS={fps:.2f} ({num_frames} / {elapsed:.2f}), (Done {num_frames})")

    return PerfResult(elapsed, num_batches, num_frames)


def worker_entrypoint(args_: list[str]) -> PerfResult:
    """Entrypoint for worker process. Load videos to a GPU and measure its performance.

    It builds a Pipeline object using the :py:func:`get_pipeline` function and runs it
    with the :py:func:`benchmark` function.
    """
    args = _parse_args(args_)
    _init(args.debug, args.worker_id)

    _LG.info(args)

    pipeline = _get_pipeline(args)

    device = torch.device(f"cuda:{args.worker_id}")

    ev: Event = Event()

    def handler_stop_signals(_signum: int, _frame: FrameType | None) -> None:
        ev.set()

    signal.signal(signal.SIGTERM, handler_stop_signals)

    # Warm up
    torch.zeros([1, 1], device=device)

    trace_path = f"{args.trace}.{args.worker_id}"
    with (
        pipeline.auto_stop(),
        spdl.io.utils.tracing(trace_path, enable=args.trace is not None),
    ):
        return benchmark(pipeline.get_iterator(), ev)


def _init_logging(debug: bool = False, worker_id: int | None = None) -> None:
    fmt = "%(asctime)s [%(levelname)s] %(message)s"
    if worker_id is not None:
        fmt = f"[{worker_id}:%(thread)d] {fmt}"
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(format=fmt, level=level)


def _init(debug: bool, worker_id: int) -> None:
    _init_logging(debug, worker_id)


def _parse_process_args(args: list[str] | None) -> tuple[Namespace, list[str]]:
    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--num-workers", type=int, default=8)
    return parser.parse_known_args(args)


def entrypoint(args: list[str] | None = None) -> None:
    """CLI entrypoint. Launch the worker processes, each of which load videos and send them to GPU."""
    ns, args = _parse_process_args(args)

    args_set = [
        [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]

    from multiprocessing import Pool

    with Pool(processes=ns.num_workers) as pool:
        _init_logging()
        _LG.info("Spawned: %d workers", ns.num_workers)

        vals = pool.map(worker_entrypoint, args_set)

    ave_time = sum(v.elapsed for v in vals) / len(vals)
    total_frames = sum(v.num_frames for v in vals)
    total_batches = sum(v.num_batches for v in vals)

    _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")

    FPS = total_frames / ave_time
    BPS = total_batches / ave_time
    _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")


if __name__ == "__main__":
    entrypoint()

API Reference

Functions

entrypoint(args: list[str] | None = None) → None

CLI entrypoint. Launch the worker processes, each of which load videos and send them to GPU.

worker_entrypoint(args_: list[str]) → PerfResult

Entrypoint for worker process. Load videos to a GPU and measure its performance.

It builds a Pipeline object using the get_pipeline() function and runs it with the benchmark() function.

benchmark(dataloader: Iterable[Tensor], stop_requested: Event) → PerfResult

The main loop that measures the performance of dataloading.

Parameters:
  • dataloader – The dataloader to benchmark.

  • stop_requested – Used to interrupt the benchmark loop.

Returns:

The performance result.
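The measurement boils down to counting frames against a monotonic clock, with an Event allowing interruption. A simplified, self-contained sketch of the same loop (plain lists stand in for GPU tensors; this is an illustration, not the function above):

```python
import time
from threading import Event

def benchmark_loop(dataloader, stop_requested: Event):
    # Time the iteration and count batches/frames, as benchmark() does.
    t0 = time.monotonic()
    num_frames = num_batches = 0
    for batch in dataloader:
        num_frames += len(batch)
        num_batches += 1
        if stop_requested.is_set():
            break
    elapsed = time.monotonic() - t0
    return elapsed, num_batches, num_frames

# Two "batches" of 8 "frames" each.
elapsed, num_batches, num_frames = benchmark_loop([[0] * 8, [0] * 8], Event())
print(num_batches, num_frames)  # 2 16
```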

source(input_flist: str, prefix: str, max_samples: int, split_size: int = 1, split_id: int = 0) → Iterable[str]

Iterate a file containing a list of paths, while optionally skipping some.

Parameters:
  • input_flist – A file containing a list of video paths.

  • prefix – Prepended to the paths in the list.

  • max_samples – The maximum number of items to yield.

  • split_size – Split the paths into this number of subsets.

  • split_id – The index of this split. Paths at line_number % split_size == split_id are returned.

Yields:

The paths of the specified split.
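To make the split_size/split_id behavior concrete, here is a self-contained sketch of the same modulo-based sharding (iter_split is a hypothetical stand-in that reads from a list instead of a file):

```python
def iter_split(lines, split_size=1, split_id=0, max_samples=float("inf")):
    # Keep only lines where line_number % split_size == split_id,
    # and stop after max_samples items -- the same logic as source().
    num_yielded = 0
    for i, line in enumerate(lines):
        if i % split_size != split_id:
            continue
        if line := line.strip():
            yield line
            if (num_yielded := num_yielded + 1) >= max_samples:
                return

paths = [f"train/video_{i}.mp4\n" for i in range(10)]

# With 2 workers, worker 0 sees even-numbered lines, worker 1 odd-numbered ones.
worker0 = list(iter_split(paths, split_size=2, split_id=0))
worker1 = list(iter_split(paths, split_size=2, split_id=1))
print(worker0[:2])  # ['train/video_0.mp4', 'train/video_2.mp4']
print(worker1[:2])  # ['train/video_1.mp4', 'train/video_3.mp4']
```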

decode_video(src: str | bytes, width: int, height: int, device_index: int) → Tensor

Decode video and send decoded frames to GPU.

Parameters:
  • src – Data source. Passed to spdl.io.demux_video().

  • width – The target frame width.

  • height – The target frame height.

  • device_index – The index of the target GPU.

Returns:

A GPU tensor representing the decoded video frames. The dtype is uint8, the shape is [N, C, H, W], where N is the number of frames in the video, C is RGB channels.

decode_video_nvdec(src: str, device_index: int, width: int, height: int) → Tensor

Decode video using NVDEC.

Parameters:
  • src – Data source. Passed to spdl.io.demux_video().

  • device_index – The index of the target GPU.

  • width – The target frame width.

  • height – The target frame height.

Returns:

A GPU tensor representing the decoded video frames. The dtype is uint8, the shape is [N, C, H, W], where N is the number of frames in the video, C is RGB channels.

get_pipeline(src: Iterable[str], decode_fn: Callable[[str], Tensor], decode_concurrency: int, num_threads: int, buffer_size: int = 3) → Pipeline

Construct the video loading pipeline.

Parameters:
  • src – Pipeline source. Generator that yields video paths. See source().

  • decode_fn – Function that decodes the given video and sends the decoded frames to GPU.

  • decode_concurrency – The maximum number of decoding jobs scheduled concurrently.

  • num_threads – The number of threads in the pipeline.

  • buffer_size – The size of the buffer for the resulting batch Tensor.
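The pipeline built here is source → pipe(decode, concurrency) → sink. As a rough mental model only (this is not SPDL's implementation), the decode_concurrency knob behaves like the worker count of a bounded thread pool:

```python
from concurrent.futures import ThreadPoolExecutor

def run_pipeline(src, decode_fn, decode_concurrency):
    # Toy source -> pipe(concurrency) -> sink: up to decode_concurrency
    # calls to decode_fn run at once; results come out in input order.
    with ThreadPoolExecutor(max_workers=decode_concurrency) as executor:
        yield from executor.map(decode_fn, src)

# len() stands in for a decode function; real code would pass decode_video.
decoded = list(run_pipeline(["a.mp4", "bb.mp4", "ccc.mp4"], len, decode_concurrency=2))
print(decoded)  # [5, 6, 7]
```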

Classes

class PerfResult(elapsed: float, num_batches: int, num_frames: int)

Used to report the worker performance to the main process.

elapsed: float

The time it took to process all the inputs.

num_batches: int

The number of batches processed.

num_frames: int

The number of frames processed.
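For reference, the main process aggregates the per-worker PerfResult values roughly like this (the numbers below are made-up placeholders, not measurements):

```python
from dataclasses import dataclass

@dataclass
class PerfResult:
    elapsed: float
    num_batches: int
    num_frames: int

# Two hypothetical workers reporting back to the main process.
vals = [PerfResult(10.0, 100, 3200), PerfResult(10.0, 110, 3300)]

# entrypoint() averages elapsed time across workers, then derives
# aggregate frames-per-second and batches-per-second.
ave_time = sum(v.elapsed for v in vals) / len(vals)
total_frames = sum(v.num_frames for v in vals)
total_batches = sum(v.num_batches for v in vals)
fps = total_frames / ave_time
bps = total_batches / ave_time
print(f"Aggregated FPS={fps:.2f}, BPS={bps:.2f}")  # Aggregated FPS=650.00, BPS=21.00
```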