Horovod is a distributed training framework that demonstrates excellent scaling efficiency for dense models running on a large number of nodes. It currently supports mainstream deep learning frameworks such as MXNet, TensorFlow, Keras, and PyTorch. It is created at Uber and currently hosted by the Linux Foundation Deep Learning(LF DL).
MXNet is supported starting from Horovod 0.16.0 release.
Compared with the standard distributed training script in MXNet which uses parameter server to distribute and aggregate parameters, Horovod uses ring allreduce and/or tree-based allreduce algorithm to communicate parameters between workers. There is no dedicated server and the communication data size between workers does not depend on the number of workers. Therefore, it scales well in the case where there are a large number of workers and network bandwidth is the bottleneck.
$ pip install mxnet
Note: The known issue when running Horovod with MXNet on a Linux system with GCC version 5.X and above has been resolved. Please use MXNet 1.4.1 or later releases with Horovod 0.16.2 or later releases to avoid the GCC incompatibility issue. MXNet 1.4.0 release works with Horovod 0.16.0 and 0.16.1 releases with the GCC incompatibility issue unsolved.
$ pip install horovod
This basic installation is good for laptops and for getting to know Horovod. If you're installing Horovod on a server with GPUs, read the Horovod on GPU page. If you want to use Docker, read the Horovod in Docker page.
MPI is required to run distributed training with Horovod. Install Open MPI or another MPI implementation. Steps to install Open MPI are listed here.
Note: Open MPI 3.1.3 has an issue that may cause hangs. It is recommended to downgrade to Open MPI 3.1.2 or upgrade to Open MPI 4.0.0.
Distributed MXNet jobs with Horovod can be submitted to a Kubernetes cluster via Kubeflow MPI Operator. Please refer to this example for details, including the Dockerfile with all the dependencies mentioned in previous sections, distributed training Python script based on Horovod, and the YAML configuration file that can be used for submitting a job on a Kubernetes cluster.
To run MXNet with Horovod, make the following additions to your training script:
Run hvd.init()
.
Pin the context to a processor using hvd.local_rank()
. Typically, each Horovod worker is associated with one process. The local rank is a unique ID specifically for all processes running Horovod job on the same node.
Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.
Create hvd.DistributedTrainer
with optimizer when using Gluon API. The distributed trainer or optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce, and then applies those averaged gradients.
Add hvd.broadcast_parameters
to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
Here we provide the building blocks to train a model using MXNet with Horovod. The full examples are in MNIST and ImageNet.
from mxnet import autograd, gluon import mxnet as mx import horovod.mxnet as hvd # Initialize Horovod hvd.init() # Set context to current process context = mx.cpu(hvd.local_rank()) if args.no_cuda else mx.gpu(hvd.local_rank()) num_workers = hvd.size() # Build model model = ... model.hybridize() # Create optimizer optimizer_params = ... opt = mx.optimizer.create('sgd', **optimizer_params) # Create DistributedTrainer, a subclass of gluon.Trainer trainer = hvd.DistributedTrainer(params, opt) # Initialize parameters model.initialize(initializer, ctx=context) # Fetch and broadcast parameters params = model.collect_params() if params is not None: hvd.broadcast_parameters(params, root_rank=0) # Create loss function loss_fn = ... # Train model for epoch in range(num_epoch): train_data.reset() for nbatch, batch in enumerate(train_data, start=1): data = batch.data[0].as_in_context(context) label = batch.label[0].as_in_context(context) with autograd.record(): output = model(data.astype(dtype, copy=False)) loss = loss_fn(output, label) loss.backward() trainer.step(batch_size)
The example commands below show how to run distributed training. See the Running Horovod page for more instructions.
$ mpirun -np 4 \ -H localhost:4 \ -bind-to none -map-by slot \ python train.py
$ mpirun -np 8 \ -H server1:4,server2:4 \ -bind-to none -map-by slot \ -x NCCL_DEBUG=INFO \ -mca pml ob1 -mca btl ^openib \ python train.py
To analyse horovod performance, horovod timeline is a handy tool to trace and visualize the time spent on horovod operations.
A few tuning knobs affect horovod runtime performance (explained here). Apart from HOROVOD_FUSION_THRESHOLD
, sometimes we find increasing HOROVOD_CYCLE_TIME
(up to 100 ms), changing NCCL_ALGO
, and NCCL_MIN_NCHANNELS
improves performance.
If you are running horovod on AWS, you can potentially leverage EFA if your instance supports 100 Gb/s networking. To use EFA, you can refer to the official documentation for the setup instructions, and the environment variables (-x FI_PROVIDER
, -x FI_EFA_TX_MIN_CREDITS
) to set. Besides, you need to make sure EFA library is included in the shared library path (-x LD_LIBRARY_PATH
).