spdl.pipeline.run_pipeline_in_subprocess
- run_pipeline_in_subprocess(config_or_builder: PipelineConfig[T], /, *, num_threads: int, max_failures: int | Fraction = -1, report_stats_interval: float = -1, queue_class: type[AsyncQueue] | None = None, task_hook_factory: Callable[[StageInfo], list[TaskHook]] | None = None, background_tasks: list[Callable[[], BackgroundTask]] | None = None, use_thread_output_queue: bool = False, **kwargs: Any) → Iterable[T]
Run the given Pipeline in a subprocess and iterate over the results.

The returned Iterable supports multiple iterations. The subprocess is created once and reused: each call to iter() (or each for ... in loop) builds a fresh Pipeline inside the same subprocess without spawning a new process. This avoids paying the cost of subprocess creation (fork/spawn, initializer execution, and pickling) on every iteration.

For multi-epoch training, create the iterable once before the epoch loop and iterate over it in each epoch:
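```python
from spdl.pipeline import run_pipeline_in_subprocess

# config is a PipelineConfig; num_epochs and train are defined by the user.
src = run_pipeline_in_subprocess(config, num_threads=4)
for epoch in range(num_epochs):
    for batch in src:
        train(batch)
```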
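To make the reuse concrete, here is a minimal stdlib sketch of the pattern. It assumes nothing about SPDL's internals: a thread stands in for the subprocess, and a hypothetical make_iterable callable stands in for building a fresh Pipeline from the config. The worker is started once in the constructor; each iter() only sends it a command to run one more pass.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator

_EPOCH_END = object()  # sentinel the worker sends after finishing one pass


class ReusableWorkerIterable:
    """Hypothetical sketch: one long-lived worker serves every iteration.

    A thread stands in for the subprocess, and make_iterable stands in for
    building a fresh Pipeline from the config.
    """

    def __init__(self, make_iterable: Callable[[], Iterable]) -> None:
        self._make_iterable = make_iterable
        self._commands: queue.Queue = queue.Queue()
        self._results: queue.Queue = queue.Queue()
        self.num_builds = 0  # how many fresh "pipelines" the worker has built
        # The worker is started exactly once, here, not once per iteration.
        self._worker = threading.Thread(target=self._serve, daemon=True)
        self._worker.start()

    def _serve(self) -> None:
        # Each command other than None asks for one more full pass.
        while self._commands.get() is not None:
            self.num_builds += 1
            for item in self._make_iterable():  # fresh "pipeline" per pass
                self._results.put(item)
            self._results.put(_EPOCH_END)

    def __iter__(self) -> Iterator:
        # Reuse the existing worker; no new thread (process) is created.
        # Assumes each pass is consumed to the end before the next starts.
        self._commands.put("start")
        while (item := self._results.get()) is not _EPOCH_END:
            yield item

    def close(self) -> None:
        self._commands.put(None)
        self._worker.join()
```

Usage mirrors the epoch loop: construct the object once, iterate it once per epoch, and call close() when done. Only the per-pass "pipeline" is rebuilt; the worker itself is never recreated.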
If the given config has a continuous source (built with PipelineBuilder.add_source(..., continuous=True)), the pipeline is built and started once inside the subprocess and then reused across epochs: it keeps running in the background between epochs instead of being torn down and rebuilt. This keeps the prefetch buffer warm and removes the per-epoch rebuild gap, which matters most when the training step is short (e.g. small models):

```python
from spdl.pipeline import PipelineBuilder, run_pipeline_in_subprocess

# dataset, load, batch_size, num_epochs and train are defined by the user.
config = (
    PipelineBuilder()
    .add_source(dataset, continuous=True)
    .pipe(load, concurrency=4)
    .aggregate(batch_size)
    .add_sink(buffer_size=3)
    .get_config()
)
src = run_pipeline_in_subprocess(config, num_threads=4)
for epoch in range(num_epochs):
    for batch in src:  # one epoch; subprocess pipeline stays warm
        train(batch)
```
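The warm-buffer behavior can be illustrated with a small stdlib sketch; it is not SPDL's implementation. A producer thread (standing in for the pipeline running inside the subprocess) loops over a hypothetical in-memory dataset indefinitely, writes an end-of-epoch marker after each pass, and blocks once a bounded buffer of buffer_size items is full. Each iter() drains items up to the next marker, so one for pass is one epoch, while the producer is already refilling the buffer for the following epoch.

```python
import queue
import threading
from typing import Iterator, Sequence

_EPOCH_END = object()  # marker the producer writes after each full pass


class ContinuousSource:
    """Hypothetical sketch of a continuous source with a warm prefetch buffer.

    A producer thread runs for the object's whole lifetime, looping over the
    dataset and writing an end-of-epoch marker after each pass. Iterating the
    object drains items up to the next marker, so one ``for`` pass is one epoch.
    """

    def __init__(self, dataset: Sequence, buffer_size: int = 3) -> None:
        self._dataset = dataset
        # Bounded buffer: the producer blocks once it is buffer_size items ahead.
        self._buffer: queue.Queue = queue.Queue(maxsize=buffer_size)
        self._stop = threading.Event()
        self._producer = threading.Thread(target=self._produce, daemon=True)
        self._producer.start()

    def _put(self, item: object) -> bool:
        # Put with a short timeout so that stop() can interrupt a blocked producer.
        while not self._stop.is_set():
            try:
                self._buffer.put(item, timeout=0.05)
                return True
            except queue.Full:
                pass
        return False

    def _produce(self) -> None:
        # Never torn down between epochs: keeps filling the buffer ahead of use.
        while True:
            for item in self._dataset:
                if not self._put(item):
                    return
            if not self._put(_EPOCH_END):
                return

    def __iter__(self) -> Iterator:
        # Assumes each epoch is consumed to the end before the next one starts.
        while (item := self._buffer.get()) is not _EPOCH_END:
            yield item

    def prefetched(self) -> int:
        """Approximate number of items already waiting in the buffer."""
        return self._buffer.qsize()

    def stop(self) -> None:
        self._stop.set()
        self._producer.join()
```

Right after an epoch is consumed, prefetched() climbs back to buffer_size without any rebuild step, which is the gap the continuous mode is meant to remove.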
Each iteration over src yields exactly one epoch (the epoch boundary is handled internally), so the epoch loop shown with the continuous config iterates one epoch per for pass, just as in the non-continuous case. The continuous setting only changes how the subprocess manages the pipeline between epochs. To run continuous GPU-transfer stages in the main process, an outer pipeline can wrap src with add_source(src, continuous=True) (see the MTP pattern in the parallelism guide).

Note
Pipe stages configured with a stdlib concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor or (on Python 3.14+) InterpreterPoolExecutor are explicitly supported, even though these executors are not picklable.

Such an executor must be freshly constructed, that is, handed over before any work has been submitted to it, because its workers are (re)created as part of running the pipeline in the subprocess (which is the whole point of moving execution there). Passing one that has already spawned workers (i.e. one that has already been used) would mean lifting it out mid-lifecycle, so it raises ValueError.

- ThreadPoolExecutor / InterpreterPoolExecutor: their constructor arguments are serialized and an equivalent executor (same type, same max_workers) is reconstructed inside the subprocess. Their workers (threads / subinterpreters) live inside the subprocess and are cleaned up when it exits.
- ProcessPoolExecutor: its worker processes are spawned in the main process (as children of the main process, not as grandchildren via the pipeline subprocess), and the executor is replaced with a queue-backed proxy that the subprocess submits to. This keeps ownership of the worker processes in the main process, which reaps them when the returned iterable is garbage-collected, so they cannot be orphaned if the pipeline subprocess is force-killed. The worker count (max_workers) and initializer/initargs are preserved; other construction options (e.g. mp_context, max_tasks_per_child) are not honored.

Warning
Those worker processes are spawned with the start method named by the mp_context keyword argument given to this function (forwarded through kwargs; the executor's own mp_context is not honored, as noted above). The default is the platform default start method: fork on Linux through Python 3.13, and forkserver from Python 3.14. Spawning them with fork from a process that already has other live threads can deadlock, because fork copies only the calling thread, so a lock held by another thread is never released in the child. If you attach a ProcessPoolExecutor and the main process is (or may become) multi-threaded, pass mp_context="spawn" or mp_context="forkserver", or build the pipeline before any other threads start. A RuntimeWarning is emitted when this risky combination is detected.
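The risky combination this warning describes (start method fork plus more than one live thread) can be checked with the standard library alone. The helpers below are hypothetical and written only for illustration; SPDL's own detection logic may differ. The sketch avoids fixing the global start method as a side effect by reading it with allow_none=True and falling back to the first entry of multiprocessing.get_all_start_methods(), which the standard library documents as the platform default.

```python
import multiprocessing
import threading
import warnings
from typing import Optional


def current_start_method() -> str:
    """Return the start method multiprocessing would use, without fixing it."""
    fixed = multiprocessing.get_start_method(allow_none=True)
    # The first entry of get_all_start_methods() is the platform default.
    return fixed or multiprocessing.get_all_start_methods()[0]


def warn_if_fork_with_threads(start_method: Optional[str] = None) -> bool:
    """Hypothetical check: warn when forking from a multi-threaded process.

    Returns True and emits a RuntimeWarning if worker processes would be
    started with "fork" while more than one thread is alive.
    """
    method = start_method or current_start_method()
    live_threads = threading.active_count()
    risky = method == "fork" and live_threads > 1
    if risky:
        warnings.warn(
            f"starting worker processes with 'fork' while {live_threads} threads "
            "are alive can deadlock; use 'spawn' or 'forkserver' instead",
            RuntimeWarning,
            stacklevel=2,
        )
    return risky
```

The check is only a heuristic: a thread started after it runs can still hold a lock at fork time, which is why the safer fix is a non-fork start method.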
SPDL’s own PriorityThreadPoolExecutor and related pool executors are already picklable and pass through unchanged.

- Parameters:
  - config_or_builder – The definition of the Pipeline. Can be either a PipelineConfig or a PipelineBuilder.
    Warning
    The support for PipelineBuilder is deprecated and will be removed in the future. Call the get_config() method and pass the resulting config object instead.
  - num_threads – Passed to build_pipeline().
  - max_failures – Passed to build_pipeline().
  - report_stats_interval – Passed to build_pipeline().
  - queue_class – Passed to build_pipeline().
  - task_hook_factory – Passed to build_pipeline().
  - background_tasks – Passed to build_pipeline().
  - kwargs – Passed to iterate_in_subprocess().
- Yields:
The results yielded from the pipeline.
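The PipelineBuilder deprecation under config_or_builder follows a common shim pattern: accept the old input type, warn, and convert it to the new one. The sketch below shows that pattern with hypothetical stand-in classes (BuilderStub, ConfigStub) rather than the real PipelineBuilder and PipelineConfig; that a DeprecationWarning is the category used is an assumption. In user code, the migration is simply to call get_config() on the builder before passing it in.

```python
import warnings
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class ConfigStub:
    """Hypothetical stand-in for PipelineConfig."""

    stages: Tuple[str, ...]


class BuilderStub:
    """Hypothetical stand-in for PipelineBuilder; only get_config() matters."""

    def __init__(self, *stages: str) -> None:
        self._stages = stages

    def get_config(self) -> ConfigStub:
        return ConfigStub(self._stages)


def resolve_config(config_or_builder: Union[ConfigStub, BuilderStub]) -> ConfigStub:
    """Accept a builder for backward compatibility, but warn and convert it."""
    if isinstance(config_or_builder, BuilderStub):
        warnings.warn(
            "passing a builder is deprecated; pass builder.get_config() instead",
            DeprecationWarning,  # assumption: the real warning category may differ
            stacklevel=2,
        )
        return config_or_builder.get_config()
    return config_or_builder
```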
Changed in version 0.6.0: Pipe stages configured with a stdlib ThreadPoolExecutor, ProcessPoolExecutor, or (on Python 3.14+) InterpreterPoolExecutor are now supported. Thread/interpreter pools are reconstructed inside the subprocess; a ProcessPoolExecutor’s worker processes are spawned in (and owned by) the main process.

See also
iterate_in_subprocess() implements the logic for manipulating an iterable in a subprocess.
Parallelism and Performance for the context in which this function was created.