spdl.pipeline.run_pipeline_in_subprocess

run_pipeline_in_subprocess(config_or_builder: PipelineConfig[T], /, *, num_threads: int, max_failures: int | Fraction = -1, report_stats_interval: float = -1, queue_class: type[AsyncQueue] | None = None, task_hook_factory: Callable[[StageInfo], list[TaskHook]] | None = None, background_tasks: list[Callable[[], BackgroundTask]] | None = None, use_thread_output_queue: bool = False, **kwargs: Any) → Iterable[T]

Run the given Pipeline in a subprocess, and iterate on the result.

The returned Iterable supports multiple iterations. The subprocess is created once and reused — each call to iter() (or for ... in) builds a fresh Pipeline inside the same subprocess without spawning a new process. This avoids the overhead of subprocess creation (fork/spawn, initializer execution, and pickling) on every iteration.

For multi-epoch training, create the iterable once before the epoch loop and iterate it each epoch:

src = run_pipeline_in_subprocess(config, num_threads=4)
for epoch in range(num_epochs):
    for batch in src:
        train(batch)
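The reuse described above can be pictured with a small toy model that assumes nothing about SPDL's internals. A single worker (a thread here, standing in for the subprocess) is started once, and every call to iter() asks it to build a fresh pipeline and stream the results back. ReusableSource and its make_pipeline argument are hypothetical names used only for illustration.

```python
import queue
import threading
from collections.abc import Callable, Iterable, Iterator

_END = object()  # sentinel marking the end of one epoch


class ReusableSource:
    """Hypothetical toy model, not SPDL's implementation.

    One worker (a thread standing in for the subprocess) is started once.
    Each iter() call asks it to build a fresh pipeline (here: whatever
    iterable make_pipeline returns) and stream the results back.
    """

    def __init__(self, make_pipeline: Callable[[], Iterable]) -> None:
        self._make_pipeline = make_pipeline
        self._requests: queue.Queue = queue.Queue()
        self._results: queue.Queue = queue.Queue()
        self.builds = 0  # how many pipelines the worker has built
        self._worker = threading.Thread(target=self._serve, daemon=True)
        self._worker.start()  # created once, reused for every epoch

    def _serve(self) -> None:
        # One request per epoch; None asks the worker to exit.
        while self._requests.get() is not None:
            self.builds += 1
            for item in self._make_pipeline():  # fresh pipeline each epoch
                self._results.put(item)
            self._results.put(_END)

    def __iter__(self) -> Iterator:
        self._requests.put("run")
        while (item := self._results.get()) is not _END:
            yield item

    def close(self) -> None:
        self._requests.put(None)
        self._worker.join()


src = ReusableSource(lambda: (x * x for x in range(3)))
epochs = [list(src) for _ in range(2)]  # two epochs, one worker
src.close()
```

Both epochs are served by the same worker while the pipeline itself is rebuilt for each pass (src.builds ends at 2). The toy does not handle a consumer that stops mid-epoch; leftover items would leak into the next pass.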

If the given config has a continuous source (built with PipelineBuilder.add_source(..., continuous=True)), the pipeline is built and started once inside the subprocess and then reused across epochs: it keeps running in the background between epochs instead of being torn down and rebuilt. This keeps the prefetch buffer warm and removes the per-epoch rebuild gap, which matters most when the training step is short (e.g. small models):

config = (
    PipelineBuilder()
    .add_source(dataset, continuous=True)
    .pipe(load, concurrency=4)
    .aggregate(batch_size)
    .add_sink(buffer_size=3)
    .get_config()
)
src = run_pipeline_in_subprocess(config, num_threads=4)
for epoch in range(num_epochs):
    for batch in src:  # one epoch; subprocess pipeline stays warm
        train(batch)

Each pass over src yields exactly one epoch (the epoch boundary is handled internally), so the loop above processes one epoch per for pass, just as in the non-continuous case. The continuous setting changes only how the subprocess manages the pipeline between epochs. To run continuous GPU-transfer stages in the main process, an outer pipeline can wrap src with add_source(src, continuous=True) (see the MTP pattern in the parallelism guide).
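A similar toy model, again hypothetical and not SPDL's implementation, shows how a continuous source can deliver exactly one epoch per pass while its producer keeps running. The producer cycles over the dataset forever, writes an in-band end-of-epoch marker after each pass, and blocks on a bounded buffer (playing the role of buffer_size), so the next epoch's first items are already waiting when iteration resumes. ContinuousSource and _EPOCH_END are illustrative names.

```python
import queue
import threading
from collections.abc import Iterable, Iterator

_EPOCH_END = object()  # hypothetical in-band epoch-boundary marker


class ContinuousSource:
    """Toy model of a continuous pipeline, not SPDL's implementation.

    The producer thread is started once and keeps cycling over the dataset,
    refilling a bounded buffer even between epochs. Each iter() call yields
    exactly one epoch: the items up to the next boundary marker.
    """

    def __init__(self, dataset: Iterable, buffer_size: int = 3) -> None:
        self._dataset = list(dataset)
        self._buffer: queue.Queue = queue.Queue(maxsize=buffer_size)
        # Daemon thread: it keeps producing until the interpreter exits.
        threading.Thread(target=self._produce, daemon=True).start()

    def _produce(self) -> None:
        while True:
            for item in self._dataset:
                self._buffer.put(item)  # blocks while the buffer is full
            self._buffer.put(_EPOCH_END)

    def __iter__(self) -> Iterator:
        while (item := self._buffer.get()) is not _EPOCH_END:
            yield item


src = ContinuousSource(range(4), buffer_size=2)
epochs = [list(src) for _ in range(3)]  # same producer for all three epochs
```

Because the boundary is a marker in the stream, a consumer that breaks out early would resume mid-epoch on its next pass; handling that is left out of this sketch.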

Note

Pipe stages configured with a stdlib concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor or (on Python 3.14+) InterpreterPoolExecutor are explicitly supported, even though these executors are not picklable.

Such an executor must be freshly constructed, i.e. handed over before any work has been submitted to it, because its workers are (re)created as part of running the pipeline in the subprocess (the whole point of moving execution there). Passing one that has already spawned workers (because it has already been used) would move it mid-lifecycle, so ValueError is raised instead.
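One way to tell whether a stdlib executor is still fresh is sketched below. It is an assumption-laden illustration, not how SPDL performs its check: it reads CPython-private attributes (_threads, _shutdown, _processes) that are not a public API and may change between Python versions. is_fresh_executor and check_handoff are hypothetical helpers.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def is_fresh_executor(executor: Executor) -> bool:
    """Return True if the executor has not started any worker yet.

    Assumption: relies on CPython-private attributes, not a public API.
    """
    # InterpreterPoolExecutor (Python 3.14+) subclasses ThreadPoolExecutor.
    if isinstance(executor, ThreadPoolExecutor):
        return not executor._threads and not executor._shutdown
    if isinstance(executor, ProcessPoolExecutor):
        # In current CPython, _processes is {} until the first submit()
        # and becomes None after shutdown().
        return executor._processes == {}
    raise TypeError(f"unsupported executor: {type(executor).__name__}")


def check_handoff(executor: Executor) -> None:
    """Hypothetical guard mirroring the documented ValueError rule."""
    if not is_fresh_executor(executor):
        raise ValueError(
            "executor already has workers; pass a freshly constructed one"
        )


pool = ThreadPoolExecutor(max_workers=4)
check_handoff(pool)  # fresh: no work submitted yet, so this passes
```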

  • ThreadPoolExecutor / InterpreterPoolExecutor: their constructor arguments are serialized and an equivalent executor (same type, same max_workers) is reconstructed inside the subprocess. Their workers (threads / subinterpreters) live inside the subprocess and are cleaned up when it exits.

  • ProcessPoolExecutor: its worker processes are spawned in the main process (as children of the main process, not grandchildren via the pipeline subprocess) and the executor is replaced with a queue-backed proxy that the subprocess submits to. This keeps ownership of the worker processes in the main process, which reaps them when the returned iterable is garbage-collected, so they cannot be orphaned if the pipeline subprocess is force-killed. The worker count (max_workers) and initializer/initargs are preserved; other construction options (e.g. mp_context, max_tasks_per_child) are not honored.

    Warning

    Those worker processes are spawned with the start method of mp_context (default: the platform's default start method, which on Linux is fork through Python 3.13 and forkserver from Python 3.14). Spawning them with fork from a process that already has other live threads can deadlock: fork copies only the calling thread, so a lock held by another thread is never released in the child. If you attach a ProcessPoolExecutor and the main process is (or may become) multi-threaded, pass mp_context=multiprocessing.get_context("spawn") (or "forkserver"), or build the pipeline before any other threads start. A RuntimeWarning is emitted when this risky combination is detected.
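For the ThreadPoolExecutor / InterpreterPoolExecutor case, reconstruction amounts to shipping a small description of the executor instead of the executor itself. The sketch below assumes that the type and max_workers are all that need to travel, and it reads the CPython-private _max_workers attribute; executor_spec and rebuild_executor are hypothetical helpers, not SPDL functions.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor


def executor_spec(executor: ThreadPoolExecutor) -> bytes:
    """Serialize just enough to rebuild an equivalent executor elsewhere.

    Assumption: reads the CPython-private _max_workers attribute.
    """
    return pickle.dumps((type(executor), executor._max_workers))


def rebuild_executor(spec: bytes) -> ThreadPoolExecutor:
    """Build a new executor of the same type and size (e.g. in a subprocess)."""
    cls, max_workers = pickle.loads(spec)
    return cls(max_workers=max_workers)


original = ThreadPoolExecutor(max_workers=3)
try:
    pickle.dumps(original)  # the executor itself holds locks and queues
    picklable = True
except TypeError:
    picklable = False

rebuilt = rebuild_executor(executor_spec(original))
```

Pickling the executor directly fails with TypeError, while the two-field spec pickles cleanly and yields an independent executor of the same type and size.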
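The queue-backed proxy used for ProcessPoolExecutor can be modeled as follows. This is a simplified sketch with hypothetical names (QueueExecutorProxy, serve): a thread and a queue.Queue stand in for the pipeline subprocess and the inter-process channel, and a ThreadPoolExecutor stands in for the real process pool so the example stays self-contained. What it illustrates is that the proxy's submit() only enqueues a request, while the owner of the real pool performs the actual submission and relays the outcome back.

```python
import functools
import queue
import threading
from concurrent.futures import Executor, Future, ThreadPoolExecutor


class QueueExecutorProxy(Executor):
    """Hypothetical proxy: submit() only enqueues; the real pool lives elsewhere."""

    def __init__(self, requests: queue.Queue) -> None:
        self._requests = requests

    def submit(self, fn, /, *args, **kwargs) -> Future:
        proxy_future: Future = Future()
        self._requests.put((proxy_future, fn, args, kwargs))
        return proxy_future


def _relay(src: Future, dst: Future) -> None:
    """Copy the outcome of the real future onto the proxy future."""
    if src.cancelled():
        dst.cancel()
    elif (exc := src.exception()) is not None:
        dst.set_exception(exc)
    else:
        dst.set_result(src.result())


def serve(requests: queue.Queue, pool: Executor) -> None:
    """Owner side: forward each request to the real pool until None arrives."""
    while (req := requests.get()) is not None:
        proxy_future, fn, args, kwargs = req
        real_future = pool.submit(fn, *args, **kwargs)
        real_future.add_done_callback(functools.partial(_relay, dst=proxy_future))


requests: queue.Queue = queue.Queue()
pool = ThreadPoolExecutor(max_workers=2)  # stand-in for the real process pool
owner = threading.Thread(target=serve, args=(requests, pool))
owner.start()

proxy = QueueExecutorProxy(requests)  # what the pipeline side would hold
result = proxy.submit(pow, 2, 10).result(timeout=5)
absolutes = list(proxy.map(abs, [-1, -2, -3]))
failed = proxy.submit(int, "x")  # raises ValueError inside the pool

requests.put(None)  # stop forwarding
owner.join()
pool.shutdown()  # waits for pending work, so all callbacks have run
```

In the real arrangement the request crosses a process boundary, so the submitted function and its arguments must be picklable.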
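The risky combination named in the warning (the fork start method plus other live threads) can be checked roughly as follows. This is a hypothetical sketch of such a check, not SPDL's own detection logic; fork_with_threads_is_risky and warn_if_risky are illustrative names.

```python
from __future__ import annotations

import multiprocessing
import threading
import warnings
from multiprocessing.context import BaseContext


def fork_with_threads_is_risky(ctx: BaseContext | None = None) -> bool:
    """True if workers would be forked while other threads are alive."""
    if ctx is None:
        ctx = multiprocessing.get_context()  # the platform default context
    return ctx.get_start_method() == "fork" and threading.active_count() > 1


def warn_if_risky(ctx: BaseContext | None = None) -> None:
    """Emit a RuntimeWarning for the fork-plus-threads combination."""
    if fork_with_threads_is_risky(ctx):
        warnings.warn(
            "forking worker processes from a multi-threaded process can "
            "deadlock; use a 'spawn' or 'forkserver' context",
            RuntimeWarning,
            stacklevel=2,
        )


# Safer construction when the main process is (or may become) multi-threaded:
# ProcessPoolExecutor(max_workers=4, mp_context=multiprocessing.get_context("spawn"))
```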

SPDL’s own PriorityThreadPoolExecutor and related pool executors are already picklable and pass through unchanged.

Parameters:
Yields:

The results yielded from the pipeline.

Changed in version 0.6.0: Pipe stages configured with a stdlib ThreadPoolExecutor, ProcessPoolExecutor, or (on Python 3.14+) InterpreterPoolExecutor are now supported. Thread/interpreter pools are reconstructed inside the subprocess; a ProcessPoolExecutor’s worker processes are spawned in (and owned by) the main process.

See also