Implementation detail of Pipeline¶
This note is a memorandum of the design trade-offs and their consequences encountered during the development of the spdl.pipeline.Pipeline.
Simply put, the data processing pipeline is a set of async functions executed in a background thread, while the foreground fetches the processed data from the sink.
When implementing the foreground/background components, it turned out that a subtle design choice in one part constrains the design choices of other parts of the system, and multiple constraints must be satisfied at the same time.
Architecture Overview¶
The following diagram illustrates the key components of the Pipeline and their interactions:
The diagram shows:
Foreground Thread: Contains the client code that requests data from the Pipeline
Background Thread: Hosts the event loop and executes the Pipeline
Event Loop: Manages the Pipeline execution and controls the thread pool
Thread Pool: Executes tasks delegated from each Pipeline stage
Pipeline: Consists of multiple stages (Source, Processing Stages, Sink) connected by queues
Tasks: Multiple tasks (Task 1, Task 2, Task 3, … Task N) executed by the thread pool for each stage
Data Flow (solid arrows): Data flows through the Pipeline from Source → Processing Stages → Sink → Sink Queue
Task Delegation (dashed arrows): Each Pipeline stage delegates tasks to the thread pool for execution
Data Request (dashed arrows): Foreground thread requests data from the Sink Queue
Async IO¶
The Pipeline is composed of async functions for the following reasons:
Orchestration Requirements: Building a pipeline with different concurrency levels and a flexible structure requires an orchestrator to schedule tasks. Using a bare concurrent.futures.ThreadPoolExecutor (or concurrent.futures.ProcessPoolExecutor) makes it difficult to cleanly implement the Pipeline abstraction. The async event loop provides the necessary orchestration capabilities. For details on the limitations of using a bare ThreadPoolExecutor for pipeline implementation, see Introduction to Async I/O.
Integration and Parallelism: The async context makes it easy to integrate network utilities, which are often async functions. Additionally, executing data processing functions in an async context enables inter-op and intra-op parallelism (a minimal sketch follows the list).
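For illustration, the following minimal sketch (not SPDL code; the names are made up) shows how delegating synchronous processing functions to an executor from async code lets multiple items be processed concurrently while the event loop keeps orchestrating other stages:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def decode(item: int) -> int:
    # Stand-in for a synchronous, CPU- or IO-bound processing function.
    return item * 2


async def process(item: int, executor: ThreadPoolExecutor) -> int:
    # Delegating the sync function to the executor keeps the event loop free
    # to schedule other stages (inter-op) and other items (intra-op).
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, decode, item)


async def main() -> None:
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = await asyncio.gather(*(process(i, executor) for i in range(8)))
        print(results)  # [0, 2, 4, ..., 14]


asyncio.run(main())
```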
Queue vs Async Queue as Buffer¶
The sink of the Pipeline is where the processed data are buffered. The Pipeline runs in a background thread, so the data are written to the sink by the background thread and fetched by the foreground thread. Therefore, access to the sink must be thread-safe. In addition, the pipeline is executed in an async event loop, so ideally the sink buffer should natively support async accessors.
Python has two types of queues: the thread-safe queue.Queue (sync queue) and its async variant asyncio.Queue (async queue).
The accessors of the sync queue, queue.Queue.get() and queue.Queue.put(), are thread-safe and support blocking operations with a timeout.
The accessors of the async queue, asyncio.Queue.get() and asyncio.Queue.put(), are not thread-safe. They return coroutines which can be awaited. For the foreground thread to actually fetch values from the queue, these coroutines must be executed by the same async event loop that is running the pipeline. There are synchronous variants of these accessors, asyncio.Queue.get_nowait() and asyncio.Queue.put_nowait(), which work without an event loop, but since they are not thread-safe, they can only be used when the pipeline is not running.
If we choose the sync queue, reading from the foreground is straightforward because its accessors are thread-safe, but writing to the queue can block the event loop. If we choose the async queue, writing to the queue is straightforward in an event loop, but reading from the foreground is convoluted, because the access must be thread-safe, and if the loop is running and the Pipeline is still writing to the queue, the read access must use an async operation as well.
From the perspective of apparent code simplicity, queue.Queue requires less code to write. However, having the blocking queue.Queue.put() call in the event loop makes it impossible to cleanly stop the background thread: the synchronous blocking call blocks the event loop and prevents it from processing a cancellation request.
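The following minimal sketch (not taken from the implementation) contrasts the two behaviors:

```python
import asyncio
import queue


async def produce(value: int) -> int:
    # Stand-in for an async processing stage.
    await asyncio.sleep(0.01)
    return value


async def fill_sync(sink: "queue.Queue[int]") -> None:
    while True:
        # queue.Queue.put() blocks the whole event loop thread when the queue
        # is full, so a pending cancellation cannot be delivered until it returns.
        sink.put(await produce(0))


async def fill_async(sink: "asyncio.Queue[int]") -> None:
    while True:
        # Awaiting asyncio.Queue.put() suspends only this task; the event loop
        # keeps running and can deliver cancellation while the queue is full.
        await sink.put(await produce(0))
```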
For this reason, we use asyncio.Queue in the spdl.pipeline.Pipeline. As a result, the implementation of spdl.pipeline.Pipeline.get_item() becomes a bit convoluted. The next section explains why this is the case.
Thread, loop and task¶
In implementing spdl.pipeline.Pipeline, there are several object states that need to be carefully managed:
The state of the background thread which runs the event loop.
The state of the async event loop managed by the background thread.
The state of the pipeline task, which processes data and puts it in the sink buffer.
When the foreground thread attempts to fetch data from the sink buffer, which is an async queue, it must use a different API (sync vs async accessor) depending on the state of the pipeline execution. This is because, when the pipeline is running, the pipeline puts data into the async queue and the event loop controls its execution. To access the async queue in a cooperative manner, the foreground has to issue a request to run the fetch coroutine (asyncio.Queue.get()) to the background thread and wait for the result. However, if the event loop is not running, this request to run the fetch coroutine will never be fulfilled. Therefore, if the event loop is not running, the foreground must use the sync accessor (asyncio.Queue.get_nowait()).
Another thing to consider is how to run the event loop. When the foreground attempts to fetch data, the fetch request must be made via asyncio.run_coroutine_threadsafe(), so the system needs access to the loop object. In general, however, it is recommended not to manage the loop object explicitly (i.e., not to call asyncio.loop.run_forever() or asyncio.loop.run_until_complete() directly); instead, it is encouraged to use asyncio.run(). But if we simply pass the pipeline coroutine to asyncio.run(), the event loop is stopped and closed as soon as the task completes. We would like to encapsulate the event loop in the background thread and abstract it away from the foreground thread, but then the foreground thread cannot know whether the loop is running or not.
Following the above considerations, the implementation of the pipeline execution obeys the following constraints (a sketch of the resulting arrangement follows the list).
To make the state management simpler, overlap the life cycle of the background thread and the event loop.
When the thread is started, the control flow is not returned to the foreground thread until the event loop is initialized.
The thread is stopped when the event loop is stopped.
Detach the life cycle of pipeline task from that of the event loop.
Keep the event loop alive after the pipeline task is completed.
Wait for the explicit request to stop the loop.
The event loop signals the object that manages the background thread that the task is completed.
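A minimal sketch of this arrangement (the names are illustrative, not the actual implementation) could look like the following:

```python
import asyncio
import threading


def run_in_background(pipeline_coro,
                      handles: dict,
                      loop_initialized: threading.Event,
                      task_completed: threading.Event) -> None:
    """Body of the background thread; its lifetime matches the event loop."""

    async def main() -> None:
        stop_requested = asyncio.Event()
        # Expose the loop and the stop handle so that the foreground can later
        # call handles["loop"].call_soon_threadsafe(handles["stop"].set).
        handles["loop"] = asyncio.get_running_loop()
        handles["stop"] = stop_requested
        loop_initialized.set()

        task = asyncio.create_task(pipeline_coro)
        try:
            await task
        finally:
            # The pipeline task is done, but the loop stays alive until the
            # foreground explicitly requests it to stop.
            task_completed.set()
            await stop_requested.wait()

    # asyncio.run() creates, runs and closes the loop; the thread exits only
    # when the loop stops, so the two life cycles coincide.
    asyncio.run(main())
```

In this sketch, the foreground starts a threading.Thread(target=run_in_background, ...), waits on loop_initialized before returning control to the caller, and later stops the loop via handles["loop"].call_soon_threadsafe(handles["stop"].set).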
Following the above constraints, the foreground can decide whether it should use the sync or the async accessor (a sketch of this dispatch follows the list):
If the background thread is not started → fail.
If the task is completed → use the sync API.
Otherwise, the task is running → use the async API.
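A simplified sketch of this dispatch (illustrative names; the actual spdl.pipeline.Pipeline.get_item() carries more state and error handling) is shown below:

```python
import asyncio


def get_item(loop: asyncio.AbstractEventLoop,
             sink: asyncio.Queue,
             thread_started: bool,
             task_completed: bool,
             timeout: float | None = None):
    """Illustrative dispatch between the sync and async accessors."""
    if not thread_started:
        # The background thread (and hence the event loop) is not running.
        raise RuntimeError("The pipeline is not started.")
    if task_completed:
        # Nothing writes to the sink anymore, so the non-thread-safe accessor
        # can be used directly from the foreground thread.
        return sink.get_nowait()
    # The pipeline is still running: schedule the fetch coroutine on the event
    # loop in the background thread and wait for the result.
    future = asyncio.run_coroutine_threadsafe(sink.get(), loop)
    return future.result(timeout)
```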
The following sequence diagram summarizes the interaction between the foreground thread, the background thread, the event loop and the pipeline task.
If the foreground thread decides to stop the pipeline before its completion, the event loop will cancel the pipeline task (which, in turn, cancels the tasks corresponding to the pipeline stages), then the foreground thread will wait for the background thread to complete the loop and join.
Building Pipeline from Configuration¶
The process of converting a PipelineConfig into an executable Pipeline is handled by spdl.pipeline.build_pipeline(). This section explains the multi-step transformation process.
Overview of the Build Process¶
The build process can be summarized as follows:
Config to Node Graph Conversion: The PipelineConfig is converted into a linked list of _Node objects, forming a directed acyclic graph (DAG) without branching.
Recursive Node Traversal: Starting from the sink node, the upstream nodes are recursively traversed.
Coroutine Creation: Each node is converted into a coroutine that processes a stream of input data. Each coroutine completes when it receives an EOF (End-of-File) token.
Main Coroutine Assembly: A main coroutine is created to monitor the state of all stage coroutines.
Error Handling and Cleanup: The main coroutine is responsible for handling failures, performing cleanup (ensuring upstream stages are completed or cancelled when a downstream node completes), and reacting to interrupt requests from the foreground client code.
Event Loop Execution: The main coroutine is executed by the event loop running in the background thread.
Detailed Build Steps¶
Step 1: Configuration to Node Graph¶
The PipelineConfig represents the pipeline structure declaratively. The function _convert_config() transforms this configuration into a linked list of _Node objects.
Each _Node is linked with references to its upstream _Node objects, forming a directed graph. Each node also has a queue object (AsyncQueue) that will be used when building coroutines to buffer data between stages.
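Conceptually, the resulting structure resembles the following sketch (a hypothetical stand-in, not the actual _Node class; see the API reference below):

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Node:
    cfg: object                                              # stage configuration
    queue: asyncio.Queue = field(default_factory=asyncio.Queue)
    upstream: list["Node"] = field(default_factory=list)


# A linear pipeline "source -> pipe -> sink" is linked from the sink backwards:
source = Node(cfg="SourceConfig")
pipe = Node(cfg="PipeConfig", upstream=[source])
sink = Node(cfg="SinkConfig", upstream=[pipe])
```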
The following diagram illustrates how a simple pipeline configuration is converted into a node graph:
Step 2: Recursive Coroutine Creation¶
The function _build_node_recursive() traverses
the node graph recursively starting from the sink node and creates a coroutine for each node.
The traversal follows the upstream references, ensuring that all upstream coroutines are created
before the downstream coroutine.
For each node type, it calls _build_node() to
create a coroutine for the corresponding config.
Each coroutine processes input data in a loop and completes when it receives an EOF token.
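For illustration, the general shape of such a stage coroutine is roughly the following (a simplified sketch; the real coroutines add hooks, concurrency control and failure counting):

```python
import asyncio

_EOF = object()  # sentinel marking the end of the stream


async def pipe_stage(afunc, input_queue: asyncio.Queue, output_queue: asyncio.Queue) -> None:
    # Process items until the EOF token arrives, then propagate EOF downstream.
    while (item := await input_queue.get()) is not _EOF:
        await output_queue.put(await afunc(item))
    await output_queue.put(_EOF)
```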
Step 3: Main Coroutine Assembly¶
The function _run_pipeline_coroutines() creates the main coroutine that orchestrates the execution of all stage coroutines. This main coroutine:
Creates asyncio Tasks from each node’s coroutine
Monitors the tasks using asyncio.wait() with the FIRST_COMPLETED strategy
When a task completes, cancels orphaned upstream tasks
Handles cancellation requests from the foreground thread
Gathers errors from failed tasks and raises PipelineFailure if any task failed
The following diagram illustrates the execution flow of the main coroutine:
Step 4: Task Creation and Monitoring¶
When the main coroutine starts, it recursively creates asyncio Tasks for each node starting from the sink and traversing upstream. Each task wraps the node’s coroutine and begins executing immediately.
The main coroutine then enters a loop monitoring these tasks (a simplified sketch follows the list):
It uses asyncio.wait() with return_when=FIRST_COMPLETED to wait for any task to complete
When a task completes, it cancels upstream tasks that would become orphaned producers (i.e., tasks that would keep producing data that will never be consumed)
If the main coroutine receives a cancellation request, it cancels all pending tasks and waits for them to complete before re-raising the
asyncio.CancelledError
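A simplified sketch of the monitoring loop (omitting the orphan-cancellation bookkeeping) is shown below:

```python
import asyncio


async def monitor(tasks: "set[asyncio.Task]") -> None:
    pending = set(tasks)
    while pending:
        # Wake up as soon as any stage task finishes, successfully or not.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        # Here the actual implementation cancels upstream tasks whose output
        # would no longer be consumed (orphaned producers).
```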
Step 5: Error Handling and Cleanup¶
The main coroutine implements a comprehensive error handling and cleanup strategy:
Successful Execution: When all tasks complete successfully, each stage passes EOF to its output queue before completing. The sink filters out EOF and does not pass it to the output queue. The main coroutine then gathers any errors (should be none) and completes successfully.
Failures at Some Stage: When a task fails:
The failed stage should pass EOF to its output queue before exiting (even when failing)
The main coroutine detects the completion and cancels all upstream tasks to prevent orphaned producers
Downstream tasks continue processing remaining items in their queues and eventually complete
The main coroutine waits for all tasks to complete
The main coroutine gathers errors and raises
PipelineFailure
Cancellation: When the foreground thread requests cancellation (see the combined sketch after this list):
The event loop signals cancellation to the main coroutine
The main coroutine catches asyncio.CancelledError
The main coroutine cancels all pending tasks
The main coroutine waits for all tasks to complete (allowing graceful cleanup)
The main coroutine re-raises
asyncio.CancelledError
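Putting failure handling and cancellation together, the main coroutine has roughly the following shape (a simplified sketch, using RuntimeError as a stand-in for PipelineFailure):

```python
import asyncio


async def main_coroutine(tasks: "list[asyncio.Task]") -> None:
    pending = set(tasks)
    try:
        while pending:
            _, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
    except asyncio.CancelledError:
        # Foreground requested interruption: cancel the remaining tasks, wait
        # for their cleanup to finish, then re-raise the cancellation.
        for t in pending:
            t.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        raise
    # Normal completion: collect per-stage errors and report them together.
    # (The actual implementation wraps them in PipelineFailure.)
    errors = [t.exception() for t in tasks if not t.cancelled() and t.exception()]
    if errors:
        raise RuntimeError(f"{len(errors)} pipeline stage(s) failed")
```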
Step 6: Pipeline Object Creation¶
Finally, spdl.pipeline.build_pipeline() creates a Pipeline object with:
coro: The main coroutine from _run_pipeline_coroutines()
queue: The sink node's output queue (where processed data is buffered)
executor: A ThreadPoolExecutor for executing tasks
desc: A string description of the pipeline
The Pipeline object manages the lifecycle of the background thread and
event loop as described in the previous sections.
API reference¶
The following functions and classes are the main components of Pipeline’s implementation.
They are not part of the public API, but are listed here for developers who are interested in learning
how the Pipeline is implemented.
- class _Node[source]¶
Represents a node in the pipeline graph.
Each node corresponds to a pipeline stage and contains references to its upstream nodes, forming a directed acyclic graph (DAG). Nodes are used internally during the pipeline build process to create and manage coroutines for each stage.
- cfg: _ConfigBase¶
The configuration object that defines the behavior of this stage.
- create_task() Task¶
- queue: AsyncQueue¶
An async queue for buffering data between this stage and downstream stages.
- property task: Task¶
- _convert_config(plc: PipelineConfig[T], q_class: type[AsyncQueue], pipeline_id: int, stage_id: _MutableInt, disable_sink: bool = False) _Node[T][source]¶
Convert a PipelineConfig into a linked list of _Node objects.
This function recursively transforms a declarative pipeline configuration into a node graph where each node represents a pipeline stage. The nodes are linked via upstream references, forming a directed acyclic graph (DAG) without branching.
- Parameters:
plc – The pipeline configuration to convert.
q_class – The queue class to use for creating buffers between stages.
pipeline_id – A unique identifier for the pipeline, used in stage naming.
stage_id – A mutable counter for assigning unique stage IDs.
disable_sink – If True, skip creating the sink node (used for merge branches).
- Returns:
The sink node (or the last processing node if disable_sink is True) with all upstream nodes linked.
- _build_node_recursive(node: _Node[Any], fc_class: type[_FailCounter], task_hook_factory: Callable[[str], list[TaskHook]], max_failures: int) None[source]¶
Recursively build coroutines for a node and all its upstream nodes.
This function traverses the node graph starting from the given node, following upstream references recursively. For each node, it creates a coroutine based on the node’s configuration type (Source, Pipe, Merge, Sink, etc.).
- Parameters:
node – The node to build coroutines for.
fc_class – The failure counter class for tracking task failures.
task_hook_factory – A factory function for creating task hooks for monitoring.
max_failures – The maximum number of failures allowed before halting.
- Raises:
RuntimeError – If attempting to build a coroutine for a node that already has one.
- _build_node(node: _Node[Any], fc_class: type[_FailCounter], task_hook_factory: Callable[[str], list[TaskHook]], max_failures: int) None[source]¶
Build a coroutine for a single node based on its configuration type.
This function creates the appropriate coroutine for a given node by pattern matching on the node’s configuration type. Each configuration type corresponds to a specific pipeline stage behavior:
SourceConfig: Creates a source coroutine that generates data from an iterator using _source()
MergeConfig: Creates a merge coroutine that combines multiple input streams using _merge()
SinkConfig: Creates a sink coroutine that buffers output data using _sink()
PipeConfig: Creates a processing coroutine using _pipe()
AggregateConfig: Creates a pipe coroutine with aggregation logic using _pipe()
DisaggregateConfig: Creates a pipe coroutine with disaggregation logic using _pipe()
The created coroutine is stored in the node's _coro attribute.
- Parameters:
node – The node to build a coroutine for. The node must not already have a coroutine.
fc_class – The failure counter class for tracking task failures.
task_hook_factory – A factory function for creating task hooks for monitoring.
max_failures – The maximum number of failures allowed before halting.
- Raises:
ValueError – If an unsupported configuration type is encountered.
AssertionError – If the node’s upstream structure doesn’t match the configuration type’s requirements (e.g., SourceConfig must have no upstream nodes).
Note
This function does not handle recursion. Use
_build_node_recursive() to build coroutines for a node and all its upstream dependencies.
- async _source(src: Iterable[T] | AsyncIterable[T], queue: AsyncQueue, max_items: int | None = None) None[source]¶
Coroutine that generates data from an iterator and puts it into a queue.
This coroutine represents the source stage of a pipeline. It consumes items from a synchronous or asynchronous iterable and puts them into the output queue for downstream processing. The coroutine completes when the iterator is exhausted or when the maximum number of items has been reached.
- Parameters:
src – The source iterable (synchronous or asynchronous) to consume data from.
queue – The async queue to put items into for downstream consumption.
max_items – Optional maximum number of items to process. If
None, processes all items from the source.
Note
The EOF token is automatically handled by the queue’s stage hook context manager, so this coroutine does not explicitly send EOF.
- _merge(name: str, input_queues: Sequence[AsyncQueue], output_queue: AsyncQueue, fail_counter: _FailCounter, task_hooks: list[TaskHook], merge_op: Callable[[str, Sequence[Queue], Queue], Awaitable[None]] | None) Coroutine[source]¶
Create a coroutine for merging data from multiple input queues.
This function creates a merge stage coroutine that consumes items from multiple input queues and puts them into a single output queue. By default, items from all input queues are passed through in the order they become available. A custom merge operation can be provided for more complex merging logic.
- Parameters:
name – The name of the pipeline stage for logging and task naming.
input_queues – Sequence of queues to consume input items from.
output_queue – The queue to put merged items into.
fail_counter – Hook for tracking and limiting task failures.
task_hooks – List of hooks for monitoring task execution.
merge_op – Optional custom merge operation. If None, uses default pass-through merge that forwards items from all input queues to output queue.
- Returns:
A coroutine that executes the merge stage.
- Raises:
ValueError – If input_queues is empty or if any input queue is the same as the output queue.
- _pipe(name: str, input_queue: AsyncQueue, output_queue: AsyncQueue, args: _PipeArgs[T, U], fail_counter: _FailCounter, task_hooks: list[TaskHook], op_requires_eof: bool) Coroutine[source]¶
Create a coroutine for processing data from input queue to output queue.
This function creates a processing stage coroutine that consumes items from the input queue, applies a transformation operation (sync or async function/generator), and puts the results into the output queue. It manages concurrent execution of tasks up to the specified concurrency level.
- Parameters:
name – The name of the pipeline stage for logging and task naming.
input_queue – The queue to consume input items from.
output_queue – The queue to put processed items into.
args – Pipeline arguments containing the operation, executor, and concurrency.
fail_counter – Hook for tracking and limiting task failures.
task_hooks – List of hooks for monitoring task execution.
op_requires_eof – If True, pass EOF token to the operation; otherwise stop processing before EOF.
- Returns:
A coroutine that executes the pipeline stage.
- Raises:
ValueError – If input_queue and output_queue are the same object.
RuntimeError – If the number of failures exceeds the threshold.
- _ordered_pipe(name: str, input_queue: AsyncQueue, output_queue: AsyncQueue, args: _PipeArgs[T, U], fail_counter: _FailCounter, task_hooks: list[TaskHook]) Coroutine[source]¶
Create a coroutine for processing data while preserving input order.
This function creates a processing stage coroutine similar to
_pipe, but guarantees that output items maintain the same order as input items, regardless of task completion order. This is achieved by using an intermediate queue to buffer tasks and waiting for them to complete in sequence.
- Parameters:
name – The name of the pipeline stage for logging and task naming.
input_queue – The queue to consume input items from.
output_queue – The queue to put processed items into (in order).
args – Pipeline arguments containing the operation, executor, and concurrency.
fail_counter – Hook for tracking and limiting task failures.
task_hooks – List of hooks for monitoring task execution.
- Returns:
A coroutine that executes the ordered pipeline stage.
- Raises:
ValueError – If input_queue and output_queue are the same object.
RuntimeError – If the number of failures exceeds the threshold.
Note
The operation must be an async function (not an async generator) for ordered pipe.
Implementation Note
The core idea of the ordered pipe implementation is to use a queue as a buffer for the active tasks.
       ┌─┐
       │ │
       │ │ AsyncQueue: Input
       │ │
       └┬┘
        │
┌───────▼────────┐
│ Async Function │
└───────┬────────┘
       ┌▼┐
       │ │
       │ │ AsyncQueue: Intermediate queue:
       │ │ contains tasks. queue size == concurrency
       └┬┘
┌───────▼────────┐
│    enqueue     │
└───────┬────────┘
       ┌▼┐
       │ │
       │ │ AsyncQueue: Output
       │ │
       └─┘
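A minimal sketch of this idea (not the actual implementation, which also handles failures and hooks) could look like:

```python
import asyncio

_EOF = object()  # sentinel marking the end of the stream


async def ordered_pipe(afunc, input_queue: asyncio.Queue, output_queue: asyncio.Queue,
                       concurrency: int) -> None:
    # The intermediate queue holds in-flight tasks in submission order; its
    # size bounds (roughly) the number of concurrently running tasks.
    inter = asyncio.Queue(maxsize=concurrency)

    async def submit() -> None:
        while (item := await input_queue.get()) is not _EOF:
            await inter.put(asyncio.create_task(afunc(item)))
        await inter.put(_EOF)

    async def drain() -> None:
        # Await tasks in the order they were submitted, so the output order
        # matches the input order regardless of completion order.
        while (task := await inter.get()) is not _EOF:
            await output_queue.put(await task)
        await output_queue.put(_EOF)

    await asyncio.gather(submit(), drain())
```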
- async _sink(input_queue: AsyncQueue, output_queue: AsyncQueue) None[source]¶
Coroutine that consumes data from input queue and buffers it in output queue.
This coroutine represents the sink stage of a pipeline. It consumes items from the input queue and puts them into the output queue for the foreground thread to fetch. The coroutine completes when it receives an EOF token, which it does not pass to the output queue (EOF filtering is the sink’s responsibility).
- Parameters:
input_queue – The async queue to consume items from (typically from the last processing stage).
output_queue – The async queue to buffer items for the foreground thread.
Note
The sink filters out EOF tokens and does not pass them to the output queue. The EOF handling and queue cleanup is managed by the queue’s stage hook context manager.
- async _run_pipeline_coroutines(node: _Node[T]) None[source]¶
Orchestrate the execution of all pipeline stage coroutines.
This is the main coroutine that manages the lifecycle of all pipeline stage tasks. It creates asyncio Tasks from each node’s coroutine, monitors their execution, handles failures, and ensures proper cleanup.
The execution flow:
Creates asyncio Tasks for all nodes (starting from sink, traversing upstream)
Waits for tasks to complete using asyncio.wait with FIRST_COMPLETED
When a task completes, cancels orphaned upstream tasks to prevent deadlocks
Handles cancellation requests from the foreground thread
Gathers errors from failed tasks and raises PipelineFailure if any
- Parameters:
node – The sink node of the pipeline (all upstream nodes are accessed via the upstream references).
- Raises:
asyncio.CancelledError – If the pipeline is cancelled by the foreground thread.
PipelineFailure – If any pipeline stage encounters an error.