Benchmark thread output queue

Benchmark: SPDL pipeline handoff latency with and without thread output queue.

Compares main-thread get_item latency between the default asyncio-based handoff (run_coroutine_threadsafe + Future.result polling) and the queue.Queue-based handoff (direct queue.Queue.get).

The benchmark emulates a realistic training loop: each iteration the foreground thread receives a batch, does simulated work (busy-wait to mimic a GPU forward/backward pass), and then times how long the next get_item call takes. When the simulated work is long enough the producer has time to pre-fill the queue, so the handoff latency isolates the cross-thread scheduling overhead.
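
The pre-fill effect can be demonstrated with a plain `queue.Queue`, independent of SPDL. This is a standalone sketch (not SPDL code): a `get()` on an empty queue pays for the producer's remaining work, while a `get()` on a pre-filled queue measures only the cross-thread handoff.

```python
import queue
import threading
import time

q: queue.Queue[int] = queue.Queue()

# Empty queue: get() must wait for the producer, so the measured time
# includes production time, not just handoff.
def produce_late() -> None:
    time.sleep(0.005)  # producer still "working" for 5ms
    q.put(1)

t = threading.Thread(target=produce_late)
t.start()
t0 = time.perf_counter()
q.get()
empty_us = (time.perf_counter() - t0) * 1e6
t.join()

# Pre-filled queue: the item is already there, so get() measures
# pure handoff overhead.
q.put(1)
t0 = time.perf_counter()
q.get()
filled_us = (time.perf_counter() - t0) * 1e6

print(f"get() on empty queue:      {empty_us:8.1f} us")
print(f"get() on pre-filled queue: {filled_us:8.1f} us")
```

The benchmark's foreground busy-wait plays the role of `time.sleep` here: it gives the producer time to run ahead so every timed `get_item` hits the pre-filled case.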

Foreground work is swept from 0ms to 30ms in 3ms steps.

Usage:

buck2 run //spdl/examples:benchmark_thread_output_queue

Example results (devserver, no GPU, 500 lightweight int items):

FG work   default (p50)   default (p99)   thread_q (p50)   thread_q (p99)
-------   -------------   -------------   --------------   --------------
  0ms         199us           625us           116us            313us
  3ms         246us           642us           223us            646us
  6ms         232us           532us           190us            555us
  9ms         287us           485us           153us            549us
 12ms         280us           822us            14us            464us
 15ms         221us           396us             8us            385us
 18ms         227us           431us             9us             70us
 21ms         224us           450us            11us             41us
 24ms         240us           533us            12us             34us
 27ms         227us           470us            12us             27us
 30ms         242us           512us            12us             26us

The default asyncio path stays flat at ~220-280us regardless of overlap time — that is the fixed run_coroutine_threadsafe scheduling tax. The thread output queue drops to ~8-14us once the foreground work is long enough (>= ~12ms on this machine) for the producer to pre-fill it.
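
The scheduling tax can be reproduced outside SPDL with a few lines of stdlib Python. This is a rough standalone sketch (absolute numbers will vary by machine): submitting an already-trivial coroutine to an event loop in another thread via `run_coroutine_threadsafe` and blocking on the returned `Future` is far more expensive per item than a plain `queue.Queue.get` on a pre-filled queue.

```python
import asyncio
import queue
import threading
import time

N = 1000

# --- asyncio-based handoff: consumer submits a coroutine to a loop
# running in another thread and blocks on the returned Future.
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()

async def produce() -> int:
    return 1  # the item is already "ready"; we measure pure handoff cost

t0 = time.perf_counter()
for _ in range(N):
    fut = asyncio.run_coroutine_threadsafe(produce(), loop)
    fut.result()
asyncio_us = (time.perf_counter() - t0) / N * 1e6

loop.call_soon_threadsafe(loop.stop)
t.join()
loop.close()

# --- queue.Queue-based handoff: the queue is pre-filled (emulating a
# producer that has run ahead), so get() measures pure handoff cost.
q: queue.Queue[int] = queue.Queue()
for _ in range(N):
    q.put(1)

t0 = time.perf_counter()
for _ in range(N):
    q.get()
queue_us = (time.perf_counter() - t0) / N * 1e6

print(f"run_coroutine_threadsafe handoff: {asyncio_us:8.1f} us/item")
print(f"queue.Queue handoff:              {queue_us:8.1f} us/item")
```

Note this sketch is not the SPDL implementation, only an illustration of the two underlying primitives the benchmark compares.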

Source

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

"""Benchmark: SPDL pipeline handoff latency with and without thread output queue.

Compares main-thread ``get_item`` latency between the default asyncio-based
handoff (``run_coroutine_threadsafe`` + ``Future.result`` polling) and the
``queue.Queue``-based handoff (direct ``queue.Queue.get``).

The benchmark emulates a realistic training loop: each iteration the
foreground thread receives a batch, does simulated work (busy-wait to
mimic a GPU forward/backward pass), and then times how long the *next*
``get_item`` call takes.  When the simulated work is long enough the
producer has time to pre-fill the queue, so the handoff latency
isolates the cross-thread scheduling overhead.

Foreground work is swept from 0ms to 30ms in 3ms steps.

Usage::

    buck2 run //spdl/examples:benchmark_thread_output_queue

Example results (devserver, no GPU, 500 lightweight int items)::

    FG work   default (p50)   default (p99)   thread_q (p50)   thread_q (p99)
    -------   -------------   -------------   --------------   --------------
      0ms         199us           625us           116us            313us
      3ms         246us           642us           223us            646us
      6ms         232us           532us           190us            555us
      9ms         287us           485us           153us            549us
     12ms         280us           822us            14us            464us
     15ms         221us           396us             8us            385us
     18ms         227us           431us             9us             70us
     21ms         224us           450us            11us             41us
     24ms         240us           533us            12us             34us
     27ms         227us           470us            12us             27us
     30ms         242us           512us            12us             26us

The default asyncio path stays flat at ~220-280us regardless of overlap
time — that is the fixed ``run_coroutine_threadsafe`` scheduling tax.
The thread output queue drops to ~8-14us once the foreground work is
long enough (>= ~12ms on this machine) for the producer to pre-fill it.
"""

from __future__ import annotations

import statistics
import time
from dataclasses import dataclass, field

from spdl.pipeline import PipelineBuilder

__all__ = ["BenchResult", "main"]


@dataclass
class BenchResult:
    """Collected latency measurements for a single benchmark run."""

    name: str
    latencies_us: list[float] = field(default_factory=list)

    @property
    def p50(self) -> float:
        s = sorted(self.latencies_us)
        return s[len(s) // 2]

    @property
    def p95(self) -> float:
        s = sorted(self.latencies_us)
        return s[int(len(s) * 0.95)]

    @property
    def p99(self) -> float:
        s = sorted(self.latencies_us)
        return s[int(len(s) * 0.99)]

    @property
    def mean(self) -> float:
        return statistics.mean(self.latencies_us)

    def summary(self) -> str:
        """Return a one-line summary string with mean/p50/p95/p99."""
        return (
            f"{self.name:<50s}  "
            f"mean={self.mean:>9.1f}us  "
            f"p50={self.p50:>9.1f}us  "
            f"p95={self.p95:>9.1f}us  "
            f"p99={self.p99:>9.1f}us  "
            f"({len(self.latencies_us)} meas)"
        )


def _busy_wait_us(us: float) -> None:
    """Busy-wait for *us* microseconds (spin-loop)."""
    deadline = time.perf_counter() + us / 1e6
    while time.perf_counter() < deadline:
        pass


def _run_bench(
    name: str,
    n_items: int,
    buffer_size: int,
    use_thread_output_queue: bool,
    work_us: float,
    warmup: int = 20,
) -> BenchResult:
    """Run a single benchmark: source -> sink pipeline with foreground work.

    Builds a trivial pipeline (source of *n_items* integers -> sink) and
    iterates it.  Between each consumed item the foreground thread
    busy-waits for *work_us* microseconds to simulate consumer-side
    compute (e.g. a GPU training step).  After warmup, the time spent
    inside each ``get_item`` call is recorded.

    Args:
        name: Human-readable label for the result.
        n_items: Total number of items the source produces.
        buffer_size: Sink buffer size (number of slots).
        use_thread_output_queue: Whether to use the thread output queue handoff.
        work_us: Duration of simulated foreground work in microseconds.
        warmup: Number of items to consume before recording latencies.
    """
    items = list(range(n_items))
    pipeline = (
        PipelineBuilder()
        .add_source(iter(items))
        .add_sink(buffer_size=buffer_size)
        .build(num_threads=2, use_thread_output_queue=use_thread_output_queue)
    )

    result = BenchResult(name=name)
    consumed = 0
    for _ in pipeline.get_iterator(timeout=60):
        consumed += 1

        # Simulate foreground work (e.g. GPU fwd+bwd).
        # This gives the producer time to pre-fill the queue
        # so the next get_item measures pure handoff overhead.
        _busy_wait_us(work_us)

        if consumed <= warmup:
            continue

        # Time the next get_item call — this is what we're measuring.
        t0 = time.perf_counter()
        try:
            next(pipeline.get_iterator(timeout=60))
        except StopIteration:
            break
        result.latencies_us.append((time.perf_counter() - t0) * 1e6)
        consumed += 1

        # Do work after the timed item too, so the *next* iteration's
        # measurement also has overlap time.
        _busy_wait_us(work_us)

    return result


def main() -> None:
    """Run the full benchmark sweep and print results."""
    n_items = 500
    buffer_size = 8

    print("SPDL Pipeline Thread Output Queue Handoff Benchmark")
    print("=" * 90)

    results: list[BenchResult] = []

    work_ms_values = list(range(0, 31, 3))
    for work_ms in work_ms_values:
        work_us = work_ms * 1000.0
        label = f"{work_ms}ms foreground work"
        print(f"\n--- {label} ({n_items} items, buffer_size={buffer_size}) ---")

        for use_toq, tag in [
            (False, "default asyncio"),
            (True, "thread output queue"),
        ]:
            r = _run_bench(
                f"{tag}, {label}",
                n_items,
                buffer_size,
                use_thread_output_queue=use_toq,
                work_us=work_us,
            )
            results.append(r)
            print(f"  {r.summary()}")

    print(f"\n{'=' * 90}")
    print("SUMMARY — main thread get_item() latency (lower is better)")
    print(f"{'=' * 90}")
    for r in results:
        if r.latencies_us:
            print(f"  {r.summary()}")


if __name__ == "__main__":
    main()

API Reference

Functions

main() -> None

Run the full benchmark sweep and print results.

Classes

class BenchResult(name: str, latencies_us: list[float] = <factory>)

Collected latency measurements for a single benchmark run.

name: str
latencies_us: list[float]
property mean: float
property p50: float
property p95: float
property p99: float
summary() -> str

Return a one-line summary string with mean/p50/p95/p99.