| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| try: |
| import pickle |
| except ImportError: |
| import cPickle as pickle |
| |
| from singa import singa_wrap as singa |
| from singa import autograd |
| from singa import tensor |
| from singa import device |
| from singa import opt |
| from PIL import Image |
| import numpy as np |
| import os |
| import sys |
| import time |
| |
| |
| def load_dataset(filepath): |
| with open(filepath, 'rb') as fd: |
| try: |
| cifar10 = pickle.load(fd, encoding='latin1') |
| except TypeError: |
| cifar10 = pickle.load(fd) |
| image = cifar10['data'].astype(dtype=np.uint8) |
| image = image.reshape((-1, 3, 32, 32)) |
| label = np.asarray(cifar10['labels'], dtype=np.uint8) |
| label = label.reshape(label.size, 1) |
| return image, label |
| |
| |
| def load_train_data(dir_path='cifar-10-batches-py', num_batches=5): |
| labels = [] |
| batchsize = 10000 |
| images = np.empty((num_batches * batchsize, 3, 32, 32), dtype=np.uint8) |
| for did in range(1, num_batches + 1): |
| fname_train_data = dir_path + "/data_batch_{}".format(did) |
| image, label = load_dataset(check_dataset_exist(fname_train_data)) |
| images[(did - 1) * batchsize:did * batchsize] = image |
| labels.extend(label) |
| images = np.array(images, dtype=np.float32) |
| labels = np.array(labels, dtype=np.int32) |
| return images, labels |
| |
| |
| def load_test_data(dir_path='cifar-10-batches-py'): |
| images, labels = load_dataset(check_dataset_exist(dir_path + "/test_batch")) |
| return np.array(images, dtype=np.float32), np.array(labels, dtype=np.int32) |
| |
| |
| def check_dataset_exist(dirpath): |
| if not os.path.exists(dirpath): |
| print( |
| 'Please download the cifar10 dataset using download_data.py (e.g. python ~/singa/examples/cifar10/download_data.py py)' |
| ) |
| sys.exit(0) |
| return dirpath |
| |
| |
| def normalize_for_resnet(train_x, test_x): |
| mean = [0.4914, 0.4822, 0.4465] |
| std = [0.2023, 0.1994, 0.2010] |
| train_x /= 255 |
| test_x /= 255 |
| for ch in range(0, 2): |
| train_x[:, ch, :, :] -= mean[ch] |
| train_x[:, ch, :, :] /= std[ch] |
| test_x[:, ch, :, :] -= mean[ch] |
| test_x[:, ch, :, :] /= std[ch] |
| return train_x, test_x |
| |
| |
| def resize_dataset(x, IMG_SIZE): |
| num_data = x.shape[0] |
| dim = x.shape[1] |
| X = np.zeros(shape=(num_data, dim, IMG_SIZE, IMG_SIZE), dtype=np.float32) |
| for n in range(0, num_data): |
| for d in range(0, dim): |
| X[n, d, :, :] = np.array(Image.fromarray(x[n, d, :, :]).resize( |
| (IMG_SIZE, IMG_SIZE), Image.BILINEAR), |
| dtype=np.float32) |
| return X |
| |
| |
| def augmentation(x, batch_size): |
| xpad = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric') |
| for data_num in range(0, batch_size): |
| offset = np.random.randint(8, size=2) |
| x[data_num, :, :, :] = xpad[data_num, :, offset[0]:offset[0] + 32, |
| offset[1]:offset[1] + 32] |
| if_flip = np.random.randint(2) |
| if (if_flip): |
| x[data_num, :, :, :] = x[data_num, :, :, ::-1] |
| return x |
| |
| |
| def accuracy(pred, target): |
| y = np.argmax(pred, axis=1) |
| t = np.argmax(target, axis=1) |
| a = y == t |
| return np.array(a, "int").sum() |
| |
| |
| def to_categorical(y, num_classes): |
| y = np.array(y, dtype="int") |
| n = y.shape[0] |
| categorical = np.zeros((n, num_classes)) |
| for i in range(0, n): |
| categorical[i, y[i]] = 1 |
| categorical = categorical.astype(np.float32) |
| return categorical |
| |
| |
| # Function to all reduce NUMPY accuracy and loss from multiple devices |
| def reduce_variable(variable, dist_opt, reducer): |
| reducer.copy_from_numpy(variable) |
| dist_opt.all_reduce(reducer.data) |
| dist_opt.wait() |
| output = tensor.to_numpy(reducer) |
| return output |
| |
| |
| # Function to sychronize SINGA TENSOR initial model parameters |
| def synchronize(tensor, dist_opt): |
| dist_opt.all_reduce(tensor.data) |
| dist_opt.wait() |
| tensor /= dist_opt.world_size |
| |
| |
| # Data partition |
| def data_partition(dataset_x, dataset_y, global_rank, world_size): |
| data_per_rank = dataset_x.shape[0] // world_size |
| idx_start = global_rank * data_per_rank |
| idx_end = (global_rank + 1) * data_per_rank |
| return dataset_x[idx_start:idx_end], dataset_y[idx_start:idx_end] |
| |
| |
| def train_cifar10(DIST=False, |
| local_rank=None, |
| world_size=None, |
| nccl_id=None, |
| partial_update=False): |
| |
| # Define the hypermeters for the train_cifar10 |
| sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5) |
| max_epoch = 5 |
| batch_size = 32 |
| |
| train_x, train_y = load_train_data() |
| test_x, test_y = load_test_data() |
| train_x, test_x = normalize_for_resnet(train_x, test_x) |
| IMG_SIZE = 224 |
| num_classes = 10 |
| |
| if DIST: |
| # For distributed GPU training |
| sgd = opt.DistOpt(sgd, |
| nccl_id=nccl_id, |
| local_rank=local_rank, |
| world_size=world_size) |
| dev = device.create_cuda_gpu_on(sgd.local_rank) |
| |
| # Dataset partition for distributed training |
| train_x, train_y = data_partition(train_x, train_y, sgd.global_rank, |
| sgd.world_size) |
| test_x, test_y = data_partition(test_x, test_y, sgd.global_rank, |
| sgd.world_size) |
| world_size = sgd.world_size |
| else: |
| # For single GPU |
| dev = device.create_cuda_gpu() |
| world_size = 1 |
| |
| from resnet import resnet50 |
| model = resnet50(num_classes=num_classes) |
| |
| tx = tensor.Tensor((batch_size, 3, IMG_SIZE, IMG_SIZE), dev, tensor.float32) |
| ty = tensor.Tensor((batch_size,), dev, tensor.int32) |
| num_train_batch = train_x.shape[0] // batch_size |
| num_test_batch = test_x.shape[0] // batch_size |
| idx = np.arange(train_x.shape[0], dtype=np.int32) |
| |
| if DIST: |
| # Sychronize the initial parameters |
| autograd.training = True |
| x = np.random.randn(batch_size, 3, IMG_SIZE, |
| IMG_SIZE).astype(np.float32) |
| y = np.zeros(shape=(batch_size,), dtype=np.int32) |
| tx.copy_from_numpy(x) |
| ty.copy_from_numpy(y) |
| out = model(tx) |
| loss = autograd.softmax_cross_entropy(out, ty) |
| param = [] |
| for p, _ in autograd.backward(loss): |
| synchronize(p, sgd) |
| param.append(p) |
| |
| for epoch in range(max_epoch): |
| start_time = time.time() |
| np.random.shuffle(idx) |
| |
| if ((DIST == False) or (sgd.global_rank == 0)): |
| print('Starting Epoch %d:' % (epoch)) |
| |
| # Training phase |
| autograd.training = True |
| train_correct = np.zeros(shape=[1], dtype=np.float32) |
| test_correct = np.zeros(shape=[1], dtype=np.float32) |
| train_loss = np.zeros(shape=[1], dtype=np.float32) |
| |
| for b in range(num_train_batch): |
| x = train_x[idx[b * batch_size:(b + 1) * batch_size]] |
| x = augmentation(x, batch_size) |
| x = resize_dataset(x, IMG_SIZE) |
| y = train_y[idx[b * batch_size:(b + 1) * batch_size]] |
| tx.copy_from_numpy(x) |
| ty.copy_from_numpy(y) |
| out = model(tx) |
| loss = autograd.softmax_cross_entropy(out, ty) |
| train_correct += accuracy(tensor.to_numpy(out), |
| to_categorical(y, num_classes)).astype( |
| np.float32) |
| train_loss += tensor.to_numpy(loss)[0] |
| if not partial_update: |
| sgd.backward_and_update(loss) |
| else: |
| sgd.backward_and_partial_update(loss) |
| |
| if DIST: |
| # Reduce the evaluation accuracy and loss from multiple devices |
| reducer = tensor.Tensor((1,), dev, tensor.float32) |
| train_correct = reduce_variable(train_correct, sgd, reducer) |
| train_loss = reduce_variable(train_loss, sgd, reducer) |
| |
| # Output the training loss and accuracy |
| if ((DIST == False) or (sgd.global_rank == 0)): |
| print('Training loss = %f, training accuracy = %f' % |
| (train_loss, train_correct / |
| (num_train_batch * batch_size * world_size)), |
| flush=True) |
| |
| if partial_update: |
| # Sychronize parameters before evaluation phase |
| for p in param: |
| synchronize(p, sgd) |
| |
| # Evaulation phase |
| autograd.training = False |
| for b in range(num_test_batch): |
| x = test_x[b * batch_size:(b + 1) * batch_size] |
| x = resize_dataset(x, IMG_SIZE) |
| y = test_y[b * batch_size:(b + 1) * batch_size] |
| tx.copy_from_numpy(x) |
| ty.copy_from_numpy(y) |
| out_test = model(tx) |
| test_correct += accuracy(tensor.to_numpy(out_test), |
| to_categorical(y, num_classes)) |
| |
| if DIST: |
| # Reduce the evaulation accuracy from multiple devices |
| test_correct = reduce_variable(test_correct, sgd, reducer) |
| |
| # Output the evaluation accuracy |
| if ((DIST == False) or (sgd.global_rank == 0)): |
| print('Evaluation accuracy = %f, Elapsed Time = %fs' % |
| (test_correct / (num_test_batch * batch_size * world_size), |
| time.time() - start_time), |
| flush=True) |
| |
| |
| if __name__ == '__main__': |
| |
| DIST = False |
| train_cifar10(DIST=DIST) |