| package org.apache.spark.mllib.recommendation.engine |
| |
| import io.prediction.controller.Engine |
| import io.prediction.controller.IEngineFactory |
| import io.prediction.controller.IPersistentModel |
| import io.prediction.controller.IPersistentModelLoader |
| import io.prediction.controller.PDataSource |
| import io.prediction.controller.Params |
| import io.prediction.controller.PAlgorithm |
| import io.prediction.controller.IdentityPreparator |
| import io.prediction.controller.FirstServing |
| import io.prediction.controller.Utils |
| import io.prediction.controller.Workflow |
| import io.prediction.controller.WorkflowParams |
| |
| import org.apache.commons.io.FileUtils |
| import org.apache.spark.SparkContext |
| import org.apache.spark.SparkContext._ |
| import org.apache.spark.rdd.RDD |
| import org.apache.spark.mllib.recommendation.ALS |
| import org.apache.spark.mllib.recommendation.Rating |
| import org.apache.spark.mllib.recommendation.MatrixFactorizationModel |
| import org.json4s._ |
| |
| import scala.io.Source |
| |
| import java.io.File |
| |
/** Parameters for [[DataSource]].
  *
  * @param filepath path of the rating file, one "user::item::rate" record per line
  */
case class DataSourceParams(filepath: String) extends Params
| |
/** Reads MovieLens-style rating data ("user::item::rate" per line) into RDDs
  * for training and evaluation.
  */
case class DataSource(dsp: DataSourceParams)
  extends PDataSource[DataSourceParams, Null, RDD[Rating], (Int, Int), Double] {

  /** Loads the rating file and returns a single data set:
    * (no evaluation info, training ratings, (user, item) -> rating targets).
    *
    * Lines that do not split into exactly three "::"-separated fields are
    * skipped instead of failing the whole Spark job with a MatchError
    * (the original non-exhaustive match crashed on any malformed/blank line).
    * NOTE(review): numeric fields are still assumed parseable — a
    * non-numeric field in a well-formed line will raise NumberFormatException.
    */
  def read(sc: SparkContext)
    : Seq[(Null, RDD[Rating], RDD[((Int, Int), Double)])] = {
    val data = sc.textFile(dsp.filepath)

    // Lenient parse: drop lines that don't have exactly three fields.
    val ratings: RDD[Rating] = data.flatMap { line =>
      line.split("::") match {
        case Array(user, item, rate) =>
          Some(Rating(user.toInt, item.toInt, rate.toDouble))
        case _ => None
      }
    }

    // Pair every (user, product) with its observed rating; serves as the
    // feature/target set for batch prediction.
    val featureTargets: RDD[((Int, Int), Double)] = ratings.map {
      case Rating(user, product, rate) => ((user, product), rate)
    }

    Seq((null, ratings, featureTargets))
  }
}
| |
/** Hyperparameters for [[ALSAlgorithm]].
  *
  * @param rank          number of latent features in the factorization
  * @param numIterations number of ALS iterations to run
  * @param lambda        ALS regularization parameter
  * @param persistModel  whether save() actually writes the model (to /tmp)
  */
case class AlgorithmParams(
  rank: Int = 10,
  numIterations: Int = 20,
  lambda: Double = 0.01,
  persistModel: Boolean = false) extends Params
| |
/** A MatrixFactorizationModel that PredictionIO can persist.
  * Writes rank and the two feature RDDs as Spark object files under
  * /tmp/<id>/ when persistence is enabled.
  * NOTE(review): the /tmp base path is hard-coded and must match the
  * companion loader — confirm this is acceptable for the deployment.
  */
class PMatrixFactorizationModel(rank: Int,
    userFeatures: RDD[(Int, Array[Double])],
    productFeatures: RDD[(Int, Array[Double])])
  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
  with IPersistentModel[AlgorithmParams] {

  /** Saves the model when params.persistModel is set; returns whether
    * the model was (requested to be) persisted.
    */
  def save(id: String, params: AlgorithmParams, sc: SparkContext): Boolean = {
    val persist = params.persistModel
    if (persist) {
      val basePath = s"/tmp/$id"
      sc.parallelize(Seq(rank)).saveAsObjectFile(s"$basePath/rank")
      userFeatures.saveAsObjectFile(s"$basePath/userFeatures")
      productFeatures.saveAsObjectFile(s"$basePath/productFeatures")
    }
    persist
  }
}
| |
/** Loader for persisted [[PMatrixFactorizationModel]]s.
  * Reads the object files written by save() from /tmp/<id>/.
  */
object PMatrixFactorizationModel
  extends IPersistentModelLoader[AlgorithmParams, PMatrixFactorizationModel] {
  def apply(id: String, params: AlgorithmParams, sc: Option[SparkContext]) = {
    // Bind the SparkContext once with a descriptive failure instead of
    // three bare Option.get calls (which would only raise an opaque
    // NoSuchElementException if the framework ever passed None).
    val spark = sc.getOrElse(throw new IllegalArgumentException(
      "A SparkContext is required to load a PMatrixFactorizationModel"))
    new PMatrixFactorizationModel(
      rank = spark.objectFile[Int](s"/tmp/$id/rank").first,
      userFeatures = spark.objectFile(s"/tmp/$id/userFeatures"),
      productFeatures = spark.objectFile(s"/tmp/$id/productFeatures"))
  }
}
| |
/** PredictionIO algorithm wrapping Spark MLlib's ALS matrix factorization.
  * Queries are (user, product) pairs; predictions are rating scores.
  */
class ALSAlgorithm(val ap: AlgorithmParams)
  extends PAlgorithm[AlgorithmParams, RDD[Rating],
      PMatrixFactorizationModel, (Int, Int), Double] {

  /** Trains an ALS model with the configured hyperparameters and wraps it
    * in the persistable model class.
    */
  def train(data: RDD[Rating]): PMatrixFactorizationModel = {
    val trained = ALS.train(data, ap.rank, ap.numIterations, ap.lambda)
    new PMatrixFactorizationModel(
      trained.rank, trained.userFeatures, trained.productFeatures)
  }

  /** Predicts ratings for a batch of indexed (user, product) queries,
    * re-attaching each query's index to its predicted rating.
    */
  def batchPredict(
      model: PMatrixFactorizationModel,
      feature: RDD[(Long, (Int, Int))]): RDD[(Long, Double)] = {
    // MLlib predicts on bare (user, product) pairs, so strip the indices.
    val userProducts = feature.values
    val predictions: RDD[Rating] = model.predict(userProducts)

    // Key each prediction by its (user, product) pair for the join below.
    val keyed: RDD[((Int, Int), Double)] =
      predictions.map(r => ((r.user, r.product), r.rating))

    // Join predictions back to the original indices and drop the pair key.
    feature
      .map(_.swap)
      .join(keyed)
      .map { case (_, (index, rating)) => (index, rating) }
  }

  /** Predicts the rating for a single (user, product) query. */
  def predict(
      model: PMatrixFactorizationModel, feature: (Int, Int)): Double = {
    val (user, product) = feature
    model.predict(user, product)
  }

  // Queries arrive as a JSON array [user, item]; register the custom tuple
  // serializer on top of the default json4s formats.
  @transient override lazy val querySerializer =
    Utils.json4sDefaultFormats + new Tuple2IntSerializer
}
| |
/** Command-line entry point: wires the data source, preparator, algorithm
  * and serving layer together and runs one training workflow.
  */
object Run {
  // Explicit ": Unit =" replaces the deprecated Scala procedure syntax
  // ("def main(args: ...) { ... }").
  def main(args: Array[String]): Unit = {
    // Hard-coded sample data path; expects "user::item::rate" lines.
    val dsp = DataSourceParams("data/movielens.txt")
    val ap = AlgorithmParams()

    Workflow.run(
      dataSourceClassOpt = Some(classOf[DataSource]),
      dataSourceParams = dsp,
      preparatorClassOpt = Some(IdentityPreparator(classOf[DataSource])),
      algorithmClassMapOpt = Some(Map("" -> classOf[ALSAlgorithm])),
      algorithmParamsList = Seq(("", ap)),
      servingClassOpt = Some(FirstServing(classOf[ALSAlgorithm])),
      params = WorkflowParams(
        batch = "Imagine: P Recommendations",
        verbose = 1
      )
    )
  }
}
| |
/** Engine factory used by PredictionIO deployment: assembles the data
  * source, identity preparator, the single ALS algorithm, and first-serving.
  */
object RecommendationEngine extends IEngineFactory {
  def apply() = {
    // A single algorithm registered under the empty name.
    val algorithms = Map("" -> classOf[ALSAlgorithm])
    new Engine(
      classOf[DataSource],
      IdentityPreparator(classOf[DataSource]),
      algorithms,
      FirstServing(classOf[ALSAlgorithm]))
  }
}
| |
| |
/** json4s serializer mapping a (user, item) query tuple to/from the JSON
  * array form [user, item].
  *
  * The serialization case deconstructs element-wise as (x: Int, y: Int)
  * rather than matching on the erased type: the original
  * `case x: (Int, Int)` erases to `Tuple2[_, _]`, so it matched ANY tuple
  * and failed later with a ClassCastException for non-Int tuples. The
  * element-wise pattern is checked at runtime, so non-Int tuples fall
  * through to json4s's normal "cannot serialize" handling instead.
  */
class Tuple2IntSerializer extends CustomSerializer[(Int, Int)](format => (
  {
    case JArray(List(JInt(x), JInt(y))) => (x.intValue, y.intValue)
  },
  {
    case (x: Int, y: Int) => JArray(List(JInt(x), JInt(y)))
  }
))