Implementation detail of Pipeline
=================================
This note is a memorandum of the design trade offs and their consequences encountered
during the development of the :py:class:`spdl.pipeline.Pipeline`.
Simply put, the data processing pipeline is an async functions executed in a background thread,
and the foreground fetches the processed data from the sink.
When implementing foreground/background components, it turned out that a subtle design choice in
one part constraints the design choices of other parts of the system,
and there are multiple constraints that must be met at the same time.
Architecture Overview
---------------------
The following diagram illustrates the key components of the Pipeline and their interactions:
.. mermaid::
graph TB
subgraph FG["Foreground Thread"]
Client[Client Code]
end
subgraph BG["Background Thread"]
EL[Event Loop]
subgraph Pipeline["Pipeline"]
Source[Source Stage]
SQ1(Queue)
Stage1[Processing Stage 1]
SQ2(Queue)
Stage2[Processing Stage 2]
SQ3(Queue)
Sink[Sink Stage]
SinkQ(Sink Queue)
Source -->|data| SQ1
SQ1 --> Stage1
Stage1 -->|data| SQ2
SQ2 --> Stage2
Stage2 -->|data| SQ3
SQ3 --> Sink
Sink -->|data| SinkQ
end
end
subgraph TP["Thread Pool"]
TPQ(Queue)
Thread1["Thread 1
(Task 1)"]
Thread2["Thread 2
(Task 2)"]
Thread3["Thread 3
(Task 3)"]
ThreadN["Thread N
(Task N)"]
TPQ --> Thread1
TPQ --> Thread2
TPQ --> Thread3
TPQ --> ThreadN
end
Client -.->|request data from sink queue| EL
EL -.->|return data| Client
Source -.->|delegate task| TPQ
Stage1 -.->|delegate task| TPQ
Stage2 -.->|delegate task| TPQ
Sink -.->|delegate task| TPQ
EL -->|manages| Pipeline
EL -->|controls| TP
style FG fill:#e1f5ff
style BG fill:#fff4e1
style Pipeline fill:#f0f0f0
style TP fill:#d4edda
style Client fill:#4a90e2
style EL fill:#e67e22
style SinkQ fill:#e74c3c
style TPQ fill:#ffd700
The diagram shows:
- **Foreground Thread**: Contains the client code that requests data from the Pipeline
- **Background Thread**: Hosts the event loop and executes the Pipeline
- **Event Loop**: Manages the Pipeline execution and controls the thread pool
- **Thread Pool**: Executes tasks delegated from each Pipeline stage
- **Pipeline**: Consists of multiple stages (Source, Processing Stages, Sink) connected by queues
- **Tasks**: Multiple tasks (Task 1, Task 2, Task 3, ... Task N) executed by the thread pool for each stage
- **Data Flow** (solid arrows): Data flows through the Pipeline from Source → Processing Stages → Sink → Sink Queue
- **Task Delegation** (dashed arrows): Each Pipeline stage delegates tasks to the thread pool for execution
- **Data Request** (dashed arrows): Foreground thread requests data from the Sink Queue
Async IO
--------
The Pipeline is composed of async functions for the following reasons:
1. **Orchestration Requirements**: Building a pipeline with different concurrency levels and
flexible structure requires an orchestrator to schedule tasks.
Using bare :py:class:`concurrent.futures.ThreadPoolExecutor`
(or :py:class:`concurrent.futures.ProcessPoolExecutor`) makes it difficult
to cleanly implement the Pipeline abstraction.
The async event loop provides the necessary orchestration capabilities.
For details on the limitations of using bare ``ThreadPoolExecutor``
for pipeline implementation, see :doc:`../async/index`.
2. **Integration and Parallelism**: The async context makes it easy to integrate
network utilities, which are often async functions.
Additionally, executing data processing functions in async context enables
inter-op and intra-op parallelism.
Queue vs Async Queue as Buffer
------------------------------
The sink of the Pipeline is where the processed data are buffered. Pipeline runs in the background thread, so that the data are written to the sink in the background thread. They are fetched by the foreground thread. Therefore, the access to the sink must be thread-safe. In addition, pipeline is executed in async event loop, so it is ideal that the sink buffer supports async accessor natively.
Python has two types of queues. One is thread-safe :py:class:`queue.Queue` (sync queue) and the other is its async variant :py:class:`asyncio.Queue` (async queue).
The accessors of sync queue, :py:meth:`queue.Queue.get` and :py:meth:`queue.Queue.put`, are thread-safe, and they support blocking operations with timeout.
The accessors of async queue, :py:meth:`asyncio.Queue.get` and :py:meth:`asyncio.Queue.put`, are not thread-safe. They return coroutine which can be awaited. For the foreground thread to actually fetch the values from the queue, these coroutinues must be executed by the same async event loop that's running the pipeline. There are synchronous variant of these accessors, :py:meth:`asyncio.Queue.get_nowait` and :py:meth:`asyncio.Queue.put_nowait`, which can work without an event loop, but since they are not thread-safe, they can only be used when the pipeline is not running.
If we choose sync queue, reading from the foreground is straightforward because the its accessors are thread-safe, but writing to the queue can block the event loop.
If we choose async queue, writing to the queue is straightforward in an event loop, but reading from the foreground is convoluted, because the access must be thread-safe, and if the loop is running and the Pipeline is still writing the queue, then the read access must use async operation as well.
From the perspective of the apparent code simplicity, :py:class:`queue.Queue` requires less code to write, however, having the blocking :py:meth:`queue.Queue.put` call in event loop makes it impossible to cleanly stop the background thread. This is because the synchronous blocking call blocks the event loop, and prevents the loop from processing cancellation request.
For this reason, we use :py:class:`asyncio.Queue` in the :py:class:`spdl.pipeline.Pipeline`. As a result, the implementation of :py:meth:`spdl.pipeline.Pipeline.get_item` becomes a bit convoluted. The next section explains why it is the case.
Thread, loop and task
---------------------
In implementing :py:class:`spdl.pipeline.Pipeline`, there are several object states that need to be carefully managed. They are
- The state of the background thread which runs the event loop.
- The state of the async event loop managed by the background thread.
- The state of the pipeline task, which process data and puts in the sink buffer.
When the foreground thread attempts to fetch data from sink buffer, which is an async queue, it must use the different API (sync vs async accessor) to get the data, depending on the state of the state of the pipeline execution. This is because when the pipeline is running, the pipeline puts data in the async queue, and the event loop controls its execution. To access the async queue in cooperative manner, the foreground has to issue a request to run fetch coroutine (:py:meth:`asyncio.Queue.get`) to the background thread and wait for the result. However if the event loop is not running, then this request to run the fetch coroutine will never be fulfilled. Therefore, if the event loop is not running, the foreground must use sync accessor (:py:meth:`asyncio.Queue.get_nowait`).
Another thing to consider is how to run the event loop. The foreground attempts to fetch data, the fetch request must be made via :py:func:`asyncio.run_coroutine_threadsafe`, so the system needs access to the loop object. In general, however, it is recommended not to manage loop object explicitly i.e. :py:meth:`asyncio.loop.run_forever` or :py:meth:`asyncio.loop.run_until_complete`). Instead it is encouraged to use :py:func:`asyncio.run`. But if we simply pass the pipeline coroutine to the :py:func:`asyncio.run` function, as soon as the task completes, the event loop is stopped and closed. We would like to encapsulate the event loop in the background thread and abstract away from the foreground thread. But this way, the foreground thread cannot know if the loop is running or not.
Following the above considerations, the implementation of the pipeline executions follows the following constraints.
1. To make the state management simpler, overlap the life cycle of the background thread and the event loop.
- When the thread is started, the control flow is not returned to the foreground thread until the event loop is initialized.
- The thread is stopped when the event loop is stopped.
2. Detach the life cycle of pipeline task from that of the event loop.
- Keep the event loop alive after the pipeline task is completed.
- Wait for the explicit request to stop the loop.
3. The event loop signals the object that manages the background thread that the task is completed.
Following the above constraints, the foreground can decide whether it should use sync or async accessor.
- If the background thread is not started. → Fail
- If the task is completed. → Use sync API
- Otherwise, the task is running. → use async API.
The following sequence diagram summarizes the interaction between the foreground thread, the background thread, the event loop and the pipeline task.
.. mermaid::
sequenceDiagram
FG Thread ->>+ BG Thread: Start BG Thread
create participant Event Loop
BG Thread ->> Event Loop: Start Event loop
Event Loop ->> BG Thread: Event loop initialized
BG Thread ->>- FG Thread: Return
create participant Task
Event Loop ->> Task: Start Task
FG Thread --)+ BG Thread: Q: "Is task started?"
BG Thread --)- FG Thread: A: "Not yet."
Event Loop -->> BG Thread: Signal task start
FG Thread --)+ BG Thread: Q: "Is task started?"
BG Thread --)- FG Thread: A: "Yes it is started."
FG Thread --)+ BG Thread: Q: "Is task completed?"
BG Thread --)- FG Thread: A: "Not yet."
destroy Task
Task ->> Event Loop: Task completed
Event Loop -->> BG Thread: Signal task completion
FG Thread --)+ BG Thread: Q: "Is task completed?"
BG Thread --)- FG Thread: A: "Yes it is completed."
Event Loop ->> Event Loop: Keep event loop alive
FG Thread ->>+ BG Thread: Request stop event loop
BG Thread -->> Event Loop: Signal Stop
BG Thread ->>- FG Thread: Return without waiting for the loop stop
destroy Event Loop
Event Loop ->> BG Thread: Loop Stopped
FG Thread ->>+ BG Thread: Join thread
BG Thread ->>- FG Thread: Return
If the foreground thread decides to stop the pipeline before its completion, the
event loop will cancel the pipeline task, (in turn the pipeline task will cancel
tasks correspond to pipeline stages) then the foreground thread will wait for the
background thread to complete the loop and join.
.. mermaid::
sequenceDiagram
FG Thread ->>+ BG Thread: Start BG Thread
create participant Event Loop
BG Thread ->> Event Loop: Start Event Loop
Event Loop ->> BG Thread: Event loop initialized
BG Thread ->>- FG Thread: Return
create participant Task
Event Loop ->> Task: Start Task
Event Loop -->> BG Thread: Signal task start
FG Thread ->>+ BG Thread: Request stop event loop
BG Thread -->> Event Loop: Signal Stop
BG Thread ->>- FG Thread: Return without waiting for the loop stop
Event Loop -->> Task: Signal Stop
destroy Task
Task ->> Event Loop: Task cancelled
destroy Event Loop
Event Loop ->> BG Thread: Loop Stopped
FG Thread ->>+ BG Thread: Join thread
BG Thread ->>- FG Thread: Return
Building Pipeline from Configuration
-------------------------------------
The process of converting a :py:class:`~spdl.pipeline.defs.PipelineConfig` into an executable :py:class:`~spdl.pipeline.Pipeline` is handled by :py:func:`spdl.pipeline.build_pipeline`. This section explains the multi-step transformation process.
Overview of the Build Process
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The build process can be summarized as follows:
1. **Config to Node Graph Conversion**: The :py:class:`~spdl.pipeline.defs.PipelineConfig` is converted into a linked list of :py:class:`~spdl.pipeline._components._node._Node` objects, forming a directed acyclic graph (DAG) without branching.
2. **Recursive Node Traversal**: Starting from the sink node, the upstream nodes are recursively traversed.
3. **Coroutine Creation**: Each node is converted into a coroutine that processes a stream of input data. Each coroutine completes when it receives an EOF (End-of-File) token.
4. **Main Coroutine Assembly**: A main coroutine is created to monitor the state of all stage coroutines.
5. **Error Handling and Cleanup**: The main coroutine is responsible for handling failures, performing cleanup (ensuring upstream stages are completed or cancelled when a downstream node completes), and reacting to interrupt requests from the foreground client code.
6. **Event Loop Execution**: The main coroutine is executed by the event loop running in the background thread.
Detailed Build Steps
~~~~~~~~~~~~~~~~~~~~~
Step 1: Configuration to Node Graph
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The :py:class:`~spdl.pipeline.defs.PipelineConfig` represents the pipeline structure declaratively. The function :py:func:`~spdl.pipeline._components._node._convert_config` transforms this configuration into a linked list of :py:class:`~spdl.pipeline._components._node._Node` objects.
Each :py:class:`~spdl.pipeline._components._node._Node` is linked with references to its upstream :py:class:`~spdl.pipeline._components._node._Node` objects, forming a directed graph. Each node also has a queue object (:py:class:`~spdl.pipeline.AsyncQueue`) that will be used when building coroutines to buffer data between stages.
The following diagram illustrates how a simple pipeline configuration is converted into a node graph:
.. mermaid::
graph LR
subgraph PConfig["PipelineConfig"]
PC_Src[SourceConfig]
PC_P1["PipeConfig: Stage1"]
PC_P2["PipeConfig: Stage2"]
PC_Sink[SinkConfig]
PC_Src --> PC_P1
PC_P1 --> PC_P2
PC_P2 --> PC_Sink
end
subgraph NGraph["Node Graph"]
N_Src["Node: Source"]
N_P1["Node: Stage1"]
N_P2["Node: Stage2"]
N_Sink["Node: Sink"]
N_Src -.->|queue| N_P1
N_P1 -.->|queue| N_P2
N_P2 -.->|queue| N_Sink
N_P1 -->|upstream| N_Src
N_P2 -->|upstream| N_P1
N_Sink -->|upstream| N_P2
end
PC_Src -.->|converts to| N_Src
PC_P1 -.->|converts to| N_P1
PC_P2 -.->|converts to| N_P2
PC_Sink -.->|converts to| N_Sink
style PC_Src fill:#e1f5ff
style PC_P1 fill:#e1f5ff
style PC_P2 fill:#e1f5ff
style PC_Sink fill:#e1f5ff
style N_Src fill:#fff4e1
style N_P1 fill:#fff4e1
style N_P2 fill:#fff4e1
style N_Sink fill:#fff4e1
Step 2: Recursive Coroutine Creation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The function :py:func:`~spdl.pipeline._components._node._build_node_recursive` traverses
the node graph recursively starting from the sink node and creates a coroutine for each node.
The traversal follows the upstream references, ensuring that all upstream coroutines are created
before the downstream coroutine.
For each node type, it calls :py:func:`~spdl.pipeline._components._node._build_node` to
create a coroutine for the corresponding config.
Each coroutine processes input data in a loop and completes when it receives an EOF token.
Step 3: Main Coroutine Assembly
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The function :py:func:`~spdl.pipeline._components._node._run_pipeline_coroutines` creates the main coroutine that orchestrates the execution of all stage coroutines. This main coroutine:
1. Creates asyncio Tasks from each node's coroutine
2. Monitors the tasks using :py:func:`asyncio.wait` with ``FIRST_COMPLETED`` strategy
3. When a task completes, cancels orphaned upstream tasks
4. Handles cancellation requests from the foreground thread
5. Gathers errors from failed tasks and raises :py:class:`~spdl.pipeline.PipelineFailure` if any task failed
The following diagram illustrates the execution flow of the main coroutine:
.. mermaid::
flowchart TD
Start([Main Coroutine Starts]) --> CreateTasks[Create Tasks from All Nodes]
CreateTasks --> StartTasks[Start All Tasks]
StartTasks --> Wait{Wait for
First Completion}
Wait -->|Task Completed| CancelOrphans[Cancel Orphaned
Upstream Tasks]
Wait -->|Cancellation Request| CancelAll[Cancel All
Pending Tasks]
CancelOrphans --> CheckPending{Any Tasks
Pending?}
CancelAll --> WaitAll[Wait for All
Tasks to Complete]
CheckPending -->|Yes| Wait
CheckPending -->|No| GatherErrors[Gather Errors
from Tasks]
WaitAll --> RaiseCancelled[Raise
CancelledError]
GatherErrors --> HasErrors{Any Errors?}
HasErrors -->|Yes| RaiseFailure[Raise
PipelineFailure]
HasErrors -->|No| Complete([Complete Successfully])
style Start fill:#90EE90
style Complete fill:#90EE90
style RaiseFailure fill:#FFB6C1
style RaiseCancelled fill:#FFB6C1
Step 4: Task Creation and Monitoring
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
When the main coroutine starts, it recursively creates asyncio Tasks for each node
starting from the sink and traversing upstream.
Each task wraps the node's coroutine and begins executing immediately.
The main coroutine then enters a loop monitoring these tasks:
- It uses :py:func:`asyncio.wait` with ``return_when=FIRST_COMPLETED`` to wait for
any task to complete
- When a task completes, it cancels upstream tasks that would become orphaned producers
(i.e., tasks that would keep producing data that will never be consumed)
- If the main coroutine receives a cancellation request, it cancels all pending tasks and
waits for them to complete before re-raising the :py:class:`asyncio.CancelledError`
Step 5: Error Handling and Cleanup
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The main coroutine implements a comprehensive error handling and cleanup strategy:
**Successful Execution**:
When all tasks complete successfully, each stage passes EOF to its output queue
before completing.
The sink filters out EOF and does not pass it to the output queue.
The main coroutine then gathers any errors (should be none) and completes successfully.
**Failures at Some Stage**:
When a task fails:
1. The failed stage should pass EOF to its output queue before exiting (even when failing)
2. The main coroutine detects the completion and cancels all upstream tasks to prevent orphaned producers
3. Downstream tasks continue processing remaining items in their queues and eventually complete
4. The main coroutine waits for all tasks to complete
5. The main coroutine gathers errors and raises :py:class:`~spdl.pipeline.PipelineFailure`
**Cancellation**:
When the foreground thread requests cancellation:
1. The event loop signals cancellation to the main coroutine
2. The main coroutine catches :py:class:`asyncio.CancelledError`
3. The main coroutine cancels all pending tasks
4. The main coroutine waits for all tasks to complete (allowing graceful cleanup)
5. The main coroutine re-raises :py:class:`asyncio.CancelledError`
Step 6: Pipeline Object Creation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Finally, :py:func:`spdl.pipeline.build_pipeline` creates a :py:class:`~spdl.pipeline.Pipeline` object with:
- ``coro``: The main coroutine from :py:func:`~spdl.pipeline._components._node._run_pipeline_coroutines`
- ``queue``: The sink node's output queue (where processed data is buffered)
- ``executor``: A :py:class:`~concurrent.futures.ThreadPoolExecutor` for executing tasks
- ``desc``: A string description of the pipeline
The :py:class:`~spdl.pipeline.Pipeline` object manages the lifecycle of the background thread and
event loop as described in the previous sections.
API reference
-------------
The following functions and classes are the main components of Pipeline's implementation.
They are not public API, but listed here for developers who are interested in learning
how the :py:class:`~spdl.pipeline.Pipeline` is implemented.
.. py:currentmodule:: spdl.pipeline._components._node
.. autoclass:: _Node
:members:
.. autofunction:: _convert_config
.. autofunction:: _build_node_recursive
.. autofunction:: _build_node
.. autofunction:: spdl.pipeline._components._source._source
.. autofunction:: spdl.pipeline._components._pipe._merge
.. autofunction:: spdl.pipeline._components._pipe._pipe
.. autofunction:: spdl.pipeline._components._pipe._ordered_pipe
.. autofunction:: spdl.pipeline._components._sink._sink
.. autofunction:: _run_pipeline_coroutines