video_dataloading

This example uses SPDL to decode and batch video frames, then send them to GPU.

The structure of the pipeline is identical to that of image_dataloading.

Basic Usage

Running this example requires a dataset consisting of videos.

For example, to run this example with the Kinetics dataset:

  1. Download the Kinetics dataset. https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.

  2. Create a list containing the downloaded videos.

    cd /data/users/moto/kinetics-dataset/k400/
    find train -name '*.mp4' > ~/kinetics400.train.flist

  3. Run the script.

    python examples/video_dataloading.py \
      --input-flist ~/kinetics400.train.flist \
      --prefix /data/users/moto/kinetics-dataset/k400/ \
      --num-threads 8
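
The run can also be launched programmatically through entrypoint(). The snippet below is only a sketch: it assumes the example file is importable as a module named video_dataloading, and the flist and prefix paths are placeholders. Each worker requires its own GPU (worker i uses cuda:i).

    from video_dataloading import entrypoint

    # Launch a single worker that decodes with 8 threads on cuda:0.
    entrypoint([
        "--num-workers", "1",
        "--input-flist", "/data/kinetics400.train.flist",
        "--prefix", "/data/kinetics-dataset/k400/",
        "--num-threads", "8",
    ])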
    

Using GPU video decoder

When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC, providing the --nvdec option switches the video decoder to NVDEC, using spdl.io.decode_packets_nvdec(). When using this option, adjust the number of threads (the number of concurrent decoding jobs) to accommodate the number of hardware video decoders available on the GPUs. For details, please refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new
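
For a single file, the code path selected by --nvdec corresponds to calling decode_video_nvdec() (documented below) directly. A minimal sketch, assuming an SPDL build with NVDEC enabled and a CUDA GPU with a hardware decoder; the video path is a placeholder:

    import asyncio

    # Decode the whole video on GPU 0, resized to 222x222, as a uint8 tensor.
    frames = asyncio.run(
        decode_video_nvdec("sample.mp4", device_index=0, width=222, height=222)
    )
    print(frames.shape, frames.device)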

Note

This example decodes videos from the beginning to the end, so using NVDEC speeds up the overall decoding. But in cases where frames are sampled, CPU decoding with higher concurrency often yields higher throughput.

Source

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example uses SPDL to decode and batch video frames, then send them to GPU.

The structure of the pipeline is identical to that of
:py:mod:`image_dataloading`.

Basic Usage
-----------

Running this example requires a dataset consisting of videos.

For example, to run this example with the Kinetics dataset:

1. Download the Kinetics dataset.
   https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.
2. Create a list containing the downloaded videos.

   .. code-block::

      cd /data/users/moto/kinetics-dataset/k400/
      find train -name '*.mp4' > ~/kinetics400.train.flist

3. Run the script.

   .. code-block:: shell

      python examples/video_dataloading.py \
        --input-flist ~/kinetics400.train.flist \
        --prefix /data/users/moto/kinetics-dataset/k400/ \
        --num-threads 8

Using GPU video decoder
-----------------------

When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC,
providing the ``--nvdec`` option switches the video decoder to NVDEC, using
:py:func:`spdl.io.decode_packets_nvdec`. When using this option, adjust the
number of threads (the number of concurrent decoding jobs) to accommodate
the number of hardware video decoders available on the GPUs.
For details, please refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new

.. note::

   This example decodes videos from the beginning to the end, so using NVDEC
   speeds up the overall decoding. But in cases where frames are sampled,
   CPU decoding with higher concurrency often yields higher throughput.
"""

# pyre-ignore-all-errors

import logging
import signal
import time
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from pathlib import Path
from threading import Event

import spdl.io
import spdl.utils
import torch
from spdl.dataloader import Pipeline, PipelineBuilder
from torch import Tensor

_LG = logging.getLogger(__name__)

__all__ = [
    "entrypoint",
    "worker_entrypoint",
    "benchmark",
    "source",
    "decode_video",
    "decode_video_nvdec",
    "get_pipeline",
    "PerfResult",
]


def _parse_args(args):
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--input-flist", type=Path, required=True)
    parser.add_argument("--max-samples", type=int, default=float("inf"))
    parser.add_argument("--prefix", default="")
    parser.add_argument("--trace", type=Path)
    parser.add_argument("--queue-size", type=int, default=16)
    parser.add_argument("--num-threads", type=int, required=True)
    parser.add_argument("--worker-id", type=int, required=True)
    parser.add_argument("--num-workers", type=int, required=True)
    parser.add_argument("--nvdec", action="store_true")
    args = parser.parse_args(args)
    if args.trace:
        args.max_samples = 320
    return args


def source(
    input_flist: str,
    prefix: str,
    max_samples: int,
    split_size: int = 1,
    split_id: int = 0,
) -> Iterable[str]:
    """Iterate a file containing a list of paths, while optionally skipping some.

    Args:
        input_flist: A file containing a list of video paths.
        prefix: Prepended to the paths in the list.
        max_samples: The maximum number of items to yield.
        split_size: Split the paths into this number of subsets.
        split_id: The index of this split. Paths at ``line_number % split_size == split_id`` are returned.

    Yields:
        The paths of the specified split.
    """
    with open(input_flist, "r") as f:
        num_yielded = 0
        for i, line in enumerate(f):
            if i % split_size != split_id:
                continue
            if line := line.strip():
                yield prefix + line

                if (num_yielded := num_yielded + 1) >= max_samples:
                    return


async def decode_video(
    src: str | bytes,
    width: int,
    height: int,
    device_index: int,
) -> Tensor:
    """Decode video and send decoded frames to GPU.

    Args:
        src: Data source. Passed to :py:func:`spdl.io.demux_video`.
        width, height: The target resolution.
        device_index: The index of the target GPU.

    Returns:
        A GPU tensor representing the decoded video frames.
        The dtype is uint8 and the shape is ``[N, C, H, W]``, where ``N`` is the number
        of frames in the video and ``C`` is the RGB channels.
    """
    packets = await spdl.io.async_demux_video(src)
    frames = await spdl.io.async_decode_packets(
        packets,
        filter_desc=spdl.io.get_filter_desc(
            packets,
            scale_width=width,
            scale_height=height,
            pix_fmt="rgb24",
        ),
    )
    buffer = await spdl.io.async_convert_frames(frames)
    buffer = await spdl.io.async_transfer_buffer(
        buffer,
        device_config=spdl.io.cuda_config(
            device_index=device_index,
            allocator=(
                torch.cuda.caching_allocator_alloc,
                torch.cuda.caching_allocator_delete,
            ),
        ),
    )
    return spdl.io.to_torch(buffer).permute(0, 2, 3, 1)


async def decode_video_nvdec(
    src: str,
    device_index: int,
    width: int,
    height: int,
):
    """Decode video using NVDEC.

    Args:
        src: Data source. Passed to :py:func:`spdl.io.demux_video`.
        device_index: The index of the target GPU.
        width, height: The target resolution.

    Returns:
        A GPU tensor representing the decoded video frames.
        The dtype is uint8 and the shape is ``[N, C, H, W]``, where ``N`` is the number
        of frames in the video and ``C`` is the RGB channels.
    """
    packets = await spdl.io.async_demux_video(src)
    buffer = await spdl.io.async_decode_packets_nvdec(
        packets,
        device_config=spdl.io.cuda_config(
            device_index=device_index,
            allocator=(
                torch.cuda.caching_allocator_alloc,
                torch.cuda.caching_allocator_delete,
            ),
        ),
        width=width,
        height=height,
        pix_fmt="rgba",
    )
    return spdl.io.to_torch(buffer)[..., :3].permute(0, 2, 3, 1)


def _get_decode_fn(device_index, use_nvdec, width=222, height=222):
    if use_nvdec:

        async def _decode_func(src):
            return await decode_video_nvdec(src, device_index, width, height)

    else:

        async def _decode_func(src):
            return await decode_video(src, width, height, device_index)

    return _decode_func


def get_pipeline(
    src: Iterable[str],
    decode_fn: Callable[[str], Tensor],
    decode_concurrency: int,
    num_threads: int,
    buffer_size: int = 3,
) -> Pipeline:
    """Construct the video loading pipeline.

    Args:
        src: Pipeline source. Generator that yields video paths. See :py:func:`source`.
        decode_fn: Function that decodes the given video and sends the decoded frames to GPU.
        decode_concurrency: The maximum number of decoding jobs scheduled concurrently.
        num_threads: The number of threads in the pipeline.
        buffer_size: The size of the buffer holding the resulting video Tensors.
    """
    return (
        PipelineBuilder()
        .add_source(src)
        .pipe(decode_fn, concurrency=decode_concurrency, report_stats_interval=15)
        .add_sink(buffer_size)
        .build(num_threads=num_threads)
    )


def _get_pipeline(args):
    src = source(
        input_flist=args.input_flist,
        prefix=args.prefix,
        max_samples=args.max_samples,
        split_id=args.worker_id,
        split_size=args.num_workers,
    )

    decode_fn = _get_decode_fn(args.worker_id, args.nvdec)
    pipeline = get_pipeline(
        src,
        decode_fn,
        decode_concurrency=args.num_threads,
        num_threads=args.num_threads + 3,
        buffer_size=args.queue_size,
    )
    print(pipeline)
    return pipeline


@dataclass
class PerfResult:
    """Used to report the worker performance to the main process."""

    elapsed: float
    """The time it took to process all the inputs."""

    num_batches: int
    """The number of batches processed."""

    num_frames: int
    """The number of frames processed."""


def benchmark(
    dataloader: Iterable[Tensor],
    stop_requested: Event,
) -> PerfResult:
    """The main loop that measures the performance of dataloading.

    Args:
        dataloader: The dataloader to benchmark.
        stop_requested: Used to interrupt the benchmark loop.

    Returns:
        The performance result.
    """
    t0 = time.monotonic()
    num_frames = num_batches = 0
    try:
        for batches in dataloader:
            for batch in batches:
                num_frames += batch.shape[0]
                num_batches += 1

            if stop_requested.is_set():
                break

    finally:
        elapsed = time.monotonic() - t0
        fps = num_frames / elapsed
        _LG.info(f"FPS={fps:.2f} ({num_frames} / {elapsed:.2f}), (Done {num_frames})")

    return PerfResult(elapsed, num_batches, num_frames)


def worker_entrypoint(args: list[str]) -> PerfResult:
    """Entrypoint for worker process. Load videos to a GPU and measure the performance.

    It builds a Pipeline object using the :py:func:`get_pipeline` function and runs it with
    the :py:func:`benchmark` function.
    """
    args = _parse_args(args)
    _init(args.debug, args.worker_id)

    _LG.info(args)

    pipeline = _get_pipeline(args)

    device = torch.device(f"cuda:{args.worker_id}")

    ev = Event()

    def handler_stop_signals(_signum, _frame):
        ev.set()

    signal.signal(signal.SIGTERM, handler_stop_signals)

    # Warm up
    torch.zeros([1, 1], device=device)

    trace_path = f"{args.trace}.{args.worker_id}"
    with (
        pipeline.auto_stop(),
        spdl.utils.tracing(trace_path, enable=args.trace is not None),
    ):
        return benchmark(pipeline.get_iterator(), ev)


def _init_logging(debug=False, worker_id=None):
    fmt = "%(asctime)s [%(levelname)s] %(message)s"
    if worker_id is not None:
        fmt = f"[{worker_id}:%(thread)d] {fmt}"
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(format=fmt, level=level)


def _init(debug, worker_id):
    _init_logging(debug, worker_id)


def _parse_process_args(args):
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--num-workers", type=int, default=8)
    return parser.parse_known_args(args)


def entrypoint(args: list[str] | None = None):
    """CLI entrypoint. Launch the worker processes, each of which loads videos and sends them to GPU."""
    ns, args = _parse_process_args(args)

    args_set = [
        [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]

    from multiprocessing import Pool

    with Pool(processes=ns.num_workers) as pool:
        _init_logging()
        _LG.info("Spawned: %d workers", ns.num_workers)

        vals = pool.map(worker_entrypoint, args_set)

    ave_time = sum(v.elapsed for v in vals) / len(vals)
    total_frames = sum(v.num_frames for v in vals)
    total_batches = sum(v.num_batches for v in vals)

    _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")

    FPS = total_frames / ave_time
    BPS = total_batches / ave_time
    _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")


if __name__ == "__main__":
    entrypoint()

Functions

entrypoint(args: list[str] | None = None)[source]

CLI entrypoint. Launch the worker processes, each of which loads videos and sends them to GPU.

worker_entrypoint(args: list[str]) → PerfResult[source]

Entrypoint for worker process. Load videos to a GPU and measure the performance.

It builds a Pipeline object using the get_pipeline() function and runs it with the benchmark() function.

benchmark(dataloader: Iterable[Tensor], stop_requested: Event) → PerfResult[source]

The main loop that measures the performance of dataloading.

Parameters:
  • dataloader – The dataloader to benchmark.

  • stop_requested – Used to interrupt the benchmark loop.

Returns:

The performance result.
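
The loop treats each item yielded by the dataloader as an iterable of batches and counts the leading dimension of every batch as frames. A self-contained sketch with an in-memory, purely illustrative "dataloader":

    import torch
    from threading import Event

    # Two items, each holding one batch of 8 frames.
    fake_loader = [[torch.zeros(8, 3, 222, 222)], [torch.zeros(8, 3, 222, 222)]]
    result = benchmark(fake_loader, Event())
    print(result.num_batches, result.num_frames)  # 2 16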

source(input_flist: str, prefix: str, max_samples: int, split_size: int = 1, split_id: int = 0) → Iterable[str][source]

Iterate a file containing a list of paths, while optionally skipping some.

Parameters:
  • input_flist – A file containing a list of video paths.

  • prefix – Prepended to the paths in the list.

  • max_samples – The maximum number of items to yield.

  • split_size – Split the paths into this number of subsets.

  • split_id – The index of this split. Paths at line_number % split_size == split_id are returned.

Yields:

The paths of the specified split.
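
For example, reading the even-numbered lines of a file list as worker 0 of 2 could look like this (the flist path and prefix are hypothetical):

    # Worker 0 of 2 reads lines 0, 2, 4, ... up to max_samples.
    for path in source(
        "/data/kinetics400.train.flist",
        prefix="/data/kinetics-dataset/k400/",
        max_samples=4,
        split_size=2,
        split_id=0,
    ):
        print(path)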

async decode_video(src: str | bytes, width: int, height: int, device_index: int) → Tensor[source]

Decode video and send decoded frames to GPU.

Parameters:
  • src – Data source. Passed to spdl.io.demux_video().

  • width – The target resolution.

  • height – The target resolution.

  • device_index – The index of the target GPU.

Returns:

A GPU tensor representing the decoded video frames. The dtype is uint8 and the shape is [N, C, H, W], where N is the number of frames in the video and C is the RGB channels.
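
A usage sketch (the path is a placeholder and a CUDA device is assumed):

    import asyncio

    # Decode, resize to 224x224, and transfer the frames to GPU 0.
    frames = asyncio.run(decode_video("sample.mp4", width=224, height=224, device_index=0))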

async decode_video_nvdec(src: str, device_index: int, width: int, height: int)[source]

Decode video using NVDEC.

Parameters:
  • src – Data source. Passed to spdl.io.demux_video().

  • device_index – The index of the target GPU.

  • width – The target resolution.

  • height – The target resolution.

Returns:

A GPU tensor representing the decoded video frames. The dtype is uint8 and the shape is [N, C, H, W], where N is the number of frames in the video and C is the RGB channels.

get_pipeline(src: Iterable[str], decode_fn: Callable[[str], Tensor], decode_concurrency: int, num_threads: int, buffer_size: int = 3) → Pipeline[source]

Construct the video loading pipeline.

Parameters:
  • src – Pipeline source. Generator that yields video paths. See source().

  • decode_fn – Function that decodes the given video and sends the decoded frames to GPU.

  • decode_concurrency – The maximum number of decoding jobs scheduled concurrently.

  • num_threads – The number of threads in the pipeline.

  • buffer_size – The size of the buffer holding the resulting video Tensors.
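
A sketch that wires source(), a decode function, and benchmark() together, mirroring what the worker entrypoint does (the flist path, prefix, and sizes are placeholders):

    from threading import Event

    # Decode on the CPU and transfer to GPU 0; see decode_video() above.
    async def decode_fn(path):
        return await decode_video(path, width=222, height=222, device_index=0)

    src = source("/data/kinetics400.train.flist", prefix="/data/kinetics-dataset/k400/", max_samples=32)
    pipeline = get_pipeline(src, decode_fn, decode_concurrency=4, num_threads=7)
    with pipeline.auto_stop():
        result = benchmark(pipeline.get_iterator(), Event())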

Classes

class PerfResult(elapsed: float, num_batches: int, num_frames: int)[source]

Used to report the worker performance to the main process.

elapsed: float

The time it took to process all the inputs.

num_batches: int

The number of batches processed.

num_frames: int

The number of frames processed.