moolib.Accumulator

class moolib.Accumulator

Accumulate and synchronize gradients and state from multiple peers in the cohort.

This class allows for the accumulation and synchronization of gradients and model weights during distributed training. The communication requires an underlying Group object, which can either be passed into the constructor or created by the accumulator during initialization.

Many of the calls on the Accumulator involve synchronization across the network. This communication happens in the update() call, which must be called regularly throughout training.

Example

The accumulator takes a moolib Group to coordinate:

import time

import moolib
import torch

model = ...
peer = moolib.Rpc()
group = moolib.Group(peer, "foo_group")
accumulator = moolib.Accumulator("bar", model.parameters(), model.buffers(), group=group)
accumulator.connect(ADDRESS)  # ADDRESS of the broker, e.g. "127.0.0.1:1234"

opt = torch.optim.Adam(model.parameters(), lr=0.001)

while True:
    # Update the status of the objects across the network
    group.update()
    accumulator.update()

    # 0. Check we are connected.
    if not accumulator.connected():
        time.sleep(0.25)
        continue

    # 1. Synchronize our state.
    if accumulator.wants_state():
        accumulator.set_state({"optimizer": opt.state_dict()})
    if accumulator.has_new_state():
        opt.load_state_dict(accumulator.state()["optimizer"])

    # 2. Generate our gradients then reduce them.
    if accumulator.wants_gradients():
        y_pred = model(X)
        loss = ...
        loss.backward()
        accumulator.reduce_gradients(batch_size) # has_gradients() -> True after this

    # 3. Step our gradients then reset.
    if accumulator.has_gradients():
        opt.step()
        accumulator.zero_gradients()  # has_gradients() -> False after this
__init__(name, parameters, buffers, group=None)

Init.

The Accumulator performs its operations on an underlying Group object. This can either be passed in via the group keyword, or will be constructed by the Accumulator with a group name based on the Accumulator's name (see the sketch after the parameter list).

Parameters
  • name (str) – the name of the accumulator.

  • parameters (Iterable[torch.nn.Parameter]) – the parameters to accumulate, e.g. model.parameters().

  • buffers (Iterable[torch.Tensor]) – the buffers to accumulate, e.g. model.buffers().

  • group (Optional[moolib.Group]) – a Group on which to perform accumulation. If None, a new Group will be constructed, with a name based on the name parameter.
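
As a brief sketch of the second construction path, where no Group is passed in and the Accumulator builds its own; the model here is a stand-in, the broker address is a placeholder, and only the calls documented on this page are used:

import moolib
import torch

model = torch.nn.Linear(4, 2)

# group=None (the default): the Accumulator constructs its own Group,
# with a name derived from "foo".
accumulator = moolib.Accumulator("foo", model.parameters(), model.buffers())
accumulator.connect("127.0.0.1:1234")  # broker address, see connect() below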

Methods

__init__

Init.

connect

Connect the underlying Rpc object to a Broker at a given network address.

connected

Check whether the underlying Group is connected to a Broker and is ready to train.

get_gradient_stats

Return a dict of statistics about the gradients syncing process.

get_leader

Returns the name of the leader in the group.

has_gradients

Returns True if the accumulator has gradients that have been reduced.

has_new_state

Returns True if the accumulator has been given a new state to sync.

is_leader

Return true iff this peer is the leader of the group.

model_version

Return the current model version.

reduce_gradients

Reduce (sum) the gradients across all accumulators in the group.

set_model_version

Set the current model version.

set_parallel_gradients

Set the number of parallel gradient reduction operations.

set_state

Set the user state for synchronization.

set_virtual_batch_size

Set the virtual batch size of the reduction operation.

skip_gradients

Force the accumulator to skip this round of gradient reductions.

state

Returns the current user state.

update

Update the state of the accumulator across the network.

wants_gradients

Returns True if the accumulator is ready to reduce more gradients or skip gradients.

wants_state

Returns True if the accumulator has requested a new state to sync.

zero_gradients

Reset the gradients of the accumulator to zero.

connect(address)

Connect the underlying Rpc object to a Broker at a given network address.

This is a non-blocking call, and moolib will maintain this connection by regularly reconnecting if the connection is lost.

Parameters

address (str) – the address to connect to (e.g. "127.0.0.1:1234")
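
A hedged sketch of the typical pattern around connect: since the call is non-blocking, keep driving update() and poll connected() until the group has joined the broker (the address and the group/accumulator objects are as in the class-level example above):

import time

accumulator.connect("127.0.0.1:1234")

while not accumulator.connected():
    group.update()
    accumulator.update()
    time.sleep(0.25)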

connected()

Check whether the underlying Group is connected to a Broker and is ready to train.

Returns

bool

get_gradient_stats()

Return a dict of statistics about the gradients syncing process.

This includes information about batch_size, num_updates and num_skipped.

Returns

statistics about the gradient syncing.

Return type

(dict)
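
A small sketch of inspecting the returned dict; the three keys named in the description above are assumed to be present with exactly those names:

stats = accumulator.get_gradient_stats()
print(
    "virtual batch size:", stats["batch_size"],
    "updates:", stats["num_updates"],
    "skipped:", stats["num_skipped"],
)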

get_leader()

Returns the name of the leader in the group.

Returns

the name of the leader in the group.

Return type

(str)

has_gradients()

Returns True if the accumulator has gradients that have been reduced.

This will return True after a call to reduce_gradients or skip_gradients has been executed.

Returns

bool

has_new_state()

Returns True if the accumulator has been given a new state to sync.

Returns

bool

is_leader()

Return true iff this peer is the leader of the group.

Returns

whether this peer is the leader of the group.

Return type

(bool)
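
A sketch of a common use of the leader queries: let only the leader perform side effects such as checkpointing (torch.save and the checkpoint path are illustrative, not part of moolib):

import torch

if accumulator.is_leader():
    torch.save(model.state_dict(), "checkpoint.pt")
else:
    print("current leader:", accumulator.get_leader())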

model_version()

Return the current model version. This number is automatically incremented when zero_gradients is called. The peer with the highest model version will be selected as the leader.

Returns

the number of updates.

Return type

(int)
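
Since the version increments once per zero_gradients call, it can be used as a step counter; a minimal sketch (the logging interval is arbitrary):

version = accumulator.model_version()
if version % 100 == 0:
    print(f"model version {version}: {accumulator.get_gradient_stats()}")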

reduce_gradients(batch_size)

Reduce (sum) the gradients across all accumulators in the group.

At each step, if wants_gradients returns True, either reduce_gradients or skip_gradients must be called.

Parameters

batch_size (int) – the size of the batch contributing to the virtual batch size of the reduction.
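
A rough sketch of the step this call fits into; the micro-batch loop and loss_fn are illustrative assumptions, and gradients simply accumulate in each parameter's .grad between backward() calls as usual in PyTorch:

total = 0
for x, y in micro_batches:       # hypothetical iterable of (input, target) tensors
    loss = loss_fn(model(x), y)  # hypothetical loss function
    loss.backward()              # sums into parameter .grad as usual
    total += x.shape[0]
accumulator.reduce_gradients(total)  # afterwards has_gradients() -> True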

set_model_version(n)

Set the current model version.

Parameters

n (int) – the model version to set.
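
A hedged sketch of restoring the version counter when resuming from a checkpoint; the checkpoint format and keys are assumptions, not prescribed by moolib:

import torch

checkpoint = torch.load("checkpoint.pt")
model.load_state_dict(checkpoint["model"])
accumulator.set_model_version(checkpoint["model_version"])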

set_parallel_gradients(n)

Set the number of parallel gradient reduction operations.

Parameters

n (int) – the number of parallel gradient operations.

set_state(state)

Set the user state for synchronization.

This state is the user-defined state that must be synchronized for training to be successful. This often includes the model weights and optimizer state_dicts, along with any other objects.

This function accepts any object that can be pickled. The object passed here is what will be returned by a call to state.

Parameters

state (object) – The current user-defined state to synchronize.
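
A sketch of serving the state when another peer requests it; the dict keys are just a convention, and anything picklable can be stored:

if accumulator.wants_state():
    accumulator.set_state({
        "optimizer": opt.state_dict(),
        "model_version": accumulator.model_version(),
    })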

set_virtual_batch_size(n)

Set the virtual batch size of the reduction operation.

Parameters

n (int) – the virtual batch size.

skip_gradients()

Force the accumulator to skip this round of gradient reductions.

At each step, if wants_gradients returns True, either reduce_gradients or skip_gradients must be called.
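
A sketch of choosing between the two calls; next_batch() and loss_fn are hypothetical helpers standing in for the user's data pipeline:

if accumulator.wants_gradients():
    batch = next_batch()  # hypothetical: returns (inputs, targets) or None
    if batch is None:
        accumulator.skip_gradients()  # don't block the rest of the group
    else:
        inputs, targets = batch
        loss = loss_fn(model(inputs), targets)  # hypothetical loss function
        loss.backward()
        accumulator.reduce_gradients(inputs.shape[0])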

state()

Returns the current user state.

See set_state for more details on what is returned.

Returns

The state (as set in set_state)

Return type

(object)
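
A sketch mirroring the class-level example: apply a freshly received state once has_new_state reports one (the "optimizer" key follows the convention used in the set_state sketch above):

if accumulator.has_new_state():
    opt.load_state_dict(accumulator.state()["optimizer"])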

update()

Update the state of the accumulator across the network.

This call performs the accumulator's communication with the network, including connecting to the broker and sending or receiving state updates.

wants_gradients()

Returns True if the accumulator is ready to reduce more gradients or skip gradients.

Returns

bool

wants_state()

Returns True if the accumulator has requested a new state to sync.

This is required for new peers to get their initial state to start training, and is used to ensure synchronization of the state during training. The new state can be set using set_state.

Returns

bool

zero_gradients()

Reset the gradients of the accumulator to zero. Calling this also increments the model version (see model_version).