fairseq2.gang

This module provides the Gang class and related classes for managing collective operations in a distributed environment.

Classes

class fairseq2.gang.Gang[source]

Bases: ABC

Represents a set of processes that work collectively.

abstract close()[source]

Close and destroy the gang.

abstract make_gang(ranks)[source]

Make a new gang.

Parameters:

ranks (Sequence[int]) – The ranks of processes that will be part of the new gang.

Return type:

Gang | None

abstract as_process_group()[source]

Return this gang as a process group.

Return type:

ProcessGroup

abstract barrier()[source]

Synchronize all processes.

abstract all_reduce(tensor, op)[source]

Reduce tensor across all processes.

Parameters:
  • tensor (Tensor) – The input and output tensor of the operation.

  • op (ReduceOperation) – The element-wise reduce operation.
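A pure-Python sketch of the semantics (not fairseq2 code): an in-place SUM all-reduce leaves every process holding the element-wise sum of all processes' tensors.

```python
# Simulate an in-place SUM all_reduce across four "processes"; plain
# Python lists stand in for each process's tensor.
def simulated_all_reduce_sum(tensors):
    # Element-wise sum across all processes.
    reduced = [sum(column) for column in zip(*tensors)]
    # The operation is in-place: every process's tensor is overwritten.
    for tensor in tensors:
        tensor[:] = reduced
    return tensors

ranks = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]
simulated_all_reduce_sum(ranks)
# every rank now holds [16.0, 20.0]
```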

abstract all_gather(output_tensor, input_tensor)[source]

Gather tensors from all processes and put them in output_tensor.

Parameters:
  • output_tensor (Tensor) – The output tensor to accommodate tensors from all processes.

  • input_tensor (Tensor) – The tensor to be gathered from this process.

abstract all_gather_to_list(output_tensors, input_tensor)[source]

Gather tensors from all processes and put them in output_tensors.

Parameters:
  • output_tensors (list[Tensor]) – The tensor list to accommodate tensors from all processes.

  • input_tensor (Tensor) – The tensor to be gathered from this process.
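Both gather variants share the same semantics; a pure-Python sketch (not fairseq2 code), where plain lists stand in for tensors:

```python
# Simulate all_gather_to_list: every process contributes its input and
# receives the inputs of all processes, in rank order.
def simulated_all_gather_to_list(inputs):
    gathered = [list(x) for x in inputs]     # inputs in rank order
    return [list(gathered) for _ in inputs]  # every rank gets the full list

outputs = simulated_all_gather_to_list([[1], [2], [3]])
# every rank now holds [[1], [2], [3]]
```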

abstract broadcast(tensor, source_rank=0)[source]

Broadcast tensor from source_rank to all processes.

Parameters:
  • tensor (Tensor) – The tensor to be sent from source_rank.

  • source_rank (int) – The rank of the process from which to broadcast tensor.

abstract broadcast_objects(objects, source_rank=0)[source]

Broadcast picklable objects from source_rank to all processes.

Parameters:
  • objects (list[object]) – The list of picklable objects to broadcast. All processes must provide lists of the same length.

  • source_rank (int) – The rank of the process from which to broadcast objects.
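A pure-Python sketch of the contract (not fairseq2 code): every rank passes a list of the same length, and after the call each rank's list holds the source rank's objects.

```python
# Simulate broadcast_objects across a gang; the outer list holds one
# object list per rank.
def simulated_broadcast_objects(lists_by_rank, source_rank=0):
    # Precondition: all ranks provide lists of the same length.
    length = len(lists_by_rank[source_rank])
    assert all(len(objs) == length for objs in lists_by_rank)
    # After the call, every rank's list holds the source rank's objects.
    source = list(lists_by_rank[source_rank])
    for objs in lists_by_rank:
        objs[:] = source
    return lists_by_rank

state = [["a", "b"], [None, None], [None, None]]
simulated_broadcast_objects(state)
# every rank now holds ["a", "b"]
```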

abstract property rank: int

The rank of this process in the gang.

abstract property size: int

The number of processes that are part of the gang.

abstract property device: device

The associated device.

class fairseq2.gang.AbstractGang(rank, size, device)[source]

Bases: Gang

Provides a skeletal implementation of Gang.

Parameters:
  • rank (int) – The rank of this process in the gang.

  • size (int) – The number of processes that are part of the gang.

  • device (Device) – The associated device.

final make_gang(ranks)[source]

Make a new gang.

Parameters:

ranks (Sequence[int]) – The ranks of processes that will be part of the new gang.

Return type:

Gang | None

property rank: int

The rank of this process in the gang.

property size: int

The number of processes that are part of the gang.

property device: device

The associated device.

final class fairseq2.gang.FakeGang(*, rank=0, size=1, device=None)[source]

Bases: AbstractGang

Represents a non-distributed gang for local use.

Parameters:
  • rank (int) – The emulated rank of this process in the gang.

  • size (int) – The emulated number of processes that are part of the gang.

  • device (Device | None) – If None, the gang will use the default CUDA device of the process when CUDA is available; otherwise, it will use the CPU.

close()[source]

Close and destroy the gang.

as_process_group()[source]

Return this gang as a process group.

Return type:

ProcessGroup

barrier()[source]

Synchronize all processes.

all_reduce(tensor, op)[source]

Reduce tensor across all processes.

Parameters:
  • tensor (Tensor) – The input and output tensor of the operation.

  • op (ReduceOperation) – The element-wise reduce operation.

all_gather(output_tensor, input_tensor)[source]

Gather tensors from all processes and put them in output_tensor.

Parameters:
  • output_tensor (Tensor) – The output tensor to accommodate tensors from all processes.

  • input_tensor (Tensor) – The tensor to be gathered from this process.

all_gather_to_list(output_tensors, input_tensor)[source]

Gather tensors from all processes and put them in output_tensors.

Parameters:
  • output_tensors (list[Tensor]) – The tensor list to accommodate tensors from all processes.

  • input_tensor (Tensor) – The tensor to be gathered from this process.

broadcast(tensor, source_rank=0)[source]

Broadcast tensor from source_rank to all processes.

Parameters:
  • tensor (Tensor) – The tensor to be sent from source_rank.

  • source_rank (int) – The rank of the process from which to broadcast tensor.

broadcast_objects(objects, source_rank=0)[source]

Broadcast picklable objects from source_rank to all processes.

Parameters:
  • objects (list[object]) – The list of picklable objects to broadcast. All processes must provide lists of the same length.

  • source_rank (int) – The rank of the process from which to broadcast objects.

final class fairseq2.gang.ProcessGroupGang(pg, device, *, monitor_pg=None)[source]

Bases: AbstractGang

Represents a gang that wraps a process group.

Parameters:
  • pg (ProcessGroup) – The process group to wrap.

  • device (Device) – The associated device.

  • monitor_pg (ProcessGroup | None) – The optional process group to use for monitored barriers before collective calls.

classmethod init_default_process_group(*, device=None, timeout=None, num_threads=None, monitored=False, ok_initialized=False)[source]

Initialize the default process group and wrap it as a gang.

Parameters:
  • device (device | None) – If None, the gang will use the default CUDA device of the process when CUDA is available; otherwise, it will use the CPU.

  • timeout (timedelta | None) – The timeout for collective operations. If None, the default timeout value (15 minutes) will be used.

  • num_threads (int | None) – The number of threads to use for intra-op parallelism.

  • monitored (bool) – If True, puts a monitored barrier before every collective call.

  • ok_initialized (bool) – If True, does not raise an error if the default process group is already initialized.

Return type:

ProcessGroupGang

static from_process_group(pg, device)[source]

Wrap pg as a gang.

Parameters:
  • pg (ProcessGroup) – The process group to wrap.

  • device (device) – The associated device.

Return type:

ProcessGroupGang

classmethod from_default_process_group()[source]

Wrap the default process group as a gang.

Return type:

ProcessGroupGang

close()[source]

Close and destroy the gang.

as_process_group()[source]

Return this gang as a process group.

Return type:

ProcessGroup

barrier()[source]

Synchronize all processes.

all_reduce(tensor, op)[source]

Reduce tensor across all processes.

Parameters:
  • tensor (Tensor) – The input and output tensor of the operation.

  • op (ReduceOperation) – The element-wise reduce operation.

all_gather(output_tensor, input_tensor)[source]

Gather tensors from all processes and put them in output_tensor.

Parameters:
  • output_tensor (Tensor) – The output tensor to accommodate tensors from all processes.

  • input_tensor (Tensor) – The tensor to be gathered from this process.

all_gather_to_list(output_tensors, input_tensor)[source]

Gather tensors from all processes and put them in output_tensors.

Parameters:
  • output_tensors (list[Tensor]) – The tensor list to accommodate tensors from all processes.

  • input_tensor (Tensor) – The tensor to be gathered from this process.

broadcast(tensor, source_rank=0)[source]

Broadcast tensor from source_rank to all processes.

Parameters:
  • tensor (Tensor) – The tensor to be sent from source_rank.

  • source_rank (int) – The rank of the process from which to broadcast tensor.

broadcast_objects(objects, source_rank=0)[source]

Broadcast picklable objects from source_rank to all processes.

Parameters:
  • objects (list[object]) – The list of picklable objects to broadcast. All processes must provide lists of the same length.

  • source_rank (int) – The rank of the process from which to broadcast objects.

class fairseq2.gang.GangError[source]

Bases: Exception

class fairseq2.gang.ReduceOperation(value)[source]

Bases: Enum

Specifies a reduce operation.

SUM = 1
MEAN = 2
PRODUCT = 3
MIN = 4
MAX = 5

Functions

fairseq2.gang.setup_default_gang(*, device=None, timeout=None, monitored=False)[source]

Make the default gang of this process.

Parameters:
  • device (device | None) – If None, the gang will use the default CUDA device of the process when CUDA is available; otherwise, it will use the CPU.

  • timeout (timedelta | None) – The timeout for collective operations.

  • monitored (bool) – If True, puts a monitored barrier before every collective call.

Return type:

Gang

fairseq2.gang.fake_gangs(device)[source]

Make fake gangs for local, non-distributed use on device.

Return type:

Gangs

fairseq2.gang.setup_parallel_gangs(root_gang, *, tp_size=1)[source]

Make gangs to be used for data and tensor parallelism.

For instance, if we have 8 devices denoted by g0 to g7 and 2 devices are used for tensor parallelism, this function will make 4 tensor parallel gangs and 2 data parallel gangs as:

4 tensor parallel gangs:

[g0, g1], [g2, g3], [g4, g5], [g6, g7]

2 data parallel gangs:

[g0, g2, g4, g6], [g1, g3, g5, g7]

For efficiency, the caller should make sure adjacent ranks are on the same host. For example, if there are two hosts with a total of 16 GPUs, ranks 0 to 7 belong to the first host and ranks 8 to 15 belong to the second host.

Parameters:
  • root_gang (Gang) – The gang whose topology will be used to make the new gangs.

  • tp_size (int) – The size of tensor parallel gangs.

Returns:

Three gangs: the root gang, the data parallel gang that this process is part of, and the tensor parallel gang that this process is part of.

Return type:

Gangs
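The rank layout in the example above can be reproduced with a short pure-Python sketch (no fairseq2 required); note that setup_parallel_gangs itself returns Gang objects, not rank lists.

```python
def parallel_rank_layout(world_size, tp_size):
    # Adjacent ranks form a tensor parallel gang; ranks at the same
    # offset within their tensor parallel gang form a data parallel gang.
    tp_gangs = [list(range(start, start + tp_size))
                for start in range(0, world_size, tp_size)]
    dp_gangs = [list(range(offset, world_size, tp_size))
                for offset in range(tp_size)]
    return tp_gangs, dp_gangs

tp, dp = parallel_rank_layout(8, 2)
# tp == [[0, 1], [2, 3], [4, 5], [6, 7]]
# dp == [[0, 2, 4, 6], [1, 3, 5, 7]]
```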

fairseq2.gang.broadcast_flag(gang, flag, source_rank=0)[source]

Broadcast flag to all processes in gang from source_rank.

Return type:

bool

fairseq2.gang.all_sum(gang, value)[source]

Sum value over all processes in gang.

Return type:

Tensor

fairseq2.gang.get_world_size()[source]

Return the world size of the running job.

Return type:

int

fairseq2.gang.get_rank()[source]

Return the rank of this process in the running job.

Return type:

int

fairseq2.gang.get_local_world_size()[source]

Return the local world size of the running job.

Return type:

int

fairseq2.gang.get_local_rank()[source]

Return the local rank of this process in the running job.

Return type:

int
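Assuming ranks are assigned contiguously per host (as recommended above for setup_parallel_gangs), the global and local quantities relate as follows; a sketch:

```python
def split_rank(rank, local_world_size):
    # Under contiguous per-host rank assignment, a global rank decomposes
    # into a host index and a local rank on that host.
    node_index, local_rank = divmod(rank, local_world_size)
    return node_index, local_rank

# With two hosts of 8 GPUs each (world size 16), global rank 11 is
# local rank 3 on the second host (host index 1).
```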

fairseq2.gang.is_torchrun()[source]

Return True if this process was spawned by torchrun.

Return type:

bool