| # | |
| # Licensed to the Apache Software Foundation (ASF) under one | |
| # or more contributor license agreements. See the NOTICE file | |
| # distributed with this work for additional information | |
| # regarding copyright ownership. The ASF licenses this file | |
| # to you under the Apache License, Version 2.0 (the | |
| # "License"); you may not use this file except in compliance | |
| # with the License. You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, | |
| # software distributed under the License is distributed on an | |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
| # KIND, either express or implied. See the License for the | |
| # specific language governing permissions and limitations | |
| # under the License. | |
| # | |
| try: | |
| import pickle | |
| except ImportError: | |
| import cPickle as pickle | |
| from singa import singa_wrap as singa | |
| from singa import autograd | |
| from singa import tensor | |
| from singa import device | |
| from singa import opt | |
| from PIL import Image | |
| import numpy as np | |
| import os | |
| import sys | |
| import time | |
| def load_dataset(filepath): | |
| with open(filepath, 'rb') as fd: | |
| try: | |
| cifar10 = pickle.load(fd, encoding='latin1') | |
| except TypeError: | |
| cifar10 = pickle.load(fd) | |
| image = cifar10['data'].astype(dtype=np.uint8) | |
| image = image.reshape((-1, 3, 32, 32)) | |
| label = np.asarray(cifar10['labels'], dtype=np.uint8) | |
| label = label.reshape(label.size, 1) | |
| return image, label | |
| def load_train_data(dir_path='cifar-10-batches-py', num_batches=5): | |
| labels = [] | |
| batchsize = 10000 | |
| images = np.empty((num_batches * batchsize, 3, 32, 32), dtype=np.uint8) | |
| for did in range(1, num_batches + 1): | |
| fname_train_data = dir_path + "/data_batch_{}".format(did) | |
| image, label = load_dataset(check_dataset_exist(fname_train_data)) | |
| images[(did - 1) * batchsize:did * batchsize] = image | |
| labels.extend(label) | |
| images = np.array(images, dtype=np.float32) | |
| labels = np.array(labels, dtype=np.int32) | |
| return images, labels | |
| def load_test_data(dir_path='cifar-10-batches-py'): | |
| images, labels = load_dataset(check_dataset_exist(dir_path + "/test_batch")) | |
| return np.array(images, dtype=np.float32), np.array(labels, dtype=np.int32) | |
| def check_dataset_exist(dirpath): | |
| if not os.path.exists(dirpath): | |
| print( | |
| 'Please download the cifar10 dataset using download_data.py (e.g. python ~/singa/examples/cifar10/download_data.py py)' | |
| ) | |
| sys.exit(0) | |
| return dirpath | |
| def normalize_for_resnet(train_x, test_x): | |
| mean = [0.4914, 0.4822, 0.4465] | |
| std = [0.2023, 0.1994, 0.2010] | |
| train_x /= 255 | |
| test_x /= 255 | |
| for ch in range(0, 2): | |
| train_x[:, ch, :, :] -= mean[ch] | |
| train_x[:, ch, :, :] /= std[ch] | |
| test_x[:, ch, :, :] -= mean[ch] | |
| test_x[:, ch, :, :] /= std[ch] | |
| return train_x, test_x | |
| def resize_dataset(x, IMG_SIZE): | |
| num_data = x.shape[0] | |
| dim = x.shape[1] | |
| X = np.zeros(shape=(num_data, dim, IMG_SIZE, IMG_SIZE), dtype=np.float32) | |
| for n in range(0, num_data): | |
| for d in range(0, dim): | |
| X[n, d, :, :] = np.array(Image.fromarray(x[n, d, :, :]).resize( | |
| (IMG_SIZE, IMG_SIZE), Image.BILINEAR), | |
| dtype=np.float32) | |
| return X | |
| def augmentation(x, batch_size): | |
| xpad = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric') | |
| for data_num in range(0, batch_size): | |
| offset = np.random.randint(8, size=2) | |
| x[data_num, :, :, :] = xpad[data_num, :, offset[0]:offset[0] + 32, | |
| offset[1]:offset[1] + 32] | |
| if_flip = np.random.randint(2) | |
| if (if_flip): | |
| x[data_num, :, :, :] = x[data_num, :, :, ::-1] | |
| return x | |
| def accuracy(pred, target): | |
| y = np.argmax(pred, axis=1) | |
| t = np.argmax(target, axis=1) | |
| a = y == t | |
| return np.array(a, "int").sum() | |
| def to_categorical(y, num_classes): | |
| y = np.array(y, dtype="int") | |
| n = y.shape[0] | |
| categorical = np.zeros((n, num_classes)) | |
| for i in range(0, n): | |
| categorical[i, y[i]] = 1 | |
| categorical = categorical.astype(np.float32) | |
| return categorical | |
| # Function to all reduce NUMPY accuracy and loss from multiple devices | |
| def reduce_variable(variable, dist_opt, reducer): | |
| reducer.copy_from_numpy(variable) | |
| dist_opt.all_reduce(reducer.data) | |
| dist_opt.wait() | |
| output = tensor.to_numpy(reducer) | |
| return output | |
| # Function to sychronize SINGA TENSOR initial model parameters | |
| def synchronize(tensor, dist_opt): | |
| dist_opt.all_reduce(tensor.data) | |
| dist_opt.wait() | |
| tensor /= dist_opt.world_size | |
| # Data partition | |
| def data_partition(dataset_x, dataset_y, global_rank, world_size): | |
| data_per_rank = dataset_x.shape[0] // world_size | |
| idx_start = global_rank * data_per_rank | |
| idx_end = (global_rank + 1) * data_per_rank | |
| return dataset_x[idx_start:idx_end], dataset_y[idx_start:idx_end] | |
| def train_cifar10(DIST=False, | |
| local_rank=None, | |
| world_size=None, | |
| nccl_id=None, | |
| partial_update=False): | |
| # Define the hypermeters for the train_cifar10 | |
| sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5) | |
| max_epoch = 5 | |
| batch_size = 32 | |
| train_x, train_y = load_train_data() | |
| test_x, test_y = load_test_data() | |
| train_x, test_x = normalize_for_resnet(train_x, test_x) | |
| IMG_SIZE = 224 | |
| num_classes = 10 | |
| if DIST: | |
| # For distributed GPU training | |
| sgd = opt.DistOpt(sgd, | |
| nccl_id=nccl_id, | |
| local_rank=local_rank, | |
| world_size=world_size) | |
| dev = device.create_cuda_gpu_on(sgd.local_rank) | |
| # Dataset partition for distributed training | |
| train_x, train_y = data_partition(train_x, train_y, sgd.global_rank, | |
| sgd.world_size) | |
| test_x, test_y = data_partition(test_x, test_y, sgd.global_rank, | |
| sgd.world_size) | |
| world_size = sgd.world_size | |
| else: | |
| # For single GPU | |
| dev = device.create_cuda_gpu() | |
| world_size = 1 | |
| from resnet import resnet50 | |
| model = resnet50(num_classes=num_classes) | |
| tx = tensor.Tensor((batch_size, 3, IMG_SIZE, IMG_SIZE), dev, tensor.float32) | |
| ty = tensor.Tensor((batch_size,), dev, tensor.int32) | |
| num_train_batch = train_x.shape[0] // batch_size | |
| num_test_batch = test_x.shape[0] // batch_size | |
| idx = np.arange(train_x.shape[0], dtype=np.int32) | |
| if DIST: | |
| # Sychronize the initial parameters | |
| autograd.training = True | |
| x = np.random.randn(batch_size, 3, IMG_SIZE, | |
| IMG_SIZE).astype(np.float32) | |
| y = np.zeros(shape=(batch_size,), dtype=np.int32) | |
| tx.copy_from_numpy(x) | |
| ty.copy_from_numpy(y) | |
| out = model(tx) | |
| loss = autograd.softmax_cross_entropy(out, ty) | |
| param = [] | |
| for p, _ in autograd.backward(loss): | |
| synchronize(p, sgd) | |
| param.append(p) | |
| for epoch in range(max_epoch): | |
| start_time = time.time() | |
| np.random.shuffle(idx) | |
| if ((DIST == False) or (sgd.global_rank == 0)): | |
| print('Starting Epoch %d:' % (epoch)) | |
| # Training phase | |
| autograd.training = True | |
| train_correct = np.zeros(shape=[1], dtype=np.float32) | |
| test_correct = np.zeros(shape=[1], dtype=np.float32) | |
| train_loss = np.zeros(shape=[1], dtype=np.float32) | |
| for b in range(num_train_batch): | |
| x = train_x[idx[b * batch_size:(b + 1) * batch_size]] | |
| x = augmentation(x, batch_size) | |
| x = resize_dataset(x, IMG_SIZE) | |
| y = train_y[idx[b * batch_size:(b + 1) * batch_size]] | |
| tx.copy_from_numpy(x) | |
| ty.copy_from_numpy(y) | |
| out = model(tx) | |
| loss = autograd.softmax_cross_entropy(out, ty) | |
| train_correct += accuracy(tensor.to_numpy(out), | |
| to_categorical(y, num_classes)).astype( | |
| np.float32) | |
| train_loss += tensor.to_numpy(loss)[0] | |
| if not partial_update: | |
| sgd.backward_and_update(loss) | |
| else: | |
| sgd.backward_and_partial_update(loss) | |
| if DIST: | |
| # Reduce the evaluation accuracy and loss from multiple devices | |
| reducer = tensor.Tensor((1,), dev, tensor.float32) | |
| train_correct = reduce_variable(train_correct, sgd, reducer) | |
| train_loss = reduce_variable(train_loss, sgd, reducer) | |
| # Output the training loss and accuracy | |
| if ((DIST == False) or (sgd.global_rank == 0)): | |
| print('Training loss = %f, training accuracy = %f' % | |
| (train_loss, train_correct / | |
| (num_train_batch * batch_size * world_size)), | |
| flush=True) | |
| if partial_update: | |
| # Sychronize parameters before evaluation phase | |
| for p in param: | |
| synchronize(p, sgd) | |
| # Evaulation phase | |
| autograd.training = False | |
| for b in range(num_test_batch): | |
| x = test_x[b * batch_size:(b + 1) * batch_size] | |
| x = resize_dataset(x, IMG_SIZE) | |
| y = test_y[b * batch_size:(b + 1) * batch_size] | |
| tx.copy_from_numpy(x) | |
| ty.copy_from_numpy(y) | |
| out_test = model(tx) | |
| test_correct += accuracy(tensor.to_numpy(out_test), | |
| to_categorical(y, num_classes)) | |
| if DIST: | |
| # Reduce the evaulation accuracy from multiple devices | |
| test_correct = reduce_variable(test_correct, sgd, reducer) | |
| # Output the evaluation accuracy | |
| if ((DIST == False) or (sgd.global_rank == 0)): | |
| print('Evaluation accuracy = %f, Elapsed Time = %fs' % | |
| (test_correct / (num_test_batch * batch_size * world_size), | |
| time.time() - start_time), | |
| flush=True) | |
| if __name__ == '__main__': | |
| DIST = False | |
| train_cifar10(DIST=DIST) |