composer.utils.dist#

Helper methods for torch.distributed.

To use torch.distributed, launch your training script with the composer launcher for distributed training. For example, the following command launches an eight-process training run.

composer -n 8 path/to/train.py

The composer launcher will automatically configure the following environment variables, which are required for distributed training:

  • RANK: The global rank of the process, which should be in [0; WORLD_SIZE - 1].

  • LOCAL_RANK: The local rank for the process, which should be in [0; LOCAL_WORLD_SIZE - 1].

  • NODE_RANK: The rank of the node.

  • WORLD_SIZE: The total number of processes.

  • LOCAL_WORLD_SIZE: The number of processes on the current node.

  • MASTER_ADDR: The hostname for the rank-zero process.

  • MASTER_PORT: The port for the rank-zero process.

If none of these environment variables are set, this module will safely assume a single-rank configuration, where:

RANK=0
LOCAL_RANK=0
NODE_RANK=0
WORLD_SIZE=1
LOCAL_WORLD_SIZE=1
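
As an illustrative sketch (not part of the module's documented examples), a script run without the launcher can still call the query helpers and will see these single-rank defaults:

from composer.utils import dist

# No launcher, no environment variables: the helpers fall back to a
# single-rank configuration.
print(dist.get_global_rank())       # 0
print(dist.get_local_rank())        # 0
print(dist.get_node_rank())         # 0
print(dist.get_world_size())        # 1
print(dist.get_local_world_size())  # 1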

Functions

all_gather

Collects a Tensor from each rank and returns a sequence of Tensors indexed by rank.

all_gather_object

Collects a pickleable object from each rank and returns a list of these objects indexed by rank.

all_reduce

Reduces a tensor by applying the reduce_operation.

barrier

Synchronizes all processes.

broadcast

Broadcasts the tensor to the whole group.

broadcast_object_list

Broadcasts picklable objects in object_list to the whole group.

get_global_rank

Returns the global rank of the current process, which is in [0; WORLD_SIZE - 1].

get_local_rank

Returns the local rank for the current process, which is in [0; LOCAL_WORLD_SIZE - 1].

get_local_world_size

Returns the local world size, which is the number of processes for the current node.

get_node_rank

Returns the node rank.

get_sampler

Constructs a DistributedSampler for a dataset.

get_world_size

Returns the world size, which is the number of processes participating in this training run.

initialize_dist

Initialize the default PyTorch distributed process group.

is_available

Returns whether PyTorch was built with distributed support.

is_initialized

Returns whether PyTorch distributed is initialized.

composer.utils.dist.all_gather(tensor)[source]#

Collects a Tensor from each rank and returns a sequence of Tensors indexed by rank.

Parameters

tensor (Tensor) – Tensor from each rank to be gathered.

Returns

Sequence[Tensor] – A sequence of tensors indexed by rank.
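
A minimal usage sketch, assuming the process group has already been initialized (for example by the composer launcher) and a gloo backend so plain CPU tensors can be gathered:

import torch

from composer.utils import dist

# Each rank contributes a one-element tensor holding its own rank.
x = torch.tensor([dist.get_global_rank()])
gathered = dist.all_gather(x)
# On every rank, gathered[i] is the tensor contributed by rank i.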

composer.utils.dist.all_gather_object(obj)[source]#

Collects a pickleable object from each rank and returns a list of these objects indexed by rank.

Parameters

obj (TObj) – Object to be gathered.

Returns

List[TObj] – A list of objects indexed by rank.
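
A sketch of gathering a small picklable object from every rank (assumes an initialized process group); the dict contents here are placeholders:

from composer.utils import dist

# Gather a per-rank dict; the result is a list ordered by rank.
stats = {'rank': dist.get_global_rank(), 'num_samples': 128}
all_stats = dist.all_gather_object(stats)
# all_stats[i] is the dict contributed by rank i.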

composer.utils.dist.all_reduce(tensor, reduce_operation='SUM')[source]#

Reduces a tensor by applying the reduce_operation.

All ranks get the same, bitwise-identical result.

Parameters
  • tensor (Tensor) – Tensor to reduce. Input and output of the collective; the function operates in-place.

  • reduce_operation (str, optional) –

    The reduction operation (default: SUM), named after the corresponding torch.distributed.ReduceOp value and applied element-wise.

    Valid options are:
    • SUM

    • PRODUCT

    • MIN

    • MAX

    • BAND

    • BOR

    • BXOR

Returns

None – tensor is modified in-place.
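
A sketch of an in-place sum across ranks (assumes an initialized process group and, since this is a CPU tensor, a gloo backend):

import torch

from composer.utils import dist

t = torch.tensor([1.0])
dist.all_reduce(t, reduce_operation='SUM')
# With N ranks, every rank now holds t == torch.tensor([float(N)]).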

composer.utils.dist.barrier()[source]#

Synchronizes all processes.

This function blocks until all processes reach this function.
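
A common pattern, shown here as a sketch with a hypothetical save_checkpoint helper (not part of this module): rank zero writes a file while the other ranks wait at the barrier.

from composer.utils import dist

if dist.get_global_rank() == 0:
    save_checkpoint()  # hypothetical helper; any rank-zero-only work goes here
dist.barrier()         # all ranks block until every rank reaches this point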

composer.utils.dist.broadcast(tensor, src)[source]#

Broadcasts the tensor to the whole group.

tensor must have the same number of elements in all processes participating in the collective. See torch.distributed.broadcast().

Parameters
  • tensor (Tensor) – Data to be sent if src is the rank of the current process, and the tensor in which to save received data otherwise.

  • src (int) – Source rank.
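
A sketch of broadcasting a tensor from rank zero (assumes an initialized process group and a gloo backend for CPU tensors); the tensor must have the same shape on every rank:

import torch

from composer.utils import dist

t = torch.zeros(4)
if dist.get_global_rank() == 0:
    t += torch.arange(4, dtype=torch.float32)
dist.broadcast(t, src=0)
# Every rank now holds tensor([0., 1., 2., 3.]).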

composer.utils.dist.broadcast_object_list(object_list, src=0)[source]#

Broadcasts picklable objects in object_list to the whole group.

Similar to broadcast(), but Python objects can be passed in. Note that all objects in object_list must be picklable in order to be broadcast.

Parameters
  • object_list (List[Any]) – List of input objects to broadcast. Each object must be picklable. Only objects on the src rank will be broadcast, but each rank must provide lists of equal sizes.

  • src (int, optional) – Source rank (default: 0).

Returns

None – object_list will be modified in-place and set to the values of object_list from the src rank.
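
A sketch of broadcasting a Python object from rank zero; every rank must supply a list of the same length (assumes an initialized process group, and the config dict is a placeholder):

from composer.utils import dist

config = [{'lr': 0.1}] if dist.get_global_rank() == 0 else [None]
dist.broadcast_object_list(config, src=0)
# config[0] == {'lr': 0.1} on every rank after the call.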

composer.utils.dist.get_global_rank()[source]#

Returns the global rank of the current process, which is in [0; WORLD_SIZE - 1].

Returns

int – The global rank.

composer.utils.dist.get_local_rank()[source]#

Returns the local rank for the current process, which is in [0; LOCAL_WORLD_SIZE - 1].

Returns

int – The local rank.

composer.utils.dist.get_local_world_size()[source]#

Returns the local world size, which is the number of processes for the current node.

Returns

int – The local world size.

composer.utils.dist.get_node_rank()[source]#

Returns the node rank. For example, if there are 2 nodes, and 2 ranks per node, then global ranks 0-1 will have a node rank of 0, and global ranks 2-3 will have a node rank of 1.

Returns

int – The node rank, starting at 0.
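
To make the example concrete, a small sketch that prints the topology each process sees; the commented values are what global rank 3 would report in the 2-node, 2-ranks-per-node case above:

from composer.utils import dist

# For global rank 3 with 2 nodes and 2 ranks per node:
# node=1 local=1 global=3 world=4
print(f'node={dist.get_node_rank()} local={dist.get_local_rank()} '
      f'global={dist.get_global_rank()} world={dist.get_world_size()}')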

composer.utils.dist.get_sampler(dataset, *, drop_last, shuffle)[source]#

Constructs a DistributedSampler for a dataset. The DistributedSampler assumes that each rank has a complete copy of the dataset. It ensures that each rank sees a unique shard for each epoch containing len(dataset) / get_world_size() samples.

Note

If the dataset is already sharded by rank, use a SequentialSampler or RandomSampler.

Parameters
  • dataset (Dataset) – The dataset.

  • drop_last (bool) – Whether to drop the last batch.

  • shuffle (bool) – Whether to shuffle the dataset.

Returns

torch.utils.data.distributed.DistributedSampler – The sampler.
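
A sketch of wiring the sampler into a DataLoader; the synthetic dataset and batch size are placeholders for illustration:

import torch
from torch.utils.data import DataLoader, TensorDataset

from composer.utils import dist

dataset = TensorDataset(torch.randn(128, 8), torch.randint(0, 2, (128,)))
sampler = dist.get_sampler(dataset, drop_last=False, shuffle=True)
loader = DataLoader(dataset, batch_size=16, sampler=sampler)
# If you drive the epoch loop yourself, call sampler.set_epoch(epoch) each
# epoch so the shuffle order differs between epochs.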

composer.utils.dist.get_world_size()[source]#

Returns the world size, which is the number of processes participating in this training run.

Returns

int – The world size.

composer.utils.dist.initialize_dist(backend, timeout)[source]#

Initialize the default PyTorch distributed process group.

This function assumes that the following environment variables are set:

  • RANK: The global rank of the process, which should be in [0; WORLD_SIZE - 1].

  • LOCAL_RANK: The local rank for the process, which should be in [0; LOCAL_WORLD_SIZE - 1].

  • NODE_RANK: The rank of the node.

  • WORLD_SIZE: The total number of processes.

  • LOCAL_WORLD_SIZE: The number of processes on the current node.

  • MASTER_ADDR: The hostname for the rank-zero process.

  • MASTER_PORT: The port for the rank-zero process.

If none of the environment variables are set, this function will assume a single-rank configuration and initialize the default process group using a torch.distributed.HashStore store.

Parameters
  • backend (str) – The distributed backend to use. Should be gloo for CPU training, or nccl for GPU training.

  • timeout (timedelta) – The timeout for operations executed against the process group.
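
A sketch of initializing the process group at the start of a script launched with the composer launcher; the backend choice and the 300-second timeout are assumptions for illustration:

from datetime import timedelta

import torch

from composer.utils import dist

backend = 'nccl' if torch.cuda.is_available() else 'gloo'
dist.initialize_dist(backend, timeout=timedelta(seconds=300))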

composer.utils.dist.is_available()[source]#

Returns whether PyTorch was built with distributed support.

Returns

bool – Whether PyTorch distributed support is available.

composer.utils.dist.is_initialized()[source]#

Returns whether PyTorch distributed is initialized.

Returns

bool – Whether PyTorch distributed is initialized.
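
These two checks are commonly combined into a guard before initialization; a sketch (the gloo backend and timeout value are illustrative assumptions):

from datetime import timedelta

from composer.utils import dist

# Only initialize when distributed support is compiled in and the default
# process group has not been set up yet.
if dist.is_available() and not dist.is_initialized():
    dist.initialize_dist('gloo', timeout=timedelta(seconds=300))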