Benchmark thread output queue¶
Benchmark: SPDL pipeline handoff latency with and without thread output queue.
Compares main-thread get_item latency between the default asyncio-based
handoff (run_coroutine_threadsafe + Future.result polling) and the
queue.Queue-based handoff (direct queue.Queue.get).
The benchmark emulates a realistic training loop: each iteration the
foreground thread receives a batch, does simulated work (busy-wait to
mimic a GPU forward/backward pass), and then times how long the next
get_item call takes. When the simulated work is long enough the
producer has time to pre-fill the queue, so the handoff latency
isolates the cross-thread scheduling overhead.
Foreground work is swept from 0ms to 30ms in 3ms steps.
Usage:
buck2 run //spdl/examples:benchmark_thread_output_queue
Example results (devserver, no GPU, 500 lightweight int items):
FG work default (p50) default (p99) thread_q (p50) thread_q (p99)
------- ------------- ------------- -------------- --------------
0ms 199us 625us 116us 313us
3ms 246us 642us 223us 646us
6ms 232us 532us 190us 555us
9ms 287us 485us 153us 549us
12ms 280us 822us 14us 464us
15ms 221us 396us 8us 385us
18ms 227us 431us 9us 70us
21ms 224us 450us 11us 41us
24ms 240us 533us 12us 34us
27ms 227us 470us 12us 27us
30ms 242us 512us 12us 26us
The default asyncio path stays flat at ~220-280us regardless of overlap
time — that is the fixed run_coroutine_threadsafe scheduling tax.
The thread output queue drops to ~8-14us once the foreground work is
long enough (>= ~12ms on this machine) for the producer to pre-fill it.
Source¶
Source
The complete source code of the example is listed below.
1# Copyright (c) Meta Platforms, Inc. and affiliates.
2# All rights reserved.
3#
4# This source code is licensed under the BSD-style license found in the
5# LICENSE file in the root directory of this source tree.
6
7# pyre-strict
8
9"""Benchmark: SPDL pipeline handoff latency with and without thread output queue.
10
11Compares main-thread ``get_item`` latency between the default asyncio-based
12handoff (``run_coroutine_threadsafe`` + ``Future.result`` polling) and the
13``queue.Queue``-based handoff (direct ``queue.Queue.get``).
14
15The benchmark emulates a realistic training loop: each iteration the
16foreground thread receives a batch, does simulated work (busy-wait to
17mimic a GPU forward/backward pass), and then times how long the *next*
18``get_item`` call takes. When the simulated work is long enough the
19producer has time to pre-fill the queue, so the handoff latency
20isolates the cross-thread scheduling overhead.
21
22Foreground work is swept from 0ms to 30ms in 3ms steps.
23
24Usage::
25
26 buck2 run //spdl/examples:benchmark_thread_output_queue
27
28Example results (devserver, no GPU, 500 lightweight int items)::
29
30 FG work default (p50) default (p99) thread_q (p50) thread_q (p99)
31 ------- ------------- ------------- -------------- --------------
32 0ms 199us 625us 116us 313us
33 3ms 246us 642us 223us 646us
34 6ms 232us 532us 190us 555us
35 9ms 287us 485us 153us 549us
36 12ms 280us 822us 14us 464us
37 15ms 221us 396us 8us 385us
38 18ms 227us 431us 9us 70us
39 21ms 224us 450us 11us 41us
40 24ms 240us 533us 12us 34us
41 27ms 227us 470us 12us 27us
42 30ms 242us 512us 12us 26us
43
44The default asyncio path stays flat at ~220-280us regardless of overlap
45time — that is the fixed ``run_coroutine_threadsafe`` scheduling tax.
46The thread output queue drops to ~8-14us once the foreground work is
47long enough (>= ~12ms on this machine) for the producer to pre-fill it.
48"""
49
50from __future__ import annotations
51
52import statistics
53import time
54from dataclasses import dataclass, field
55
56from spdl.pipeline import PipelineBuilder
57
58__all__ = ["BenchResult", "main"]
59
60
@dataclass
class BenchResult:
    """Collected latency measurements for a single benchmark run.

    The percentile properties use nearest-rank (truncating) indexing into
    the sorted sample and raise ``IndexError`` when no latencies have been
    recorded; callers guard with ``if result.latencies_us``.
    """

    # Human-readable label identifying the measured configuration.
    name: str
    # Per-iteration ``get_item`` latencies, in microseconds.
    latencies_us: list[float] = field(default_factory=list)

    def _sorted(self) -> list[float]:
        """Return the latencies sorted ascending.

        Single source of truth for the percentile properties (the original
        duplicated the ``sorted`` call in each of them). A fresh copy is
        returned each time because ``latencies_us`` grows during a run.
        """
        return sorted(self.latencies_us)

    @property
    def p50(self) -> float:
        """Median latency in microseconds (upper middle element for even n)."""
        s = self._sorted()
        return s[len(s) // 2]

    @property
    def p95(self) -> float:
        """95th-percentile latency in microseconds (nearest-rank, truncating)."""
        s = self._sorted()
        return s[int(len(s) * 0.95)]

    @property
    def p99(self) -> float:
        """99th-percentile latency in microseconds (nearest-rank, truncating)."""
        s = self._sorted()
        return s[int(len(s) * 0.99)]

    @property
    def mean(self) -> float:
        """Arithmetic mean latency in microseconds."""
        return statistics.mean(self.latencies_us)

    def summary(self) -> str:
        """Return a one-line summary string with mean/p50/p95/p99."""
        return (
            f"{self.name:<50s} "
            f"mean={self.mean:>9.1f}us "
            f"p50={self.p50:>9.1f}us "
            f"p95={self.p95:>9.1f}us "
            f"p99={self.p99:>9.1f}us "
            f"({len(self.latencies_us)} meas)"
        )
97
98
99def _busy_wait_us(us: float) -> None:
100 """Busy-wait for *us* microseconds (spin-loop)."""
101 deadline = time.perf_counter() + us / 1e6
102 while time.perf_counter() < deadline:
103 pass
104
105
def _run_bench(
    name: str,
    n_items: int,
    buffer_size: int,
    use_thread_output_queue: bool,
    work_us: float,
    warmup: int = 20,
) -> BenchResult:
    """Run a single benchmark: source -> sink pipeline with foreground work.

    Builds a trivial pipeline (source of *n_items* integers -> sink) and
    iterates it.  Between each consumed item the foreground thread
    busy-waits for *work_us* microseconds to simulate consumer-side
    compute (e.g. a GPU training step).  After warmup, the time spent
    inside each ``get_item`` call is recorded.

    Args:
        name: Human-readable label for the result.
        n_items: Total number of items the source produces.
        buffer_size: Sink buffer size (number of slots).
        use_thread_output_queue: Whether to use the thread output queue handoff.
        work_us: Duration of simulated foreground work in microseconds.
        warmup: Number of items to consume before recording latencies.

    Returns:
        A :class:`BenchResult` whose ``latencies_us`` holds one measurement
        per explicitly-timed ``next(...)`` call (roughly half of the
        post-warmup items, since every other item is consumed untimed by
        the ``for`` loop).
    """
    items = list(range(n_items))
    pipeline = (
        PipelineBuilder()
        .add_source(iter(items))
        .add_sink(buffer_size=buffer_size)
        .build(num_threads=2, use_thread_output_queue=use_thread_output_queue)
    )

    result = BenchResult(name=name)
    consumed = 0
    for _ in pipeline.get_iterator(timeout=60):
        consumed += 1

        # Simulate foreground work (e.g. GPU fwd+bwd).
        # This gives the producer time to pre-fill the queue
        # so the next get_item measures pure handoff overhead.
        _busy_wait_us(work_us)

        if consumed <= warmup:
            continue

        # Time the next get_item call — this is what we're measuring.
        t0 = time.perf_counter()
        try:
            # NOTE(review): calling get_iterator() again mid-iteration
            # assumes SPDL returns the same underlying iterator (or one
            # that shares the output queue) rather than restarting the
            # pipeline — confirm against Pipeline.get_iterator semantics.
            next(pipeline.get_iterator(timeout=60))
        except StopIteration:
            # Source exhausted while timing; latencies collected so far
            # are still valid.
            break
        result.latencies_us.append((time.perf_counter() - t0) * 1e6)
        consumed += 1

        # Do work after the timed item too, so the *next* iteration's
        # measurement also has overlap time.
        _busy_wait_us(work_us)

    return result
165
166
def main() -> None:
    """Run the full benchmark sweep and print results.

    Sweeps simulated foreground work from 0ms to 30ms in 3ms steps and,
    for each setting, benchmarks both handoff mechanisms.  Per-run
    summaries are printed as they complete, followed by a combined
    summary table.
    """
    n_items = 500
    buffer_size = 8

    print("SPDL Pipeline Thread Output Queue Handoff Benchmark")
    print("=" * 90)

    results: list[BenchResult] = []

    # Both handoff mechanisms, benchmarked at every foreground-work setting.
    configs = ((False, "default asyncio"), (True, "thread output queue"))
    for work_ms in range(0, 31, 3):
        label = f"{work_ms}ms foreground work"
        print(f"\n--- {label} ({n_items} items, buffer_size={buffer_size}) ---")

        for use_toq, tag in configs:
            bench = _run_bench(
                f"{tag}, {label}",
                n_items,
                buffer_size,
                use_thread_output_queue=use_toq,
                work_us=work_ms * 1000.0,
            )
            results.append(bench)
            print(f" {bench.summary()}")

    print(f"\n{'=' * 90}")
    print("SUMMARY — main thread get_item() latency (lower is better)")
    print(f"{'=' * 90}")
    for bench in results:
        # Skip runs that recorded nothing (e.g. all items consumed in warmup).
        if bench.latencies_us:
            print(f" {bench.summary()}")
203
204
# Allow running the benchmark directly as a script.
if __name__ == "__main__":
    main()
API Reference¶
Functions
Classes