Performance analysis

This example shows how to collect runtime performance statistics using TensorBoard.

An example of how stats are shown in the TensorBoard UI.

Note

To learn how to interpret the performance statistics, refer to the Optimization Guide.

The Pipeline class can collect runtime statistics and periodically publish them. This example shows how to write a callback function for publishing the stats, and how to attach the callback to the Pipeline object.
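Publishing stats usually involves blocking I/O (such as writing to TensorBoard), so the callbacks in this example offload that work to a thread with ``run_in_executor``. A minimal self-contained sketch of that pattern (the names here are illustrative, not part of spdl):

```python
import asyncio


def blocking_write(store: list, value: float) -> None:
    # Stand-in for a blocking logger such as SummaryWriter.add_scalars.
    store.append(value)


async def interval_stats_callback(store: list, value: float) -> None:
    # Offload the blocking write to a worker thread so the event loop
    # stays responsive while stats are being published.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, blocking_write, store, value)


store: list = []
asyncio.run(interval_stats_callback(store, 1.5))
print(store)  # [1.5]
```

The callbacks in the source below follow the same shape, with ``SummaryWriter.add_scalars`` as the blocking write.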

The performance stats are exposed as the TaskPerfStats and QueuePerfStats classes.

You can add custom callbacks with the following steps.

For QueuePerfStats:

  1. Subclass StatsQueue and override StatsQueue.interval_stats_callback().

  2. In the interval_stats_callback method, save the fields of QueuePerfStats somewhere you can access later.

  3. Provide the new class to the PipelineBuilder.build() method.
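Because PipelineBuilder.build() instantiates the queue class itself, extra constructor arguments (such as the writer) are bound ahead of time with ``functools.partial``, as the source below does. A stand-in sketch of the binding (``RecordingQueue`` is hypothetical; a real subclass would inherit from ``spdl.pipeline.StatsQueue``):

```python
from functools import partial


class RecordingQueue:
    # Hypothetical stand-in for a StatsQueue subclass; the real class would
    # inherit from spdl.pipeline.StatsQueue and override
    # interval_stats_callback to persist QueuePerfStats fields.
    def __init__(
        self, name: str, buffer_size: int = 1, interval: float = -1, *, sink: list
    ) -> None:
        self.name = name
        self.interval = interval
        self.sink = sink


# Bind the extra arguments; the pipeline later calls queue_class(name, ...).
queue_class = partial(RecordingQueue, interval=120, sink=[])
q = queue_class("src_queue")
print(q.name, q.interval)  # src_queue 120
```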

Similarly, for TaskPerfStats:

  1. Subclass TaskStatsHook and override TaskStatsHook.interval_stats_callback().

  2. In the interval_stats_callback method, save the fields of TaskPerfStats somewhere you can access later.

  3. Create a factory function that takes the name of a stage function and returns a list of TaskHook-s to apply to the stage.

  4. Provide the factory function to the PipelineBuilder.build() method.
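The factory function in step 3 is called once per stage with the stage function's name, and every hook in the returned list is applied to that stage. A stand-in sketch (``RecordingHook`` is hypothetical; a real hook would subclass ``spdl.pipeline.TaskStatsHook``):

```python
class RecordingHook:
    # Hypothetical stand-in for a TaskStatsHook subclass; the real class
    # would override interval_stats_callback to persist TaskPerfStats fields.
    def __init__(self, name: str, interval: float = -1) -> None:
        self.name = name
        self.interval = interval


def hook_factory(name: str) -> list:
    # Receives the stage function's name; each returned hook is
    # attached to that stage.
    return [RecordingHook(name=name, interval=120)]


hooks = hook_factory("decode")
print(hooks[0].name)  # decode
```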

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""This example shows how to collect runtime performance statistics using TensorBoard.

.. image:: ../_static/data/example_performance_analysis_tensorboard.png
   :alt: An example of how stats are shown in the TensorBoard UI.

.. note::

   To learn how to interpret the performance statistics, refer to the
   `Optimization Guide <../performance_analysis/index.html>`_.

.. py:currentmodule:: spdl.pipeline

The :py:class:`Pipeline` class can collect runtime statistics
and periodically publish them.
This example shows how to write a callback function for publishing the stats, and
how to attach the callback to the Pipeline object.

The performance stats are exposed as the :py:class:`TaskPerfStats` and
:py:class:`QueuePerfStats` classes.

You can add custom callbacks with the following steps.

For :py:class:`QueuePerfStats`:

#. Subclass :py:class:`StatsQueue` and
   override :py:meth:`StatsQueue.interval_stats_callback`.
#. In the ``interval_stats_callback`` method,
   save the fields of ``QueuePerfStats`` somewhere you can access later.
#. Provide the new class to the :py:meth:`PipelineBuilder.build` method.

Similarly, for :py:class:`TaskPerfStats`:

#. Subclass :py:class:`TaskStatsHook` and
   override :py:meth:`TaskStatsHook.interval_stats_callback`.
#. In the ``interval_stats_callback`` method,
   save the fields of ``TaskPerfStats`` somewhere you can access later.
#. Create a factory function that takes the name of a stage function and
   returns a list of :py:class:`TaskHook`-s to apply to the stage.
#. Provide the factory function to the :py:meth:`PipelineBuilder.build` method.
"""

import argparse
import asyncio
import contextlib
import logging
import time
from collections.abc import Iterator
from functools import partial
from pathlib import Path

import spdl.io
import torch
from spdl.pipeline import (
    Pipeline,
    PipelineBuilder,
    QueuePerfStats,
    StatsQueue,
    TaskHook,
    TaskPerfStats,
    TaskStatsHook,
)
from torch.utils.tensorboard import SummaryWriter

__all__ = [
    "parse_args",
    "main",
    "build_pipeline",
    "decode",
    "CustomTaskHook",
    "CustomQueue",
]

# pyre-strict


def parse_args() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description=__doc__,
    )
    parser.add_argument(
        "--dataset-dir",
        type=Path,
        required=True,
    )
    parser.add_argument(
        "--log-dir",
        type=Path,
    )
    parser.add_argument(
        "--log-interval",
        type=float,
        default=120,
    )
    return parser.parse_args()


class CustomTaskHook(TaskStatsHook):
    """Extend the :py:class:`~spdl.pipeline.TaskStatsHook` to add logging to TensorBoard.

    This class extends the :py:class:`~spdl.pipeline.TaskStatsHook`, and periodically
    writes the performance statistics to TensorBoard.

    Args:
        name, interval: See :py:class:`spdl.pipeline.TaskStatsHook`
        writer: TensorBoard summary writer object.
    """

    def __init__(
        self,
        name: str,
        interval: float = -1,
        *,
        writer: SummaryWriter,
    ) -> None:
        super().__init__(name=name, interval=interval)
        self._writer = writer
        self._step = -1

    async def interval_stats_callback(self, stats: TaskPerfStats) -> None:
        """Log the performance statistics to TensorBoard.

        Args:
            stats: See :py:meth:`spdl.pipeline.TaskStatsHook.interval_stats_callback`.
        """
        await super().interval_stats_callback(stats)

        self._step += 1
        walltime = time.time()
        vals = {
            "pipeline_task/ave_time": stats.ave_time,
            "pipeline_task/num_invocations": stats.num_tasks,
            "pipeline_task/num_failures": stats.num_failures,
        }
        await asyncio.get_running_loop().run_in_executor(
            None, _log, self._writer, self.name, vals, self._step, walltime
        )


class CustomQueue(StatsQueue):
    """Extend the :py:class:`~spdl.pipeline.StatsQueue` to add logging to TensorBoard.

    This class extends the :py:class:`~spdl.pipeline.StatsQueue`, and periodically
    writes the performance statistics to TensorBoard.

    Args:
        name, buffer_size, interval: See :py:class:`spdl.pipeline.StatsQueue`
        writer: TensorBoard summary writer object.
    """

    def __init__(
        self,
        name: str,
        buffer_size: int = 1,
        interval: float = -1,
        *,
        writer: SummaryWriter,
    ) -> None:
        super().__init__(name=name, buffer_size=buffer_size, interval=interval)
        self._writer = writer
        self._step = -1

    async def interval_stats_callback(self, stats: QueuePerfStats) -> None:
        """Log the performance statistics to TensorBoard.

        Args:
            stats: See :py:meth:`spdl.pipeline.StatsQueue.interval_stats_callback`.
        """
        await super().interval_stats_callback(stats)

        self._step += 1
        walltime = time.time()

        vals = {
            "pipeline_queue/qps": stats.qps,
            "pipeline_queue/ave_put_time": stats.ave_put_time,
            "pipeline_queue/ave_get_time": stats.ave_get_time,
            "pipeline_queue/occupancy_rate": stats.occupancy_rate,
        }
        await asyncio.get_running_loop().run_in_executor(
            None, _log, self._writer, self.name, vals, self._step, walltime
        )


def _log(
    writer: SummaryWriter,
    name: str,
    vals: dict[str, float],
    step: int | None,
    walltime: float,
) -> None:
    for k, v in vals.items():
        writer.add_scalars(k, {name: v}, global_step=step, walltime=walltime)


def decode(path: Path, width: int = 128, height: int = 128) -> torch.Tensor:
    """Decode the video from the given path with rescaling.

    Args:
        path: The path to the video file.
        width, height: The resolution of the video after rescaling.

    Returns:
        Uint8 tensor in shape of ``[N, C, H, W]``: Video frames in Tensor.
    """
    packets = spdl.io.demux_video(path)
    frames = spdl.io.decode_packets(
        packets,
        filter_desc=spdl.io.get_filter_desc(
            packets,
            scale_width=width,
            scale_height=height,
            pix_fmt="rgb24",
        ),
    )
    buffer = spdl.io.convert_frames(frames)
    return spdl.io.to_torch(buffer).permute(0, 2, 3, 1)


def build_pipeline(
    source: Iterator[Path],
    writer: SummaryWriter,
    log_interval: float,
    concurrency: int,
) -> Pipeline:
    """Build the pipeline using :py:class:`CustomTaskHook` and :py:class:`CustomQueue`.

    Args:
        source: A data source.
        writer: A TensorBoard SummaryWriter object.
        log_interval: The interval (in seconds) at which the performance data is saved.
        concurrency: The concurrency for video decoding.
    """

    def hook_factory(name: str) -> list[TaskHook]:
        return [CustomTaskHook(name=name, interval=log_interval, writer=writer)]

    return (
        PipelineBuilder()
        .add_source(source=source)
        .pipe(decode, concurrency=concurrency)
        .add_sink()
        .build(
            num_threads=concurrency,
            queue_class=partial(
                CustomQueue,
                writer=writer,
                interval=log_interval,
            ),
            task_hook_factory=hook_factory,
        )
    )


def main() -> None:
    """The main entry point for the example."""
    logging.basicConfig(level=logging.INFO)
    args = parse_args()
    source = args.dataset_dir.rglob("*.mp4")
    with contextlib.closing(SummaryWriter(args.log_dir)) as writer:
        pipeline = build_pipeline(
            source=source,
            writer=writer,
            log_interval=args.log_interval,
            concurrency=8,
        )
        with pipeline.auto_stop():
            for _ in pipeline:
                pass


if __name__ == "__main__":
    main()

Functions

parse_args() → Namespace

Parse command line arguments.

main() → None

The main entry point for the example.

build_pipeline(source: Iterator[Path], writer: SummaryWriter, log_interval: float, concurrency: int) → Pipeline

Build the pipeline using CustomTaskHook and CustomQueue.

Parameters:
  • source – A data source.

  • writer – A TensorBoard SummaryWriter object.

  • log_interval – The interval (in seconds) at which the performance data is saved.

  • concurrency – The concurrency for video decoding.

decode(path: Path, width: int = 128, height: int = 128) → Tensor

Decode the video from the given path with rescaling.

Parameters:
  • path – The path to the video file.

  • width, height – The resolution of the video after rescaling.

Returns:

Video frames in Tensor.

Return type:

Uint8 tensor in shape of [N, C, H, W]

Classes

class CustomTaskHook(name: str, interval: float = -1, *, writer: SummaryWriter)

Extend the TaskStatsHook to add logging to TensorBoard.

This class extends the TaskStatsHook, and periodically writes the performance statistics to TensorBoard.

Parameters:
  • name, interval – See spdl.pipeline.TaskStatsHook

  • writer – TensorBoard summary writer object.

async interval_stats_callback(stats: TaskPerfStats) → None

Log the performance statistics to TensorBoard.

Parameters:

stats – See spdl.pipeline.TaskStatsHook.interval_stats_callback().

class CustomQueue(name: str, buffer_size: int = 1, interval: float = -1, *, writer: SummaryWriter)

Extend the StatsQueue to add logging to TensorBoard.

This class extends the StatsQueue, and periodically writes the performance statistics to TensorBoard.

Parameters:
  • name, buffer_size, interval – See spdl.pipeline.StatsQueue

  • writer – TensorBoard summary writer object.

async interval_stats_callback(stats: QueuePerfStats) → None

Log the performance statistics to TensorBoard.

Parameters:

stats – See spdl.pipeline.StatsQueue.interval_stats_callback().