|  |  | 
|  | #include "dbconnector/dbconnector.hpp" | 
|  | #include "state/igd.hpp" | 
|  | #include "share/shared_utils.hpp" | 
|  | #include <limits> | 
|  |  | 
|  | namespace madlib { | 
|  | namespace modules { | 
|  | namespace elastic_net { | 
|  |  | 
|  | template <class Model> | 
|  | class Igd | 
|  | { | 
|  | public: | 
|  | static AnyType igd_transition (AnyType& args, const Allocator& inAllocator); | 
|  | static AnyType igd_merge (AnyType& args); | 
|  | static AnyType igd_final (AnyType& args); | 
|  | static AnyType igd_state_diff (AnyType& args); | 
|  | static AnyType igd_result (AnyType& args); | 
|  |  | 
|  | private: | 
|  | static double p_abs (CVector& v, double r); | 
|  | static void link_fn (CVector& theta, CVector& w, double p); | 
|  | static double sign (const double & x); | 
|  | }; | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  |  | 
|  | /** | 
|  | @brief Perform IGD transition step | 
|  |  | 
|  | It is called for each tuple. | 
|  |  | 
|  | The input AnyType has 9 args: state. ind_var, dep_var, | 
|  | pre_state, lambda, alpha, dimension, stepsize, totalrows | 
|  | */ | 
|  | template <class Model> | 
|  | AnyType Igd<Model>::igd_transition (AnyType& args, const Allocator& inAllocator) | 
|  | { | 
|  | IgdState<MutableArrayHandle<double> > state = args[0]; | 
|  | double lambda = args[4].getAs<double>(); | 
|  | double stepsize = args[7].getAs<double>(); | 
|  |  | 
|  | // initialize the state if working on the first tuple | 
|  | if (state.numRows == 0) | 
|  | { | 
|  | if (!args[3].isNull()) | 
|  | { | 
|  | IgdState<ArrayHandle<double> > pre_state = args[3]; | 
|  | state.allocate(inAllocator, pre_state.dimension); | 
|  | state = pre_state; | 
|  | } | 
|  | else | 
|  | { | 
|  | double alpha = args[5].getAs<double>(); | 
|  | int dimension = args[6].getAs<int>(); | 
|  |  | 
|  | int total_rows = args[8].getAs<int>(); | 
|  |  | 
|  | state.allocate(inAllocator, dimension); | 
|  | state.step_decay = args[11].getAs<double>(); | 
|  | state.stepsize = stepsize * exp(state.step_decay); | 
|  | state.alpha = alpha; | 
|  | state.totalRows = total_rows; | 
|  | state.xmean = args[9].getAs<MappedColumnVector>(); | 
|  | state.ymean = args[10].getAs<double>(); | 
|  | // dual vector theta | 
|  | state.theta.setZero(); | 
|  | state.p = 2 * log(double(state.dimension)); | 
|  | state.lambda = lambda; | 
|  | state.q = state.p / (state.p - 1); | 
|  | link_fn(state.theta, state.coef, state.p); | 
|  |  | 
|  | Model::init_intercept (state); // initialize intercept | 
|  | } | 
|  |  | 
|  | //state.loss = 0.; | 
|  |  | 
|  | if (state.lambda != lambda) | 
|  | { | 
|  | state.lambda = lambda; | 
|  | state.stepsize = stepsize * exp(state.step_decay); | 
|  | } | 
|  |  | 
|  | // state.stepsize = state.stepsize / exp(state.step_decay); | 
|  |  | 
|  | state.numRows = 0; // resetting | 
|  | } | 
|  |  | 
|  | state.stepsize = state.stepsize / exp(state.step_decay); | 
|  |  | 
|  | try { | 
|  | MappedColumnVector x = args[1].getAs<MappedColumnVector>(); | 
|  | double y; | 
|  |  | 
|  | Model::get_y(y, args); | 
|  |  | 
|  | ColumnVector gradient(state.dimension); // gradient for coef only, not for intercept | 
|  | Model::compute_gradient(gradient, state, x, y); | 
|  |  | 
|  | double a = state.stepsize / static_cast<double>(state.totalRows); | 
|  | double b = state.stepsize * state.alpha * state.lambda | 
|  | / static_cast<double>(state.totalRows); | 
|  | for (uint32_t i = 0; i < state.dimension; i++) | 
|  | { | 
|  | // step 1 | 
|  | state.theta(i) -= a * gradient(i); | 
|  | double step1_sign = sign(state.theta(i)); | 
|  | // step 2 | 
|  | state.theta(i) -= b * sign(state.theta(i)); | 
|  | // set to 0 if the value crossed zero during the two steps | 
|  | if (step1_sign != sign(state.theta(i))) state.theta(i) = 0; | 
|  | } | 
|  |  | 
|  | link_fn(state.theta, state.coef, state.p); | 
|  |  | 
|  | Model::update_intercept(state, x, y); // intercept is updated separately | 
|  |  | 
|  | Model::update_loglikelihood(state, x, y); | 
|  | // state.loss += r * r / 2.; | 
|  | state.numRows ++; | 
|  | } catch(const ArrayWithNullException &e) { | 
|  | //  warning("Input array most likely contains NULL, skipping this array."); | 
|  | } catch(const std::invalid_argument &ia) { | 
|  | //warning("Input array is invalid (with NULL values), skipping this array."); | 
|  | } | 
|  |  | 
|  | return state; | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  |  | 
|  | /** | 
|  | * @brief Perform the perliminary aggregation function: Merge transition states | 
|  | */ | 
|  | template <class Model> | 
|  | AnyType Igd<Model>::igd_merge (AnyType& args) | 
|  | { | 
|  | IgdState<MutableArrayHandle<double> > state1 = args[0]; | 
|  | IgdState<ArrayHandle<double> > state2 = args[1]; | 
|  |  | 
|  | // We first handle the trivial case where this function is called with one | 
|  | // of the states being the initial state | 
|  | if (state1.numRows == 0) { return state2; } | 
|  | else if (state2.numRows == 0) { return state1; } | 
|  |  | 
|  | double totalNumRows = static_cast<double>(state1.numRows + state2.numRows); | 
|  | state1.coef *= static_cast<double>(state1.numRows) / | 
|  | static_cast<double>(state2.numRows); | 
|  | state1.coef += state2.coef; | 
|  | state1.coef *= static_cast<double>(state2.numRows) / | 
|  | static_cast<double>(totalNumRows); | 
|  |  | 
|  | Model::merge_intercept(state1, state2); | 
|  | state1.loglikelihood += state2.loglikelihood; | 
|  |  | 
|  | // The following numRows update, cannot be put above, because the coef | 
|  | // averaging depends on their original values | 
|  | state1.numRows += state2.numRows; | 
|  |  | 
|  | if (state1.stepsize > state2.stepsize) | 
|  | state1.stepsize = state2.stepsize; | 
|  |  | 
|  | return state1; | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  |  | 
|  | /** | 
|  | * @brief Perform the final step | 
|  | */ | 
|  | template <class Model> | 
|  | AnyType Igd<Model>::igd_final (AnyType& args) | 
|  | { | 
|  | // We request a mutable object. Depending on the backend, this might perform | 
|  | // a deep copy. | 
|  | IgdState<MutableArrayHandle<double> > state = args[0]; | 
|  |  | 
|  | // Aggregates that haven't seen any data just return Null. | 
|  | if (state.numRows == 0) return Null(); | 
|  |  | 
|  | Model::update_intercept_final (state); // intercept is updated separately | 
|  |  | 
|  | // generate new theta values | 
|  | link_fn(state.coef, state.theta, state.q); | 
|  |  | 
|  | // compute the final loglikelihood value from the accumulated loss value | 
|  | double loss_value; | 
|  | loss_value = state.loglikelihood / static_cast<double>(state.numRows * 2); | 
|  | double sum_sqr_coef = 0; | 
|  | double sum_abs_coef = 0; | 
|  | for (uint32_t i = 0; i < state.dimension; i++){ | 
|  | sum_sqr_coef += state.coef(i) * state.coef(i); | 
|  | sum_abs_coef += std::abs(state.coef(i)); | 
|  | } | 
|  | state.loglikelihood = -(loss_value + state.lambda * | 
|  | ((1 - state.alpha) * sum_sqr_coef / 2 + | 
|  | state.alpha * sum_abs_coef)); | 
|  | return state; | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  |  | 
|  | /** | 
|  | * @brief Return the difference in RMSE between two states | 
|  | */ | 
|  | template <class Model> | 
|  | AnyType Igd<Model>::igd_state_diff (AnyType& args) | 
|  | { | 
|  | IgdState<ArrayHandle<double> > state1 = args[0]; | 
|  | IgdState<ArrayHandle<double> > state2 = args[1]; | 
|  | double diff; | 
|  | diff = std::abs(std::abs(state1.loglikelihood) - | 
|  | std::abs(state2.loglikelihood)); | 
|  | // double tmp; | 
|  | // double diff_sum = 0; | 
|  | // uint32_t n = state1.coef.rows(); | 
|  | // for (uint32_t i = 0; i < n; i++) | 
|  | // { | 
|  | //     diff = std::abs(state1.coef(i) - state2.coef(i)); | 
|  | //     tmp = std::abs(state2.coef(i)); | 
|  | //     if (tmp != 0) diff /= tmp; | 
|  | //     diff_sum += diff; | 
|  | // } | 
|  |  | 
|  | // // deal with intercept | 
|  | // diff = std::abs(state1.intercept - state2.intercept); | 
|  | // tmp = std::abs(state2.intercept); | 
|  | // if (tmp != 0) diff /= tmp; | 
|  | // diff_sum += diff; | 
|  | // return diff_sum / (n + 1); | 
|  | return diff; | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  |  | 
|  | /** | 
|  | * @brief Return the coefficients and diagnostic statistics of the state | 
|  | */ | 
|  | template <class Model> | 
|  | AnyType Igd<Model>::igd_result (AnyType& args) | 
|  | { | 
|  | IgdState<MutableArrayHandle<double> > state = args[0]; | 
|  | MappedColumnVector x2 = args[1].getAs<MappedColumnVector>(); | 
|  | double threshold = args[2].getAs<double>(); | 
|  | double tolerance = args[3].getAs<double>(); | 
|  |  | 
|  | ColumnVector norm_coef(state.dimension); | 
|  | double avg = 0; | 
|  | for (uint32_t i = 0; i < state.dimension; i++) | 
|  | { | 
|  | norm_coef(i) = state.coef(i) * sqrt(x2(i) - state.xmean(i) * state.xmean(i)); | 
|  | avg += fabs(norm_coef(i)); | 
|  | } | 
|  | avg /= state.dimension; | 
|  |  | 
|  | for (uint32_t i = 0; i < state.dimension; i++) | 
|  | if (fabs(norm_coef(i)/avg) < threshold || fabs(norm_coef(i)) < tolerance) | 
|  | state.coef(i) = 0; | 
|  |  | 
|  | AnyType tuple; | 
|  | tuple << static_cast<double>(state.intercept) | 
|  | << state.coef | 
|  | << static_cast<double>(state.lambda); | 
|  |  | 
|  | return tuple; | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  | template <class Model> | 
|  | inline double Igd<Model>::p_abs (CVector& v, double r) | 
|  | { | 
|  | double sum = 0; | 
|  | for (int i = 0; i < v.size(); i++) | 
|  | if (v(i) != 0) | 
|  | sum += pow(fabs(v(i)), r); | 
|  | return pow(sum, 1./r); | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  |  | 
|  | // p-form link function, q = p/(p-1) | 
|  | // For inverse function, jut replace w with theta and q with p | 
|  | template <class Model> | 
|  | inline void Igd<Model>::link_fn (CVector& theta, CVector& w, double p) | 
|  | { | 
|  | double abs_theta = p_abs(theta, p); | 
|  | if (fabs(abs_theta) <= std::numeric_limits<double>::denorm_min()) | 
|  | { | 
|  | for (int i = 0; i < theta.size(); i++) w(i) = 0; | 
|  | return; | 
|  | } | 
|  |  | 
|  | double denominator = pow(abs_theta, p - 2); | 
|  |  | 
|  | for (int i = 0; i < theta.size(); i++) | 
|  | if (fabs(theta(i)) <= std::numeric_limits<double>::denorm_min()) | 
|  | w(i) = 0; | 
|  | else | 
|  | w(i) = sign(theta(i)) * pow(fabs(theta(i)), p - 1) | 
|  | / denominator; | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------------ | 
|  |  | 
|  | // sign of a number | 
|  | template <class Model> | 
|  | inline double Igd<Model>::sign (const double & x) | 
|  | { | 
|  | if (x > 0) | 
|  | return 1; | 
|  | else if (x < 0) | 
|  | return -1; | 
|  | else | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | } // namespace elastic_net | 
|  | } // namespace modules | 
|  | } // namespace madlib |