# | |
# Licensed to the Apache Software Foundation (ASF) under one | |
# or more contributor license agreements. See the NOTICE file | |
# distributed with this work for additional information | |
# regarding copyright ownership. The ASF licenses this file | |
# to you under the Apache License, Version 2.0 (the | |
# "License"); you may not use this file except in compliance | |
# with the License. You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an | |
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, either express or implied. See the License for the | |
# specific language governing permissions and limitations | |
# under the License. | |
# | |
try: | |
import pickle | |
except ImportError: | |
import cPickle as pickle | |
from singa import singa_wrap as singa | |
from singa import autograd | |
from singa import tensor | |
from singa import device | |
from singa import opt | |
from PIL import Image | |
import numpy as np | |
import os | |
import sys | |
import time | |
def load_dataset(filepath): | |
with open(filepath, 'rb') as fd: | |
try: | |
cifar10 = pickle.load(fd, encoding='latin1') | |
except TypeError: | |
cifar10 = pickle.load(fd) | |
image = cifar10['data'].astype(dtype=np.uint8) | |
image = image.reshape((-1, 3, 32, 32)) | |
label = np.asarray(cifar10['labels'], dtype=np.uint8) | |
label = label.reshape(label.size, 1) | |
return image, label | |
def load_train_data(dir_path='cifar-10-batches-py', num_batches=5): | |
labels = [] | |
batchsize = 10000 | |
images = np.empty((num_batches * batchsize, 3, 32, 32), dtype=np.uint8) | |
for did in range(1, num_batches + 1): | |
fname_train_data = dir_path + "/data_batch_{}".format(did) | |
image, label = load_dataset(check_dataset_exist(fname_train_data)) | |
images[(did - 1) * batchsize:did * batchsize] = image | |
labels.extend(label) | |
images = np.array(images, dtype=np.float32) | |
labels = np.array(labels, dtype=np.int32) | |
return images, labels | |
def load_test_data(dir_path='cifar-10-batches-py'): | |
images, labels = load_dataset(check_dataset_exist(dir_path + "/test_batch")) | |
return np.array(images, dtype=np.float32), np.array(labels, dtype=np.int32) | |
def check_dataset_exist(dirpath): | |
if not os.path.exists(dirpath): | |
print( | |
'Please download the cifar10 dataset using download_data.py (e.g. python ~/singa/examples/cifar10/download_data.py py)' | |
) | |
sys.exit(0) | |
return dirpath | |
def normalize_for_resnet(train_x, test_x): | |
mean = [0.4914, 0.4822, 0.4465] | |
std = [0.2023, 0.1994, 0.2010] | |
train_x /= 255 | |
test_x /= 255 | |
for ch in range(0, 2): | |
train_x[:, ch, :, :] -= mean[ch] | |
train_x[:, ch, :, :] /= std[ch] | |
test_x[:, ch, :, :] -= mean[ch] | |
test_x[:, ch, :, :] /= std[ch] | |
return train_x, test_x | |
def resize_dataset(x, IMG_SIZE): | |
num_data = x.shape[0] | |
dim = x.shape[1] | |
X = np.zeros(shape=(num_data, dim, IMG_SIZE, IMG_SIZE), dtype=np.float32) | |
for n in range(0, num_data): | |
for d in range(0, dim): | |
X[n, d, :, :] = np.array(Image.fromarray(x[n, d, :, :]).resize( | |
(IMG_SIZE, IMG_SIZE), Image.BILINEAR), | |
dtype=np.float32) | |
return X | |
def augmentation(x, batch_size): | |
xpad = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric') | |
for data_num in range(0, batch_size): | |
offset = np.random.randint(8, size=2) | |
x[data_num, :, :, :] = xpad[data_num, :, offset[0]:offset[0] + 32, | |
offset[1]:offset[1] + 32] | |
if_flip = np.random.randint(2) | |
if (if_flip): | |
x[data_num, :, :, :] = x[data_num, :, :, ::-1] | |
return x | |
def accuracy(pred, target): | |
y = np.argmax(pred, axis=1) | |
t = np.argmax(target, axis=1) | |
a = y == t | |
return np.array(a, "int").sum() | |
def to_categorical(y, num_classes): | |
y = np.array(y, dtype="int") | |
n = y.shape[0] | |
categorical = np.zeros((n, num_classes)) | |
for i in range(0, n): | |
categorical[i, y[i]] = 1 | |
categorical = categorical.astype(np.float32) | |
return categorical | |
# Function to all reduce NUMPY accuracy and loss from multiple devices | |
def reduce_variable(variable, dist_opt, reducer): | |
reducer.copy_from_numpy(variable) | |
dist_opt.all_reduce(reducer.data) | |
dist_opt.wait() | |
output = tensor.to_numpy(reducer) | |
return output | |
# Function to sychronize SINGA TENSOR initial model parameters | |
def synchronize(tensor, dist_opt): | |
dist_opt.all_reduce(tensor.data) | |
dist_opt.wait() | |
tensor /= dist_opt.world_size | |
# Data partition | |
def data_partition(dataset_x, dataset_y, global_rank, world_size): | |
data_per_rank = dataset_x.shape[0] // world_size | |
idx_start = global_rank * data_per_rank | |
idx_end = (global_rank + 1) * data_per_rank | |
return dataset_x[idx_start:idx_end], dataset_y[idx_start:idx_end] | |
def train_cifar10(DIST=False, | |
local_rank=None, | |
world_size=None, | |
nccl_id=None, | |
partial_update=False): | |
# Define the hypermeters for the train_cifar10 | |
sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5) | |
max_epoch = 5 | |
batch_size = 32 | |
train_x, train_y = load_train_data() | |
test_x, test_y = load_test_data() | |
train_x, test_x = normalize_for_resnet(train_x, test_x) | |
IMG_SIZE = 224 | |
num_classes = 10 | |
if DIST: | |
# For distributed GPU training | |
sgd = opt.DistOpt(sgd, | |
nccl_id=nccl_id, | |
local_rank=local_rank, | |
world_size=world_size) | |
dev = device.create_cuda_gpu_on(sgd.local_rank) | |
# Dataset partition for distributed training | |
train_x, train_y = data_partition(train_x, train_y, sgd.global_rank, | |
sgd.world_size) | |
test_x, test_y = data_partition(test_x, test_y, sgd.global_rank, | |
sgd.world_size) | |
world_size = sgd.world_size | |
else: | |
# For single GPU | |
dev = device.create_cuda_gpu() | |
world_size = 1 | |
from resnet import resnet50 | |
model = resnet50(num_classes=num_classes) | |
tx = tensor.Tensor((batch_size, 3, IMG_SIZE, IMG_SIZE), dev, tensor.float32) | |
ty = tensor.Tensor((batch_size,), dev, tensor.int32) | |
num_train_batch = train_x.shape[0] // batch_size | |
num_test_batch = test_x.shape[0] // batch_size | |
idx = np.arange(train_x.shape[0], dtype=np.int32) | |
if DIST: | |
# Sychronize the initial parameters | |
autograd.training = True | |
x = np.random.randn(batch_size, 3, IMG_SIZE, | |
IMG_SIZE).astype(np.float32) | |
y = np.zeros(shape=(batch_size,), dtype=np.int32) | |
tx.copy_from_numpy(x) | |
ty.copy_from_numpy(y) | |
out = model(tx) | |
loss = autograd.softmax_cross_entropy(out, ty) | |
param = [] | |
for p, _ in autograd.backward(loss): | |
synchronize(p, sgd) | |
param.append(p) | |
for epoch in range(max_epoch): | |
start_time = time.time() | |
np.random.shuffle(idx) | |
if ((DIST == False) or (sgd.global_rank == 0)): | |
print('Starting Epoch %d:' % (epoch)) | |
# Training phase | |
autograd.training = True | |
train_correct = np.zeros(shape=[1], dtype=np.float32) | |
test_correct = np.zeros(shape=[1], dtype=np.float32) | |
train_loss = np.zeros(shape=[1], dtype=np.float32) | |
for b in range(num_train_batch): | |
x = train_x[idx[b * batch_size:(b + 1) * batch_size]] | |
x = augmentation(x, batch_size) | |
x = resize_dataset(x, IMG_SIZE) | |
y = train_y[idx[b * batch_size:(b + 1) * batch_size]] | |
tx.copy_from_numpy(x) | |
ty.copy_from_numpy(y) | |
out = model(tx) | |
loss = autograd.softmax_cross_entropy(out, ty) | |
train_correct += accuracy(tensor.to_numpy(out), | |
to_categorical(y, num_classes)).astype( | |
np.float32) | |
train_loss += tensor.to_numpy(loss)[0] | |
if not partial_update: | |
sgd.backward_and_update(loss) | |
else: | |
sgd.backward_and_partial_update(loss) | |
if DIST: | |
# Reduce the evaluation accuracy and loss from multiple devices | |
reducer = tensor.Tensor((1,), dev, tensor.float32) | |
train_correct = reduce_variable(train_correct, sgd, reducer) | |
train_loss = reduce_variable(train_loss, sgd, reducer) | |
# Output the training loss and accuracy | |
if ((DIST == False) or (sgd.global_rank == 0)): | |
print('Training loss = %f, training accuracy = %f' % | |
(train_loss, train_correct / | |
(num_train_batch * batch_size * world_size)), | |
flush=True) | |
if partial_update: | |
# Sychronize parameters before evaluation phase | |
for p in param: | |
synchronize(p, sgd) | |
# Evaulation phase | |
autograd.training = False | |
for b in range(num_test_batch): | |
x = test_x[b * batch_size:(b + 1) * batch_size] | |
x = resize_dataset(x, IMG_SIZE) | |
y = test_y[b * batch_size:(b + 1) * batch_size] | |
tx.copy_from_numpy(x) | |
ty.copy_from_numpy(y) | |
out_test = model(tx) | |
test_correct += accuracy(tensor.to_numpy(out_test), | |
to_categorical(y, num_classes)) | |
if DIST: | |
# Reduce the evaulation accuracy from multiple devices | |
test_correct = reduce_variable(test_correct, sgd, reducer) | |
# Output the evaluation accuracy | |
if ((DIST == False) or (sgd.global_rank == 0)): | |
print('Evaluation accuracy = %f, Elapsed Time = %fs' % | |
(test_correct / (num_test_batch * batch_size * world_size), | |
time.time() - start_time), | |
flush=True) | |
if __name__ == '__main__': | |
DIST = False | |
train_cifar10(DIST=DIST) |