spdl.autoresearch.core.Orchestrator
- class Orchestrator(*, workflow: WorkflowProtocol, max_concurrency: int)
Run serializable work specs with bounded async concurrency.
- Parameters:
workflow – Domain-specific adapter implementing the WorkflowProtocol protocol.
max_concurrency – Maximum number of work coroutines to run concurrently (clamped to at least 1).
sequenceDiagram
    participant Engine as Orchestrator
    participant Adapter as WorkflowProtocol
    participant Task as Work coroutine
    Engine->>Adapter: load()
    Engine->>Adapter: checkpoint(queued, running, "running")
    Engine->>Adapter: make_coro(spec)
    Engine->>Task: schedule up to max_concurrency
    Task-->>Engine: TaskResult(children)
    Engine->>Adapter: on_result(spec, result)
    Adapter-->>Engine: child TaskSpecs
    Engine->>Adapter: checkpoint(..., "stopped")
    Note over Engine,Adapter: On SIGINT/SIGTERM, cancel tasks and
    Note over Engine,Adapter: checkpoint "interrupted".
Methods
run([initial_specs]) – Run the engine until all work is complete or interrupted.
- async run(initial_specs: list[TaskSpec] | None = None) -> None
Run the engine until all work is complete or interrupted.
The engine dequeues specs from a priority queue, schedules up to max_concurrency coroutines at a time, and checkpoints state on every iteration. On SIGINT/SIGTERM it cancels running tasks and checkpoints with "interrupted" status so that a subsequent run can resume from where it left off.
- Parameters:
initial_specs – Specs to seed the priority queue with. When None, the adapter’s load() method is called instead, which typically reads a checkpoint file written by a previous interrupted run.
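
The scheduling loop described above can be illustrated with a minimal, self-contained sketch. This is not the library's implementation and does not import spdl: Spec, ToyWorkflow, run_engine and demo are hypothetical names, and the adapter method signatures are guesses based only on the calls shown in the diagram. Signal handling is left out, so the "interrupted" checkpoint is not shown.

```python
import asyncio
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class Spec:
    # Hypothetical stand-in for TaskSpec: a lower priority value runs first.
    priority: int
    name: str = field(compare=False)


class ToyWorkflow:
    """Hypothetical adapter. Method names follow the diagram; signatures are assumed."""

    def __init__(self):
        self.checkpoints = []  # (queued count, running count, status)
        self.done = []         # names of completed specs
        self.active = 0        # coroutines currently executing
        self.peak = 0          # highest value of `active` observed

    def load(self):
        return [Spec(0, "root")]

    def checkpoint(self, queued, running, status):
        self.checkpoints.append((len(queued), len(running), status))

    def make_coro(self, spec):
        return self._work(spec)

    async def _work(self, spec):
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # simulated work
        self.active -= 1
        # Only the root spec produces children in this toy example.
        return [f"child-{i}" for i in range(4)] if spec.name == "root" else []

    def on_result(self, spec, result):
        self.done.append(spec.name)
        return [Spec(spec.priority + 1, name) for name in result]


async def run_engine(workflow, max_concurrency, initial_specs=None):
    limit = max(1, max_concurrency)  # clamp to at least 1
    queue = list(workflow.load() if initial_specs is None else initial_specs)
    heapq.heapify(queue)
    running = {}  # asyncio.Task -> Spec
    while queue or running:
        # Fill free slots from the priority queue.
        while queue and len(running) < limit:
            spec = heapq.heappop(queue)
            running[asyncio.create_task(workflow.make_coro(spec))] = spec
        workflow.checkpoint(queue, list(running.values()), "running")
        finished, _ = await asyncio.wait(
            set(running), return_when=asyncio.FIRST_COMPLETED
        )
        for task in finished:
            spec = running.pop(task)
            # Child specs returned by the adapter go back into the queue.
            for child in workflow.on_result(spec, task.result()):
                heapq.heappush(queue, child)
    workflow.checkpoint(queue, [], "stopped")


def demo():
    workflow = ToyWorkflow()
    asyncio.run(run_engine(workflow, max_concurrency=2))
    return workflow
```

Calling demo() runs one root spec and its four children with at most two coroutines in flight. Passing initial_specs skips the load() call, which mirrors the parameter described above.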