.. _pipeline-parallelism: Pipeline Parallelism ==================== .. currentmodule:: spdl.pipeline The :py:class:`Pipeline` class supports multi-threading and multi-processing. You can also use a ``Pipeline`` objects as source iterator of another ``Pipeline``. When experimenting, this flexibility makes it easy to switch multi-threading, multi-processing and mixtures of them. .. image:: ../_static/data/parallelism_architecture.png Specifying an executor ---------------------- The core mechanism to deploy concurrency is :py:meth:`asyncio.loop.run_in_executor` method. The synchronous function (or generator) provided to ``Pipeline`` is executed asynchronously using the ``run_in_executor`` method. When you provide a synchronous function (or generator), the ``PipelineBuilder`` internally converts it to an asynchronous equivalent using the ``run_in_executor`` method. In the following snippet, an ``executor`` argument is provided when constructing the ``Pipeline``. .. code-block:: executor: ThreadPoolExecutor | ProcessPoolExecutor | None = ... def my_func(input): ... pipeline = ( PipelineBuilder() .add_source(...) .pipe(my_func, executor=executor) .add_sink(...) .build(...) ) Internally, the ``my_func`` function is converted to an asynchronous equivalent, meaning it's dispatched to the provided executor (or a default one if the executor is ``None``) as follows. .. code-block:: async asynchronous_my_func(input): loop = asyncio.get_running_loop() coroutine = loop.run_in_executor(executor, my_func, input) return await coroutine Multi-threading (default) ------------------------- If you build a pipeline without any customization, it defaults to multi-threading. The event loop dispatches the tasks to the default :py:class:`~concurrent.futures.ThreadPoolExecutor` created with the maximum concurrency specified in :py:meth:`PipelineBuilder.build` method. .. mermaid:: %%{init: {'theme':'base'}}%% graph TB subgraph Process["Process"] EL["Event Loop
(Main Thread)"] subgraph TP["Thread Pool"] T1["Worker Thread 1"] T2["Worker Thread 2"] T3["Worker Thread 3"] T4["Worker Thread 4"] end EL -->|schedules tasks| TP TP -->|returns results| EL end style Process fill:#f0f8ff style EL fill:#e1f5ff style TP fill:#fff4e1 style T1 fill:#fffacd style T2 fill:#fffacd style T3 fill:#fffacd style T4 fill:#fffacd .. note:: **Multi-threading characteristics:** - All threads (main thread and worker threads) run within a single process and naturally share the same memory address space - Fast task startup and minimal overhead - Data can be passed by reference (no copying needed) with fast inter-thread communication - Constrained by the GIL for Python code - best for I/O-bound tasks or GIL-releasing operations **GIL considerations:** Before your application can take advantage of free-threaded Python, to properly achieve concurrency, your stage functions must mainly consist of operations that release the GIL. Libraries such as PyTorch and NumPy release the GIL when manipulating arrays, so they are usually fine. For loading raw byte strings into array format, SPDL offers efficient functions through :py:mod:`spdl.io` module. .. _pipeline-parallelism-custom-mt: Multi-threading (custom) ------------------------ There are cases where you want to use a dedicated thread for certain task. #. You need to maintain a state across multiple task invocations. (caching for faster execution or storing the application context) #. You want to specify a different number of concurrency. One notable example that meets these conditions is transferring data to the GPU. Due to the hardware constraints, only one data transfer can be performed at a time. To transfer data without interrupting the model training, you need to use a stream object dedicated for the transfer, and you want to keep using the same stream object across multiple function invocations. To maintain a state, you can either encapsulate it in a callable class instance, or put it in a `thread-local storage `_. The following example shows how to initialize and store a CUDA stream in a thread-local storage. .. note:: The following code is now available as :py:func:`spdl.io.transfer_tensor`. .. code-block:: python import threading THREAD_LOCAL = threading.local() def _get_threadlocal_stream(index: int) -> tuple[torch.cuda.Stream, torch.device]: if not hasattr(THREAD_LOCAL, "stream"): device = torch.device(f"cuda:{index}") THREAD_LOCAL.stream = torch.cuda.Stream(device) THREAD_LOCAL.device = device return THREAD_LOCAL.stream, THREAD_LOCAL.device The following code illustrates a way to transfer data using the same dedicated stream across function invocations. .. code-block:: python def transfer_data(data: list[Tensor], index: int = 0): stream, device = _get_threadlocal_stream(index) with torch.cuda.stream(stream): data = [ t.obj.pin_memory().to(device, non_blocking=True) for t in data] stream.synchronize() return data Now we want to run this function in background, but we want to use only one thread, and keep using the same thread. For this purpose we create a ``ThreadPoolExecutor`` with one thread and pass it to the pipeline. .. code-block:: python transfer_executor = ThreadPoolExecutor(max_workers=1) pipeline = ( PipelineBuilder() .add_source(...) .pipe(...) .pipe(transfer_data, executor=transfer_executor) .add_sink(...) ) This way, the transfer function is always executed in a dedicated thread, so that it keeps using the same CUDA stream. When tracing this pipeline with `PyTorch Profiler `_, we can see that it is always the one background thread that issues data transfer, and the transfer overlaps with the stream executing the model training. .. image:: ../../_static/data/parallelism_transfer.png Multi-processing (stage) ------------------------ Similar to the custom multi-threading, by providing an instance of :py:class:`~concurrent.futures.ProcessPoolExecutor`, that stage is executed in a subprocess. .. code-block:: executor = ProcessPoolExecutor(...) pipeline = ( PipelineBuilder() .add_source(...) .pipe(task_function, executor=executor) .add_sink(...) ) .. mermaid:: %%{init: {'theme':'base'}}%% graph TB subgraph MP["Main Process"] EL["Event Loop"] MEM1["Memory Space"] end subgraph SP1["Subprocess 1"] W1["Worker"] MEM2["Memory Space"] end subgraph SP2["Subprocess 2"] W2["Worker"] MEM3["Memory Space"] end subgraph SP3["Subprocess 3"] W3["Worker"] MEM4["Memory Space"] end EL -.sends data via IPC.-> W1 EL -.sends data via IPC.-> W2 EL -.sends data via IPC.-> W3 W1 -.returns results via IPC.-> EL W2 -.returns results via IPC.-> EL W3 -.returns results via IPC.-> EL style MP fill:#e1f5ff style SP1 fill:#ffe1e1 style SP2 fill:#ffe1e1 style SP3 fill:#ffe1e1 style MEM1 fill:#e8f5e9 style MEM2 fill:#e8f5e9 style MEM3 fill:#e8f5e9 style MEM4 fill:#e8f5e9 .. note:: **Multi-processing characteristics:** - Each process has its own isolated memory space - No GIL constraints - true parallelism for CPU-bound tasks - Data must be pickled and copied between processes (overhead) - Slower startup due to process creation - Best for CPU-bound tasks that hold the GIL Note that when you dispatch the stage to subprocess, both the function (callable) and the argument are sent from the main process to the subprocess. Then the result obtained by passing the argument to the function is sent back from the subprocess to the main process. Therefore, all of the function (callable), the input argument and the output value must be `picklable `_. If you want to bind extra arguments to a function, you can use :py:func:`functools.partial`. If you want to pass around an object that's not picklable by default, you can define the serialization protocol by providing :py:meth:`object.__getstate__` and :py:meth:`object.__setstate__`. Multi-processing (combined) --------------------------- If you have multiple stages that you want to run in subprocess, it is inefficient to copy data between processes back and forth. One workaround is to combine stages and let each process run processes in a batch. .. code-block:: def preprocess(items: list[T]) -> U: # performs decode/preprocess and collation ... executor = ProcessPoolExecutor(...) pipeline = ( PipelineBuilder() .add_source(...) .aggregate(batch_size) .pipe(preprocess, executor=executor, concurrency=...) .add_sink(...) ) This approach is similar to the conventional DataLoader. One downside with this approach is less robust in error handling than the previous approaches. If preprocessing fails for one item, and if you want to ensure the size of the batch to be consistent, then all items must be dropped too. The other approach does not suffer from this. Multi-threading in subprocess ----------------------------- The multi-threading in subprocess is a paradigm we found effective in the case study :ref:`parallelism-performance`. The :py:func:`spdl.pipeline.run_pipeline_in_subprocess` function moves the given instance of :py:class:`PipelineBuilder` to a subprocess, build and execute the :py:class:`Pipeline` and put the results to inter-process queue. .. mermaid:: %%{init: {'theme':'base'}}%% graph TB subgraph MP["Main Process"] MEL["Event Loop
(Main Thread)"] subgraph MTP["Thread Pool"] MT1["Worker Thread 1
(e.g., GPU Transfer)"] end MEL -->|schedules| MTP end subgraph SP["Subprocess"] SEL["Event Loop
(Sub Thread)"] subgraph STP["Thread Pool"] ST1["Worker Thread 1
(e.g., Download)"] ST2["Worker Thread 2
(e.g., Decode)"] ST3["Worker Thread 3
(e.g., Preprocess)"] end SEL -->|schedules| STP end STP -->|batched data| Q["IPC Queue"] Q -->|batched data| MEL style MP fill:#e1f5ff style SP fill:#ffe1e1 style MEL fill:#b3d9ff style SEL fill:#ffb3b3 style MTP fill:#fff4e1 style STP fill:#fff4e1 style Q fill:#e8f5e9 style MT1 fill:#fffacd style ST1 fill:#fffacd style ST2 fill:#fffacd style ST3 fill:#fffacd .. note:: **How it works:** - **Subprocess**: Runs a full pipeline with its own event loop and thread pool - **Data processing**: Download, decode, and preprocessing happen in the subprocess - **IPC Queue**: Batched data is transferred to the main process via inter-process communication - **Main Process**: Receives batched data and performs GPU transfer in a dedicated thread - **Benefit**: Separates data loading from GPU operations, reducing main thread overhead The following example shows how to use the function. .. code-block:: python # Construct a builder and get its config builder = ( spdl.pipeline.PipelineBuilder() .add_source(...) .pipe(...) ... .add_sink(...) ) config = builder.get_config() # Move it to the subprocess, build the Pipeline iterable = run_pipeline_in_subprocess(config, num_threads=...) # Iterate - epoch 0 for item in iterable: ... # Iterate - epoch 1 for item in iterable: ... .. note:: **Advanced Usage:** - **Pipelines with Merge**: You can run pipelines with sub-pipelines constructed using ``Merge`` by directly passing the ``PipelineConfig`` object to ``run_pipeline_in_subprocess``. This allows complex pipeline topologies to be executed in a subprocess. - **Subinterpreter Execution**: For Python 3.14 and above, the :py:func:`run_pipeline_in_subinterpreter` function is also available. It executes the pipeline in a separate interpreter within the same process, providing interpreter-level isolation while being lighter weight than a full subprocess. Since the result of the ``run_pipeline_in_subprocess`` is an ``iterable``, you can build a pipeline on top of it. This allows to build a pipeline that creates a batch object in a subprocess, then transfer the batch to the GPU in a background thread in the main process. We refer this pattern as MTP ("multi-threading in subprocess"). .. code-block:: python # Pipeline that fetches data, loads, then collates. builder = ( PipelineBuilder() .add_source(Dataset(...)) .pipe(download, concurrency=...) .pipe(load, concurrency=...) .aggregate(batch_size) .pipe(collate) .add_sink(...) ) config = builder.get_config() src = run_pipeline_in_subprocess(config, num_threads=...) # Build another pipeline on top of it, which transfers the data to a # GPU pipeline = ( PipelineBuilder() .add_source(src) .pipe(gpu_transfer) .add_sink(...) .build(...) ) # Iterate for batch in pipeline: ... The MTP mode helps the OS to schedule GPU kernel launches from the main thread (where the training loop is running) in timely manner, and reduces the number of Python objects that the Python interpreter in the main process has to handle.