spdl.pipeline.run_pipeline_in_subprocess
- run_pipeline_in_subprocess(config_or_builder: PipelineConfig[T], /, *, num_threads: int, max_failures: int | Fraction = -1, report_stats_interval: float = -1, queue_class: type[AsyncQueue] | None = None, task_hook_factory: Callable[[StageInfo], list[TaskHook]] | None = None, background_tasks: list[Callable[[], BackgroundTask]] | None = None, **kwargs: Any) -> Iterable[T]
Run the given Pipeline in a subprocess and iterate over its results.
The returned Iterable supports multiple iterations. The subprocess is created once and reused: each call to iter() (or for ... in) builds a fresh Pipeline inside the same subprocess without spawning a new process. This avoids paying the cost of subprocess creation (fork/spawn, initializer execution, and pickling) on every iteration.

For multi-epoch training, create the iterable once before the epoch loop and iterate over it in each epoch:
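    from spdl.pipeline import run_pipeline_in_subprocess

    # Create the iterable (and its subprocess) once, outside the epoch loop.
    src = run_pipeline_in_subprocess(config, num_threads=4)
    for epoch in range(num_epochs):
        # Each pass builds a fresh Pipeline in the same subprocess.
        for batch in src:
            train(batch)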
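The reuse behaviour described above can be illustrated with a minimal standard-library sketch. ReusableSubprocessIterable, _worker and make_items are hypothetical names invented for illustration; they are not part of SPDL, and this is not how run_pipeline_in_subprocess is implemented. The sketch only shows the general idea of one long-lived child process that rebuilds a fresh iterable each time iter() is called. It assumes a POSIX platform where the "fork" start method is available.

```python
import multiprocessing as mp
import os
from collections.abc import Callable, Iterable, Iterator
from typing import Any


def _worker(factory: Callable[[], Iterable[Any]], cmd_q, data_q) -> None:
    # Runs in the child process for its entire lifetime.
    while True:
        cmd, epoch = cmd_q.get()
        if cmd == "stop":
            return
        try:
            # A fresh iterable is built for every request, in the same process.
            for item in factory():
                data_q.put(("item", epoch, item))
            data_q.put(("done", epoch, None))
        except Exception as e:  # forward failures to the parent
            data_q.put(("error", epoch, repr(e)))


class ReusableSubprocessIterable:
    """Hypothetical helper: one long-lived child process, many iterations."""

    def __init__(self, factory: Callable[[], Iterable[Any]]) -> None:
        ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method
        self._cmd_q = ctx.Queue()
        self._data_q = ctx.Queue()
        self._epoch = 0
        self._proc = ctx.Process(
            target=_worker, args=(factory, self._cmd_q, self._data_q), daemon=True
        )
        self._proc.start()  # the child is started exactly once

    @property
    def pid(self) -> int:
        return self._proc.pid

    def __iter__(self) -> Iterator[Any]:
        # Tag each iteration so leftovers from an abandoned one can be skipped.
        self._epoch += 1
        epoch = self._epoch
        self._cmd_q.put(("start", epoch))
        while True:
            kind, msg_epoch, payload = self._data_q.get()
            if msg_epoch != epoch:
                continue  # stale item from an earlier, unfinished iteration
            if kind == "done":
                return
            if kind == "error":
                raise RuntimeError(f"subprocess failed: {payload}")
            yield payload

    def close(self) -> None:
        self._cmd_q.put(("stop", None))
        self._proc.join(timeout=5)
        if self._proc.is_alive():
            self._proc.terminate()
            self._proc.join()


def make_items() -> list[tuple[int, int]]:
    # Report the PID of whichever process builds the iterable.
    return [(os.getpid(), i) for i in range(3)]


if __name__ == "__main__":
    src = ReusableSubprocessIterable(make_items)
    for epoch in range(2):
        print(epoch, list(src))
    src.close()
```

Because every call to iter() sends a new start command to the same child, the PID reported by make_items stays constant across epochs. The epoch counter attached to each message lets the parent discard items left over from an iteration that was abandoned early, instead of leaking them into the next one.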
- Parameters:
  - config_or_builder – The definition of the Pipeline. Can be either a PipelineConfig or a PipelineBuilder.
    Warning: Support for PipelineBuilder is deprecated and will be removed in the future. Call the builder's get_config() method and pass the resulting config object instead.
  - num_threads – Passed to build_pipeline().
  - max_failures – Passed to build_pipeline().
  - report_stats_interval – Passed to build_pipeline().
  - queue_class – Passed to build_pipeline().
  - task_hook_factory – Passed to build_pipeline().
  - background_tasks – Passed to build_pipeline().
  - kwargs – Passed to iterate_in_subprocess().
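The parameter routing listed above, together with the deprecation path for PipelineBuilder, can be sketched as follows. FakeConfig, FakeBuilder and route_arguments are hypothetical stand-ins invented for illustration, and the real function's internals may differ. The sketch only shows which arguments would go to build_pipeline(), which would be forwarded to iterate_in_subprocess(), and how a builder might be converted with get_config() under a deprecation warning.

```python
import warnings
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeConfig:
    """Hypothetical stand-in for PipelineConfig."""
    name: str


class FakeBuilder:
    """Hypothetical stand-in for the deprecated PipelineBuilder."""

    def __init__(self, name: str) -> None:
        self._name = name

    def get_config(self) -> FakeConfig:
        return FakeConfig(self._name)


def route_arguments(
    config_or_builder,
    /,
    *,
    num_threads: int,
    max_failures=-1,
    report_stats_interval: float = -1,
    queue_class=None,
    task_hook_factory=None,
    background_tasks=None,
    **kwargs: Any,
):
    # Accept a builder for backward compatibility, but warn and convert it.
    if isinstance(config_or_builder, FakeBuilder):
        warnings.warn(
            "Passing a builder is deprecated; pass builder.get_config() instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        config = config_or_builder.get_config()
    else:
        config = config_or_builder
    # Named options are collected for the (hypothetical) pipeline-building step.
    build_kwargs = {
        "num_threads": num_threads,
        "max_failures": max_failures,
        "report_stats_interval": report_stats_interval,
        "queue_class": queue_class,
        "task_hook_factory": task_hook_factory,
        "background_tasks": background_tasks,
    }
    # Anything else is left for the (hypothetical) subprocess layer.
    return config, build_kwargs, kwargs
```

Keeping the build options keyword-only and sweeping everything else into **kwargs means new subprocess-level options can be added without changing the signature of the wrapper.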
- Yields:
The results yielded from the pipeline.
See also
iterate_in_subprocess() implements the logic for manipulating an iterable in a subprocess.
Parallelism and Performance for the context in which this function was created.