Merge pull request #14 from nudles/master

rearrange contents in
diff --git a/docs-site/docs/ b/docs-site/docs/
index dc3d729..f52ca03 100644
--- a/docs-site/docs/
+++ b/docs-site/docs/
@@ -5,125 +5,40 @@
 <!--- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and limitations under the License.  -->
-SINGA supports distributed data parallel training and evaulation process based
-on multiprocessing. The following is the illustration of the data parallel
+SINGA supports data parallel training across multiple GPUs (on a single node or
+across different nodes). The following figure illustrates the data parallel
-In the distributed training, each process runs a training script which utilizes
-one GPU. Each process has an individual rank, which gives information of which
-GPU the individual process is using. The training data is partitioned, so that
-each process can evaluate the sub-gradient based on the partitioned training
-data. Once the sub-graident is calculated on each processes, the overall
-stochastic gradient is obtained by all-reducing the sub-gradients evaluated by
-all processes. The all-reduce operation is supported by the NVidia Collective
-Communication Library (NCCL).
+In distributed training, each process (called a worker) runs a training script
+over a single GPU. Each process has an individual communication rank. The
+training data is partitioned among the workers and the model is replicated on
+every worker. In each iteration, each worker reads a mini-batch of data (e.g.,
+256 images) from its partition and runs the BackPropagation algorithm to compute
+the gradients of the weights, which are averaged via All-Reduce (provided by
+NCCL) for weight update following stochastic gradient descent (SGD)
+algorithms.
 The all-reduce operation by NCCL can be used to reduce and synchronize the
-parameters from different GPUs. Let's consider a data partitioned distributed
-training using 4 GPUs. Once the sub-gradients from the 4 GPUs are calculated,
-the NCCL can perform the all-reduce process so that all the GPUs can get the sum
-of the sub-gradients over the GPUs:
+gradients from different GPUs. Let's consider the training with 4 GPUs as shown
+below. Once the gradients from the 4 GPUs are calculated, All-Reduce computes
+the sum of the gradients over the GPUs and makes it available on every GPU. The
+averaged gradients are then obtained by dividing the sum by the number of GPUs.
-Finally, the parameter update of Stochastic Gradient Descent (SGD) can then be
-performed by using the overall stochastic gradient obtained by the all-reduce
+## Usage
-## Python DistOpt Methods:
+SINGA implements a module called `DistOpt` for distributed training. It replaces
+the normal SGD optimizer for updating the model parameters. The following
+example illustrates the usage of `DistOpt` for training a CNN model over the
+MNIST dataset. The full example is available
-There are a list of methods for distributed training with DistOpt:
+### Example Code
-1. Create a DistOpt with the SGD object and device assignment:
-sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5)
-sgd = opt.DistOpt(sgd)
-dev = device.create_cuda_gpu_on(sgd.rank_in_local)
-2. Backward propagation and distributed parameter update:
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;loss is the objective function of the deep
-learning model optimization,
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;e.g. for classification problem it can be the
-output of the softmax_cross_entropy function.
-3. Backward propagation and distributed parameter update, using half precision
-   for gradient communication:
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;It converts the gradients to 16 bits half
-precision format before allreduce
-4. Backward propagation and distributed asychronous training with partial
-   parameter synchronization:
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;It performs asychronous training where one
-parameter partition is all-reduced per iteration.
-5. Backward propagation and distributed parameter update, with sparsification to
-   reduce data transmission:
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;It applies sparsification schemes to transfer only
-the gradient elements which are significant.
-## Instruction to Use:
-SINGA supports two ways to launch the distributed training, namely I. MPI
-(Message Passing Interface) and II. python multiprocessing.
-### I. Using MPI
-The following are the detailed steps to start up a distributed training with
-MPI, using MNIST dataset as an example:
-1. Import SINGA and Miscellaneous Libraries used for the training
-from singa import singa_wrap as singa
-from singa import autograd
-from singa import tensor
-from singa import device
-from singa import opt
-import numpy as np
-import os
-import sys
-import gzip
-import codecs
-import time
-import urllib.request
-2. Create a Convolutional Neural Network Model
+1. Define the neural network model:
 class CNN:
@@ -152,7 +67,7 @@
 model = CNN()
-3. Create a Distributed Optimizer Object and Device Assignment
+2. Create the `DistOpt` instance:
 sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5)
@@ -160,229 +75,128 @@
 dev = device.create_cuda_gpu_on(sgd.rank_in_local)
-4. Prepare the Training and Evaluation Data
+`dev` is the `Device` instance on which to load data and run the CNN model.
+3. Load and partition the training/validation data:
-def load_dataset():
-    train_x_url = ''
-    train_y_url = ''
-    valid_x_url = ''
-    valid_y_url = ''
-    train_x = read_image_file(check_exist_or_download(train_x_url)).astype(
-        np.float32)
-    train_y = read_label_file(check_exist_or_download(train_y_url)).astype(
-        np.float32)
-    valid_x = read_image_file(check_exist_or_download(valid_x_url)).astype(
-        np.float32)
-    valid_y = read_label_file(check_exist_or_download(valid_y_url)).astype(
-        np.float32)
-    return train_x, train_y, valid_x, valid_y
-def check_exist_or_download(url):
-    download_dir = '/tmp/'
-    name = url.rsplit('/', 1)[-1]
-    filename = os.path.join(download_dir, name)
-    if not os.path.isfile(filename):
-        print("Downloading %s" % url)
-        urllib.request.urlretrieve(url, filename)
-    return filename
-def read_label_file(path):
-    with gzip.open(path, 'rb') as f:
-        data = f.read()
-        assert get_int(data[:4]) == 2049
-        length = get_int(data[4:8])
-        parsed = np.frombuffer(data, dtype=np.uint8, offset=8).reshape(
-            (length))
-        return parsed
-def get_int(b):
-    return int(codecs.encode(b, 'hex'), 16)
-def read_image_file(path):
-    with gzip.open(path, 'rb') as f:
-        data = f.read()
-        assert get_int(data[:4]) == 2051
-        length = get_int(data[4:8])
-        num_rows = get_int(data[8:12])
-        num_cols = get_int(data[12:16])
-        parsed = np.frombuffer(data, dtype=np.uint8, offset=16).reshape(
-            (length, 1, num_rows, num_cols))
-        return parsed
-def to_categorical(y, num_classes):
-    y = np.array(y, dtype="int")
-    n = y.shape[0]
-    categorical = np.zeros((n, num_classes))
-    categorical[np.arange(n), y] = 1
-    categorical = categorical.astype(np.float32)
-    return categorical
-# Prepare training and valadiation data
 train_x, train_y, test_x, test_y = load_dataset()
-IMG_SIZE = 28
-train_y = to_categorical(train_y, num_classes)
-test_y = to_categorical(test_y, num_classes)
-# Normalization
-train_x = train_x / 255
-test_x = test_x / 255
+train_x, train_y = data_partition(train_x, train_y,
+                                  sgd.rank_in_global, sgd.world_size)
+test_x, test_y = data_partition(test_x, test_y,
+                                sgd.rank_in_global, sgd.world_size)
-5. Data Partitioning of the Training and Evaluation Datasets
+A partition of the dataset is returned for this `dev`.
+4. Initialize and synchronize the model parameters among all workers:
-def data_partition(dataset_x, dataset_y, rank_in_global, world_size):
-    data_per_rank = dataset_x.shape[0] // world_size
-    idx_start = rank_in_global * data_per_rank
-    idx_end = (rank_in_global + 1) * data_per_rank
-    return dataset_x[idx_start: idx_end], dataset_y[idx_start: idx_end]
-train_x, train_y = data_partition(train_x, train_y, sgd.rank_in_global, sgd.world_size)
-test_x, test_y = data_partition(test_x, test_y, sgd.rank_in_global, sgd.world_size)
-6. Configuring the Training Loop Variables
-max_epoch = 10
-batch_size = 64
-tx = tensor.Tensor((batch_size, 1, IMG_SIZE, IMG_SIZE), dev, tensor.float32)
-ty = tensor.Tensor((batch_size, num_classes), dev, tensor.int32)
-num_train_batch = train_x.shape[0] // batch_size
-num_test_batch = test_x.shape[0] // batch_size
-idx = np.arange(train_x.shape[0], dtype=np.int32)
-7. Initialize and Synchronize the Model Parameters
-def sychronize(tensor, dist_opt):
+def synchronize(tensor, dist_opt):
     tensor /= dist_opt.world_size
-#Sychronize the initial parameter = True
-x = np.random.randn(batch_size, 1, IMG_SIZE, IMG_SIZE).astype(np.float32)
-y = np.zeros( shape=(batch_size, num_classes), dtype=np.int32)
+# Synchronize the initial parameters
+tx = tensor.Tensor((batch_size, 1, IMG_SIZE, IMG_SIZE), dev, tensor.float32)
+ty = tensor.Tensor((batch_size, num_classes), dev, tensor.int32)
 out = model.forward(tx)
 loss = autograd.softmax_cross_entropy(out, ty)
 for p, g in autograd.backward(loss):
-    sychronize(p, sgd)
+    synchronize(p, sgd)
-8. Start the Training and Evaluation Loop
+5. Run BackPropagation and distributed SGD:
-# Function to all reduce Accuracy and Loss from Multiple Devices
-def reduce_variable(variable, dist_opt, reducer):
-    reducer.copy_from_numpy(variable)
-    dist_opt.all_reduce(
-    dist_opt.wait()
-    output=tensor.to_numpy(reducer)
-    return output
-def accuracy(pred, target):
-    y = np.argmax(pred, axis=1)
-    t = np.argmax(target, axis=1)
-    a = y == t
-    return np.array(a, "int").sum()
-def augmentation(x, batch_size):
-    xpad = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric')
-    for data_num in range(0, batch_size):
-        offset = np.random.randint(8, size=2)
-        x[data_num,:,:,:] = xpad[data_num, :, offset[0]: offset[0] + 28, offset[1]: offset[1] + 28]
-        if_flip = np.random.randint(2)
-        if (if_flip):
-            x[data_num, :, :, :] = x[data_num, :, :, ::-1]
-    return x
-# Training and Evaulation Loop
 for epoch in range(max_epoch):
-    start_time = time.time()
-    np.random.shuffle(idx)
-    if(sgd.rank_in_global==0):
-        print('Starting Epoch %d:' % (epoch))
-    # Training Phase
-    autograd.training = True
-    train_correct = np.zeros(shape=[1],dtype=np.float32)
-    test_correct = np.zeros(shape=[1],dtype=np.float32)
-    train_loss = np.zeros(shape=[1],dtype=np.float32)
     for b in range(num_train_batch):
         x = train_x[idx[b * batch_size: (b + 1) * batch_size]]
-        x = augmentation(x, batch_size)
         y = train_y[idx[b * batch_size: (b + 1) * batch_size]]
         out = model.forward(tx)
         loss = autograd.softmax_cross_entropy(out, ty)
-        train_correct += accuracy(tensor.to_numpy(out), y)
-        train_loss += tensor.to_numpy(loss)[0]
+        # do backpropagation and all-reduce
-    # Reduce the Evaluation Accuracy and Loss from Multiple Devices
-    reducer = tensor.Tensor((1,), dev, tensor.float32)
-    train_correct = reduce_variable(train_correct, sgd, reducer)
-    train_loss = reduce_variable(train_loss, sgd, reducer)
-    # Output the Training Loss and Accuracy
-    if(sgd.rank_in_global==0):
-        print('Training loss = %f, training accuracy = %f' %
-              (train_loss, train_correct / (num_train_batch*batch_size*sgd.world_size)), flush=True)
-    # Evaluation Phase
-    autograd.training = False
-    for b in range(num_test_batch):
-        x = test_x[b * batch_size: (b + 1) * batch_size]
-        y = test_y[b * batch_size: (b + 1) * batch_size]
-        tx.copy_from_numpy(x)
-        ty.copy_from_numpy(y)
-        out_test = model.forward(tx)
-        test_correct += accuracy(tensor.to_numpy(out_test), y)
-    # Reduce the Evaulation Accuracy from Multiple Devices
-    test_correct = reduce_variable(test_correct, sgd, reducer)
-    # Output the Evaluation Accuracy
-    if(sgd.rank_in_global==0):
-        print('Evaluation accuracy = %f, Elapsed Time = %fs' %
-              (test_correct / (num_test_batch*batch_size*sgd.world_size), time.time() - start_time ), flush=True)
-9. Save the above training code in a python file, e.g.
+### Execution Instructions
-10. Generate a hostfile to be used by the MPI, e.g. the hostfile below uses 4
-    processes and hence 4 GPUs for the training.
+There are two ways to launch the training: MPI or Python multiprocessing.
+#### Python multiprocessing
+It works on a single node with multiple GPUs, where each GPU is one worker.
+1. Put all of the above training code in a function:
-cat host_file
+def train_mnist_cnn(nccl_id=None, gpu_num=None, gpu_per=None):
+    ...
-    localhost:4
-11. Finally, use the MPIEXEC command to Execute the Multi-GPUs Training with the
-    hostfile:
+2. Create ``
-mpiexec --hostfile host_file python3
+if __name__ == '__main__':
+    # Generate a NCCL ID to be used for collective communication
+    nccl_id = singa.NcclIdHolder()
+    # Define the number of GPUs to be used in the training process
+    gpu_per_node = int(sys.argv[1])
+    gpu_num = 1
+    # Define and launch the multi-processing
+    import multiprocessing
+    process = []
+    for gpu_num in range(0, gpu_per_node):
+        process.append(multiprocessing.Process(target=train_mnist_cnn,
+                       args=(nccl_id, gpu_num, gpu_per_node)))
+    for p in process:
+        p.start()
-&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;It could result in several times speed up compared
-to the single GPU training.
+The arguments for creating the `DistOpt` instance should be updated as follows:
+sgd = opt.DistOpt(sgd, nccl_id=nccl_id, gpu_num=gpu_num, gpu_per_node=gpu_per_node)
+3. Run ``
+#### MPI
+It works on both a single node and multiple nodes, as long as there are multiple GPUs.
+1. Create ``
+if __name__ == '__main__':
+    train_mnist_cnn()
+2. Generate a hostfile for MPI, e.g. the hostfile below uses 4 processes (i.e.,
+   4 GPUs) on a single node
+3. Launch the training via `mpiexec`
+mpiexec --hostfile host_file python3
+It can yield a speedup of several times compared to single-GPU training.
 Starting Epoch 0:
@@ -417,49 +231,50 @@
 Evaluation accuracy = 0.982672, Elapsed Time = 0.717571s
-### II. Using Python multiprocessing
+## Optimizations for Distributed Training
-For single node, we can use Python multiprocessing module instead of MPI. It
-needs just a small portion of code changes:
+SINGA provides multiple optimization strategies for distributed training to
+reduce the communication cost. Refer to the API for `DistOpt` for the
+configuration of each strategy.
-1. Put all the above training codes in a function, e.g. train_mnist_cnn
-2. Generate a NCCIdHolder, define the number of GPUs to be used in the training
-   process (gpu_per_node), and uses the multiprocessing to launch the training
-   code with the arguments.
+### No Optimizations
-    # Generate a NCCL ID to be used for collective communication
-    nccl_id = singa.NcclIdHolder()
-    # Define the number of GPUs to be used in the training process
-    gpu_per_node = 8
-    # Define and launch the multi-processing
-	import multiprocessing
-    process = []
-    for gpu_num in range(0, gpu_per_node):
-        process.append(multiprocessing.Process(target=train_mnist_cnn, args=(nccl_id, gpu_num, gpu_per_node)))
-    for p in process:
-        p.start()
-3. In the training code, it should pass the arguments defined above to the
-   DistOpt object.
+`loss` is the output tensor from the loss function, e.g., cross-entropy for
+classification tasks.
+### Half-precision Gradients
-sgd = opt.DistOpt(sgd, nccl_id=nccl_id, gpu_num=gpu_num, gpu_per_node=gpu_per_node)
-4. Finally, we can launch the code with the multiprocessing module.
+It converts each gradient value to a 16-bit representation (i.e.,
+half-precision) before calling All-Reduce.
-## Full Examples
+### Partial Synchronization
-The full examples of the distributed training using the MNIST dataset are
-available in the examples folder of SINGA:
-1. MPI: examples/autograd/
+In each iteration, only a chunk of the gradients is averaged, which reduces the
+communication cost. The other gradients are used to update the parameters
+locally. The chunk size is configured when creating the `DistOpt` instance.
-2. Python Multiprocessing: examples/autograd/
+### Gradient Sparsification
+It applies sparsification schemes to select a subset of gradients for
+All-Reduce. There are two schemes:
+- The top-K largest elements are selected.
+- All gradients whose absolute value exceeds a predefined threshold are selected.
+The hyper-parameters are configured when creating the `DistOpt` instance.
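
Note on the Gradient Sparsification section added above: the two selection schemes can be illustrated with a small NumPy sketch. This is not SINGA's implementation and does not use the `DistOpt` API; the function names `topk_sparsify` and `threshold_sparsify` are hypothetical, and the sketch assumes "largest" means largest in absolute value. It only shows which gradient elements each scheme would keep before communication.

```python
import numpy as np


def topk_sparsify(grad, k):
    """Keep the k entries of largest magnitude and zero out the rest.

    Hypothetical helper for illustration; assumes top-K is by absolute value.
    """
    flat = grad.ravel()
    k = min(k, flat.size)
    sparse = np.zeros_like(flat)
    if k <= 0:
        return sparse.reshape(grad.shape)
    # argpartition puts the indices of the k largest |values| in the last k slots
    idx = np.argpartition(np.abs(flat), flat.size - k)[flat.size - k:]
    sparse[idx] = flat[idx]
    return sparse.reshape(grad.shape)


def threshold_sparsify(grad, threshold):
    """Keep entries whose absolute value exceeds the threshold; zero out the rest."""
    return np.where(np.abs(grad) > threshold, grad, np.zeros_like(grad))


if __name__ == "__main__":
    g = np.array([[0.1, -2.0], [0.5, 3.0]], dtype=np.float32)
    print(topk_sparsify(g, 2))
    print(threshold_sparsify(g, 0.4))
```

In a real system only the selected values and their indices would be exchanged between workers; the dense zero-filled arrays here are used purely to make the selection visible.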