spdl.pipeline.Pipeline
- class Pipeline
Data processing pipeline. Use PipelineBuilder to instantiate.

Pipeline and PipelineBuilder facilitate building a data processing pipeline that consists of multiple stages of async operations. The concurrency of each stage can be configured independently.

Typically, the source is a lightweight (synchronous) iterable that generates the source locations of data, such as file paths and URLs. The first stage retrieves the data from (network) storage.
The subsequent stages process the data, for example decoding images and resizing them, or decoding audio and resampling it.
After preprocessing is done, the data are buffered in a sink, which is a queue.
The pipeline is executed in a background thread, so that the main thread can perform other tasks while the data are being processed.
The following diagram illustrates this.
    flowchart TD
        Source["Source (Iterator)"]
        Queue
        subgraph Op1["Op1 (Concurrency = 4)"]
            op1_1(Task 1-1)
            op1_2(Task 1-2)
            op1_3(Task 1-3)
            op1_4(Task 1-4)
        end
        subgraph Op2["Op2 (Concurrency = 2)"]
            op2_1(Task 2-1)
            op2_2(Task 2-2)
        end
        Queue["Sink (Queue)"]
        Source --> Op1
        Op1 --> Op2
        Op2 --> Queue

Example: Bulk loading images
    import asyncio

    import spdl.io
    from spdl.pipeline import Pipeline, PipelineBuilder


    def source():
        # Yield one image path per line, without the trailing newline.
        with open("images.txt") as f:
            for path in f:
                yield path.strip()


    async def decode(path):
        return await spdl.io.async_decode_image(path)


    pipeline: Pipeline = (
        PipelineBuilder()
        .add_source(source())
        .pipe(decode, concurrency=10)
        .add_sink(3)
        .build(num_threads=10)
    )

    pipeline.start()
    try:
        for item in pipeline.get_iterator():
            # do something with the decoded image
            ...
    finally:
        pipeline.stop()
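The execution model described above (independently configured concurrency per stage, a bounded sink queue, execution in a background thread) can be illustrated with the standard library alone. The following is a toy sketch under stated assumptions, not SPDL's implementation: `_EOF`, `_stage`, `_run_pipeline`, `run_in_background`, `fetch`, `process`, and `collect` are hypothetical names introduced for illustration, and this sketch does not preserve the input order.

```python
import asyncio
import queue
import threading

_EOF = object()  # hypothetical sentinel marking the end of the stream


async def _stage(fn, in_q, out_q, concurrency):
    """Apply `fn` to items from `in_q` with at most `concurrency` tasks in flight."""
    sem = asyncio.Semaphore(concurrency)
    tasks = set()

    async def _run(item):
        try:
            await out_q.put(await fn(item))
        finally:
            sem.release()

    while (item := await in_q.get()) is not _EOF:
        await sem.acquire()  # wait until a slot is free
        task = asyncio.create_task(_run(item))
        tasks.add(task)
        task.add_done_callback(tasks.discard)
    if tasks:
        await asyncio.gather(*tasks)  # let in-flight tasks finish
    await out_q.put(_EOF)  # then tell the next stage we are done


async def _run_pipeline(source, stages, sink):
    # Bounded queues between neighbours provide back-pressure.
    queues = [asyncio.Queue(maxsize=2) for _ in range(len(stages) + 1)]
    loop = asyncio.get_running_loop()

    async def feed():
        for item in source:
            await queues[0].put(item)
        await queues[0].put(_EOF)

    async def drain():
        while True:
            item = await queues[-1].get()
            # sink.put blocks when the sink is full, so keep it off the event loop.
            await loop.run_in_executor(None, sink.put, item)
            if item is _EOF:
                return

    workers = [
        _stage(fn, queues[i], queues[i + 1], n) for i, (fn, n) in enumerate(stages)
    ]
    await asyncio.gather(feed(), *workers, drain())


def run_in_background(source, stages, sink_size=3):
    """Run the event loop in a background thread; return the thread-safe sink."""
    sink = queue.Queue(maxsize=sink_size)
    thread = threading.Thread(
        target=lambda: asyncio.run(_run_pipeline(source, stages, sink)),
        daemon=True,
    )
    thread.start()
    return sink, thread


async def fetch(i):  # stand-in for retrieving data from storage
    await asyncio.sleep(0.01)
    return i * 10


async def process(x):  # stand-in for decoding / resizing
    await asyncio.sleep(0.01)
    return x + 1


def collect(n):
    """Consume the sink from the main thread until the stream ends."""
    sink, thread = run_in_background(range(n), [(fetch, 4), (process, 2)])
    results = []
    while (item := sink.get()) is not _EOF:
        results.append(item)
    thread.join()
    return results
```

The stage concurrencies (4 and 2) mirror the diagram. Because tasks within a stage finish in arbitrary order, a caller of this sketch that needs ordered results has to sort or tag the items itself.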
Methods
auto_stop(*[, timeout]) – Context manager to start/stop the background thread automatically.
get_item(*[, timeout]) – Get the next item.
get_iterator(*[, timeout]) – Get an iterator, which iterates over the pipeline outputs.
start(*[, timeout]) – Start the pipeline in a background thread.
stop(*[, timeout]) – Stop the pipeline.
- __iter__() → Iterator[T]
Call get_iterator() without arguments.
- auto_stop(*, timeout: float | None = None) → Iterator[None]
Context manager to start/stop the background thread automatically.
- Parameters:
timeout – The duration to wait for the thread initialization / shutdown. [Unit: second] If None (default), it waits indefinitely.
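To show what "start/stop automatically" amounts to, here is a minimal standard-library sketch of a context manager that starts a background thread on entry and stops it on exit, even when the body raises. `ToyWorker` and its methods are hypothetical stand-ins, not SPDL code. In particular, raising `TimeoutError` when the thread misses the deadline is an assumption of this sketch.

```python
import threading
from contextlib import contextmanager


class ToyWorker:
    """Hypothetical stand-in for an object that owns a background thread."""

    def __init__(self):
        self._ready = threading.Event()
        self._stop_requested = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        self._ready.set()  # signal that initialization finished
        self._stop_requested.wait()  # stand-in for the real work loop

    def is_running(self):
        return self._thread.is_alive()

    def start(self, *, timeout=None):
        self._thread.start()
        # Assumption: report a missed deadline with TimeoutError.
        if not self._ready.wait(timeout):
            raise TimeoutError("background thread did not initialize in time")

    def stop(self, *, timeout=None):
        self._stop_requested.set()
        self._thread.join(timeout)
        if self._thread.is_alive():
            raise TimeoutError("background thread did not shut down in time")

    @contextmanager
    def auto_stop(self, *, timeout=None):
        # Same shape as the explicit start() / try / finally / stop() pattern.
        self.start(timeout=timeout)
        try:
            yield
        finally:
            self.stop(timeout=timeout)
```

With a real pipeline, the equivalent usage would presumably read `with pipeline.auto_stop():` followed by `for item in pipeline:` in the body, replacing the explicit `start()` / `try` / `finally` / `stop()` sequence.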
- get_item(*, timeout: float | None = None) → T
Get the next item.
- Parameters:
timeout – The duration to wait for the next item to become available. [Unit: second] If None (default), it waits indefinitely.
- Raises:
RuntimeError – The pipeline is not started.
TimeoutError – The pipeline does not produce the next item within the given time.
EOFError – The pipeline is exhausted or cancelled and there are no more items in the sink.
- get_iterator(*, timeout: float | None = None) → Iterator[T]
Get an iterator, which iterates over the pipeline outputs.
- Parameters:
timeout – Timeout value used for each get_item call.
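The relationship between get_item and get_iterator, and the three documented exceptions, can be modelled with a small queue-backed class. This is a sketch under stated assumptions rather than the library's code: `ToySink`, `put`, `close`, and the `_EOF` sentinel are hypothetical, and the way the end of the stream is detected here is only one possible design.

```python
import queue

_EOF = object()  # hypothetical end-of-stream marker


class ToySink:
    """Hypothetical model of the documented get_item / get_iterator contract."""

    def __init__(self, maxsize=3):
        self._queue = queue.Queue(maxsize)
        self._started = False
        self._exhausted = False

    def start(self):
        self._started = True

    def put(self, item):  # what the last stage would call
        self._queue.put(item)

    def close(self):  # called once the upstream is exhausted
        self._queue.put(_EOF)

    def get_item(self, *, timeout=None):
        if not self._started:
            raise RuntimeError("The pipeline is not started.")
        if self._exhausted:
            raise EOFError("No more items.")
        try:
            # timeout=None blocks indefinitely, matching the documented default.
            item = self._queue.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("No item became available in time.") from None
        if item is _EOF:
            self._exhausted = True
            raise EOFError("No more items.")
        return item

    def get_iterator(self, *, timeout=None):
        # Each step is one get_item call with the same timeout.
        while True:
            try:
                item = self.get_item(timeout=timeout)
            except EOFError:
                return
            yield item
```

In this model a `TimeoutError` propagates out of the iterator, so a caller that passes a timeout to `get_iterator` should be prepared to handle it inside the loop.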