spdl.pipeline.iterate_in_subprocess
- iterate_in_subprocess(fn: Callable[[], Iterable[T]], *, buffer_size: int = 3, initializer: Callable[[], None] | None = None, mp_context: str | None = None, timeout: float | None = None, daemon: bool = False) → Iterable[T]
[Experimental] Run the given iterable in a subprocess.
- Parameters:
fn – Function that returns an iterator. Use functools.partial() to pass arguments to the function.
buffer_size – Maximum number of items to buffer in the queue.
initializer – A function executed in the subprocess before iteration starts.
mp_context – Context to use for multiprocessing. If not specified, a default method is used.
timeout – Timeout for inactivity. If the generator function does not yield any item for this amount of time, the process is terminated.
daemon – Whether to run the process as a daemon. Use it only for debugging.
- Returns:
Iterator over the results of the generator function.
Note
The function and the values yielded by its iterator (or generator) must be picklable.
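One way to check the picklability requirement up front is to try serializing the object with pickle. The following is a minimal sketch using only the standard library; `is_picklable` is a hypothetical helper for illustration and is not part of SPDL.

```python
import functools
import pickle


def is_picklable(obj) -> bool:
    """Return True if ``obj`` can be serialized with pickle (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except Exception:
        # Depending on the object, pickle raises PicklingError,
        # AttributeError or TypeError.
        return False
    return True


# A partial over an importable callable is pickled by reference to that callable.
picklable_fn = functools.partial(range, 3)

# A lambda has no importable name, so pickle cannot reference it.
unpicklable_fn = lambda: iter(range(3))  # noqa: E731

print(is_picklable(picklable_fn))
print(is_picklable(unpicklable_fn))
```

The same check can be applied to a sample of the values the iterator yields, since those also cross the process boundary.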
See also
run_pipeline_in_subprocess() for running a Pipeline in a subprocess.
Parallelism and Performance for the context in which this function was created.
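As a usage illustration, the sketch below binds an argument with functools.partial() and consumes the results in the parent process. It relies only on the signature shown above; `count_up` and `run_example` are hypothetical names, and the `timeout` value is arbitrary. The import is kept inside `run_example` so that the module can be loaded without SPDL installed.

```python
import functools
from collections.abc import Iterator


def count_up(n: int) -> Iterator[int]:
    """Generator function defined at module level so it can be pickled by reference."""
    for i in range(n):
        yield i


def run_example() -> list[int]:
    # Imported lazily; requires SPDL to be installed when actually called.
    from spdl.pipeline import iterate_in_subprocess

    # ``fn`` must take no arguments, so bind them with functools.partial.
    fn = functools.partial(count_up, 5)

    # ``buffer_size`` and ``timeout`` are keyword-only in the signature above.
    return list(iterate_in_subprocess(fn, buffer_size=3, timeout=30.0))
```

When running this as a script, call `run_example()` under an `if __name__ == "__main__":` guard, which the standard multiprocessing module requires for start methods that re-import the main module.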
Implementation Detail
Manipulating an iterable object in a subprocess requires somewhat elaborate state control. The following sections go over the implementation details.
Worker State
The iterable object is manipulated in the worker process. The worker process has three states: “Initialization”, “Stand By” and “Iteration”. The Initialization state performs global initialization and creates the iterable object. When initialization completes, the worker transitions to Stand By mode, where it waits for a command from the parent process. The command can be “START_ITERATION” or “ABORT”. When “START_ITERATION” is received, the worker process transitions to Iteration mode. In Iteration mode, the worker creates an iterator object from the iterable, then executes it. The resulting data are put in the queue, which the parent process is watching.
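The state transitions described above can be written down as a small lookup table. This is a simplified sketch and not the actual implementation: the `State` enum, the event names, and the rule that unknown events end the worker are assumptions made for illustration. The STOP_ITERATION rows follow the description of that command under `_Cmd`.

```python
from enum import Enum, auto


class State(Enum):
    INITIALIZATION = auto()
    STAND_BY = auto()
    ITERATION = auto()
    EXITED = auto()


# (current state, event) -> next state. Event names are illustrative.
TRANSITIONS = {
    (State.INITIALIZATION, "init_ok"): State.STAND_BY,
    (State.INITIALIZATION, "init_failed"): State.EXITED,
    (State.STAND_BY, "START_ITERATION"): State.ITERATION,
    (State.STAND_BY, "STOP_ITERATION"): State.STAND_BY,  # silently ignored
    (State.STAND_BY, "ABORT"): State.EXITED,
    (State.ITERATION, "iteration_completed"): State.STAND_BY,
    (State.ITERATION, "STOP_ITERATION"): State.STAND_BY,
    (State.ITERATION, "iteration_failed"): State.EXITED,
    (State.ITERATION, "ABORT"): State.EXITED,
}


def step(state: State, event: str) -> State:
    """Return the next state. Unknown pairs end the worker (an assumption)."""
    return TRANSITIONS.get((state, event), State.EXITED)


def run(events: list[str]) -> State:
    """Feed a sequence of events to a freshly started worker."""
    state = State.INITIALIZATION
    for event in events:
        state = step(state, event)
    return state


# One full iteration returns the worker to stand-by, ready for the next one.
print(run(["init_ok", "START_ITERATION", "iteration_completed"]))
```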
The following diagram illustrates the worker’s state transitions in a simplified manner. A detailed diagram alongside the actual implementation is found in _execute_iterable().
stateDiagram-v2
    state Parent {
        p1: Start Iteration
        p2: Iterate on the result
        state pf <<fork>>
        state pj <<join>>
        [*] --> p1
        p1 --> pf
        pf --> pj: Wait for worker process
        pj --> p2
        p2 --> [*]
    }
    state Worker {
        state wf <<fork>>
        w0: Initialization
        w1: Stand By
        w2: Iteration
        [*] --> w0
        w0 --> w1: Success
        w0 --> [*]: Fail
        w1 --> wf: Iteration started
        wf --> w2
        w2 --> w1: Iteration completed
        w1 --> [*]: Abort
        w2 --> [*]: Fail / Abort
    }
    pf --> w1: Issue START_ITERATION command
    wf --> pj: Notify ITERATION_STARTED
    w2 --> p2: Results passed via queue
Helper functions and data structures
The following functions and data structures are used to implement the iterate_in_subprocess() function. They are not part of the public interface, but the logic is elaborate enough that documenting them is helpful, so they are listed here.
- class _Cmd
Command issued from the parent process to the worker process in iterate_in_subprocess().
- ABORT
Instruct the worker process to abort and exit.
- START_ITERATION
Instruct the worker process to start the iteration.
- STOP_ITERATION
Instruct the worker process to stop the ongoing iteration, and go back to the stand-by mode.
If the worker process receives this command in stand-by mode, it is silently ignored. (This allows the parent process to be sure that the worker process is in stand-by mode or has failed, and is not in iteration mode.)
- class _Status
Status reported by the worker process in iterate_in_subprocess().
- INITIALIZATION_FAILED
Initialization (global, or creation of iterable) failed.
- INITIALIZATION_SUCCEEDED
Initialization succeeded. The worker process is transitioning to stand-by mode.
- ITERATION_FINISHED
The worker finished an iteration, transitioning back to the stand-by mode.
Note that this is sent both when the iterator is exhausted and when the parent process issues the STOP_ITERATION command.
- ITERATION_STARTED
The worker is transitioning to iteration mode.
- ITERATOR_FAILED
There was an error in an iteration step.
- ITERATOR_SUCCESS
One step of iteration has succeeded.
- UNEXPECTED_CMD_RECIEVED
Received an unexpected command.
- _execute_iterable()
Worker implementation for iterate_in_subprocess().
The following diagram illustrates the state transitions in more detail.
stateDiagram-v2
    state Initialization {
        init0: Call Initializer
        init1: Create Iterable
        init0 --> init1
    }
    state stand_by {
        i2: Wait for a command (block)
        state i3 <<choice>>
    }
    stand_by: Stand By
    init_fail: Push INITIALIZATION_FAILED
    init_success: Push INITIALIZATION_SUCCESS
    i4: Push ITERATION_START
    i5: Iteration
    i6: Push UNEXPECTED_COMMAND
    init_fail --> Done
    [*] --> Initialization
    Initialization --> init_success: Success
    Initialization --> init_fail: Failed
    init_success --> i2
    i2 --> i3: Command received
    i3 --> i4: START_ITERATION
    i3 --> i2: STOP_ITERATION
    i3 --> i6: Other commands
    i3 --> Done: ABORT
    i4 --> j0
    i6 --> Done
    Done --> [*]
    state i5 {
        j0: Create Iterator
        j1: Check command queue (non-blocking)
        j2: Get Next Item
        state j3 <<choice>>
        j4: Push ITERATE_SUCCESS (block)
        j5: Push ITERATION_FINISHED
        j6: Push ITERATE_FAILED
        j0 --> j1 : Success
        j0 --> j6 : Fail
        j1 --> j2 : Command Not Found
        j1 --> j3: Command Found
        j3 --> j5 : STOP_ITERATION
        j2 --> j4 : Success
        j2 --> j5 : EOF
        j2 --> j6 : Fail
        j4 --> j1
        j5 --> [*] : Iteration completed without an error (go back to Stand By)
    }
    j3 --> i6 : START_ITERATION or other commands
    j3 --> Done : ABORT
    j6 --> Done
    i5 --> i2: Iteration completed without an error
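The control flow in the diagram can be approximated with the standard library alone. The sketch below is not the actual `_execute_iterable()`: `Cmd`, `Status`, `worker`, `_iterate` and `demo` are hypothetical stand-ins loosely modeled on the names above, messages are assumed to be `(status, payload)` tuples, and a thread with `queue.Queue` stands in for the subprocess and its queues.

```python
import functools
import queue
import threading
from enum import Enum, auto


class Cmd(Enum):
    ABORT = auto()
    START_ITERATION = auto()
    STOP_ITERATION = auto()


class Status(Enum):
    INITIALIZATION_FAILED = auto()
    INITIALIZATION_SUCCEEDED = auto()
    ITERATION_STARTED = auto()
    ITERATION_FINISHED = auto()
    ITERATOR_SUCCESS = auto()
    ITERATOR_FAILED = auto()
    UNEXPECTED_CMD = auto()


def worker(fn, cmd_q, data_q, initializer=None):
    # Initialization: call the initializer, then create the iterable.
    try:
        if initializer is not None:
            initializer()
        iterable = fn()
    except Exception as e:
        data_q.put((Status.INITIALIZATION_FAILED, repr(e)))
        return
    data_q.put((Status.INITIALIZATION_SUCCEEDED, None))
    while True:
        cmd = cmd_q.get()  # Stand By: block until a command arrives.
        if cmd is Cmd.ABORT:
            return
        if cmd is Cmd.STOP_ITERATION:
            continue  # Ignored in stand-by mode.
        if cmd is not Cmd.START_ITERATION:
            data_q.put((Status.UNEXPECTED_CMD, repr(cmd)))
            return
        data_q.put((Status.ITERATION_STARTED, None))
        if not _iterate(iterable, cmd_q, data_q):
            return


def _iterate(iterable, cmd_q, data_q) -> bool:
    """Run one iteration. Return True to go back to stand-by, False to exit."""
    try:
        it = iter(iterable)
    except Exception as e:
        data_q.put((Status.ITERATOR_FAILED, repr(e)))
        return False
    while True:
        try:
            cmd = cmd_q.get_nowait()  # Non-blocking check between items.
        except queue.Empty:
            pass
        else:
            if cmd is Cmd.STOP_ITERATION:
                data_q.put((Status.ITERATION_FINISHED, None))
                return True
            if cmd is not Cmd.ABORT:
                data_q.put((Status.UNEXPECTED_CMD, repr(cmd)))
            return False
        try:
            item = next(it)
        except StopIteration:
            data_q.put((Status.ITERATION_FINISHED, None))
            return True
        except Exception as e:
            data_q.put((Status.ITERATOR_FAILED, repr(e)))
            return False
        data_q.put((Status.ITERATOR_SUCCESS, item))  # Blocks while the queue is full.


def demo():
    """Drive the worker through one iteration and return every message it sent."""
    cmd_q, data_q = queue.Queue(), queue.Queue(maxsize=3)
    args = (functools.partial(range, 3), cmd_q, data_q)
    thread = threading.Thread(target=worker, args=args)
    thread.start()
    cmd_q.put(Cmd.START_ITERATION)
    messages = []
    while True:
        message = data_q.get(timeout=5)
        messages.append(message)
        if message[0] is Status.ITERATION_FINISHED:
            break
    cmd_q.put(Cmd.ABORT)
    thread.join(timeout=5)
    return messages
```

Checking the command queue without blocking between items is what lets the parent interrupt a long iteration, while the blocking `put` on the bounded data queue provides the back-pressure controlled by `buffer_size`.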
- _enter_iteration_mode()
Instruct the worker process to enter iteration mode and wait for the acknowledgement.
- _iterate_results()
Watch the result queue and iterate on the results.
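The parent-side counterpart of these two helpers might look like the following. This is a sketch under stated assumptions rather than the actual code: `Cmd`, `Status`, `enter_iteration_mode` and `iterate_results` are hypothetical stand-ins, messages are assumed to be `(status, payload)` tuples, and a plain `queue.Queue` replaces the inter-process queue. According to the description of `timeout` above, the real function terminates the worker process on inactivity; the sketch only raises `TimeoutError`.

```python
import queue
from enum import Enum, auto


class Cmd(Enum):
    START_ITERATION = auto()


class Status(Enum):
    ITERATION_STARTED = auto()
    ITERATOR_SUCCESS = auto()
    ITERATOR_FAILED = auto()
    ITERATION_FINISHED = auto()


def _get(data_q, timeout):
    """Fetch one message, turning inactivity into TimeoutError."""
    try:
        # timeout=None blocks indefinitely, mirroring ``timeout: float | None``.
        return data_q.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError(f"No message from the worker for {timeout} seconds.") from None


def enter_iteration_mode(cmd_q, data_q, timeout=None):
    """Send START_ITERATION and wait for the acknowledgement."""
    cmd_q.put(Cmd.START_ITERATION)
    status, payload = _get(data_q, timeout)
    if status is not Status.ITERATION_STARTED:
        raise RuntimeError(f"Worker did not start iterating: {status} ({payload})")


def iterate_results(data_q, timeout=None):
    """Yield items until the worker reports the end of the iteration."""
    while True:
        status, payload = _get(data_q, timeout)
        if status is Status.ITERATOR_SUCCESS:
            yield payload
        elif status is Status.ITERATION_FINISHED:
            return
        else:
            raise RuntimeError(f"Iteration failed: {status} ({payload})")
```

Because ITERATION_FINISHED is reported both on exhaustion and after STOP_ITERATION, the loop can end on that single status without knowing which of the two happened.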