Building and Running a Pipeline

First, let’s look at how simple it is to build a pipeline in SPDL.

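The following snippet constructs a Pipeline object using a PipelineBuilder object. The pipeline takes the integers 0 to 11 as its source, doubles each one, adds 1, and groups the results into lists of three.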

>>> from spdl.dataloader import PipelineBuilder
>>>
>>> pipeline = (
...     PipelineBuilder()
...     .add_source(range(12))
...     .pipe(lambda x: 2 * x)
...     .pipe(lambda x: x + 1)
...     .aggregate(3)
...     .add_sink(3)
...     .build()
... )
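The chained calls above follow the builder pattern: each method records a stage and returns the builder, and build() turns the recorded stages into something you can iterate over. The sketch below is a hypothetical, single-threaded MiniBuilder written with the standard library only. It reuses the method names from the snippet to show what each stage computes, but it is not SPDL's implementation, which runs the stages in an async event loop on a background thread. add_sink is omitted because nothing in this sketch runs in the background.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


class MiniBuilder:
    """Hypothetical, synchronous stand-in for a pipeline builder (not SPDL's API)."""

    def __init__(self) -> None:
        self._source: Iterable = ()
        self._stages: List[Callable[[Iterator], Iterator]] = []

    def add_source(self, source: Iterable) -> "MiniBuilder":
        self._source = source
        return self  # returning self is what enables method chaining

    def pipe(self, op: Callable) -> "MiniBuilder":
        # Apply op to every item, one at a time, preserving order.
        self._stages.append(lambda items: (op(x) for x in items))
        return self

    def aggregate(self, n: int) -> "MiniBuilder":
        # Group consecutive items into lists of length n; the last list may be shorter.
        def _batch(items: Iterator) -> Iterator[list]:
            while batch := list(islice(items, n)):
                yield batch

        self._stages.append(_batch)
        return self

    def build(self) -> Iterator:
        # Chain the recorded stages lazily, starting from the source iterator.
        items: Iterator = iter(self._source)
        for stage in self._stages:
            items = stage(items)
        return items


batches = list(
    MiniBuilder()
    .add_source(range(12))
    .pipe(lambda x: 2 * x)
    .pipe(lambda x: x + 1)
    .aggregate(3)
    .build()
)
print(batches)  # [[1, 3, 5], [7, 9, 11], [13, 15, 17], [19, 21, 23]]
```

Because every stage is a generator, nothing is computed until build()'s result is iterated, which mirrors the idea that constructing a pipeline and running it are separate steps.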

The resulting Pipeline object contains all the logic to perform these operations in an async event loop running in a background thread.

To run the pipeline, call Pipeline.start(). Once the pipeline has started, you can iterate over it. Finally, call Pipeline.stop() to stop the background thread.

>>> pipeline.start()
>>>
>>> for item in pipeline:
...     print(item)
[1, 3, 5]
[7, 9, 11]
[13, 15, 17]
[19, 21, 23]
>>> pipeline.stop()
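The start, iterate, stop lifecycle can be illustrated with a background thread that feeds a queue. The hypothetical ThreadedIterable class below is a standard-library sketch, assuming a simple producer/consumer design; SPDL's Pipeline runs an async event loop in its thread and is more involved.

```python
import queue
import threading
from typing import Iterable, Iterator

_DONE = object()  # sentinel marking the end of the stream


class ThreadedIterable:
    """Hypothetical sketch: produce items in a background thread, consume them by iterating."""

    def __init__(self, source: Iterable, buffer_size: int = 3) -> None:
        self._source = source
        self._queue: "queue.Queue" = queue.Queue(maxsize=buffer_size)
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run)

    def _put(self, item) -> bool:
        # Block while the buffer is full, but wake up periodically to honor stop().
        while not self._stop_event.is_set():
            try:
                self._queue.put(item, timeout=0.1)
                return True
            except queue.Full:
                continue
        return False

    def _run(self) -> None:
        for item in self._source:
            if not self._put(item):
                return  # stop() was called; exit without draining the source
        self._put(_DONE)

    def start(self) -> None:
        self._thread.start()

    def __iter__(self) -> Iterator:
        # Yield items until the producer signals the end of the stream.
        while (item := self._queue.get()) is not _DONE:
            yield item

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()


p = ThreadedIterable(range(5))
p.start()
print(list(p))  # [0, 1, 2, 3, 4]
p.stop()
```

Polling put() with a timeout lets stop() interrupt a producer that is blocked on a full buffer; a plain blocking put() could leave the thread stuck forever if the consumer stops reading early.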

It is important to call Pipeline.stop(). If you forget to, the background thread keeps running, which can cause the Python interpreter to hang at exit.
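One common way this kind of hang arises, assuming the background thread is a regular (non-daemon) thread: Python waits for all non-daemon threads to finish before the interpreter exits, so a worker that loops until it is told to stop keeps the process alive. The hypothetical worker below exits only after its stop event is set, which is the role a stop() method plays.

```python
import threading
import time


def worker(stop_event: threading.Event) -> None:
    # Keep "producing" until asked to stop; without stop_event.set() this never returns.
    while not stop_event.is_set():
        time.sleep(0.01)


stop_event = threading.Event()
# Non-daemon: the daemon flag is inherited from the (non-daemon) main thread.
thread = threading.Thread(target=worker, args=(stop_event,))
thread.start()

time.sleep(0.05)
print(thread.is_alive())  # True: the thread is still running

# Signal the thread and wait for it to finish, as a stop() method would.
stop_event.set()
thread.join()
print(thread.is_alive())  # False
```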

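In practice, data processing can always raise an error, so Pipeline also provides a context manager, Pipeline.auto_stop(), which makes sure the pipeline is stopped even when an error occurs. Since a stopped Pipeline cannot be reused, use it with a newly built pipeline.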

>>> with pipeline.auto_stop():
...     for item in pipeline:
...         print(item)
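A context manager like this is typically a try/finally around the body. The sketch below uses contextlib.contextmanager with a hypothetical DummyPipeline that only records calls. It assumes auto_stop() also starts the pipeline on entry, since the doctest above iterates without calling start() first; that detail is an assumption, not something stated here.

```python
from contextlib import contextmanager
from typing import Iterator, List


class DummyPipeline:
    """Hypothetical stand-in that records lifecycle calls instead of running threads."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def start(self) -> None:
        self.calls.append("start")

    def stop(self) -> None:
        self.calls.append("stop")

    @contextmanager
    def auto_stop(self) -> Iterator[None]:
        self.start()  # assumption: entering the block starts the pipeline
        try:
            yield
        finally:
            # Runs on normal exit and also when the body raises.
            self.stop()


pipeline = DummyPipeline()
try:
    with pipeline.auto_stop():
        raise ValueError("simulated processing error")
except ValueError:
    pass
print(pipeline.calls)  # ['start', 'stop']
```

The exception still propagates out of the with block; the finally clause only guarantees that stop() runs first.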

Note

Once Pipeline.stop() is called, the Pipeline object can no longer be used. To pause and resume execution, do not call stop(); simply keep a reference to the pipeline and continue iterating over it when you need more data.
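While iteration is paused, a background producer typically does not run ahead indefinitely: it blocks once its output buffer is full and continues when the consumer resumes. The standard-library sketch below assumes a bounded queue.Queue as that buffer (the argument to add_sink may play a similar role, but that is an assumption here). Consumption pauses after two items and later resumes from the same objects, with no stop() involved.

```python
import queue
import threading
import time

buffer: "queue.Queue[int]" = queue.Queue(maxsize=3)  # bounded output buffer
produced: list = []  # items the producer has finished putting into the buffer


def producer() -> None:
    for i in range(10):
        buffer.put(i)  # blocks while the buffer is full
        produced.append(i)


thread = threading.Thread(target=producer)
thread.start()

first = [buffer.get() for _ in range(2)]  # consume two items, then "pause"

time.sleep(0.1)  # while paused, the producer fills the buffer and then blocks
print(len(produced) <= 5)  # True: at most 2 consumed + 3 buffered

rest = [buffer.get() for _ in range(8)]  # resume: continue where we left off
thread.join()
print(first + rest == list(range(10)))  # True
```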