Which stage is the bottleneck?¶
By default, when a pipe stage is created, a TaskStatsHook is attached to the stage.
This hook collects runtime statistics of the stage and reports them at the end.
We can use this report to determine which stage in the pipeline is the bottleneck and thus requires improvement.
Task Stats and QPS¶
Let’s say we have a pipeline that downloads data from a remote storage system and processes it.
>>> pipeline = (
... PipelineBuilder()
... .add_source(sample_list)
... .pipe(download, concurrency=32)
... .pipe(process, concurrency=128)
... .add_sink(3)
... .build(num_threads=128)
... )
We execute the pipeline without downstream load, and we obtain the following log.
>>> with pipeline.auto_stop():
... for data in pipeline:
... pass
[download] Completed 2559 tasks (163 failed) in 17.3974 [sec]. QPS: 137.72 (Concurrency: 32). Average task time: 128.7163 [ ms].
[process] Completed 2396 tasks ( 0 failed) in 18.9318 [sec]. QPS: 126.56 (Concurrency: 128). Average task time: 896.3629 [ ms].
TaskStatsHook
collects the number of tasks (items processed by the stage), the average task execution time, and the time elapsed between the start and the end of the stage.
Note
The average task time does not include failed tasks; however, the total duration of the stage includes everything from task execution to the time spent waiting on the input/output queues.
QPS
(queries per second, although the items processed are not strictly queries) is the number of successfully completed tasks divided by the duration of the stage. It indicates how fast the stage processes items.
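For example, the QPS of the download stage in the log above can be recomputed from the reported numbers:
>>> # QPS = successfully completed tasks / stage duration
>>> (2559 - 163) / 17.3974
137.72...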
From the source to the sink, QPS can only decrease, because downstream stages cannot process items faster than upstream stages supply them.
If a downstream stage is fast enough to keep up with its upstream stage, the QPS values of the two stages are roughly the same.
If a downstream stage is slower than its upstream stage, its QPS is smaller than that of the upstream stage.
Therefore, by locating the stage at which QPS drops significantly, we can determine which stage is the bottleneck of the pipeline.
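As a quick check, the ratio of a stage's QPS to that of its upstream stage in the log above shows how much throughput is lost at that stage:
>>> # process QPS / download QPS from the log above
>>> 126.56 / 137.72
0.918...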
In the above example, QPS drops from 137 to 126 at the process stage. This indicates that data processing cannot keep up with data acquisition. To optimize the pipeline throughput, one needs to improve the performance of the process stage; increasing the throughput of the download stage does not help. How to optimize the process stage depends on other factors. For example, if the machine executing the pipeline has spare compute resources, increasing the concurrency and the number of threads can help.
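For instance, here is a minimal sketch (reusing the names from the example above; the new values are illustrative, not measured) of raising the concurrency of the bottleneck stage together with the thread pool size:
>>> pipeline = (
...     PipelineBuilder()
...     .add_source(sample_list)
...     .pipe(download, concurrency=32)
...     .pipe(process, concurrency=256)  # illustrative: raised from 128
...     .add_sink(3)
...     .build(num_threads=256)  # sized to cover the most concurrent stage
... )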
Tuning Pipeline Performance¶
Let’s look at another example.
This time, we download images from a remote source, then decode, batch, and send them to a GPU device. We create the decoding/pre-processing and batching stages using spdl.io.
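The snippets below assume imports along the following lines (a sketch; exact import paths may vary across spdl versions):
>>> # Assumed imports (import paths may differ between spdl versions).
>>> from typing import Callable
>>> import torch
>>> import spdl.io
>>> from spdl.pipeline import PipelineBuilder
>>> # FFmpegFrames in the annotations below denotes the frame objects
>>> # returned by the spdl.io decoding functions.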
>>> def decode(width=224, height=224, pix_fmt="rgb24") -> Callable:
... """Decode and resize image from byte string"""
... filter_desc = spdl.io.get_video_filter_desc(
... scale_width=width, scale_height=height, pix_fmt=pix_fmt
... )
...
... async def decode(data: bytes) -> FFmpegFrames:
... packets = await spdl.io.async_demux_image(data)
... frames = await spdl.io.async_decode_packets(packets, filter_desc=filter_desc)
... return frames
...
... return decode
>>> async def batchify(frames: list[FFmpegFrames]) -> torch.Tensor:
... """Create a batch from image frames, send them to GPU and create Torch tensor"""
... cfg = spdl.io.cuda_config(device_index=0)
... cpu_buffer = await spdl.io.async_convert_frames(frames)
... cuda_buffer = await spdl.io.async_transfer_buffer(cpu_buffer, cuda_config=cfg)
...
...     return spdl.io.to_torch(cuda_buffer)
>>> def run_pipeline(dl_concurrency, decode_concurrency):
... pipeline = (
... PipelineBuilder()
... .add_source(src)
... .pipe(download, concurrency=dl_concurrency)
... .pipe(decode(), concurrency=decode_concurrency)
... .aggregate(32)
... .pipe(batchify)
... .add_sink(10)
... .build(num_threads=decode_concurrency)
... )
... with pipeline.auto_stop():
... for item in pipeline:
... pass
Now we run the pipeline with different concurrency values for downloading and decoding. (You can skip the raw results and go to the summary table below.)
>>> run_pipeline(64, 4)
[download] Completed 1600 tasks ( 0 failed) in 6.2723 [sec]. QPS: 255.09 (Concurrency: 64). Average task time: 240.2614 [ ms].
[decode_image] Completed 1600 tasks ( 0 failed) in 6.2763 [sec]. QPS: 254.93 (Concurrency: 4). Average task time: 3.8516 [ ms].
[aggregate(32, drop_last=False)] Completed 1601 tasks ( 0 failed) in 6.2786 [sec]. QPS: 254.99 (Concurrency: 1). Average task time: 0.0038 [ ms].
[batchify] Completed 50 tasks ( 0 failed) in 6.2790 [sec]. QPS: 7.96 (Concurrency: 1). Average task time: 1.6127 [ ms].
[sink] Processed 50 items in 6.2799 [sec]. QPS: 7.96. Average wait time: Upstream: 123.1203 [ms ], Downstream: 0.0043 [ms ].
>>> run_pipeline(128, 4)
[download] Completed 1600 tasks ( 0 failed) in 2.0347 [sec]. QPS: 786.36 (Concurrency: 128). Average task time: 139.2847 [ ms].
[decode_image] Completed 1600 tasks ( 0 failed) in 2.0385 [sec]. QPS: 784.89 (Concurrency: 4). Average task time: 4.1260 [ ms].
[aggregate(32, drop_last=False)] Completed 1601 tasks ( 0 failed) in 2.0477 [sec]. QPS: 781.87 (Concurrency: 1). Average task time: 0.0039 [ ms].
[batchify] Completed 50 tasks ( 0 failed) in 2.0522 [sec]. QPS: 24.36 (Concurrency: 1). Average task time: 2.6672 [ ms].
[sink] Processed 50 items in 2.0529 [sec]. QPS: 24.36. Average wait time: Upstream: 40.2405 [ms ], Downstream: 0.0040 [ms ].
>>> run_pipeline(256, 4)
[download] Completed 1600 tasks ( 0 failed) in 1.8855 [sec]. QPS: 848.57 (Concurrency: 256). Average task time: 146.4174 [ ms].
[decode_image] Completed 1600 tasks ( 0 failed) in 1.8907 [sec]. QPS: 846.25 (Concurrency: 4). Average task time: 4.3023 [ ms].
[aggregate(32, drop_last=False)] Completed 1601 tasks ( 0 failed) in 1.8935 [sec]. QPS: 845.52 (Concurrency: 1). Average task time: 0.0038 [ ms].
[batchify] Completed 50 tasks ( 0 failed) in 1.8942 [sec]. QPS: 26.40 (Concurrency: 1). Average task time: 2.6016 [ ms].
[sink] Processed 50 items in 1.8945 [sec]. QPS: 26.39. Average wait time: Upstream: 37.1349 [ms ], Downstream: 0.0039 [ms ].
>>> run_pipeline(512, 4)
[download] Completed 1600 tasks ( 0 failed) in 1.9942 [sec]. QPS: 802.31 (Concurrency: 512). Average task time: 151.8697 [ ms].
[decode_image] Completed 1600 tasks ( 0 failed) in 1.9986 [sec]. QPS: 800.55 (Concurrency: 4). Average task time: 4.5500 [ ms].
[aggregate(32, drop_last=False)] Completed 1601 tasks ( 0 failed) in 2.0021 [sec]. QPS: 799.67 (Concurrency: 1). Average task time: 0.0038 [ ms].
[batchify] Completed 50 tasks ( 0 failed) in 2.0029 [sec]. QPS: 24.96 (Concurrency: 1). Average task time: 3.1626 [ ms].
[sink] Processed 50 items in 2.0037 [sec]. QPS: 24.95. Average wait time: Upstream: 39.2747 [ms ], Downstream: 0.0044 [ms ].
>>> run_pipeline(256, 8)
[download] Completed 1600 tasks ( 0 failed) in 1.3731 [sec]. QPS: 1165.27 (Concurrency: 256). Average task time: 152.5442 [ ms].
[decode_image] Completed 1600 tasks ( 0 failed) in 1.3768 [sec]. QPS: 1162.10 (Concurrency: 8). Average task time: 5.4827 [ ms].
[aggregate(32, drop_last=False)] Completed 1601 tasks ( 0 failed) in 1.3794 [sec]. QPS: 1160.61 (Concurrency: 1). Average task time: 0.0038 [ ms].
[batchify] Completed 50 tasks ( 0 failed) in 1.3806 [sec]. QPS: 36.22 (Concurrency: 1). Average task time: 3.1528 [ ms].
[sink] Processed 50 items in 1.3814 [sec]. QPS: 36.20. Average wait time: Upstream: 27.0740 [ms ], Downstream: 0.0041 [ms ].
>>> run_pipeline(256, 16)
[download] Completed 1600 tasks ( 0 failed) in 2.2429 [sec]. QPS: 713.36 (Concurrency: 256). Average task time: 154.0344 [ ms].
[decode_image] Completed 1600 tasks ( 0 failed) in 2.2661 [sec]. QPS: 706.06 (Concurrency: 16). Average task time: 14.4060 [ ms].
[aggregate(32, drop_last=False)] Completed 1601 tasks ( 0 failed) in 2.3514 [sec]. QPS: 680.86 (Concurrency: 1). Average task time: 0.0039 [ ms].
[batchify] Completed 50 tasks ( 0 failed) in 2.3610 [sec]. QPS: 21.18 (Concurrency: 1). Average task time: 15.5912 [ ms].
[sink] Processed 50 items in 2.3622 [sec]. QPS: 21.17. Average wait time: Upstream: 46.3030 [ms ], Downstream: 0.0041 [ms ].
>>> run_pipeline(256, 32)
[download] Completed 1600 tasks ( 0 failed) in 1.6766 [sec]. QPS: 954.30 (Concurrency: 256). Average task time: 156.4433 [ ms].
[decode_image] Completed 1600 tasks ( 0 failed) in 1.6815 [sec]. QPS: 951.55 (Concurrency: 32). Average task time: 18.5734 [ ms].
[aggregate(32, drop_last=False)] Completed 1601 tasks ( 0 failed) in 1.6862 [sec]. QPS: 949.48 (Concurrency: 1). Average task time: 0.0039 [ ms].
[batchify] Completed 50 tasks ( 0 failed) in 1.6866 [sec]. QPS: 29.64 (Concurrency: 1). Average task time: 15.2163 [ ms].
[sink] Processed 50 items in 1.6913 [sec]. QPS: 29.56. Average wait time: Upstream: 33.1498 [ms ], Downstream: 0.0041 [ms ].
The following table summarizes the above results.
| Run | Download Concurrency | Decode Concurrency | Download QPS | Decoding QPS | Sink QPS (normalized) |
| --- | --- | --- | --- | --- | --- |
| 1 | 64 | 4 | 255.09 | 254.93 | 254.72 |
| 2 | 128 | 4 | 786.36 | 784.89 | 779.52 |
| 3 | 256 | 4 | 848.57 | 846.25 | 844.48 |
| 4 | 512 | 4 | 802.31 | 800.55 | 798.40 |
| 5 | 256 | 8 | 1165.27 | 1162.10 | 1158.40 |
| 6 | 256 | 16 | 713.36 | 706.06 | 677.44 |
| 7 | 256 | 32 | 954.30 | 951.55 | 945.92 |
Looking at the first pipeline (Run 1), we do not see a significant QPS drop across the stages: it is roughly the same (about 255) at the beginning and at the end of the pipeline.
This suggests that the first stage (download) dominates the QPS of the whole pipeline, so we increase the download concurrency.
As we increase the download concurrency (Runs 2 - 4), QPS increases, but it saturates around 800.
Because back pressure from the downstream stages automatically throttles the pipeline, we next tweak the decoding concurrency.
Increasing the decode concurrency from 4 to 8 (Run 5) raises the QPS further, but it drops again at 16 and 32 (Runs 6 and 7).
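The sweep above corresponds to a simple loop over the configurations used in Runs 1 - 7:
>>> # (dl_concurrency, decode_concurrency) pairs for Runs 1-7
>>> configs = [(64, 4), (128, 4), (256, 4), (512, 4), (256, 8), (256, 16), (256, 32)]
>>> for dl, dec in configs:
...     run_pipeline(dl, dec)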
Summary¶
When running a Pipeline, TaskStatsHook provides runtime statistics for each stage. This information is helpful for determining which part of the pipeline should be optimized.