Merge pull request #918 from NLGithubWP/add_resnet_10

Add resnet_cifar10 to cifar_distributed_cnn/autograd
diff --git a/examples/cifar_distributed_cnn/autograd/resnet_cifar10.py b/examples/cifar_distributed_cnn/autograd/resnet_cifar10.py
new file mode 100644
index 0000000..0d6379b
--- /dev/null
+++ b/examples/cifar_distributed_cnn/autograd/resnet_cifar10.py
@@ -0,0 +1,292 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+try:
+    import pickle
+except ImportError:
+    import cPickle as pickle
+
+from singa import singa_wrap as singa
+from singa import autograd
+from singa import tensor
+from singa import device
+from singa import opt
+from PIL import Image
+import numpy as np
+import os
+import sys
+import time
+
+
+def load_dataset(filepath):
+    """Load one pickled CIFAR-10 batch file.
+
+    Args:
+        filepath: path of a pickled batch whose object exposes a 'data'
+            array and a 'labels' sequence.
+
+    Returns:
+        A tuple (image, label): image is a uint8 array reshaped to
+        (-1, 3, 32, 32) and label is a uint8 column array of shape (N, 1).
+    """
+    # NOTE: pickle can execute arbitrary code while loading; only point this
+    # at trusted dataset files.
+    with open(filepath, 'rb') as batch_file:
+        try:
+            batch = pickle.load(batch_file, encoding='latin1')
+        except TypeError:
+            # pickle.load() without an `encoding` keyword (e.g. Python 2).
+            batch = pickle.load(batch_file)
+    pixels = batch['data'].astype(dtype=np.uint8).reshape((-1, 3, 32, 32))
+    classes = np.asarray(batch['labels'], dtype=np.uint8)
+    return pixels, classes.reshape(classes.size, 1)
+
+
+def load_train_data(dir_path='cifar-10-batches-py', num_batches=5):
+    """Load the CIFAR-10 training batches `data_batch_1..num_batches`.
+
+    Args:
+        dir_path: directory that contains the `data_batch_<i>` files.
+        num_batches: number of batch files to read; each one is expected to
+            hold 10000 images.
+
+    Returns:
+        A tuple (images, labels): images as float32 with shape
+        (num_batches * 10000, 3, 32, 32) and labels as int32.
+    """
+    batchsize = 10000
+    images = np.empty((num_batches * batchsize, 3, 32, 32), dtype=np.uint8)
+    labels = []
+    for batch_no in range(num_batches):
+        # Files on disk are numbered from 1.
+        fname_train_data = dir_path + "/data_batch_{}".format(batch_no + 1)
+        image, label = load_dataset(check_dataset_exist(fname_train_data))
+        first = batch_no * batchsize
+        images[first:first + batchsize] = image
+        labels.extend(label)
+    return (np.array(images, dtype=np.float32),
+            np.array(labels, dtype=np.int32))
+
+
+def load_test_data(dir_path='cifar-10-batches-py'):
+    """Load the CIFAR-10 `test_batch` file found in `dir_path`.
+
+    Returns:
+        A tuple (images, labels) converted to float32 and int32 arrays.
+    """
+    test_file = check_dataset_exist(dir_path + "/test_batch")
+    images, labels = load_dataset(test_file)
+    images = np.array(images, dtype=np.float32)
+    labels = np.array(labels, dtype=np.int32)
+    return images, labels
+
+
+def check_dataset_exist(dirpath):
+    """Return `dirpath` unchanged if it exists, otherwise abort the program.
+
+    Args:
+        dirpath: filesystem path (file or directory) that must exist.
+
+    Returns:
+        The same `dirpath`, so the call can be nested inside a loader call.
+
+    Raises:
+        SystemExit: with exit status 1 when the path does not exist, after
+            printing a download hint to stderr.
+    """
+    if os.path.exists(dirpath):
+        return dirpath
+    print(
+        'Please download the cifar10 dataset using download_data.py (e.g. python ~/singa/examples/cifar10/download_data.py py)',
+        file=sys.stderr)
+    # A missing dataset is a failure: report a non-zero status so shells and
+    # job launchers do not treat the run as successful.
+    sys.exit(1)
+
+
+def normalize_for_resnet(train_x, test_x):
+    """Scale pixels to [0, 1] and standardise every colour channel.
+
+    Both arrays are modified in place (and also returned), so they must be
+    floating-point arrays indexed as (sample, channel, height, width) with
+    three channels.
+
+    Args:
+        train_x: training images holding values in the 0..255 range.
+        test_x: test images holding values in the 0..255 range.
+
+    Returns:
+        The tuple (train_x, test_x), i.e. the same normalised array objects.
+    """
+    mean = [0.4914, 0.4822, 0.4465]
+    std = [0.2023, 0.1994, 0.2010]
+    train_x /= 255
+    test_x /= 255
+    # Cover all three channels; the former range(0, 2) left the last channel
+    # without mean/std normalisation.
+    for ch, (ch_mean, ch_std) in enumerate(zip(mean, std)):
+        for data in (train_x, test_x):
+            data[:, ch, :, :] -= ch_mean
+            data[:, ch, :, :] /= ch_std
+    return train_x, test_x
+
+
+def resize_dataset(x, IMG_SIZE):
+    """Resize every channel plane of `x` to IMG_SIZE x IMG_SIZE.
+
+    Args:
+        x: array indexed as (sample, channel, height, width).
+        IMG_SIZE: edge length in pixels of the square output planes.
+
+    Returns:
+        A new float32 array of shape (num_samples, num_channels, IMG_SIZE,
+        IMG_SIZE) filled with the bilinearly resized planes.
+    """
+    num_data, dim = x.shape[0], x.shape[1]
+    target = (IMG_SIZE, IMG_SIZE)
+    X = np.zeros(shape=(num_data, dim) + target, dtype=np.float32)
+    # Every 2-D plane goes through PIL on its own.
+    for n, d in np.ndindex(num_data, dim):
+        plane = Image.fromarray(x[n, d, :, :])
+        X[n, d, :, :] = np.array(plane.resize(target, Image.BILINEAR),
+                                 dtype=np.float32)
+    return X
+
+
+def augmentation(x, batch_size):
+    """Apply a random shifted crop and a random horizontal flip per sample.
+
+    The batch is padded symmetrically by 4 pixels on each spatial side, a
+    32x32 window is cut at a random offset in [0, 8) and, with probability
+    one half, mirrored along the last axis. Results are written back into
+    `x`.
+
+    Args:
+        x: batch indexed as (sample, channel, 32, 32); modified in place.
+        batch_size: number of leading samples to augment.
+
+    Returns:
+        The same array object `x`.
+    """
+    padded = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric')
+    for sample in range(batch_size):
+        top, left = np.random.randint(8, size=2)
+        crop = padded[sample, :, top:top + 32, left:left + 32]
+        # Draw the flip decision after the offsets to keep the RNG order.
+        if np.random.randint(2):
+            crop = crop[:, :, ::-1]
+        x[sample, :, :, :] = crop
+    return x
+
+
+def accuracy(pred, target):
+    """Count the rows whose arg-max class agrees in `pred` and `target`.
+
+    Args:
+        pred: 2-D array of per-class scores, one row per sample.
+        target: 2-D array (e.g. one-hot) with the same row layout.
+
+    Returns:
+        The number of matching rows as a NumPy integer (a count, not a
+        ratio).
+    """
+    hits = np.argmax(pred, axis=1) == np.argmax(target, axis=1)
+    return hits.astype("int").sum()
+
+
+def to_categorical(y, num_classes):
+    """One-hot encode integer class labels.
+
+    Args:
+        y: sequence or array of class indices with one entry (or one row of
+            indices) per sample, e.g. shape (n,) or (n, 1).
+        num_classes: width of the encoding.
+
+    Returns:
+        A float32 array of shape (n, num_classes) holding 1 at every listed
+        class index and 0 elsewhere.
+    """
+    labels = np.array(y, dtype="int")
+    n = labels.shape[0]
+    # Allocate as float32 once instead of re-casting the whole matrix on
+    # every row; this also gives empty input the same dtype.
+    categorical = np.zeros((n, num_classes), dtype=np.float32)
+    if n:
+        rows = np.arange(n)[:, None]
+        categorical[rows, labels.reshape(n, -1)] = 1
+    return categorical
+
+
+# All-reduce a NUMPY accuracy or loss value across multiple devices
+def reduce_variable(variable, dist_opt, reducer):
+    """Push `variable` through an all-reduce and hand back the result.
+
+    Args:
+        variable: NumPy array copied into `reducer` before the reduction.
+        dist_opt: object providing `all_reduce()` and `wait()`.
+        reducer: scratch tensor whose `.data` is all-reduced; its previous
+            content is overwritten.
+
+    Returns:
+        The content of `reducer` after the reduction, converted with
+        `tensor.to_numpy`.
+    """
+    reducer.copy_from_numpy(variable)
+    dist_opt.all_reduce(reducer.data)
+    dist_opt.wait()
+    return tensor.to_numpy(reducer)
+
+
+# Function to synchronize SINGA TENSOR initial model parameters
+def synchronize(tensor, dist_opt):
+    """All-reduce `tensor` through `dist_opt`, then divide it by world size.
+
+    Args:
+        tensor: object exposing `.data` and supporting `/=`. Inside this
+            function the parameter name shadows the imported `tensor`
+            module.
+        dist_opt: object providing `all_reduce()`, `wait()` and
+            `world_size`.
+
+    Nothing is returned; callers only see the effect if `/=` updates the
+    tensor object in place.
+    """
+    dist_opt.all_reduce(tensor.data)
+    # Presumably blocks until the all-reduce above has finished -- TODO
+    # confirm against the optimizer implementation.
+    dist_opt.wait()
+    # Presumably all_reduce sums over ranks, so dividing by the world size
+    # yields the mean -- TODO confirm.
+    tensor /= dist_opt.world_size
+
+
+# Data partition
+def data_partition(dataset_x, dataset_y, global_rank, world_size):
+    """Return the contiguous shard of the dataset owned by `global_rank`.
+
+    The first axis of `dataset_x` is split into `world_size` equally sized
+    shards; samples left over by the integer division are dropped.
+
+    Args:
+        dataset_x: samples, sliced along the first axis.
+        dataset_y: labels, sliced with the same index window.
+        global_rank: zero-based index of the shard to return.
+        world_size: total number of shards.
+
+    Returns:
+        The tuple (dataset_x[shard], dataset_y[shard]).
+    """
+    shard_len = dataset_x.shape[0] // world_size
+    window = slice(global_rank * shard_len, (global_rank + 1) * shard_len)
+    return dataset_x[window], dataset_y[window]
+
+
+def train_cifar10(DIST=False,
+                  local_rank=None,
+                  world_size=None,
+                  nccl_id=None,
+                  partial_update=False):
+    """Train `resnet50` (from the local `resnet` module) on CIFAR-10.
+
+    Runs `max_epoch` epochs. Each epoch trains on shuffled, augmented and
+    resized mini-batches, then evaluates on the test split and prints the
+    loss and accuracies. Samples that do not fill a whole mini-batch are
+    skipped.
+
+    Args:
+        DIST: when truthy, wrap the SGD optimizer in `opt.DistOpt`, run on
+            the GPU selected by the optimizer's local rank and give each
+            rank its own slice of the data; otherwise use one default GPU.
+        local_rank: forwarded to `opt.DistOpt` (only used when DIST).
+        world_size: forwarded to `opt.DistOpt` (only used when DIST); it is
+            then replaced by the optimizer's own `world_size`, or by 1 in
+            the single-GPU case.
+        nccl_id: forwarded to `opt.DistOpt` (only used when DIST).
+        partial_update: when true, train with
+            `backward_and_partial_update` and re-synchronize the collected
+            parameters before every evaluation phase.
+    """
+
+    # Define the hyperparameters for train_cifar10
+    sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5)
+    max_epoch = 5
+    batch_size = 32
+
+    train_x, train_y = load_train_data()
+    test_x, test_y = load_test_data()
+    train_x, test_x = normalize_for_resnet(train_x, test_x)
+    IMG_SIZE = 224
+    num_classes = 10
+
+    if DIST:
+        # For distributed GPU training
+        sgd = opt.DistOpt(sgd,
+                          nccl_id=nccl_id,
+                          local_rank=local_rank,
+                          world_size=world_size)
+        dev = device.create_cuda_gpu_on(sgd.local_rank)
+
+        # Dataset partition for distributed training
+        train_x, train_y = data_partition(train_x, train_y, sgd.global_rank,
+                                          sgd.world_size)
+        test_x, test_y = data_partition(test_x, test_y, sgd.global_rank,
+                                        sgd.world_size)
+        world_size = sgd.world_size
+    else:
+        # For single GPU
+        dev = device.create_cuda_gpu()
+        world_size = 1
+
+    # Only one process reports progress: the single-GPU process, or global
+    # rank 0 in the distributed case. Uses the same truthiness test as the
+    # `if DIST:` branches.
+    is_master = (not DIST) or (sgd.global_rank == 0)
+
+    from resnet import resnet50
+    model = resnet50(num_classes=num_classes)
+
+    tx = tensor.Tensor((batch_size, 3, IMG_SIZE, IMG_SIZE), dev, tensor.float32)
+    ty = tensor.Tensor((batch_size,), dev, tensor.int32)
+    num_train_batch = train_x.shape[0] // batch_size
+    num_test_batch = test_x.shape[0] // batch_size
+    idx = np.arange(train_x.shape[0], dtype=np.int32)
+
+    # Parameters collected during the initial synchronization. Bound up
+    # front so the partial-update branch below never reads an undefined
+    # name when DIST is off.
+    param = []
+    if DIST:
+        # Synchronize the initial parameters: run one forward/backward pass
+        # on random data to enumerate the parameters.
+        autograd.training = True
+        x = np.random.randn(batch_size, 3, IMG_SIZE,
+                            IMG_SIZE).astype(np.float32)
+        y = np.zeros(shape=(batch_size,), dtype=np.int32)
+        tx.copy_from_numpy(x)
+        ty.copy_from_numpy(y)
+        out = model(tx)
+        loss = autograd.softmax_cross_entropy(out, ty)
+        for p, _ in autograd.backward(loss):
+            synchronize(p, sgd)
+            param.append(p)
+
+    for epoch in range(max_epoch):
+        start_time = time.time()
+        np.random.shuffle(idx)
+
+        if is_master:
+            print('Starting Epoch %d:' % (epoch))
+
+        # Training phase
+        autograd.training = True
+        train_correct = np.zeros(shape=[1], dtype=np.float32)
+        test_correct = np.zeros(shape=[1], dtype=np.float32)
+        train_loss = np.zeros(shape=[1], dtype=np.float32)
+
+        for b in range(num_train_batch):
+            batch_idx = idx[b * batch_size:(b + 1) * batch_size]
+            # Indexing with an index array yields a copy, so the in-place
+            # augmentation does not modify train_x.
+            x = train_x[batch_idx]
+            x = augmentation(x, batch_size)
+            x = resize_dataset(x, IMG_SIZE)
+            y = train_y[batch_idx]
+            tx.copy_from_numpy(x)
+            ty.copy_from_numpy(y)
+            out = model(tx)
+            loss = autograd.softmax_cross_entropy(out, ty)
+            train_correct += accuracy(tensor.to_numpy(out),
+                                      to_categorical(y, num_classes)).astype(
+                                          np.float32)
+            train_loss += tensor.to_numpy(loss)[0]
+            if not partial_update:
+                sgd.backward_and_update(loss)
+            else:
+                sgd.backward_and_partial_update(loss)
+
+        if DIST:
+            # Reduce the training accuracy and loss from multiple devices
+            reducer = tensor.Tensor((1,), dev, tensor.float32)
+            train_correct = reduce_variable(train_correct, sgd, reducer)
+            train_loss = reduce_variable(train_loss, sgd, reducer)
+
+        # Output the training loss and accuracy
+        if is_master:
+            print('Training loss = %f, training accuracy = %f' %
+                  (train_loss, train_correct /
+                   (num_train_batch * batch_size * world_size)),
+                  flush=True)
+
+        if partial_update:
+            # Synchronize parameters before evaluation phase
+            for p in param:
+                synchronize(p, sgd)
+
+        # Evaluation phase
+        autograd.training = False
+        for b in range(num_test_batch):
+            x = test_x[b * batch_size:(b + 1) * batch_size]
+            x = resize_dataset(x, IMG_SIZE)
+            y = test_y[b * batch_size:(b + 1) * batch_size]
+            tx.copy_from_numpy(x)
+            ty.copy_from_numpy(y)
+            out_test = model(tx)
+            test_correct += accuracy(tensor.to_numpy(out_test),
+                                     to_categorical(y, num_classes))
+
+        if DIST:
+            # Reduce the evaluation accuracy from multiple devices
+            test_correct = reduce_variable(test_correct, sgd, reducer)
+
+        # Output the evaluation accuracy
+        if is_master:
+            print('Evaluation accuracy = %f, Elapsed Time = %fs' %
+                  (test_correct / (num_test_batch * batch_size * world_size),
+                   time.time() - start_time),
+                  flush=True)
+
+
+# Script entry point: run the training with DIST disabled, which makes
+# train_cifar10 take its single-GPU branch.
+if __name__ == '__main__':
+
+    DIST = False
+    train_cifar10(DIST=DIST)