# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
'''This module includes a set of optimizers for updating model parameters.
It replaces the old optimizers from optimizer.py'''
from singa import tensor
from singa.tensor import Tensor
from singa import autograd
from . import singa_wrap as singa
from deprecated import deprecated
class DecayScheduler:
# base class for decaying a hyperparameter over training steps, e.g. the learning rate, a regularization coefficient or the momentum
def __init__(self, init_value):
self.init_value = init_value
def __call__(self, step):
assert isinstance(step, Tensor)
return self.call(step)
def call(self, step) -> Tensor:
# step is a Tensor with a single scalar value
# return the current value as a Tensor
raise NotImplementedError
class Constant(DecayScheduler):
def call(self, step: Tensor) -> Tensor:
# TODO should be an in-place operator
ret = Tensor((1,), step.device)
ret.set_value(self.init_value)
return ret
class ExponentialDecay(DecayScheduler):
def __init__(self, init_value, decay_steps, decay_rate, staircase=False):
super(ExponentialDecay, self).__init__(init_value)
self.decay_steps = decay_steps
self.decay_rate = decay_rate
self.staircase = staircase
def call(self, step):
if self.staircase:
s = step // self.decay_steps
else:
s = step / self.decay_steps
ret = Tensor((1,), s.device)
ret.set_value(self.decay_rate)
return self.init_value * tensor.pow(ret, s)
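# A minimal sketch of how a DecayScheduler is evaluated; `dev` is assumed to be a
# singa device object, and the step is passed as a scalar Tensor:
# >>> lr_sched = ExponentialDecay(0.1, decay_steps=100, decay_rate=0.5, staircase=True)
# >>> step = Tensor((1,), dev)
# >>> step.set_value(250)
# >>> cur_lr = lr_sched(step)   # 0.1 * 0.5 ** (250 // 100) = 0.025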
class Optimizer(object):
"""Base optimizer.
Args:
config (Dict): specify the default values of configurable variables.
"""
def __init__(self, lr, dtype=tensor.float32):
# init lr(could be a constant scalar or a learning rate scheduler)
if type(lr) == float or type(lr) == int:
self.lr = Constant(lr)
elif isinstance(lr, DecayScheduler):
self.lr = lr
else:
raise TypeError("Wrong learning rate type")
# init step counter
self.dtype = dtype
# TODO change type to int32
self.step_counter = Tensor((1,), dtype=tensor.float32)
self.step_counter.set_value(0)
self.lr_value = self.lr(self.step_counter)
def get_states(self):
# skip DecayScheduler as it does not have persistent states
return {'step_counter': tensor.to_numpy(self.step_counter)[0]}
def set_states(self, states):
self.step_counter = Tensor((1,))
self.step_counter.set_value(states['step_counter'])
self.lr_value = self.lr(self.step_counter)
def __call__(self, loss):
self.call(loss)
self.step()
def call(self, loss):
for p, g in autograd.backward(loss):
if p.name is None:
p.name = id(p)
self.apply(p.name, p, g)
def step(self):
"""To increment the step counter and update the lr"""
self.step_counter.data += 1
lr_value = self.lr(self.step_counter)
self.lr_value.copy_from(lr_value)
def apply(self, param_name, param_value, param_grad):
"""Performs a single optimization step.
Args:
param_name(String): the name of the param
param_value(Tensor): param values to be updated in-place
param_grad(Tensor): param gradients; the values may be modified
inside this function and should not be used afterwards
"""
raise NotImplementedError
@deprecated(
reason=
"Update is deprecated, use apply() to do update, refer to apply for more details."
)
def update(self, param, grad):
"""Update the param values with given gradients.
Args:
param(Tensor): param values to be updated in-place
grad(Tensor): param gradients; the values may be modified
inside this function and should not be used afterwards
"""
if param.name is None:
param.name = id(param)
self.apply(param.name, param, grad)
def device_check(self, *inputs):
flag = inputs[0].device.graph_enabled()
inputs[0].device.EnableGraph(False)
x_device = inputs[0].device
x_dev_id = x_device.id()
for var in inputs:
if var.device.id() != x_dev_id:
var.to_device(x_device)
inputs[0].device.EnableGraph(flag)
@deprecated(
reason=
"backward_and_update is deprecated, use __call__() to do update, refer to __call__ for more details."
)
def backward_and_update(self, loss):
"""Performs backward propagation from the loss and parameter update.
From the loss, it performs backward propagation to get the gradients
and do the parameter update.
Args:
loss(Tensor): loss is the objective function of the deep learning model
optimization, e.g. for a classification problem it can be the output of the
softmax_cross_entropy function.
"""
self.__call__(loss)
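# A minimal sketch of the intended driver loop for any Optimizer subclass; `loss`
# is assumed to be the scalar output Tensor of an autograd forward pass:
# >>> sgd = SGD(lr=0.05, momentum=0.9)
# >>> sgd(loss)      # backward pass + apply() on each parameter + step()
# which is equivalent to the explicit form (assuming each param p has p.name set):
# >>> for p, g in autograd.backward(loss):
# ...     sgd.apply(p.name, p, g)
# >>> sgd.step()     # increments the step counter and refreshes the lr value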
class SGD(Optimizer):
"""Implements stochastic gradient descent (optionally with momentum).
Nesterov momentum is based on the formula from `On the importance of initialization and momentum in deep learning`__.
Args:
lr(float): learning rate
momentum(float, optional): momentum factor (default: 0)
weight_decay(float, optional): weight decay (L2 penalty) (default: 0)
dampening(float, optional): dampening for momentum (default: 0)
nesterov(bool, optional): enables Nesterov momentum (default: False)
Typical usage example:
>>> from singa import opt
>>> optimizer = opt.SGD(lr=0.1, momentum=0.9)
>>> optimizer(loss)
__ http://www.cs.toronto.edu/%7Ehinton/absps/momentum.pdf
.. note::
The implementation of SGD with Momentum / Nesterov subtly differs from
Sutskever et al. and implementations in some other frameworks.
Considering the specific case of Momentum, the update can be written as
.. math::
v = \rho * v + g \\
p = p - lr * v
where p, g, v and :math:`\rho` denote the parameters, gradient,
velocity, and momentum respectively.
This is in contrast to Sutskever et al. and
other frameworks which employ an update of the form
.. math::
v = \rho * v + lr * g \\
p = p - v
The Nesterov version is analogously modified.
"""
def __init__(self,
lr=0.1,
momentum=0,
dampening=0,
weight_decay=0,
nesterov=False,
dtype=tensor.float32):
super(SGD, self).__init__(lr, dtype)
# init momentum
if type(momentum) == float or type(momentum) == int:
if momentum < 0.0:
raise ValueError("Invalid momentum value: {}".format(momentum))
self.momentum = Constant(momentum)
elif isinstance(momentum, DecayScheduler):
self.momentum = momentum
momentum = momentum.init_value
else:
raise TypeError("Wrong momentum type")
self.mom_value = self.momentum(self.step_counter).as_type(self.dtype)
# init dampening
if type(dampening) == float or type(dampening) == int:
self.dampening = Constant(dampening)
elif isinstance(dampening, DecayScheduler):
self.dampening = dampening
dampening = dampening.init_value
else:
raise TypeError("Wrong dampening type")
self.dam_value = self.dampening(self.step_counter).as_type(self.dtype)
# init weight_decay
if type(weight_decay) == float or type(weight_decay) == int:
if weight_decay < 0.0:
raise ValueError(
"Invalid weight_decay value: {}".format(weight_decay))
self.weight_decay = Constant(weight_decay)
elif isinstance(weight_decay, DecayScheduler):
self.weight_decay = weight_decay
else:
raise TypeError("Wrong weight_decay type")
self.decay_value = self.weight_decay(self.step_counter).as_type(
self.dtype)
# init other params
self.nesterov = nesterov
self.moments = dict()
# check value
if nesterov and (momentum <= 0 or dampening != 0):
raise ValueError(
"Nesterov momentum requires a momentum and zero dampening")
def apply(self, param_name, param_value, param_grad):
"""Performs a single optimization step.
Args:
param_name(String): the name of the param
param_value(Tensor): param values to be updated in-place
param_grad(Tensor): param gradients; the values may be modified
inside this function and should not be used afterwards
"""
assert param_value.shape == param_grad.shape, ("shape mismatch",
param_value.shape,
param_grad.shape)
self.device_check(param_value, self.step_counter, self.lr_value,
self.mom_value, self.dam_value, self.decay_value)
# derive dtype from input
assert param_value.dtype == self.dtype
# TODO add branch operator
# if self.decay_value != 0:
if self.weight_decay.init_value != 0:
singa.Axpy(self.decay_value.data, param_value.data, param_grad.data)
if self.momentum.init_value != 0:
if param_name not in self.moments:
flag = param_value.device.graph_enabled()
param_value.device.EnableGraph(False)
self.moments[param_name] = tensor.zeros_like(param_value)
param_value.device.EnableGraph(flag)
buf = self.moments[param_name]
buf *= self.mom_value
alpha = 1.0 - self.dam_value
singa.Axpy(alpha.data, param_grad.data, buf.data)
if self.nesterov:
singa.Axpy(self.mom_value.data, buf.data, param_grad.data)
else:
param_grad = buf
minus_lr = 0.0 - self.lr_value
singa.Axpy(minus_lr.data, param_grad.data, param_value.data)
def step(self):
# increment the step counter and refresh lr, momentum, dampening and weight decay
super().step()
mom_value = self.momentum(self.step_counter).as_type(self.dtype)
dam_value = self.dampening(self.step_counter).as_type(self.dtype)
decay_value = self.weight_decay(self.step_counter).as_type(self.dtype)
self.mom_value.copy_from(mom_value)
self.dam_value.copy_from(dam_value)
self.decay_value.copy_from(decay_value)
def get_states(self):
states = super().get_states()
if self.mom_value > 0:
states['moments'] = self.moments  # a dict of 1st order moment tensors
return states
def set_states(self, states):
super().set_states(states)
if 'moments' in states:
self.moments = states['moments']
self.mom_value = self.momentum(self.step_counter)
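# A usage sketch combining SGD with a DecayScheduler; lr, momentum, dampening and
# weight_decay each accept either a float or a DecayScheduler, and `loss` is assumed
# to come from an autograd forward pass:
# >>> lr_sched = ExponentialDecay(0.1, decay_steps=1000, decay_rate=0.96, staircase=True)
# >>> sgd = SGD(lr=lr_sched, momentum=0.9, weight_decay=1e-4)
# >>> sgd(loss)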
class RMSProp(Optimizer):
'''RMSProp optimizer.
See the base Optimizer for all constructor args.
Args:
rho (float): decay rate for the moving average of squared gradients, within [0, 1]
epsilon (float): small value for preventing numeric error
'''
def __init__(self, lr=0.1, rho=0.9, epsilon=1e-8, weight_decay=0):
super(RMSProp, self).__init__(lr)
# init weight_decay
if type(weight_decay) == float or type(weight_decay) == int:
if weight_decay < 0.0:
raise ValueError(
"Invalid weight_decay value: {}".format(weight_decay))
self.weight_decay = Constant(weight_decay)
elif isinstance(weight_decay, DecayScheduler):
self.weight_decay = weight_decay
else:
raise TypeError("Wrong weight_decay type")
self.decay_value = self.weight_decay(self.step_counter)
# init rho
if type(rho) == float or type(rho) == int:
self.rho = Constant(rho)
elif isinstance(rho, DecayScheduler):
self.rho = rho
else:
raise TypeError("Wrong rho type")
self.rho_value = self.rho(self.step_counter)
# init epsilon
if type(epsilon) == float or type(epsilon) == int:
self.epsilon = Constant(epsilon)
elif isinstance(epsilon, DecayScheduler):
self.epsilon = epsilon
else:
raise TypeError("Wrong epsilon type")
self.epsilon_value = self.epsilon(self.step_counter)
# init running average
self.running_average = dict()
def apply(self, param_name, param_value, param_grad):
"""Performs a single optimization step.
Args:
param_name(String): the name of the param
param_value(Tensor): param values to be updated in-place
param_grad(Tensor): param gradients; the values may be modified
inside this function and should not be used afterwards
"""
assert param_value.shape == param_grad.shape, ("shape mismatch",
param_value.shape,
param_grad.shape)
self.device_check(param_value, self.step_counter, self.lr_value,
self.rho_value, self.epsilon_value, self.decay_value)
# if self.decay_value != 0:
if self.weight_decay.init_value != 0:
singa.Axpy(self.decay_value.data, param_value.data, param_grad.data)
if param_name not in self.running_average:
flag = param_value.device.graph_enabled()
param_value.device.EnableGraph(False)
self.running_average[param_name] = tensor.zeros_like(param_value)
param_value.device.EnableGraph(flag)
# running_average = running_average * rho + param_grad * param_grad * (1 - rho)
# param_value = param_value - lr * param_grad / sqrt(running_average + epsilon)
self.running_average[param_name] *= self.rho_value
tmp1 = singa.Square(param_grad.data)
tmp2 = 1.0 - self.rho_value
singa.Axpy(tmp2.data, tmp1, self.running_average[param_name].data)
minus_lr = 0.0 - self.lr_value
tmp3 = self.running_average[param_name] + self.epsilon_value
tmp3 = singa.Sqrt(tmp3.data)
tmp3 = singa.__div__(param_grad.data, tmp3)
singa.Axpy(minus_lr.data, tmp3, param_value.data)
def step(self):
# increment the step counter and refresh lr, weight decay, rho and epsilon
super().step()
decay_value = self.weight_decay(self.step_counter)
rho_value = self.rho(self.step_counter)
epsilon_value = self.epsilon(self.step_counter)
self.decay_value.copy_from(decay_value)
self.rho_value.copy_from(rho_value)
self.epsilon_value.copy_from(epsilon_value)
def get_states(self):
states = super().get_states()
states['running_average'] = self.running_average
return states
def set_states(self, states):
super().set_states(states)
if 'running_average' in states:
self.running_average = states['running_average']
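# A usage sketch for RMSProp, including checkpointing of its running averages;
# `loss` is assumed to come from an autograd forward pass:
# >>> rmsprop = RMSProp(lr=0.01, rho=0.9, epsilon=1e-8)
# >>> rmsprop(loss)
# >>> ckpt = rmsprop.get_states()   # {'step_counter': ..., 'running_average': ...}
# >>> rmsprop.set_states(ckpt)      # restore later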
class AdaGrad(Optimizer):
'''AdaGrad optimizer.
See the base Optimizer for all constructor args.
Args:
epsilon (float): small number for preventing numeric error.
'''
def __init__(self, lr=0.1, epsilon=1e-8, weight_decay=0):
super(AdaGrad, self).__init__(lr)
# init weight_decay
if type(weight_decay) == float or type(weight_decay) == int:
if weight_decay < 0.0:
raise ValueError(
"Invalid weight_decay value: {}".format(weight_decay))
self.weight_decay = Constant(weight_decay)
elif isinstance(weight_decay, DecayScheduler):
self.weight_decay = weight_decay
else:
raise TypeError("Wrong weight_decay type")
self.decay_value = self.weight_decay(self.step_counter)
# init epsilon
if type(epsilon) == float or type(epsilon) == int:
self.epsilon = Constant(epsilon)
elif isinstance(epsilon, DecayScheduler):
self.epsilon = epsilon
else:
raise TypeError("Wrong epsilon type")
self.epsilon_value = self.epsilon(self.step_counter)
# init history
self.history = dict()
def apply(self, param_name, param_value, param_grad):
"""Performs a single optimization step.
Args:
param_name(String): the name of the param
param_value(Tensor): param values to be updated in-place
param_grad(Tensor): param gradients; the values may be modified
inside this function and should not be used afterwards
"""
assert param_value.shape == param_grad.shape, ("shape mismatch",
param_value.shape,
param_grad.shape)
self.device_check(param_value, self.step_counter, self.lr_value,
self.epsilon_value, self.decay_value)
# if self.decay_value != 0:
if self.weight_decay.init_value != 0:
singa.Axpy(self.decay_value.data, param_value.data, param_grad.data)
if param_name not in self.history:
flag = param_value.device.graph_enabled()
param_value.device.EnableGraph(False)
self.history[param_name] = tensor.zeros_like(param_value)
param_value.device.EnableGraph(flag)
# history = history + param_grad * param_grad
# param_value = param_value - lr * param_grad / sqrt(history + epsilon)
hist = self.history[param_name].data
hist += singa.Square(param_grad.data)
minus_lr = 0.0 - self.lr_value
tmp = self.history[param_name] + self.epsilon_value
tmp = singa.Sqrt(tmp.data)
tmp = singa.__div__(param_grad.data, tmp)
singa.Axpy(minus_lr.data, tmp, param_value.data)
def step(self):
# increment the step counter and refresh lr, weight decay and epsilon
super().step()
decay_value = self.weight_decay(self.step_counter)
epsilon_value = self.epsilon(self.step_counter)
self.decay_value.copy_from(decay_value)
self.epsilon_value.copy_from(epsilon_value)
def get_states(self):
states = super().get_states()
states['history'] = self.history  # a dict of accumulated squared gradient tensors
return states
def set_states(self, states):
super().set_states(states)
if 'history' in states:
self.history = states['history']
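# A usage sketch for AdaGrad; because the squared gradients accumulate in `history`
# without decay, the effective per-coordinate learning rate only shrinks over
# training. `loss` is assumed to come from an autograd forward pass:
# >>> adagrad = AdaGrad(lr=0.1, epsilon=1e-8)
# >>> adagrad(loss)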
class Adam(Optimizer):
'''Adam optimizer.
See the base Optimizer for all constructor args.
Args:
beta_1(float): exponential decay rate for the 1st moment (momentum) estimates
beta_2(float): exponential decay rate for the 2nd moment (squared gradient) estimates
epsilon (float): small value for preventing numeric error
'''
def __init__(self,
lr=0.1,
beta_1=0.9,
beta_2=0.999,
epsilon=1e-8,
weight_decay=0):
super(Adam, self).__init__(lr)
# init weight_decay
if type(weight_decay) == float or type(weight_decay) == int:
if weight_decay < 0.0:
raise ValueError(
"Invalid weight_decay value: {}".format(weight_decay))
self.weight_decay = Constant(weight_decay)
elif isinstance(weight_decay, DecayScheduler):
self.weight_decay = weight_decay
else:
raise TypeError("Wrong weight_decay type")
self.decay_value = self.weight_decay(self.step_counter)
# init beta_1
if type(beta_1) == float or type(beta_1) == int:
self.beta_1 = Constant(beta_1)
elif isinstance(beta_1, DecayScheduler):
self.beta_1 = beta_1
else:
raise TypeError("Wrong beta_1 type")
self.beta_1_value = self.beta_1(self.step_counter)
# init beta_2
if type(beta_2) == float or type(beta_2) == int:
self.beta_2 = Constant(beta_2)
elif isinstance(beta_2, DecayScheduler):
self.beta_2 = beta_2
else:
raise TypeError("Wrong beta_2 type")
self.beta_2_value = self.beta_2(self.step_counter)
# init epsilon
if type(epsilon) == float or type(epsilon) == int:
self.epsilon = Constant(epsilon)
elif isinstance(epsilon, DecayScheduler):
self.epsilon = epsilon
else:
raise TypeError("Wrong epsilon type")
self.epsilon_value = self.epsilon(self.step_counter)
# init m and v
self.m = dict()
self.v = dict()
def apply(self, param_name, param_value, param_grad):
"""Performs a single optimization step.
Args:
param_name(String): the name of the param
param_value(Tensor): param values to be updated in-place
param_grad(Tensor): param gradients; the values may be modified
inside this function and should not be used afterwards
"""
assert param_value.shape == param_grad.shape, ("shape mismatch",
param_value.shape,
param_grad.shape)
self.device_check(param_value, self.step_counter, self.lr_value,
self.beta_1_value, self.beta_2_value,
self.epsilon_value, self.decay_value)
# if self.decay_value != 0:
if self.weight_decay.init_value != 0:
singa.Axpy(self.decay_value.data, param_value.data, param_grad.data)
if param_name not in self.m:
flag = param_value.device.graph_enabled()
param_value.device.EnableGraph(False)
self.m[param_name] = tensor.zeros_like(param_value)
self.v[param_name] = tensor.zeros_like(param_value)
param_value.device.EnableGraph(flag)
# overall steps
# m := beta_1 * m + (1 - beta_1) * grad
# v := beta_2 * v + (1 - beta_2) * grad * grad
# m_norm = m / (1 - beta_1 ^ step)
# v_norm = v / (1 - beta_2 ^ step)
# param := param - lr * m_norm / (sqrt(v_norm) + epsilon)
step = self.step_counter + 1.0  # 1-based step count for bias correction (step() runs after apply())
# m := beta_1 * m + (1 - beta_1) * grad
tmp = 1.0 - self.beta_1_value
self.m[param_name] *= self.beta_1_value
singa.Axpy(tmp.data, param_grad.data, self.m[param_name].data)
# v := beta_2 * v + (1 - beta_2) * grad * grad
tmp = 1.0 - self.beta_2_value
self.v[param_name] *= self.beta_2_value
singa.Axpy(tmp.data, singa.Square(param_grad.data),
self.v[param_name].data)
# m_norm = m / (1 - beta_1 ^ step)
tmp = tensor.pow(self.beta_1_value, step)
tmp = 1.0 - tmp
m_norm = self.m[param_name] / tmp
# v_norm = v / (1 - beta_2 ^ step)
tmp = tensor.pow(self.beta_2_value, step)
tmp = 1.0 - tmp
v_norm = self.v[param_name] / tmp
# param := param - lr * m_norm / (sqrt(v_norm) + epsilon)
a = tensor.sqrt(v_norm) + self.epsilon_value
tmp = m_norm / a
minus_lr = 0.0 - self.lr_value
singa.Axpy(minus_lr.data, tmp.data, param_value.data)
def step(self):
# increment the step counter and refresh lr, weight decay and betas
super().step()
decay_value = self.weight_decay(self.step_counter)
beta_1_value = self.beta_1(self.step_counter)
beta_2_value = self.beta_2(self.step_counter)
self.decay_value.copy_from(decay_value)
self.beta_1_value.copy_from(beta_1_value)
self.beta_2_value.copy_from(beta_2_value)
def get_states(self):
states = super().get_states()
states['m'] = self.m # a dict for 1st order moments tensors
states['v'] = self.v # a dict for 2nd order moments tensors
return states
def set_states(self, states):
super().set_states(states)
if 'm' in states:
self.m = states['m']
if 'v' in states:
self.v = states['v']
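# A usage sketch for Adam; beta_1/beta_2 are the decay rates of the 1st/2nd moment
# estimates, and apply() performs the bias correction using the step counter.
# `loss` is assumed to come from an autograd forward pass:
# >>> adam = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-8)
# >>> adam(loss)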
class DistOpt(object):
"""The class is designed to wrap an optimizer to do distributed training.
This class is used to wrap an optimizer object to perform distributed training based
on multiprocessing. Each process has an individual rank, which indicates which
GPU the process is using. The training data is partitioned, so that
each process can evaluate the sub-gradient based on the partitioned training data.
Once the sub-gradient is calculated in each process, the overall stochastic gradient
is obtained by all-reducing the sub-gradients evaluated by all processes. The all-reduce
operation is supported by the NVIDIA Collective Communications Library (NCCL).
Args:
opt(Optimizer): The optimizer to be wrapped.
nccl_id(NcclIdHolder): an nccl id holder object for a unique communication id
local_rank(int): local rank of a process on the current node
world_size(int): total number of processes
buffSize(int): the buffer size, in number of elements, used by the NCCL communicator
Attributes:
world_size(int): total number of processes
local_rank(int): local rank of a process on the current node
global_rank(int): global rank of a process
Typical usage example:
>>> from singa import opt
>>> sgd = opt.SGD(lr=0.1, momentum=0.9)
>>> optimizer = opt.DistOpt(sgd)
"""
def __init__(self,
opt=SGD(),
nccl_id=None,
local_rank=None,
world_size=None,
buffSize=4194304):
self.opt = opt
if nccl_id is None:
# constructor for applications using MPI
self.communicator = singa.Communicator(buffSize)
else:
# constructor for applications using the Python multiprocessing module
self.communicator = singa.Communicator(local_rank, world_size,
nccl_id, buffSize)
self.world_size = self.communicator.world_size
self.local_rank = self.communicator.local_rank
self.global_rank = self.communicator.global_rank
def __call__(self, loss):
self.backward_and_update(loss)
def update(self, param, grad):
"""Performs a single optimization step.
Args:
param(Tensor): param values to be updated in-place
grad(Tensor): param gradients
"""
grad /= self.world_size
self.opt.update(param, grad)
def all_reduce(self, tensor):
"""Performs all reduce of a tensor for distributed training.
Args:
tensor(Tensor): a tensor to be all-reduced
"""
self.communicator.synch(tensor)
def fused_all_reduce(self, tensor, send=True):
"""Performs all reduce of the tensors after fusing them in a buffer.
Args:
tensor(List of Tensors): a list of tensors to be all-reduced
send(bool): when send is False, the tensors won't be sent out
immediately; they are copied into the fusion buffer first
"""
tensor = singa.VecTensor(tensor)
self.communicator.fusedSynch(tensor, send)
def all_reduce_half(self, tensor):
"""Performs all reduce of a tensor after converting to FP16.
Args:
tensor(Tensor): a tensor to be all-reduced
"""
self.communicator.synchHalf(tensor)
def fused_all_reduce_half(self, tensor, send=True):
"""Performs all reduce of the tensors after fusing and converting them to FP16.
Args:
tensor(List of Tensors): a list of tensors to be all-reduced
send(bool): when send is False, the tensors won't be sent out
immediately; they are copied into the fusion buffer first
"""
tensor = singa.VecTensor(tensor)
self.communicator.fusedSynchHalf(tensor, send)
def sparsification(self, tensor, accumulation, spars, topK):
"""Performs all reduce of a tensor after sparsification.
Args:
tensor(Tensor): a tensor to be all-reduced
accumulation(Tensor): local gradient accumulation
spars(float): a parameter to control sparsity as defined below
topK(bool): when topK is False, it keeps the gradient elements with absolute
value >= spars; when topK is True, it keeps a fraction of the gradient
elements equal to spars, e.g. when spars = 0.01, it keeps 1% of the
total gradient elements
"""
if accumulation is None:
self.communicator.sparsification(tensor, spars, topK)
else:
self.communicator.sparsification(tensor, accumulation, spars, topK)
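# A sketch of the two sparsification modes; `g` is assumed to be a gradient Tensor
# and `acc` a same-sized accumulation Tensor created beforehand:
# >>> dist_opt.sparsification(g.data, None, spars=0.05, topK=False)      # keep |g_i| >= 0.05
# >>> dist_opt.sparsification(g.data, acc.data, spars=0.01, topK=True)   # keep the top 1% of elements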
def fused_sparsification(self, tensor, accumulation, spars, topK):
"""Performs all reduce of the tensors after fusing and sparsification.
Args:
tensor(List of Tensors): a list of tensors to be all-reduced
accumulation(Tensor): local gradient accumulation
spars(float): a parameter to control sparsity as defined below
topK(bool): when topK is False, it keeps the gradient elements with absolute
value >= spars; when topK is True, it keeps a fraction of the gradient
elements equal to spars, e.g. when spars = 0.01, it keeps 1% of the
total gradient elements
"""
tensor = singa.VecTensor(tensor)
if accumulation is None:
self.communicator.fusedSparsification(tensor, spars, topK)
else:
self.communicator.fusedSparsification(tensor, accumulation, spars,
topK)
def wait(self):
"""Wait for the cuda streams used by the communicator to finish their operations."""
self.communicator.wait()
def backward_and_update(self, loss, threshold=2097152):
"""Performs backward propagation from the loss and parameter update.
From the loss, it performs backward propagation to get the gradients and does the parameter
update. For gradient communication, it fuses all the tensors smaller than the threshold
value to reduce network latency.
Args:
loss(Tensor): loss is the objective function of the deep learning model
optimization, e.g. for a classification problem it can be the output of the
softmax_cross_entropy function.
threshold(int): a parameter to control the tensor fusion. Tensors smaller
than the threshold are accumulated and fused before the all-reduce
operation, while tensors larger than the threshold are all-reduced
directly without fusion.
"""
plist = []
acc = 0
glist = []
for p, g in autograd.backward(loss):
if g.size() > threshold:
# larger than threshold -> reduced directly
self.all_reduce(g.data)
else:
# smaller than threshold -> accumulate
glist.append(g.data)
self.fused_all_reduce([g.data], send=False)
acc += g.size()
if (acc > threshold):
self.fused_all_reduce(glist)
acc = 0
glist = []
plist.append((p, g))
if glist:
self.fused_all_reduce(glist)
self.wait()
for p, g in plist:
self.update(p, g)
self.opt.step()
def backward_and_update_half(self,
loss,
threshold=2097152,
clipping=False,
clip_Value=100):
"""Performs backward propagation and parameter update, with FP16 precision communication.
THIS IS AN EXPERIMENTAL FUNCTION FOR RESEARCH PURPOSES:
From the loss, it performs backward propagation to get the gradients and does the parameter
update. For gradient communication, it fuses all the tensors smaller than the threshold value
to reduce network latency, and converts them to FP16 half precision format before
sending them out. To assist training, this function provides an option to perform gradient
clipping.
Args:
loss(Tensor): loss is the objective function of the deep learning model
optimization, e.g. for a classification problem it can be the output of the
softmax_cross_entropy function.
threshold(int): a parameter to control the tensor fusion. Tensors smaller
than the threshold are accumulated and fused before the all-reduce
operation, while tensors larger than the threshold are all-reduced
directly without fusion.
clipping(bool): a boolean flag to choose whether to clip the gradient values
clip_Value(float): the clip value to be used when clipping is True
"""
plist = []
acc = 0
glist = []
for p, g in autograd.backward(loss):
assert p.dtype == tensor.float32, (
'This function only supports 32-bit input tensors, '
'which are converted to 16 bits before transmission')
if clipping:
g = autograd.clip(g, -clip_Value, clip_Value)
if g.size() > threshold:
# larger than threshold -> reduced directly
self.all_reduce_half(g.data)
else:
# smaller than threshold -> accumulate
glist.append(g.data)
self.fused_all_reduce_half([g.data], send=False)
acc += g.size()
if (acc > threshold):
self.fused_all_reduce_half(glist)
acc = 0
glist = []
plist.append((p, g))
if glist:
self.fused_all_reduce_half(glist)
self.wait()
for p, g in plist:
self.update(p, g)
self.opt.step()
def backward_and_partial_update(self, loss, threshold=2097152):
"""Performs backward propagation from the loss and parameter update using asychronous training.
THIS IS A EXPERIMENTAL FUNCTION FOR RESEARCH PURPOSE:
From the loss, it performs backward propagation to get the gradients and do the parameter
update. It fuses the tensors smaller than the threshold value to reduce network latency,
as well as performing asychronous training where one parameter partition is all-reduced
per iteration. The size of the parameter partition depends on the threshold value.
Args:
loss(Tensor): loss is the objective function of the deep learning model
optimization, e.g. for a classification problem it can be the output of the
softmax_cross_entropy function.
threshold(int): a parameter to control the tensor fusion. Tensors smaller
than the threshold are accumulated and fused before the all-reduce
operation, while tensors larger than the threshold are all-reduced
directly without fusion.
Attributes:
self.partial(int): a counter that determines which partition to all-reduce.
This counter resets to zero automatically after an update cycle of the full
parameter set.
"""
if not hasattr(self, "partial"):
self.partial = 0
self.partial += 1
k = 0
plist = []
acc = 0
tenlist = []
reduced = []
for p, g in autograd.backward(loss):
# every parameter is updated locally
self.opt.update(p, g)
# then do the partial parameter synchronization
if p.size() > threshold:
# larger than threshold -> reduced directly
# k is the partition number of the full gradient set
k += 1
if (k == self.partial):
self.all_reduce(p.data)
reduced.append(p)
else:
# smaller than threshold -> accumulate
plist.append(p.data)
tenlist.append(p)
acc += p.size()
if (acc > threshold):
k += 1
if (k == self.partial):
self.fused_all_reduce(plist, send=False)
self.fused_all_reduce(plist)
reduced = tenlist
acc = 0
plist = []
tenlist = []
if plist:
k += 1
if (k == self.partial):
self.fused_all_reduce(plist, send=False)
self.fused_all_reduce(plist)
reduced = tenlist
self.wait()
# the all-reduced parameters need to be averaged
for r in reduced:
r /= self.world_size
# the counter returns to zero after a cycle of partial update
if (k == self.partial):
self.partial = 0
self.opt.step()
def backward_and_sparse_update(self,
loss,
threshold=2097152,
spars=0.05,
topK=False,
corr=True):
""" Performs backward propagation from the loss and parameter update with sparsification.
THIS IS A EXPERIMENTAL FUNCTION FOR RESEARCH PURPOSE:
From the loss, it performs backward propagation to get the gradients and do the parameter
update. It fuses the tensors with size smaller than the threshold value to reduce network
latency, as well as using sparsification schemes to transfer only the gradient elements which
are significant.
Args:
loss(Tensor): loss is the objective function of the deep learning model
optimization, e.g. for a classification problem it can be the output of the
softmax_cross_entropy function.
threshold(int): a parameter to control the tensor fusion. Tensors smaller
than the threshold are accumulated and fused before the all-reduce
operation, while tensors larger than the threshold are all-reduced
directly without fusion.
spars(float): a parameter to control sparsity as defined below
topK(bool): when topK is False, it keeps the gradient elements with absolute
value >= spars; when topK is True, it keeps a fraction of the gradient
elements equal to spars, e.g. when spars = 0.01, it keeps 1% of the
total gradient elements
corr(bool): whether to use the locally accumulated gradient for error correction
Attributes:
self.sparsInit: a flag indicating whether the gradient accumulation tensors
have been initialized
self.gradAccumulation: a list of tensors for local gradient accumulation
"""
if ((not hasattr(self, "sparsInit")) and corr):
self.gradAccumulation = []
self.sparsInit = False
plist = []
acc = 0
k = -1
glist = []
for p, g in autograd.backward(loss):
if g.size() > threshold:
# larger than threshold -> reduced directly
k += 1
if (corr and (not self.sparsInit)):
# create a tensor for the gradient accumulation
flag = p.device.graph_enabled()
p.device.EnableGraph(False)
self.gradAccumulation.append(
tensor.Tensor((g.size(),), p.device, p.dtype))
self.gradAccumulation[k].set_value(0.0)
p.device.EnableGraph(flag)
if corr:
self.sparsification(g.data, self.gradAccumulation[k].data,
spars, topK)
else:
self.sparsification(g.data, None, spars, topK)
else:
# smaller than threshold -> accumulate
glist.append(g.data)
acc += g.size()
if (acc > threshold):
k += 1
if (corr and (not self.sparsInit)):
# create a tensor for the gradient accumulation
flag = p.device.graph_enabled()
p.device.EnableGraph(False)
self.gradAccumulation.append(
tensor.Tensor((acc,), p.device, p.dtype))
self.gradAccumulation[k].set_value(0.0)
p.device.EnableGraph(flag)
if corr:
self.fused_sparsification(glist,
self.gradAccumulation[k].data,
spars, topK)
else:
self.fused_sparsification(glist, None, spars, topK)
acc = 0
glist = []
plist.append((p, g))
if glist:
k += 1
if (corr and (not self.sparsInit)):
# create a tensor for the gradient accumulation
flag = p.device.graph_enabled()
p.device.EnableGraph(False)
self.gradAccumulation.append(
tensor.Tensor((acc,), p.device, p.dtype))
self.gradAccumulation[k].set_value(0.0)
p.device.EnableGraph(flag)
if corr:
self.fused_sparsification(glist, self.gradAccumulation[k].data,
spars, topK)
else:
self.fused_sparsification(glist, None, spars, topK)
self.wait()
for p, g in plist:
self.update(p, g)
self.sparsInit = True
self.opt.step()