spdl.pipeline.SharedMemoryRingBuffer

class SharedMemoryRingBuffer(capacity: int, *, acquire_timeout: float = 60.0)

[Experimental] A single shared-memory ring buffer for cross-process payloads.

Construct it in the parent process and pass it to spdl.pipeline.iterate_in_subprocess() via the arena argument; ownership transfers to that call, which closes and unlinks it at teardown.

Added in version 0.5.0.

Changed in version 0.6.0: The producer now blocks on a process-shared multiprocessing.Condition when the arena is full and resumes once the consumer releases enough space, throttling the producer instead of failing it. Pass acquire_timeout=0 to restore the legacy raise-immediately behavior.

Added in version 0.6.0: The acquire_timeout argument.
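The blocking behavior described above can be illustrated with a small toy model. This is only a sketch, not the library's implementation: ToyArena and its acquire/release methods are hypothetical names, threads stand in for the producer and consumer processes, and threading.Condition stands in for the process-shared multiprocessing.Condition.

```python
import threading


class ToyArena:
    """Hypothetical byte-budget arena; threads stand in for processes."""

    def __init__(self, capacity: int, acquire_timeout: float = 60.0) -> None:
        self.capacity = capacity
        self.acquire_timeout = acquire_timeout
        self.used = 0
        self._space = threading.Condition()

    def acquire(self, nbytes: int) -> None:
        """Reserve nbytes, waiting up to acquire_timeout for free space."""
        with self._space:
            has_room = self._space.wait_for(
                lambda: self.capacity - self.used >= nbytes,
                timeout=self.acquire_timeout,
            )
            if not has_room:
                raise BufferError(f"no room for {nbytes} bytes")
            self.used += nbytes

    def release(self, nbytes: int) -> None:
        """Free nbytes and wake any producer waiting for space."""
        with self._space:
            self.used -= nbytes
            self._space.notify_all()


# Blocking mode: the producer waits until the consumer frees space.
arena = ToyArena(capacity=100, acquire_timeout=5.0)
arena.acquire(80)
threading.Timer(0.1, arena.release, args=(80,)).start()  # simulated consumer
arena.acquire(50)  # waits briefly, then succeeds

# acquire_timeout=0: raise immediately when the request does not fit.
strict = ToyArena(capacity=100, acquire_timeout=0)
strict.acquire(80)
try:
    strict.acquire(50)
    raised = False
except BufferError:
    raised = True
print(arena.used, raised)
```

In the toy model a full arena slows the producer down rather than failing it, while a timeout of 0 reproduces the raise-immediately mode.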

Parameters:
  • capacity – Size of the payload arena in bytes. Must be at least as large as the biggest single pipeline unit (the sum of the binaries offloaded from one item). Size it to the in-flight high-water mark, roughly (buffer_size + 2) * max_unit_bytes.

  • acquire_timeout – Seconds write_binary waits for the consumer to free enough space before raising BufferError. Guards against a stalled/dead consumer; set to 0 for non-blocking behavior (raise immediately when the in-progress unit would not fit).
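As a worked example of the sizing rule of thumb for capacity, the helper below evaluates (buffer_size + 2) * max_unit_bytes. The function name recommended_capacity and the sample numbers are illustrative assumptions, not part of the library.

```python
def recommended_capacity(buffer_size: int, max_unit_bytes: int) -> int:
    """Rule-of-thumb arena size: (buffer_size + 2) * max_unit_bytes."""
    if buffer_size < 0 or max_unit_bytes <= 0:
        raise ValueError("buffer_size must be >= 0 and max_unit_bytes > 0")
    return (buffer_size + 2) * max_unit_bytes


# Hypothetical workload: 8 buffered units, each offloading at most 4 MiB.
capacity = recommended_capacity(buffer_size=8, max_unit_bytes=4 * 1024 * 1024)
print(capacity)  # 41943040 bytes, i.e. 40 MiB
```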

Methods

close()

Release this process's mapping of the segment.

open_reader()

Return the reader endpoint (call in the parent process).

open_writer()

Return the writer endpoint (call in the worker process).

shutdown_arena()

Wake any producer blocked in _RingWriter.write_binary().

unlink()

Remove the segment from the system (call once, by the owner).

Attributes

capacity

Size of the payload arena in bytes.

head

Absolute byte count published by the writer.

name

Name of the underlying shared-memory segment.

tail

Absolute byte count released by the reader.

property capacity: int

Size of the payload arena in bytes.

close() → None

Release this process’s mapping of the segment.

property head: int

Absolute byte count published by the writer.

property name: str

Name of the underlying shared-memory segment.

open_reader() → _RingReader

Return the reader endpoint (call in the parent process).

open_writer() → _RingWriter

Return the writer endpoint (call in the worker process).

shutdown_arena() → None

Wake any producer blocked in _RingWriter.write_binary().

Sets a sticky shutdown flag in the control header and broadcasts on the space condition variable so a producer that is currently waiting for free space exits the wait promptly with a BufferError. Safe to call multiple times and from either process.
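The wake-up pattern described here (a sticky flag plus a broadcast on a condition variable) can be sketched with standard-library primitives. ToySpaceGate is a hypothetical class, and threading.Condition stands in for the process-shared condition; the real control header layout is not shown in this page.

```python
import threading


class ToySpaceGate:
    """Hypothetical stand-in for the space condition plus a shutdown flag."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._has_space = False
        self._shutdown = False  # sticky: never reset once set

    def wait_for_space(self, timeout: float) -> None:
        """Block until space appears; raise BufferError on shutdown or timeout."""
        with self._cond:
            self._cond.wait_for(
                lambda: self._has_space or self._shutdown, timeout=timeout
            )
            if self._shutdown:
                raise BufferError("arena shut down")
            if not self._has_space:
                raise BufferError("timed out waiting for space")

    def shutdown(self) -> None:
        """Set the flag and wake every waiter; calling it again is harmless."""
        with self._cond:
            self._shutdown = True
            self._cond.notify_all()


gate = ToySpaceGate()
errors = []


def producer() -> None:
    try:
        gate.wait_for_space(timeout=30.0)
    except BufferError as exc:
        errors.append(str(exc))


worker = threading.Thread(target=producer)
worker.start()
gate.shutdown()
gate.shutdown()  # second call has no further effect
worker.join(timeout=5.0)
print(errors)  # ['arena shut down']
```

Because the flag is checked inside the wait predicate, the producer exits promptly whether shutdown happens before or during the wait.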

property tail: int

Absolute byte count released by the reader.
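Since head and tail are absolute byte counts, occupancy can be derived from their difference. The helper below is a sketch under assumptions: ring_usage is a hypothetical function, and mapping a counter to an arena offset with a modulo is a common ring-buffer convention that this page does not confirm for this class.

```python
def ring_usage(head: int, tail: int, capacity: int) -> dict:
    """Derive occupancy from monotonically increasing byte counters."""
    in_flight = head - tail  # bytes published but not yet released
    if not 0 <= in_flight <= capacity:
        raise ValueError("inconsistent counters")
    return {
        "in_flight": in_flight,
        "free": capacity - in_flight,
        # Assumed convention: offsets wrap around the arena size.
        "write_offset": head % capacity,
        "read_offset": tail % capacity,
    }


usage = ring_usage(head=2500, tail=1800, capacity=1024)
print(usage)
# {'in_flight': 700, 'free': 324, 'write_offset': 452, 'read_offset': 776}
```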

unlink()

Remove the segment from the system (call once, by the owner).
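The difference between close() and unlink() mirrors the one in the standard library's multiprocessing.shared_memory module, which the sketch below uses for illustration. Whether SharedMemoryRingBuffer is built on that module is an assumption; the example only shows the general close-versus-unlink semantics on a POSIX system.

```python
from multiprocessing import shared_memory

# The owner creates the segment; a peer attaches to it by name.
owner = shared_memory.SharedMemory(create=True, size=1024)
peer = shared_memory.SharedMemory(name=owner.name)

owner.buf[:5] = b"hello"
seen = bytes(peer.buf[:5])  # the peer sees the owner's write

# close() drops only the calling side's mapping.
peer.close()
owner.close()

# unlink() removes the segment itself; call it once, from the owner.
owner.unlink()

try:
    shared_memory.SharedMemory(name=owner.name)
    still_exists = True
except FileNotFoundError:
    still_exists = False

print(seen, still_exists)  # b'hello' False
```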