Building and Running Pipeline
First, let’s look at how easy it is to build a pipeline in SPDL.
The following snippet demonstrates how to construct a
Pipeline object using a PipelineBuilder object.
>>> from spdl.pipeline import PipelineBuilder
>>>
>>> pipeline = (
...     PipelineBuilder()
...     .add_source(range(12))
...     .pipe(lambda x: 2 * x)  # double each item
...     .pipe(lambda x: x + 1)  # then add 1
...     .aggregate(3)  # group items into lists of 3
...     .add_sink(3)
...     .build()
... )
The resulting Pipeline object contains all the logic to
perform the operations in an async event loop in a background thread.
To run the pipeline, call Pipeline.start().
Once the pipeline starts executing, you can iterate over it.
Finally, call Pipeline.stop() to stop the background thread.
>>> pipeline.start()
>>>
>>> for item in pipeline:
...     print(item)
[1, 3, 5]
[7, 9, 11]
[13, 15, 17]
[19, 21, 23]
>>> pipeline.stop()
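To make the stage semantics concrete, here is a sequential, single-threaded sketch in plain Python that computes the same batches as the doctest above. It only illustrates what the two pipe stages and aggregate(3) compute, not how SPDL executes them. The helper run_sequentially is hypothetical, and keeping a trailing partial batch is an assumption that does not matter here, because 12 items divide evenly into batches of 3.

```python
def run_sequentially(source, batch_size):
    """Hypothetical sequential equivalent of the pipeline stages above."""
    batches, batch = [], []
    for x in source:
        x = 2 * x  # first pipe stage
        x = x + 1  # second pipe stage
        batch.append(x)
        if len(batch) == batch_size:  # aggregate(batch_size)
            batches.append(batch)
            batch = []
    if batch:  # assumption: a trailing partial batch is kept
        batches.append(batch)
    return batches


print(run_sequentially(range(12), 3))
# [[1, 3, 5], [7, 9, 11], [13, 15, 17], [19, 21, 23]]
```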
It is important to call Pipeline.stop().
Forgetting to do so leaves the background thread running,
which can cause the Python interpreter to hang at exit.
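The following toy model, written only with the standard library, may help picture the start/iterate/stop cycle and why skipping stop() can hang the interpreter. ToyPipeline is a hypothetical class, not SPDL's implementation (which, per the text above, runs an async event loop in its thread); the sketch simply assumes one producer thread feeding a bounded queue.Queue.

```python
import queue
import threading


class ToyPipeline:
    """Hypothetical toy model: a producer thread feeding a bounded buffer.

    Not SPDL's implementation; it only mimics the start/iterate/stop cycle.
    """

    def __init__(self, source, buffer_size=3):
        self._source = source
        self._queue = queue.Queue(maxsize=buffer_size)
        self._stop_requested = threading.Event()
        self._done = object()  # sentinel marking the end of the data
        # A regular (non-daemon) thread: the interpreter waits for it at exit.
        self._thread = threading.Thread(target=self._run)

    def _put(self, item):
        # Retry with a short timeout so a stop request is noticed even
        # while the buffer is full and nobody is consuming.
        while not self._stop_requested.is_set():
            try:
                self._queue.put(item, timeout=0.05)
                return True
            except queue.Full:
                continue
        return False

    def _run(self):
        for item in self._source:
            if not self._put(item):
                return  # stop() was called: let the thread exit
        self._put(self._done)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop_requested.set()
        self._thread.join()

    def __iter__(self):
        while (item := self._queue.get()) is not self._done:
            yield item


pipeline = ToyPipeline(range(5))
pipeline.start()
print(list(pipeline))  # [0, 1, 2, 3, 4]
pipeline.stop()
```

If stop() were skipped after a partial read, the worker in this toy would keep retrying put() forever, and because it is a non-daemon thread, the interpreter would wait for it at exit: the same kind of hang described above.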
In practice, the application can always be interrupted and shut down.
To handle such cases and stop the background thread properly,
use the context manager Pipeline.auto_stop(), which makes sure that
the pipeline is stopped when the block exits.
>>> with pipeline.auto_stop():
...     for item in pipeline:
...         print(item)
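A context manager of this kind can be sketched with contextlib.contextmanager and try/finally. The auto_stop function and RecordingPipeline class below are hypothetical stand-ins, not SPDL's code, and starting the pipeline on entry is an assumption made for the sketch. The example raises an error in the middle of iteration to show that stop() still runs.

```python
from contextlib import contextmanager


@contextmanager
def auto_stop(pipeline):
    """Hypothetical helper: start on entry (assumed), always stop on exit."""
    pipeline.start()
    try:
        yield pipeline
    finally:
        # Runs on normal exit, on break/return, and when an exception
        # (including KeyboardInterrupt) propagates out of the with block.
        pipeline.stop()


class RecordingPipeline:
    """Hypothetical stand-in that only records start()/stop() calls."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append("start")

    def stop(self):
        self.events.append("stop")

    def __iter__(self):
        yield from range(3)


pipeline_stub = RecordingPipeline()
try:
    with auto_stop(pipeline_stub):
        for item in pipeline_stub:
            if item == 1:
                raise RuntimeError("simulated failure")
except RuntimeError:
    pass
print(pipeline_stub.events)  # ['start', 'stop']
```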
Warning
Do not call iter() on the pipeline, because Pipeline.stop()
might then not be called at the right time.
Say you wrap a Pipeline to create a class that resembles a conventional
DataLoader.
class DataLoader:
    ...

    def __iter__(self):
        # auto_stop() stops the pipeline when this generator finishes or is closed.
        with self.pipeline.auto_stop():
            for item in self.pipeline:
                yield item

dataloader = DataLoader(...)
Make sure to use this class as follows.
This way, the context manager properly calls Pipeline.stop() when
execution leaves the loop, even
if the application exits with an unexpected error.
for item in dataloader:
    ...
Do not use it as follows. In this case, Pipeline.stop()
is not called until the garbage collector deletes the generator object,
which might cause a deadlock.
iterator = iter(dataloader)
item = next(iterator)
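The difference between the two usages comes from when Python runs a generator's cleanup. In the standard-library sketch below, tracked() is a hypothetical stand-in for pipeline.auto_stop() that only records when its cleanup runs, and ToyDataLoader mirrors the DataLoader class above. A loop that runs to completion triggers the cleanup as soon as it ends, while a generator advanced with next() stays suspended inside the with block until it is closed or garbage-collected.

```python
from contextlib import contextmanager

events = []


@contextmanager
def tracked():
    # Hypothetical stand-in for pipeline.auto_stop(): records when cleanup runs.
    try:
        yield
    finally:
        events.append("stopped")


class ToyDataLoader:
    def __iter__(self):
        with tracked():
            for item in range(3):
                yield item


# Recommended pattern: the loop runs the generator to completion,
# so the with block exits as soon as the loop ends.
for item in ToyDataLoader():
    pass
print(events)  # ['stopped']

# Discouraged pattern: the generator is left suspended inside the with block.
events.clear()
iterator = iter(ToyDataLoader())
item = next(iterator)
print(events)  # [] (cleanup has not run yet)
iterator.close()  # closing (or garbage collection) is what finally runs the cleanup
print(events)  # ['stopped']
```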
Note
Once the Pipeline.stop() method is called, the Pipeline object can no longer be used.
To pause execution, simply stop consuming the output.
The Pipeline blocks once its internal buffers are full.
To resume execution, resume consuming the data.
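This pause/resume behavior is ordinary backpressure from bounded buffers. The standard-library sketch below uses a queue.Queue with maxsize=2 as a stand-in for an internal buffer; that is an assumption for illustration, not a description of SPDL's internals. While nothing is consumed, the producer thread fills the buffer and blocks in put(), and it continues as soon as the consumer starts reading again.

```python
import queue
import threading
import time

buffer = queue.Queue(maxsize=2)  # stand-in for an internal buffer
DONE = object()  # end-of-data sentinel


def producer(n):
    for i in range(n):
        buffer.put(i)  # blocks while the buffer is full
    buffer.put(DONE)


# daemon=True only so this demo can never keep the interpreter alive.
worker = threading.Thread(target=producer, args=(10,), daemon=True)
worker.start()

# "Pause": consume nothing. The producer fills the buffer, then blocks.
while not buffer.full():
    time.sleep(0.01)
time.sleep(0.1)
print(buffer.qsize(), worker.is_alive())  # 2 True

# "Resume": consume again; the producer unblocks and runs to the end.
received = []
while (item := buffer.get()) is not DONE:
    received.append(item)
worker.join()
print(received)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```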