package io.prediction.examples.regression.parallel
import io.prediction.controller.Engine
import io.prediction.controller.Params
import io.prediction.controller.PDataSource
import io.prediction.controller.P2LAlgorithm
import io.prediction.controller.IdentityPreparator
import io.prediction.controller.IEngineFactory
import io.prediction.controller.AverageServing
import io.prediction.controller.MeanSquareError
import io.prediction.controller.Utils
import io.prediction.controller.Workflow
import io.prediction.controller.WorkflowParams
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.RegressionModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.json4s._

// Data source parameters: input file path, number of cross-validation
// folds, and the random seed used to generate them.
case class DataSourceParams(
  val filepath: String, val k: Int = 3, val seed: Int = 9527)
  extends Params

// Reads the input file and produces k train/evaluation splits; the data
// parameter of each split is its fold index.
case class ParallelDataSource(val dsp: DataSourceParams)
  extends PDataSource[
    DataSourceParams, Integer,
    RDD[LabeledPoint], Vector, Double] {
  def read(sc: SparkContext)
  : Seq[(Integer, RDD[LabeledPoint], RDD[(Vector, Double)])] = {
    val input = sc.textFile(dsp.filepath)
    // Each line holds the label followed by space-separated feature values.
    val points = input.map { line =>
      val parts = line.split(' ').map(_.toDouble)
      LabeledPoint(parts(0), Vectors.dense(parts.drop(1)))
    }
    // MLUtils.kFold returns one (training, evaluation) RDD pair per fold.
    MLUtils.kFold(points, dsp.k, dsp.seed)
      .zipWithIndex
      .map { case ((training, evaluation), index) =>
        (Int.box(index), training, evaluation.map(p => (p.features, p.label)))
      }
  }
}
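
// A minimal sketch of exercising ParallelDataSource outside the full
// workflow, assuming a local Spark master and that data/lr_data.txt exists.
// The object name DataSourceCheck and the SparkConf settings are
// illustrative additions, not part of the original example.
object DataSourceCheck {
  def main(args: Array[String]) {
    val sc = new SparkContext(new org.apache.spark.SparkConf()
      .setMaster("local[2]")
      .setAppName("DataSourceCheck"))
    val folds = ParallelDataSource(DataSourceParams("data/lr_data.txt")).read(sc)
    // read yields one (fold index, training, evaluation) triple per fold.
    folds.foreach { case (i, training, evaluation) =>
      println(s"fold $i: training=${training.count()} evaluation=${evaluation.count()}")
    }
    sc.stop()
  }
}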

// Hyperparameters for linear regression with SGD.
case class AlgorithmParams(
  val numIterations: Int = 200, val stepSize: Double = 0.1) extends Params

// Trains an MLlib linear regression model with stochastic gradient descent
// and predicts a Double label from a feature Vector.
case class ParallelSGDAlgorithm(val ap: AlgorithmParams)
  extends P2LAlgorithm[
    AlgorithmParams, RDD[LabeledPoint], RegressionModel, Vector, Double] {
  def train(data: RDD[LabeledPoint]): RegressionModel = {
    LinearRegressionWithSGD.train(data, ap.numIterations, ap.stepSize)
  }

  def predict(model: RegressionModel, feature: Vector): Double = {
    model.predict(feature)
  }

  // MLlib Vectors have no default JSON representation; register the custom
  // serializer below so queries can be parsed from JSON.
  @transient override lazy val querySerializer =
    Utils.json4sDefaultFormats + new VectorSerializer
}

// Assembles the engine: data source, pass-through preparator, the SGD
// algorithm, and a serving layer that averages the algorithms' predictions.
object RegressionEngineFactory extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[ParallelDataSource],
      classOf[IdentityPreparator[RDD[LabeledPoint]]],
      Map("SGD" -> classOf[ParallelSGDAlgorithm]),
      AverageServing(classOf[ParallelSGDAlgorithm]))
  }
}

object Run {
  def main(args: Array[String]) {
    val filepath = "data/lr_data.txt"
    val dataSourceParams = DataSourceParams(filepath, 3)
    val SGD = "SGD"
    // Evaluate the same algorithm under three different step sizes.
    val algorithmParamsList = Seq(
      (SGD, AlgorithmParams(stepSize = 0.1)),
      (SGD, AlgorithmParams(stepSize = 0.2)),
      (SGD, AlgorithmParams(stepSize = 0.4)))

    Workflow.run(
      dataSourceClassOpt = Some(classOf[ParallelDataSource]),
      dataSourceParams = dataSourceParams,
      preparatorClassOpt =
        Some(classOf[IdentityPreparator[RDD[LabeledPoint]]]),
      algorithmClassMapOpt = Some(Map(SGD -> classOf[ParallelSGDAlgorithm])),
      algorithmParamsList = algorithmParamsList,
      servingClassOpt = Some(AverageServing(classOf[ParallelSGDAlgorithm])),
      metricsClassOpt = Some(classOf[MeanSquareError]),
      params = WorkflowParams(
        batch = "Imagine: Parallel Regression"))
  }
}

// json4s serializer mapping an MLlib Vector to and from a JSON array of
// numbers.
class VectorSerializer extends CustomSerializer[Vector](format => (
  {
    // Deserialize: JSON array of doubles -> DenseVector.
    case JArray(x) =>
      new DenseVector(x.map { case JDouble(z) => z }.toArray)
  },
  {
    // Serialize: Vector -> JSON array of doubles.
    case x: Vector =>
      JArray(x.toArray.toList.map(d => JDouble(d)))
  }
))
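
// A minimal sketch of round-tripping a query Vector through VectorSerializer,
// assuming json4s-native is on the classpath. The object name
// VectorSerializerCheck is an illustrative addition, not part of the
// original example.
object VectorSerializerCheck {
  import org.json4s.native.Serialization.{read, write}

  def main(args: Array[String]) {
    implicit val formats = Utils.json4sDefaultFormats + new VectorSerializer
    val query: Vector = Vectors.dense(1.0, 2.0, 3.0)
    val json = write(query)        // e.g. [1.0,2.0,3.0]
    val parsed = read[Vector](json)
    assert(java.util.Arrays.equals(query.toArray, parsed.toArray))
  }
}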