blob: ac3c49837a6a1ba39c08c642fa5dcd480aaea1c0 [file] [log] [blame]
/* ----------------------------------------------------------------------- *//**
*
* @file igd.hpp
*
* Generic implementaion of incremental gradient descent, in the fashion of
* user-definied aggregates. They should be called by actually database
* functions, after arguments are properly parsed.
*
*//* ----------------------------------------------------------------------- */
#include <dbconnector/dbconnector.hpp>
#ifndef MADLIB_MODULES_CONVEX_ALGO_IGD_HPP_
#define MADLIB_MODULES_CONVEX_ALGO_IGD_HPP_
namespace madlib {
namespace modules {
namespace convex {
// use Eigen
using namespace madlib::dbal::eigen_integration;
// The reason for using ConstState instead of const State to reduce the
// template type list: flexibility to high-level for mutability control
// More: cast<ConstState>(MutableState) may not always work
template <class State, class ConstState, class Task>
class IGD {
public:
typedef State state_type;
typedef ConstState const_state_type;
typedef typename Task::tuple_type tuple_type;
typedef typename Task::model_type model_type;
static void transition(state_type &state, const tuple_type &tuple);
static void merge(state_type &state, const_state_type &otherState);
static void transitionInMiniBatch(state_type &state, const tuple_type &tuple);
static void transitionInMiniBatchWithALR(state_type &state, const tuple_type &tuple);
static void mergeInPlace(state_type &state, const_state_type &otherState);
static void final(state_type &state);
};
template <class State, class ConstState, class Task>
void
IGD<State, ConstState, Task>::transition(state_type &state,
const tuple_type &tuple) {
// The reason for update model inside a Task:: function instead of
// returning the gradient and do it here: the gradient is a sparse
// representation of the model (which is dense), returning the gradient
// forces the algo to be aware of one more template type
// -- Task::sparse_model_type, which we do not explicit define
// apply to the model directly
Task::gradientInPlace(
state.algo.incrModel,
tuple.indVar,
tuple.depVar,
state.task.stepsize * tuple.weight);
}
template <class State, class ConstState, class Task>
void
IGD<State, ConstState, Task>::merge(state_type &state,
const_state_type &otherState) {
// Having zero checking here to reduce dependency to the caller.
// This can be removed if it affects performance in the future,
// with the expectation that callers should do the zero checking.
if (state.algo.numRows == 0) {
state.algo.incrModel = otherState.algo.incrModel;
return;
} else if (otherState.algo.numRows == 0) {
return;
}
// The reason of this weird algorithm instead of an intuitive one
// -- (w1 * m1 + w2 * m2) / (w1 + w2): we have only one mutable state,
// therefore, (m1 * w1 / w2 + m2) * w2 / (w1 + w2).
// Order: 111111111 22222 3333333333333333
// model averaging, weighted by rows seen
double totalNumRows = static_cast<double>(state.algo.numRows + otherState.algo.numRows);
state.algo.incrModel *= static_cast<double>(state.algo.numRows) /
static_cast<double>(otherState.algo.numRows);
state.algo.incrModel += otherState.algo.incrModel;
state.algo.incrModel *= static_cast<double>(otherState.algo.numRows) /
static_cast<double>(totalNumRows);
}
/**
* @brief Update the transition state in mini-batches
*
* Note: We assume that
* 1. Task defines a model_eigen_type
* 2. A batch of tuple.indVar is a Matrix
* 3. A batch of tuple.depVar is a ColumnVector
* 4. Task defines a getLossAndUpdateModel method
*
*/
template <class State, class ConstState, class Task>
void
IGD<State, ConstState, Task>::transitionInMiniBatch(
state_type &state,
const tuple_type &tuple) {
madlib_assert(tuple.indVar.rows() == tuple.depVar.rows(),
std::runtime_error("Invalid data. Independent and dependent "
"batches don't have same number of rows."));
uint16_t batch_size = state.batchSize;
uint16_t n_epochs = state.nEpochs;
// n_rows/n_ind_cols are the rows/cols in a transition tuple.
Index n_rows = tuple.indVar.rows();
size_t n_batches = n_rows < batch_size ? 1 :
size_t(n_rows / batch_size) + size_t(n_rows % batch_size > 0);
double max_loss = 0.0;
for (int curr_epoch=0; curr_epoch < n_epochs; curr_epoch++) {
double loss = 0.0;
/*
Randomizing the input data before every iteration is good for
minibatch gradient descent convergence. Since we don't do that,
we are randomizing the order in which every batch is visited in
a buffer. Note that this still does not randomize rows within
a batch.
*/
std::vector<size_t> random_curr_batch(n_batches, 0);
for(size_t i=0; i < n_batches; i++) {
random_curr_batch[i] = i;
}
std::random_shuffle(&random_curr_batch[0], &random_curr_batch[n_batches]);
for (size_t i = 0; i < n_batches; i++) {
size_t curr_batch = random_curr_batch[i];
Index curr_batch_row_index = static_cast<Index>(curr_batch * batch_size);
Matrix X_batch;
Matrix Y_batch;
if (curr_batch == n_batches-1) {
// last batch
X_batch = tuple.indVar.bottomRows(n_rows - curr_batch_row_index);
Y_batch = tuple.depVar.bottomRows(n_rows - curr_batch_row_index);
} else {
X_batch = tuple.indVar.block(curr_batch_row_index, 0,
batch_size, tuple.indVar.cols());
Y_batch = tuple.depVar.block(curr_batch_row_index, 0,
batch_size, tuple.depVar.cols());
}
Task::lambda = state.lambda;
loss += Task::getLossAndUpdateModel(
state.model, X_batch, Y_batch, state.stepsize);
}
if (max_loss < loss) max_loss = loss;
}
// Be pessimistic and report the maximum loss
state.loss += max_loss;
return;
}
/**
* @brief Update the transition state in mini-batches using adaptive learning
* rate solvers.
*
* Note: We assume that
* 1. Task defines a model_eigen_type
* 2. A batch of tuple.indVar is a Matrix
* 3. A batch of tuple.depVar is a ColumnVector
* 4. Task defines a getLossAndUpdateModel method
*
*/
template <class State, class ConstState, class Task>
void
IGD<State, ConstState, Task>::transitionInMiniBatchWithALR(
state_type &state,
const tuple_type &tuple) {
madlib_assert(tuple.indVar.rows() == tuple.depVar.rows(),
std::runtime_error("Invalid data. Independent and dependent "
"batches don't have same number of rows."));
uint16_t batch_size = state.batchSize;
uint16_t n_epochs = state.nEpochs;
// n_rows/n_ind_cols are the rows/cols in a transition tuple.
Index n_rows = tuple.indVar.rows();
size_t n_batches = n_rows < batch_size ? 1 :
size_t(n_rows / batch_size) + size_t(n_rows % batch_size > 0);
double max_loss = 0.0;
std::vector<Matrix> m(state.model.num_layers);
std::vector<Matrix> v(state.model.num_layers);
int t = 0;
for (Index k=0; k < state.model.num_layers; ++k) {
m[k] = Matrix::Zero(state.model.u[k].rows(), state.model.u[k].cols());
v[k] = Matrix::Zero(state.model.u[k].rows(), state.model.u[k].cols());
}
for (int curr_epoch=0; curr_epoch < n_epochs; curr_epoch++) {
double loss = 0.0;
/*
Randomizing the input data before every iteration is good for
minibatch gradient descent convergence. Since we don't do that,
we are randomizing the order in which every batch is visited in
a buffer. Note that this still does not randomize rows within
a batch.
*/
std::vector<size_t> random_curr_batch(n_batches, 0);
for(size_t i=0; i < n_batches; i++) {
random_curr_batch[i] = i;
}
std::random_shuffle(&random_curr_batch[0], &random_curr_batch[n_batches]);
for (size_t i = 0; i < n_batches; i++) {
size_t curr_batch = random_curr_batch[i];
Index curr_batch_row_index = static_cast<Index>(curr_batch * batch_size);
Matrix X_batch;
Matrix Y_batch;
if (curr_batch == n_batches-1) {
// last batch
X_batch = tuple.indVar.bottomRows(n_rows - curr_batch_row_index);
Y_batch = tuple.depVar.bottomRows(n_rows - curr_batch_row_index);
} else {
X_batch = tuple.indVar.block(curr_batch_row_index, 0,
batch_size, tuple.indVar.cols());
Y_batch = tuple.depVar.block(curr_batch_row_index, 0,
batch_size, tuple.depVar.cols());
}
t++;
Task::lambda = state.lambda;
loss += Task::getLossAndUpdateModelALR(state.model, X_batch, Y_batch,
state.stepsize, state.opt_code,
state.rho, m, state.beta1,
state.beta2, v, t, state.eps);
}
if (max_loss < loss) max_loss = loss;
}
// Be pessimistic and report the maximum loss
state.loss += max_loss;
return;
}
template <class State, class ConstState, class Task>
void
IGD<State, ConstState, Task>::mergeInPlace(state_type &state,
const_state_type &otherState) {
// avoid division by zero
if (state.numRows == 0) {
state.model = otherState.model;
return;
} else if (otherState.numRows == 0) {
return;
}
// model averaging, weighted by rows seen
double leftRows = static_cast<double>(state.numRows + state.numRows);
double rightRows = static_cast<double>(otherState.numRows + otherState.numRows);
double totalNumRows = leftRows + rightRows;
state.model *= leftRows / rightRows;
state.model += otherState.model;
state.model *= rightRows / totalNumRows;
}
template <class State, class ConstState, class Task>
void
IGD<State, ConstState, Task>::final(state_type &state) {
// The reason that we have to keep the task.model untouched in transition
// funtion: loss computation needs the model from last iteration cleanly
state.task.model = state.algo.incrModel;
}
} // namespace convex
} // namespace modules
} // namespace madlib
#endif