Benchmark arena transport

Targeted benchmark: arena transport throughput (regular IPC vs ring vs pool).

Measures how the shared-memory arena affects the cost of shipping large payloads from a backend Pipeline running in a subprocess to the main process, via spdl.pipeline.run_pipeline_in_subprocess().

A backend pipeline produces a fixed dataset of {"data": payload} items; the main process receives them through one of three transports and does nothing with them (a no-op stand-in for a training loop), so the measured throughput isolates transfer + restore with no decode or other per-item work in the way:

  • no-arena: payloads are pickled over the multiprocessing queue (the default).

  • ring: SharedMemoryRingBuffer; the reader copies each payload out of shared memory (so it cannot hand out live views).

  • pool: SharedMemorySegmentPool; the reader restores each payload as a zero-copy view directly over shared memory.
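The practical difference between the ring and pool readers is a copy out of shared memory versus a view over it. The sketch below shows that distinction with Python's standard-library multiprocessing.shared_memory. It is an analogy only: it is not how SharedMemoryRingBuffer or SharedMemorySegmentPool are implemented, and copy_vs_view is a hypothetical helper.

- The copy is private and costs O(n) work.
- The view costs O(1), but it tracks later writes to the segment and must be released before the segment can be closed.

```python
from multiprocessing import shared_memory


def copy_vs_view(payload: bytes) -> tuple[bytes, bytes]:
    """Hypothetical helper: read one payload back both ways from shared memory.

    Returns (copied, viewed) snapshots taken *after* the segment is overwritten,
    so the caller can see which read still aliases shared memory.
    """
    n = len(payload)  # must be > 0: SharedMemory rejects size=0
    shm = shared_memory.SharedMemory(create=True, size=n)
    try:
        shm.buf[:n] = payload  # writer: offload the payload into the segment once
        copied = bytes(shm.buf[:n])  # ring-style read: private copy, O(n) work
        view = shm.buf[:n]  # pool-style read: zero-copy view, aliases the segment
        shm.buf[0] ^= 0xFF  # simulate the writer reusing the segment
        result = (copied, bytes(view))
        # A live view would make close() raise BufferError; the benchmark likewise
        # drops each item so the pool can recycle its segment.
        view.release()
        return result
    finally:
        shm.close()
        shm.unlink()


copied, viewed = copy_vs_view(b"\x00\x01\x02")
print(copied)  # b'\x00\x01\x02': the copy is unaffected by the overwrite
print(viewed)  # b'\xff\x01\x02': the view saw the writer's change
```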

It sweeps payload sizes across four payload types: bytes, NumPy arrays, Torch tensors, and spdl.io VideoPackets. The arena offloads each of them as a large binary.
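The measurement pattern described above can be sketched with the standard library alone. It drains a fixed number of items with a consumer that does nothing, discards one warmup pass, and records items per second for each timed pass. This is an illustration under assumptions:

- copying_source is a hypothetical stand-in for a transport that simply copies a bytes payload per item.
- drain_throughput is also a hypothetical name.

The benchmark's own run_transport applies the same shape to run_pipeline_in_subprocess.

```python
import time
from collections.abc import Callable, Iterable, Iterator


def drain_throughput(
    make_source: Callable[[], Iterable[object]], num_items: int, runs: int
) -> list[float]:
    """Return items/second for each timed pass; one warmup pass is discarded."""
    for _ in make_source():  # warmup: first-touch and allocation costs land here
        pass
    samples: list[float] = []
    for _ in range(runs):
        n = 0
        t0 = time.perf_counter()
        for item in make_source():
            n += 1  # no-op consumer: only the source's own cost is timed
            del item  # drop the reference right away, as the benchmark does
        elapsed = time.perf_counter() - t0
        assert n == num_items, f"expected {num_items} items, got {n}"
        samples.append(num_items / elapsed)
    return samples


def copying_source(payload: bytes, num_items: int) -> Iterator[dict[str, bytearray]]:
    """Hypothetical stand-in transport: yields a fresh copy of the payload per item."""
    for _ in range(num_items):
        yield {"data": bytearray(payload)}


payload = bytes(4 << 20)  # 4 MiB of zeros
rates = drain_throughput(lambda: copying_source(payload, 8), num_items=8, runs=3)
mb_per_s = [r * len(payload) / 1e6 for r in rates]  # items/s -> MB/s (1 MB = 1e6 B)
print([f"{x:.0f} MB/s" for x in mb_per_s])
```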

Results

From --isolate at 32 MiB on a CPU-only host (--num-items 16 --runs 10): each (kind, transport) runs in its own freshly spawned process, one at a time, so the CPU s and peak RSS columns are attributable to that one config.

- CPU s is measured with getrusage over the timed passes only.
- Peak RSS is the growth over a baseline taken before the arena is built.

A single shared process would accumulate both across configs and could not separate them.

The arena raises throughput and cuts the CPU cost of moving a payload to near zero. The pool restores a zero-copy view, so it does essentially no per-item work, while plain IPC spends several CPU-seconds pickling and copying. bytes and NumPy gain the most (3.7x to 5.8x).

Torch already moves tensors through shared memory in its own multiprocessing reducer, so plain IPC avoids a bulk copy. As a result, Torch's throughput barely moves (about 1.07x to 1.08x), and plain IPC already has its smallest memory footprint. The ring even adds CPU for Torch because it copies each payload out, yet the pool still drops its CPU to about zero.

Packets gain less throughput (restoring rebuilds the AVPacket structures), but the pool still slashes their CPU and memory.

Peak RSS still includes a fixed per-process overhead, so compare it across transports for the same kind rather than as an absolute cost. For bytes, NumPy, and packets the no-arena pickle buffers are the heaviest and the pool the lightest.
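The isolation argument above rests on how getrusage behaves. ru_maxrss is a high-water mark that never decreases within a process, so a config measured second in a shared process starts from the first config's peak. A minimal sketch, assuming Linux (where ru_maxrss is reported in KiB) and the Unix-only stdlib resource module. measure and copy_work are hypothetical helpers, and copy_work merely stands in for a transport's per-item copying.

```python
import resource
from collections.abc import Callable


def _snapshot() -> tuple[float, float]:
    """(user + system CPU seconds, peak RSS in MiB) of this process so far."""
    r = resource.getrusage(resource.RUSAGE_SELF)
    return r.ru_utime + r.ru_stime, r.ru_maxrss / 1024  # assumes KiB (Linux)


def measure(work: Callable[[], None]) -> tuple[float, float, float]:
    """Run work(); return (cpu_seconds_used, peak_mib_before, peak_mib_after)."""
    cpu0, peak0 = _snapshot()
    work()
    cpu1, peak1 = _snapshot()
    return cpu1 - cpu0, peak0, peak1  # CPU is a diff of two cumulative snapshots


def copy_work(nbytes: int, copies: int) -> Callable[[], None]:
    """Hypothetical stand-in for per-item copying: allocate and fill a buffer."""

    def run() -> None:
        for _ in range(copies):
            buf = b"\x01" * nbytes  # writes every byte, so the pages become resident
            del buf

    return run


cpu_a, before_a, after_a = measure(copy_work(64 << 20, 4))  # "config A"
cpu_b, before_b, after_b = measure(copy_work(1 << 20, 4))  # "config B", much smaller
print(f"A: cpu={cpu_a:.2f}s peak={after_a:.0f} MiB")
# B's peak can never read lower than A's high-water mark in the same process.
print(f"B: cpu={cpu_b:.2f}s peak={after_b:.0f} MiB")
```

Running each config in a freshly spawned process, as --isolate does, resets that high-water mark for every config.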

kind     transport   recv MB/s   speedup   CPU s   peak RSS MB
------------------------------------------------------------
bytes    no-arena          719     1.00x     7.1          1872
bytes    ring             2684     3.73x     1.8          1015
bytes    pool             3760     5.23x     0.0           673
numpy    no-arena          647     1.00x     7.7          1865
numpy    ring             3202     4.95x     1.1          1009
numpy    pool             3767     5.82x     0.0           669
torch    no-arena         2899     1.00x     0.6           603
torch    ring             3106     1.07x     1.2          1262
torch    pool             3116     1.08x     0.0           834
packets  no-arena          555     1.00x     8.6          1885
packets  ring             1359     2.45x     3.3          1236
packets  pool             1622     2.92x     0.1           825
(recv MB/s with speedup vs no-arena; CPU s and peak RSS are per-config,
lower is better. The throughput speedup grows with payload size and is
smaller at 2 and 8 MiB; see the size sweep below.)
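The recv MB/s column is items per second multiplied by the on-the-wire payload size in decimal megabytes; the source computes factor = payload_nbytes / 1e6. Speedup is the ratio against no-arena.

The worked sketch below converts the bytes rows at 32 MiB back into items per second and milliseconds per item. items_per_s is a hypothetical helper. The sketch uses the rounded MB/s values printed in the table. The benchmark divides unrounded means, so a recomputed ratio can differ from the printed speedup in the last digit.

```python
PAYLOAD_NBYTES = 32 << 20  # bytes kind at 32 MiB: exactly 33_554_432 bytes


def items_per_s(mb_per_s: float, payload_nbytes: int = PAYLOAD_NBYTES) -> float:
    """Invert the table's MB/s (decimal MB, 1e6 bytes) back to items/second."""
    return mb_per_s * 1e6 / payload_nbytes


table = {"no-arena": 719.0, "ring": 2684.0, "pool": 3760.0}  # bytes rows above
baseline = table["no-arena"]
for transport, mbps in table.items():
    rate = items_per_s(mbps)
    print(
        f"{transport:<8} {rate:6.1f} items/s  {1000 / rate:5.1f} ms/item  "
        f"{mbps / baseline:.2f}x"
    )
```

With these values, plain IPC spends roughly 47 ms per 32 MiB item, compared with about 12.5 ms for the ring and about 9 ms for the pool.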

In the plot below, each line is one (kind, transport) across payload sizes (a separate throughput sweep): colour and marker encode the payload type and line style the transport (dotted = no-arena, dashed = ring, solid = pool); the shaded band is the ~95% confidence interval of the mean:

[Figure: example_benchmark_arena_transport.png, throughput across payload sizes, one line per (kind, transport), with ~95% confidence bands]
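The shaded band uses a normal approximation: mean ± 1.96 · s / √n over the n timed passes, where s is the sample standard deviation. A small worked sketch with illustrative pass rates (made-up numbers, not benchmark data); ci95 is a hypothetical name that mirrors the source's _confidence_interval.

```python
import statistics


def ci95(samples: list[float]) -> tuple[float, float]:
    """~95% CI of the mean, normal approximation; (mean, mean) for a single pass."""
    mean = statistics.mean(samples)
    if len(samples) < 2:
        return mean, mean
    half = 1.96 * statistics.stdev(samples) / len(samples) ** 0.5
    return mean - half, mean + half


lo, hi = ci95([10.0, 12.0, 14.0])  # mean 12, s = 2, half-width 1.96 * 2 / sqrt(3)
print(f"{lo:.2f} .. {hi:.2f}")  # prints 9.74 .. 14.26
```

With only 10 timed passes, a Student-t multiplier would give a slightly wider band (about 2.26 for 9 degrees of freedom, instead of 1.96).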

Example

$ python benchmark_arena_transport.py --sizes 2 8 32 --output results.csv
$ python benchmark_arena_transport_plot.py --input results.csv --output plot.png
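results.csv has one column per Row field: size_mb, kind, transport, items_per_s, mb_per_s, mb_per_s_lo, mb_per_s_hi, speedup, peak_rss_mb, and cpu_sec. A minimal stdlib sketch can inspect it without the plotting script; best_transport is a hypothetical helper.

- It picks the fastest transport for each (size, kind).
- It flags whether that transport's confidence interval stays clear of the runner-up's. Non-overlapping ~95% intervals are a conservative sign that the gap is real.

```python
import csv
from collections import defaultdict


def best_transport(path: str) -> dict[tuple[int, str], tuple[str, bool]]:
    """Hypothetical helper: (size_mb, kind) -> (fastest transport, CI clear of runner-up)."""
    groups: dict[tuple[int, str], list[dict[str, str]]] = defaultdict(list)
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            groups[(int(row["size_mb"]), row["kind"])].append(row)
    result: dict[tuple[int, str], tuple[str, bool]] = {}
    for key, rows in groups.items():
        # Rank this (size, kind)'s transports by mean throughput, fastest first.
        ranked = sorted(rows, key=lambda r: float(r["mb_per_s"]), reverse=True)
        best = ranked[0]
        # The winner is "clear" if its lower bound beats the runner-up's upper bound.
        clear = len(ranked) < 2 or (
            float(best["mb_per_s_lo"]) > float(ranked[1]["mb_per_s_hi"])
        )
        result[key] = (best["transport"], clear)
    return result
```

After running the commands above, best_transport("results.csv") returns one (transport, clear) entry per (size, kind) in the sweep.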

Source

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

"""Targeted benchmark: arena transport throughput (regular IPC vs ring vs pool).

Measures how the shared-memory arena affects the cost of shipping large payloads
from a backend :py:class:`~spdl.pipeline.Pipeline` running in a subprocess to the
main process, via :py:func:`spdl.pipeline.run_pipeline_in_subprocess`.

A backend pipeline produces a fixed dataset of ``{"data": payload}`` items; the
main process receives them through one of three transports and does nothing with
them (a no-op stand-in for a training loop), so the measured throughput isolates
**transfer + restore** with no decode or other per-item work in the way:

- **no-arena** — payloads are pickled over the multiprocessing queue (the default).
- **ring** — :py:class:`~spdl.pipeline.SharedMemoryRingBuffer`; the reader copies
  each payload out of shared memory (so it cannot hand out live views).
- **pool** — :py:class:`~spdl.pipeline.SharedMemorySegmentPool`; the reader
  restores each payload as a zero-copy view directly over shared memory.

It sweeps payload sizes x four payload types — ``bytes``, NumPy arrays, Torch
tensors, and ``spdl.io`` ``VideoPackets`` — each of which the arena offloads as a
large binary.

**Results**

From ``--isolate`` at 32 MiB on a CPU-only host (``--num-items 16 --runs 10``):
each ``(kind, transport)`` runs in its own freshly-spawned process, one at a time,
so the ``CPU s`` and ``peak RSS`` columns are attributable to that one config
(``getrusage`` over the timed passes / from a pre-arena baseline; a single shared
process would accumulate both across configs and could not separate them). The
arena raises throughput *and* drops the CPU of moving a payload to near zero —
the pool restores a zero-copy view, so it does essentially no per-item work, while
plain IPC spends several CPU-seconds pickling and copying. ``bytes`` / NumPy gain
the most. Torch transfers tensors through shared memory itself (its
multiprocessing reducer), so plain IPC already avoids a bulk copy — its throughput
barely moves and plain IPC is already its leanest memory; the ring even adds CPU
(it copies out), yet the pool still drops its CPU to ~0. Packets gain less
throughput (restoring rebuilds the ``AVPacket`` structures) but the pool slashes
their CPU and memory too. ``peak RSS`` includes a fixed per-process baseline, so
compare it within a row; for ``bytes`` / NumPy / packets the no-arena pickle
buffers are the heaviest, and the pool the lightest.

.. code-block:: text

   kind     transport   recv MB/s   speedup   CPU s   peak RSS MB
   ------------------------------------------------------------
   bytes    no-arena          719     1.00x     7.1          1872
   bytes    ring             2684     3.73x     1.8          1015
   bytes    pool             3760     5.23x     0.0           673
   numpy    no-arena          647     1.00x     7.7          1865
   numpy    ring             3202     4.95x     1.1          1009
   numpy    pool             3767     5.82x     0.0           669
   torch    no-arena         2899     1.00x     0.6           603
   torch    ring             3106     1.07x     1.2          1262
   torch    pool             3116     1.08x     0.0           834
   packets  no-arena          555     1.00x     8.6          1885
   packets  ring             1359     2.45x     3.3          1236
   packets  pool             1622     2.92x     0.1           825
   (recv MB/s with speedup vs no-arena; CPU s and peak RSS are per-config,
   lower is better. Throughput speedup grows with payload size — at 2 / 8 MiB
   it is smaller; see the size sweep below.)

In the plot below, each line is one (kind, transport) across payload sizes (a
separate throughput sweep): colour and marker encode the payload type and line
style the transport (dotted = no-arena, dashed = ring, solid = pool); the shaded
band is the ~95% confidence interval of the mean:

.. image:: ../../_static/data/example_benchmark_arena_transport.png

**Example**

.. code-block:: shell

   $ python benchmark_arena_transport.py --sizes 2 8 32 --output results.csv
   $ python benchmark_arena_transport_plot.py --input results.csv --output plot.png
"""

from __future__ import annotations

__all__ = [
    "Row",
    "create_video_data",
    "main",
    "read_csv",
    "run_transport",
    "write_csv",
]

import argparse
import csv
import gc
import multiprocessing as mp
import resource
import statistics
import tempfile
import time
from collections.abc import Iterator
from dataclasses import asdict, dataclass, fields
from itertools import product
from typing import Any

import numpy as np
import spdl.io
import torch
from spdl.pipeline import (
    PipelineBuilder,
    run_pipeline_in_subprocess,
    SharedMemoryRingBuffer,
    SharedMemorySegmentPool,
)

_KINDS = ("bytes", "numpy", "torch", "packets")
_TRANSPORTS = ("no-arena", "ring", "pool")
# 4K frames: at the benchmark's payload sizes the encoder fills the bitrate with
# real frame data instead of mostly CBR filler, so the packets resemble a real
# high-resolution decode workload.
_VIDEO_SIZE = "3840x2160"


@dataclass(frozen=True)
class Row:
    """Row()

    One benchmark measurement: throughput for a (size, kind, transport)."""

    size_mb: int
    """Requested payload size in MiB (the sweep knob)."""

    kind: str
    """Payload kind: ``"bytes"``, ``"numpy"``, ``"torch"``, or ``"packets"``."""

    transport: str
    """Transport used: ``"no-arena"``, ``"ring"``, or ``"pool"``."""

    items_per_s: float
    """Mean items received per second over the timed passes."""

    mb_per_s: float
    """Mean payload throughput in MB/s (``items_per_s`` x payload bytes)."""

    mb_per_s_lo: float
    """Lower bound of the ~95% confidence interval of ``mb_per_s`` (normal
    approximation over the timed passes)."""

    mb_per_s_hi: float
    """Upper bound of the ~95% confidence interval of ``mb_per_s``."""

    speedup: float
    """``items_per_s`` relative to the no-arena baseline for this (size, kind)."""

    peak_rss_mb: float = 0.0
    """Peak resident memory of the config's process tree (consumer + producer), in
    MB. Only populated by ``--isolate``; ``0`` otherwise. Shared arena pages may be
    counted in both processes, so treat it as an upper-bound proxy and compare
    within a (size, kind) row across transports."""

    cpu_sec: float = 0.0
    """Total CPU seconds (user + system, consumer + producer) the config consumed.
    Only populated by ``--isolate``; ``0`` otherwise. Lower is better — moving a
    payload via the arena should cost less CPU than the pickle + copy of plain IPC."""


def create_video_data(target_bytes: int, duration_seconds: float = 2.0) -> bytes:
    """Generate an H.264 MP4 whose stream is approximately ``target_bytes``.

    Encodes low-entropy frames at a constant target bitrate (x264 CBR, which pads
    with filler to hold the rate), so the serialized ``VideoPackets`` payload
    scales with the requested size rather than collapsing to the content's natural
    compressed size. Uses ``spdl.io``'s in-process encoder rather than the
    ``ffmpeg`` CLI, so it also works on minimal container images that do not ship
    the CLI.

    Args:
        target_bytes: Desired size of the encoded video stream, in bytes.
        duration_seconds: Clip duration; the bitrate is derived from it.

    Returns:
        The encoded MP4 file contents as raw bytes.
    """
    width, height = (int(x) for x in _VIDEO_SIZE.split("x"))
    frame_rate = (30, 1)
    num_frames = max(1, round(frame_rate[0] / frame_rate[1] * duration_seconds))
    bit_rate = max(64_000, int(target_bytes * 8 / duration_seconds))
    kbps = bit_rate // 1000
    batch_size = 4  # bound peak memory: 4K yuv444p frames are ~25 MiB each
    with tempfile.NamedTemporaryFile(suffix=".mp4") as tmp_file:
        muxer = spdl.io.Muxer(tmp_file.name)
        encoder = muxer.add_encode_stream(
            config=spdl.io.video_encode_config(
                height=height,
                width=width,
                pix_fmt="yuv444p",
                frame_rate=frame_rate,
                bit_rate=bit_rate,
            ),
            encoder="libx264",
            # CBR with a tight VBV so the encoder pads low-entropy frames with
            # filler up to the target rate (mirrors the ffmpeg nal-hrd=cbr path).
            encoder_config={
                "preset": "ultrafast",
                "x264-params": f"nal-hrd=cbr:vbv-maxrate={kbps}:vbv-bufsize={kbps}",
            },
        )
        with muxer.open():
            for start in range(0, num_frames, batch_size):
                n = min(batch_size, num_frames - start)
                array = np.zeros((n, 3, height, width), dtype=np.uint8)
                frames = spdl.io.create_reference_video_frame(
                    array=array,
                    pix_fmt="yuv444p",
                    frame_rate=frame_rate,
                    pts=start,
                )
                if (packets := encoder.encode(frames)) is not None:
                    muxer.write(0, packets)
            if (packets := encoder.flush()) is not None:
                muxer.write(0, packets)
        with open(tmp_file.name, "rb") as f:
            return f.read()


def _make_payload(kind: str, size: int, video_bytes: bytes | None) -> object:
    """Build one payload of ``kind`` whose serialized size is roughly ``size``."""
    if kind == "bytes":
        return bytes(size)
    if kind == "numpy":
        return np.zeros(max(1, size // 4), dtype=np.float32)
    if kind == "torch":
        return torch.zeros(max(1, size // 4), dtype=torch.float32)
    if kind == "packets":
        assert video_bytes is not None
        return spdl.io.demux_video(video_bytes)
    raise ValueError(f"unknown kind: {kind}")


def _payload_nbytes(kind: str, size: int, video_bytes: bytes | None) -> int:
    """The on-the-wire size of one payload, used to size the arena."""
    payload: Any = _make_payload(kind, size, video_bytes)
    if kind == "packets":
        return len(payload.__getstate__())
    if kind == "numpy":
        return int(payload.nbytes)
    if kind == "torch":
        return int(payload.element_size() * payload.nelement())
    return len(payload)


@dataclass(frozen=True)
class _Dataset:
    """A picklable finite dataset: yields ``num_items`` payload items.

    Only the small spec is pickled into the subprocess; the payload itself is
    built lazily in ``__iter__`` (i.e. in the worker), so shipping the dataset
    across the process boundary stays cheap regardless of payload size.

    Most kinds reuse one payload object so the measured per-item cost is transfer
    + restore rather than construction. Torch is the exception: it moves a CPU
    tensor's storage into shared memory in place on the first IPC send and reuses
    it afterwards, so reusing one tensor would hide the real cost (a fresh
    shared-memory segment per distinct tensor). It therefore yields a distinct
    tensor per item, matching how a real pipeline produces tensors.
    """

    kind: str
    size: int
    num_items: int
    video_bytes: bytes | None = None

    def __iter__(self) -> Iterator[dict[str, object]]:
        # Torch moves a CPU tensor's storage into shared memory in place on the
        # first cross-process send and reuses that same segment for later sends of
        # the *same* tensor (torch.multiprocessing's reducer + its storage cache).
        # So if the benchmark reused one tensor, every send after the first would
        # be nearly free and hide the real per-item cost (a fresh shm segment per
        # distinct tensor) — build a distinct tensor per item for torch instead.
        reused = (
            None
            if self.kind == "torch"
            else _make_payload(self.kind, self.size, self.video_bytes)
        )
        for _ in range(self.num_items):
            payload = (
                reused
                if reused is not None
                else _make_payload(self.kind, self.size, self.video_bytes)
            )
            yield {"data": payload}


def _make_arena(
    transport: str, payload_nbytes: int, buffer_size: int, num_items: int
) -> SharedMemoryRingBuffer | SharedMemorySegmentPool | None:
    """Construct the arena for a transport mode, sized to hold a whole iteration.

    The arena does not block the writer when full — it raises "shared-memory
    arena full" and relies on the pipeline's queue backpressure to bound the
    in-flight units. That bound only holds when the reader keeps up; when restore
    is much slower than offload (e.g. rebuilding ``VideoPackets`` on a busy host),
    the writer can get a whole iteration ahead. So size the arena for the
    worst case — every item of one iteration in flight at once — which never
    overruns regardless of the producer/consumer speed gap.
    """
    if transport == "no-arena":
        return None
    slots = max(2 * buffer_size + 6, num_items + 2)
    unit = max(1 << 20, payload_nbytes * 2)  # headroom for envelope + alignment
    if transport == "ring":
        return SharedMemoryRingBuffer(capacity=unit * slots)
    if transport == "pool":
        return SharedMemorySegmentPool(segment_size=unit, count=slots)
    raise ValueError(f"unknown transport: {transport}")


def run_transport(
    dataset: _Dataset,
    transport: str,
    *,
    payload_nbytes: int,
    num_items: int,
    buffer_size: int,
    runs: int,
    duration_sec: float = 0.0,
    usage: list[float] | None = None,
) -> list[float]:
    """Run the backend pipeline once per timed pass; return per-pass items/second.

    Builds a one-stage backend pipeline (source -> sink) that runs in a
    subprocess via :py:func:`run_pipeline_in_subprocess`, ships its items to the
    main process over ``transport``, and consumes them in a no-op loop. One warmup
    pass is discarded; the per-pass throughput of each timed pass is returned so
    the caller can summarize it (mean + confidence interval). The subprocess and
    arena are reused across passes and torn down at the end.

    Args:
        dataset: The (picklable) backend data source.
        transport: One of ``"no-arena"``, ``"ring"``, ``"pool"``.
        payload_nbytes: On-the-wire size of one payload, used to size the arena.
        num_items: Number of items per pass (must match ``dataset.num_items``).
        buffer_size: Pipeline sink buffer size / arena in-flight unit count.
        runs: Number of timed passes (used when ``duration_sec`` is not positive).
        duration_sec: If positive, keep running timed passes until this many
            seconds have elapsed instead of stopping after ``runs`` passes — used
            for sustained, fixed-duration host-stat sampling by an external sampler.
        usage: If given, one element is appended: the CPU seconds (user + system,
            this process + the producer subprocess) spent across the timed passes.
            The snapshot is taken *after* the warmup pass, so process-startup and
            import costs are excluded and only the transport's per-item work counts.

    Returns:
        One throughput sample (items per second) per timed pass.
    """
    arena = _make_arena(transport, payload_nbytes, buffer_size, num_items)
    config = (
        PipelineBuilder()
        .add_source(dataset)
        .add_sink(buffer_size=buffer_size)
        .get_config()
    )
    src = run_pipeline_in_subprocess(
        config, num_threads=1, arena=arena, buffer_size=buffer_size
    )
    try:
        for item in src:  # warmup pass (subprocess spawn + first-touch costs)
            del item
        cpu_base = _cpu_now()  # after warmup: imports + arena warm-up excluded
        samples: list[float] = []
        deadline = time.perf_counter() + duration_sec
        while True:
            n = 0
            t0 = time.perf_counter()
            for item in src:
                n += 1
                del item  # release the view so the pool can recycle the segment
            samples.append(num_items / (time.perf_counter() - t0))
            assert n == num_items, f"{transport}: expected {num_items}, got {n}"
            done_by_count = duration_sec <= 0 and len(samples) >= runs
            done_by_time = duration_sec > 0 and time.perf_counter() >= deadline
            if done_by_count or done_by_time:
                break
        if usage is not None:
            usage.append(_cpu_now() - cpu_base)
        return samples
    finally:
        # Drop the iterable so its finalizer closes + unlinks the arena before the
        # next config reuses the shared-memory namespace.
        del src
        gc.collect()


def _confidence_interval(samples: list[float]) -> tuple[float, float]:
    """~95% confidence interval of the mean (normal approximation).

    Degenerate cases (a single pass) return ``(mean, mean)``.
    """
    mean = statistics.mean(samples)
    if len(samples) < 2:
        return mean, mean
    half = 1.96 * statistics.stdev(samples) / (len(samples) ** 0.5)
    return mean - half, mean + half


def _run_config(size_mb: int, kind: str, args: argparse.Namespace) -> list[Row]:
    """Benchmark one (size, kind) across the selected transports; one Row each."""
    size = size_mb << 20
    # For packets, synthesize a clip whose serialized payload tracks ``size`` (the
    # other kinds build a payload of exactly ``size`` bytes directly).
    vb = create_video_data(size) if kind == "packets" else None
    payload_nbytes = _payload_nbytes(kind, size, vb)
    dataset = _Dataset(kind, size, args.num_items, vb)
    factor = payload_nbytes / 1e6  # items/s -> MB/s
    # First pass: run each transport in the requested order (order is preserved so
    # the --duration-sec window markers stay aligned with an external sampler) and
    # collect its mean + CI.
    measured: list[tuple[str, float, float, float]] = []
    for i, transport in enumerate(args.transports):
        if i and args.gap_sec:
            time.sleep(args.gap_sec)  # idle valley between per-transport windows
        if args.duration_sec:
            # Timestamped window markers so externally-sampled host stats (e.g. an
            # external once-per-minute CPU/memory sampler) can be attributed to each config.
            print(
                f"### begin {kind} {size_mb}M {transport} ts={time.time():.0f}",
                flush=True,
            )
        samples = run_transport(
            dataset,
            transport,
            payload_nbytes=payload_nbytes,
            num_items=args.num_items,
            buffer_size=args.buffer_size,
            runs=args.runs,
            duration_sec=args.duration_sec,
        )
        if args.duration_sec:
            print(
                f"### end   {kind} {size_mb}M {transport} ts={time.time():.0f}",
                flush=True,
            )
        mean = statistics.mean(samples)
        lo, hi = _confidence_interval(samples)
        measured.append((transport, mean, lo, hi))
    # Second pass: resolve the no-arena baseline after every transport is measured,
    # so speedup does not depend on the order transports were run (and defaults to
    # 1.0 when no-arena was not among --transports).
    baseline = next((m for (t, m, _lo, _hi) in measured if t == "no-arena"), 0.0)
    return [
        Row(
            size_mb=size_mb,
            kind=kind,
            transport=transport,
            items_per_s=mean,
            mb_per_s=mean * factor,
            mb_per_s_lo=lo * factor,
            mb_per_s_hi=hi * factor,
            speedup=mean / baseline if baseline else 1.0,
        )
        for transport, mean, lo, hi in measured
    ]


def _cpu_now() -> float:
    """CPU seconds (user + system) for this process plus its reaped children."""
    s = resource.getrusage(resource.RUSAGE_SELF)
    c = resource.getrusage(resource.RUSAGE_CHILDREN)
    return s.ru_utime + s.ru_stime + c.ru_utime + c.ru_stime


def _peak_rss_mb() -> float:
    """Peak RSS (self + reaped children) in MB (``ru_maxrss`` is KiB on Linux)."""
    s = resource.getrusage(resource.RUSAGE_SELF)
    c = resource.getrusage(resource.RUSAGE_CHILDREN)
    return (s.ru_maxrss + c.ru_maxrss) / 1024


def _isolated_worker(
    kind: str,
    size: int,
    transport: str,
    payload_nbytes: int,
    num_items: int,
    buffer_size: int,
    runs: int,
    vb: bytes | None,
    q: "mp.Queue[tuple[str, object, float, float]]",
) -> None:
    """Run one config in this fresh process; report samples + CPU + peak RSS.

    Run as the target of a freshly-spawned process. Because the process starts
    clean and exits after one config, the measurements attribute resources to
    *this* config alone — a single long-lived process accumulates them across
    configs and cannot separate them, which is the whole point of ``--isolate``.
    CPU is measured over the timed passes only (excludes this worker's and the
    producer's import/startup cost); peak RSS is the growth from a baseline taken
    before the arena and payloads are built. The video bytes are encoded in the
    parent and passed in, so the encoder's footprint does not land in this peak.
    """
    try:
        rss_base = _peak_rss_mb()
        usage: list[float] = []
        dataset = _Dataset(kind, size, num_items, vb)
        samples = run_transport(
            dataset,
            transport,
            payload_nbytes=payload_nbytes,
            num_items=num_items,
            buffer_size=buffer_size,
            runs=runs,
            usage=usage,
        )
        cpu = usage[0] if usage else 0.0
        q.put(("ok", samples, cpu, _peak_rss_mb() - rss_base))
    except Exception as e:  # surface failure instead of hanging the parent
        q.put(("err", f"{type(e).__name__}: {e}", 0.0, 0.0))


def _run_isolated(
    mp_ctx: "mp.context.BaseContext",
    kind: str,
    size: int,
    transport: str,
    payload_nbytes: int,
    num_items: int,
    buffer_size: int,
    runs: int,
    vb: bytes | None,
) -> tuple[list[float], float, float]:
    """Run ``_isolated_worker`` in a fresh process; return ``(samples, cpu, rss)``."""
    q: "mp.Queue[tuple[str, object, float, float]]" = mp_ctx.Queue()
    p = mp_ctx.Process(  # pyre-ignore[16]
        target=_isolated_worker,
        args=(
            kind,
            size,
            transport,
            payload_nbytes,
            num_items,
            buffer_size,
            runs,
            vb,
            q,
        ),
    )
    p.start()
    status, payload, cpu, rss = q.get()
    p.join()
    if status != "ok":
        raise RuntimeError(str(payload))
    return payload, cpu, rss  # pyre-ignore[7]


def _run_config_isolated(
    size_mb: int, kind: str, args: argparse.Namespace, mp_ctx: "mp.context.BaseContext"
) -> list[Row]:
    """Benchmark one (size, kind), each transport in its own fresh process.

    Per-process isolation is what makes the CPU-time and peak-RSS columns
    meaningful: each transport's footprint is measured from a clean slate, so the
    arena's shared-memory cost and the no-arena pickle/copy cost are attributable
    rather than piled into one accumulating process.
    """
    size = size_mb << 20
    # Encode in the parent so the (large) encoder footprint stays out of the
    # per-config worker's peak RSS.
    vb = create_video_data(size) if kind == "packets" else None
    payload_nbytes = _payload_nbytes(kind, size, vb)
    factor = payload_nbytes / 1e6  # items/s -> MB/s
    # First pass: measure each transport in its own process.
    measured: list[tuple[str, float, float, float, float, float]] = []
    for transport in args.transports:
        samples, cpu, rss = _run_isolated(
            mp_ctx,
            kind,
            size,
            transport,
            payload_nbytes,
            args.num_items,
            args.buffer_size,
            args.runs,
            vb,
        )
        mean = statistics.mean(samples)
        lo, hi = _confidence_interval(samples)
        measured.append((transport, mean, lo, hi, cpu, rss))
    # Second pass: resolve the no-arena baseline after every transport is measured,
    # so speedup is order-independent (and defaults to 1.0 when no-arena was not
    # among --transports).
    baseline = next((m for (t, m, *_rest) in measured if t == "no-arena"), 0.0)
    return [
        Row(
            size_mb=size_mb,
            kind=kind,
            transport=transport,
            items_per_s=mean,
            mb_per_s=mean * factor,
            mb_per_s_lo=lo * factor,
            mb_per_s_hi=hi * factor,
            speedup=mean / baseline if baseline else 1.0,
            peak_rss_mb=rss,
            cpu_sec=cpu,
        )
        for transport, mean, lo, hi, cpu, rss in measured
    ]


def _table_header() -> str:
    """Header for the pivoted table (one transport column group per row)."""
    return f"{'kind':<8} {'size':>5} {'no-arena':>9} {'ring':>15} {'pool':>15}"


def _cell(
    idx: dict[tuple[int, str, str], Row], size_mb: int, kind: str, transport: str
) -> str:
    """MB/s for one (size, kind, transport), with speedup in (x); ``-`` if not run."""
    r = idx.get((size_mb, kind, transport))
    if r is None:
        return "-"
    if transport == "no-arena":
        return f"{r.mb_per_s:.0f}"
    return f"{r.mb_per_s:.0f} ({r.speedup:.2f}x)"


def _table_row(idx: dict[tuple[int, str, str], Row], size_mb: int, kind: str) -> str:
    """One pivoted row: no-arena/ring/pool MB/s, with ring/pool speedup in (x)."""
    na = _cell(idx, size_mb, kind, "no-arena")
    ring = _cell(idx, size_mb, kind, "ring")
    pool = _cell(idx, size_mb, kind, "pool")
    return f"{kind:<8} {size_mb:>4}M {na:>9} {ring:>15} {pool:>15}"


def _print_table(rows: list[Row]) -> None:
    """Print one row per (size, kind); transports are columns (MB/s, speedup)."""
    idx = {(r.size_mb, r.kind, r.transport): r for r in rows}
    keys = sorted(
        {(r.size_mb, r.kind) for r in rows},
        key=lambda k: (_KINDS.index(k[1]), k[0]),
    )
    header = _table_header()
    print(header)
    print("-" * len(header))
    for size_mb, kind in keys:
        print(_table_row(idx, size_mb, kind))
    print("(throughput in MB/s; (Nx) = speedup over the no-arena baseline)")


def _print_isolate_table(rows: list[Row]) -> None:
    """One row per (size, kind, transport): throughput, CPU time, peak RSS.

    Used for ``--isolate`` runs, where CPU and RSS are per-config (each transport
    ran in its own process).
    """
    rows = sorted(
        rows,
        key=lambda r: (_KINDS.index(r.kind), r.size_mb, _TRANSPORTS.index(r.transport)),
    )
    header = (
        f"{'kind':<8} {'size':>5} {'transport':<9} {'recv MB/s':>10} "
        f"{'speedup':>8} {'CPU s':>7} {'peak RSS MB':>12}"
    )
    print(header)
    print("-" * len(header))
    for r in rows:
        print(
            f"{r.kind:<8} {r.size_mb:>4}M {r.transport:<9} {r.mb_per_s:>10.0f} "
            f"{r.speedup:>7.2f}x {r.cpu_sec:>7.1f} {r.peak_rss_mb:>12.0f}"
        )
    print(
        "(recv MB/s with speedup vs no-arena; CPU s and peak RSS are per-config, "
        "lower is better)"
    )


def write_csv(rows: list[Row], path: str) -> None:
    """Write benchmark rows to ``path`` as CSV (one column per :class:`Row` field)."""
    names = [f.name for f in fields(Row)]
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=names)
        writer.writeheader()
        writer.writerows(asdict(r) for r in rows)
    print(f"wrote {len(rows)} rows to {path}")


def read_csv(path: str) -> list[Row]:
    """Read benchmark rows written by :py:func:`write_csv`."""
    with open(path, newline="") as f:
        return [
            Row(
                size_mb=int(d["size_mb"]),
695                kind=d["kind"],
696                transport=d["transport"],
697                items_per_s=float(d["items_per_s"]),
698                mb_per_s=float(d["mb_per_s"]),
699                mb_per_s_lo=float(d["mb_per_s_lo"]),
700                mb_per_s_hi=float(d["mb_per_s_hi"]),
701                speedup=float(d["speedup"]),
702                peak_rss_mb=float(d.get("peak_rss_mb") or 0.0),
703                cpu_sec=float(d.get("cpu_sec") or 0.0),
704            )
705            for d in csv.DictReader(f)
706        ]
707
708
709def _parse_args() -> argparse.Namespace:
710    parser = argparse.ArgumentParser(
711        description="Arena transport throughput: regular IPC vs ring vs pool"
712    )
713    parser.add_argument("--num-items", type=int, default=300)
714    parser.add_argument("--buffer-size", type=int, default=3)
715    parser.add_argument(
716        "--sizes", type=int, nargs="+", default=[2, 8, 32], help="payload sizes (MiB)"
717    )
718    parser.add_argument("--runs", type=int, default=3)
719    parser.add_argument(
720        "--kinds",
721        nargs="+",
722        choices=_KINDS,
723        default=list(_KINDS),
724        help="payload types to benchmark",
725    )
726    parser.add_argument(
727        "--transports",
728        nargs="+",
729        choices=_TRANSPORTS,
730        default=list(_TRANSPORTS),
731        help="transports to benchmark (use one at a time to isolate host stats)",
732    )
733    parser.add_argument(
734        "--duration-sec",
735        type=float,
736        default=0.0,
737        help="if >0, run each (transport, kind, size) for this long instead of "
738        "--runs passes — for sustained host-stat sampling by an external sampler",
739    )
740    parser.add_argument(
741        "--gap-sec",
742        type=float,
743        default=0.0,
744        help="idle seconds inserted between transports, to separate per-transport "
745        "windows for once-per-minute host-stat sampling",
746    )
747    parser.add_argument(
748        "--isolate",
749        action="store_true",
750        help="run each (size, kind, transport) in its own fresh process and report "
751        "per-config CPU time and peak RSS (clean memory/CPU attribution, no "
752        "carryover between configs)",
753    )
754    parser.add_argument("--output", help="optional path to write results as CSV")
755    return parser.parse_args()
756
757
758def main() -> None:
759    """Sweep payload sizes x types x transports; print a table and optionally a CSV."""
760    args = _parse_args()
761    # spawn (not fork) so each isolated worker starts from a clean interpreter,
762    # giving comparable per-config peak RSS rather than inheriting the parent's.
763    mp_ctx = mp.get_context("spawn") if args.isolate else None
764    rows: list[Row] = []
765    for size_mb, kind in product(args.sizes, args.kinds):
766        try:
767            if mp_ctx is not None:
768                rows.extend(_run_config_isolated(size_mb, kind, args, mp_ctx))
769            else:
770                rows.extend(_run_config(size_mb, kind, args))
771        except (OSError, RuntimeError) as e:
772            # Building a kind's payload (e.g. encoding the packets clip) can fail
773            # on some hosts; skip that kind rather than abort the whole sweep.
774            print(f"### skip {kind} {size_mb}M: {type(e).__name__}: {e}", flush=True)
775    if args.isolate:
776        _print_isolate_table(rows)
777    else:
778        _print_table(rows)
779    if args.output:
780        write_csv(rows, args.output)
781
782
783if __name__ == "__main__":
784    main()

API Reference

Functions

create_video_data(target_bytes: int, duration_seconds: float = 2.0) → bytes

Generate an H.264 MP4 whose stream is approximately target_bytes.

Encodes low-entropy frames at a constant target bitrate (x264 CBR, which pads with filler to hold the rate), so the serialized VideoPackets payload scales with the requested size rather than collapsing to the content’s natural compressed size. Uses spdl.io’s in-process encoder rather than the ffmpeg CLI, so it also works on minimal container images that do not ship the CLI.

Parameters:
  • target_bytes – Desired size of the encoded video stream, in bytes.

  • duration_seconds – Clip duration; the bitrate is derived from it.

Returns:

The encoded MP4 file contents as raw bytes.
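The bitrate derivation can be illustrated with a small helper. This is a sketch, assuming the target bitrate is simply the requested stream size in bits divided by the clip duration; `target_bitrate` is a hypothetical name, and the real create_video_data() may round or add headroom for container overhead differently.

```python
def target_bitrate(target_bytes: int, duration_seconds: float = 2.0) -> int:
    """Bits per second so that duration_seconds of stream totals ~target_bytes.

    Hypothetical helper: assumes bitrate = total bits / clip duration.
    """
    if duration_seconds <= 0:
        raise ValueError("duration_seconds must be positive")
    # bytes -> bits, spread evenly over the clip (CBR aims to hold this rate throughout)
    return round(target_bytes * 8 / duration_seconds)


# A 32 MiB payload over the default 2 s clip:
print(target_bitrate(32 * 1024 * 1024))  # 134217728 bits/s, about 134 Mbit/s
```

Deriving the rate from the duration means a shorter clip needs a proportionally higher bitrate to reach the same payload size.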

main() → None

Sweep payload sizes x types x transports; print a table and optionally a CSV.

read_csv(path: str) → list[Row]

Read benchmark rows written by write_csv().

run_transport(dataset: _Dataset, transport: str, *, payload_nbytes: int, num_items: int, buffer_size: int, runs: int, duration_sec: float = 0.0, usage: list[float] | None = None) → list[float]

Run the backend pipeline once per timed pass; return per-pass items/second.

Builds a one-stage backend pipeline (source -> sink) that runs in a subprocess via run_pipeline_in_subprocess(), ships its items to the main process over transport, and consumes them in a no-op loop. One warmup pass is discarded; the per-pass throughput of each timed pass is returned so the caller can summarize it (mean + confidence interval). The subprocess and arena are reused across passes and torn down at the end.

Parameters:
  • dataset – The (picklable) backend data source.

  • transport – One of "no-arena", "ring", "pool".

  • payload_nbytes – On-the-wire size of one payload, used to size the arena.

  • num_items – Number of items per pass (must match dataset.num_items).

  • buffer_size – Pipeline sink buffer size / arena in-flight unit count.

  • runs – Number of timed passes (used when duration_sec is not positive).

  • duration_sec – If positive, keep running timed passes until this many seconds have elapsed instead of stopping after runs passes — used for sustained, fixed-duration host-stat sampling by an external sampler.

  • usage – If given, one element is appended: the CPU seconds (user + system, this process + the producer subprocess) spent across the timed passes. The snapshot is taken after the warmup pass, so process-startup and import costs are excluded and only the transport’s per-item work counts.

Returns:

One throughput sample (items per second) per timed pass.

write_csv(rows: list[Row], path: str) → None

Write benchmark rows to path as CSV (one column per Row field).
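A round trip through write_csv() and read_csv() can be sketched with a stand-in dataclass that mirrors the documented Row fields. The `Row`, `write_rows`, and `read_rows` below are hypothetical local copies, not imports from the benchmark. The reader keeps the same fallback as read_csv(): peak_rss_mb and cpu_sec default to 0.0 when a CSV predates those columns.

```python
import csv
from dataclasses import asdict, dataclass, fields


@dataclass
class Row:
    """Hypothetical stand-in mirroring the documented Row fields (in constructor order)."""

    size_mb: int
    kind: str
    transport: str
    items_per_s: float
    mb_per_s: float
    mb_per_s_lo: float
    mb_per_s_hi: float
    speedup: float
    peak_rss_mb: float = 0.0
    cpu_sec: float = 0.0


# Columns that are not floats; every other field is parsed with float().
_CONVERTERS = {"size_mb": int, "kind": str, "transport": str}


def write_rows(rows: list[Row], path: str) -> None:
    """One CSV column per Row field; csv stringifies floats with str(), which round-trips."""
    with open(path, "w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=[f.name for f in fields(Row)])
        writer.writeheader()
        writer.writerows(asdict(r) for r in rows)


def read_rows(path: str) -> list[Row]:
    """Parse rows back; columns absent from older CSVs keep their Row defaults."""
    out: list[Row] = []
    with open(path, newline="") as fh:
        for d in csv.DictReader(fh):
            kwargs = {}
            for f in fields(Row):
                raw = d.get(f.name)
                if raw:  # missing or empty -> fall back to the dataclass default
                    kwargs[f.name] = _CONVERTERS.get(f.name, float)(raw)
            out.append(Row(**kwargs))
    return out
```

Because Python's str() of a float is its shortest round-tripping repr, the rows read back compare equal to the rows written.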

Classes

class Row

One benchmark measurement: throughput for a (size, kind, transport).

cpu_sec: float = 0.0

Total CPU seconds (user + system, consumer + producer) the config consumed. Only populated by --isolate; 0 otherwise. Lower is better — moving a payload via the arena should cost less CPU than the pickle + copy of plain IPC.

items_per_s: float

Mean items received per second over the timed passes.

kind: str

Payload kind: "bytes", "numpy", "torch", or "packets".

mb_per_s: float

Mean payload throughput in MB/s (items_per_s x payload bytes).

mb_per_s_hi: float

Upper bound of the ~95% confidence interval of mb_per_s.

mb_per_s_lo: float

Lower bound of the ~95% confidence interval of mb_per_s (normal approximation over the timed passes).

peak_rss_mb: float = 0.0

Peak resident memory of the config’s process tree (consumer + producer), in MB. Only populated by --isolate; 0 otherwise. Shared arena pages may be counted in both processes, so treat it as an upper-bound proxy and compare within a (size, kind) row across transports.

size_mb: int

Requested payload size in MiB (the sweep knob).

speedup: float

items_per_s relative to the no-arena baseline for this (size, kind).

transport: str

Transport used: "no-arena", "ring", or "pool".
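The ~95% interval behind mb_per_s_lo and mb_per_s_hi, and the speedup ratio, can be sketched as follows. This assumes the normal approximation is mean ± 1.96 · s / √n, with s the sample standard deviation of the per-pass samples; the script's own summary helper is not part of this excerpt, so its exact formula (for example, the z value or how a single pass is handled) may differ. `mean_ci95` and `speedup` are hypothetical names.

```python
import math
import statistics


def mean_ci95(samples: list[float]) -> tuple[float, float, float]:
    """Return (mean, lo, hi) using a normal approximation: mean +/- 1.96 * s / sqrt(n)."""
    mean = statistics.fmean(samples)
    if len(samples) < 2:
        # One pass gives no spread estimate; collapse the interval to the mean.
        return mean, mean, mean
    half_width = 1.96 * statistics.stdev(samples) / math.sqrt(len(samples))
    return mean, mean - half_width, mean + half_width


def speedup(mean: float, baseline: float) -> float:
    """Ratio to the no-arena baseline; 1.0 when no baseline was measured (as in the listing)."""
    return mean / baseline if baseline else 1.0


# Per-pass items/s for one config.
mean, lo, hi = mean_ci95([10.0, 12.0, 14.0])
print(f"{mean:.2f} [{lo:.2f}, {hi:.2f}]")
```

Because the listing scales mean, lo, and hi by the same `factor`, the interval can be computed once in items/s and converted to MB/s afterwards.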