/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @file mlp_igd.cpp
*
* @brief Multilayer Perceptron functions
*
*//* ----------------------------------------------------------------------- */
#include <boost/lexical_cast.hpp>
#include <dbconnector/dbconnector.hpp>
#include <modules/shared/HandleTraits.hpp>
#include "mlp_igd.hpp"
#include "task/mlp.hpp"
#include "task/l2.hpp"
#include "algo/igd.hpp"
#include "algo/loss.hpp"
#include "type/tuple.hpp"
#include "type/model.hpp"
#include "type/state.hpp"
namespace madlib {
namespace modules {
namespace convex {
// These typedefs instantiate the convex-framework algorithm classes (IGD, Loss)
// with the MLP task and model types used by the functions below.
typedef IGD<MLPIGDState<MutableArrayHandle<double> >, MLPIGDState<ArrayHandle<double> >,
MLP<MLPModel<MutableArrayHandle<double> >, MLPTuple > > MLPIGDAlgorithm;
typedef IGD<MLPMiniBatchState<MutableArrayHandle<double> >, MLPMiniBatchState<ArrayHandle<double> >,
MLP<MLPModel<MutableArrayHandle<double> >, MiniBatchTuple > > MLPMiniBatchAlgorithm;
typedef Loss<MLPIGDState<MutableArrayHandle<double> >, MLPIGDState<ArrayHandle<double> >,
MLP<MLPModel<MutableArrayHandle<double> >, MLPTuple > > MLPLossAlgorithm;
typedef MLP<MLPModel<MutableArrayHandle<double> >,MLPTuple> MLPTask;
typedef MLPModel<MutableArrayHandle<double> > MLPModelType;
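// Each algorithm typedef above exposes static transition/merge/final methods.
// The exported functions below (mlp_igd_transition, mlp_igd_merge,
// mlp_igd_final, and their minibatch counterparts) wrap those methods so they
// can serve as the transition, merge, and final functions of a database
// aggregate.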
/**
* @brief Perform the multilayer perceptron transition step
*
* Called for each tuple.
*/
AnyType
mlp_igd_transition::run(AnyType &args) {
// For the first tuple: args[0] is nothing more than a marker that
// indicates that we should do some initial operations.
// For other tuples: args[0] holds the computation state until last tuple
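// The remaining arguments, as read below:
//   args[1]  independent variables (vector), args[2] dependent variable
//   args[3]  state from the previous iteration (NULL on the first iteration)
//   args[4]  number of units per layer, args[5] stepsize
//   args[6]  activation, args[7] is_classification, args[8] row weight
//   args[9]  warm-start coefficients (may be NULL), args[10] lambda
//   args[11] momentum, args[12] is_nesterov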
MLPIGDState<MutableArrayHandle<double> > state = args[0];
// initialize the state if this is the first tuple
if (state.algo.numRows == 0) {
if (!args[3].isNull()) {
MLPIGDState<ArrayHandle<double> > previousState = args[3];
state.allocate(*this, previousState.task.numberOfStages,
previousState.task.numbersOfUnits);
state = previousState;
} else {
// configuration parameters and initialization
// this is run only once (first iteration, first tuple)
ArrayHandle<double> numbersOfUnits = args[4].getAs<ArrayHandle<double> >();
uint16_t numberOfStages = static_cast<uint16_t>(numbersOfUnits.size() - 1);
state.allocate(*this, numberOfStages,
reinterpret_cast<const double *>(numbersOfUnits.ptr()));
state.task.stepsize = args[5].getAs<double>();
state.task.model.activation = static_cast<double>(args[6].getAs<int>());
state.task.model.is_classification = static_cast<double>(args[7].getAs<int>());
// args[8] is for weighting the input row, which is populated later.
state.task.lambda = args[10].getAs<double>();
MLPTask::lambda = state.task.lambda;
state.task.model.momentum = args[11].getAs<double>();
state.task.model.is_nesterov = static_cast<double>(args[12].getAs<bool>());
if (!args[9].isNull()){
// initial coefficients are provided
MappedColumnVector warm_start_coeff = args[9].getAs<MappedColumnVector>();
// copy warm start into the task model
// state.reset() ensures algo.incrModel is copied from task.model
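// Coefficients are flattened layer by layer in column-major order:
// entry (i, j) of layer k is read from warm_start_coeff at index
// layer_start + j * u[k].rows() + i.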
Index layer_start = 0;
for (size_t k = 0; k < numberOfStages; ++k){
for (Index j=0; j < state.task.model.u[k].cols(); ++j){
for (Index i=0; i < state.task.model.u[k].rows(); ++i){
state.task.model.u[k](i, j) = warm_start_coeff(
layer_start + j * state.task.model.u[k].rows() + i);
}
}
// accumulate the offset so the next layer reads past this layer's coefficients
layer_start += state.task.model.u[k].rows() * state.task.model.u[k].cols();
}
} else {
// initialize the model with appropriate coefficients
state.task.model.initialize(
numberOfStages,
reinterpret_cast<const double *>(numbersOfUnits.ptr()));
}
}
// resetting in either case
state.reset();
}
MLPTuple tuple;
try {
tuple.indVar = args[1].getAs<MappedColumnVector>();
tuple.depVar = args[2].getAs<MappedColumnVector>();
} catch (const ArrayWithNullException &e) {
return args[0];
}
tuple.weight = args[8].getAs<double>();
MLPIGDAlgorithm::transition(state, tuple);
// Use the model from the previous iteration to compute the loss (note that
// it is stored in the Task's state, while the Algo's state holds the model
// from the current iteration).
MLPLossAlgorithm::transition(state, tuple);
state.algo.numRows ++;
return state;
}
/**
* @brief Perform the preliminary aggregation function: Merge transition states
*/
AnyType
mlp_igd_merge::run(AnyType &args) {
MLPIGDState<MutableArrayHandle<double> > stateLeft = args[0];
MLPIGDState<ArrayHandle<double> > stateRight = args[1];
if (stateLeft.algo.numRows == 0) { return stateRight; }
else if (stateRight.algo.numRows == 0) { return stateLeft; }
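// Both states carry data: merge the incremental models (a weighted average
// based on the row counts, per the note below) and the accumulated loss.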
MLPIGDAlgorithm::merge(stateLeft, stateRight);
MLPLossAlgorithm::merge(stateLeft, stateRight);
// The following numRows update cannot be put above, because the model
// averaging depends on their original values
stateLeft.algo.numRows += stateRight.algo.numRows;
return stateLeft;
}
/**
* @brief Perform the multilayer perceptron final step
*/
AnyType
mlp_igd_final::run(AnyType &args) {
// We request a mutable object. Depending on the backend, this might perform
// a deep copy.
MLPIGDState<MutableArrayHandle<double> > state = args[0];
if (state.algo.numRows == 0) { return Null(); }
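// Average the loss accumulated over this iteration's rows and add the L2
// regularization penalty for the current model (the penalty's exact form is
// defined in task/l2.hpp).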
L2<MLPModelType>::lambda = state.task.lambda;
state.algo.loss = state.algo.loss/static_cast<double>(state.algo.numRows);
state.algo.loss += L2<MLPModelType>::loss(state.task.model);
MLPIGDAlgorithm::final(state);
return state;
}
/**
* @brief Perform the multilayer perceptron minibatch transition step
*
* Called for each tuple.
*/
AnyType
mlp_minibatch_transition::run(AnyType &args) {
// For the first tuple: args[0] is nothing more than a marker that
// indicates that we should do some initial operations.
// For other tuples: args[0] holds the computation state until last tuple
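// The remaining arguments, as read below:
//   args[1]  independent variable matrix, args[2] dependent variable matrix
//   args[3]  state from the previous iteration (NULL on the first iteration)
//   args[4]  number of units per layer, args[5] stepsize
//   args[6]  activation, args[7] is_classification, args[8] row weight
//   args[9]  warm-start coefficients (may be NULL), args[10] lambda
//   args[11] batch size, args[12] number of epochs
//   args[13] momentum, args[14] is_nesterov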
MLPMiniBatchState<MutableArrayHandle<double> > state = args[0];
// initialize the state if this is the first tuple
if (state.numRows == 0) {
if (!args[3].isNull()) {
MLPMiniBatchState<ArrayHandle<double> > previousState = args[3];
state.allocate(*this, previousState.numberOfStages,
previousState.numbersOfUnits);
state = previousState;
} else {
// configuration parameters
ArrayHandle<double> numbersOfUnits = args[4].getAs<ArrayHandle<double> >();
uint16_t numberOfStages = static_cast<uint16_t>(numbersOfUnits.size() - 1);
state.allocate(*this, numberOfStages,
reinterpret_cast<const double *>(numbersOfUnits.ptr()));
state.stepsize = args[5].getAs<double>();
state.model.activation = static_cast<double>(args[6].getAs<int>());
state.model.is_classification = static_cast<double>(args[7].getAs<int>());
// args[8] is for weighting the input row, which is populated later.
state.model.momentum = args[13].getAs<double>();
state.model.is_nesterov = static_cast<double>(args[14].getAs<bool>());
if (!args[9].isNull()){
// initial coefficients are provided; copy the warm start into the model
MappedColumnVector warm_start_coeff = args[9].getAs<MappedColumnVector>();
Index layer_start = 0;
for (size_t k = 0; k < numberOfStages; ++k){
for (Index j=0; j < state.model.u[k].cols(); ++j){
for (Index i=0; i < state.model.u[k].rows(); ++i){
state.model.u[k](i, j) = warm_start_coeff(
layer_start + j * state.model.u[k].rows() + i);
}
}
// accumulate the offset so the next layer reads past this layer's coefficients
layer_start += state.model.u[k].rows() * state.model.u[k].cols();
}
} else {
// initialize the model with appropriate coefficients
state.model.initialize(
numberOfStages,
reinterpret_cast<const double *>(numbersOfUnits.ptr()));
}
state.lambda = args[10].getAs<double>();
MLPTask::lambda = state.lambda;
state.batchSize = static_cast<uint16_t>(args[11].getAs<int>());
state.nEpochs = static_cast<uint16_t>(args[12].getAs<int>());
}
// resetting in either case
state.reset();
}
MiniBatchTuple tuple;
try {
// Ideally there should be no NULLs in the pre-processed input data,
// but keep it in a try block in case the user has modified the
// pre-processed data in any way.
// The matrices are read as column-major by default; transpose them to
// recover the layout they have in the database.
tuple.indVar = trans(args[1].getAs<MappedMatrix>());
tuple.depVar = trans(args[2].getAs<MappedMatrix>());
} catch (const ArrayWithNullException &e) {
return args[0];
}
tuple.weight = args[8].getAs<double>();
/*
Note that the IGD version uses the model in Task (the model from the
previous iteration) to compute the loss, whereas minibatch uses the
model in Algo (the model from the current iteration). The difference
in loss across a single iteration is small, so this is acceptable and
spares us from keeping another copy of the previous iteration's model
in the state. The model update for the current iteration and the loss
are both computed in one function below.
*/
MLPMiniBatchAlgorithm::transitionInMiniBatch(state, tuple);
state.numRows += tuple.indVar.rows();
return state;
}
/**
* @brief Perform the preliminary aggregation function: Merge transition states
*/
AnyType
mlp_minibatch_merge::run(AnyType &args) {
MLPMiniBatchState<MutableArrayHandle<double> > stateLeft = args[0];
MLPMiniBatchState<ArrayHandle<double> > stateRight = args[1];
if (stateLeft.numRows == 0) { return stateRight; }
else if (stateRight.numRows == 0) { return stateLeft; }
MLPMiniBatchAlgorithm::mergeInPlace(stateLeft, stateRight);
// The following numRows update cannot be put above, because the model
// averaging depends on their original values
stateLeft.numRows += stateRight.numRows;
stateLeft.loss += stateRight.loss;
return stateLeft;
}
/**
* @brief Perform the multilayer perceptron final step
*/
AnyType
mlp_minibatch_final::run(AnyType &args) {
// We request a mutable object. Depending on the backend, this might perform
// a deep copy.
MLPMiniBatchState<MutableArrayHandle<double> > state = args[0];
// Aggregates that haven't seen any data just return Null.
if (state.numRows == 0) { return Null(); }
L2<MLPModelType>::lambda = state.lambda;
state.loss = state.loss/static_cast<double>(state.numRows);
state.loss += L2<MLPModelType>::loss(state.model);
return state;
}
/**
* @brief Return the absolute difference in loss between two states
*/
AnyType
internal_mlp_igd_distance::run(AnyType &args) {
MLPIGDState<ArrayHandle<double> > stateLeft = args[0];
MLPIGDState<ArrayHandle<double> > stateRight = args[1];
return std::abs(stateLeft.algo.loss - stateRight.algo.loss);
}
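/**
* @brief Return the absolute difference in loss between two minibatch states
*/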
AnyType
internal_mlp_minibatch_distance::run(AnyType &args) {
MLPMiniBatchState<ArrayHandle<double> > stateLeft = args[0];
MLPMiniBatchState<ArrayHandle<double> > stateRight = args[1];
return std::abs(stateLeft.loss - stateRight.loss);
}
/**
* @brief Return the coefficients and diagnostic statistics of the state
*/
AnyType
internal_mlp_igd_result::run(AnyType &args) {
MLPIGDState<ArrayHandle<double> > state = args[0];
HandleTraits<ArrayHandle<double> >::ColumnVectorTransparentHandleMap
flattenU;
flattenU.rebind(&state.task.model.u[0](0, 0),
state.task.model.coeffArraySize(state.task.numberOfStages,
state.task.numbersOfUnits));
AnyType tuple;
tuple << flattenU
<< static_cast<double>(state.algo.loss);
return tuple;
}
/**
* @brief Return the coefficients and diagnostic statistics of the state
*/
AnyType
internal_mlp_minibatch_result::run(AnyType &args) {
MLPMiniBatchState<ArrayHandle<double> > state = args[0];
HandleTraits<ArrayHandle<double> >::ColumnVectorTransparentHandleMap flattenU;
flattenU.rebind(&state.model.u[0](0, 0),
state.model.coeffArraySize(state.numberOfStages,
state.numbersOfUnits));
AnyType tuple;
tuple << flattenU
<< static_cast<double>(state.loss);
return tuple;
}
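/**
* @brief Predict using a trained MLP model
*
* Argument layout, as read below: args[0] flattened coefficients,
* args[1] independent variables, args[2] is_classification,
* args[3] activation, args[4] layer sizes, args[5] is_response,
* args[6] feature means, args[7] feature standard deviations,
* args[8] is_dep_var_array_for_classification.
*/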
AnyType
internal_predict_mlp::run(AnyType &args) {
MLPModel<MutableArrayHandle<double> > model;
ColumnVector indVar;
int is_response = args[5].getAs<int>();
MappedColumnVector x_means = args[6].getAs<MappedColumnVector>();
MappedColumnVector x_stds = args[7].getAs<MappedColumnVector>();
MappedColumnVector coeff = args[0].getAs<MappedColumnVector>();
MappedColumnVector layerSizes = args[4].getAs<MappedColumnVector>();
// Input layer doesn't count
uint16_t numberOfStages = static_cast<uint16_t>(layerSizes.size() - 1);
double is_classification = args[2].getAs<double>();
double activation = args[3].getAs<double>();
int is_dep_var_array_for_classification = args[8].getAs<int>();
bool is_classification_response = is_classification && is_response;
// The model's rebind function is shared by the predict and train code paths.
// Since the same signature must be used, we pass a dummy value for the
// momentum and nesterov slots: predict does not use those parameters.
const double dummy_value = static_cast<double>(-1);
model.rebind(&is_classification, &activation, &dummy_value, &dummy_value, &coeff.data()[0],
numberOfStages, &layerSizes.data()[0]);
try {
indVar = (args[1].getAs<MappedColumnVector>()-x_means).cwiseQuotient(x_stds);
} catch (const ArrayWithNullException &e) {
return args[0];
}
ColumnVector prediction = MLPTask::predict(model, indVar, is_classification_response,
is_dep_var_array_for_classification);
return prediction;
}
} // namespace convex
} // namespace modules
} // namespace madlib