spdl.pipeline.Pipeline

class Pipeline

Data processing pipeline. Use PipelineBuilder to instantiate.

Pipeline and PipelineBuilder facilitate building a data processing pipeline that consists of multiple stages of async operations. The concurrency of each stage can be configured independently.

Typically, the source is a lightweight (synchronous) iterable that yields the locations of the data, such as file paths and URLs. The first stage retrieves the data from (network) storage.

The subsequent stages process the data, for example by decoding and resizing images, or by decoding and resampling audio.

After preprocessing is done, the data is buffered in a sink, which is a queue.

The pipeline is executed in a background thread, so that the main thread can perform other tasks while the data are being processed.

The following diagram illustrates this.

flowchart TD
    Source["Source (Iterator)"]
    Queue
    subgraph Op1["Op1 (Concurrency = 4)"]
        op1_1(Task 1-1)
        op1_2(Task 1-2)
        op1_3(Task 1-3)
        op1_4(Task 1-4)
    end
    subgraph Op2["Op2 (Concurrency = 2)"]
        op2_1(Task 2-1)
        op2_2(Task 2-2)
    end
    Queue["Sink (Queue)"]
    Source --> Op1
    Op1 --> Op2
    Op2 --> Queue
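The architecture described above can be approximated with the standard library alone. The following is a minimal sketch, not SPDL's implementation: the helper names (`_feed`, `_stage`, `_drain`, `start_in_background`) and the `_EOF` marker are hypothetical, error handling is omitted, and the output order is not preserved.

```python
import asyncio
import queue
import threading

_EOF = object()  # hypothetical marker for the end of the stream


async def _feed(source, out_q):
    # Push every item of the synchronous source iterable downstream.
    for item in source:
        await out_q.put(item)
    await out_q.put(_EOF)


async def _stage(op, in_q, out_q, concurrency):
    # Apply `op` to incoming items with at most `concurrency` tasks in flight.
    async def _run(item):
        await out_q.put(await op(item))

    pending = set()
    while (item := await in_q.get()) is not _EOF:
        if len(pending) >= concurrency:
            # Wait until at least one running task finishes and frees a slot.
            _, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
        pending.add(asyncio.create_task(_run(item)))
    if pending:
        await asyncio.wait(pending)
    await out_q.put(_EOF)


async def _drain(in_q, sink):
    # Move results into the thread-safe sink without blocking the event loop.
    while True:
        item = await in_q.get()
        await asyncio.to_thread(sink.put, item)
        if item is _EOF:
            return


async def _run_pipeline(source, ops, sink):
    queues = [asyncio.Queue(maxsize=2) for _ in range(len(ops) + 1)]
    coros = [_feed(source, queues[0])]
    for i, (op, concurrency) in enumerate(ops):
        coros.append(_stage(op, queues[i], queues[i + 1], concurrency))
    coros.append(_drain(queues[-1], sink))
    await asyncio.gather(*coros)


def start_in_background(source, ops, buffer_size=3):
    # Run the event loop in a background thread; the caller reads from the sink.
    sink = queue.Queue(maxsize=buffer_size)
    thread = threading.Thread(
        target=asyncio.run, args=(_run_pipeline(source, ops, sink),), daemon=True
    )
    thread.start()
    return sink, thread


async def double(x):
    await asyncio.sleep(0.01)  # stands in for I/O such as a download
    return 2 * x


async def add_one(x):
    return x + 1


sink, thread = start_in_background(range(5), [(double, 4), (add_one, 2)])
results = []
while (item := sink.get()) is not _EOF:
    results.append(item)
thread.join()
print(sorted(results))
```

The main thread only touches the sink queue, so it is free to do other work between `sink.get()` calls while the stages keep running in the background.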

Example: Bulk loading images

import spdl.io
from spdl.pipeline import Pipeline, PipelineBuilder


def source():
    # Yield one image path per line, without the trailing newline.
    with open("images.txt") as f:
        for line in f:
            yield line.strip()


def load(path):
    # Plain synchronous function that loads and decodes one image.
    return spdl.io.load_image(path)


pipeline: Pipeline = (
    PipelineBuilder()
    .add_source(source())
    .pipe(load, concurrency=10)
    .add_sink(3)
    .build(num_threads=10)
)

with pipeline.auto_stop():
    for item in pipeline.get_iterator(timeout=30):
        # do something with the loaded image
        ...

When the Pipeline object is garbage collected, the background thread is automatically stopped. You can still use auto_stop() or stop() for deterministic, scoped lifecycle management.

Changed in version 0.4.0: [Experimental] Calling start() and stop() is now optional. When iterating a pipeline that has not been explicitly started, the background thread is started automatically on the first item request. When the Pipeline object is garbage collected, the background thread is stopped automatically via weakref.finalize(). Explicit start() / stop() and the auto_stop() context manager continue to work as before.
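The garbage-collection behavior described above relies on `weakref.finalize()`. The following is a minimal standard-library sketch of that general technique, not SPDL's actual code; `BackgroundWorker`, `_worker` and `_shutdown` are hypothetical names. The key point is that neither the thread target nor the finalizer callback holds a reference to the owning object, otherwise it could never be collected.

```python
import threading
import weakref


def _worker(stop_event):
    # Stand-in for the real background work: idle until asked to stop.
    stop_event.wait()


def _shutdown(stop_event, thread):
    # Must not reference the owning object, only the resources to release.
    stop_event.set()
    thread.join()


class BackgroundWorker:
    def __init__(self):
        self._stop_event = threading.Event()
        self._thread = threading.Thread(
            target=_worker, args=(self._stop_event,), daemon=True
        )
        self._thread.start()
        # Runs _shutdown when this object is garbage collected (or at exit).
        self._finalizer = weakref.finalize(
            self, _shutdown, self._stop_event, self._thread
        )

    def stop(self):
        # A finalizer runs at most once, so repeated calls are harmless.
        self._finalizer()


worker = BackgroundWorker()
thread = worker._thread
del worker  # in CPython the finalizer runs as soon as the last reference goes
```

Calling `stop()` explicitly gives the same shutdown at a known point in the program, which is why scoped management remains useful even when cleanup is automatic.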

Methods

auto_stop(*[, timeout])

Context manager to start/stop the background thread automatically.

get_item(*[, timeout])

Get the next item.

get_iterator(*[, timeout])

Get an iterator, which iterates over the pipeline outputs.

start(*[, timeout])

Start the pipeline in background thread.

stop(*[, timeout])

Stop the pipeline.

__iter__() → Iterator[T]

Call get_iterator() without arguments.

auto_stop(*, timeout: float | None = None) → Iterator[None]

Context manager to start/stop the background thread automatically.

Parameters:

timeout – The duration to wait for the thread initialization / shutdown. [Unit: second] If None (default), it waits indefinitely.
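A context manager of this kind is commonly written as a start call followed by a `try`/`finally` that guarantees the stop call. The sketch below illustrates that pattern with a hypothetical `Worker` class that only records calls; it is not SPDL's implementation.

```python
from contextlib import contextmanager


class Worker:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self):
        self.events = []
        self._started = False
        self._stopped = False

    def start(self):
        if self._started:
            raise RuntimeError("already started")
        self._started = True
        self.events.append("start")

    def stop(self):
        # Only the first call after start() has any effect.
        if self._started and not self._stopped:
            self._stopped = True
            self.events.append("stop")

    @contextmanager
    def auto_stop(self):
        self.start()
        try:
            yield
        finally:
            # Runs even when the body of the `with` block raises.
            self.stop()


worker = Worker()
try:
    with worker.auto_stop():
        raise ValueError("failure inside the with block")
except ValueError:
    pass
print(worker.events)
```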

get_item(*, timeout: float | None = None) → T

Get the next item.

Parameters:

timeout – The duration to wait for the next item to become available. [Unit: second] If None (default), it waits indefinitely.

Raises:
  • RuntimeError – The pipeline is not started.

  • TimeoutError – When the pipeline does not produce the next item within the given time.

  • EOFError – When the pipeline is exhausted or cancelled and there are no more items in the sink.
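The timeout and end-of-stream behavior listed above can be illustrated with a plain `queue.Queue` as the sink. This is a sketch under assumptions: the `_EOF` marker and this standalone `get_item` helper are hypothetical and do not reflect how SPDL signals exhaustion internally.

```python
from __future__ import annotations

import queue

_EOF = object()  # hypothetical marker placed in the sink when the pipeline ends


def get_item(sink: queue.Queue, timeout: float | None = None):
    try:
        # With timeout=None this blocks until an item is available.
        item = sink.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError(f"no item within {timeout} seconds") from None
    if item is _EOF:
        raise EOFError("the pipeline is exhausted")
    return item


sink = queue.Queue()
sink.put("frame-0")
sink.put(_EOF)

first = get_item(sink, timeout=0.1)

try:
    get_item(sink, timeout=0.1)
    outcome = "item"
except EOFError:
    outcome = "eof"
```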

get_iterator(*, timeout: float | None = None) → Iterator[T]

Get an iterator, which iterates over the pipeline outputs.

The returned iterator covers a single epoch (one pass over the source), regardless of whether the source is continuous (see the continuous argument of PipelineBuilder.add_source). Call this method again to iterate each subsequent epoch:

for epoch in range(num_epochs):
    for item in pipeline.get_iterator(timeout=...):
        ...

Parameters:

timeout – Timeout value used for each get_item call.

Changed in version 0.6.0: Fixed reuse with a continuous source: an iterator that reached its epoch boundary used to resume into the next epoch when reused, but now stays exhausted, consistent with non-continuous sources. Use one iterator per epoch.
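The one-iterator-per-epoch rule can be pictured with a generator that stops at an epoch marker. In this sketch the `_EPOCH_END` marker and the `epoch_iterator` helper are hypothetical; it only illustrates why a finished iterator stays exhausted while a fresh one picks up the next epoch.

```python
import queue

_EPOCH_END = object()  # hypothetical marker separating epochs in the sink


def epoch_iterator(sink):
    # Yield items until the epoch marker; a finished generator stays finished.
    while (item := sink.get()) is not _EPOCH_END:
        yield item


sink = queue.Queue()
for item in [1, 2, _EPOCH_END, 3, 4, _EPOCH_END]:
    sink.put(item)

first = epoch_iterator(sink)
epoch_1 = list(first)
reused = list(first)  # reusing the exhausted iterator yields nothing
epoch_2 = list(epoch_iterator(sink))  # a new iterator covers the next epoch
```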

start(*, timeout: float | None = None, **kwargs: Any) → None

Start the pipeline in background thread.

Parameters:

timeout – Timeout value used when starting the thread and waiting for the pipeline to be initialized. [Unit: second]

Note

Calling start multiple times raises RuntimeError.

stop(*, timeout: float | None = None) → None

Stop the pipeline.

Parameters:

timeout – Timeout value used when stopping the pipeline and waiting for the thread to join. [Unit: second]

Note

It is safe to call stop multiple times.