Pipeline Stages¶
A Pipeline is composed of multiple stages. There are three kinds of stages:
Source
Processing
Sink (buffer)
Source¶
Source specifies where the data are located, typically as file paths or URLs.
The source can be set with the PipelineBuilder.add_source() method. The only requirement for the source object is that it implements the Iterable or AsyncIterable interface.
For example:
Load a list of paths from a file.
def load_path_from_file(input_path: str):
    with open(input_path, "r") as f:
        for line in f:
            if path := line.strip():
                yield path
Files in directories
from pathlib import Path

def find_files(path: Path, ext: str):
    yield from path.glob(f'**/*{ext}')
Asynchronously list files in remote storage
# Using some imaginary client
from collections.abc import AsyncIterator

async def list_bucket(bucket: str) -> AsyncIterator[str]:
    client = await Client.connect()  # `Client` is a placeholder for your storage client
    async for route in client.list_bucket(bucket):
        yield route
Note
Since the source object is executed in the async event loop, if the source is an Iterable (synchronous iterator), it must be lightweight and refrain from performing blocking operations.
Running a blocking operation in the async event loop can prevent the loop from scheduling callbacks, prevent tasks from being cancelled, and prevent background threads from joining.
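If producing the next item inherently blocks (for example, a synchronous storage client), one workaround is to drive the blocking iterator from a worker thread so the event loop stays responsive. A minimal sketch using asyncio.to_thread; load_paths_blocking is a hypothetical blocking source, not part of SPDL:

import asyncio
from collections.abc import AsyncIterator, Iterator

def load_paths_blocking() -> Iterator[str]:
    """Hypothetical source whose iteration performs blocking I/O."""
    ...

async def load_paths() -> AsyncIterator[str]:
    # Pull each item on a worker thread so the event loop is never blocked.
    it = load_paths_blocking()
    sentinel = object()
    while (item := await asyncio.to_thread(next, it, sentinel)) is not sentinel:
        yield item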
Processing¶
Processing is where a variety of operations are applied to the items passed from the previous stage.
You can define a processing stage by passing an operator function (callable) to the pipe() method. (There is also an aggregate() method, which can be used to stack multiple items.)
The operator can be either an async function or a synchronous function. Either way, the operator must take exactly one argument†, which is the output of the earlier stage.
Note
† If you need to pass multiple objects between stages, use a tuple or define a protocol using a dataclass, as illustrated in the sketch below.
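For instance, a small dataclass can carry the URL and the downloaded payload together through consecutive stages (Item and fetch() here are illustrative, not part of the SPDL API):

from dataclasses import dataclass

@dataclass
class Item:
    url: str
    data: bytes | None = None

async def download(item: Item) -> Item:
    # `fetch` is a hypothetical helper that retrieves the raw bytes.
    item.data = await fetch(item.url)
    return item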
The following example implements a pipeline that fetches images from remote locations, batch-decodes them, and sends the data to the GPU.
It uses spdl.io.async_load_image_batch(), which can decode and resize images and send the decoded frames to the GPU asynchronously.
>>> from collections.abc import Iterator
>>> import spdl.io
>>> from spdl.dataloader import PipelineBuilder
>>> from torch import Tensor
>>>
>>> def source() -> Iterator[str]:
...     """Returns the list of URLs to fetch data from"""
...     ...
>>>
>>> async def download(url: str) -> bytes:
...     """Download data from the given URL"""
...     ...
>>>
>>> async def process(data: list[bytes]) -> Tensor:
...     """Given raw image data, decode, resize, batch and transfer data to GPU"""
...     buffer = await spdl.io.async_load_image_batch(
...         data,
...         width=224,
...         height=224,
...         cuda_config=spdl.io.cuda_config(device_index=0),
...     )
...     return spdl.io.to_torch(buffer)
>>>
>>> pipeline = (
...     PipelineBuilder()
...     .add_source(source())
...     .pipe(download)
...     .aggregate(32)
...     .pipe(process)
...     .add_sink(4)
...     .build()
... )
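To run the pipeline and consume the results, the Pipeline object can be started and iterated. A sketch, assuming Pipeline exposes an auto_stop() context manager (which starts and shuts down the background execution) and supports iteration, as in the SPDL examples:

>>> with pipeline.auto_stop():
...     for batch in pipeline:
...         # `train` is a placeholder for whatever consumes the GPU batches.
...         train(batch)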
Sink¶
Sink is a buffer where the results of the pipeline are accumulated.
A sink can be attached to the pipeline with the PipelineBuilder.add_sink() method. You can specify how many items can be buffered in the sink.
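For example, the pipeline above passes 4 to add_sink(), so at most four processed batches are held before the consumer picks them up; a larger buffer absorbs throughput jitter between stages at the cost of memory. A minimal sketch, reusing the hypothetical source() and download() from the previous example:

>>> pipeline = (
...     PipelineBuilder()
...     .add_source(source())
...     .pipe(download)
...     .add_sink(8)  # hold up to 8 results before the consumer reads them
...     .build()
... )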