Streaming video processing

This example shows how to process video in a streaming fashion.

For the resulting video to be playable, audio data and video data must be written in small chunks, in an alternating manner.

[Audio t1] [Video t1] [Video t2] [Audio t2] [Video t3] [Audio t3] [...]
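As a toy illustration of the idea (plain Python, no SPDL involved), interleaving amounts to merging the audio and video chunk streams by timestamp before writing, rather than writing one stream after the other:

```python
import heapq

# Hypothetical chunk lists: (timestamp, label) pairs, each sorted by time.
# In a real muxer the packets carry their own presentation timestamps.
audio = [(0.0, "Audio t1"), (1.0, "Audio t2"), (2.0, "Audio t3")]
video = [(0.2, "Video t1"), (0.6, "Video t2"), (1.4, "Video t3")]

# heapq.merge yields the chunks in timestamp order, producing the
# alternating audio/video write pattern shown above.
interleaved = [label for _, label in heapq.merge(audio, video)]
print(interleaved)
# ['Audio t1', 'Video t1', 'Video t2', 'Audio t2', 'Video t3', 'Audio t3']
```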

The following diagram illustrates how audio/video data are processed.

1. Demux the input video into audio packets and video packets.
2. Remux the audio packets directly into the output video.
3. Decode the video packets into video frames (YUV).
4. Filter the frames to convert them into RGB.
5. Convert the RGB frames into an array.
6. Wrap the array as video frames (RGB).
7. Filter the frames back into YUV.
8. Encode the YUV frames into video packets.
9. Mux the video packets into the output video.

We use spdl.io.Demuxer to extract audio/video data from the source. (1)

In this example, we do not modify audio data, so audio packets are sent to the muxer (an instance of spdl.io.Muxer) directly. (2)

To modify video data, we first decode the video packets into frames, using spdl.io.VideoDecoder. (3)

Video frames are usually stored in YUV420 format, so we convert them to RGB using spdl.io.FilterGraph. (4) The resulting frame data are then extracted as a NumPy array. (5)
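For intuition on what the pixel-format conversion changes, here is a NumPy sketch (illustrative only, not SPDL's internal layout): yuv420p stores the two chroma planes at quarter resolution, so a frame occupies 1.5 bytes per pixel versus 3 bytes per pixel for interleaved rgb24.

```python
import numpy as np

w, h = 640, 360

# rgb24: one interleaved plane, 3 bytes per pixel.
rgb = np.zeros((h, w, 3), dtype=np.uint8)

# yuv420p: a full-resolution luma plane plus two quarter-resolution
# chroma planes -> 1.5 bytes per pixel on average.
y = np.zeros((h, w), dtype=np.uint8)
u = np.zeros((h // 2, w // 2), dtype=np.uint8)
v = np.zeros((h // 2, w // 2), dtype=np.uint8)

print(rgb.nbytes)                      # 691200
print(y.nbytes + u.nbytes + v.nbytes)  # 345600
```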

Though omitted in this example, imagine that the array data is modified by some sort of AI model. We then convert the array back to packets by applying the reverse operations one by one.

To convert the array back to frames, we use spdl.io.create_reference_video_frame(). This function creates a VideoFrames object that references the data of the array. (6)

We convert RGB into YUV420 using another FilterGraph instance. (7)

The YUV frames are encoded using spdl.io.VideoEncoder. (8)

Finally, the encoded data is written to the multiplexer. (9)

Note on component states

All the media processing components used in this example (Demuxer/Decoder/FilterGraph/Encoder/Muxer) maintain their own internal state and do not necessarily process the input data immediately.

Therefore, the number of input and output frames/packets does not necessarily match, and you need to call flush() on each component at the end.

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example shows how to process video in a streaming fashion.

For the resulting video to be playable, audio data and video data must be
written in small chunks, in an alternating manner.

.. include:: ../plots/streaming_video_processing_block.txt

The following diagram illustrates how audio/video data are processed.

.. include:: ../plots/streaming_video_processing_chart.txt

We use :py:class:`spdl.io.Demuxer` to extract audio/video data from the
source. (1)

In this example, we do not modify audio data, so audio packets are sent to
the muxer (an instance of :py:class:`spdl.io.Muxer`) directly. (2)

To modify video data, we first decode the video packets into frames,
using :py:class:`spdl.io.VideoDecoder`. (3)

Video frames are usually stored in YUV420 format, so we convert
them to RGB using :py:class:`spdl.io.FilterGraph`. (4) The resulting
frame data are then extracted as a NumPy array. (5)

Though omitted in this example, imagine that the array data is
modified by some sort of AI model. We then convert the array back
to packets by applying the reverse operations one by one.

To convert the array back to frames, we use
:py:func:`spdl.io.create_reference_video_frame`. This function creates a
:py:class:`~spdl.io.VideoFrames` object that references the data of the
array. (6)

We convert RGB into YUV420 using another :py:class:`~spdl.io.FilterGraph`
instance. (7)

The YUV frames are encoded using :py:class:`spdl.io.VideoEncoder`. (8)

Finally, the encoded data is written to the multiplexer. (9)

.. admonition:: Note on component states
   :class: note

   All the media processing components used in this example
   (Demuxer/Decoder/FilterGraph/Encoder/Muxer) maintain their own internal
   state and do not necessarily process the input data immediately.

   Therefore, the number of input and output frames/packets does not
   necessarily match, and you need to call ``flush()`` on each component
   at the end.

"""

__all__ = [
    "main",
    "parse_args",
    "get_filter_desc",
    "process",
    "build_components",
]

import argparse
from pathlib import Path

import spdl.io
from spdl.io import (
    Demuxer,
    FilterGraph,
    Muxer,
    VideoDecoder,
    VideoEncoder,
    VideoPackets,
)

# pyre-strict


def parse_args() -> argparse.Namespace:
    """Parse the command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--input-path", "-i", required=True, type=Path)
    parser.add_argument("--output-path", "-o", required=True, type=Path)
    return parser.parse_args()


def get_filter_desc(
    input_pix_fmt: str,
    input_width: int,
    input_height: int,
    frame_rate: tuple[int, int],
    output_pix_fmt: str,
    output_width: int | None = None,
    output_height: int | None = None,
) -> str:
    """Build a filter description that performs format conversion and optional scaling

    Args:
        input_pix_fmt: The input pixel format. Usually ``"rgb24"``.
        input_width,input_height: The input frame resolution.
        frame_rate: The frame rate of the video.
        output_pix_fmt: The output pixel format. It is the pixel format used by
            the encoder.
        output_width,output_height: The output frame resolution.

    Returns:
        The filter description.
    """
    # filter graph for converting RGB into YUV420p
    buffer_arg = ":".join(
        [
            f"video_size={input_width}x{input_height}",
            f"pix_fmt={input_pix_fmt}",
            f"time_base={frame_rate[1]}/{frame_rate[0]}",
            "pixel_aspect=1/1",
        ]
    )
    filter_arg = ",".join(
        [
            f"format=pix_fmts={output_pix_fmt}",
            f"scale=w={output_width or 'iw'}:h={output_height or 'ih'}",
        ]
    )
    return f"buffer={buffer_arg},{filter_arg},buffersink"


def process(
    demuxer: Demuxer,
    video_decoder: VideoDecoder,
    filter_graph: FilterGraph,
    video_encoder: VideoEncoder,
    muxer: Muxer,
) -> None:
    """The main processing logic.

    Args:
        demuxer: Demux audio/video streams from the source.
        video_decoder: Decode the video packets.
        filter_graph: Transform applied to the array data before encoding.
        video_encoder: Encode the processed video array.
        muxer: Multiplexer for remuxing audio packets and processed video packets.
    """
    src_pix_fmt = "rgb24"
    frame_rate = demuxer.video_codec.frame_rate
    video_index = demuxer.video_stream_index
    audio_index = demuxer.audio_stream_index

    streaming_demuxing = demuxer.streaming_demux([video_index, audio_index], duration=1)
    with muxer.open():
        num_video_frames = 0
        for packets in streaming_demuxing:
            if (audio_packets := packets.get(audio_index)) is not None:
                muxer.write(1, audio_packets)

            if (video_packets := packets.get(video_index)) is None:
                continue

            assert isinstance(video_packets, VideoPackets)
            if (frames := video_decoder.decode(video_packets)) is not None:
                buffer = spdl.io.convert_frames(frames)
                array = spdl.io.to_numpy(buffer)

                ##############################################################
                # <ADD FRAME PROCESSING HERE>
                ##############################################################

                frames = spdl.io.create_reference_video_frame(
                    array,
                    pix_fmt=src_pix_fmt,
                    frame_rate=frame_rate,
                    pts=num_video_frames,
                )
                num_video_frames += len(array)

                filter_graph.add_frames(frames)

                if (frames := filter_graph.get_frames()) is not None:
                    if (
                        packets := video_encoder.encode(frames)  # pyre-ignore
                    ) is not None:
                        muxer.write(0, packets)

        # -------------------------------------------------------------
        # Drain mode
        # -------------------------------------------------------------

        # Flush decoder
        if (frames := video_decoder.flush()) is not None:
            buffer = spdl.io.convert_frames(frames)
            array = spdl.io.to_numpy(buffer)

            ##############################################################
            # <ADD FRAME PROCESSING HERE>
            ##############################################################

            frames = spdl.io.create_reference_video_frame(
                array,
                pix_fmt=src_pix_fmt,
                frame_rate=frame_rate,
                pts=num_video_frames,
            )
            num_video_frames += len(frames)

            filter_graph.add_frames(frames)
            if (frames := filter_graph.get_frames()) is not None:
                if (packets := video_encoder.encode(frames)) is not None:  # pyre-ignore
                    muxer.write(0, packets)

        # Flush filter graph
        if (frames := filter_graph.flush()) is not None:
            if (packets := video_encoder.encode(frames)) is not None:
                muxer.write(0, packets)

        # Flush encoder
        if (packets := video_encoder.flush()) is not None:
            muxer.write(0, packets)

def build_components(
    input_path: Path, output_path: Path
) -> tuple[Demuxer, VideoDecoder, FilterGraph, VideoEncoder, Muxer]:
    """Build the demuxer, decoder, filter graph, encoder and muxer."""
    demuxer = spdl.io.Demuxer(input_path)
    muxer = spdl.io.Muxer(output_path)

    # Fetch the input config
    audio_codec = demuxer.audio_codec

    video_codec = demuxer.video_codec
    frame_rate = video_codec.frame_rate
    src_width = video_codec.width
    src_height = video_codec.height

    # Create decoder
    video_decoder = spdl.io.Decoder(demuxer.video_codec)

    # Configure output
    src_pix_fmt = "rgb24"
    enc_pix_fmt = "yuv420p"
    enc_height = src_height // 2
    enc_width = src_width // 2
    filter_desc = get_filter_desc(
        input_pix_fmt=src_pix_fmt,
        input_width=src_width,
        input_height=src_height,
        frame_rate=frame_rate,
        output_pix_fmt=enc_pix_fmt,
        output_width=enc_width,
        output_height=enc_height,
    )
    print(filter_desc)
    filter_graph = spdl.io.FilterGraph(filter_desc)

    video_encoder = muxer.add_encode_stream(
        config=spdl.io.video_encode_config(
            pix_fmt=enc_pix_fmt,
            frame_rate=frame_rate,
            height=enc_height,
            width=enc_width,
            colorspace="bt709",
            color_primaries="bt709",
            color_trc="bt709",
        ),
    )
    muxer.add_remux_stream(audio_codec)
    return demuxer, video_decoder, filter_graph, video_encoder, muxer

def main() -> None:
    """Entrypoint from the command line."""
    args = parse_args()

    demuxer, video_decoder, filter_graph, video_encoder, muxer = build_components(
        args.input_path, args.output_path
    )

    process(
        demuxer,
        video_decoder,
        filter_graph,
        video_encoder,
        muxer,
    )


if __name__ == "__main__":
    main()

API Reference

Functions

main() → None

Entrypoint from the command line.

parse_args() → Namespace

Parse the command line arguments.

get_filter_desc(input_pix_fmt: str, input_width: int, input_height: int, frame_rate: tuple[int, int], output_pix_fmt: str, output_width: int | None = None, output_height: int | None = None) → str

Build a filter description that performs format conversion and optional scaling.

Parameters:
  • input_pix_fmt – The input pixel format. Usually "rgb24".

  • input_width – The input frame width.

  • input_height – The input frame height.

  • frame_rate – The frame rate of the video.

  • output_pix_fmt – The output pixel format. It is the pixel format used by the encoder.

  • output_width – The output frame width.

  • output_height – The output frame height.

Returns:

The filter description.

process(demuxer: Demuxer, video_decoder: VideoDecoder, filter_graph: FilterGraph, video_encoder: VideoEncoder, muxer: Muxer) → None

The main processing logic.

Parameters:
  • demuxer – Demux audio/video streams from the source.

  • video_decoder – Decode the video packets.

  • filter_graph – Transform applied to the array data before encoding.

  • video_encoder – Encode the processed video array.

  • muxer – Multiplexer for remuxing audio packets and processed video packets.

build_components(input_path: Path, output_path: Path) → tuple[Demuxer, VideoDecoder, FilterGraph, VideoEncoder, Muxer]

Build the demuxer, decoder, filter graph, encoder and muxer.