Merge branch 'develop'
diff --git a/README.md b/README.md
index 094c1a0..3bdc167 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,10 @@
### develop
+### v0.3.1
+
+- Add Evaluation module and modify DataSource for it
+
### v0.3.0
- update for PredictionIO 0.9.2, including:
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
index 0c0db22..60282db 100644
--- a/src/main/scala/ALSAlgorithm.scala
+++ b/src/main/scala/ALSAlgorithm.scala
@@ -24,6 +24,12 @@
@transient lazy val logger = Logger[this.type]
+ if (ap.numIterations > 30) {
+ logger.warn(
+ s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+ s"There is a chance of running into a StackOverflowError. Lower this number to remedy it")
+ }
+
def train(sc: SparkContext, data: PreparedData): ALSModel = {
// MLLib ALS cannot handle empty training data.
require(!data.ratings.take(1).isEmpty,
@@ -31,6 +37,7 @@
" Please check if DataSource generates TrainingData" +
" and Preprator generates PreparedData correctly.")
// Convert user and item String IDs to Int index for MLlib
+
val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
val mllibRatings = data.ratings.map( r =>
@@ -84,4 +91,48 @@
}
}
+ // This function is used by the evaluation module, where a batch of queries is sent to this engine
+ // for evaluation purposes.
+ override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+ val userIxQueries: RDD[(Int, (Long, Query))] = queries
+ .map { case (ix, query) => {
+ // If user not found, then the index is -1
+ val userIx = model.userStringIntMap.get(query.user).getOrElse(-1)
+ (userIx, (ix, query))
+ }}
+
+ // Cross product of all valid users from the queries and products in the model.
+ val usersProducts: RDD[(Int, Int)] = userIxQueries
+ .keys
+ .filter(_ != -1)
+ .cartesian(model.productFeatures.map(_._1))
+
+ // Call mllib ALS's predict function.
+ val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+ // The following code constructs predicted results from mllib's ratings.
+ // Not an optimal implementation: instead of groupBy, use combineByKey with a PriorityQueue
+ val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
+
+ userIxQueries.leftOuterJoin(userRatings)
+ .map {
+ // When there are ratings
+ case (userIx, ((ix, query), Some(ratings))) => {
+ val topItemScores: Array[ItemScore] = ratings
+ .toArray
+ .sortBy(-_.rating) // descending: highest-rated items first
+ .take(query.num)
+ .map { rating => ItemScore(
+ model.itemStringIntMap.inverse(rating.product),
+ rating.rating) }
+
+ (ix, PredictedResult(itemScores = topItemScores))
+ }
+ // When user doesn't exist in training data
+ case (userIx, ((ix, query), None)) => {
+ require(userIx == -1)
+ (ix, PredictedResult(itemScores = Array.empty))
+ }
+ }
+ }
}
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 43ac5f9..36111b7 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -13,16 +13,19 @@
import grizzled.slf4j.Logger
-case class DataSourceParams(appName: String) extends Params
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+case class DataSourceParams(
+ appName: String,
+ evalParams: Option[DataSourceEvalParams]) extends Params
class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData,
- EmptyEvaluationInfo, Query, EmptyActualResult] {
+ EmptyEvaluationInfo, Query, ActualResult] {
@transient lazy val logger = Logger[this.type]
- override
- def readTraining(sc: SparkContext): TrainingData = {
+ def getRatings(sc: SparkContext): RDD[Rating] = {
val eventsRDD: RDD[Event] = PEventStore.find(
appName = dsp.appName,
@@ -51,7 +54,37 @@
rating
}.cache()
- new TrainingData(ratingsRDD)
+ ratingsRDD
+ }
+
+ override
+ def readTraining(sc: SparkContext): TrainingData = {
+ new TrainingData(getRatings(sc))
+ }
+
+ override
+ def readEval(sc: SparkContext)
+ : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+ require(!dsp.evalParams.isEmpty, "Must specify evalParams")
+ val evalParams = dsp.evalParams.get
+
+ val kFold = evalParams.kFold
+ val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+ ratings.cache
+
+ (0 until kFold).map { idx => {
+ val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+ val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+ val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+ (new TrainingData(trainingRatings),
+ new EmptyEvaluationInfo(),
+ testingUsers.map {
+ case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray))
+ }
+ )
+ }}
}
}
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
index 22add39..2f0ad39 100644
--- a/src/main/scala/Engine.scala
+++ b/src/main/scala/Engine.scala
@@ -12,6 +12,10 @@
itemScores: Array[ItemScore]
) extends Serializable
+case class ActualResult(
+ ratings: Array[Rating]
+) extends Serializable
+
case class ItemScore(
item: String,
score: Double
diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..f744677
--- /dev/null
+++ b/src/main/scala/Evaluation.scala
@@ -0,0 +1,89 @@
+package org.template.recommendation
+
+import io.prediction.controller.Evaluation
+import io.prediction.controller.OptionAverageMetric
+import io.prediction.controller.AverageMetric
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EngineParamsGenerator
+import io.prediction.controller.EngineParams
+import io.prediction.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.template.recommendation.RecommendationEvaluation \
+// org.template.recommendation.ParamsList
+
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+ extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+ require(k > 0, "k must be greater than 0")
+
+ override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+ def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+ val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet
+
+ // If there are no positive results, Precision is undefined. We don't count this case in the
+ // metric, hence we return None.
+ if (positives.size == 0) {
+ return None
+ }
+
+ val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size
+
+ Some(tpCount.toDouble / math.min(k, positives.size))
+ }
+}
+
+case class PositiveCount(ratingThreshold: Double = 2.0)
+ extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+ override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+ def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = {
+ a.ratings.filter(_.rating >= ratingThreshold).size
+ }
+}
+
+object RecommendationEvaluation extends Evaluation {
+ engineEvaluator = (
+ RecommendationEngine(),
+ MetricEvaluator(
+ metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+ otherMetrics = Seq(
+ PositiveCount(ratingThreshold = 4.0),
+ PrecisionAtK(k = 10, ratingThreshold = 2.0),
+ PositiveCount(ratingThreshold = 2.0),
+ PrecisionAtK(k = 10, ratingThreshold = 1.0),
+ PositiveCount(ratingThreshold = 1.0)
+ )))
+}
+
+
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+ val ratingThresholds = Seq(0.0, 2.0, 4.0)
+ val ks = Seq(1, 3, 10)
+
+ engineEvaluator = (
+ RecommendationEngine(),
+ MetricEvaluator(
+ metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+ otherMetrics = (
+ (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++
+ (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r))
+ )))
+}
+
+
+trait BaseEngineParamsList extends EngineParamsGenerator {
+ protected val baseEP = EngineParams(
+ dataSourceParams = DataSourceParams(
+ appName = "INVALID_APP_NAME",
+ evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
+}
+
+object EngineParamsList extends BaseEngineParamsList {
+ engineParamsList = for(
+ rank <- Seq(5, 10, 20);
+ numIterations <- Seq(1, 5, 10))
+ yield baseEP.copy(
+ algorithmParamsList = Seq(
+ ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+}