blob: 5dd786b6d7cba123a893f6e0d27ac3a850723569 [file] [log] [blame]
#include "dbconnector/dbconnector.hpp"
#include "state/igd.hpp"
#include "share/shared_utils.hpp"
#include <limits>
namespace madlib {
namespace modules {
namespace elastic_net {
template <class Model>
class Igd
{
public:
static AnyType igd_transition (AnyType& args, const Allocator& inAllocator);
static AnyType igd_merge (AnyType& args);
static AnyType igd_final (AnyType& args);
static AnyType igd_state_diff (AnyType& args);
static AnyType igd_result (AnyType& args);
private:
static double p_abs (CVector& v, double r);
static void link_fn (CVector& theta, CVector& w, double p);
static double sign (const double & x);
};
// ------------------------------------------------------------------------
/**
@brief Perform IGD transition step
It is called for each tuple.
The input AnyType has 9 args: state. ind_var, dep_var,
pre_state, lambda, alpha, dimension, stepsize, totalrows
*/
template <class Model>
AnyType Igd<Model>::igd_transition (AnyType& args, const Allocator& inAllocator)
{
IgdState<MutableArrayHandle<double> > state = args[0];
double lambda = args[4].getAs<double>();
double stepsize = args[7].getAs<double>();
// initialize the state if working on the first tuple
if (state.numRows == 0)
{
if (!args[3].isNull())
{
IgdState<ArrayHandle<double> > pre_state = args[3];
state.allocate(inAllocator, pre_state.dimension);
state = pre_state;
}
else
{
double alpha = args[5].getAs<double>();
int dimension = args[6].getAs<int>();
int total_rows = args[8].getAs<int>();
state.allocate(inAllocator, dimension);
state.step_decay = args[11].getAs<double>();
state.stepsize = stepsize * exp(state.step_decay);
state.alpha = alpha;
state.totalRows = total_rows;
state.xmean = args[9].getAs<MappedColumnVector>();
state.ymean = args[10].getAs<double>();
// dual vector theta
state.theta.setZero();
state.p = 2 * log(double(state.dimension));
state.lambda = lambda;
state.q = state.p / (state.p - 1);
link_fn(state.theta, state.coef, state.p);
Model::init_intercept (state); // initialize intercept
}
//state.loss = 0.;
if (state.lambda != lambda)
{
state.lambda = lambda;
state.stepsize = stepsize * exp(state.step_decay);
}
// state.stepsize = state.stepsize / exp(state.step_decay);
state.numRows = 0; // resetting
}
state.stepsize = state.stepsize / exp(state.step_decay);
try {
MappedColumnVector x = args[1].getAs<MappedColumnVector>();
double y;
Model::get_y(y, args);
ColumnVector gradient(state.dimension); // gradient for coef only, not for intercept
Model::compute_gradient(gradient, state, x, y);
double a = state.stepsize / static_cast<double>(state.totalRows);
double b = state.stepsize * state.alpha * state.lambda
/ static_cast<double>(state.totalRows);
for (uint32_t i = 0; i < state.dimension; i++)
{
// step 1
state.theta(i) -= a * gradient(i);
double step1_sign = sign(state.theta(i));
// step 2
state.theta(i) -= b * sign(state.theta(i));
// set to 0 if the value crossed zero during the two steps
if (step1_sign != sign(state.theta(i))) state.theta(i) = 0;
}
link_fn(state.theta, state.coef, state.p);
Model::update_intercept(state, x, y); // intercept is updated separately
Model::update_loglikelihood(state, x, y);
// state.loss += r * r / 2.;
state.numRows ++;
} catch(const ArrayWithNullException &e) {
// warning("Input array most likely contains NULL, skipping this array.");
} catch(const std::invalid_argument &ia) {
//warning("Input array is invalid (with NULL values), skipping this array.");
}
return state;
}
// ------------------------------------------------------------------------
/**
* @brief Perform the perliminary aggregation function: Merge transition states
*/
template <class Model>
AnyType Igd<Model>::igd_merge (AnyType& args)
{
IgdState<MutableArrayHandle<double> > state1 = args[0];
IgdState<ArrayHandle<double> > state2 = args[1];
// We first handle the trivial case where this function is called with one
// of the states being the initial state
if (state1.numRows == 0) { return state2; }
else if (state2.numRows == 0) { return state1; }
double totalNumRows = static_cast<double>(state1.numRows + state2.numRows);
state1.coef *= static_cast<double>(state1.numRows) /
static_cast<double>(state2.numRows);
state1.coef += state2.coef;
state1.coef *= static_cast<double>(state2.numRows) /
static_cast<double>(totalNumRows);
Model::merge_intercept(state1, state2);
state1.loglikelihood += state2.loglikelihood;
// The following numRows update, cannot be put above, because the coef
// averaging depends on their original values
state1.numRows += state2.numRows;
if (state1.stepsize > state2.stepsize)
state1.stepsize = state2.stepsize;
return state1;
}
// ------------------------------------------------------------------------
/**
* @brief Perform the final step
*/
template <class Model>
AnyType Igd<Model>::igd_final (AnyType& args)
{
// We request a mutable object. Depending on the backend, this might perform
// a deep copy.
IgdState<MutableArrayHandle<double> > state = args[0];
// Aggregates that haven't seen any data just return Null.
if (state.numRows == 0) return Null();
Model::update_intercept_final (state); // intercept is updated separately
// generate new theta values
link_fn(state.coef, state.theta, state.q);
// compute the final loglikelihood value from the accumulated loss value
double loss_value;
loss_value = state.loglikelihood / static_cast<double>(state.numRows * 2);
double sum_sqr_coef = 0;
double sum_abs_coef = 0;
for (uint32_t i = 0; i < state.dimension; i++){
sum_sqr_coef += state.coef(i) * state.coef(i);
sum_abs_coef += std::abs(state.coef(i));
}
state.loglikelihood = -(loss_value + state.lambda *
((1 - state.alpha) * sum_sqr_coef / 2 +
state.alpha * sum_abs_coef));
return state;
}
// ------------------------------------------------------------------------
/**
* @brief Return the difference in RMSE between two states
*/
template <class Model>
AnyType Igd<Model>::igd_state_diff (AnyType& args)
{
IgdState<ArrayHandle<double> > state1 = args[0];
IgdState<ArrayHandle<double> > state2 = args[1];
double diff;
diff = std::abs(std::abs(state1.loglikelihood) -
std::abs(state2.loglikelihood));
// double tmp;
// double diff_sum = 0;
// uint32_t n = state1.coef.rows();
// for (uint32_t i = 0; i < n; i++)
// {
// diff = std::abs(state1.coef(i) - state2.coef(i));
// tmp = std::abs(state2.coef(i));
// if (tmp != 0) diff /= tmp;
// diff_sum += diff;
// }
// // deal with intercept
// diff = std::abs(state1.intercept - state2.intercept);
// tmp = std::abs(state2.intercept);
// if (tmp != 0) diff /= tmp;
// diff_sum += diff;
// return diff_sum / (n + 1);
return diff;
}
// ------------------------------------------------------------------------
/**
* @brief Return the coefficients and diagnostic statistics of the state
*/
template <class Model>
AnyType Igd<Model>::igd_result (AnyType& args)
{
IgdState<MutableArrayHandle<double> > state = args[0];
MappedColumnVector x2 = args[1].getAs<MappedColumnVector>();
double threshold = args[2].getAs<double>();
double tolerance = args[3].getAs<double>();
ColumnVector norm_coef(state.dimension);
double avg = 0;
for (uint32_t i = 0; i < state.dimension; i++)
{
norm_coef(i) = state.coef(i) * sqrt(x2(i) - state.xmean(i) * state.xmean(i));
avg += fabs(norm_coef(i));
}
avg /= state.dimension;
for (uint32_t i = 0; i < state.dimension; i++)
if (fabs(norm_coef(i)/avg) < threshold || fabs(norm_coef(i)) < tolerance)
state.coef(i) = 0;
AnyType tuple;
tuple << static_cast<double>(state.intercept)
<< state.coef
<< static_cast<double>(state.lambda);
return tuple;
}
// ------------------------------------------------------------------------
template <class Model>
inline double Igd<Model>::p_abs (CVector& v, double r)
{
double sum = 0;
for (int i = 0; i < v.size(); i++)
if (v(i) != 0)
sum += pow(fabs(v(i)), r);
return pow(sum, 1./r);
}
// ------------------------------------------------------------------------
// p-form link function, q = p/(p-1)
// For inverse function, jut replace w with theta and q with p
template <class Model>
inline void Igd<Model>::link_fn (CVector& theta, CVector& w, double p)
{
double abs_theta = p_abs(theta, p);
if (fabs(abs_theta) <= std::numeric_limits<double>::denorm_min())
{
for (int i = 0; i < theta.size(); i++) w(i) = 0;
return;
}
double denominator = pow(abs_theta, p - 2);
for (int i = 0; i < theta.size(); i++)
if (fabs(theta(i)) <= std::numeric_limits<double>::denorm_min())
w(i) = 0;
else
w(i) = sign(theta(i)) * pow(fabs(theta(i)), p - 1)
/ denominator;
}
// ------------------------------------------------------------------------
// sign of a number
template <class Model>
inline double Igd<Model>::sign (const double & x)
{
if (x > 0)
return 1;
else if (x < 0)
return -1;
else
return 0;
}
} // namespace elastic_net
} // namespace modules
} // namespace madlib