# How-to guide
## Debugging
Use the following lines to enable more logging during debugging; this helps understand what is happening:
```python
import logging

logging.getLogger("exca").setLevel(logging.DEBUG)
```
Also, use `cluster=None` or `cluster="debug"` to avoid distributed computation, which is harder to debug.
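For instance (a minimal sketch, reusing the tutorial's `TutorialTask` and a temporary folder):

```python
# run in the current process instead of submitting to a cluster:
task = TutorialTask(infra={"folder": tmp_path, "cluster": None})
task.process()  # easier to step through with a debugger
```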
Note: an error raised in a decorated task method will have a different type than the original exception (so that the error message includes the traceback of the initial error).

When communicating with others for help, make sure to send the full stack trace along with the error, as the stack trace is often much more informative than the error alone.
## Asynchronous computation with TaskInfra
### Job
Results of a cached task computation can be obtained in two ways:
- either by calling the method directly: `task.process()`
- or through the `result()` method of the attached job: `job = task.infra.job()`
```python
task = TutorialTask(infra={"folder": tmp_path})
assert task.process() == task.infra.job().result()
```
Calling `job = task.infra.job()` starts the submission if it was not already started and does not wait for the result; only calling `job.result()` blocks until completion. This lets you run other computations in your current process, monitor the job (e.g. through `task.infra.status()`), etc.
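A minimal sketch of this pattern:

```python
job = task.infra.job()      # non-blocking: submits the task if needed
# ... run other computations here ...
print(task.infra.status())  # e.g. "running"
result = job.result()       # blocks until the job completes
```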
### Batching (job arrays in slurm clusters)
Submitting one task job at a time is neither efficient nor good practice on slurm clusters, as each submission consumes some of the slurm scheduler's resources. A better way is to submit job arrays. With `TaskInfra`, this is done through a `job_array` context:
```python
task = TutorialTask(infra={"folder": tmp_path})
with task.infra.job_array() as array:
    # "array" is a list to append/extend with tasks to compute
    for k in range(3):
        # the following creates a new task with a new "param" value
        task_to_compute = task.infra.clone_obj({"param": k})
        array.append(task_to_compute)
# leaving the context is non-blocking and submits all tasks to compute
assert array[0].infra.status() in ("completed", "running")
```
As with calling `task.infra.job()`, previously computed tasks are not resubmitted, unless `infra.mode` is set to `"force"`, or set to `"retry"` and the previous computation failed.
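For instance, assuming `mode` can be set directly in the infra configuration (check the `TaskInfra` API reference):

```python
# resubmit and recompute even if a cached result already exists:
task = TutorialTask(infra={"folder": tmp_path, "mode": "force"})
```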
### Monitoring
To monitor running jobs on slurm clusters, one can use the `squeue` command. However, when running many slurm jobs it can become hard to figure out which job does what. We provide a basic helper function to access the logs and config of a given job (assuming the default log folder location): `exca.helpers.find_slurm_job`. See its API reference for more details.
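A hypothetical usage sketch (the exact signature and the returned object's methods are assumptions here; check the API reference):

```python
from exca import helpers

# look up a job by its slurm id (hypothetical id and argument name):
job = helpers.find_slurm_job(job_id="4278652")
print(job.stderr())  # assuming a submitit-like job object is returned
```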
We also recommend using Turm, which provides a real-time interface to the stdout/stderr of running jobs. Simply install it with `pip install turm`; you can then use `turm --slurm-refresh 20 --me --states RUNNING` to check your running jobs (please use at least 20s for the slurm refresh rate to avoid overloading the cluster).
## Efficient caching: cache and class uid exclusion
Consider the following class that defines a `process` function which returns a random torch tensor:
```python
import typing as tp

import numpy as np
import pydantic
import torch

from exca import TaskInfra


class UidTask(pydantic.BaseModel):
    seed: int = 12
    shape: tp.Tuple[int, int] = (3, 4)
    coeff: float = 12.0
    device: str = "cpu"
    infra: TaskInfra = TaskInfra(version="1")

    @infra.apply
    def _internal_cached_method(self) -> np.ndarray:
        rng = np.random.default_rng(seed=self.seed)
        return rng.normal(size=self.shape)

    def process(self) -> torch.Tensor:
        array = self._internal_cached_method()
        tensor = torch.tensor(array, device=self.device)
        return self.coeff * tensor
```
`infra` uses all parameters of the `pydantic.BaseModel` to define a uid (a string) for the class or for the cache. Two objects with the same uid should behave the same way and provide the same results. Preferably, two objects which perform the same computation should also have the same uid, but there are a couple of issues here.
### Class uid
The uid of the class takes all parameters into account; you can check them through the `config` method, for instance:
```python
task = UidTask(device="cuda", coeff=3)
assert task.infra.config(uid=True) == {
    'seed': 12,
    'shape': (3, 4),
    'coeff': 3.0,
    'device': 'cuda',
    'infra': {'version': '1'},
}
assert task.infra.config(uid=True, exclude_defaults=True) == {
    'coeff': 3.0,
    'device': 'cuda',
}
```
In practice the uid is computed from the non-default parameters, so in this case it will be something like `coeff=3,device=cuda-4f4ca7cb` (the last part being a hash, added as a safety measure).
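You can inspect the resulting string directly, assuming the infra exposes a `uid()` method (an assumption to verify in the API reference):

```python
task = UidTask(device="cuda", coeff=3)
print(task.infra.uid())  # ends with something like "coeff=3,device=cuda-4f4ca7cb"
```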
`device`, however, defines where the computation is performed but has no impact on the actual result, so it should not affect the uid of the class. This parameter should therefore be excluded from the uid, which can be done by providing a `_exclude_from_cls_uid` method or class variable.
All `infra` parameters except `version` are ignored in this way, because caching options or the resources required for computation (number of cpus/gpus for instance) do not impact the actual result of the computation. `version` is therefore the only parameter of the `infra` that appears in the config, even when caching or remote computation options are specified.
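For instance, adding caching or cluster options leaves the uid config untouched (a sketch, reusing the `config` method shown above):

```python
task = UidTask(coeff=3, infra={"folder": tmp_path, "cluster": None})
# only non-default model parameters remain, no caching/cluster options:
assert task.infra.config(uid=True, exclude_defaults=True) == {'coeff': 3.0}
```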
### Cache uid
The cache also requires a uid, which is used to store the results. All parameters excluded from the class uid are also excluded from the cache uid (such as `device`). We can however see that, in this case, while `UidTask` does depend on the `coeff` parameter (used in `process`), the cache does not, because `_internal_cached_method` does not use it. We can then further exclude `coeff` from the cache by passing it to the `exclude_from_cache_uid` parameter of the `infra.apply` decorator. This parameter can be one of:
1. a list of parameter names to ignore
2. a method defined on the class, returning a list of parameter names to ignore
3. the method above, specified by name in the format `"method:<method_name>"`

The main difference between options 2 and 3 is that a method override in a subclass will only be taken into account with option 3.
### Updated task
Here is an updated class with better uid handling:
```python
class UidTask(pydantic.BaseModel):
    seed: int = 12
    shape: tp.Tuple[int, int] = (3, 4)
    coeff: float = 12.0
    device: str = "cpu"
    infra: TaskInfra = TaskInfra(version="1")
    _exclude_from_cls_uid = ("device",)

    @infra.apply(exclude_from_cache_uid=("coeff",))
    def _internal_cached_method(self) -> np.ndarray:
        rng = np.random.default_rng(seed=self.seed)
        return rng.normal(size=self.shape)

    def process(self) -> torch.Tensor:
        array = self._internal_cached_method()
        tensor = torch.tensor(array, device=self.device)
        return self.coeff * tensor
```
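With these exclusions in place, tasks differing only by `coeff` now share the same cache entry (a sketch, reusing `clone_obj` from the batching section above):

```python
t1 = UidTask(coeff=3.0, infra={"folder": tmp_path})
t2 = t1.infra.clone_obj({"coeff": 4.0})
# the cached array is identical, only the final scaling in process() differs:
assert np.allclose(t1._internal_cached_method(), t2._internal_cached_method())
```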
And here is an equivalent class using different options for specifying the class and cache exclusions:
```python
class UidTask(pydantic.BaseModel):
    seed: int = 12
    shape: tp.Tuple[int, int] = (3, 4)
    coeff: float = 12.0
    device: str = "cpu"
    infra: TaskInfra = TaskInfra(version="1")

    def _exclude_from_cls_uid(self) -> tp.List[str]:
        return ["device"]

    def _cache_exclusion(self) -> tp.List[str]:
        return ["coeff"]

    @infra.apply(exclude_from_cache_uid="method:_cache_exclusion")
    def _internal_cached_method(self) -> np.ndarray:
        rng = np.random.default_rng(seed=self.seed)
        return rng.normal(size=self.shape)

    def process(self) -> torch.Tensor:
        array = self._internal_cached_method()
        tensor = torch.tensor(array, device=self.device)
        return self.coeff * tensor
```
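As an illustration of option 3 above, a subclass can override the exclusion method, and the override is taken into account because the decorator received the method by name:

```python
class SubTask(UidTask):
    def _cache_exclusion(self) -> tp.List[str]:
        # picked up because the decorator used "method:_cache_exclusion"
        # rather than a direct method reference
        return ["coeff", "seed"]
```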
## Infra versioning & default inheritance
All attributes of infra configurations are ignored for uid computation (i.e. modifying e.g. the type of cluster or the number of cpus in the infra will not modify the uid), except their `version` attribute. This allows for cache invalidation: changing this value changes the current class uid and therefore leads to the creation of a new cache folder. Caches of classes depending on the class with a new version will not be usable anymore (because of the conflicting default version value), and the cache folders of these classes may need to be deleted.
Furthermore, all attributes of the default infra set on a config class serve as seed/default values when you instantiate a config instance. So, when instantiating the `TutorialMap` class (see the tutorial), you will get `version="1"` in your instance if you do not override it:
```python
mapper = TutorialMap(infra={"folder": tmp_path})
assert mapper.infra.version == "1"
# even though the default version in a MapInfra is actually "0":
assert MapInfra(**{"folder": tmp_path}).version == "0"
```
Be careful: this behavior is limited to infra objects (so as to make presetting version and computation defaults easier); other nested configs do not behave this way.
## Using pydantic’s discriminator
`pydantic` allows for automatic selection between several sub-configurations, such as between the `Dog` and `Cat` sub-configurations below:
```python
import typing as tp

import pydantic


class Dog(pydantic.BaseModel):
    name: tp.Literal["dog"] = "dog"
    dog_param: int = 12


class Cat(pydantic.BaseModel):
    name: tp.Literal["cat"] = "cat"
    cat_param: str = "mew"


class Household(pydantic.BaseModel):
    pet: Dog | Cat = pydantic.Field(..., discriminator="name")


household = Household(pet={"name": "dog"})
assert household.pet.dog_param == 12
```
The syntax requires providing a "discriminator" field, a constant (a `Literal`) used to select which class to instantiate. While explicitly stating the discriminator through `pydantic.Field` (as above) or through an annotation (see the `pydantic` documentation) is not strictly necessary with `pydantic`, it is necessary when working with infras so that the discriminator is part of the uid.
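For reference, a sketch of the annotation-based form mentioned above (standard `pydantic` syntax; `Pet` and `Household2` are illustrative names):

```python
Pet = tp.Annotated[Dog | Cat, pydantic.Field(discriminator="name")]


class Household2(pydantic.BaseModel):
    pet: Pet


household = Household2(pet={"name": "cat"})
assert household.pet.cat_param == "mew"
```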
## Workdir/code copy
Running code on a cluster while still working on that code can be dangerous, as the job will use the state of the code at start time and not at submission time.
To avoid surprises, both task and map infras support a `workdir` option for copying the code to a different working directory, from which the decorated function will run. Check its parameters here; in particular, the `copied` parameter can be used to select folders, files, or packages installed in editable mode that should be copied to the job's working directory, as with the `TutorialTask` class from the tutorial section:
```python
task = TutorialTask(infra={
    "cluster": "local",
    "folder": tmp_path,
    # will create a copy of exca in a folder and run from there:
    "workdir": {"copied": ["exca"]},
})
```
Note that the change of working directory (and possibly the copy) only happens when the infra is called to submit the decorated function. Depending on your code, this may not be at the very beginning of your execution.