spdl.autoresearch.core.Orchestrator

class Orchestrator(*, workflow: WorkflowProtocol, max_concurrency: int)

Run serializable work specs with bounded async concurrency.

Parameters:
  • workflow – Domain-specific adapter implementing the WorkflowProtocol protocol.

  • max_concurrency – Maximum number of work coroutines to run concurrently (clamped to at least 1).

sequenceDiagram
    participant Engine as Orchestrator
    participant Adapter as WorkflowProtocol
    participant Task as Work coroutine
    Engine->>Adapter: load()
    Engine->>Adapter: checkpoint(queued, running, "running")
    Engine->>Adapter: make_coro(spec)
    Engine->>Task: schedule up to max_concurrency
    Task-->>Engine: TaskResult(children)
    Engine->>Adapter: on_result(spec, result)
    Adapter-->>Engine: child TaskSpecs
    Engine->>Adapter: checkpoint(..., "stopped")
    Note over Engine,Adapter: On SIGINT/SIGTERM, cancel tasks and
    Note over Engine,Adapter: checkpoint "interrupted".
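The message order in the diagram can be illustrated with a toy loop built only on asyncio and heapq. This is a minimal sketch, not the library's implementation: ToySpec, ToyAdapter, and toy_run are hypothetical names, and the method signatures on ToyAdapter are assumptions inferred from the diagram labels rather than the real WorkflowProtocol definition. Signal handling is left out.

```python
import asyncio
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class ToySpec:
    """Hypothetical work spec; only `priority` takes part in ordering."""
    priority: int
    name: str = field(compare=False)


class ToyAdapter:
    """In-memory stand-in for a workflow adapter (assumed signatures)."""

    def __init__(self) -> None:
        self.statuses: list[str] = []  # status passed to each checkpoint call
        self.done: list[str] = []      # names of specs whose result was handled
        self.active = 0                # coroutines currently running
        self.peak = 0                  # highest concurrency observed

    def load(self) -> list[ToySpec]:
        return [ToySpec(0, "root-a"), ToySpec(1, "root-b"), ToySpec(2, "root-c")]

    def checkpoint(self, queued, running, status: str) -> None:
        self.statuses.append(status)

    async def make_coro(self, spec: ToySpec) -> list[ToySpec]:
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # pretend to do work
        self.active -= 1
        # Each root spawns one child; children spawn nothing.
        if spec.name.startswith("root"):
            return [ToySpec(spec.priority + 10, "child-of-" + spec.name)]
        return []

    def on_result(self, spec: ToySpec, children: list[ToySpec]) -> list[ToySpec]:
        self.done.append(spec.name)
        return children


async def toy_run(adapter: ToyAdapter, max_concurrency: int) -> None:
    limit = max(1, max_concurrency)  # clamp to at least 1
    queue = adapter.load()
    heapq.heapify(queue)
    running: dict[asyncio.Task, ToySpec] = {}
    while queue or running:
        # Fill free slots from the priority queue.
        while queue and len(running) < limit:
            spec = heapq.heappop(queue)
            running[asyncio.create_task(adapter.make_coro(spec))] = spec
        adapter.checkpoint(list(queue), list(running.values()), "running")
        finished, _ = await asyncio.wait(
            set(running), return_when=asyncio.FIRST_COMPLETED
        )
        for task in finished:
            spec = running.pop(task)
            # Children returned by the adapter go back onto the queue.
            for child in adapter.on_result(spec, task.result()):
                heapq.heappush(queue, child)
    adapter.checkpoint([], [], "stopped")


adapter = ToyAdapter()
asyncio.run(toy_run(adapter, max_concurrency=2))
```

With three root specs and a limit of 2, the toy adapter never sees more than two coroutines in flight, and the last checkpoint carries the "stopped" status.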

Methods

run([initial_specs])

Run the engine until all work is complete or interrupted.

async run(initial_specs: list[TaskSpec] | None = None) → None

Run the engine until all work is complete or interrupted.

The engine dequeues specs from a priority queue, schedules up to max_concurrency coroutines at a time, and checkpoints state on every iteration. On SIGINT / SIGTERM it cancels running tasks and checkpoints with "interrupted" status so that a subsequent run can resume from where it left off.

Parameters:

initial_specs – Specs to seed the priority queue with. When None, the adapter’s load() method is called instead, which typically reads a checkpoint file written by a previous interrupted run.
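The seeding rule for initial_specs can be sketched with a JSON checkpoint file. This is an illustration under stated assumptions: save_checkpoint, load_checkpoint, and seed_specs are hypothetical helpers, the file layout is invented for the example, and re-queuing the cancelled in-flight specs ahead of the queued ones is one plausible resume policy, not a documented behavior of the library.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def save_checkpoint(
    path: Path, queued: list[dict], running: list[dict], status: str
) -> None:
    """Write queue state to disk (hypothetical file layout)."""
    payload = {"status": status, "queued": queued, "running": running}
    path.write_text(json.dumps(payload))


def load_checkpoint(path: Path) -> list[dict]:
    """Return the specs to resume with, or an empty list if no file exists."""
    if not path.exists():
        return []
    state = json.loads(path.read_text())
    # Assumption: specs that were running when the interrupt arrived were
    # cancelled, so they are retried before the specs that were still queued.
    return state["running"] + state["queued"]


def seed_specs(initial_specs: list[dict] | None, path: Path) -> list[dict]:
    """Use explicit specs when given; fall back to the checkpoint on None."""
    return initial_specs if initial_specs is not None else load_checkpoint(path)


with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "checkpoint.json"
    save_checkpoint(
        ckpt, queued=[{"id": 3}], running=[{"id": 1}, {"id": 2}], status="interrupted"
    )
    resumed = seed_specs(None, ckpt)        # resume from the checkpoint file
    fresh = seed_specs([{"id": 9}], ckpt)   # explicit specs take precedence
```

The check is `is not None` rather than truthiness, so an explicit empty list is treated as "run nothing" instead of triggering a reload from the checkpoint.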