Building and Running Pipeline
First, let’s look at how easy it is to build a pipeline in SPDL.
The following snippet demonstrates how to construct a Pipeline object using a PipelineBuilder object.
>>> from spdl.dataloader import PipelineBuilder
>>>
>>> pipeline = (
...     PipelineBuilder()
...     .add_source(range(12))
...     .pipe(lambda x: 2 * x)
...     .pipe(lambda x: x + 1)
...     .aggregate(3)
...     .add_sink(3)
...     .build()
... )
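To make the data flow concrete, the following plain-Python sketch reproduces what the stages above compute, without SPDL and without any concurrency. The helper `batched` is a hypothetical name introduced here for illustration. It only mimics the grouping that `aggregate(3)` performs in this example and makes no claim about how SPDL implements it, for instance how a trailing partial group is handled.

```python
from itertools import islice


def batched(iterable, n):
    """Group items into lists of up to n items (hypothetical helper, not an SPDL API)."""
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch


source = range(12)                     # stands in for add_source(range(12))
doubled = (2 * x for x in source)      # first pipe stage
shifted = (x + 1 for x in doubled)     # second pipe stage
batches = list(batched(shifted, 3))    # stands in for aggregate(3)

for batch in batches:
    print(batch)
```

The sketch runs each stage sequentially in the calling thread, so it only illustrates the values that flow through the stages, not how the pipeline schedules the work.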
The resulting Pipeline object contains all the logic to perform the operations in an async event loop in a background thread.
To run the pipeline, call Pipeline.start().
Once the pipeline starts executing, you can iterate over the pipeline.
Finally, call Pipeline.stop() to stop the background thread.
>>> pipeline.start()
>>>
>>> for item in pipeline:
...     print(item)
[1, 3, 5]
[7, 9, 11]
[13, 15, 17]
[19, 21, 23]
>>> pipeline.stop()
It is important to call Pipeline.stop().
Forgetting to do so leaves the background thread running, which causes the Python interpreter to get stuck at exit.
In practice, there is always a chance that data processing raises an error, so there is a context manager, Pipeline.auto_stop(), to make sure that the pipeline is stopped.
>>> with pipeline.auto_stop():
...     for item in pipeline:
...         print(item)
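The guarantee that a context manager such as Pipeline.auto_stop() provides can be pictured as a `try`/`finally` block. The sketch below does not use SPDL. `FakePipeline` and its `auto_stop` method are hypothetical stand-ins written for illustration, assuming only that the real object exposes `start()` and `stop()` as described above and that the context manager starts the background thread on entry.

```python
import threading
from contextlib import contextmanager


class FakePipeline:
    """Hypothetical stand-in that owns a background thread; not SPDL's Pipeline."""

    def __init__(self):
        self._stop_event = threading.Event()
        # The thread simply blocks until stop() sets the event.
        self._thread = threading.Thread(target=self._stop_event.wait)

    def start(self):
        self._thread.start()

    def stop(self):
        # Signal the background thread to exit, then wait for it to finish.
        self._stop_event.set()
        self._thread.join()

    @contextmanager
    def auto_stop(self):
        # Assumption: start on entry, and always stop on exit.
        self.start()
        try:
            yield
        finally:
            self.stop()


pipeline = FakePipeline()
try:
    with pipeline.auto_stop():
        raise ValueError("simulated data processing failure")
except ValueError:
    pass

# The background thread was joined even though the body raised.
thread_alive = pipeline._thread.is_alive()
print(thread_alive)
```

Because the thread in this sketch is not a daemon thread, skipping `stop()` would keep the interpreter waiting at exit, which is the same kind of hang described above.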
Note
Once the Pipeline.stop() method is called, the Pipeline object is unusable.
To pause and resume the execution, do not call Pipeline.stop(); simply keep the reference to the Pipeline object around until the next use.