Streaming video processing

This example shows how to process video in streaming fashion.

For the resulting video to be playable, audio data and video data must be written in small chunks in an alternating manner.
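As a toy illustration (plain Python, not the spdl API), producing that interleaved write order amounts to merging two timestamp-ordered streams of chunks. The start times and labels below are hypothetical:

```python
import heapq

# Hypothetical (start_time, label) chunks; real chunks carry packet data.
audio = [(0.0, "Audio t1"), (1.0, "Audio t2"), (2.0, "Audio t3")]
video = [(0.0, "Video t1"), (0.5, "Video t2"), (1.5, "Video t3")]

# Merge the two timestamp-ordered streams into a single write order.
write_order = [label for _, label in heapq.merge(audio, video)]
print(write_order)
# → ['Audio t1', 'Video t1', 'Video t2', 'Audio t2', 'Video t3', 'Audio t3']
```

In the actual example this ordering falls out of the demux loop: packets are written to the muxer in the order the demuxer yields them.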

[Diagram: the output file interleaves chunks in timestamp order - Audio t1 | Video t1 | Video t2 | Audio t2 | Video t3 | Audio t3 | ...]

The following diagram illustrates how audio/video data are processed.

[Diagram:
Input Video --(1. Demux)--> Audio Packets --(2. Remux)--> Output Video
Input Video --(1. Demux)--> Video Packets --(3. Decode)--> Video Frames (YUV) --(4. Filter)--> Video Frames (RGB) --(5. Convert)--> Array --(6. Wrap)--> Video Frames (RGB) --(7. Filter)--> Video Frames (YUV) --(8. Encode)--> Video Packets --(9. Mux)--> Output Video]

We use spdl.io.Demuxer to extract audio/video data from the source. (1)

In this example, we do not modify audio data, so audio packets are sent to the muxer (an instance of spdl.io.Muxer) directly. (2)

To modify video data, we first decode video packets into frames, using spdl.io.VideoDecoder. (3)

Video frames are usually stored in YUV420 format, so we convert them to RGB using spdl.io.FilterGraph. (4) The resulting frame data are then extracted as a NumPy array. (5)
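For intuition on why this conversion matters downstream: under the standard FFmpeg layouts (not spdl-specific), yuv420p subsamples chroma and packs 1.5 bytes per pixel, while rgb24 stores 3 bytes per pixel, which maps directly onto the (height, width, 3) NumPy array used in this example:

```python
# Hypothetical frame resolution.
width, height = 1920, 1080

# yuv420p: full-resolution luma plane + two quarter-resolution chroma planes.
yuv420p_bytes = width * height + 2 * (width // 2) * (height // 2)

# rgb24: 3 bytes per pixel, i.e. the size of a (height, width, 3) uint8 array.
rgb24_bytes = width * height * 3

print(yuv420p_bytes, rgb24_bytes)  # → 3110400 6220800
```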

Though omitted in this example, let's pretend that the array data is modified with some sort of AI model. Now we convert the array back into packets by applying the reverse operations one by one.
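The `<ADD FRAME PROCESSING HERE>` placeholder in the listing below marks where such a model would go. As a stand-in, any operation that preserves the (N, H, W, 3) shape and uint8 dtype works, e.g. a simple color inversion (the batch here is random dummy data, not decoded frames):

```python
import numpy as np

# Dummy batch standing in for the decoded frames: (N, H, W, 3), uint8.
array = np.random.randint(0, 256, size=(4, 64, 64, 3), dtype=np.uint8)

# Placeholder "model": invert colors. Shape and dtype must be preserved so
# that the array can be wrapped back into video frames afterwards.
processed = 255 - array

assert processed.shape == array.shape and processed.dtype == np.uint8
```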

To convert the array back to frames, we use spdl.io.create_reference_video_frame(). This function creates a VideoFrames object that references the data of the array. (6)
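Note the `pts` bookkeeping in the listing below: because the buffer's time base is set to the inverse of the frame rate, `pts` counts frames, so each chunk starts at the running total of frames emitted so far. A quick sanity check of that arithmetic, with hypothetical chunk sizes:

```python
# Hypothetical chunk sizes (frames per decoded batch) at 30000/1001 fps.
frame_rate = (30000, 1001)
chunk_sizes = [8, 8, 5]

num_video_frames = 0
start_times = []
for n in chunk_sizes:
    # pts is in units of time_base = frame_rate[1]/frame_rate[0] seconds,
    # i.e. it is simply the index of the chunk's first frame.
    pts = num_video_frames
    start_times.append(pts * frame_rate[1] / frame_rate[0])
    num_video_frames += n

print(start_times)  # [0.0, ~0.267, ~0.534]
```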

We convert RGB into YUV420 using another FilterGraph instance. (7)
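The FilterGraph is configured with an FFmpeg-style textual description, assembled by get_filter_desc in the listing below. For a hypothetical 1280×720, 30 fps RGB input downscaled by half (as build_components does), the description reduces to a plain string:

```python
# Hypothetical input geometry; build_components derives these from the codec.
input_width, input_height = 1280, 720
frame_rate = (30, 1)

# Source pad configuration, mirroring get_filter_desc in the listing below.
buffer_arg = ":".join(
    [
        f"video_size={input_width}x{input_height}",
        "pix_fmt=rgb24",
        f"time_base={frame_rate[1]}/{frame_rate[0]}",
        "pixel_aspect=1/1",
    ]
)
# Convert to yuv420p and halve the resolution.
filter_arg = "format=pix_fmts=yuv420p,scale=w=640:h=360"
filter_desc = f"buffer={buffer_arg},{filter_arg},buffersink"
print(filter_desc)
```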

The YUV frames are encoded using spdl.io.VideoEncoder. (8)

Finally, the encoded data is written to the multiplexer. (9)

Note on component states

All the media processing components used in this example (Demuxer/Decoder/FilterGraph/Encoder/Muxer) maintain their own internal state and do not necessarily process the input data immediately.

Therefore, the number of input frames/packets does not necessarily match the number of output frames/packets, and you need to call flush() on each component at the end.

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example shows how to process video in streaming fashion.

For the resulting video to be playable, audio data and video data must be
written in small chunks in an alternating manner.

.. include:: ../plots/streaming_video_processing_block.txt

The following diagram illustrates how audio/video data are processed.

.. include:: ../plots/streaming_video_processing_chart.txt

We use :py:class:`spdl.io.Demuxer` to extract audio/video data from the
source. (1)

In this example, we do not modify audio data, so audio packets are sent to
the muxer (an instance of :py:class:`spdl.io.Muxer`) directly. (2)

To modify video data, we first decode video packets into frames,
using :py:class:`spdl.io.VideoDecoder`. (3)

Video frames are usually stored in YUV420 format, so we convert
them to RGB using :py:class:`spdl.io.FilterGraph`. (4) The resulting
frame data are then extracted as a NumPy array. (5)

Though omitted in this example, let's pretend that the array data is
modified with some sort of AI model. Now we convert the array back
into packets by applying the reverse operations one by one.

To convert the array back to frames, we use
:py:func:`spdl.io.create_reference_video_frame`. This function creates a
:py:class:`~spdl.io.VideoFrames` object that references the data of the
array. (6)

We convert RGB into YUV420 using another :py:class:`~spdl.io.FilterGraph`
instance. (7)

The YUV frames are encoded using :py:class:`spdl.io.VideoEncoder`. (8)

Finally, the encoded data is written to the multiplexer. (9)

.. admonition:: Note on component states
   :class: note

   All the media processing components used in this example
   (Demuxer/Decoder/FilterGraph/Encoder/Muxer) maintain their own internal
   state and do not necessarily process the input data immediately.

   Therefore, the number of input frames/packets does not necessarily
   match the number of output frames/packets, and you need to call
   ``flush()`` on each component at the end.
"""

__all__ = [
    "main",
    "parse_args",
    "get_filter_desc",
    "process",
    "build_components",
]

import argparse
from pathlib import Path

import spdl.io
from spdl.io import (
    AudioPackets,
    Demuxer,
    FilterGraph,
    Muxer,
    VideoDecoder,
    VideoEncoder,
)

# pyre-strict

def parse_args() -> argparse.Namespace:
    """Parse the command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument("--input-path", "-i", required=True, type=Path)
    parser.add_argument("--output-path", "-o", required=True, type=Path)
    return parser.parse_args()


def get_filter_desc(
    input_pix_fmt: str,
    input_width: int,
    input_height: int,
    frame_rate: tuple[int, int],
    output_pix_fmt: str,
    output_width: int | None = None,
    output_height: int | None = None,
) -> str:
    """Build a filter description that performs format conversion and optional scaling.

    Args:
        input_pix_fmt: The input pixel format. Usually ``"rgb24"``.
        input_width,input_height: The input frame resolution.
        frame_rate: The frame rate of the video.
        output_pix_fmt: The output pixel format. It is the pixel format used by
            the encoder.
        output_width,output_height: The output frame resolution.

    Returns:
        The filter description.
    """
    # Filter graph for converting RGB into YUV420p
    buffer_arg = ":".join(
        [
            f"video_size={input_width}x{input_height}",
            f"pix_fmt={input_pix_fmt}",
            f"time_base={frame_rate[1]}/{frame_rate[0]}",
            "pixel_aspect=1/1",
        ]
    )
    filter_arg = ",".join(
        [
            f"format=pix_fmts={output_pix_fmt}",
            f"scale=w={output_width or 'iw'}:h={output_height or 'ih'}",
        ]
    )
    return f"buffer={buffer_arg},{filter_arg},buffersink"

def process(
    demuxer: Demuxer,
    video_decoder: VideoDecoder,
    filter_graph: FilterGraph,
    video_encoder: VideoEncoder,
    muxer: Muxer,
) -> None:
    """The main processing logic.

    Args:
        demuxer: Demux audio/video streams from the source.
        video_decoder: Decode the video packets.
        filter_graph: Transform applied to the array data before encoding.
        video_encoder: Encode the processed video array.
        muxer: Multiplexer for remuxing audio packets and processed video packets.
    """
    src_pix_fmt = "rgb24"
    frame_rate = demuxer.video_codec.frame_rate
    video_index = demuxer.video_stream_index
    audio_index = demuxer.audio_stream_index

    streaming_demuxing = demuxer.streaming_demux([video_index, audio_index], duration=1)
    with muxer.open():
        num_video_frames = 0
        for packets in streaming_demuxing:
            if (audio_packets := packets.get(audio_index)) is not None:
                muxer.write(1, audio_packets)

            if (video_packets := packets.get(video_index)) is None:
                continue

            assert not isinstance(video_packets, AudioPackets)

            if (frames := video_decoder.decode(video_packets)) is not None:
                buffer = spdl.io.convert_frames(frames)
                array = spdl.io.to_numpy(buffer)

                ##############################################################
                # <ADD FRAME PROCESSING HERE>
                ##############################################################

                frames = spdl.io.create_reference_video_frame(
                    array,
                    pix_fmt=src_pix_fmt,
                    frame_rate=frame_rate,
                    pts=num_video_frames,
                )
                num_video_frames += len(array)

                filter_graph.add_frames(frames)

                if (frames := filter_graph.get_frames()) is not None:
                    if (
                        packets := video_encoder.encode(frames)  # pyre-ignore
                    ) is not None:
                        muxer.write(0, packets)

        # -------------------------------------------------------------
        # Drain mode
        # -------------------------------------------------------------

        # Flush decoder
        if (frames := video_decoder.flush()) is not None:
            buffer = spdl.io.convert_frames(frames)
            array = spdl.io.to_numpy(buffer)

            ##############################################################
            # <ADD FRAME PROCESSING HERE>
            ##############################################################

            frames = spdl.io.create_reference_video_frame(
                array,
                pix_fmt=src_pix_fmt,
                frame_rate=frame_rate,
                pts=num_video_frames,
            )
            num_video_frames += len(frames)

            filter_graph.add_frames(frames)
            if (frames := filter_graph.get_frames()) is not None:
                if (packets := video_encoder.encode(frames)) is not None:  # pyre-ignore
                    muxer.write(0, packets)

        # Flush filter graph
        if (frames := filter_graph.flush()) is not None:
            if (packets := video_encoder.encode(frames)) is not None:
                muxer.write(0, packets)

        # Flush encoder
        if (packets := video_encoder.flush()) is not None:
            muxer.write(0, packets)

def build_components(
    input_path: Path, output_path: Path
) -> tuple[Demuxer, VideoDecoder, FilterGraph, VideoEncoder, Muxer]:
    """Build the demuxer, decoder, filter graph, encoder and muxer."""
    demuxer = spdl.io.Demuxer(input_path)
    muxer = spdl.io.Muxer(output_path)

    # Fetch the input config
    audio_codec = demuxer.audio_codec

    video_codec = demuxer.video_codec
    frame_rate = video_codec.frame_rate
    src_width = video_codec.width
    src_height = video_codec.height

    # Create decoder
    video_decoder = spdl.io.Decoder(demuxer.video_codec)

    # Configure output
    src_pix_fmt = "rgb24"
    enc_pix_fmt = "yuv420p"
    enc_height = src_height // 2
    enc_width = src_width // 2
    filter_desc = get_filter_desc(
        input_pix_fmt=src_pix_fmt,
        input_width=src_width,
        input_height=src_height,
        frame_rate=frame_rate,
        output_pix_fmt=enc_pix_fmt,
        output_width=enc_width,
        output_height=enc_height,
    )
    print(filter_desc)
    filter_graph = spdl.io.FilterGraph(filter_desc)

    video_encoder = muxer.add_encode_stream(
        config=spdl.io.video_encode_config(
            pix_fmt=enc_pix_fmt,
            frame_rate=frame_rate,
            height=enc_height,
            width=enc_width,
            colorspace="bt709",
            color_primaries="bt709",
            color_trc="bt709",
        ),
    )
    muxer.add_remux_stream(audio_codec)
    return demuxer, video_decoder, filter_graph, video_encoder, muxer


def main() -> None:
    """Entrypoint from the command line."""
    args = parse_args()

    demuxer, video_decoder, filter_graph, video_encoder, muxer = build_components(
        args.input_path, args.output_path
    )

    process(
        demuxer,
        video_decoder,
        filter_graph,
        video_encoder,
        muxer,
    )


if __name__ == "__main__":
    main()

Functions

main() → None

Entrypoint from the command line.

parse_args() → Namespace

Parse the command line arguments.

get_filter_desc(input_pix_fmt: str, input_width: int, input_height: int, frame_rate: tuple[int, int], output_pix_fmt: str, output_width: int | None = None, output_height: int | None = None) → str

Build a filter description that performs format conversion and optional scaling.

Parameters:
  • input_pix_fmt – The input pixel format. Usually "rgb24".

  • input_width – The input frame width.

  • input_height – The input frame height.

  • frame_rate – The frame rate of the video.

  • output_pix_fmt – The output pixel format. It is the pixel format used by the encoder.

  • output_width – The output frame width. Defaults to the input width.

  • output_height – The output frame height. Defaults to the input height.

Returns:

The filter description.

process(demuxer: Demuxer, video_decoder: VideoDecoder, filter_graph: FilterGraph, video_encoder: VideoEncoder, muxer: Muxer) → None

The main processing logic.

Parameters:
  • demuxer – Demux audio/video streams from the source.

  • video_decoder – Decode the video packets.

  • filter_graph – Transform applied to the array data before encoding.

  • video_encoder – Encode the processed video array.

  • muxer – Multiplexer for remuxing audio packets and processed video packets.

build_components(input_path: Path, output_path: Path) → tuple[Demuxer, VideoDecoder, FilterGraph, VideoEncoder, Muxer]

Build the demuxer, decoder, filter graph, encoder and muxer.