spdl.pipeline.build_pipeline

build_pipeline(pipeline_cfg: PipelineConfig[U], /, *, num_threads: int, max_failures: int | Fraction = -1, report_stats_interval: float = -1, queue_class: type[AsyncQueue] | None = None, task_hook_factory: Callable[[str], list[TaskHook]] | None = None, stage_id: int = 0) → Pipeline[U]

Build a pipeline from the config.

Note

If the environment variable SPDL_PIPELINE_DIAGNOSTIC_MODE is set to 1, this function builds the Pipeline in self-diagnostic mode. In self-diagnostic mode, the pipeline calls the profile_pipeline() function and benchmarks each stage at different concurrency levels. Once profiling is done, the program exits.
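Because diagnostic mode ends the program after profiling, it can help to check the variable explicitly in your own launch code. The helper below is a hypothetical sketch, not part of SPDL; it assumes that only the exact value "1" enables the mode, which this page implies but does not spell out.

```python
import os
from collections.abc import Mapping

# Name of the environment variable documented above.
ENV_VAR = "SPDL_PIPELINE_DIAGNOSTIC_MODE"


def diagnostic_mode_enabled(environ: Mapping[str, str] = os.environ) -> bool:
    """Hypothetical helper: True when the diagnostic-mode variable equals "1".

    Assumption: values other than the exact string "1" do not enable the mode.
    """
    return environ.get(ENV_VAR) == "1"


# Pass explicit mappings to see how different settings are interpreted.
print(diagnostic_mode_enabled({ENV_VAR: "1"}))
print(diagnostic_mode_enabled({ENV_VAR: "0"}))
print(diagnostic_mode_enabled({}))
```

From a shell, the mode is typically enabled for a single run by prefixing the command, for example SPDL_PIPELINE_DIAGNOSTIC_MODE=1 python your_script.py, where your_script.py is a placeholder for the script that calls build_pipeline.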

See also

Example: Pipeline definitions

Illustrates how to build a complex pipeline.

profile_pipeline()

A function to profile a Pipeline stage by stage.

Parameters:
  • pipeline_cfg – The definition of the pipeline to build.

  • num_threads

    The number of threads in the thread pool shared by the Pipeline stages.

    Note

    If a stage in the pipeline has a dedicated executor, that stage uses it instead of the shared thread pool.

  • max_failures – The maximum number (int) or rate (Fraction) of failures each pipe stage can have before the pipeline is halted. When an int is provided, it specifies the maximum count of failures. When a Fraction is provided (e.g., Fraction(1, 10) for 10%), it specifies the maximum failure rate (failures / invocations). The default value, -1, disables the limit.

  • report_stats_interval

    When provided, the pipeline reports its performance stats at the given interval. Unit: [sec]

    This takes effect only when no custom hook or custom AsyncQueue is provided for the stages. The value is passed to TaskStatsHook and StatsQueue.

    If you provide a custom stage hook and still need stats reports, instantiate TaskStatsHook and include it in the hooks passed to PipelineBuilder.pipe().

    Similarly, if you provide a custom AsyncQueue class, you need to implement the same reporting logic yourself.

  • queue_class – If provided, overrides the queue class used to connect stages. Must be a class (not an instance) that inherits from AsyncQueue.

  • task_hook_factory – If provided, it is called with the name of a stage and returns the task hook objects for that stage. If None, the default hook, TaskStatsHook, is used. To disable hooks, provide a function that returns an empty list.

  • stage_id – The index of the initial stage, used for logging.
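The semantics of max_failures can be summarized as a per-stage check. The function below is a minimal sketch, not SPDL's implementation: the name exceeds_failure_limit is hypothetical, and it assumes that a stage trips the limit only when its failures strictly exceed the threshold and that a rate limit is not evaluated before the first invocation.

```python
from __future__ import annotations

from fractions import Fraction


def exceeds_failure_limit(
    num_failures: int,
    num_invocations: int,
    max_failures: int | Fraction = -1,
) -> bool:
    """Hypothetical per-stage check mirroring the documented max_failures semantics.

    Assumptions (not confirmed by the SPDL docs):
    - the limit is exceeded only when failures go strictly above it;
    - a rate limit is not evaluated before the stage's first invocation.
    """
    if isinstance(max_failures, Fraction):
        # Rate limit: compare failures / invocations against the given fraction.
        if num_invocations == 0:
            return False
        return Fraction(num_failures, num_invocations) > max_failures
    if max_failures < 0:
        # A negative int such as the default -1 disables the check.
        return False
    # Count limit: compare the absolute number of failures.
    return num_failures > max_failures


# Count limit: up to 2 failures are tolerated, the 3rd exceeds it.
print(exceeds_failure_limit(2, 100, max_failures=2))
print(exceeds_failure_limit(3, 100, max_failures=2))

# Rate limit of 10%: 4/40 is exactly 10%, 5/40 is 12.5%.
print(exceeds_failure_limit(4, 40, max_failures=Fraction(1, 10)))
print(exceeds_failure_limit(5, 40, max_failures=Fraction(1, 10)))

# Default (-1): never exceeded.
print(exceeds_failure_limit(1000, 1000))
```

One consequence of a pure rate check is that a single early failure (1 out of 1 invocation) already exceeds a 10% limit. Whether SPDL applies a minimum number of invocations before enforcing a Fraction limit is not stated on this page.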