Module dora.explore
Classes used to define a grid search.
Launcher
: a launcher is passed to each grid search explore function,
and can be called repeatedly to schedule XPs.
Explorer
: defines some metadata, in particular the metrics to display
with the dora grid
command.
Expand source code
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
"""
Classes used to define a grid search.
`Launcher`: a launcher is passed to each grid search explore function,
and can be called repeatedly to schedule XPs.
`Explorer`: defines some metadata, in particular the metrics to display
with the `dora grid` command.
"""
from collections import OrderedDict
from copy import deepcopy
from concurrent.futures import ProcessPoolExecutor, Future
from contextlib import contextmanager
from dataclasses import dataclass, field
import typing as tp
from treetable.table import _Node
import treetable as tt
from .conf import SlurmConfig
from .shep import Shepherd, Sheep
class ProcessException(RuntimeError):
    """Error raised by `_process` when building a sheep from argv fails.

    The message carries the `repr()` of the exception that was originally raised.
    """
def _process(shepherd: Shepherd, argv: tp.List[str], slurm: SlurmConfig,
             job_array_index: tp.Optional[int] = None):
    """Build the sheep for `argv` and return it along with its scheduling info.

    This is the function submitted to the process pool by `Herd.add_sheep`.

    Args:
        shepherd: object whose `get_sheep_from_argv` builds the sheep.
        argv: command line arguments describing the XP.
        slurm: Slurm config, returned untouched.
        job_array_index: index of the job array, returned untouched.

    Returns:
        The tuple `(sheep, slurm, job_array_index)`.

    Raises:
        ProcessException: if `get_sheep_from_argv` raises any `Exception`. Only the
            `repr()` of the original error is kept as the message.
    """
    try:
        return (shepherd.get_sheep_from_argv(argv), slurm, job_array_index)
    except Exception as exc:
        # Explicitly chain so that tracebacks show the original error as the direct cause.
        raise ProcessException(repr(exc)) from exc
@dataclass
class Herd:
    """Set of sheeps that are ready to be scheduled.

    Sheeps are stored by XP signature (`sheep.xp.sig`); a signature that is already
    present is never registered a second time.
    """
    # Registered sheeps, keyed by XP signature.
    sheeps: tp.Dict[str, Sheep] = field(default_factory=OrderedDict)
    # Slurm config of each registered sheep, keyed by XP signature.
    slurm_configs: tp.Dict[str, SlurmConfig] = field(default_factory=dict)
    # One list of XP signatures per job array.
    job_arrays: tp.List[tp.List[str]] = field(default_factory=list)

    # Futures of sheeps being evaluated in a process pool for faster execution.
    _pendings: tp.List[Future] = field(default_factory=list)
    # Launcher that opened the job array currently in progress, if any.
    _job_array_launcher: tp.Optional["Launcher"] = None

    def complete(self):
        """Wait for all pending sheep evaluations, registering each one in order."""
        pending = self._pendings
        while pending:
            sheep, slurm, array_index = pending.pop(0).result()
            self._add_sheep(sheep, slurm, array_index)

    def add_sheep(self, shepherd: Shepherd, argv: tp.List[str], slurm: SlurmConfig,
                  pool: tp.Optional[ProcessPoolExecutor] = None):
        """Register the sheep described by `argv`, now or through `pool`.

        When no job array is in progress, a new job array is opened for this sheep;
        otherwise the sheep goes to the most recently opened one.
        With a `pool`, the sheep is built in a worker and only registered once
        `complete()` is called.
        """
        outside_job_array = self._job_array_launcher is None
        if outside_job_array:
            self.job_arrays.append([])
        array_index = len(self.job_arrays) - 1

        if pool is not None:
            future = pool.submit(_process, shepherd, argv, slurm, array_index)
            self._pendings.append(future)
            return
        sheep = shepherd.get_sheep_from_argv(argv)
        self._add_sheep(sheep, slurm, array_index)

    def _add_sheep(self, sheep: Sheep, slurm: SlurmConfig,
                   job_array_index: tp.Optional[int] = None):
        """Store `sheep` and its Slurm config, unless its signature is already known."""
        sig = sheep.xp.sig
        if sig in self.sheeps:
            return
        self.sheeps[sig] = sheep
        self.slurm_configs[sig] = slurm
        if job_array_index is not None:
            self.job_arrays[job_array_index].append(sig)
class Launcher:
    """
    A launcher is passed to the explore function and can be called repeatedly
    to schedule experiments.
    For instance:

        launcher(epochs=40)
        launcher(bs=64)

    A call to `launcher()` will schedule a new experiment, and all arguments
    have the same effect as in `Launcher.bind()`.
    """
    def __init__(self, shepherd: Shepherd, slurm: SlurmConfig, herd: Herd,
                 argv: tp.Optional[tp.List[str]] = None,
                 pool: tp.Optional[ProcessPoolExecutor] = None):
        """
        Args:
            shepherd: object providing `main`, used to convert values to argv. It is
                also forwarded to the herd when scheduling.
            slurm: default Slurm config. It is deep-copied, so in-place changes made
                through this launcher do not affect the caller's object.
            herd: herd receiving the scheduled XPs; shared with launchers derived
                from this one.
            argv: default command line arguments (copied). `None` means none.
            pool: optional process pool forwarded to the herd when scheduling.
        """
        self._shepherd = shepherd
        self._main = self._shepherd.main
        self._herd = herd
        self._slurm = deepcopy(slurm)
        # `None` rather than a mutable `[]` default; the list is copied in any case.
        self._argv = [] if argv is None else list(argv)
        self._pool = pool

    def _copy(self):
        """Return a new launcher with the same state (argv and Slurm config are copied)."""
        return Launcher(self._shepherd, self._slurm, self._herd, self._argv, self._pool)

    def bind(self, *args, **kwargs):
        """
        Returns a new `Launcher` with different default XP parameters when scheduling experiments.
        Each entry in `*args` can be itself a list of dict or strings,
        or a string or a dict.

        Any string arg is considered directly as something to append to the list
        of *argv*, i.e. the command line arguments passed to the training scripts.
        A dictionary will be converted to a list of `argv`, with the specific syntax
        defined by the `main` function. For an argparse based script, a key
        value pair will be converted to `--key=value`, with some special rules
        (if the value is True, then it is converted to just `--key`).

        A list containing strings or dicts will be the concatenation
        of the argv obtained from each of its entries.

        For instance

            sub_launcher = launcher.bind(["--some_flag=5"], other_flag="test")
        """
        new = self._copy()
        return new.bind_(*args, **kwargs)

    def bind_(self, *args, **kwargs):
        """
        In-place version of `Launcher.bind()`.
        """
        for arg in args:
            self._argv += self._main.value_to_argv(arg)
        self._argv += self._main.value_to_argv(kwargs)
        return self

    def slurm(self, **kwargs):
        """
        Return a new `Launcher` with different default Slurm parameters.

        For instance

            sub_launcher = launcher.slurm(cpus_per_task=20)
        """
        new = self._copy()
        return new.slurm_(**kwargs)

    def slurm_(self, **kwargs):
        """
        In-place version of `Launcher.slurm()`.

        Raises:
            AttributeError: if a key is not an attribute of the Slurm config. In that
                case the config is left untouched.
        """
        # Validate everything first so that a bad key cannot leave the config half updated.
        invalid = [key for key in kwargs if not hasattr(self._slurm, key)]
        if invalid:
            raise AttributeError(f"Invalid Slurm config {invalid[0]}")
        for key, value in kwargs.items():
            setattr(self._slurm, key, value)
        return self

    def __call__(self, *args, **kwargs):
        """
        Schedule an XP with the current default training hyper-parameters
        and Slurm config. You can also provide extra overrides like in `bind()`.

        Raises:
            AssertionError: if called inside a job array with a Slurm config that
                differs from the one the job array was opened with.
        """
        launcher = self.bind(*args, **kwargs)
        array_launcher = self._herd._job_array_launcher
        # Explicit raise rather than `assert`, so the check survives `python -O`.
        if array_launcher is not None and array_launcher._slurm != launcher._slurm:
            raise AssertionError("cannot change slurm config inside job array.")
        self._herd.add_sheep(self._shepherd, launcher._argv, launcher._slurm, self._pool)

    @contextmanager
    def job_array(self):
        """Context manager to indicate that you wish to launch all the included
        XPs using a single job array with the current Slurm parameters.

        Raises:
            AssertionError: if a job array is already in progress on the herd.
        """
        # Explicit raise rather than `assert`, so the check survives `python -O`.
        if self._herd._job_array_launcher is not None:
            raise AssertionError("Cannot stack job arrays")
        self._herd._job_array_launcher = self._copy()
        self._herd.job_arrays.append([])
        try:
            yield
        finally:
            self._herd._job_array_launcher = None
# Type of a grid search explore function: it takes a `Launcher` and returns nothing.
Explore = tp.Callable[[Launcher], None]
class Explorer:
    """Wraps an explore function and defines the metadata used to display its XPs,
    in particular the metrics shown in the tracking table.
    """
    def __init__(self, explore: Explore):
        self.explore = explore

    def __call__(self, launcher: Launcher):
        """Run the wrapped explore function with the given launcher."""
        self.explore(launcher)

    def get_grid_metrics(self) -> tp.List[_Node]:
        """Return the metrics to display in the tracking table (none by default).
        """
        return []

    def get_grid_meta(self) -> tp.List[_Node]:
        """Return the list of meta information columns displayed for each XP/job.
        """
        right_aligned = ("index", "sig", "sid")
        return [
            tt.leaf(column, align=">") if column in right_aligned else tt.leaf(column)
            for column in ("index", "name", "state", "sig", "sid")
        ]

    def get_colors(self):
        """Return the two color codes used for display."""
        # NOTE(review): these look like ANSI SGR color codes; confirm with the caller.
        first, second = "0", "38;5;245"
        return [first, second]

    def process_sheep(self, sheep: Sheep, history: tp.List[dict]) -> dict:
        """Turn a sheep into a dict (possibly with nested dicts) that follows
        the schema given by `get_grid_metrics`.

        This gives more possibilities than `process_history`, which is kept for
        compatibility, as the XP config is accessible here.
        If this is implemented, it will always be called, otherwise `process_history` is used.

        Use the `history` given here rather than `sheep.xp.link.history`,
        as it has possibly been shortened to align multiple experiments.
        """
        raise NotImplementedError

    def process_history(self, history: tp.List[dict]) -> dict:
        """Turn a history into a dict (possibly with nested dicts) that follows
        the schema given by `get_grid_metrics`.

        The result holds `'epoch'`, set to the number of entries, then the metrics of
        every entry merged in order, so later entries win on duplicate keys.
        """
        summary = {'epoch': len(history)}
        for entry in history:
            summary.update(entry)
        return summary
Classes
class Explorer (explore: Callable[[Launcher], None])
-
Expand source code
class Explorer:
    """Wraps an explore function and defines the metadata used to display its XPs,
    in particular the metrics shown in the tracking table.
    """
    def __init__(self, explore: Explore):
        self.explore = explore

    def __call__(self, launcher: Launcher):
        """Run the wrapped explore function with the given launcher."""
        self.explore(launcher)

    def get_grid_metrics(self) -> tp.List[_Node]:
        """Return the metrics to display in the tracking table (none by default).
        """
        return []

    def get_grid_meta(self) -> tp.List[_Node]:
        """Return the list of meta information columns displayed for each XP/job.
        """
        right_aligned = ("index", "sig", "sid")
        return [
            tt.leaf(column, align=">") if column in right_aligned else tt.leaf(column)
            for column in ("index", "name", "state", "sig", "sid")
        ]

    def get_colors(self):
        """Return the two color codes used for display."""
        # NOTE(review): these look like ANSI SGR color codes; confirm with the caller.
        first, second = "0", "38;5;245"
        return [first, second]

    def process_sheep(self, sheep: Sheep, history: tp.List[dict]) -> dict:
        """Turn a sheep into a dict (possibly with nested dicts) that follows
        the schema given by `get_grid_metrics`.

        This gives more possibilities than `process_history`, which is kept for
        compatibility, as the XP config is accessible here.
        If this is implemented, it will always be called, otherwise `process_history` is used.

        Use the `history` given here rather than `sheep.xp.link.history`,
        as it has possibly been shortened to align multiple experiments.
        """
        raise NotImplementedError

    def process_history(self, history: tp.List[dict]) -> dict:
        """Turn a history into a dict (possibly with nested dicts) that follows
        the schema given by `get_grid_metrics`.

        The result holds `'epoch'`, set to the number of entries, then the metrics of
        every entry merged in order, so later entries win on duplicate keys.
        """
        summary = {'epoch': len(history)}
        for entry in history:
            summary.update(entry)
        return summary
Methods
def get_colors(self)
-
Expand source code
def get_colors(self):
    """Return the two color codes used for display."""
    # NOTE(review): these look like ANSI SGR color codes; confirm with the caller.
    first, second = "0", "38;5;245"
    return [first, second]
def get_grid_meta(self) ‑> List[treetable.table._Node]
-
Returns the list of Meta information to display for each XP/job.
Expand source code
def get_grid_meta(self) -> tp.List[_Node]:
    """Return the list of meta information columns displayed for each XP/job.
    """
    right_aligned = ("index", "sig", "sid")
    return [
        tt.leaf(column, align=">") if column in right_aligned else tt.leaf(column)
        for column in ("index", "name", "state", "sig", "sid")
    ]
def get_grid_metrics(self) ‑> List[treetable.table._Node]
-
Return the metrics that should be displayed in the tracking table.
Expand source code
def get_grid_metrics(self) -> tp.List[_Node]:
    """Return the metrics to display in the tracking table (none by default).
    """
    no_metrics = []
    return no_metrics
def process_history(self, history: List[dict]) ‑> dict
-
Process history to return a dict (with possibly nested dict inside) matching the schema given by
get_grid_metrics
.
Expand source code
def process_history(self, history: tp.List[dict]) -> dict:
    """Turn a history into a dict (possibly with nested dicts) that follows
    the schema given by `get_grid_metrics`.

    The result holds `'epoch'`, set to the number of entries, then the metrics of
    every entry merged in order, so later entries win on duplicate keys.
    """
    summary = {'epoch': len(history)}
    for entry in history:
        summary.update(entry)
    return summary
def process_sheep(self, sheep: Sheep, history: List[dict]) ‑> dict
-
Process a sheep to return a dict (with possibly nested dict inside) matching the schema given by
get_grid_metrics
. This gives more possibilities than process_history
, which is kept for compatibility, as one has access to the XP config here. If this is implemented, it will always be called, otherwise, process_history
is used. One should use the history provided here, rather than the one in
sheep.xp.link.history
, as it has possibly been shortened to align multiple experiments.
Expand source code
def process_sheep(self, sheep: Sheep, history: tp.List[dict]) -> dict:
    """Turn a sheep into a dict (possibly with nested dicts) that follows
    the schema given by `get_grid_metrics`.

    This gives more possibilities than `process_history`, which is kept for
    compatibility, as the XP config is accessible here.
    If this is implemented, it will always be called, otherwise `process_history` is used.

    Use the `history` given here rather than `sheep.xp.link.history`,
    as it has possibly been shortened to align multiple experiments.
    """
    raise NotImplementedError
class Herd (sheeps: Dict[str, Sheep] = <factory>, slurm_configs: Dict[str, SlurmConfig] = <factory>, job_arrays: List[List[str]] = <factory>)
-
Represents a herd of sheeps ready to be scheduled.
Expand source code
class Herd:
    """Set of sheeps that are ready to be scheduled.

    Sheeps are stored by XP signature (`sheep.xp.sig`); a signature that is already
    present is never registered a second time.
    """
    # Registered sheeps, keyed by XP signature.
    sheeps: tp.Dict[str, Sheep] = field(default_factory=OrderedDict)
    # Slurm config of each registered sheep, keyed by XP signature.
    slurm_configs: tp.Dict[str, SlurmConfig] = field(default_factory=dict)
    # One list of XP signatures per job array.
    job_arrays: tp.List[tp.List[str]] = field(default_factory=list)

    # Futures of sheeps being evaluated in a process pool for faster execution.
    _pendings: tp.List[Future] = field(default_factory=list)
    # Launcher that opened the job array currently in progress, if any.
    _job_array_launcher: tp.Optional["Launcher"] = None

    def complete(self):
        """Wait for all pending sheep evaluations, registering each one in order."""
        pending = self._pendings
        while pending:
            sheep, slurm, array_index = pending.pop(0).result()
            self._add_sheep(sheep, slurm, array_index)

    def add_sheep(self, shepherd: Shepherd, argv: tp.List[str], slurm: SlurmConfig,
                  pool: tp.Optional[ProcessPoolExecutor] = None):
        """Register the sheep described by `argv`, now or through `pool`.

        When no job array is in progress, a new job array is opened for this sheep;
        otherwise the sheep goes to the most recently opened one.
        With a `pool`, the sheep is built in a worker and only registered once
        `complete()` is called.
        """
        outside_job_array = self._job_array_launcher is None
        if outside_job_array:
            self.job_arrays.append([])
        array_index = len(self.job_arrays) - 1

        if pool is not None:
            future = pool.submit(_process, shepherd, argv, slurm, array_index)
            self._pendings.append(future)
            return
        sheep = shepherd.get_sheep_from_argv(argv)
        self._add_sheep(sheep, slurm, array_index)

    def _add_sheep(self, sheep: Sheep, slurm: SlurmConfig,
                   job_array_index: tp.Optional[int] = None):
        """Store `sheep` and its Slurm config, unless its signature is already known."""
        sig = sheep.xp.sig
        if sig in self.sheeps:
            return
        self.sheeps[sig] = sheep
        self.slurm_configs[sig] = slurm
        if job_array_index is not None:
            self.job_arrays[job_array_index].append(sig)
Class variables
var job_arrays : List[List[str]]
var sheeps : Dict[str, Sheep]
var slurm_configs : Dict[str, SlurmConfig]
Methods
def add_sheep(self, shepherd: Shepherd, argv: List[str], slurm: SlurmConfig, pool: Optional[concurrent.futures.process.ProcessPoolExecutor] = None)
-
Expand source code
def add_sheep(self, shepherd: Shepherd, argv: tp.List[str], slurm: SlurmConfig,
              pool: tp.Optional[ProcessPoolExecutor] = None):
    """Register the sheep described by `argv`, now or through `pool`.

    When no job array is in progress, a new job array is opened for this sheep;
    otherwise the sheep goes to the most recently opened one.
    With a `pool`, the sheep is built in a worker and its future is stored in
    `self._pendings` instead of being registered right away.
    """
    outside_job_array = self._job_array_launcher is None
    if outside_job_array:
        self.job_arrays.append([])
    array_index = len(self.job_arrays) - 1

    if pool is not None:
        future = pool.submit(_process, shepherd, argv, slurm, array_index)
        self._pendings.append(future)
        return
    sheep = shepherd.get_sheep_from_argv(argv)
    self._add_sheep(sheep, slurm, array_index)
def complete(self)
-
Complete all pending sheep evaluations and add them to the herd.
Expand source code
def complete(self):
    """Wait for all pending sheep evaluations, registering each one in order."""
    pending = self._pendings
    while pending:
        sheep, slurm, array_index = pending.pop(0).result()
        self._add_sheep(sheep, slurm, array_index)
class Launcher (shepherd: Shepherd, slurm: SlurmConfig, herd: Herd, argv: List[str] = [], pool: Optional[concurrent.futures.process.ProcessPoolExecutor] = None)
-
A launcher is passed to the explore function and can be called repeatedly to schedule experiments.
For instance:
launcher(epochs=40) launcher(bs=64)
A call to
launcher()
will schedule a new experiment, and all arguments have the same effect as in Launcher.bind()
.
Expand source code
class Launcher:
    """
    A launcher is passed to the explore function and can be called repeatedly
    to schedule experiments.
    For instance:

        launcher(epochs=40)
        launcher(bs=64)

    A call to `launcher()` will schedule a new experiment, and all arguments
    have the same effect as in `Launcher.bind()`.
    """
    def __init__(self, shepherd: Shepherd, slurm: SlurmConfig, herd: Herd,
                 argv: tp.Optional[tp.List[str]] = None,
                 pool: tp.Optional[ProcessPoolExecutor] = None):
        """
        Args:
            shepherd: object providing `main`, used to convert values to argv. It is
                also forwarded to the herd when scheduling.
            slurm: default Slurm config. It is deep-copied, so in-place changes made
                through this launcher do not affect the caller's object.
            herd: herd receiving the scheduled XPs; shared with launchers derived
                from this one.
            argv: default command line arguments (copied). `None` means none.
            pool: optional process pool forwarded to the herd when scheduling.
        """
        self._shepherd = shepherd
        self._main = self._shepherd.main
        self._herd = herd
        self._slurm = deepcopy(slurm)
        # `None` rather than a mutable `[]` default; the list is copied in any case.
        self._argv = [] if argv is None else list(argv)
        self._pool = pool

    def _copy(self):
        """Return a new launcher with the same state (argv and Slurm config are copied)."""
        return Launcher(self._shepherd, self._slurm, self._herd, self._argv, self._pool)

    def bind(self, *args, **kwargs):
        """
        Returns a new `Launcher` with different default XP parameters when scheduling experiments.
        Each entry in `*args` can be itself a list of dict or strings,
        or a string or a dict.

        Any string arg is considered directly as something to append to the list
        of *argv*, i.e. the command line arguments passed to the training scripts.
        A dictionary will be converted to a list of `argv`, with the specific syntax
        defined by the `main` function. For an argparse based script, a key
        value pair will be converted to `--key=value`, with some special rules
        (if the value is True, then it is converted to just `--key`).

        A list containing strings or dicts will be the concatenation
        of the argv obtained from each of its entries.

        For instance

            sub_launcher = launcher.bind(["--some_flag=5"], other_flag="test")
        """
        new = self._copy()
        return new.bind_(*args, **kwargs)

    def bind_(self, *args, **kwargs):
        """
        In-place version of `Launcher.bind()`.
        """
        for arg in args:
            self._argv += self._main.value_to_argv(arg)
        self._argv += self._main.value_to_argv(kwargs)
        return self

    def slurm(self, **kwargs):
        """
        Return a new `Launcher` with different default Slurm parameters.

        For instance

            sub_launcher = launcher.slurm(cpus_per_task=20)
        """
        new = self._copy()
        return new.slurm_(**kwargs)

    def slurm_(self, **kwargs):
        """
        In-place version of `Launcher.slurm()`.

        Raises:
            AttributeError: if a key is not an attribute of the Slurm config. In that
                case the config is left untouched.
        """
        # Validate everything first so that a bad key cannot leave the config half updated.
        invalid = [key for key in kwargs if not hasattr(self._slurm, key)]
        if invalid:
            raise AttributeError(f"Invalid Slurm config {invalid[0]}")
        for key, value in kwargs.items():
            setattr(self._slurm, key, value)
        return self

    def __call__(self, *args, **kwargs):
        """
        Schedule an XP with the current default training hyper-parameters
        and Slurm config. You can also provide extra overrides like in `bind()`.

        Raises:
            AssertionError: if called inside a job array with a Slurm config that
                differs from the one the job array was opened with.
        """
        launcher = self.bind(*args, **kwargs)
        array_launcher = self._herd._job_array_launcher
        # Explicit raise rather than `assert`, so the check survives `python -O`.
        if array_launcher is not None and array_launcher._slurm != launcher._slurm:
            raise AssertionError("cannot change slurm config inside job array.")
        self._herd.add_sheep(self._shepherd, launcher._argv, launcher._slurm, self._pool)

    @contextmanager
    def job_array(self):
        """Context manager to indicate that you wish to launch all the included
        XPs using a single job array with the current Slurm parameters.

        Raises:
            AssertionError: if a job array is already in progress on the herd.
        """
        # Explicit raise rather than `assert`, so the check survives `python -O`.
        if self._herd._job_array_launcher is not None:
            raise AssertionError("Cannot stack job arrays")
        self._herd._job_array_launcher = self._copy()
        self._herd.job_arrays.append([])
        try:
            yield
        finally:
            self._herd._job_array_launcher = None
Methods
def bind(self, *args, **kwargs)
-
Returns a new
Launcher
with different default XP parameters when scheduling experiments. Each entry in
*args
can be itself a list of dict or strings, or a string or a dict. Any string arg is considered directly as something to append to the list of argv, i.e. the command line arguments passed to the training scripts.
A dictionary will be converted to a list of
argv
, with the specific syntax defined by the main
function. For an argparse based script, a key value pair will be converted to --key=value
, with some special rules (if the value is True, then it is converted to just --key
). A list containing strings or dicts will be the concatenation of the argv obtained from each of its entries.
For instance
sub_launcher = launcher.bind(["--some_flag=5"], other_flag="test")
Expand source code
def bind(self, *args, **kwargs):
    """
    Returns a new `Launcher` with different default XP parameters when scheduling experiments.
    Each entry in `*args` can be itself a list of dict or strings,
    or a string or a dict.

    Any string arg is taken as is and appended to the list of *argv*,
    i.e. the command line arguments passed to the training scripts.
    A dictionary is converted to a list of `argv`, using the syntax
    defined by the `main` function. For an argparse based script, a key
    value pair becomes `--key=value`, with some special rules
    (if the value is True, it becomes just `--key`).

    A list containing strings or dicts gives the concatenation
    of the argv obtained from each of its entries.

    For instance

        sub_launcher = launcher.bind(["--some_flag=5"], other_flag="test")
    """
    # Work on a copy so that `self` keeps its current defaults.
    return self._copy().bind_(*args, **kwargs)
def bind_(self, *args, **kwargs)
-
In-place version of
Launcher.bind()
.
Expand source code
def bind_(self, *args, **kwargs):
    """
    In-place version of `Launcher.bind()`: extends this launcher's argv and returns `self`.
    """
    # Positional entries are converted first, in order, then the keyword arguments as one dict.
    for value in (*args, kwargs):
        self._argv += self._main.value_to_argv(value)
    return self
def job_array(self)
-
Context manager to indicate that you wish to launch all the included XPs using a single job array with the current Slurm parameters.
Expand source code
@contextmanager
def job_array(self):
    """Context manager to indicate that you wish to launch all the included
    XPs using a single job array with the current Slurm parameters.

    A new empty job array is opened on the herd when entering, and the herd is
    marked as no longer being inside a job array when leaving, even on error.

    Raises:
        AssertionError: if a job array is already in progress on the herd.
    """
    # Explicit raise rather than `assert`, so the check survives `python -O`.
    if self._herd._job_array_launcher is not None:
        raise AssertionError("Cannot stack job arrays")
    self._herd._job_array_launcher = self._copy()
    self._herd.job_arrays.append([])
    try:
        yield
    finally:
        self._herd._job_array_launcher = None
def slurm(self, **kwargs)
-
Return a new
Launcher
with different default Slurm parameters. For instance
sub_launcher = launcher.slurm(cpus_per_task=20)
Expand source code
def slurm(self, **kwargs):
    """
    Return a new `Launcher` with different default Slurm parameters.

    For instance

        sub_launcher = launcher.slurm(cpus_per_task=20)
    """
    # Work on a copy so that `self` keeps its current Slurm config.
    return self._copy().slurm_(**kwargs)
def slurm_(self, **kwargs)
-
In-place version of
Launcher.slurm()
.
Expand source code
def slurm_(self, **kwargs):
    """
    In-place version of `Launcher.slurm()`: updates this launcher's Slurm config
    and returns `self`.

    Raises:
        AttributeError: if a key is not an attribute of the Slurm config. In that
            case the config is left untouched.
    """
    # Validate everything first so that a bad key cannot leave the config half updated.
    invalid = [key for key in kwargs if not hasattr(self._slurm, key)]
    if invalid:
        raise AttributeError(f"Invalid Slurm config {invalid[0]}")
    for key, value in kwargs.items():
        setattr(self._slurm, key, value)
    return self
class ProcessException (*args, **kwargs)
-
Unspecified run-time error.
Expand source code
class ProcessException(RuntimeError):
    """Error raised by `_process` when building a sheep from argv fails.

    The message carries the `repr()` of the exception that was originally raised.
    """
Ancestors
- builtins.RuntimeError
- builtins.Exception
- builtins.BaseException