spdl.pipeline.Pipeline

class Pipeline

Data processing pipeline. Use PipelineBuilder to instantiate.

Pipeline and PipelineBuilder facilitate building a data processing pipeline that consists of multiple stages of async operations. The concurrency of each stage can be configured independently.

Typically, the source is a lightweight (synchronous) iterable that generates the locations of the source data, such as file paths and URLs. The first stage retrieves the data from (network) storage.

The subsequent stages process the data, for example by decoding and resizing images, or by decoding and resampling audio.

After preprocessing is done, the data are buffered in a sink, which is a queue.

The pipeline is executed in a background thread, so that the main thread can perform other tasks while the data are being processed.

The following diagram illustrates this.

flowchart TD
    Source["Source (Iterator)"]
    subgraph Op1["Op1 (Concurrency = 4)"]
        op1_1(Task 1-1)
        op1_2(Task 1-2)
        op1_3(Task 1-3)
        op1_4(Task 1-4)
    end
    subgraph Op2["Op2 (Concurrency = 2)"]
        op2_1(Task 2-1)
        op2_2(Task 2-2)
    end
    Queue["Sink (Queue)"]
    Source --> Op1
    Op1 --> Op2
    Op2 --> Queue
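The flow described above (iterator source, an async stage with bounded concurrency, a queue sink, all driven from a background thread) can be sketched with the standard library alone. This is a minimal illustration and not SPDL's implementation: it models a single stage, and the names `_run_stage`, `run_in_background`, `double`, and the `_EOF` sentinel are hypothetical.

```python
import asyncio
import queue
import threading

_EOF = object()  # sketch-only sentinel that marks the end of the stream


async def _run_stage(source, op, concurrency, sink):
    """Apply the async ``op`` to each source item, at most ``concurrency`` at a time."""
    slots = asyncio.Semaphore(concurrency)
    tasks = set()

    async def _process(item):
        try:
            sink.put(await op(item))  # the queue is unbounded, so put() does not block
        finally:
            slots.release()

    try:
        for item in source:
            await slots.acquire()  # wait for a free slot before scheduling more work
            task = asyncio.create_task(_process(item))
            tasks.add(task)
            task.add_done_callback(tasks.discard)
        await asyncio.gather(*tasks)  # wait for the tasks that are still in flight
    finally:
        sink.put(_EOF)  # always tell the consumer that no more items will arrive


def run_in_background(source, op, concurrency):
    """Run the stage on an event loop in a background thread; return (sink, thread)."""
    sink = queue.Queue()
    thread = threading.Thread(
        target=lambda: asyncio.run(_run_stage(source, op, concurrency, sink)),
        daemon=True,
    )
    thread.start()
    return sink, thread


async def double(x):
    await asyncio.sleep(0.01)  # stands in for I/O such as a network fetch
    return 2 * x


# The main thread stays free to do other work; here it only consumes the sink.
sink, thread = run_in_background(range(5), double, concurrency=2)
results = []
while (item := sink.get(timeout=5)) is not _EOF:
    results.append(item)
thread.join()
```

Because tasks run concurrently, items may reach the sink in a different order than the source produced them, so this sketch makes no ordering guarantee.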

Example: Bulk loading images

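import spdl.io
from spdl.pipeline import Pipeline, PipelineBuilder

def source():
    # Yield one path per non-empty line, without the trailing newline.
    with open("images.txt") as f:
        for line in f:
            if path := line.strip():
                yield path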

async def decode(path):
    return await spdl.io.async_decode_image(path)


pipeline: Pipeline = (
    PipelineBuilder()
    .add_source(source())
    .pipe(decode, concurrency=10)
    .add_sink(3)
    .build(num_threads=10)
)

pipeline.start()
try:
    for item in pipeline.get_iterator():
        # do something with the decoded image
        ...
finally:
    pipeline.stop()

Methods

auto_stop(*[, timeout])

Context manager to start/stop the background thread automatically.

get_item(*[, timeout])

Get the next item.

get_iterator(*[, timeout])

Get an iterator, which iterates over the pipeline outputs.

start(*[, timeout])

Start the pipeline in a background thread.

stop(*[, timeout])

Stop the pipeline.

__iter__() → Iterator[T]

Call get_iterator() without arguments.

auto_stop(*, timeout: float | None = None) → Iterator[None]

Context manager to start/stop the background thread automatically.

Parameters:

timeout – The duration to wait for the thread initialization / shutdown. [Unit: second] If None (default), it waits indefinitely.
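As a rough illustration of why a start/stop context manager is convenient, the sketch below consumes only the first few items and still stops the pipeline on the way out. `FakePipeline` and `take` are hypothetical names: the stand-in only mimics the documented interface so the example runs without SPDL, and its `auto_stop` encodes the assumption that the pipeline is started on entry and stopped on exit, including when the body raises.

```python
from contextlib import contextmanager
from itertools import islice


class FakePipeline:
    """Hypothetical stand-in mimicking the documented interface; not SPDL code."""

    def __init__(self, items):
        self._items = list(items)
        self.events = []  # records lifecycle calls so they can be inspected

    def start(self, *, timeout=None):
        self.events.append("start")

    def stop(self, *, timeout=None):
        self.events.append("stop")

    @contextmanager
    def auto_stop(self, *, timeout=None):
        # Assumed behavior: start on entry, always stop on exit.
        self.start(timeout=timeout)
        try:
            yield
        finally:
            self.stop(timeout=timeout)

    def __iter__(self):
        return iter(self._items)


def take(pipeline, n):
    """Collect the first ``n`` items; the pipeline is stopped even on early exit."""
    with pipeline.auto_stop(timeout=10):
        return list(islice(pipeline, n))


fake = FakePipeline(["a", "b", "c"])
first_two = take(fake, 2)
```

With a real pipeline object, the same `take` function would replace an explicit `start()` / `try` / `finally: stop()` block.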

get_item(*, timeout: float | None = None) → T

Get the next item.

Parameters:

timeout – The duration to wait for the next item to become available. [Unit: second] If None (default), it waits indefinitely.

Raises:
  • RuntimeError – The pipeline is not started.

  • TimeoutError – When the pipeline does not produce the next item within the given time.

  • EOFError – When the pipeline is exhausted or cancelled and there are no more items in the sink.
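One way a caller might react to these exceptions is sketched below: treat EOFError as the normal end of the stream, tolerate a few consecutive TimeoutError occurrences, and let RuntimeError propagate. `ScriptedPipeline`, `drain`, and `max_consecutive_timeouts` are hypothetical; the stand-in replays a fixed script so the example runs without SPDL, and the retry policy is an illustrative choice rather than a recommendation from the library.

```python
class ScriptedPipeline:
    """Hypothetical stand-in: replays a fixed script of items and exceptions."""

    def __init__(self, script):
        self._script = iter(script)

    def get_item(self, *, timeout=None):
        try:
            step = next(self._script)
        except StopIteration:
            raise EOFError("no more items") from None
        if isinstance(step, Exception):
            raise step
        return step


def drain(pipeline, *, timeout, max_consecutive_timeouts=3):
    """Collect items until EOFError, tolerating a few consecutive timeouts."""
    items = []
    misses = 0
    while True:
        try:
            items.append(pipeline.get_item(timeout=timeout))
        except TimeoutError:
            misses += 1
            if misses >= max_consecutive_timeouts:
                raise  # the pipeline looks stalled; let the caller decide
        except EOFError:
            return items  # normal end of the stream
        else:
            misses = 0  # a successful fetch resets the counter


items = drain(ScriptedPipeline([1, TimeoutError(), 2, 3]), timeout=0.1)
```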

get_iterator(*, timeout: float | None = None) → Iterator[T]

Get an iterator, which iterates over the pipeline outputs.

Parameters:

timeout – Timeout value used for each get_item call.
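Since the timeout applies to each fetch rather than to the whole iteration, an iterator of this kind can be pictured as a generator that calls get_item repeatedly until EOFError. The following is a sketch under that assumption, not SPDL's code: `QueueBackedPipeline`, `iterate`, and the `_EOF` sentinel are hypothetical, and the stand-in maps an empty `queue.Queue` to TimeoutError.

```python
import queue

_EOF = object()  # sketch-only sentinel that marks the end of the stream


class QueueBackedPipeline:
    """Hypothetical stand-in whose sink is a plain ``queue.Queue``."""

    def __init__(self, sink):
        self._sink = sink

    def get_item(self, *, timeout=None):
        try:
            item = self._sink.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("no item within the timeout") from None
        if item is _EOF:
            raise EOFError("pipeline exhausted")
        return item


def iterate(pipeline, *, timeout=None):
    """Yield items until the pipeline signals EOFError; timeouts propagate."""
    while True:
        try:
            item = pipeline.get_item(timeout=timeout)  # the timeout restarts per item
        except EOFError:
            return
        yield item


sink = queue.Queue()
for value in (1, 2, 3, _EOF):
    sink.put(value)
outputs = list(iterate(QueueBackedPipeline(sink), timeout=0.5))
```

Under this model, a loop over many items can run far longer than `timeout` in total, as long as each individual item arrives in time.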

start(*, timeout: float | None = None, **kwargs: Any) → None

Start the pipeline in a background thread.

Parameters:

timeout – Timeout value used when starting the thread and waiting for the pipeline to be initialized. [Unit: second]

Note

Calling start multiple times raises RuntimeError.

stop(*, timeout: float | None = None) → None

Stop the pipeline.

Parameters:

timeout – Timeout value used when stopping the pipeline and waiting for the thread to join. [Unit: second]

Note

It is safe to call stop multiple times.
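The two lifecycle notes (a second start raises RuntimeError, while repeated stop calls are harmless) can be modeled with a small thread wrapper. This is only a sketch of the contract, not SPDL's implementation: `BackgroundWorker` is a hypothetical class, and treating stop on a never-started worker as a no-op is a choice made for this example.

```python
import threading


class BackgroundWorker:
    """Hypothetical model of the start/stop contract; not SPDL code."""

    def __init__(self):
        self._stop_requested = threading.Event()
        # The thread simply idles until a stop is requested.
        self._thread = threading.Thread(target=self._stop_requested.wait, daemon=True)
        self._started = False

    def start(self):
        if self._started:
            raise RuntimeError("start() was already called")
        self._started = True
        self._thread.start()

    def stop(self, *, timeout=None):
        if not self._started:
            return  # sketch-only choice: stopping a never-started worker is a no-op
        self._stop_requested.set()
        self._thread.join(timeout)  # joining a finished thread returns immediately

    def is_running(self):
        return self._thread.is_alive()


worker = BackgroundWorker()
worker.start()
try:
    worker.start()  # the second call is rejected
except RuntimeError as err:
    second_start_error = err
else:
    second_start_error = None
worker.stop(timeout=5)
worker.stop(timeout=5)  # repeated calls have no further effect
```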