Streaming Decoding¶
This section explains how to decode media data in a streaming fashion, processing data chunk by chunk instead of loading everything into memory at once.
Overview¶
The high-level loading functions (spdl.io.load_audio(), spdl.io.load_video(), spdl.io.load_image())
load entire media files into memory before returning. While this is convenient for many use cases, it can be
inefficient or impractical for:
Long media files: Files that are too large to fit in memory
Real-time processing: Applications that need to start processing before the entire file is loaded
Memory-constrained environments: Systems with limited available memory
Streaming applications: Live video/audio processing pipelines
SPDL provides streaming APIs that allow you to process media data incrementally, chunk by chunk.
Basic Streaming Pattern¶
The basic streaming workflow follows these steps:
Create a Demuxer: Open the media source
Create a Decoder: Initialize the decoder for the codec
Stream packets: Use an iterator to get packets in chunks
Decode incrementally: Process each chunk of packets
Flush: Don’t forget to flush remaining buffered frames
Here’s a minimal example:
import spdl.io
# Step 1: Create demuxer
with spdl.io.Demuxer("video.mp4") as demuxer:
# Step 2: Create decoder
decoder = spdl.io.Decoder(demuxer.video_codec)
# Step 3 & 4: Stream and decode packets
for packets in demuxer.streaming_demux(demuxer.video_stream_index, num_packets=10):
frames = decoder.decode(packets)
if frames is not None:
buffer = spdl.io.convert_frames(frames)
# Process buffer here...
tensor = spdl.io.to_torch(buffer)
# Step 5: Flush remaining frames
if (frames := decoder.flush()) is not None:
buffer = spdl.io.convert_frames(frames)
tensor = spdl.io.to_torch(buffer)
Warning
Always call decoder.flush() at the end of streaming to retrieve any buffered frames.
Many codecs buffer frames internally, and failing to flush will result in incomplete data.
Streaming Video¶
Use spdl.io.Demuxer.streaming_demux() to stream video packets in chunks:
import spdl.io
with spdl.io.Demuxer("video.mp4") as demuxer:
decoder = spdl.io.Decoder(demuxer.video_codec)
# Process 30 packets at a time
for packets in demuxer.streaming_demux(demuxer.video_stream_index, num_packets=30):
print(f"Processing {len(packets)} packets")
frames = decoder.decode(packets)
if frames is not None:
buffer = spdl.io.convert_frames(frames)
# Process frames...
# Always flush
if (frames := decoder.flush()) is not None:
buffer = spdl.io.convert_frames(frames)
Streaming Audio¶
Audio streaming works similarly to video streaming:
import spdl.io
with spdl.io.Demuxer("audio.mp3") as demuxer:
decoder = spdl.io.Decoder(demuxer.audio_codec)
# Stream audio packets
for packets in demuxer.streaming_demux(demuxer.audio_stream_index, num_packets=20):
frames = decoder.decode(packets)
if frames is not None:
buffer = spdl.io.convert_frames(frames)
array = spdl.io.to_numpy(buffer)
# Process audio chunk...
# Flush
if (frames := decoder.flush()) is not None:
buffer = spdl.io.convert_frames(frames)
Multi-Stream Decoding¶
For files containing multiple streams (e.g., audio and video), use spdl.io.Demuxer.streaming_demux()
with stream indices.
You can chunk streams by duration (e.g., 5-second chunks) or by packet count (e.g., 50 packets at a time):
import spdl.io
demuxer = spdl.io.Demuxer("movie.mp4")
video_index = demuxer.video_stream_index
audio_index = demuxer.audio_stream_index
audio_decoder = spdl.io.Decoder(demuxer.audio_codec)
video_decoder = spdl.io.Decoder(demuxer.video_codec)
# Process 5-second chunks (alternatively, use num_packets=50 for packet-based chunking)
packet_stream = demuxer.streaming_demux(
indices=[video_index, audio_index],
duration=5.0
)
for packets in packet_stream:
# Process audio if present in this chunk
if audio_index in packets:
frames = audio_decoder.decode(packets[audio_index])
buffer = spdl.io.convert_frames(frames)
# Process audio...
# Process video if present in this chunk
if video_index in packets:
frames = video_decoder.decode(packets[video_index])
buffer = spdl.io.convert_frames(frames)
# Process video...
# Flush both decoders
if (frames := audio_decoder.flush()) is not None:
buffer = spdl.io.convert_frames(frames)
if (frames := video_decoder.flush()) is not None:
buffer = spdl.io.convert_frames(frames)
Note
When using duration parameter, each iteration yields approximately the specified duration
worth of packets from each stream. Alternatively, use num_packets to chunk by packet count.
Streaming with Filtering¶
You can apply filters during streaming decoding by providing a filter_desc to the decoder.
This example shows how to apply a common preprocessing filter that resizes video frames to 256x256 and converts them to RGB format during streaming:
import spdl.io
demuxer = spdl.io.Demuxer("video.mp4")
# Create filter description
filter_desc = spdl.io.get_video_filter_desc(
scale_width=256,
scale_height=256,
pix_fmt="rgb24"
)
# Create decoder with filter
decoder = spdl.io.Decoder(demuxer.video_codec, filter_desc=filter_desc)
# Stream with filtering applied
for packets in demuxer.streaming_demux_video(num_packets=20):
frames = decoder.decode(packets) # Frames are already filtered
buffer = spdl.io.convert_frames(frames)
tensor = spdl.io.to_torch(buffer)
# Process filtered tensor...
# Flush
if (frames := decoder.flush()) is not None:
buffer = spdl.io.convert_frames(frames)
Advanced: Custom Filter Graph Streaming¶
For more complex filtering scenarios, use spdl.io.FilterGraph to manually control the filter graph.
This example demonstrates adding a watermark overlay to video frames during streaming decoding. The overlay filter composites a logo image on top of the video:
import spdl.io
demuxer = spdl.io.Demuxer("video.mp4")
codec = demuxer.video_codec
# Create decoder without filter
decoder = spdl.io.Decoder(codec, filter_desc=None)
# Load watermark image
watermark_buffer = spdl.io.load_image("logo.png")
watermark_desc = spdl.io.get_vbuffer_desc(watermark_buffer)
# Create filter graph with overlay
# Format: buffer (main video) -> scale -> overlay <- buffer (watermark)
filter_desc = (
f"{spdl.io.get_vbuffer_desc(codec)}[main];"
f"{watermark_desc}[logo];"
"[main]scale=1280:720[scaled];"
"[scaled][logo]overlay=W-w-10:H-h-10" # Position logo at bottom-right with 10px margin
)
filter_graph = spdl.io.FilterGraph(filter_desc)
# Add watermark to filter graph (only once)
filter_graph.add_frames(watermark_buffer)
# Stream with manual filter graph control
for packets in demuxer.streaming_demux_video(num_packets=10):
frames = decoder.decode(packets)
if frames is not None:
# Add video frames to filter graph
filter_graph.add_frames(frames)
# Get filtered frames with watermark applied
output_frames = filter_graph.get_frames()
if len(output_frames):
buffer = spdl.io.convert_frames(output_frames)
# Process watermarked buffer...
# Flush decoder
if (frames := decoder.flush()) is not None:
filter_graph.add_frames(frames)
# Flush filter graph
filter_graph.flush()
output_frames = filter_graph.get_frames()
if len(output_frames):
buffer = spdl.io.convert_frames(output_frames)