spdl.pipeline.profile_pipeline

profile_pipeline(cfg: PipelineConfig[T, U], num_inputs: int = 1000, *, callback: Callable[[ProfileResult], None] | None = None, hook: ProfileHook | None = None) -> Sequence[ProfileResult]

[Experimental] Profile a pipeline by running each pipe separately at varying concurrency levels, measuring performance, and logging the results.

This function benchmarks each pipeline stage independently across different concurrency levels (32, 16, 8, 4, 1) to identify optimal performance settings. It measures both throughput (QPS) and queue occupancy rates.

See also

Example: Pipeline Profiling

Illustrates how to run profiling and how to interpret the result.

Parameters:
  • cfg – Pipeline configuration containing source, pipes, and sink definitions.

  • num_inputs – The number of source items to use for profiling each stage.

  • callback – Optional function that, if provided, will be called with the profiling result for each pipeline stage after it is benchmarked. This allows for custom handling or logging of profiling results as they are produced.

  • hook – Optional hook object for running custom code before and after the profiling of each stage, and before and after the profiling of the pipeline as a whole.
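As a minimal sketch of the callback parameter, the function below has the shape Callable[[ProfileResult], None] and collects each stage's result as it arrives. The names on_stage_profiled and collected are hypothetical, and the stand-in object only mimics the name attribute used in the example on this page so that the sketch runs without spdl installed.

```python
from types import SimpleNamespace
from typing import Any

# Results are appended here as each stage finishes profiling.
collected: list[Any] = []


def on_stage_profiled(result: Any) -> None:
    """Record one stage's result; shaped like Callable[[ProfileResult], None]."""
    collected.append(result)
    print(f"Finished profiling stage: {result.name}")


# Stand-in for a ProfileResult (assumption: only `name` and `stats` are mimicked).
# With spdl installed, pass the function to the profiler instead:
#     profile_pipeline(cfg, callback=on_stage_profiled)
on_stage_profiled(SimpleNamespace(name="double", stats=[]))
```

Collecting results in the callback makes partial results available even if a later stage fails or the run is interrupted.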

Returns:

Sequence of ProfileResult objects, one per pipeline stage.

Example

from spdl.pipeline import PipelineConfig, SourceConfig, SinkConfig, Pipe, profile_pipeline

# Define simple data processing functions
def double_value(x):
    return x * 2

def square_value(x):
    return x ** 2

# Create pipeline configuration
pipeline_config = PipelineConfig(
    src=SourceConfig(range(1000)),  # Source with 1000 integers
    pipes=[
        Pipe(double_value, concurrency=4, name="double"),
        Pipe(square_value, concurrency=2, name="square")
    ],
    sink=SinkConfig(buffer_size=10)
)

# Profile the pipeline
results = profile_pipeline(pipeline_config, num_inputs=500)

# The results list contains ProfileResult objects for each pipe stage
for result in results:
    print(f"Stage: {result.name}")
    for stat in result.stats:
        print(f"  Concurrency {stat.concurrency}: "
              f"QPS={stat.qps:.2f}, "
              f"Occupancy={stat.occupancy_rate:.2f}")

# Example output (illustrative; actual numbers depend on the machine and workload):
# Stage: double
#   Concurrency 32: QPS=1250.45, Occupancy=0.85
#   Concurrency 16: QPS=1180.32, Occupancy=0.78
#   Concurrency 8: QPS=1050.21, Occupancy=0.65
#   Concurrency 4: QPS=850.12, Occupancy=0.45
#   Concurrency 1: QPS=320.88, Occupancy=0.25
# Stage: square
#   Concurrency 32: QPS=2100.67, Occupancy=0.92
#   Concurrency 16: QPS=1980.55, Occupancy=0.88
#   Concurrency 8: QPS=1750.33, Occupancy=0.75
#   Concurrency 4: QPS=1200.44, Occupancy=0.55
#   Concurrency 1: QPS=450.22, Occupancy=0.30
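
One way to act on results like those above is to pick, for each stage, the lowest concurrency whose throughput is close to the peak. The sketch below is an illustration, not part of spdl: pick_concurrency and its tolerance parameter are hypothetical, and the stand-in object copies the "double" stage figures from the example output so the code runs without spdl installed.

```python
from types import SimpleNamespace
from typing import Any


def pick_concurrency(result: Any, tolerance: float = 0.9) -> int:
    """Return the lowest concurrency whose QPS is at least `tolerance` * peak QPS."""
    peak = max(stat.qps for stat in result.stats)
    good = [s.concurrency for s in result.stats if s.qps >= tolerance * peak]
    return min(good)


# Stand-in shaped like the "double" stage in the example output (assumed values).
double = SimpleNamespace(
    name="double",
    stats=[
        SimpleNamespace(concurrency=32, qps=1250.45, occupancy_rate=0.85),
        SimpleNamespace(concurrency=16, qps=1180.32, occupancy_rate=0.78),
        SimpleNamespace(concurrency=8, qps=1050.21, occupancy_rate=0.65),
        SimpleNamespace(concurrency=4, qps=850.12, occupancy_rate=0.45),
        SimpleNamespace(concurrency=1, qps=320.88, occupancy_rate=0.25),
    ],
)

print(pick_concurrency(double))  # prints 16
```

With the default tolerance of 0.9, concurrency 16 is chosen because its QPS is within 10% of the peak reached at 32. A looser tolerance trades throughput for fewer concurrent workers. The helper raises ValueError if a stage has no stats.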