Performance analysis
This example shows how to collect runtime performance statistics using TensorBoard.

Note
To learn how to interpret the performance statistics, please refer to the Optimization Guide.

The Pipeline class can collect runtime statistics and periodically publish them. This example shows how to write a callback function for publishing the stats and how to attach the callback to the Pipeline object.

The performance stats are exposed as the TaskPerfStats and QueuePerfStats classes.
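Later in the example, each stats object is turned into a flat mapping from TensorBoard tags to scalar values. The sketch below reproduces that mapping.

It assumes two hypothetical stand-in dataclasses, `FakeTaskStats` and `FakeQueueStats`. They mirror only the attribute names that `CustomTaskHook` and `CustomQueue` read, so they are not the real `spdl.pipeline` classes and may omit fields those classes have.

```python
from dataclasses import dataclass


# Hypothetical stand-ins: only the attribute names read by the example are mirrored.
@dataclass
class FakeTaskStats:
    ave_time: float
    num_tasks: int
    num_failures: int


@dataclass
class FakeQueueStats:
    qps: float
    ave_put_time: float
    ave_get_time: float
    occupancy_rate: float


def task_scalars(stats: FakeTaskStats) -> dict[str, float]:
    """Map task stats to the same TensorBoard tags CustomTaskHook uses."""
    return {
        "pipeline_task/ave_time": stats.ave_time,
        "pipeline_task/num_invocations": stats.num_tasks,
        "pipeline_task/num_failures": stats.num_failures,
    }


def queue_scalars(stats: FakeQueueStats) -> dict[str, float]:
    """Map queue stats to the same TensorBoard tags CustomQueue uses."""
    return {
        "pipeline_queue/qps": stats.qps,
        "pipeline_queue/ave_put_time": stats.ave_put_time,
        "pipeline_queue/ave_get_time": stats.ave_get_time,
        "pipeline_queue/occupancy_rate": stats.occupancy_rate,
    }


print(task_scalars(FakeTaskStats(ave_time=0.02, num_tasks=50, num_failures=1)))
```

Note that the task count is published under the tag `num_invocations`, not `num_tasks`.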
You can add custom callbacks with the following steps.

For QueuePerfStats:

1. Subclass StatsQueue and override StatsQueue.interval_stats_callback().
2. In the interval_stats_callback method, save the fields of QueuePerfStats somewhere you can access later.
3. Provide the new class to the PipelineBuilder.build() method.
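The three queue steps can be sketched without spdl as follows. `FakeStatsQueue` is a hypothetical stand-in that copies only the constructor shape `(name, buffer_size=1, interval=-1)` used by `CustomQueue`. `RecordingQueue` and its `sink` argument are likewise illustrative.

Step 3 mirrors the example's use of `functools.partial`: the extra keyword-only argument is bound up front, so the result can be called with the base-class arguments. Exactly which arguments `PipelineBuilder.build()` passes when it instantiates the class is an assumption here.

```python
import asyncio
from functools import partial
from typing import Any


class FakeStatsQueue:
    """Hypothetical stand-in for spdl.pipeline.StatsQueue (constructor shape only)."""

    def __init__(self, name: str, buffer_size: int = 1, interval: float = -1) -> None:
        self.name = name
        self.buffer_size = buffer_size
        self.interval = interval

    async def interval_stats_callback(self, stats: Any) -> None:
        pass  # The real base class may do its own reporting here.


class RecordingQueue(FakeStatsQueue):
    """Step 1: subclass the queue and override interval_stats_callback."""

    def __init__(
        self,
        name: str,
        buffer_size: int = 1,
        interval: float = -1,
        *,
        sink: list[tuple[str, Any]],
    ) -> None:
        super().__init__(name=name, buffer_size=buffer_size, interval=interval)
        self._sink = sink

    async def interval_stats_callback(self, stats: Any) -> None:
        await super().interval_stats_callback(stats)
        # Step 2: save the stats somewhere accessible later (here, a plain list).
        self._sink.append((self.name, stats))


records: list[tuple[str, Any]] = []
# Step 3: bind the keyword-only argument so the result is callable with the
# base-class signature, like partial(CustomQueue, writer=..., interval=...).
queue_class = partial(RecordingQueue, sink=records, interval=5.0)

q = queue_class("decode_queue", buffer_size=2)
asyncio.run(q.interval_stats_callback({"qps": 10.0}))
print(records)  # [('decode_queue', {'qps': 10.0})]
```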

Similarly, for TaskPerfStats:

1. Subclass TaskStatsHook and override TaskStatsHook.interval_stats_callback().
2. In the interval_stats_callback method, save the fields of TaskPerfStats somewhere you can access later.
3. Create a factory function that takes the name of a stage function and returns a list of TaskHook objects to apply to that stage.
4. Provide the factory function to the PipelineBuilder.build() method.
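The task-hook steps follow the same pattern. The new piece is the factory, a closure that captures shared settings and builds the hooks for one stage from its name, as `hook_factory` does in `build_pipeline`.

In this sketch, `FakeTaskStatsHook`, `RecordingTaskHook` and `make_hook_factory` are hypothetical stand-ins so that it runs without spdl. The stage name `decode` is illustrative.

```python
import asyncio
from typing import Any, Callable


class FakeTaskStatsHook:
    """Hypothetical stand-in for spdl.pipeline.TaskStatsHook (constructor shape only)."""

    def __init__(self, name: str, interval: float = -1) -> None:
        self.name = name
        self.interval = interval

    async def interval_stats_callback(self, stats: Any) -> None:
        pass  # The real base class may do its own reporting here.


class RecordingTaskHook(FakeTaskStatsHook):
    def __init__(
        self, name: str, interval: float = -1, *, sink: dict[str, list[Any]]
    ) -> None:
        super().__init__(name=name, interval=interval)
        # One list per stage, keyed by the stage name.
        self._records = sink.setdefault(name, [])

    async def interval_stats_callback(self, stats: Any) -> None:
        await super().interval_stats_callback(stats)
        self._records.append(stats)


def make_hook_factory(
    interval: float, sink: dict[str, list[Any]]
) -> Callable[[str], list[FakeTaskStatsHook]]:
    """Return a factory that builds the hooks for one stage, given its name."""

    def hook_factory(name: str) -> list[FakeTaskStatsHook]:
        return [RecordingTaskHook(name=name, interval=interval, sink=sink)]

    return hook_factory


stats_by_stage: dict[str, list[Any]] = {}
factory = make_hook_factory(interval=5.0, sink=stats_by_stage)
hooks = factory("decode")
asyncio.run(hooks[0].interval_stats_callback({"ave_time": 0.01}))
print(stats_by_stage)  # {'decode': [{'ave_time': 0.01}]}
```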
Source
```python
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example shows how to collect runtime performance statistics using TensorBoard.

.. image:: ../_static/data/example_performance_analysis_tensorboard.png
   :alt: An example of how stats are shown in the TensorBoard UI.

.. note::

   To learn how to interpret the performance statistics, please refer to
   `Optimization Guide <../performance_analysis/index.html>`_.

.. py:currentmodule:: spdl.pipeline

The :py:class:`Pipeline` class can collect runtime statistics
and periodically publish them.
This example shows how to write a callback function for publishing the stats and
how to attach the callback to the Pipeline object.

The performance stats are exposed as :py:class:`TaskPerfStats` and
:py:class:`QueuePerfStats` classes.

You can add custom callbacks with the following steps.

For :py:class:`QueuePerfStats`:

#. Subclass :py:class:`StatsQueue` and
   override :py:meth:`StatsQueue.interval_stats_callback`.
#. In the ``interval_stats_callback`` method,
   save the fields of ``QueuePerfStats`` somewhere you can access later.
#. Provide the new class to the :py:meth:`PipelineBuilder.build` method.

Similarly, for :py:class:`TaskPerfStats`:

#. Subclass :py:class:`TaskStatsHook` and
   override :py:meth:`TaskStatsHook.interval_stats_callback`.
#. In the ``interval_stats_callback`` method,
   save the fields of ``TaskPerfStats`` somewhere you can access later.
#. Create a factory function that takes the name of a stage function and
   returns a list of :py:class:`TaskHook`-s to apply to that stage.
#. Provide the factory function to the :py:meth:`PipelineBuilder.build` method.
"""

import argparse
import asyncio
import contextlib
import logging
import time
from collections.abc import Iterator
from functools import partial
from pathlib import Path

import spdl.io
import torch
from spdl.pipeline import (
    Pipeline,
    PipelineBuilder,
    QueuePerfStats,
    StatsQueue,
    TaskHook,
    TaskPerfStats,
    TaskStatsHook,
)
from torch.utils.tensorboard import SummaryWriter

__all__ = [
    "parse_args",
    "main",
    "build_pipeline",
    "decode",
    "CustomTaskHook",
    "CustomQueue",
]

# pyre-strict


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument(
        "--dataset-dir",
        type=Path,
        required=True,
    )
    parser.add_argument(
        "--log-dir",
        type=Path,
    )
    parser.add_argument(
        "--log-interval",
        type=float,
        default=120,
    )
    return parser.parse_args()


class CustomTaskHook(TaskStatsHook):
    """Extend the :py:class:`~spdl.pipeline.TaskStatsHook` to add logging to TensorBoard.

    This class extends the :py:class:`~spdl.pipeline.TaskStatsHook` and periodically
    writes the performance statistics to TensorBoard.

    Args:
        name, interval: See :py:class:`spdl.pipeline.TaskStatsHook`
        writer: TensorBoard summary writer object.
    """

    def __init__(
        self,
        name: str,
        interval: float = -1,
        *,
        writer: SummaryWriter,
    ) -> None:
        super().__init__(name=name, interval=interval)
        self._writer = writer
        self._step = -1

    async def interval_stats_callback(self, stats: TaskPerfStats) -> None:
        """Log the performance statistics to TensorBoard.

        Args:
            stats: See :py:meth:`spdl.pipeline.TaskStatsHook.interval_stats_callback`.
        """
        await super().interval_stats_callback(stats)

        self._step += 1
        walltime = time.time()
        vals = {
            "pipeline_task/ave_time": stats.ave_time,
            "pipeline_task/num_invocations": stats.num_tasks,
            "pipeline_task/num_failures": stats.num_failures,
        }
        # Write from a worker thread so the blocking TensorBoard I/O
        # does not stall the event loop.
        await asyncio.get_running_loop().run_in_executor(
            None, _log, self._writer, self.name, vals, self._step, walltime
        )


class CustomQueue(StatsQueue):
    """Extend the :py:class:`~spdl.pipeline.StatsQueue` to add logging to TensorBoard.

    This class extends the :py:class:`~spdl.pipeline.StatsQueue` and periodically
    writes the performance statistics to TensorBoard.

    Args:
        name, buffer_size, interval: See :py:class:`spdl.pipeline.StatsQueue`
        writer: TensorBoard summary writer object.
    """

    def __init__(
        self,
        name: str,
        buffer_size: int = 1,
        interval: float = -1,
        *,
        writer: SummaryWriter,
    ) -> None:
        super().__init__(name=name, buffer_size=buffer_size, interval=interval)
        self._writer = writer
        self._step = -1

    async def interval_stats_callback(self, stats: QueuePerfStats) -> None:
        """Log the performance statistics to TensorBoard.

        Args:
            stats: See :py:meth:`spdl.pipeline.StatsQueue.interval_stats_callback`.
        """
        await super().interval_stats_callback(stats)

        self._step += 1
        walltime = time.time()

        vals = {
            "pipeline_queue/qps": stats.qps,
            "pipeline_queue/ave_put_time": stats.ave_put_time,
            "pipeline_queue/ave_get_time": stats.ave_get_time,
            "pipeline_queue/occupancy_rate": stats.occupancy_rate,
        }
        # Write from a worker thread so the blocking TensorBoard I/O
        # does not stall the event loop.
        await asyncio.get_running_loop().run_in_executor(
            None, _log, self._writer, self.name, vals, self._step, walltime
        )


def _log(
    writer: SummaryWriter,
    name: str,
    vals: dict[str, float],
    step: int | None,
    walltime: float,
) -> None:
    # One chart per metric tag; each stage/queue name becomes a series on it.
    for k, v in vals.items():
        writer.add_scalars(k, {name: v}, global_step=step, walltime=walltime)


def decode(path: Path, width: int = 128, height: int = 128) -> torch.Tensor:
    """Decode the video from the given path with rescaling.

    Args:
        path: The path to the video file.
        width,height: The resolution of video after rescaling.

    Returns:
        Uint8 tensor in shape of ``[N, C, H, W]``: Video frames in Tensor.
    """
    packets = spdl.io.demux_video(path)
    frames = spdl.io.decode_packets(
        packets,
        filter_desc=spdl.io.get_filter_desc(
            packets,
            scale_width=width,
            scale_height=height,
            pix_fmt="rgb24",
        ),
    )
    buffer = spdl.io.convert_frames(frames)
    # rgb24 is a packed (interleaved) format, so the buffer is [N, H, W, C].
    # Reorder the axes to [N, C, H, W], as documented above.
    return spdl.io.to_torch(buffer).permute(0, 3, 1, 2)


def build_pipeline(
    source: Iterator[Path],
    writer: SummaryWriter,
    log_interval: float,
    concurrency: int,
) -> Pipeline:
    """Build the pipeline using :py:class:`CustomTaskHook` and :py:class:`CustomQueue`.

    Args:
        source: A data source.
        writer: A TensorBoard SummaryWriter object.
        log_interval: The interval (in seconds) at which the performance data is saved.
        concurrency: The concurrency for video decoding.
    """

    def hook_factory(name: str) -> list[TaskHook]:
        # Build the hooks attached to the stage with the given name.
        return [CustomTaskHook(name=name, interval=log_interval, writer=writer)]

    return (
        PipelineBuilder()
        .add_source(source=source)
        .pipe(decode, concurrency=concurrency)
        .add_sink()
        .build(
            num_threads=concurrency,
            queue_class=partial(
                CustomQueue,
                writer=writer,
                interval=log_interval,
            ),
            task_hook_factory=hook_factory,
        )
    )


def main() -> None:
    """The main entry point for the example."""
    logging.basicConfig(level=logging.INFO)
    args = parse_args()
    source = args.dataset_dir.rglob("*.mp4")
    with contextlib.closing(SummaryWriter(args.log_dir)) as writer:
        pipeline = build_pipeline(
            source=source,
            writer=writer,
            log_interval=args.log_interval,
            concurrency=8,
        )
        with pipeline.auto_stop():
            for _ in pipeline:
                pass


if __name__ == "__main__":
    main()
```
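Both `interval_stats_callback` implementations hand the TensorBoard write to `run_in_executor` instead of calling `_log` directly. `SummaryWriter` calls are synchronous, so calling them inside a coroutine would block the event loop that the callback runs on until the write finished.

The stdlib-only sketch below shows the same pattern. `blocking_write` is a hypothetical stand-in for `_log` that simulates slow I/O with `time.sleep`. A concurrent loop keeps ticking while the write runs on the default thread pool.

```python
import asyncio
import time


def blocking_write(log: list[str], tag: str, value: float) -> None:
    """Hypothetical stand-in for a synchronous SummaryWriter call that blocks on I/O."""
    time.sleep(0.2)
    log.append(f"{tag}={value}")


async def stats_callback(log: list[str]) -> None:
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor, as the example does.
    await loop.run_in_executor(None, blocking_write, log, "pipeline_queue/qps", 12.5)


async def run_demo() -> tuple[int, list[str]]:
    log: list[str] = []
    ticks = 0
    task = asyncio.create_task(stats_callback(log))
    # The event loop stays free: this loop keeps running during the write.
    while not task.done():
        ticks += 1
        await asyncio.sleep(0.01)
    await task
    return ticks, log


ticks, log = asyncio.run(run_demo())
print(f"event loop ticked {ticks} times during the write; log={log}")
```

Calling `blocking_write` directly inside `stats_callback` would produce a single tick, because the loop could not run anything else until the sleep ended.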
Functions
- build_pipeline(source: Iterator[Path], writer: SummaryWriter, log_interval: float, concurrency: int) → Pipeline
  Build the pipeline using CustomTaskHook and CustomQueue.
  - Parameters:
    source – A data source.
    writer – A TensorBoard SummaryWriter object.
    log_interval – The interval (in seconds) at which the performance data is saved.
    concurrency – The concurrency for video decoding.
- decode(path: Path, width: int = 128, height: int = 128) → Tensor
  Decode the video from the given path with rescaling.
  - Parameters:
    path – The path to the video file.
    width – The width of the video after rescaling.
    height – The height of the video after rescaling.
  - Returns:
    Video frames in Tensor.
  - Return type:
    Uint8 tensor in shape of [N, C, H, W]
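The documented return layout is `[N, C, H, W]`. This sketch assumes that `spdl.io.convert_frames` returns packed `rgb24` frames as `[N, H, W, C]`; check that assumption against the `spdl.io` documentation for your version. Under it, the axis order passed to `permute` decides which layout comes out.

The helper below applies the shape rule that `torch.Tensor.permute` follows: output axis `i` takes input axis `dims[i]`. It is plain Python, so it runs without PyTorch. The frame sizes are hypothetical, and height differs from width only so the axes can be told apart.

```python
def permuted_shape(shape: tuple[int, ...], dims: tuple[int, ...]) -> tuple[int, ...]:
    """Return the shape produced by permute(*dims): output axis i is input axis dims[i]."""
    if sorted(dims) != list(range(len(shape))):
        raise ValueError("dims must be a permutation of the input axes")
    return tuple(shape[d] for d in dims)


# Hypothetical packed rgb24 batch: N=16 frames, H=128, W=96, C=3.
nhwc = (16, 128, 96, 3)
print(permuted_shape(nhwc, (0, 3, 1, 2)))  # (16, 3, 128, 96), i.e. [N, C, H, W]
print(permuted_shape(nhwc, (0, 2, 3, 1)))  # (16, 96, 3, 128), i.e. [N, W, C, H]
```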
Classes
- class CustomTaskHook(name: str, interval: float = -1, *, writer: SummaryWriter)
  Extend the TaskStatsHook to add logging to TensorBoard.
  This class extends the TaskStatsHook and periodically writes the performance statistics to TensorBoard.
  - Parameters:
    name – See spdl.pipeline.TaskStatsHook
    interval – See spdl.pipeline.TaskStatsHook
    writer – TensorBoard summary writer object.
  - async interval_stats_callback(stats: TaskPerfStats) → None
    Log the performance statistics to TensorBoard.
    - Parameters:
      stats – See spdl.pipeline.TaskStatsHook.interval_stats_callback().
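Both classes call `add_scalars` with the metric as the main tag and their own `name` as the key of the value dictionary. As described in the PyTorch `SummaryWriter.add_scalars` documentation, scalars that share a main tag are plotted on one chart. The result is one chart per metric, with one line per stage (or queue).

The sketch below copies the body of the example's `_log`. It uses `RecordingWriter`, a hypothetical stand-in with the same parameter names as `add_scalars`, so it runs without PyTorch. The stage names `decode` and `resize` are illustrative; the example pipeline has only a `decode` stage.

```python
from __future__ import annotations

from typing import Any


class RecordingWriter:
    """Hypothetical stand-in recording calls made with add_scalars' parameter names."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict[str, float], int | None, float | None]] = []

    def add_scalars(
        self,
        main_tag: str,
        tag_scalar_dict: dict[str, float],
        global_step: int | None = None,
        walltime: float | None = None,
    ) -> None:
        self.calls.append((main_tag, tag_scalar_dict, global_step, walltime))


def _log(writer: Any, name: str, vals: dict[str, float], step: int | None, walltime: float) -> None:
    # Same body as the example's _log: one add_scalars call per metric tag.
    for k, v in vals.items():
        writer.add_scalars(k, {name: v}, global_step=step, walltime=walltime)


writer = RecordingWriter()
# Two stages report at step 0; both land under the same main tag.
_log(writer, "decode", {"pipeline_task/ave_time": 0.03}, 0, 1000.0)
_log(writer, "resize", {"pipeline_task/ave_time": 0.01}, 0, 1000.0)

for call in writer.calls:
    print(call)
```

Because each callback increments `_step` from its initial value of -1 before logging, the first interval is recorded at step 0.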
- class CustomQueue(name: str, buffer_size: int = 1, interval: float = -1, *, writer: SummaryWriter)
  Extend the StatsQueue to add logging to TensorBoard.
  This class extends the StatsQueue and periodically writes the performance statistics to TensorBoard.
  - Parameters:
    name – See spdl.pipeline.StatsQueue
    buffer_size – See spdl.pipeline.StatsQueue
    interval – See spdl.pipeline.StatsQueue
    writer – TensorBoard summary writer object.
  - async interval_stats_callback(stats: QueuePerfStats) → None
    Log the performance statistics to TensorBoard.
    - Parameters:
      stats – See spdl.pipeline.StatsQueue.interval_stats_callback().