Video dataloading
This example uses SPDL to decode and batch video frames, then send them to GPU.
The structure of the pipeline is identical to that of
image_dataloading.
Basic Usage

Running this example requires a dataset consisting of videos.

For example, to run this example with the Kinetics dataset:

1. Download the Kinetics dataset. https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.

2. Create a list containing the downloaded videos.

   cd /data/users/moto/kinetics-dataset/k400/
   find train -name '*.mp4' > ~/kinetics400.train.flist

3. Run the script.

   python examples/video_dataloading.py --input-flist ~/kinetics400.train.flist --prefix /data/users/moto/kinetics-dataset/k400/ --num-threads 8
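The script is launched once per host; internally it spawns one worker process per GPU, appending `--worker-id` and `--num-workers` to the shared CLI arguments. A minimal sketch of that fan-out (mirroring the logic in `entrypoint` in the source below; the argument values are illustrative):

```python
def build_worker_args(common: list[str], num_workers: int) -> list[list[str]]:
    # Each worker receives the shared CLI arguments plus its own
    # --worker-id / --num-workers pair, as entrypoint() does.
    return [
        [*common, f"--worker-id={i}", f"--num-workers={num_workers}"]
        for i in range(num_workers)
    ]


common = ["--input-flist", "kinetics400.train.flist", "--num-threads", "8"]
args_set = build_worker_args(common, 2)
# args_set[0] ends with --worker-id=0, args_set[1] with --worker-id=1.
```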
Using GPU video decoder
When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC,
providing the --nvdec option switches the video decoder to NVDEC, using
spdl.io.decode_packets_nvdec(). When using this option, adjust the
number of threads (the number of concurrent decoding jobs) to match
the number of hardware video decoders available on the GPUs.
For details, refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new
Note

This example decodes videos from the beginning to the end, so using NVDEC speeds up the overall decoding. But in cases where frames are sampled, CPU decoding with higher concurrency often yields higher throughput.
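The switch between the CPU and NVDEC decoding paths follows the pattern of `_get_decode_fn` in the source below. A stubbed sketch (the stub decoders here are placeholders, not SPDL APIs):

```python
from typing import Callable


def decode_cpu(src: str) -> str:
    # Stands in for decode_video() (software decoding).
    return f"decoded {src} on CPU"


def decode_nvdec(src: str) -> str:
    # Stands in for decode_video_nvdec() (hardware decoding).
    return f"decoded {src} with NVDEC"


def get_decode_fn(use_nvdec: bool) -> Callable[[str], str]:
    # The --nvdec flag picks which decode function the pipeline runs.
    return decode_nvdec if use_nvdec else decode_cpu
```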
Source
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example uses SPDL to decode and batch video frames, then send them to GPU.

The structure of the pipeline is identical to that of
:py:mod:`image_dataloading`.

Basic Usage
-----------

Running this example requires a dataset consisting of videos.

For example, to run this example with the Kinetics dataset:

1. Download the Kinetics dataset.
   https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.
2. Create a list containing the downloaded videos.

   .. code-block::

      cd /data/users/moto/kinetics-dataset/k400/
      find train -name '*.mp4' > ~/kinetics400.train.flist

3. Run the script.

   .. code-block:: shell

      python examples/video_dataloading.py \\
          --input-flist ~/kinetics400.train.flist \\
          --prefix /data/users/moto/kinetics-dataset/k400/ \\
          --num-threads 8

Using GPU video decoder
-----------------------

When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC,
providing the ``--nvdec`` option switches the video decoder to NVDEC, using
:py:func:`spdl.io.decode_packets_nvdec`. When using this option, adjust the
number of threads (the number of concurrent decoding jobs) to match
the number of hardware video decoders available on the GPUs.
For details, refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new

.. note::

   This example decodes videos from the beginning to the end, so using NVDEC
   speeds up the overall decoding. But in cases where frames are sampled,
   CPU decoding with higher concurrency often yields higher throughput.
"""

# pyre-strict

import argparse
import logging
import signal
import time
from argparse import Namespace
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from pathlib import Path
from threading import Event
from types import FrameType

import spdl.io
import spdl.io.utils
import torch
from spdl.pipeline import Pipeline, PipelineBuilder
from torch import Tensor

_LG: logging.Logger = logging.getLogger(__name__)

__all__ = [
    "entrypoint",
    "worker_entrypoint",
    "benchmark",
    "source",
    "decode_video",
    "decode_video_nvdec",
    "get_pipeline",
    "PerfResult",
]


def _parse_args(args: list[str]) -> Namespace:
    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--input-flist", type=Path, required=True)
    parser.add_argument("--max-samples", type=int, default=float("inf"))
    parser.add_argument("--prefix", default="")
    parser.add_argument("--trace", type=Path)
    parser.add_argument("--queue-size", type=int, default=16)
    parser.add_argument("--num-threads", type=int, required=True)
    parser.add_argument("--worker-id", type=int, required=True)
    parser.add_argument("--num-workers", type=int, required=True)
    parser.add_argument("--nvdec", action="store_true")
    ns = parser.parse_args(args)
    if ns.trace:
        ns.max_samples = 320
    return ns


def source(
    input_flist: str,
    prefix: str,
    max_samples: int,
    split_size: int = 1,
    split_id: int = 0,
) -> Iterable[str]:
    """Iterate a file containing a list of paths, while optionally skipping some.

    Args:
        input_flist: A file containing a list of video paths.
        prefix: Prepended to the paths in the list.
        max_samples: The maximum number of items to yield.
        split_size: Split the paths into this number of subsets.
        split_id: The index of this split. Paths at ``line_number % split_size == split_id`` are returned.

    Yields:
        The paths of the specified split.
    """
    with open(input_flist, "r") as f:
        num_yielded = 0
        for i, line in enumerate(f):
            if i % split_size != split_id:
                continue
            if line := line.strip():
                yield prefix + line

                if (num_yielded := num_yielded + 1) >= max_samples:
                    return


def decode_video(
    src: str | bytes,
    width: int,
    height: int,
    device_index: int,
) -> Tensor:
    """Decode video and send decoded frames to GPU.

    Args:
        src: Data source. Passed to :py:func:`spdl.io.demux_video`.
        width, height: The target resolution.
        device_index: The index of the target GPU.

    Returns:
        A GPU tensor representing the decoded video frames.
        The dtype is uint8, the shape is ``[N, C, H, W]``, where ``N`` is the number
        of frames in the video and ``C`` is the RGB channels.
    """
    packets = spdl.io.demux_video(src)
    frames = spdl.io.decode_packets(
        packets,
        filter_desc=spdl.io.get_filter_desc(
            packets,
            scale_width=width,
            scale_height=height,
            pix_fmt="rgb24",
        ),
    )
    buffer = spdl.io.convert_frames(frames)
    buffer = spdl.io.transfer_buffer(
        buffer,
        device_config=spdl.io.cuda_config(
            device_index=device_index,
            allocator=(
                torch.cuda.caching_allocator_alloc,
                torch.cuda.caching_allocator_delete,
            ),
        ),
    )
    return spdl.io.to_torch(buffer).permute(0, 2, 3, 1)


def decode_video_nvdec(
    src: str,
    device_index: int,
    width: int,
    height: int,
) -> Tensor:
    """Decode video using NVDEC.

    Args:
        src: Data source. Passed to :py:func:`spdl.io.demux_video`.
        device_index: The index of the target GPU.
        width, height: The target resolution.

    Returns:
        A GPU tensor representing the decoded video frames.
        The dtype is uint8, the shape is ``[N, C, H, W]``, where ``N`` is the number
        of frames in the video and ``C`` is the RGB channels.
    """
    packets = spdl.io.demux_video(src)
    buffer = spdl.io.decode_packets_nvdec(
        packets,
        device_config=spdl.io.cuda_config(
            device_index=device_index,
            allocator=(
                torch.cuda.caching_allocator_alloc,
                torch.cuda.caching_allocator_delete,
            ),
        ),
        scale_width=width,
        scale_height=height,
        pix_fmt="rgb",
    )
    return spdl.io.to_torch(buffer)[..., :3].permute(0, 2, 3, 1)


def _get_decode_fn(
    device_index: int, use_nvdec: bool, width: int = 222, height: int = 222
) -> Callable[[str], Tensor]:
    if use_nvdec:

        def _decode_func(src: str) -> Tensor:
            return decode_video_nvdec(src, device_index, width, height)

    else:

        def _decode_func(src: str) -> Tensor:
            return decode_video(src, width, height, device_index)

    return _decode_func


def get_pipeline(
    src: Iterable[str],
    decode_fn: Callable[[str], Tensor],
    decode_concurrency: int,
    num_threads: int,
    buffer_size: int = 3,
) -> Pipeline:
    """Construct the video loading pipeline.

    Args:
        src: Pipeline source. Generator that yields video paths. See :py:func:`source`.
        decode_fn: Function that decodes the given video and sends the decoded frames to GPU.
        decode_concurrency: The maximum number of decoding jobs scheduled concurrently.
        num_threads: The number of threads in the pipeline.
        buffer_size: The size of the buffer for the resulting video Tensors.
    """
    return (
        PipelineBuilder()
        .add_source(src)
        .pipe(decode_fn, concurrency=decode_concurrency)
        .add_sink(buffer_size)
        .build(num_threads=num_threads, report_stats_interval=15)
    )


def _get_pipeline(args: Namespace) -> Pipeline:
    src = source(
        input_flist=args.input_flist,
        prefix=args.prefix,
        max_samples=args.max_samples,
        split_id=args.worker_id,
        split_size=args.num_workers,
    )

    decode_fn = _get_decode_fn(args.worker_id, args.nvdec)
    pipeline = get_pipeline(
        src,
        decode_fn,
        decode_concurrency=args.num_threads,
        num_threads=args.num_threads + 3,
        buffer_size=args.queue_size,
    )
    print(pipeline)
    return pipeline


@dataclass
class PerfResult:
    """Used to report the worker performance to the main process."""

    elapsed: float
    """The time it took to process all the inputs."""

    num_batches: int
    """The number of batches processed."""

    num_frames: int
    """The number of frames processed."""


def benchmark(
    dataloader: Iterable[Tensor],
    stop_requested: Event,
) -> PerfResult:
    """The main loop that measures the performance of dataloading.

    Args:
        dataloader: The dataloader to benchmark.
        stop_requested: Used to interrupt the benchmark loop.

    Returns:
        The performance result.
    """
    t0 = time.monotonic()
    num_frames = num_batches = 0
    try:
        for batch in dataloader:
            num_frames += batch.shape[0]
            num_batches += 1

            if stop_requested.is_set():
                break

    finally:
        elapsed = time.monotonic() - t0
        fps = num_frames / elapsed
        _LG.info(f"FPS={fps:.2f} ({num_frames} frames / {elapsed:.2f} sec, {num_batches} batches)")

    return PerfResult(elapsed, num_batches, num_frames)


def worker_entrypoint(args_: list[str]) -> PerfResult:
    """Entrypoint for worker process. Load videos to a GPU and measure the performance.

    It builds a Pipeline object using the :py:func:`get_pipeline` function and runs it with
    the :py:func:`benchmark` function.
    """
    args = _parse_args(args_)
    _init(args.debug, args.worker_id)

    _LG.info(args)

    pipeline = _get_pipeline(args)

    device = torch.device(f"cuda:{args.worker_id}")

    ev: Event = Event()

    def handler_stop_signals(_signum: int, _frame: FrameType | None) -> None:
        ev.set()

    signal.signal(signal.SIGTERM, handler_stop_signals)

    # Warm up
    torch.zeros([1, 1], device=device)

    trace_path = f"{args.trace}.{args.worker_id}"
    with (
        pipeline.auto_stop(),
        spdl.io.utils.tracing(trace_path, enable=args.trace is not None),
    ):
        return benchmark(pipeline.get_iterator(), ev)


def _init_logging(debug: bool = False, worker_id: int | None = None) -> None:
    fmt = "%(asctime)s [%(levelname)s] %(message)s"
    if worker_id is not None:
        fmt = f"[{worker_id}:%(thread)d] {fmt}"
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(format=fmt, level=level)


def _init(debug: bool, worker_id: int) -> None:
    _init_logging(debug, worker_id)


def _parse_process_args(args: list[str] | None) -> tuple[Namespace, list[str]]:
    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--num-workers", type=int, default=8)
    return parser.parse_known_args(args)


def entrypoint(args: list[str] | None = None) -> None:
    """CLI entrypoint. Launch the worker processes, each of which loads videos and sends them to GPU."""
    ns, args = _parse_process_args(args)

    args_set = [
        [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
        for i in range(ns.num_workers)
    ]

    from multiprocessing import Pool

    with Pool(processes=ns.num_workers) as pool:
        _init_logging()
        _LG.info("Spawned: %d workers", ns.num_workers)

        vals = pool.map(worker_entrypoint, args_set)

        ave_time = sum(v.elapsed for v in vals) / len(vals)
        total_frames = sum(v.num_frames for v in vals)
        total_batches = sum(v.num_batches for v in vals)

        _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")

        FPS = total_frames / ave_time
        BPS = total_batches / ave_time
        _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")


if __name__ == "__main__":
    entrypoint()
API Reference
Functions
- entrypoint(args: list[str] | None = None) → None
  CLI entrypoint. Launch the worker processes, each of which loads videos and sends them to GPU.
- worker_entrypoint(args_: list[str]) → PerfResult
  Entrypoint for worker process. Load videos to a GPU and measure the performance.
  It builds a Pipeline object using the get_pipeline() function and runs it with the benchmark() function.
- benchmark(dataloader: Iterable[Tensor], stop_requested: Event) → PerfResult
  The main loop that measures the performance of dataloading.
  - Parameters:
    dataloader – The dataloader to benchmark.
    stop_requested – Used to interrupt the benchmark loop.
  - Returns:
    The performance result.
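The main process aggregates the per-worker PerfResult values as done at the end of entrypoint(): elapsed times are averaged across workers and frame counts are summed before computing the aggregate FPS. A self-contained sketch:

```python
from dataclasses import dataclass


@dataclass
class PerfResult:
    elapsed: float
    num_batches: int
    num_frames: int


def aggregate_fps(results: list[PerfResult]) -> float:
    # Average the per-worker elapsed times, sum the frame counts,
    # then compute frames-per-second, as entrypoint() does.
    ave_time = sum(r.elapsed for r in results) / len(results)
    total_frames = sum(r.num_frames for r in results)
    return total_frames / ave_time
```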
- source(input_flist: str, prefix: str, max_samples: int, split_size: int = 1, split_id: int = 0) → Iterable[str]
  Iterate a file containing a list of paths, while optionally skipping some.
  - Parameters:
    input_flist – A file containing a list of video paths.
    prefix – Prepended to the paths in the list.
    max_samples – The maximum number of items to yield.
    split_size – Split the paths into this number of subsets.
    split_id – The index of this split. Paths at line_number % split_size == split_id are returned.
  - Yields:
    The paths of the specified split.
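The split_size / split_id selection can be illustrated without file I/O (the paths below are hypothetical):

```python
def split_paths(lines: list[str], split_size: int = 1, split_id: int = 0) -> list[str]:
    # Keep the lines whose index satisfies
    # line_number % split_size == split_id, as source() does.
    return [ln for i, ln in enumerate(lines) if i % split_size == split_id]


paths = ["a.mp4", "b.mp4", "c.mp4", "d.mp4"]
# With two workers, worker 0 reads a.mp4 and c.mp4; worker 1 reads b.mp4 and d.mp4.
```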
- decode_video(src: str | bytes, width: int, height: int, device_index: int) → Tensor
  Decode video and send decoded frames to GPU.
  - Parameters:
    src – Data source. Passed to spdl.io.demux_video().
    width – The target resolution.
    height – The target resolution.
    device_index – The index of the target GPU.
  - Returns:
    A GPU tensor representing the decoded video frames. The dtype is uint8, the shape is [N, C, H, W], where N is the number of frames in the video and C is the RGB channels.
- decode_video_nvdec(src: str, device_index: int, width: int, height: int) → Tensor
  Decode video using NVDEC.
  - Parameters:
    src – Data source. Passed to spdl.io.demux_video().
    device_index – The index of the target GPU.
    width – The target resolution.
    height – The target resolution.
  - Returns:
    A GPU tensor representing the decoded video frames. The dtype is uint8, the shape is [N, C, H, W], where N is the number of frames in the video and C is the RGB channels.
- get_pipeline(src: Iterable[str], decode_fn: Callable[[str], Tensor], decode_concurrency: int, num_threads: int, buffer_size: int = 3) → Pipeline
  Construct the video loading pipeline.
  - Parameters:
    src – Pipeline source. Generator that yields video paths. See source().
    decode_fn – Function that decodes the given video and sends the decoded frames to GPU.
    decode_concurrency – The maximum number of decoding jobs scheduled concurrently.
    num_threads – The number of threads in the pipeline.
    buffer_size – The size of the buffer for the resulting video Tensors.
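The dataflow that get_pipeline() assembles (source → concurrent decode stage → sink) can be sketched generically with the standard library. This illustrates the structure only, not the SPDL implementation, and it collects results into a list where SPDL's sink is a bounded buffer:

```python
from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor


def run_pipeline(src: Iterable, fn: Callable, concurrency: int) -> list:
    # source -> pipe(fn, concurrency) -> collected results (the "sink").
    # ThreadPoolExecutor.map applies fn to items concurrently while
    # preserving the input order of the results.
    with ThreadPoolExecutor(max_workers=concurrency) as exe:
        return list(exe.map(fn, src))
```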
Classes