video_dataloading¶
This example uses SPDL to decode and batch video frames, then send them to GPU.
The structure of the pipeline is identical to that of
image_dataloading
.
Basic Usage¶
Running this example requires a dataset consists of videos.
For example, to run this example with Kinetics dataset.
Download Kinetics dataset. https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.
Create a list containing the downloaded videos.
cd /data/users/moto/kinetics-dataset/k400/ find train -name '*.mp4' > ~/imagenet.train.flist
Run the script.
python examples/video_dataloading.py --input-flist ~/kinetics400.train.flist --prefix /data/users/moto/kinetics-dataset/k400/ --num-threads 8
Using GPU video decoder¶
When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC,
providing --nvdec
option switches the video decoder to NVDEC, using
spdl.io.decode_packets_nvdec()
. When using this option, adjust the
number of threads (the number of concurrent decoding) to accomodate
the number of hardware video decoder availabe on GPUs.
For the details, please refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new
Note
This example decodes videos from the beginning to the end, so using NVDEC speeds up the whole decoding speed. But in cases where framees are sampled, CPU decoding with higher concurrency often yields higher throughput.
Source¶
Source
Click here to see the source.
1# Copyright (c) Meta Platforms, Inc. and affiliates.
2# All rights reserved.
3#
4# This source code is licensed under the BSD-style license found in the
5# LICENSE file in the root directory of this source tree.
6
7"""This example uses SPDL to decode and batch video frames, then send them to GPU.
8
9The structure of the pipeline is identical to that of
10:py:mod:`image_dataloading`.
11
12Basic Usage
13-----------
14
15Running this example requires a dataset consists of videos.
16
17For example, to run this example with Kinetics dataset.
18
191. Download Kinetics dataset.
20 https://github.com/cvdfoundation/kinetics-dataset provides scripts to facilitate this.
212. Create a list containing the downloaded videos.
22
23 .. code-block::
24
25 cd /data/users/moto/kinetics-dataset/k400/
26 find train -name '*.mp4' > ~/imagenet.train.flist
27
283. Run the script.
29
30 .. code-block:: shell
31
32 python examples/video_dataloading.py
33 --input-flist ~/kinetics400.train.flist
34 --prefix /data/users/moto/kinetics-dataset/k400/
35 --num-threads 8
36
37Using GPU video decoder
38-----------------------
39
40When SPDL is built with NVDEC integration enabled, and the GPUs support NVDEC,
41providing ``--nvdec`` option switches the video decoder to NVDEC, using
42:py:func:`spdl.io.decode_packets_nvdec`. When using this option, adjust the
43number of threads (the number of concurrent decoding) to accomodate
44the number of hardware video decoder availabe on GPUs.
45For the details, please refer to https://developer.nvidia.com/video-encode-and-decode-gpu-support-matrix-new
46
47.. note::
48
49 This example decodes videos from the beginning to the end, so using NVDEC
50 speeds up the whole decoding speed. But in cases where framees are sampled,
51 CPU decoding with higher concurrency often yields higher throughput.
52"""
53
54# pyre-ignore-all-errors
55
56import logging
57import signal
58import time
59from collections.abc import Callable, Iterable
60from dataclasses import dataclass
61from pathlib import Path
62from threading import Event
63
64import spdl.io
65import spdl.utils
66import torch
67from spdl.dataloader import Pipeline, PipelineBuilder
68from torch import Tensor
69
70_LG = logging.getLogger(__name__)
71
72__all__ = [
73 "entrypoint",
74 "worker_entrypoint",
75 "benchmark",
76 "source",
77 "decode_video",
78 "decode_video_nvdec",
79 "get_pipeline",
80 "PerfResult",
81]
82
83
84def _parse_args(args):
85 import argparse
86
87 parser = argparse.ArgumentParser(
88 description=__doc__,
89 )
90 parser.add_argument("--debug", action="store_true")
91 parser.add_argument("--input-flist", type=Path, required=True)
92 parser.add_argument("--max-samples", type=int, default=float("inf"))
93 parser.add_argument("--prefix", default="")
94 parser.add_argument("--trace", type=Path)
95 parser.add_argument("--queue-size", type=int, default=16)
96 parser.add_argument("--num-threads", type=int, required=True)
97 parser.add_argument("--worker-id", type=int, required=True)
98 parser.add_argument("--num-workers", type=int, required=True)
99 parser.add_argument("--nvdec", action="store_true")
100 args = parser.parse_args(args)
101 if args.trace:
102 args.max_samples = 320
103 return args
104
105
106def source(
107 input_flist: str,
108 prefix: str,
109 max_samples: int,
110 split_size: int = 1,
111 split_id: int = 0,
112) -> Iterable[str]:
113 """Iterate a file containing a list of paths, while optionally skipping some.
114
115 Args:
116 input_flist: A file contains list of video paths.
117 prefix: Prepended to the paths in the list.
118 max_samples: The maximum number of items to yield.
119 split_size: Split the paths in to this number of subsets.
120 split_id: The index of this split. Paths at ``line_number % split_size == split_id`` are returned.
121
122 Yields:
123 The paths of the specified split.
124 """
125 with open(input_flist, "r") as f:
126 num_yielded = 0
127 for i, line in enumerate(f):
128 if i % split_size != split_id:
129 continue
130 if line := line.strip():
131 yield prefix + line
132
133 if (num_yielded := num_yielded + 1) >= max_samples:
134 return
135
136
137async def decode_video(
138 src: str | bytes,
139 width: int,
140 height: int,
141 device_index: int,
142) -> Tensor:
143 """Decode video and send decoded frames to GPU.
144
145 Args:
146 src: Data source. Passed to :py:func:`spdl.io.demux_video`.
147 width, height: The target resolution.
148 device_index: The index of the target GPU.
149
150 Returns:
151 A GPU tensor represents decoded video frames.
152 The dtype is uint8, the shape is ``[N, C, H, W]``, where ``N`` is the number
153 of frames in the video, ``C`` is RGB channels.
154 """
155 packets = await spdl.io.async_demux_video(src)
156 frames = await spdl.io.async_decode_packets(
157 packets,
158 filter_desc=spdl.io.get_filter_desc(
159 packets,
160 scale_width=width,
161 scale_height=height,
162 pix_fmt="rgb24",
163 ),
164 )
165 buffer = await spdl.io.async_convert_frames(frames)
166 buffer = await spdl.io.async_transfer_buffer(
167 buffer,
168 device_config=spdl.io.cuda_config(
169 device_index=device_index,
170 allocator=(
171 torch.cuda.caching_allocator_alloc,
172 torch.cuda.caching_allocator_delete,
173 ),
174 ),
175 )
176 return spdl.io.to_torch(buffer).permute(0, 2, 3, 1)
177
178
179async def decode_video_nvdec(
180 src: str,
181 device_index: int,
182 width: int,
183 height: int,
184):
185 """Decode video using NVDEC.
186
187 Args:
188 src: Data source. Passed to :py:func:`spdl.io.demux_video`.
189 device_index: The index of the target GPU.
190 width, height: The target resolution.
191
192 Returns:
193 A GPU tensor represents decoded video frames.
194 The dtype is uint8, the shape is ``[N, C, H, W]``, where ``N`` is the number
195 of frames in the video, ``C`` is RGB channels.
196 """
197 packets = await spdl.io.async_demux_video(src)
198 buffer = await spdl.io.async_decode_packets_nvdec(
199 packets,
200 device_config=spdl.io.cuda_config(
201 device_index=device_index,
202 allocator=(
203 torch.cuda.caching_allocator_alloc,
204 torch.cuda.caching_allocator_delete,
205 ),
206 ),
207 width=width,
208 height=height,
209 pix_fmt="rgba",
210 )
211 return spdl.io.to_torch(buffer)[..., :3].permute(0, 2, 3, 1)
212
213
214def _get_decode_fn(device_index, use_nvdec, width=222, height=222):
215 if use_nvdec:
216
217 async def _decode_func(src):
218 return await decode_video_nvdec(src, device_index, width, height)
219
220 else:
221
222 async def _decode_func(src):
223 return await decode_video(src, width, height, device_index)
224
225 return _decode_func
226
227
228def get_pipeline(
229 src: Iterable[str],
230 decode_fn: Callable[[str], Tensor],
231 decode_concurrency: int,
232 num_threads: int,
233 buffer_size: int = 3,
234) -> Pipeline:
235 """Construct the video loading pipeline.
236
237 Args:
238 src: Pipeline source. Generator that yields image paths. See :py:func:`source`.
239 decode_fn: Function that decode the given image and send the decoded frames to GPU.
240 decode_concurrency: The maximum number of decoding scheduled concurrently.
241 num_threads: The number of threads in the pipeline.
242 buffer_size: The size of buffer for the resulting batch image Tensor.
243 """
244 return (
245 PipelineBuilder()
246 .add_source(src)
247 .pipe(decode_fn, concurrency=decode_concurrency, report_stats_interval=15)
248 .add_sink(buffer_size)
249 .build(num_threads=num_threads)
250 )
251
252
253def _get_pipeline(args):
254 src = source(
255 input_flist=args.input_flist,
256 prefix=args.prefix,
257 max_samples=args.max_samples,
258 split_id=args.worker_id,
259 split_size=args.num_workers,
260 )
261
262 decode_fn = _get_decode_fn(args.worker_id, args.nvdec)
263 pipeline = get_pipeline(
264 src,
265 decode_fn,
266 decode_concurrency=args.num_threads,
267 num_threads=args.num_threads + 3,
268 buffer_size=args.queue_size,
269 )
270 print(pipeline)
271 return pipeline
272
273
274@dataclass
275class PerfResult:
276 """Used to report the worker performance to the main process."""
277
278 elapsed: float
279 """The time it took to process all the inputs."""
280
281 num_batches: int
282 """The number of batches processed."""
283
284 num_frames: int
285 """The number of frames processed."""
286
287
288def benchmark(
289 dataloader: Iterable[Tensor],
290 stop_requested: Event,
291) -> PerfResult:
292 """The main loop that measures the performance of dataloading.
293
294 Args:
295 dataloader: The dataloader to benchmark.
296 stop_requested: Used to interrupt the benchmark loop.
297
298 Returns:
299 The performance result.
300 """
301 t0 = time.monotonic()
302 num_frames = num_batches = 0
303 try:
304 for batches in dataloader:
305 for batch in batches:
306 num_frames += batch.shape[0]
307 num_batches += 1
308
309 if stop_requested.is_set():
310 break
311
312 finally:
313 elapsed = time.monotonic() - t0
314 fps = num_frames / elapsed
315 _LG.info(f"FPS={fps:.2f} ({num_frames} / {elapsed:.2f}), (Done {num_frames})")
316
317 return PerfResult(elapsed, num_batches, num_frames)
318
319
320def worker_entrypoint(args: list[str]) -> PerfResult:
321 """Entrypoint for worker process. Load images to a GPU and measure its performance.
322
323 It builds a Pipeline object using :py:func:`get_pipeline` function and run it with
324 :py:func:`benchmark` function.
325 """
326 args = _parse_args(args)
327 _init(args.debug, args.worker_id)
328
329 _LG.info(args)
330
331 pipeline = _get_pipeline(args)
332
333 device = torch.device(f"cuda:{args.worker_id}")
334
335 ev = Event()
336
337 def handler_stop_signals(_signum, _frame):
338 ev.set()
339
340 signal.signal(signal.SIGTERM, handler_stop_signals)
341
342 # Warm up
343 torch.zeros([1, 1], device=device)
344
345 trace_path = f"{args.trace}.{args.worker_id}"
346 with (
347 pipeline.auto_stop(),
348 spdl.utils.tracing(trace_path, enable=args.trace is not None),
349 ):
350 return benchmark(pipeline.get_iterator(), ev)
351
352
353def _init_logging(debug=False, worker_id=None):
354 fmt = "%(asctime)s [%(levelname)s] %(message)s"
355 if worker_id is not None:
356 fmt = f"[{worker_id}:%(thread)d] {fmt}"
357 level = logging.DEBUG if debug else logging.INFO
358 logging.basicConfig(format=fmt, level=level)
359
360
361def _init(debug, worker_id):
362 _init_logging(debug, worker_id)
363
364
365def _parse_process_args(args):
366 import argparse
367
368 parser = argparse.ArgumentParser(
369 description=__doc__,
370 )
371 parser.add_argument("--num-workers", type=int, default=8)
372 return parser.parse_known_args(args)
373
374
375def entrypoint(args: list[str] | None = None):
376 """CLI entrypoint. Launch the worker processes, each of which load videos and send them to GPU."""
377 ns, args = _parse_process_args(args)
378
379 args_set = [
380 [*args, f"--worker-id={i}", f"--num-workers={ns.num_workers}"]
381 for i in range(ns.num_workers)
382 ]
383
384 from multiprocessing import Pool
385
386 with Pool(processes=ns.num_workers) as pool:
387 _init_logging()
388 _LG.info("Spawned: %d workers", ns.num_workers)
389
390 vals = pool.map(worker_entrypoint, args_set)
391
392 ave_time = sum(v.elapsed for v in vals) / len(vals)
393 total_frames = sum(v.num_frames for v in vals)
394 total_batches = sum(v.num_batches for v in vals)
395
396 _LG.info(f"{ave_time=:.2f}, {total_frames=}, {total_batches=}")
397
398 FPS = total_frames / ave_time
399 BPS = total_batches / ave_time
400 _LG.info(f"Aggregated {FPS=:.2f}, {BPS=:.2f}")
401
402
403if __name__ == "__main__":
404 entrypoint()
Functions¶
Functions
- entrypoint(args: list[str] | None = None)[source]¶
CLI entrypoint. Launch the worker processes, each of which load videos and send them to GPU.
- worker_entrypoint(args: list[str]) PerfResult [source]¶
Entrypoint for worker process. Load images to a GPU and measure its performance.
It builds a Pipeline object using
get_pipeline()
function and run it withbenchmark()
function.
- benchmark(dataloader: Iterable[Tensor], stop_requested: Event) PerfResult [source]¶
The main loop that measures the performance of dataloading.
- Parameters:
dataloader – The dataloader to benchmark.
stop_requested – Used to interrupt the benchmark loop.
- Returns:
The performance result.
- source(input_flist: str, prefix: str, max_samples: int, split_size: int = 1, split_id: int = 0) Iterable[str] [source]¶
Iterate a file containing a list of paths, while optionally skipping some.
- Parameters:
input_flist – A file contains list of video paths.
prefix – Prepended to the paths in the list.
max_samples – The maximum number of items to yield.
split_size – Split the paths in to this number of subsets.
split_id – The index of this split. Paths at
line_number % split_size == split_id
are returned.
- Yields:
The paths of the specified split.
- async decode_video(src: str | bytes, width: int, height: int, device_index: int) Tensor [source]¶
Decode video and send decoded frames to GPU.
- Parameters:
src – Data source. Passed to
spdl.io.demux_video()
.width – The target resolution.
height – The target resolution.
device_index – The index of the target GPU.
- Returns:
A GPU tensor represents decoded video frames. The dtype is uint8, the shape is
[N, C, H, W]
, whereN
is the number of frames in the video,C
is RGB channels.
- async decode_video_nvdec(src: str, device_index: int, width: int, height: int)[source]¶
Decode video using NVDEC.
- Parameters:
src – Data source. Passed to
spdl.io.demux_video()
.device_index – The index of the target GPU.
width – The target resolution.
height – The target resolution.
- Returns:
A GPU tensor represents decoded video frames. The dtype is uint8, the shape is
[N, C, H, W]
, whereN
is the number of frames in the video,C
is RGB channels.
- get_pipeline(src: Iterable[str], decode_fn: Callable[[str], Tensor], decode_concurrency: int, num_threads: int, buffer_size: int = 3) Pipeline [source]¶
Construct the video loading pipeline.
- Parameters:
src – Pipeline source. Generator that yields image paths. See
source()
.decode_fn – Function that decode the given image and send the decoded frames to GPU.
decode_concurrency – The maximum number of decoding scheduled concurrently.
num_threads – The number of threads in the pipeline.
buffer_size – The size of buffer for the resulting batch image Tensor.
Classes¶
Classes