spdl.pipeline.profile_pipeline
- profile_pipeline(cfg: PipelineConfig[T, U], num_inputs: int = 1000, *, callback: Callable[[ProfileResult], None] | None = None, hook: ProfileHook | None = None) → Sequence[ProfileResult]
[Experimental] Profile a pipeline by running each pipe separately at varying concurrency levels, measuring performance, and logging the results.
This function benchmarks each pipeline stage independently across different concurrency levels (32, 16, 8, 4, 1) to identify optimal performance settings. It measures both throughput (QPS) and queue occupancy rates.
See also
- Example: Pipeline Profiling
Illustrates how to run profiling and how to interpret the result.
- Parameters:
cfg – Pipeline configuration containing source, pipes, and sink definitions.
num_inputs – The number of source items to use for profiling each stage.
callback – Optional function that, if provided, will be called with the profiling result for each pipeline stage after it is benchmarked. This allows for custom handling or logging of profiling results as they are produced.
hook – Optional hook object that can be used to execute custom code before and after profiling each stage, and before and after profiling the pipeline as a whole.
- Returns:
List of ProfileResult objects, one per pipeline stage.
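As a rough illustration of the callback parameter, the sketch below records the fastest concurrency level for each stage as results arrive. It assumes a result exposes name and stats, and that each stats entry exposes concurrency and qps, as in the Example on this page. FakeStat and FakeResult are hypothetical stand-ins, not SPDL classes, so the snippet runs without SPDL installed.

```python
from dataclasses import dataclass, field


# Hypothetical stand-ins that mirror only the attributes used in the Example
# (name, stats, concurrency, qps). They are NOT part of SPDL.
@dataclass
class FakeStat:
    concurrency: int
    qps: float


@dataclass
class FakeResult:
    name: str
    stats: list = field(default_factory=list)


summaries = []


def record_best(result):
    """Callback: remember the concurrency level with the highest QPS for a stage."""
    best = max(result.stats, key=lambda s: s.qps)
    summaries.append((result.name, best.concurrency, best.qps))


# With SPDL, the function would be passed as
# profile_pipeline(cfg, callback=record_best).
# Here it is invoked by hand to show what it records.
record_best(FakeResult("double", [FakeStat(4, 850.12), FakeStat(1, 320.88)]))
print(summaries)
```

Because the callback is called once per stage, after that stage is benchmarked, it can report progress during a long profiling run instead of waiting for the returned sequence.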
Example
from spdl.pipeline import PipelineConfig, SourceConfig, SinkConfig, Pipe, profile_pipeline

# Define a simple data processing function
def double_value(x):
    return x * 2

def square_value(x):
    return x ** 2

# Create pipeline configuration
pipeline_config = PipelineConfig(
    src=SourceConfig(range(1000)),  # Source with 1000 integers
    pipes=[
        Pipe(double_value, concurrency=4, name="double"),
        Pipe(square_value, concurrency=2, name="square")
    ],
    sink=SinkConfig(buffer_size=10)
)

# Profile the pipeline
results = profile_pipeline(pipeline_config, num_inputs=500)

# The results list contains ProfileResult objects for each pipe stage
for result in results:
    print(f"Stage: {result.name}")
    for stat in result.stats:
        print(f"  Concurrency {stat.concurrency}: "
              f"QPS={stat.qps:.2f}, "
              f"Occupancy={stat.occupancy_rate:.2f}")

# Example output:
# Stage: double
#   Concurrency 32: QPS=1250.45, Occupancy=0.85
#   Concurrency 16: QPS=1180.32, Occupancy=0.78
#   Concurrency 8: QPS=1050.21, Occupancy=0.65
#   Concurrency 4: QPS=850.12, Occupancy=0.45
#   Concurrency 1: QPS=320.88, Occupancy=0.25
# Stage: square
#   Concurrency 32: QPS=2100.67, Occupancy=0.92
#   Concurrency 16: QPS=1980.55, Occupancy=0.88
#   Concurrency 8: QPS=1750.33, Occupancy=0.75
#   Concurrency 4: QPS=1200.44, Occupancy=0.55
#   Concurrency 1: QPS=450.22, Occupancy=0.30
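One possible way to turn these numbers into a setting is to pick the smallest concurrency whose throughput is close to the peak. The following is a minimal sketch of that heuristic, not something SPDL provides. Stat is a hypothetical stand-in for the per-concurrency entries, smallest_sufficient_concurrency and its tolerance parameter are illustrative names, and the figures are the "double" stage values from the example output above.

```python
from collections import namedtuple

# Hypothetical stand-in for a per-concurrency stats entry; NOT an SPDL class.
Stat = namedtuple("Stat", ["concurrency", "qps"])


def smallest_sufficient_concurrency(stats, tolerance=0.1):
    """Return the lowest concurrency whose QPS is within `tolerance` of the peak QPS."""
    peak = max(s.qps for s in stats)
    good = [s.concurrency for s in stats if s.qps >= (1 - tolerance) * peak]
    return min(good)


# QPS values copied from the "double" stage in the example output.
double_stats = [
    Stat(32, 1250.45),
    Stat(16, 1180.32),
    Stat(8, 1050.21),
    Stat(4, 850.12),
    Stat(1, 320.88),
]

chosen = smallest_sufficient_concurrency(double_stats)
print(chosen)  # prints 16
```

With the default 10% tolerance, concurrency 16 is chosen because it is the lowest level that reaches at least 90% of the peak QPS. A looser tolerance trades throughput for fewer workers.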