moolib.Accumulator¶
- class moolib.Accumulator¶
Accumulate and synchronize gradients and state from multiple peers in the cohort.
This class allows for the accumulation and synchronization of gradients and model weights during distributed training. The communication requires an underlying Group object, which can either be passed into the constructor or will be created by the accumulator during initialisation.
Many of the calls on the Accumulator involve synchronisation across the network. This occurs in the update() call, which must be called regularly throughout training.
Example
The accumulator takes a moolib Group to coordinate:

model = ...
peer = moolib.Rpc()
group = moolib.Group(peer, "foo_group")
accumulator = moolib.Accumulator("bar", model.parameters(), model.buffers(), group=group)
accumulator.connect(ADDRESS)

opt = torch.optim.Adam(model.parameters(), lr=0.001)

while True:
    # Update the status of the objects across the network
    group.update()
    accumulator.update()

    # 0. Check we are connected.
    if not accumulator.connected():
        time.sleep(0.25)
        continue

    # 1. Synchronize our state.
    if accumulator.wants_state():
        accumulator.set_state({"optimizer": opt.state_dict()})
    if accumulator.has_new_state():
        opt.load_state_dict(accumulator.state()["optimizer"])

    # 2. Generate our gradients then reduce them.
    if accumulator.wants_gradients():
        y_pred = model(X)
        loss = ...
        loss.backward()
        accumulator.reduce_gradients(batch_size)  # has_gradients() -> True after this

    # 3. Step our gradients then reset.
    if accumulator.has_gradients():
        opt.step()
        accumulator.zero_gradients()  # has_gradients() -> False after this
- __init__()¶
Init.
The Accumulator performs its operations on an underlying Group object. This can either be passed in via the group keyword, or will be constructed by the Accumulator with a group name based on the Accumulator's name.
- Parameters
name (str) – the name of the accumulator.
parameters (Iterable[torch.Parameter]) – the parameters to accumulate, e.g. model.parameters().
buffers (Iterable[torch.Tensor]) – the buffers to accumulate, e.g. model.buffers().
group (Optional[moolib.Group]) – a Group on which to perform accumulation. If None, a new Group will be constructed, with a name based on the name parameter.
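For instance, following the constructor call from the example above (a sketch; whether an internally constructed Group needs further configuration is not covered here), the accumulator can be built with or without an explicit Group:

peer = moolib.Rpc()
group = moolib.Group(peer, "foo_group")

# Pass an existing Group explicitly...
accumulator = moolib.Accumulator("bar", model.parameters(), model.buffers(), group=group)

# ...or omit it and let the Accumulator construct a Group named after the accumulator.
accumulator = moolib.Accumulator("bar", model.parameters(), model.buffers())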
Methods
__init__() – Init.
connect() – Connect the underlying Rpc object to a Broker at a given network address.
connected() – Check whether the underlying Group is connected to a Broker and is ready to train.
get_gradient_stats() – Return a dict of statistics about the gradient syncing process.
get_leader() – Returns the name of the leader in the group.
has_gradients() – Returns True if the accumulator has gradients that have been reduced.
has_new_state() – Returns True if the accumulator has been given a new state to sync.
is_leader() – Return True iff this peer is the leader of the group.
model_version() – Return the current model version.
reduce_gradients() – Sum-reduce the gradients across all the accumulators.
set_model_version() – Set the current model version.
set_parallel_gradients() – Set the number of parallel gradient reduction operations.
set_state() – Set the user state for synchronization.
set_virtual_batch_size() – Set the virtual batch size of the reduction operation.
skip_gradients() – Force the accumulator to skip this round of gradient reductions.
state() – Returns the current user state.
update() – Update the state of the accumulator across the network.
wants_gradients() – Returns True if the accumulator is ready to reduce more gradients or skip gradients.
wants_state() – Returns True if the accumulator has requested a new state to sync.
zero_gradients() – Reset the gradients of the accumulator to zero.
- connect()¶
Connect the underlying Rpc object to a Broker at a given network address.
This is a non-blocking call, and moolib will maintain this connection by regularly reconnecting if the connection is lost.
- Parameters
address (str) – address to connect to (eg: "127.0.0.1:1234")
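For example (a sketch; the address is a placeholder), connect is called once before the training loop and moolib keeps the connection alive from then on:

accumulator.connect("127.0.0.1:1234")  # non-blocking; reconnects automatically if the link drops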
- connected()¶
Check whether the underlying Group is connected to a Broker and is ready to train.
- Returns
bool
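For example (a sketch mirroring the class example), work is typically deferred until the connection is ready:

while not accumulator.connected():
    group.update()
    accumulator.update()
    time.sleep(0.25)  # poll until the Group is connected to the Broker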
- get_gradient_stats()¶
Return a dict of statistics about the gradient syncing process.
This includes information about batch_size, num_updates and num_skipped.
- Returns
statistics about the gradient syncing.
- Return type
(dict)
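For example (a sketch; only the keys named above are assumed):

stats = accumulator.get_gradient_stats()
print("batch_size:", stats["batch_size"],
      "updates:", stats["num_updates"],
      "skipped:", stats["num_skipped"])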
- get_leader()¶
Returns the name of the leader in the group.
- Returns
the name of the leader in the group.
- Return type
(str)
- has_gradients()¶
Returns True if the accumulator has gradients that have been reduced.
This will return True after a call to reduce_gradients or skip_gradients has been executed.
- Returns
bool
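In the training loop this is typically the cue to step the optimizer and reset, as in this sketch (opt is assumed to be a torch optimizer):

if accumulator.has_gradients():
    opt.step()                    # apply the reduced gradients
    accumulator.zero_gradients()  # reset; has_gradients() returns False again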
- has_new_state()¶
Returns True if the accumulator has been given a new state to sync.
- Returns
bool
- is_leader()¶
Return True iff this peer is the leader of the group.
- Returns
whether this peer is the leader of the group.
- Return type
(bool)
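A common pattern (a sketch, not mandated by the API) is to restrict side effects such as logging to the leader:

if accumulator.is_leader():
    print("acting as group leader")
else:
    print("current leader:", accumulator.get_leader())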
- model_version()¶
Return the current model version. This number is automatically incremented when zero_gradients is called. The peer with the highest model version will be selected as the leader.
- Returns
the number of updates.
- Return type
(int)
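For example, when saving and restoring checkpoints it can be useful to carry the model version along and restore it with set_model_version (a sketch; the checkpoint dict is hypothetical):

# Saving:
checkpoint = {"model": model.state_dict(), "version": accumulator.model_version()}

# Restoring:
model.load_state_dict(checkpoint["model"])
accumulator.set_model_version(checkpoint["version"])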
- reduce_gradients()¶
Sum-reduce the gradients across all the accumulators.
At each step, if wants_gradients returns True, either reduce_gradients or skip_gradients must be called.
- Parameters
batch_size (int) – the size of the virtual batch to reduce to.
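A sketch of the reduce path, following the class example (compute_loss, X and batch_size are placeholders):

if accumulator.wants_gradients():
    loss = compute_loss(model, X)  # placeholder for the forward pass and loss
    loss.backward()                # populate .grad on the accumulated parameters
    accumulator.reduce_gradients(batch_size)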
- set_model_version()¶
Set the current model version.
The default value is:
- Parameters
n (int) – the model version.
- set_parallel_gradients()¶
Set the number of parallel gradient reduction operations.
The default value is:
- Parameters
n (int) – the number of parallel gradient operations.
- set_state()¶
Set the user state for synchronization.
This state is the user-defined state that must be synchronized for training to be successful. This often includes the model weights and optimizer state_dicts, along with any other objects.
This function accepts any objects that can be pickled. These objects are what is returned by a call to state.
- Parameters
state (object) – The current user-defined state to synchronize.
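For instance (a sketch; which objects belong in the state is up to the user), the state is often a plain dict of picklable objects:

if accumulator.wants_state():
    accumulator.set_state({
        "model": model.state_dict(),    # model weights
        "optimizer": opt.state_dict(),  # optimizer state
        "step": step,                   # any other picklable bookkeeping
    })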
- set_virtual_batch_size()¶
Set the virtual batch size of the reduction operation.
The default value is:
- Parameters
n (int) – the virtual batch size.
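As a sketch (the values below are illustrative only), these tuning calls are typically made once, before the training loop, together with set_parallel_gradients:

accumulator.set_virtual_batch_size(512)  # virtual batch size of each reduction
accumulator.set_parallel_gradients(2)    # number of parallel gradient reductions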
- skip_gradients()¶
Force the accumulator to skip this round of gradient reductions.
At each step, if wants_gradients returns True, either reduce_gradients or skip_gradients must be called.
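For example (a sketch; batch_is_ready is a hypothetical data-availability check), a peer without data can still satisfy the contract by skipping:

if accumulator.wants_gradients():
    if batch_is_ready():
        loss.backward()
        accumulator.reduce_gradients(batch_size)
    else:
        accumulator.skip_gradients()  # don't hold up peers that do have gradients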
- state()¶
Returns the current user state.
See set_state for more details on what is returned.
- Returns
The state (as set in set_state)
- Return type
Object
- update()¶
Update the state of the accumulator across the network.
This call executes the communication of the accumulator with the network, including connecting to the network and sending or receiving state updates.
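As in the class example (a sketch), update is called once per iteration of the training loop, alongside the Group's own update:

while True:
    group.update()        # refresh the Group's connection and membership state
    accumulator.update()  # drive the accumulator's network communication
    ...                   # rest of the training step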
- wants_gradients()¶
Returns True if the accumulator is ready to reduce more gradients or skip gradients.
- Returns
bool
- wants_state()¶
Returns True if the accumulator has requested a new state to sync.
This is required for new peers to get their initial state to start training, and is used to ensure synchronization of the state during training. The new state can be set using set_state.
- Returns
bool
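Together with has_new_state and state, this forms the state-synchronization handshake from the class example (a sketch):

if accumulator.wants_state():
    # Another peer needs a state: publish ours.
    accumulator.set_state({"optimizer": opt.state_dict()})
if accumulator.has_new_state():
    # We have been handed a newer state: adopt it.
    opt.load_state_dict(accumulator.state()["optimizer"])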
- zero_gradients()¶
Reset the gradients of the accumulator to zero.