spdl.pipeline.build_pipeline
- build_pipeline(pipeline_cfg: PipelineConfig[U], /, *, num_threads: int, max_failures: int | Fraction = -1, report_stats_interval: float = -1, queue_class: type[AsyncQueue] | None = None, task_hook_factory: Callable[[str], list[TaskHook]] | None = None, stage_id: int = 0) -> Pipeline[U]
Build a pipeline from the config.
Note
If the environment variable SPDL_PIPELINE_DIAGNOSTIC_MODE=1 is set, this function builds a Pipeline in self-diagnostic mode. In self-diagnostic mode, the profile_pipeline function is called to benchmark each stage at different concurrency levels. Once profiling is done, the program exits.
See also
- Example: Pipeline definitions
Illustrates how to build a complex pipeline.
- profile_pipeline()
A function to profile a Pipeline stage by stage.
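Because the program exits once profiling is done, one way to run a diagnostic pass without disturbing a normal run is to launch the script in a child process with SPDL_PIPELINE_DIAGNOSTIC_MODE=1 set. The sketch below only prepares the environment and starts the child; the helper names and the script name train.py are hypothetical, and nothing here calls SPDL directly.

```python
from __future__ import annotations

import os
import subprocess
import sys
from collections.abc import Mapping


def diagnostic_env(base: Mapping[str, str] | None = None) -> dict[str, str]:
    """Return a copy of ``base`` (default: the current environment)
    with SPDL_PIPELINE_DIAGNOSTIC_MODE=1 added. ``base`` is not modified."""
    env = dict(os.environ if base is None else base)
    env["SPDL_PIPELINE_DIAGNOSTIC_MODE"] = "1"
    return env


def run_in_diagnostic_mode(script: str) -> int:
    """Run ``script`` (hypothetical, e.g. "train.py") in a child interpreter
    so that the diagnostic run and its exit do not affect this process."""
    result = subprocess.run([sys.executable, script], env=diagnostic_env())
    return result.returncode
```

Running the child in a separate interpreter keeps the exit triggered by diagnostic mode from terminating the calling process.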
- Parameters:
pipeline_cfg – The definition of the pipeline to build.
num_threads –
The number of threads in the thread pool commonly used among Pipeline stages.
Note
If a stage in the pipeline has a dedicated executor, that stage uses it instead of the shared thread pool.
max_failures – The maximum number (int) or rate (Fraction) of failures each pipe stage can have before the pipeline is halted. When an int is provided, it specifies the maximum count of failures; setting -1 (the default) disables the limit. When a Fraction is provided (e.g., Fraction(1, 10) for 10%), it specifies the maximum failure rate (failures / invocations).
report_stats_interval –
When provided, the pipeline performance stats are reported at every given interval. Unit: seconds.
This is only effective if no custom hook or custom AsyncQueue is provided for the stages. The argument is passed to TaskStatsHook and StatsQueue.
If a custom stage hook is provided and stats reporting is needed, you can instantiate TaskStatsHook and include it in the hooks provided to PipelineBuilder.pipe(). Similarly, if you provide a custom AsyncQueue class, you need to implement the same logic yourself.
queue_class – If provided, overrides the queue class used to connect stages. Must be a class (not an instance) that inherits from AsyncQueue.
task_hook_factory – If provided, used to create task hook objects, given the name of a stage. If None, the default hook, TaskStatsHook, is used. To disable hooks, provide a function that returns an empty list.
stage_id – The index of the initial stage, used for logging.
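The num_threads parameter and its note describe one thread pool shared by all stages, with a stage's dedicated executor taking precedence. The sketch below models that selection rule synchronously with concurrent.futures; run_stage is a hypothetical helper, and SPDL's actual stages are asynchronous, so this illustrates only the choice of executor, not how SPDL schedules work.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable
from concurrent.futures import Executor, ThreadPoolExecutor


def run_stage(
    fn: Callable[[int], int],
    items: Iterable[int],
    shared: Executor,
    dedicated: Executor | None = None,
) -> list[int]:
    """Apply ``fn`` to every item, using the stage's dedicated executor
    when one is given and the shared pool otherwise."""
    executor = dedicated if dedicated is not None else shared
    return list(executor.map(fn, items))


num_threads = 4  # size of the pool shared by stages without their own executor
with ThreadPoolExecutor(max_workers=num_threads) as shared_pool:
    with ThreadPoolExecutor(max_workers=1) as dedicated_pool:
        # This stage has no executor of its own, so it runs on the shared pool.
        doubled = run_stage(lambda x: x * 2, range(5), shared_pool)
        # This stage brings its own single-thread executor, which takes precedence.
        squared = run_stage(lambda x: x * x, range(5), shared_pool, dedicated_pool)
```

Giving a stage its own executor isolates it from contention with the other stages; the cost is extra threads beyond num_threads.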
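The two forms of max_failures can be read as a simple halting rule: an int caps the failure count, a Fraction caps failures / invocations, and -1 disables the check. The function below is a hypothetical sketch of that rule, not SPDL's implementation; in particular, whether reaching the limit or exceeding it triggers the halt is an assumption (here, exceeding it does).

```python
from __future__ import annotations

from fractions import Fraction


def should_halt(failures: int, invocations: int, max_failures: int | Fraction = -1) -> bool:
    """Hypothetical check: has a stage gone past its failure budget?"""
    if isinstance(max_failures, Fraction):
        # Rate budget: compare failures / invocations with the given fraction.
        if invocations == 0:
            return False
        # Assumption: the rate must strictly exceed the limit to halt.
        return Fraction(failures, invocations) > max_failures
    if max_failures < 0:
        # -1 (the default) disables the check entirely.
        return False
    # Count budget. Assumption: the count must strictly exceed the limit to halt.
    return failures > max_failures


# Usage: allow up to 10% of invocations to fail.
limit = Fraction(1, 10)
```

Using Fraction rather than float keeps the rate comparison exact, so a limit of 1/10 is not subject to floating-point rounding.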
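report_stats_interval asks for stats to be reported periodically. A custom hook or queue that takes over this responsibility needs similar bookkeeping. The class below is a hypothetical sketch of interval-based reporting: it counts tasks and emits a summary at most once per interval, treating a non-positive interval as disabled to mirror the -1 default. It does not reproduce the internals of TaskStatsHook or StatsQueue.

```python
from __future__ import annotations

import time
from collections.abc import Callable


class IntervalReporter:
    """Hypothetical reporter: counts tasks and summarizes them at most
    once per ``interval`` seconds. A non-positive interval disables it."""

    def __init__(self, interval: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self.clock = clock
        self.count = 0
        self.last_report = clock()

    def record(self) -> str | None:
        """Count one task and return a report line if the interval has elapsed."""
        self.count += 1
        if self.interval <= 0:
            return None
        now = self.clock()
        elapsed = now - self.last_report
        if elapsed < self.interval:
            return None
        message = f"{self.count} tasks in {elapsed:.1f}s"
        # Start a new reporting window.
        self.count = 0
        self.last_report = now
        return message


# Usage with a scripted clock so the output is deterministic.
ticks = iter([0.0, 0.5, 1.2, 1.5])
reporter = IntervalReporter(1.0, clock=lambda: next(ticks))
reports = [reporter.record() for _ in range(3)]
```

Injecting the clock keeps the sketch testable; in real use, the default time.monotonic is unaffected by wall-clock adjustments.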
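queue_class must be a class that inherits from AsyncQueue, not an instance of one. The sketch below shows that distinction and a subclass that does its own minimal bookkeeping. For illustration only, it assumes the base class behaves like asyncio.Queue; AsyncQueueStandIn, CountingQueue, and is_valid_queue_class are hypothetical names, and the stand-in is not the SPDL class.

```python
from __future__ import annotations

import asyncio


class AsyncQueueStandIn(asyncio.Queue):
    """Hypothetical placeholder for the AsyncQueue base class."""


class CountingQueue(AsyncQueueStandIn):
    """Custom queue that counts items added via put()."""

    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self.num_put = 0

    async def put(self, item) -> None:
        await super().put(item)
        self.num_put += 1


def is_valid_queue_class(candidate: object, base: type = AsyncQueueStandIn) -> bool:
    """True only for a class (not an instance) that inherits from ``base``."""
    return isinstance(candidate, type) and issubclass(candidate, base)


async def demo() -> int:
    # Usage: the pipeline would instantiate the class itself, one queue per connection.
    queue = CountingQueue()
    await queue.put("a")
    await queue.put("b")
    return queue.num_put
```

Passing the class rather than an instance lets the builder create a separate queue for each connection between stages.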
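task_hook_factory is called with a stage name and returns a list of hooks, and returning an empty list disables hooks. The sketch below shows two hypothetical factories: one that disables hooks everywhere and one that attaches a hook only to selected stages. StageTagHook is a hypothetical stand-in for a TaskHook. In practice you would return TaskStatsHook or your own TaskHook subclasses, and the stage name "decode" is only an example.

```python
from __future__ import annotations

from collections.abc import Callable


class StageTagHook:
    """Hypothetical stand-in for a TaskHook; it only remembers its stage name."""

    def __init__(self, name: str) -> None:
        self.name = name


def no_hooks(stage_name: str) -> list[StageTagHook]:
    # An empty list means no hooks are attached to any stage.
    return []


def make_factory(traced: set[str]) -> Callable[[str], list[StageTagHook]]:
    """Build a factory that attaches a hook only to stages named in ``traced``."""

    def factory(stage_name: str) -> list[StageTagHook]:
        return [StageTagHook(stage_name)] if stage_name in traced else []

    return factory
```

Either function could then be passed as task_hook_factory=no_hooks or task_hook_factory=make_factory({"decode"}). Because the factory is called once per stage, per-stage selection logic stays in one place instead of being repeated at every PipelineBuilder.pipe() call.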