spdl.pipeline.SharedMemorySegmentPool

class SharedMemorySegmentPool(segment_size: int, count: int, *, acquire_timeout: float = 60.0)

[Experimental] A rotating pool of shared-memory segments, one per unit.

Construct it in the parent process and pass it to spdl.pipeline.iterate_in_subprocess() via the arena argument; ownership transfers to the returned iterable, which closes and unlinks it at teardown. Restored NumPy/Torch payloads are zero-copy views over shared memory; the pool keeps each segment until its views are released.

Added in version 0.5.0.

Changed in version 0.6.0: Backpressure now uses a process-shared multiprocessing.Condition instead of busy-polling the shared reclaim counter; idle producers no longer consume CPU while waiting.
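The change above replaces a busy-poll on the reclaim counter with a condition-variable wait. The following is a minimal sketch of that pattern under stated assumptions, not SPDL's implementation: `SegmentGate`, `acquire()`, and `reclaim()` are hypothetical names. It uses `threading.Condition` so it runs in a single process; a process-shared `multiprocessing.Condition`, which the pool uses according to the changelog, offers the same `wait_for()`/`notify_all()` interface. The slot rotation assumes that units are reclaimed in publication (FIFO) order.

```python
import threading


class SegmentGate:
    """Hypothetical Mode B backpressure gate over `count` segments.

    A producer that finds the pool full sleeps on a condition variable
    instead of repeatedly polling the reclaim counter.
    """

    def __init__(self, count: int, acquire_timeout: float = 60.0) -> None:
        self.count = count
        self.acquire_timeout = acquire_timeout
        self.published = 0  # units handed to the consumer
        self.reclaimed = 0  # units whose segment has come back
        self._cond = threading.Condition()

    def _has_space(self) -> bool:
        # In-flight units are those published but not yet reclaimed.
        return self.published - self.reclaimed < self.count

    def acquire(self) -> int:
        """Wait for a free segment and return its slot index."""
        with self._cond:
            # wait_for() sleeps until notified and re-checks the predicate.
            # A timeout of 0 makes this a single non-blocking check (Mode A).
            if not self._cond.wait_for(self._has_space, timeout=self.acquire_timeout):
                raise BufferError("no free segment within acquire_timeout")
            # Rotate through slots; only safe if reclamation is FIFO (assumption).
            slot = self.published % self.count
            self.published += 1
            return slot

    def reclaim(self) -> None:
        """Return one segment and wake any producer blocked in acquire()."""
        with self._cond:
            self.reclaimed += 1
            self._cond.notify_all()
```

Waking through `notify_all()` means an idle producer consumes no CPU until the consumer actually frees a segment. The predicate is always re-checked under the lock, so spurious wake-ups are harmless.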

Note

Performance. Because each unit is restored as a zero-copy view directly over shared memory, this backend avoids the per-item serialization and copy-out that a plain pickle/queue transfer pays on both sides of the process boundary. The consistent, measured benefit is substantially lower host CPU usage for moving large payloads across the boundary — the transfer stops being a serialize-then-copy operation.

Whether that converts into higher end-to-end throughput depends on where the bottleneck is. It helps most when the pipeline is CPU-bound or transfer/serialization-bound. When the consumer is the bottleneck (for example a GPU-starved, decode-bound stage), throughput can be unchanged and the gain instead shows up as freed CPU headroom — which still matters when many workers contend for host cores. Treat this as an opt-in optimization and measure your own pipeline before relying on it.
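To make the contrast in this note concrete, here is a standard-library-only sketch of the two transfer styles for one payload. It is not SPDL code. A pickle round trip builds a new buffer on each side, while a `memoryview` over a shared buffer hands the consumer the same bytes without copying. A plain `bytearray` stands in for a shared-memory segment so that the example runs in one process, and `time.process_time()` is just one simple way to observe CPU cost. The numbers it prints are machine-dependent and are no substitute for measuring your own pipeline.

```python
import pickle
import time

payload = bytes(range(256)) * 4096  # 1 MiB stand-in for an offloaded binary

# Style 1: serialize-then-copy, as a plain pickle/queue transfer does.
t0 = time.process_time()
wire = pickle.dumps(payload)        # producer side: serialize into a new buffer
restored_copy = pickle.loads(wire)  # consumer side: deserialize into another one
pickle_cpu = time.process_time() - t0

# Style 2: zero-copy view. The bytearray stands in for a shared segment.
segment = bytearray(len(payload))
segment[:] = payload                # the producer still writes the data once
t0 = time.process_time()
restored_view = memoryview(segment)[: len(payload)]  # consumer side: no copy
view_cpu = time.process_time() - t0

print(f"pickle round trip: {pickle_cpu * 1e3:.3f} ms CPU")
print(f"memoryview view:   {view_cpu * 1e3:.3f} ms CPU")

# The view aliases the segment; the pickled copy does not.
segment[0] = 255
print(restored_view[0], restored_copy[0])  # prints: 255 0
```

The aliasing shown in the last two lines is also why a segment cannot be reused while a view over it is still alive.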

Parameters:
  • segment_size – Size of each segment in bytes. Must be at least as large as the largest single pipeline unit (the total size of its offloaded binaries).

  • count – Number of segments. With Mode B backpressure (the default), the producer waits for the consumer to reclaim a segment when the pool is full, so count bounds memory use and the number of in-flight units instead of acting as a hard cap that raises an error. Size it to the working set that keeps the consumer fed (buffer_size plus the number of units the consumer holds concurrently).

  • acquire_timeout – Number of seconds begin_unit() waits for a free segment before raising BufferError. This guards against a stalled or dead consumer; set it to 0 for non-blocking (Mode A) behavior.
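The sizing guidance in these parameter descriptions reduces to simple arithmetic. The helper below is hypothetical and is not part of SPDL. It assumes you know the byte sizes of the offloaded binaries in representative units, the `buffer_size` that the count description refers to, and how many units the consumer holds at once. Rounding segment_size up to a 4096-byte page is an extra assumption, not a documented requirement.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolSizing:
    segment_size: int  # bytes per segment
    count: int         # number of segments
    total_bytes: int   # shared memory reserved by the whole pool


def size_pool(
    unit_binary_sizes: Sequence[Sequence[int]],
    buffer_size: int,
    consumer_holds: int,
    alignment: int = 4096,
) -> PoolSizing:
    """Hypothetical helper applying the documented sizing rules.

    unit_binary_sizes: for each representative unit, the byte sizes of
    the binaries it offloads into shared memory.
    """
    # segment_size must cover the largest single unit, i.e. the largest
    # per-unit sum of offloaded binaries (rounded up to `alignment`).
    largest_unit = max(sum(sizes) for sizes in unit_binary_sizes)
    segment_size = -(-largest_unit // alignment) * alignment
    # count: the working set that keeps the consumer fed.
    count = buffer_size + consumer_holds
    return PoolSizing(segment_size, count, segment_size * count)


sizing = size_pool([[3_000_000, 100_000], [2_500_000]], buffer_size=8, consumer_holds=2)
print(sizing)  # PoolSizing(segment_size=3100672, count=10, total_bytes=31006720)
```

Because count is a bound rather than a hard cap under Mode B, undersizing it costs throughput (the producer waits) rather than correctness, while total_bytes shows the memory you commit up front.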

Methods

close()

Release this process's mappings of the segments.

open_reader()

Return the reader endpoint (call in the parent process).

open_writer()

Return the writer endpoint (call in the worker process).

shutdown_arena()

Wake any producer blocked in _PoolWriter.begin_unit().

unlink()

Remove all segments from the system (call once, by the owner).
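The division of responsibilities in these methods mirrors Python's `multiprocessing.shared_memory.SharedMemory`: each process releases its own mappings with close(), and only the owner removes the segments, once, with unlink(). The sketch below shows that lifecycle with the standard library only. It does not use SPDL, and the "worker" attaches by name in the same process so that the example is self-contained.

```python
from multiprocessing import shared_memory

# Owner (parent): create the segments.
segments = [shared_memory.SharedMemory(create=True, size=1024) for _ in range(2)]
names = [seg.name for seg in segments]

# Worker: attach to the same segments by name and write into one of them.
# In a real pipeline this part runs in the subprocess.
attached = [shared_memory.SharedMemory(name=n) for n in names]
attached[0].buf[:5] = b"hello"

# The owner reads the worker's bytes through its own mapping.
data = bytes(segments[0].buf[:5])

# Every process closes its own mappings...
for seg in attached:
    seg.close()
for seg in segments:
    seg.close()

# ...and the owner unlinks each segment exactly once.
for seg in segments:
    seg.unlink()
```

Calling unlink() from more than one process would fail on the second call, which is why the owner alone is responsible for it.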

Attributes

count

Number of segments in the pool.

published

Number of units published by the writer.

reclaimed

Number of units whose segments have been returned to the pool.

segment_size

Size of each segment in bytes.

close() → None

Release this process’s mappings of the segments.

Restored zero-copy views (Packets / NumPy / Torch) may still alias a segment at teardown — their slices keep the underlying mmap exported, so unmapping it raises BufferError. We release what we can and retain the segments that are still mapped; their shared memory is freed once the views are garbage-collected, and unlink() removes the names regardless. Raising here would crash an otherwise-successful run.
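The failure mode described here can be reproduced with the standard library: `SharedMemory.close()` raises `BufferError` while a slice of the segment's buffer is still alive. The sketch below is a hypothetical helper, not SPDL's close(). It closes every segment it can, keeps the handles that are still exported, and unlinks all names regardless. The memoryview slice stands in for a restored NumPy/Torch view.

```python
from multiprocessing import shared_memory


def close_what_we_can(segments):
    """Close each segment; return the ones still aliased by live views."""
    retained = []
    for seg in segments:
        try:
            seg.close()
        except BufferError:
            # A zero-copy view still exports this mapping. Keep the handle so
            # the memory stays valid until the view is garbage-collected.
            retained.append(seg)
    return retained


segments = [shared_memory.SharedMemory(create=True, size=64) for _ in range(3)]
view = segments[1].buf[:8]  # stands in for a restored NumPy/Torch view

retained = close_what_we_can(segments)
for seg in segments:
    seg.unlink()            # names are removed even though one is still mapped
print(len(retained))        # prints: 1

del view                    # the last alias is dropped...
close_what_we_can(retained)  # ...so retrying the close now succeeds
```

Swallowing the `BufferError` is a deliberate choice: the memory is still reclaimed once the views die, so failing the whole run at teardown would gain nothing.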

property count: int

Number of segments in the pool.

open_reader() → _PoolReader

Return the reader endpoint (call in the parent process).

open_writer() → _PoolWriter

Return the writer endpoint (call in the worker process).

property published: int

Number of units published by the writer.

property reclaimed: int

Number of units whose segments have been returned to the pool.

property segment_size: int

Size of each segment in bytes.

shutdown_arena() → None

Wake any producer blocked in _PoolWriter.begin_unit().

Sets a sticky shutdown flag in the control segment and broadcasts on the space condition variable so a producer that is currently waiting for a free segment exits the wait promptly with a BufferError. Safe to call multiple times and from either process.
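The shutdown protocol described here (a sticky flag plus a broadcast on the condition variable) can be sketched as follows. This is a hypothetical single-process illustration using `threading` primitives, not SPDL's code: `SpaceWaiter` and its methods are invented names. The real pool keeps the flag in a shared control segment and uses a process-shared condition.

```python
import threading


class SpaceWaiter:
    """Hypothetical producer-side wait with a sticky shutdown flag."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._free = 0          # free segments available to the producer
        self._shutdown = False  # sticky: never reset once set

    def wait_for_space(self, timeout: float) -> None:
        with self._cond:
            # Wake when a segment is freed OR when shutdown is requested.
            ok = self._cond.wait_for(lambda: self._free > 0 or self._shutdown, timeout)
            if self._shutdown:
                raise BufferError("pool is shutting down")
            if not ok:
                raise BufferError("timed out waiting for a free segment")
            self._free -= 1

    def release(self) -> None:
        """Consumer side: free one segment and wake waiters."""
        with self._cond:
            self._free += 1
            self._cond.notify_all()

    def shutdown(self) -> None:
        """Idempotent: setting the flag and broadcasting again is harmless."""
        with self._cond:
            self._shutdown = True
            self._cond.notify_all()


waiter = SpaceWaiter()
errors = []


def producer() -> None:
    try:
        waiter.wait_for_space(timeout=30.0)
    except BufferError as exc:
        errors.append(str(exc))


t = threading.Thread(target=producer)
t.start()
waiter.shutdown()
waiter.shutdown()  # safe to call more than once
t.join(timeout=5.0)
print(errors)  # prints: ['pool is shutting down']
```

Because the flag is part of the wait predicate and is checked under the lock, the producer exits promptly whether shutdown happens before or during its wait.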

unlink()

Remove all segments from the system (call once, by the owner).