👨‍👩‍👧‍👦 Distributed Training#

Composer supports distributed training on multiple devices, whether it be multiple GPUs on a single node or multiple GPUs across multiple nodes.

Data Parallelism#

Composer distributes work across devices solely via data parallelism. We choose this approach because it provides the most flexibility to algorithms, which can modify the training loop in complex ways. Data parallelism also greatly simplifies model building and memory management: every GPU performs the same work, so inspecting the rank zero process is sufficient to reason about memory, performance, and other properties.

Within Composer, we have two options for data-parallel execution: PyTorch DistributedDataParallel (DDP) and DeepSpeed ZeRO. We currently default to PyTorch DDP, though DeepSpeed ZeRO can provide better performance and lower memory utilization when configured correctly.

Usage#

To launch a multi-GPU training job, we provide the composer launcher:

# run training on 8 GPUs
>>> composer -n 8 my_training_script.py

Under the hood, this script (source code here) sets the required torch.distributed environment variables, launches the processes, and runs the script on each process.

Note

The batch_size passed to your dataloader should be the per-device minibatch size. We further split this into smaller microbatches with gradient accumulation.
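
For example, here is a minimal sketch of what this means for the 8-GPU launch shown earlier (my_dataset and the batch size of 256 are placeholders, not part of Composer's API):

from torch.utils.data import DataLoader

# batch_size is the *per-device* minibatch size, so with 8 GPUs the global
# batch size is 8 * 256 = 2048 samples per optimizer step. Gradient
# accumulation further splits each 256-sample minibatch into smaller
# microbatches; it does not change the global batch size.
train_dataloader = DataLoader(my_dataset, batch_size=256, shuffle=True)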

For additional configurations of our launcher script, run composer --help.

usage: composer [-h] [--version] [-n NPROC] [--stdout STDOUT]
                [--stderr STDERR] [-v] [-m] [-c] [--world_size WORLD_SIZE]
                [--base_rank BASE_RANK] [--node_rank NODE_RANK]
                [--master_addr MASTER_ADDR] [--master_port MASTER_PORT]
                training_script ...

Named Arguments#

--version

show program's version number and exit

-n, --nproc

The number of processes to launch on this node. Overrides env var LOCAL_WORLD_SIZE if specified; otherwise, defaults to max(1, torch.cuda.device_count()).

--stdout

Format string for a filename to dump the STDOUT from the non-local-rank-zero processes. The local rank zero process will be piped through to STDOUT. The available format variables are: '{rank}', '{local_rank}', '{world_size}', '{node_rank}', and '{local_world_size}'. If specified, it is recommended to include '{rank}' or '{local_rank}' in the filename so each rank will write to its own file. By default, the STDOUT of the non-local-rank-zero processes is discarded; instead, use the FileLogger within Composer. This logger captures and saves the STDOUT of each process.

--stderr

Format string for a filename to dump the STDERR from the non-local-rank-zero processes. The local rank zero process will be piped through to STDERR. The available format variables are: '{rank}', '{local_rank}', '{world_size}', '{node_rank}', and '{local_world_size}'. If specified, it is recommended to include '{rank}' or '{local_rank}' in the filename so each rank will write to its own file. By default, the STDERR of the non-local-rank-zero processes is discarded; instead, use the FileLogger within Composer. This logger captures and saves the STDERR of each process.

-v, --verbose

If set, print verbose messages

Default: False

-m, --module_mode

If set, run the training script as a module instead of as a script. Cannot be used in conjunction with command_mode

Default: False

-c, --command_mode

If set, run the training script as a command (i.e. without python). Cannot be used in conjunction with module_mode.

Default: False

required arguments#

training_script

The path to the training script used to initialize a single training process. Should be followed by any command-line arguments the script should be launched with.

training_script_args

Any arguments for the training script, given in the expected order.

multi-node arguments#

These arguments generally only need to be set when training in a multi-node environment, i.e., when world_size is greater than nproc. An example multi-node invocation is shown after the argument descriptions below.

--world_size

The total number of processes to launch across all nodes. Setting this to a value greater than nproc indicates a multi-node environment. Overrides env var WORLD_SIZE. Defaults to nproc.

--base_rank

The rank of the lowest ranked process to launch on this node. Specifying a base_rank B and an nproc N will spawn processes with global ranks [B, B+1, ... B+N-1]. In a multi-node environment, at least one of base_rank and node_rank must be specified. If only one of base_rank and node_rank is provided, it is assumed that all nodes have the same number of processes, and that the two values are related as node_rank * nproc = base_rank. If this is not the case, both base_rank and node_rank must be provided. Overrides env var BASE_RANK. Defaults to 0 in a single-node environment.

--node_rank

The rank of this node. See base_rank for information on when this must be provided. Overrides env var NODE_RANK. Defaults to 0 in a single-node environment.

--master_addr

The FQDN of the node hosting the C10d TCP store. For single-node operation, this can generally be left as 127.0.0.1. Overrides env var MASTER_ADDR. Defaults to 127.0.0.1 in a single-node environment.

--master_port

The port on the master hosting the C10d TCP store. If you are running multiple trainers on a single node, this generally needs to be unique for each one. Overrides env var MASTER_PORT. Defaults to a random free port in a single-node environment.
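
For example, a hypothetical two-node job with 8 GPUs per node (16 processes total) could be launched by running the following on each node; node0.example.com and port 29400 are placeholders for your own master node and a free port:

# on node 0 (hosts the C10d TCP store)
>>> composer -n 8 --world_size 16 --node_rank 0 --master_addr node0.example.com --master_port 29400 my_training_script.py

# on node 1
>>> composer -n 8 --world_size 16 --node_rank 1 --master_addr node0.example.com --master_port 29400 my_training_script.py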

Distributed Properties#

Developers may need to access the current rank or world size in a distributed setting. For example, a callback may only want to log something for rank zero. Use our composer.utils.dist module to retrieve this information. The methods are similar to torch.distributed, but also return defaults in a non-distributed setting.

from composer.utils import dist

dist.get_world_size()  # torch.distributed.get_world_size()
dist.get_local_rank()  # rank of this process within its node
dist.get_global_rank()  # torch.distributed.get_rank()

For all retrievable properties, see composer.utils.dist.
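
For example, a minimal sketch of rank-zero-only logging inside your own code or a callback (print is used purely for illustration):

from composer.utils import dist

# Only the global rank zero process logs. In a non-distributed run,
# get_global_rank() returns its default of 0, so the message still prints.
if dist.get_global_rank() == 0:
    print(f'world size: {dist.get_world_size()}')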

Space-time Equivalence#

We consider an equivalence principle between distributed training and gradient accumulation. That is, batches can either be parallelized across space (e.g., devices) or across time (e.g., gradient accumulation). Furthermore, the two dimensions are interchangeable: more devices means less gradient accumulation, and vice versa. Our trainer strives to respect this equivalence and ensure identical behavior regardless of the combination of space and time parallelization used.
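
As a concrete illustration (the numbers are arbitrary), both configurations below process the same 2048-sample global batch while keeping 256 samples resident per device at a time:

# Parallelize across space: 8 devices, no gradient accumulation.
devices, per_device_batch, grad_accum = 8, 256, 1
assert devices * per_device_batch == 2048
assert per_device_batch // grad_accum == 256

# Trade space for time: 4 devices, 2 accumulation steps per optimizer step.
devices, per_device_batch, grad_accum = 4, 512, 2
assert devices * per_device_batch == 2048
assert per_device_batch // grad_accum == 256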

Deepspeed#

Composer comes with DeepSpeed support, allowing you to leverage its full set of features, which make it easier to train large models across (1) any type of GPU and (2) multiple nodes. For more details on DeepSpeed, see their website.

We support optimizer state and gradient sharding via DeepSpeed ZeRO stages 1 and 2, respectively. In the future, we'll support model sharding via ZeRO-3. These methods reduce model state memory by a factor of (1 / the number of data-parallel devices).

To enable DeepSpeed, simply pass in a config as specified in the DeepSpeed docs here.

# run_trainer.py

from composer import Trainer

trainer = Trainer(
    model=model,
    train_dataloader=train_dataloader,
    eval_dataloader=eval_dataloader,
    max_duration='160ep',
    device='gpu',
    deepspeed_config={
        "train_batch_size": 2048,
        "fp16": {"enabled": True},
    })

Providing an empty dictionary for deepspeed_config is also valid. The DeepSpeed defaults will be used, and other fields (such as precision) will be inferred from the trainer.
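
For instance, reusing the placeholders from the example above:

trainer = Trainer(
    model=model,
    train_dataloader=train_dataloader,
    eval_dataloader=eval_dataloader,
    max_duration='160ep',
    device='gpu',
    deepspeed_config={},  # accept the DeepSpeed defaults
)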

Warning

The deepspeed_config must not conflict with any other parameters passed to the trainer.

Warning

Not all algorithms have been tested with DeepSpeed; please proceed with caution.