Implementation detail of Pipeline

This note is a memorandum of the design trade offs and their consequences encountered during the development of the spdl.pipeline.Pipeline.

Simply put, the data processing pipeline is an async functions executed in a background thread, and the foreground fetches the processed data from the sink.

When implementing foreground/background components, it turned out that a subtle design choice in one part constraints the design choices of other parts of the system, and there are multiple constraints that must be met at the same time.

Architecture Overview

The following diagram illustrates the key components of the Pipeline and their interactions:

graph TB subgraph FG["Foreground Thread"] Client[Client Code] end subgraph BG["Background Thread"] EL[Event Loop] subgraph Pipeline["Pipeline"] Source[Source Stage] SQ1(Queue) Stage1[Processing Stage 1] SQ2(Queue) Stage2[Processing Stage 2] SQ3(Queue) Sink[Sink Stage] SinkQ(Sink Queue) Source -->|data| SQ1 SQ1 --> Stage1 Stage1 -->|data| SQ2 SQ2 --> Stage2 Stage2 -->|data| SQ3 SQ3 --> Sink Sink -->|data| SinkQ end end subgraph TP["Thread Pool"] TPQ(Queue) Thread1["Thread 1<br/>(Task 1)"] Thread2["Thread 2<br/>(Task 2)"] Thread3["Thread 3<br/>(Task 3)"] ThreadN["Thread N<br/>(Task N)"] TPQ --> Thread1 TPQ --> Thread2 TPQ --> Thread3 TPQ --> ThreadN end Client -.->|request data from sink queue| EL EL -.->|return data| Client Source -.->|delegate task| TPQ Stage1 -.->|delegate task| TPQ Stage2 -.->|delegate task| TPQ Sink -.->|delegate task| TPQ EL -->|manages| Pipeline EL -->|controls| TP style FG fill:#e1f5ff style BG fill:#fff4e1 style Pipeline fill:#f0f0f0 style TP fill:#d4edda style Client fill:#4a90e2 style EL fill:#e67e22 style SinkQ fill:#e74c3c style TPQ fill:#ffd700

The diagram shows:

  • Foreground Thread: Contains the client code that requests data from the Pipeline

  • Background Thread: Hosts the event loop and executes the Pipeline

  • Event Loop: Manages the Pipeline execution and controls the thread pool

  • Thread Pool: Executes tasks delegated from each Pipeline stage

  • Pipeline: Consists of multiple stages (Source, Processing Stages, Sink) connected by queues

  • Tasks: Multiple tasks (Task 1, Task 2, Task 3, … Task N) executed by the thread pool for each stage

  • Data Flow (solid arrows): Data flows through the Pipeline from Source → Processing Stages → Sink → Sink Queue

  • Task Delegation (dashed arrows): Each Pipeline stage delegates tasks to the thread pool for execution

  • Data Request (dashed arrows): Foreground thread requests data from the Sink Queue

Async IO

The Pipeline is composed of async functions for the following reasons:

  1. Orchestration Requirements: Building a pipeline with different concurrency levels and flexible structure requires an orchestrator to schedule tasks. Using bare concurrent.futures.ThreadPoolExecutor (or concurrent.futures.ProcessPoolExecutor) makes it difficult to cleanly implement the Pipeline abstraction. The async event loop provides the necessary orchestration capabilities. For details on the limitations of using bare ThreadPoolExecutor for pipeline implementation, see Introduction to Async I/O.

  2. Integration and Parallelism: The async context makes it easy to integrate network utilities, which are often async functions. Additionally, executing data processing functions in async context enables inter-op and intra-op parallelism.

Queue vs Async Queue as Buffer

The sink of the Pipeline is where the processed data are buffered. Pipeline runs in the background thread, so that the data are written to the sink in the background thread. They are fetched by the foreground thread. Therefore, the access to the sink must be thread-safe. In addition, pipeline is executed in async event loop, so it is ideal that the sink buffer supports async accessor natively.

Python has two types of queues. One is thread-safe queue.Queue (sync queue) and the other is its async variant asyncio.Queue (async queue).

The accessors of sync queue, queue.Queue.get() and queue.Queue.put(), are thread-safe, and they support blocking operations with timeout. The accessors of async queue, asyncio.Queue.get() and asyncio.Queue.put(), are not thread-safe. They return coroutine which can be awaited. For the foreground thread to actually fetch the values from the queue, these coroutinues must be executed by the same async event loop that’s running the pipeline. There are synchronous variant of these accessors, asyncio.Queue.get_nowait() and asyncio.Queue.put_nowait(), which can work without an event loop, but since they are not thread-safe, they can only be used when the pipeline is not running.

If we choose sync queue, reading from the foreground is straightforward because the its accessors are thread-safe, but writing to the queue can block the event loop. If we choose async queue, writing to the queue is straightforward in an event loop, but reading from the foreground is convoluted, because the access must be thread-safe, and if the loop is running and the Pipeline is still writing the queue, then the read access must use async operation as well.

From the perspective of the apparent code simplicity, queue.Queue requires less code to write, however, having the blocking queue.Queue.put() call in event loop makes it impossible to cleanly stop the background thread. This is because the synchronous blocking call blocks the event loop, and prevents the loop from processing cancellation request.

For this reason, we use asyncio.Queue in the spdl.pipeline.Pipeline. As a result, the implementation of spdl.pipeline.Pipeline.get_item() becomes a bit convoluted. The next section explains why it is the case.

Thread, loop and task

In implementing spdl.pipeline.Pipeline, there are several object states that need to be carefully managed. They are

  • The state of the background thread which runs the event loop.

  • The state of the async event loop managed by the background thread.

  • The state of the pipeline task, which process data and puts in the sink buffer.

When the foreground thread attempts to fetch data from sink buffer, which is an async queue, it must use the different API (sync vs async accessor) to get the data, depending on the state of the state of the pipeline execution. This is because when the pipeline is running, the pipeline puts data in the async queue, and the event loop controls its execution. To access the async queue in cooperative manner, the foreground has to issue a request to run fetch coroutine (asyncio.Queue.get()) to the background thread and wait for the result. However if the event loop is not running, then this request to run the fetch coroutine will never be fulfilled. Therefore, if the event loop is not running, the foreground must use sync accessor (asyncio.Queue.get_nowait()).

Another thing to consider is how to run the event loop. The foreground attempts to fetch data, the fetch request must be made via asyncio.run_coroutine_threadsafe(), so the system needs access to the loop object. In general, however, it is recommended not to manage loop object explicitly i.e. asyncio.loop.run_forever() or asyncio.loop.run_until_complete()). Instead it is encouraged to use asyncio.run(). But if we simply pass the pipeline coroutine to the asyncio.run() function, as soon as the task completes, the event loop is stopped and closed. We would like to encapsulate the event loop in the background thread and abstract away from the foreground thread. But this way, the foreground thread cannot know if the loop is running or not.

Following the above considerations, the implementation of the pipeline executions follows the following constraints.

  1. To make the state management simpler, overlap the life cycle of the background thread and the event loop.

  • When the thread is started, the control flow is not returned to the foreground thread until the event loop is initialized.

  • The thread is stopped when the event loop is stopped.

  1. Detach the life cycle of pipeline task from that of the event loop.

  • Keep the event loop alive after the pipeline task is completed.

  • Wait for the explicit request to stop the loop.

  1. The event loop signals the object that manages the background thread that the task is completed.

Following the above constraints, the foreground can decide whether it should use sync or async accessor.

  • If the background thread is not started. → Fail

  • If the task is completed. → Use sync API

  • Otherwise, the task is running. → use async API.

The following sequence diagram summarizes the interaction between the foreground thread, the background thread, the event loop and the pipeline task.

sequenceDiagram FG Thread ->>+ BG Thread: Start BG Thread create participant Event Loop BG Thread ->> Event Loop: Start Event loop Event Loop ->> BG Thread: Event loop initialized BG Thread ->>- FG Thread: Return create participant Task Event Loop ->> Task: Start Task FG Thread --)+ BG Thread: Q: "Is task started?" BG Thread --)- FG Thread: A: "Not yet." Event Loop -->> BG Thread: Signal task start FG Thread --)+ BG Thread: Q: "Is task started?" BG Thread --)- FG Thread: A: "Yes it is started." FG Thread --)+ BG Thread: Q: "Is task completed?" BG Thread --)- FG Thread: A: "Not yet." destroy Task Task ->> Event Loop: Task completed Event Loop -->> BG Thread: Signal task completion FG Thread --)+ BG Thread: Q: "Is task completed?" BG Thread --)- FG Thread: A: "Yes it is completed." Event Loop ->> Event Loop: Keep event loop alive FG Thread ->>+ BG Thread: Request stop event loop BG Thread -->> Event Loop: Signal Stop BG Thread ->>- FG Thread: Return without waiting for the loop stop destroy Event Loop Event Loop ->> BG Thread: Loop Stopped FG Thread ->>+ BG Thread: Join thread BG Thread ->>- FG Thread: Return

If the foreground thread decides to stop the pipeline before its completion, the event loop will cancel the pipeline task, (in turn the pipeline task will cancel tasks correspond to pipeline stages) then the foreground thread will wait for the background thread to complete the loop and join.

sequenceDiagram FG Thread ->>+ BG Thread: Start BG Thread create participant Event Loop BG Thread ->> Event Loop: Start Event Loop Event Loop ->> BG Thread: Event loop initialized BG Thread ->>- FG Thread: Return create participant Task Event Loop ->> Task: Start Task Event Loop -->> BG Thread: Signal task start FG Thread ->>+ BG Thread: Request stop event loop BG Thread -->> Event Loop: Signal Stop BG Thread ->>- FG Thread: Return without waiting for the loop stop Event Loop -->> Task: Signal Stop destroy Task Task ->> Event Loop: Task cancelled destroy Event Loop Event Loop ->> BG Thread: Loop Stopped FG Thread ->>+ BG Thread: Join thread BG Thread ->>- FG Thread: Return

Building Pipeline from Configuration

The process of converting a PipelineConfig into an executable Pipeline is handled by spdl.pipeline.build_pipeline(). This section explains the multi-step transformation process.

Overview of the Build Process

The build process can be summarized as follows:

  1. Config to Node Graph Conversion: The PipelineConfig is converted into a linked list of _Node objects, forming a directed acyclic graph (DAG) without branching.

  2. Recursive Node Traversal: Starting from the sink node, the upstream nodes are recursively traversed.

  3. Coroutine Creation: Each node is converted into a coroutine that processes a stream of input data. Each coroutine completes when it receives an EOF (End-of-File) token.

  4. Main Coroutine Assembly: A main coroutine is created to monitor the state of all stage coroutines.

  5. Error Handling and Cleanup: The main coroutine is responsible for handling failures, performing cleanup (ensuring upstream stages are completed or cancelled when a downstream node completes), and reacting to interrupt requests from the foreground client code.

  6. Event Loop Execution: The main coroutine is executed by the event loop running in the background thread.

Detailed Build Steps

Step 1: Configuration to Node Graph

The PipelineConfig represents the pipeline structure declaratively. The function _convert_config() transforms this configuration into a linked list of _Node objects.

Each _Node is linked with references to its upstream _Node objects, forming a directed graph. Each node also has a queue object (AsyncQueue) that will be used when building coroutines to buffer data between stages.

The following diagram illustrates how a simple pipeline configuration is converted into a node graph:

graph LR subgraph PConfig["PipelineConfig"] PC_Src[SourceConfig] PC_P1["PipeConfig: Stage1"] PC_P2["PipeConfig: Stage2"] PC_Sink[SinkConfig] PC_Src --> PC_P1 PC_P1 --> PC_P2 PC_P2 --> PC_Sink end subgraph NGraph["Node Graph"] N_Src["Node: Source"] N_P1["Node: Stage1"] N_P2["Node: Stage2"] N_Sink["Node: Sink"] N_Src -.->|queue| N_P1 N_P1 -.->|queue| N_P2 N_P2 -.->|queue| N_Sink N_P1 -->|upstream| N_Src N_P2 -->|upstream| N_P1 N_Sink -->|upstream| N_P2 end PC_Src -.->|converts to| N_Src PC_P1 -.->|converts to| N_P1 PC_P2 -.->|converts to| N_P2 PC_Sink -.->|converts to| N_Sink style PC_Src fill:#e1f5ff style PC_P1 fill:#e1f5ff style PC_P2 fill:#e1f5ff style PC_Sink fill:#e1f5ff style N_Src fill:#fff4e1 style N_P1 fill:#fff4e1 style N_P2 fill:#fff4e1 style N_Sink fill:#fff4e1

Step 2: Recursive Coroutine Creation

The function _build_node_recursive() traverses the node graph recursively starting from the sink node and creates a coroutine for each node. The traversal follows the upstream references, ensuring that all upstream coroutines are created before the downstream coroutine.

For each node type, it calls _build_node() to create a coroutine for the corresponding config. Each coroutine processes input data in a loop and completes when it receives an EOF token.

Step 3: Main Coroutine Assembly

The function _run_pipeline_coroutines() creates the main coroutine that orchestrates the execution of all stage coroutines. This main coroutine:

  1. Creates asyncio Tasks from each node’s coroutine

  2. Monitors the tasks using asyncio.wait() with FIRST_COMPLETED strategy

  3. When a task completes, cancels orphaned upstream tasks

  4. Handles cancellation requests from the foreground thread

  5. Gathers errors from failed tasks and raises PipelineFailure if any task failed

The following diagram illustrates the execution flow of the main coroutine:

flowchart TD Start([Main Coroutine Starts]) --> CreateTasks[Create Tasks from All Nodes] CreateTasks --> StartTasks[Start All Tasks] StartTasks --> Wait{Wait for<br/>First Completion} Wait -->|Task Completed| CancelOrphans[Cancel Orphaned<br/>Upstream Tasks] Wait -->|Cancellation Request| CancelAll[Cancel All<br/>Pending Tasks] CancelOrphans --> CheckPending{Any Tasks<br/>Pending?} CancelAll --> WaitAll[Wait for All<br/>Tasks to Complete] CheckPending -->|Yes| Wait CheckPending -->|No| GatherErrors[Gather Errors<br/>from Tasks] WaitAll --> RaiseCancelled[Raise<br/>CancelledError] GatherErrors --> HasErrors{Any Errors?} HasErrors -->|Yes| RaiseFailure[Raise<br/>PipelineFailure] HasErrors -->|No| Complete([Complete Successfully]) style Start fill:#90EE90 style Complete fill:#90EE90 style RaiseFailure fill:#FFB6C1 style RaiseCancelled fill:#FFB6C1

Step 4: Task Creation and Monitoring

When the main coroutine starts, it recursively creates asyncio Tasks for each node starting from the sink and traversing upstream. Each task wraps the node’s coroutine and begins executing immediately.

The main coroutine then enters a loop monitoring these tasks:

  • It uses asyncio.wait() with return_when=FIRST_COMPLETED to wait for any task to complete

  • When a task completes, it cancels upstream tasks that would become orphaned producers (i.e., tasks that would keep producing data that will never be consumed)

  • If the main coroutine receives a cancellation request, it cancels all pending tasks and waits for them to complete before re-raising the asyncio.CancelledError

Step 5: Error Handling and Cleanup

The main coroutine implements a comprehensive error handling and cleanup strategy:

Successful Execution: When all tasks complete successfully, each stage passes EOF to its output queue before completing. The sink filters out EOF and does not pass it to the output queue. The main coroutine then gathers any errors (should be none) and completes successfully.

Failures at Some Stage: When a task fails:

  1. The failed stage should pass EOF to its output queue before exiting (even when failing)

  2. The main coroutine detects the completion and cancels all upstream tasks to prevent orphaned producers

  3. Downstream tasks continue processing remaining items in their queues and eventually complete

  4. The main coroutine waits for all tasks to complete

  5. The main coroutine gathers errors and raises PipelineFailure

Cancellation: When the foreground thread requests cancellation:

  1. The event loop signals cancellation to the main coroutine

  2. The main coroutine catches asyncio.CancelledError

  3. The main coroutine cancels all pending tasks

  4. The main coroutine waits for all tasks to complete (allowing graceful cleanup)

  5. The main coroutine re-raises asyncio.CancelledError

Step 6: Pipeline Object Creation

Finally, spdl.pipeline.build_pipeline() creates a Pipeline object with:

  • coro: The main coroutine from _run_pipeline_coroutines()

  • queue: The sink node’s output queue (where processed data is buffered)

  • executor: A ThreadPoolExecutor for executing tasks

  • desc: A string description of the pipeline

The Pipeline object manages the lifecycle of the background thread and event loop as described in the previous sections.

API reference

The following functions and classes are the main components of Pipeline’s implementation.

They are not public API, but listed here for developers who are interested in learning how the Pipeline is implemented.

class _Node[source]

Represents a node in the pipeline graph.

Each node corresponds to a pipeline stage and contains references to its upstream nodes, forming a directed acyclic graph (DAG). Nodes are used internally during the pipeline build process to create and manage coroutines for each stage.

cfg: _ConfigBase

The configuration object that defines the behavior of this stage.

property coro: Coroutine[None, None, None]
create_task() Task
name: str

A unique identifier for the node, used for logging and task naming.

queue: AsyncQueue

An async queue for buffering data between this stage and downstream stages.

property task: Task
upstream: Sequence[_Node[Any]]

A sequence of upstream nodes that this node depends on.

_convert_config(plc: PipelineConfig[T], q_class: type[AsyncQueue], pipeline_id: int, stage_id: _MutableInt, disable_sink: bool = False) _Node[T][source]

Convert a PipelineConfig into a linked list of _Node objects.

This function recursively transforms a declarative pipeline configuration into a node graph where each node represents a pipeline stage. The nodes are linked via upstream references, forming a directed acyclic graph (DAG) without branching.

Parameters:
  • plc – The pipeline configuration to convert.

  • q_class – The queue class to use for creating buffers between stages.

  • pipeline_id – A unique identifier for the pipeline, used in stage naming.

  • stage_id – A mutable counter for assigning unique stage IDs.

  • disable_sink – If True, skip creating the sink node (used for merge branches).

Returns:

The sink node (or the last processing node if disable_sink is True) with all upstream nodes linked.

_build_node_recursive(node: _Node[Any], fc_class: type[_FailCounter], task_hook_factory: Callable[[str], list[TaskHook]], max_failures: int) None[source]

Recursively build coroutines for a node and all its upstream nodes.

This function traverses the node graph starting from the given node, following upstream references recursively. For each node, it creates a coroutine based on the node’s configuration type (Source, Pipe, Merge, Sink, etc.).

Parameters:
  • node – The node to build coroutines for.

  • fc_class – The failure counter class for tracking task failures.

  • task_hook_factory – A factory function for creating task hooks for monitoring.

  • max_failures – The maximum number of failures allowed before halting.

Raises:

RuntimeError – If attempting to build a coroutine for a node that already has one.

_build_node(node: _Node[Any], fc_class: type[_FailCounter], task_hook_factory: Callable[[str], list[TaskHook]], max_failures: int) None[source]

Build a coroutine for a single node based on its configuration type.

This function creates the appropriate coroutine for a given node by pattern matching on the node’s configuration type. Each configuration type corresponds to a specific pipeline stage behavior:

The created coroutine is stored in the node’s _coro attribute.

Parameters:
  • node – The node to build a coroutine for. The node must not already have a coroutine.

  • fc_class – The failure counter class for tracking task failures.

  • task_hook_factory – A factory function for creating task hooks for monitoring.

  • max_failures – The maximum number of failures allowed before halting.

Raises:
  • ValueError – If an unsupported configuration type is encountered.

  • AssertionError – If the node’s upstream structure doesn’t match the configuration type’s requirements (e.g., SourceConfig must have no upstream nodes).

Note

This function does not handle recursion. Use _build_node_recursive() to build coroutines for a node and all its upstream dependencies.

async _source(src: Iterable[T] | AsyncIterable[T], queue: AsyncQueue, max_items: int | None = None) None[source]

Coroutine that generates data from an iterator and puts it into a queue.

This coroutine represents the source stage of a pipeline. It consumes items from a synchronous or asynchronous iterable and puts them into the output queue for downstream processing. The coroutine completes when the iterator is exhausted or when the maximum number of items has been reached.

Parameters:
  • src – The source iterable (synchronous or asynchronous) to consume data from.

  • queue – The async queue to put items into for downstream consumption.

  • max_items – Optional maximum number of items to process. If None, processes all items from the source.

Note

The EOF token is automatically handled by the queue’s stage hook context manager, so this coroutine does not explicitly send EOF.

_merge(name: str, input_queues: Sequence[AsyncQueue], output_queue: AsyncQueue, fail_counter: _FailCounter, task_hooks: list[TaskHook], merge_op: Callable[[str, Sequence[Queue], Queue], Awaitable[None]] | None) Coroutine[source]

Create a coroutine for merging data from multiple input queues.

This function creates a merge stage coroutine that consumes items from multiple input queues and puts them into a single output queue. By default, items from all input queues are passed through in the order they become available. A custom merge operation can be provided for more complex merging logic.

Parameters:
  • name – The name of the pipeline stage for logging and task naming.

  • input_queues – Sequence of queues to consume input items from.

  • output_queue – The queue to put merged items into.

  • fail_counter – Hook for tracking and limiting task failures.

  • task_hooks – List of hooks for monitoring task execution.

  • merge_op – Optional custom merge operation. If None, uses default pass-through merge that forwards items from all input queues to output queue.

Returns:

A coroutine that executes the merge stage.

Raises:

ValueError – If input_queues is empty or if any input queue is the same as the output queue.

_pipe(name: str, input_queue: AsyncQueue, output_queue: AsyncQueue, args: _PipeArgs[T, U], fail_counter: _FailCounter, task_hooks: list[TaskHook], op_requires_eof: bool) Coroutine[source]

Create a coroutine for processing data from input queue to output queue.

This function creates a processing stage coroutine that consumes items from the input queue, applies a transformation operation (sync or async function/generator), and puts the results into the output queue. It manages concurrent execution of tasks up to the specified concurrency level.

Parameters:
  • name – The name of the pipeline stage for logging and task naming.

  • input_queue – The queue to consume input items from.

  • output_queue – The queue to put processed items into.

  • args – Pipeline arguments containing the operation, executor, and concurrency.

  • fail_counter – Hook for tracking and limiting task failures.

  • task_hooks – List of hooks for monitoring task execution.

  • op_requires_eof – If True, pass EOF token to the operation; otherwise stop processing before EOF.

Returns:

A coroutine that executes the pipeline stage.

Raises:
  • ValueError – If input_queue and output_queue are the same object.

  • RuntimeError – If the number of failures exceeds the threshold.

_ordered_pipe(name: str, input_queue: AsyncQueue, output_queue: AsyncQueue, args: _PipeArgs[T, U], fail_counter: _FailCounter, task_hooks: list[TaskHook]) Coroutine[source]

Create a coroutine for processing data while preserving input order.

This function creates a processing stage coroutine similar to _pipe, but guarantees that output items maintain the same order as input items, regardless of task completion order. This is achieved by using an intermediate queue to buffer tasks and waiting for them to complete in sequence.

Parameters:
  • name – The name of the pipeline stage for logging and task naming.

  • input_queue – The queue to consume input items from.

  • output_queue – The queue to put processed items into (in order).

  • args – Pipeline arguments containing the operation, executor, and concurrency.

  • fail_counter – Hook for tracking and limiting task failures.

  • task_hooks – List of hooks for monitoring task execution.

Returns:

A coroutine that executes the ordered pipeline stage.

Raises:
  • ValueError – If input_queue and output_queue are the same object.

  • RuntimeError – If the number of failures exceeds the threshold.

Note

The operation must be an async function (not an async generator) for ordered pipe.

Implementation Note

The core idea of ordered pipe implementation is to use queue as buffer for active tasks.

       ┌─┐
       │ │
       │ │ AsyncQueue: Input
       │ │
       └┬┘
        │
┌───────▼────────┐
│ Async Function │
└───────┬────────┘
       ┌▼┐
       │ │
       │ │ AsyncQueue: Intermediate queue:
       │ │ contains tasks. queue size == concurrency
       └┬┘
┌───────▼────────┐
│     enqueue    │
└───────┬────────┘
       ┌▼┐
       │ │
       │ │ AsyncQueue: Output
       │ │
       └─┘
async _sink(input_queue: AsyncQueue, output_queue: AsyncQueue) None[source]

Coroutine that consumes data from input queue and buffers it in output queue.

This coroutine represents the sink stage of a pipeline. It consumes items from the input queue and puts them into the output queue for the foreground thread to fetch. The coroutine completes when it receives an EOF token, which it does not pass to the output queue (EOF filtering is the sink’s responsibility).

Parameters:
  • input_queue – The async queue to consume items from (typically from the last processing stage).

  • output_queue – The async queue to buffer items for the foreground thread.

Note

The sink filters out EOF tokens and does not pass them to the output queue. The EOF handling and queue cleanup is managed by the queue’s stage hook context manager.

async _run_pipeline_coroutines(node: _Node[T]) None[source]

Orchestrate the execution of all pipeline stage coroutines.

This is the main coroutine that manages the lifecycle of all pipeline stage tasks. It creates asyncio Tasks from each node’s coroutine, monitors their execution, handles failures, and ensures proper cleanup.

The execution flow:

  1. Creates asyncio Tasks for all nodes (starting from sink, traversing upstream)

  2. Waits for tasks to complete using asyncio.wait with FIRST_COMPLETED

  3. When a task completes, cancels orphaned upstream tasks to prevent deadlocks

  4. Handles cancellation requests from the foreground thread

  5. Gathers errors from failed tasks and raises PipelineFailure if any

Parameters:

node – The sink node of the pipeline (all upstream nodes are accessed via the upstream references).

Raises: