blob: 16752ceabefdf08f790f737acc476cdc25e131ad [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from singa import singa_wrap as singa
from singa import autograd
from singa import layer
from singa import tensor
from singa import device
from singa import opt
import numpy as np
import os
import sys
import gzip
import codecs
import time
class CNN:
def __init__(self):
self.conv1 = layer.Conv2d(1, 20, 5, padding=0)
self.conv2 = layer.Conv2d(20, 50, 5, padding=0)
self.linear1 = layer.Linear(4 * 4 * 50, 500)
self.linear2 = layer.Linear(500, 10)
self.pooling1 = layer.MaxPool2d(2, 2, padding=0)
self.pooling2 = layer.MaxPool2d(2, 2, padding=0)
self.relu1 = layer.ReLU()
self.relu2 = layer.ReLU()
self.relu3 = layer.ReLU()
self.flatten = layer.Flatten()
def forward(self, x):
y = self.conv1(x)
y = self.relu1(y)
y = self.pooling1(y)
y = self.conv2(y)
y = self.relu2(y)
y = self.pooling2(y)
y = self.flatten(y)
y = self.linear1(y)
y = self.relu3(y)
y = self.linear2(y)
return y
def check_dataset_exist(dirpath):
if not os.path.exists(dirpath):
print(
'The MNIST dataset does not exist. Please download the mnist dataset using download_mnist.py (e.g. python3 download_mnist.py)'
)
sys.exit(0)
return dirpath
def load_dataset():
train_x_path = '/tmp/train-images-idx3-ubyte.gz'
train_y_path = '/tmp/train-labels-idx1-ubyte.gz'
valid_x_path = '/tmp/t10k-images-idx3-ubyte.gz'
valid_y_path = '/tmp/t10k-labels-idx1-ubyte.gz'
train_x = read_image_file(check_dataset_exist(train_x_path)).astype(
np.float32)
train_y = read_label_file(check_dataset_exist(train_y_path)).astype(
np.float32)
valid_x = read_image_file(check_dataset_exist(valid_x_path)).astype(
np.float32)
valid_y = read_label_file(check_dataset_exist(valid_y_path)).astype(
np.float32)
return train_x, train_y, valid_x, valid_y
def read_label_file(path):
with gzip.open(path, 'rb') as f:
data = f.read()
assert get_int(data[:4]) == 2049
length = get_int(data[4:8])
parsed = np.frombuffer(data, dtype=np.uint8, offset=8).reshape((length))
return parsed
def get_int(b):
return int(codecs.encode(b, 'hex'), 16)
def read_image_file(path):
with gzip.open(path, 'rb') as f:
data = f.read()
assert get_int(data[:4]) == 2051
length = get_int(data[4:8])
num_rows = get_int(data[8:12])
num_cols = get_int(data[12:16])
parsed = np.frombuffer(data, dtype=np.uint8, offset=16).reshape(
(length, 1, num_rows, num_cols))
return parsed
def to_categorical(y, num_classes):
y = np.array(y, dtype="int")
n = y.shape[0]
categorical = np.zeros((n, num_classes))
categorical[np.arange(n), y] = 1
categorical = categorical.astype(np.float32)
return categorical
def accuracy(pred, target):
y = np.argmax(pred, axis=1)
t = np.argmax(target, axis=1)
a = y == t
return np.array(a, "int").sum()
# Function to all reduce NUMPY accuracy and loss from multiple devices
def reduce_variable(variable, dist_opt, reducer):
reducer.copy_from_numpy(variable)
dist_opt.all_reduce(reducer.data)
dist_opt.wait()
output = tensor.to_numpy(reducer)
return output
# Function to sychronize SINGA TENSOR initial model parameters
def synchronize(tensor, dist_opt):
dist_opt.all_reduce(tensor.data)
dist_opt.wait()
tensor /= dist_opt.world_size
# Data augmentation
def augmentation(x, batch_size):
xpad = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric')
for data_num in range(0, batch_size):
offset = np.random.randint(8, size=2)
x[data_num, :, :, :] = xpad[data_num, :, offset[0]:offset[0] + 28,
offset[1]:offset[1] + 28]
if_flip = np.random.randint(2)
if (if_flip):
x[data_num, :, :, :] = x[data_num, :, :, ::-1]
return x
# Data partition
def data_partition(dataset_x, dataset_y, global_rank, world_size):
data_per_rank = dataset_x.shape[0] // world_size
idx_start = global_rank * data_per_rank
idx_end = (global_rank + 1) * data_per_rank
return dataset_x[idx_start:idx_end], dataset_y[idx_start:idx_end]
def train_mnist_cnn(DIST=False,
local_rank=None,
world_size=None,
nccl_id=None,
spars=0,
topK=False,
corr=True):
# Define the hypermeters for the mnist_cnn
max_epoch = 10
batch_size = 64
sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5)
# Prepare training and valadiation data
train_x, train_y, test_x, test_y = load_dataset()
IMG_SIZE = 28
num_classes = 10
train_y = to_categorical(train_y, num_classes)
test_y = to_categorical(test_y, num_classes)
# Normalization
train_x = train_x / 255
test_x = test_x / 255
if DIST:
# For distributed GPU training
sgd = opt.DistOpt(sgd,
nccl_id=nccl_id,
local_rank=local_rank,
world_size=world_size)
dev = device.create_cuda_gpu_on(sgd.local_rank)
# Dataset partition for distributed training
train_x, train_y = data_partition(train_x, train_y, sgd.global_rank,
sgd.world_size)
test_x, test_y = data_partition(test_x, test_y, sgd.global_rank,
sgd.world_size)
world_size = sgd.world_size
else:
# For single GPU
dev = device.create_cuda_gpu()
world_size = 1
# Create model
model = CNN()
tx = tensor.Tensor((batch_size, 1, IMG_SIZE, IMG_SIZE), dev, tensor.float32)
ty = tensor.Tensor((batch_size, num_classes), dev, tensor.int32)
num_train_batch = train_x.shape[0] // batch_size
num_test_batch = test_x.shape[0] // batch_size
idx = np.arange(train_x.shape[0], dtype=np.int32)
if DIST:
#Sychronize the initial parameters
autograd.training = True
x = np.random.randn(batch_size, 1, IMG_SIZE,
IMG_SIZE).astype(np.float32)
y = np.zeros(shape=(batch_size, num_classes), dtype=np.int32)
tx.copy_from_numpy(x)
ty.copy_from_numpy(y)
out = model.forward(tx)
loss = autograd.softmax_cross_entropy(out, ty)
for p, g in autograd.backward(loss):
synchronize(p, sgd)
# Training and evaulation loop
for epoch in range(max_epoch):
start_time = time.time()
np.random.shuffle(idx)
if ((DIST == False) or (sgd.global_rank == 0)):
print('Starting Epoch %d:' % (epoch))
# Training phase
autograd.training = True
train_correct = np.zeros(shape=[1], dtype=np.float32)
test_correct = np.zeros(shape=[1], dtype=np.float32)
train_loss = np.zeros(shape=[1], dtype=np.float32)
for b in range(num_train_batch):
x = train_x[idx[b * batch_size:(b + 1) * batch_size]]
x = augmentation(x, batch_size)
y = train_y[idx[b * batch_size:(b + 1) * batch_size]]
tx.copy_from_numpy(x)
ty.copy_from_numpy(y)
out = model.forward(tx)
loss = autograd.softmax_cross_entropy(out, ty)
train_correct += accuracy(tensor.to_numpy(out), y)
train_loss += tensor.to_numpy(loss)[0]
if DIST:
if (spars == 0):
sgd.backward_and_update(loss, threshold=50000)
else:
sgd.backward_and_sparse_update(loss,
spars=spars,
topK=topK,
corr=corr)
else:
sgd(loss)
if DIST:
# Reduce the evaluation accuracy and loss from multiple devices
reducer = tensor.Tensor((1,), dev, tensor.float32)
train_correct = reduce_variable(train_correct, sgd, reducer)
train_loss = reduce_variable(train_loss, sgd, reducer)
# Output the training loss and accuracy
if ((DIST == False) or (sgd.global_rank == 0)):
print('Training loss = %f, training accuracy = %f' %
(train_loss, train_correct /
(num_train_batch * batch_size * world_size)),
flush=True)
# Evaluation phase
autograd.training = False
for b in range(num_test_batch):
x = test_x[b * batch_size:(b + 1) * batch_size]
y = test_y[b * batch_size:(b + 1) * batch_size]
tx.copy_from_numpy(x)
ty.copy_from_numpy(y)
out_test = model.forward(tx)
test_correct += accuracy(tensor.to_numpy(out_test), y)
if DIST:
# Reduce the evaulation accuracy from multiple devices
test_correct = reduce_variable(test_correct, sgd, reducer)
# Output the evaluation accuracy
if ((DIST == False) or (sgd.global_rank == 0)):
print('Evaluation accuracy = %f, Elapsed Time = %fs' %
(test_correct / (num_test_batch * batch_size * world_size),
time.time() - start_time),
flush=True)
if __name__ == '__main__':
DIST = False
train_mnist_cnn(DIST=DIST)