Remote Iterable Protocol
Manipulating an iterable object in a remote location (subprocess or subinterpreter) requires somewhat elaborate state control. The following sections go over the implementation details.
Worker State
The iterable object is manipulated in the worker process.
The worker process has three states: Initialization, Stand By, and Iteration.
The Initialization state performs global initialization and creates the iterable object.
When Initialization completes, the worker transitions to Stand By mode,
where it waits for a command from the parent process.
The command can be START_ITERATION or ABORT.
When START_ITERATION is received, the worker process transitions to Iteration mode.
In the Iteration mode, the worker creates an iterator object from the iterable, then executes it. The resulting data are put in the queue, which the parent process is watching.
The following diagram illustrates the worker’s state transitions in a simplified manner.
A detailed diagram, alongside the actual implementation, is found in _execute_iterable().
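The three states described above can be sketched as a single loop. This is a minimal illustration, not the actual implementation: the function names are hypothetical, a thread and `queue.Queue` stand in for the subprocess or subinterpreter and its queues, plain strings named after the `_Cmd` and `_Status` members stand in for the real enum values, and the `(status, payload)` message shape is an assumption. Error handling inside the iteration and command checks during iteration are left out to keep the sketch short.

```python
import queue
import threading


def worker(make_iterable, cmd_q, data_q):
    """Simplified worker: Initialization -> Stand By <-> Iteration."""
    # Initialization: create the iterable and report the outcome.
    try:
        iterable = make_iterable()
    except Exception as err:
        data_q.put(("INITIALIZATION_FAILED", repr(err)))
        return
    data_q.put(("INITIALIZATION_SUCCEEDED", None))

    while True:
        cmd = cmd_q.get()  # Stand By: block until a command arrives
        if cmd == "START_ITERATION":
            # Iteration: create an iterator and push each item to the queue.
            data_q.put(("ITERATION_STARTED", None))
            for item in iterable:
                data_q.put(("ITERATOR_SUCCESS", item))
            data_q.put(("ITERATION_FINISHED", None))
        elif cmd == "STOP_ITERATION":
            continue  # nothing to stop while in stand-by
        else:  # ABORT (or anything else, in this simplified sketch)
            return


def demo():
    """Run one iteration in a thread-based worker and collect its messages."""
    cmd_q, data_q = queue.Queue(), queue.Queue()
    thread = threading.Thread(
        target=worker, args=(lambda: range(3), cmd_q, data_q)
    )
    thread.start()
    cmd_q.put("START_ITERATION")
    cmd_q.put("ABORT")
    thread.join()
    return [data_q.get_nowait() for _ in range(data_q.qsize())]
```

Calling `demo()` returns the messages in the order the worker pushed them: the initialization status, the iteration start, one message per item, and the iteration end.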
Helper functions and data structures
The following functions and data structures are used to implement
the iterate_in_subprocess() function.
They are not part of the public interface, but the logic is sufficiently elaborate
that it is helpful to have them in the documentation, so they are listed here.
- class _Cmd
 Command issued from the parent process to the worker process in
iterate_in_subprocess().
- ABORT
 Instruct the worker process to abort and exit.
- START_ITERATION
 Instruct the worker process to start the iteration.
- STOP_ITERATION
 Instruct the worker process to stop the ongoing iteration, and go back to the stand-by mode.
If the worker process receives this command in stand-by mode, it is silently ignored. (This allows the parent process to be sure that the worker process is in stand-by mode or failure mode, and not in iteration mode.)
- class _Status
 Status reported by the worker process in
iterate_in_subprocess().
- INITIALIZATION_FAILED
 Initialization (global, or creation of iterable) failed.
- INITIALIZATION_SUCCEEDED
 Initialization succeeded. The worker process is transitioning to stand-by mode.
- ITERATION_FINISHED
 The worker finished an iteration and is transitioning back to stand-by mode.
Note that this is sent in both cases: when the iterator is exhausted and when the parent process issued the
STOP_ITERATION command.
- ITERATION_STARTED
 The worker is transitioning to iteration mode.
- ITERATOR_FAILED
 There was an error in an iteration step.
- ITERATOR_SUCCESS
 One step of iteration has succeeded.
- UNEXPECTED_CMD_RECIEVED
 Received an unexpected command.
- _enter_iteration_mode()
 Instruct the worker to enter iteration mode and wait for the acknowledgement.
Works with both multiprocessing.Queue and concurrent.interpreters.Queue.
- Parameters:
 cmd_q – Queue to send commands to the worker
data_q – Queue to receive status messages from the worker
timeout – Maximum time to wait for acknowledgement
worker_type – Type of worker (for error messages)
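The handshake described for `_enter_iteration_mode()` might look like the following. This is a sketch under assumptions, not the library's code: the function name, the string commands and statuses, the `(status, payload)` message shape, and the use of `RuntimeError` are all chosen for illustration, and `queue.Queue` from the standard library stands in for the actual queue type.

```python
import queue


def enter_iteration_mode(cmd_q, data_q, timeout, worker_type="subprocess"):
    """Send START_ITERATION and wait for the worker to acknowledge it."""
    cmd_q.put("START_ITERATION")
    try:
        # Block until the worker reports back, or give up after `timeout` seconds.
        status, _ = data_q.get(timeout=timeout)
    except queue.Empty:
        raise RuntimeError(
            f"The worker {worker_type} did not acknowledge "
            f"START_ITERATION within {timeout} seconds."
        ) from None
    if status != "ITERATION_STARTED":
        raise RuntimeError(
            f"The worker {worker_type} reported an unexpected status: {status}"
        )
```

Waiting for the acknowledgement before reading results means that any item the parent later receives belongs to the iteration it just requested.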
- _execute_iterable()
 Worker implementation for
iterate_in_subprocess(). The following diagram illustrates the state transitions in more detail.
stateDiagram-v2
    state Initialization {
        init0: Call Initializer
        init1: Create Iterable
        init0 --> init1
    }
    state stand_by {
        i2: Wait for a command (block)
        state i3 <<choice>>
    }
    stand_by: Stand By
    init_fail: Push INITIALIZATION_FAILED
    init_success: Push INITIALIZATION_SUCCESS
    i4: Push ITERATION_START
    i5: Iteration
    i6: Push UNEXPECTED_COMMAND
    init_fail --> Done
    [*] --> Initialization
    Initialization --> init_success: Success
    Initialization --> init_fail: Failed
    init_success --> i2
    i2 --> i3: Command received
    i3 --> i4: START_ITERATION
    i3 --> i2: STOP_ITERATION
    i3 --> i6: Other commands
    i3 --> Done: ABORT
    i4 --> j0
    i6 --> Done
    Done --> [*]
    state i5 {
        j0: Create Iterator
        j1: Check command queue (non-blocking)
        j2: Get Next Item
        state j3 <<choice>>
        j4: Push ITERATE_SUCCESS (block)
        j5: Push ITERATION_FINISHED
        j6: Push ITERATE_FAILED
        j0 --> j1 : Success
        j0 --> j6 : Fail
        j1 --> j2 : Command Not Found
        j1 --> j3: Command Found
        j3 --> j5 : STOP_ITERATION
        j2 --> j4 : Success
        j2 --> j5 : EOF
        j2 --> j6 : Fail
        j4 --> j1
        j5 --> [*] : Iteration completed without an error (go back to Stand By)
    }
    j3 --> i6 : START_ITERATION or other commands
    j3 --> Done : ABORT
    j6 --> Done
    i5 --> i2: Iteration completed without an error
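The Iteration part of the diagram (states j0 to j6) can be sketched as one function. This is an illustration under assumptions rather than the actual worker code: the function name and its boolean return convention are hypothetical, strings named after the `_Cmd` and `_Status` members stand in for the enum values, and messages are assumed to be `(status, payload)` tuples on a standard-library `queue.Queue`.

```python
import queue


def run_iteration(iterable, cmd_q, data_q):
    """One pass through Iteration mode.

    Returns True to go back to Stand By, False to finish (Done).
    """
    try:
        iterator = iter(iterable)  # j0: Create Iterator
    except Exception as err:
        data_q.put(("ITERATOR_FAILED", repr(err)))  # j6
        return False
    while True:
        try:
            cmd = cmd_q.get_nowait()  # j1: non-blocking command check
        except queue.Empty:
            cmd = None  # no command pending
        if cmd == "STOP_ITERATION":
            data_q.put(("ITERATION_FINISHED", None))  # j5
            return True
        if cmd == "ABORT":
            return False
        if cmd is not None:
            # START_ITERATION or any other command is unexpected here.
            data_q.put(("UNEXPECTED_CMD_RECIEVED", cmd))
            return False
        try:
            item = next(iterator)  # j2: Get Next Item
        except StopIteration:
            data_q.put(("ITERATION_FINISHED", None))  # j5: iterator exhausted
            return True
        except Exception as err:
            data_q.put(("ITERATOR_FAILED", repr(err)))  # j6
            return False
        data_q.put(("ITERATOR_SUCCESS", item))  # j4
```

Checking the command queue without blocking before each step lets the parent interrupt a long iteration between items without stalling the worker when no command is pending.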
- _iterate_results()
 Watch the result queue and iterate on the results.
Works with both multiprocessing.Queue and concurrent.interpreters.Queue.
- Parameters:
 data_q – Queue to receive iteration results
timeout – Maximum time to wait between results
worker_name – Name of the worker (for error messages)
- Yields:
 Items from the iterator
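The parent-side loop described for `_iterate_results()` could be sketched as a generator. As before, this is a hedged illustration: the function name, the `(status, payload)` message shape, the string statuses, and the choice of `RuntimeError` are assumptions, and a standard-library `queue.Queue` stands in for the actual queue.

```python
import queue


def iterate_results(data_q, timeout, worker_name="worker"):
    """Yield items pushed by the worker until it reports the end of iteration."""
    while True:
        try:
            # Wait at most `timeout` seconds for the next message.
            status, payload = data_q.get(timeout=timeout)
        except queue.Empty:
            raise RuntimeError(
                f"{worker_name} did not produce a result within {timeout} seconds."
            ) from None
        if status == "ITERATOR_SUCCESS":
            yield payload
        elif status == "ITERATION_FINISHED":
            return  # the worker is back in stand-by mode
        else:
            raise RuntimeError(f"{worker_name} reported {status}: {payload}")
```

For example, if the queue holds two `ITERATOR_SUCCESS` messages followed by `ITERATION_FINISHED`, `list(iterate_results(data_q, timeout=1.0))` returns the two payloads in order.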