Building and Running a Pipeline

Building a Pipeline

First, let’s look at how easy it is to build a pipeline in SPDL.

The following snippet demonstrates how one can construct a Pipeline object using a PipelineBuilder object.

>>> from spdl.pipeline import PipelineBuilder
>>>
>>> pipeline = (
...     PipelineBuilder()
...     .add_source(range(12))
...     .pipe(lambda x: 2 * x)
...     .pipe(lambda x: x + 1)
...     .aggregate(3)
...     .add_sink(3)
...     .build(num_threads=1)
... )

The resulting Pipeline object contains all the logic to perform the operations in an async event loop in a background thread.
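To make the data flow concrete, here is a plain-Python sketch of what the stages above compute, built from ordinary generators. It is only an illustration that assumes items are processed in order. SPDL itself runs the stages concurrently in an event loop, and the helpers `pipe` and `aggregate` below are hypothetical stand-ins, not SPDL APIs.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")
U = TypeVar("U")


def pipe(items: Iterable[T], fn: Callable[[T], U]) -> Iterator[U]:
    # Hypothetical stand-in for `.pipe(fn)`: apply `fn` to every item.
    for item in items:
        yield fn(item)


def aggregate(items: Iterable[T], n: int) -> Iterator[list[T]]:
    # Hypothetical stand-in for `.aggregate(n)`: group items into lists of `n`.
    # In this sketch, leftover items are emitted as a final, shorter group.
    buffer: list[T] = []
    for item in items:
        buffer.append(item)
        if len(buffer) == n:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


# Same chain as the builder: source -> 2 * x -> x + 1 -> groups of 3.
stages = aggregate(pipe(pipe(range(12), lambda x: 2 * x), lambda x: x + 1), 3)
result = list(stages)
print(result)  # [[1, 3, 5], [7, 9, 11], [13, 15, 17], [19, 21, 23]]
```

The output matches the batches printed by the real pipeline in the next section; what the sketch does not capture is the concurrency and the background thread.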

Running a Pipeline

To run the pipeline, call Pipeline.start(). Once the pipeline has started, you can iterate over it. Finally, call Pipeline.stop() to stop the background thread.

>>> pipeline.start()
>>>
>>> for item in pipeline:
...     print(item)
[1, 3, 5]
[7, 9, 11]
[13, 15, 17]
[19, 21, 23]
>>> pipeline.stop()

It is important to call Pipeline.stop(). Forgetting to do so leaves the background thread running, which can make the Python interpreter hang at exit.
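Why does a missing stop() hang the interpreter? The stdlib-only sketch below models the idea under a loud assumption: a pipeline is treated as one non-daemon worker thread that pushes results into a bounded queue. `ToyPipeline` is hypothetical and is not how SPDL is implemented. When the consumer stops reading, the worker blocks on the full queue, and since Python waits for non-daemon threads at exit, only an explicit stop request lets the process terminate.

```python
import itertools
import queue
import threading
from collections.abc import Iterable, Iterator

_DONE = object()  # Sentinel marking the end of the source.


class ToyPipeline:
    """Hypothetical model: a non-daemon worker thread feeds a bounded queue."""

    def __init__(self, source: Iterable[int], buffer_size: int = 2) -> None:
        self._source = source
        self._queue: queue.Queue = queue.Queue(maxsize=buffer_size)
        self._stop_requested = threading.Event()
        # Non-daemon thread: the interpreter waits for it to finish at exit.
        self._thread = threading.Thread(target=self._run)

    def _put(self, item: object) -> bool:
        # Poll with a timeout so that a stop request can interrupt a blocked put.
        while not self._stop_requested.is_set():
            try:
                self._queue.put(item, timeout=0.05)
                return True
            except queue.Full:
                pass
        return False

    def _run(self) -> None:
        for item in self._source:
            if not self._put(item):
                return  # stop() was requested; let the thread finish.
        self._put(_DONE)

    @property
    def is_running(self) -> bool:
        return self._thread.is_alive()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Ask the worker to quit, then wait until it has actually exited.
        self._stop_requested.set()
        self._thread.join()

    def __iter__(self) -> Iterator[int]:
        while (item := self._queue.get()) is not _DONE:
            yield item


pipeline = ToyPipeline(range(100))
pipeline.start()
first_three = list(itertools.islice(pipeline, 3))
# The consumer stopped reading, so the worker ends up retrying `put` forever.
# Without the next line, that non-daemon thread would keep Python from exiting.
pipeline.stop()
print(first_three, pipeline.is_running)  # [0, 1, 2] False
```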

In practice, an application can always be interrupted for unexpected reasons. To make sure that the pipeline is stopped, we recommend using the Pipeline.auto_stop() context manager, which calls Pipeline.start() and Pipeline.stop() automatically.

>>> # `pipeline` must be a newly built Pipeline; a stopped one cannot be reused.
>>> with pipeline.auto_stop():
...     for item in pipeline:
...         print(item)
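The guarantee behind such a context manager is a try/finally around the body. The sketch below shows that pattern with `contextlib.contextmanager`; it is an illustration, not SPDL's implementation, and `Recorder` is a hypothetical stand-in that only records which lifecycle methods were called.

```python
from collections.abc import Iterator
from contextlib import contextmanager


class Recorder:
    """Hypothetical stand-in for a pipeline; it only records lifecycle calls."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def start(self) -> None:
        self.calls.append("start")

    def stop(self) -> None:
        self.calls.append("stop")

    @contextmanager
    def auto_stop(self) -> Iterator[None]:
        self.start()
        try:
            yield
        finally:
            # Runs on normal exit, on exceptions, and on KeyboardInterrupt.
            self.stop()


recorder = Recorder()
try:
    with recorder.auto_stop():
        raise RuntimeError("simulated failure inside the loop")
except RuntimeError:
    pass
print(recorder.calls)  # ['start', 'stop']
```

Even though the body raised, `stop` was still recorded, which is exactly the property that makes the context manager safer than paired start/stop calls.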

Note

Once the Pipeline.stop() method is called, the Pipeline object can no longer be used. To pause the execution, simply stop consuming the output; the Pipeline blocks once its internal buffers are full. To resume the execution, resume consuming the data.
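This pause-and-resume behavior is backpressure: a producer writing into a bounded buffer blocks once the buffer is full. The sketch below shows the effect with a stdlib `queue.Queue`. The buffer size of 2 is an arbitrary assumption, and SPDL's internal buffers are not modeled exactly.

```python
import queue
import threading
import time

buffer: queue.Queue = queue.Queue(maxsize=2)
produced: list[int] = []


def producer() -> None:
    for i in range(10):
        buffer.put(i)  # Blocks while the buffer already holds 2 items.
        produced.append(i)


# Daemon thread only so that this demo can never block interpreter exit.
worker = threading.Thread(target=producer, daemon=True)
worker.start()

time.sleep(0.2)  # The consumer "pauses": nothing is read for a while.
paused_count = len(produced)
print(paused_count)  # 2 once the buffer is full; it can never exceed 2 here

# Resuming consumption lets the producer run to completion.
consumed = [buffer.get() for _ in range(10)]
worker.join()
print(consumed == list(range(10)), len(produced))  # True 10
```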

⚠ Caveats ⚠

Unlike processes, threads cannot be killed. The Pipeline object uses a thread pool, so it is important to shut the thread pool down properly.

Some seemingly harmless patterns can cause a deadlock when the Python interpreter shuts down, preventing Python from exiting.

Keeping unnecessary references to Pipeline

We recommend keeping the resulting Pipeline object as a local variable of the iterator, and NOT assigning it to an object attribute.

class DataLoader:
    ...

    def __iter__(self) -> Iterator[T]:
        # 👍 Leave the `pipeline` variable as a local variable.
        pipeline = self.get_pipeline(...)
        # This way, the `pipeline` gets garbage collected after the
        # iterator goes out of scope.

        with pipeline.auto_stop():
            yield from pipeline.get_iterator(...)

        # The reference count of the `pipeline` object goes to zero
        # here, so it will be garbage collected.

class DataLoader:
    ...

    def __iter__(self) -> Iterator[T]:
        # 🚫 Do not assign the pipeline to the object.
        self.pipeline = self.get_pipeline(...)
        #
        # The pipeline won't get garbage collected until
        # the DataLoader instance goes out of scope,
        # which might cause a deadlock when Python tries to exit.

        with self.pipeline.auto_stop():
            yield from self.pipeline.get_iterator(...)

        # The `pipeline` object won't get garbage collected here.
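The difference between the two DataLoader versions above can be observed with `weakref`, which tracks an object without keeping it alive. In this sketch, `FakePipeline`, `GoodLoader`, and `BadLoader` are hypothetical stand-ins, and the prompt collection relies on CPython's reference counting.

```python
import weakref
from collections.abc import Iterator


class FakePipeline:
    """Hypothetical stand-in for a Pipeline object."""

    def __init__(self) -> None:
        self.data = [1, 2, 3]


class GoodLoader:
    def __iter__(self) -> Iterator[int]:
        pipeline = FakePipeline()  # Recommended: a local variable only.
        self.probe = weakref.ref(pipeline)  # A weak reference keeps nothing alive.
        yield from pipeline.data


class BadLoader:
    def __iter__(self) -> Iterator[int]:
        self.pipeline = FakePipeline()  # Anti-pattern: a strong reference on self.
        self.probe = weakref.ref(self.pipeline)
        yield from self.pipeline.data


good, bad = GoodLoader(), BadLoader()
list(good)
list(bad)
print(good.probe() is None)  # True: collected once the iteration finished
print(bad.probe() is None)   # False: alive for as long as `bad` is alive
```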

Calling iter on Pipeline

We recommend not calling the iter() function on a Pipeline object. Doing so can prevent the Pipeline.stop() method from being called at the right time, which in turn might make the Python interpreter hang at exit.

Say you wrap a Pipeline to create a class that resembles a conventional DataLoader.

class DataLoader(Iterable[T]):
    ...

    def __iter__(self) -> Iterator[T]:
        pipeline = self.get_pipeline()
        with pipeline.auto_stop():
            for item in pipeline:
                yield item

dataloader = DataLoader(...)

When using this instance, make sure not to leave the iterator object hanging around. The usual for-loop is fine.

# 👍 The iterator is garbage collected soon after the for-loop.
for item in dataloader:
    ...
# The pipeline will be shut down at the end of the for-loop.

This way, the context manager properly calls Pipeline.stop when the execution flow goes out of the loop, even when the application is exiting with unexpected errors.
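This works because of how Python generators are cleaned up: when a suspended generator is garbage collected, Python calls its close() method, which raises GeneratorExit at the paused yield and runs any pending with or finally blocks. In the sketch below, the hypothetical `fake_dataloader` and `events` list stand in for DataLoader.__iter__ and the Pipeline.start()/Pipeline.stop() calls; the immediate cleanup relies on CPython's reference counting.

```python
from collections.abc import Iterator

events: list[str] = []


def fake_dataloader() -> Iterator[int]:
    # Hypothetical stand-in for DataLoader.__iter__ using auto_stop().
    events.append("start")
    try:
        yield from range(5)
    finally:
        events.append("stop")  # Plays the role of Pipeline.stop().


for item in fake_dataloader():
    if item == 1:
        break  # Leave the loop early, as an error or early exit would.

# The loop's hidden iterator was released right after `break`,
# so the generator was closed and its finally block already ran.
print(events)  # ['start', 'stop']
```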

The following code snippet shows an anti-pattern where the iterator object is assigned to a variable, which delays the shutdown of the thread pool.

# 🚫 Do not keep the iterator object around
ite = iter(dataloader)
item = next(ite)
# The pipeline won't be shut down until the `ite` variable
# goes out of scope. When does that happen??

Pipeline.stop() is not called until the garbage collector deletes the iterator object. This might cause a deadlock that prevents the Python interpreter from exiting.
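If manual next() calls are genuinely needed, one general Python option (not an SPDL API) is to close the iterator explicitly, for example with `contextlib.closing`, so that the cleanup runs at a known point instead of whenever the object is collected. The sketch reuses a hypothetical `fake_dataloader` generator and an `events` list as stand-ins for the DataLoader and the Pipeline.start()/Pipeline.stop() calls.

```python
from collections.abc import Iterator
from contextlib import closing

events: list[str] = []


def fake_dataloader() -> Iterator[int]:
    # Hypothetical stand-in for DataLoader.__iter__ using auto_stop().
    events.append("start")
    try:
        yield from range(5)
    finally:
        events.append("stop")  # Plays the role of Pipeline.stop().


ite = fake_dataloader()
first = next(ite)
print(events)  # ['start']: the cleanup has not run while `ite` is alive

ite.close()  # Raises GeneratorExit at the paused yield; the finally block runs.
print(events)  # ['start', 'stop']

events.clear()
with closing(fake_dataloader()) as ite:  # closing() calls ite.close() on exit.
    first = next(ite)
print(events)  # ['start', 'stop']
```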