blob: c1c340d45e71052b32c3b38c145fe8c1543a1d91 [file] [log] [blame]
package org.template.recommendation
import io.prediction.controller.PDataSource
import io.prediction.controller.EmptyEvaluationInfo
import io.prediction.controller.EmptyActualResult
import io.prediction.controller.Params
import io.prediction.data.storage.Event
import io.prediction.data.storage.Storage
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import grizzled.slf4j.Logger
/** Parameters for [[DataSource]].
  *
  * @param filepath path to the ratings text file read by `SparkContext.textFile`,
  *                 one `user::item::rating` record per line // CHANGED
  */
case class DataSourceParams(filepath: String) extends Params
/** Reads rating events for training from a plain-text file.
  *
  * Each input line must have the form `user::item::rating`, e.g. `u1::i42::4.5`.
  */
class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  /** Loads the file at `dsp.filepath` and parses it into a [[TrainingData]].
    *
    * @param sc the Spark context used to read the file as an RDD of lines
    * @return training data wrapping an `RDD[Rating]`
    * @throws IllegalArgumentException (inside the Spark job) if a line does not
    *         have exactly three `::`-separated fields
    * @throws NumberFormatException (inside the Spark job) if the rating field
    *         is not a valid double
    */
  override
  def readTraining(sc: SparkContext): TrainingData = {
    // CHANGED
    val data = sc.textFile(dsp.filepath)
    val ratings: RDD[Rating] = data.map { line =>
      line.split("::") match {
        case Array(user, item, rate) =>
          // rate.toDouble throws NumberFormatException with the bad token if non-numeric.
          Rating(user, item, rate.toDouble)
        case _ =>
          // Fail fast with a descriptive error instead of an opaque scala.MatchError,
          // so the offending input line is visible in the job's failure message.
          throw new IllegalArgumentException(
            s"Malformed rating line (expected user::item::rating): $line")
      }
    }
    new TrainingData(ratings)
  }
}
/** A single observed rating event.
  *
  * @param user   identifier of the rating user
  * @param item   identifier of the rated item
  * @param rating the numeric rating value
  */
case class Rating(user: String, item: String, rating: Double)
/** Container for the parsed training set.
  *
  * Serializable so Spark can ship it to executors along with closures.
  *
  * @param ratings the full set of rating events as a Spark RDD
  */
class TrainingData(
  val ratings: RDD[Rating]
) extends Serializable {

  // NOTE: count() and take() trigger Spark actions, so toString is not free.
  override def toString = {
    val total = ratings.count()
    val preview = ratings.take(2).toList
    s"ratings: [$total] ($preview...)"
  }
}