Pipeline Stages¶
A Pipeline is composed of multiple stages. There are mainly three kinds of stages:

- Source
- Processing
- Sink (buffer)
Source¶
The source specifies the origin of the data, typically file paths or URLs.
The source is set with the PipelineBuilder.add_source() method.
The only requirement for the source object is that it implements the
Iterable or AsyncIterable
interface.
For example:

Load a list of paths from a file:

def load_path_from_file(input_path: str):
    with open(input_path, "r") as f:
        for line in f:
            if path := line.strip():
                yield path
Find files in directories:

from pathlib import Path

def find_files(path: Path, ext: str):
    yield from path.glob(f'**/*{ext}')
Asynchronously list files in remote storage:

from collections.abc import AsyncIterator

# Using some imaginary async network client
async def list_bucket(bucket: str) -> AsyncIterator[str]:
    client = await network_client.connect()  # hypothetical client library
    async for route in client.list_bucket(bucket):
        yield route
Note
Since the source object is executed in the async event loop, if the source is an
Iterable (synchronous iterator), it must be lightweight
and refrain from performing blocking operations.
Running a blocking operation in the async event loop can prevent the loop from scheduling callbacks, prevent tasks from being cancelled, and prevent background threads from joining.
See also
- Introduction to Async I/O
Learn more about async event loops and how they work in SPDL.
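For instance, you can keep the source itself lightweight and move blocking file reads into a processing stage, since synchronous operators passed to pipe() are typically executed outside the event loop. A minimal sketch (the stage functions here are hypothetical):

from collections.abc import Iterator

from spdl.dataloader import PipelineBuilder

def iter_paths() -> Iterator[str]:
    """Cheap, non-blocking source: only yields path strings."""
    yield from ["a.jpg", "b.jpg"]

def read_file(path: str) -> bytes:
    """Blocking I/O belongs in a processing stage, not in the source."""
    with open(path, "rb") as f:
        return f.read()

builder = PipelineBuilder().add_source(iter_paths()).pipe(read_file)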
Note
In typical ML/AI training, the source is an index generator, a so-called sampler,
and SPDL provides implementations such as spdl.source.DistributedRandomSampler
and spdl.source.DistributedDeterministicSampler.
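Such a sampler is itself an Iterable of indices, so it can be passed directly to PipelineBuilder.add_source(). A hedged sketch (the constructor arguments are elided here; see the sampler's API reference for the actual parameters):

from spdl.dataloader import PipelineBuilder
from spdl.source import DistributedRandomSampler

# Hypothetical construction; actual constructor parameters are not shown here.
sampler = DistributedRandomSampler(...)

# The sampler is an Iterable of indices, so it plugs into add_source() directly.
builder = PipelineBuilder().add_source(sampler)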
Processing¶
The processing stage is where a variety of operations are applied to the items passed from the previous stage.
You can define a processing stage by passing an operator function (callable) to
pipe(). You can also use aggregate()
and disaggregate() to stack and unstack multiple items.
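For instance, a minimal sketch with trivial operators, purely to illustrate how these stages compose:

from spdl.dataloader import PipelineBuilder

def double(x: int) -> int:
    return x * 2

builder = (
    PipelineBuilder()
    .add_source(range(8))
    .pipe(double)        # apply an operator to each item
    .aggregate(4)        # stack items into lists of up to 4
    .disaggregate()      # unstack the lists back into individual items
    .add_sink(3)
)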
See also
- IO Module
I/O is critical for performance, so SPDL comes with an independent I/O module.
The operator can be either an async function or a synchronous function. It must take exactly one argument†, which is the output of the previous stage.
Note†
If you need to pass multiple objects between stages, use a tuple or define a protocol using
dataclass(). If you want to use an existing function that takes additional arguments, you need to convert it to a univariate function, either by writing a wrapper function or by using
functools.partial().
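For example, an existing multi-argument function (hypothetical here) can be adapted into a univariate operator with functools.partial():

import functools

def resize(data: bytes, width: int, height: int) -> bytes:
    """A hypothetical operator that takes extra arguments."""
    ...

# Bind the extra arguments so the resulting callable takes exactly one
# argument (the output of the previous stage).
resize_224 = functools.partial(resize, width=224, height=224)

# builder.pipe(resize_224)  # the pipeline passes only `data` to the operator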
The following diagram illustrates a pipeline that fetches images from remote locations, batch-decodes them, and sends the data to the GPU.
An implementation could look like this.
It uses spdl.io.load_image_batch(), which can decode and resize images
and send the decoded frames to the GPU asynchronously.
>>> from collections.abc import Iterator
>>> from torch import Tensor
>>>
>>> import spdl.io
>>> from spdl.dataloader import PipelineBuilder
>>>
>>> def source() -> Iterator[str]:
... """Returns the list of URLs to fetch data from"""
... ...
>>>
>>> async def download(url: str) -> bytes:
... """Download data from the given URL"""
... ...
>>>
>>> def process(data: list[bytes]) -> Tensor:
... """Given raw image data, decode, resize, batch and transfer data to GPU"""
... buffer = spdl.io.load_image_batch(
... data,
... width=224,
... height=224,
... cuda_config=spdl.io.cuda_config(device_index=0),
... )
... return spdl.io.to_torch(buffer)
>>>
>>> pipeline = (
... PipelineBuilder()
... .add_source(source())
... .pipe(download)
... .aggregate(32)
... .pipe(process)
... .add_sink(4)
... .build()
... )
Sink¶
The sink is a buffer where the results of the pipeline are accumulated.
A sink is attached to a pipeline with the PipelineBuilder.add_sink() method.
You can specify how many items can be buffered in the sink.
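For example, reusing the source and download stages from the previous example, the following sketch buffers up to eight processed items in the sink. The consumption idiom (auto_stop() plus iteration) follows other SPDL examples and is an assumption here:

pipeline = (
    PipelineBuilder()
    .add_source(source())
    .pipe(download)
    .add_sink(8)   # hold at most 8 processed items in the sink buffer
    .build()
)

# The foreground iterates over the results accumulated in the sink.
with pipeline.auto_stop():
    for item in pipeline:
        ...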
Advanced: Merging Multiple Pipelines¶
For more complex data loading scenarios, you can merge outputs from multiple independent
pipelines using MergeConfig. This is useful when you need to:

- Combine data from different sources (e.g., multiple datasets or storage locations)
- Process different types of data in parallel and merge them downstream
- Build complex data loading patterns that go beyond linear pipeline structures
The Merge() function creates a merge configuration that combines
outputs from multiple PipelineConfig objects into a single stream.
Note
The merge mechanism is not supported by PipelineBuilder. You need to use
the lower-level spdl.pipeline.defs API and the build_pipeline()
function to build pipelines with merge nodes.
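The following is a rough sketch only; the helper names (SourceConfig, SinkConfig) and the exact signatures of PipelineConfig, Merge(), and build_pipeline() are assumptions, so consult the example linked below for the actual definitions:

# NOTE: sketch only -- constructor names and signatures are assumptions.
from spdl.pipeline import build_pipeline
from spdl.pipeline.defs import Merge, PipelineConfig, SinkConfig, SourceConfig

# Two independent branches, each reading from its own (hypothetical) source.
branch_a = PipelineConfig(src=SourceConfig(dataset_a()), pipes=[])
branch_b = PipelineConfig(src=SourceConfig(dataset_b()), pipes=[])

# Combine the outputs of both branches into a single stream and buffer
# the merged results in a sink.
merged = PipelineConfig(
    src=Merge([branch_a, branch_b]),
    pipes=[],
    sink=SinkConfig(buffer_size=4),
)

pipeline = build_pipeline(merged)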
See also
- Example: Pipeline definitions
Demonstrates how to build a complex pipeline with merge nodes, including how to combine multiple data sources and process them through a unified pipeline.