```scala
package org.template.recommendation

import io.prediction.controller.PDataSource
import io.prediction.controller.EmptyEvaluationInfo
import io.prediction.controller.EmptyActualResult
import io.prediction.controller.Params
import io.prediction.data.storage.Event
import io.prediction.data.storage.Storage

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import grizzled.slf4j.Logger

// CHANGED: the data source is now parameterized by the path of a ratings file
case class DataSourceParams(filepath: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {
    // CHANGED: read and parse "user::item::rate" lines from the file
    val data = sc.textFile(dsp.filepath)
    val ratings: RDD[Rating] = data.map(_.split("::") match {
      case Array(user, item, rate) =>
        Rating(user, item, rate.toDouble)
    })
    new TrainingData(ratings)
  }
}

case class Rating(
  user: String,
  item: String,
  rating: Double
)

class TrainingData(
  val ratings: RDD[Rating]
) extends Serializable {
  override def toString = {
    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
  }
}
```
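The `readTraining` method above expects each line of the file at `dsp.filepath` to be a `::`-delimited triple of user ID, item ID, and numeric rating; in a PredictionIO engine the `filepath` value would normally be supplied through the `datasource` params in `engine.json`. The snippet below is a minimal sketch, with a made-up file name and values, showing the expected format and how the same split-and-match logic turns one line into a `Rating`. Note that a line with anything other than exactly three `::`-separated fields would cause a `MatchError` when the RDD is evaluated, so the input file is assumed to be clean.

```scala
// Hypothetical contents of the file passed as DataSourceParams.filepath,
// e.g. data/sample_ratings.txt (file name and values are illustrative only):
//
//   u1::i1::4.0
//   u1::i2::3.5
//   u2::i1::5.0
//
// Applying the same parsing logic as readTraining to a few sample lines:
val sampleLines = Seq("u1::i1::4.0", "u1::i2::3.5", "u2::i1::5.0")
val sampleRatings: Seq[Rating] = sampleLines.map(_.split("::") match {
  case Array(user, item, rate) => Rating(user, item, rate.toDouble)
})
// sampleRatings == Seq(Rating("u1", "i1", 4.0), Rating("u1", "i2", 3.5), Rating("u2", "i1", 5.0))
```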