Merge branch 'develop'
diff --git a/README.md b/README.md
index 094c1a0..3bdc167 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,10 @@
 
 ### develop
 
+### v0.3.1
+
+- Add Evaluation module and modify DataSource to support it
+
 ### v0.3.0
 
 - update for PredictionIO 0.9.2, including:
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
index 0c0db22..60282db 100644
--- a/src/main/scala/ALSAlgorithm.scala
+++ b/src/main/scala/ALSAlgorithm.scala
@@ -24,6 +24,12 @@
 
   @transient lazy val logger = Logger[this.type]
 
+  if (ap.numIterations > 30) {
+    logger.warn(
+      s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " +
+      s"There is a chance of running to StackOverflowException. Lower this number to remedy it")
+  }
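+
+  // A possible mitigation (a sketch, assuming a writable checkpoint path):
+  // setting a Spark checkpoint directory lets ALS periodically truncate the
+  // RDD lineage whose deep recursion triggers the overflow, e.g. in train():
+  //   sc.setCheckpointDir("checkpoint")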
+
   def train(sc: SparkContext, data: PreparedData): ALSModel = {
     // MLLib ALS cannot handle empty training data.
     require(!data.ratings.take(1).isEmpty,
@@ -31,6 +37,7 @@
       " Please check if DataSource generates TrainingData" +
       " and Preprator generates PreparedData correctly.")
     // Convert user and item String IDs to Int index for MLlib
     val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
     val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
     val mllibRatings = data.ratings.map( r =>
@@ -84,4 +91,48 @@
     }
   }
 
+  // This function is used by the evaluation module, where a batch of queries is sent to this
+  // engine for evaluation purposes.
+  override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
+    val userIxQueries: RDD[(Int, (Long, Query))] = queries
+    .map { case (ix, query) => {
+      // If user not found, then the index is -1
+      val userIx = model.userStringIntMap.get(query.user).getOrElse(-1)
+      (userIx, (ix, query))
+    }}
+
+    // Cross product of all valid users from the queries and products in the model.
+    val usersProducts: RDD[(Int, Int)] = userIxQueries
+      .keys
+      .filter(_ != -1)
+      .cartesian(model.productFeatures.map(_._1))
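+    // Note: this materializes one (user, item) pair for every valid user and
+    // every item in the model, so its size grows with both sets.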
+
+    // Call MLlib ALS's predict function.
+    val ratings: RDD[MLlibRating] = model.predict(usersProducts)
+
+    // The following code constructs predicted results from MLlib's ratings.
+    // This is not an optimal implementation: instead of groupBy, it should use
+    // combineByKey with a bounded PriorityQueue, as sketched below.
+    val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user)
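+    // A rough sketch of that combineByKey alternative (illustrative only, not
+    // part of this template; assumes import scala.collection.mutable and a
+    // top-k size k):
+    //   val topK = ratings
+    //     .map(r => (r.user, r))
+    //     .combineByKey[mutable.PriorityQueue[MLlibRating]](
+    //       r => mutable.PriorityQueue(r)(Ordering.by((x: MLlibRating) => -x.rating)),
+    //       (q, r) => { q.enqueue(r); if (q.size > k) q.dequeue(); q },
+    //       (q1, q2) => { q1 ++= q2; while (q1.size > k) q1.dequeue(); q1 })
+    //     .mapValues(_.dequeueAll.reverse) // dequeues ascending; reverse for top-first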
+
+    userIxQueries.leftOuterJoin(userRatings)
+    .map { 
+      // When there are ratings
+      case (userIx, ((ix, query), Some(ratings))) => {
+        val topItemScores: Array[ItemScore] = ratings
+        .toArray
+        .sortBy(_.rating)(Ordering.Double.reverse) // sort from highest to lowest rating
+        .take(query.num)
+        .map { rating => ItemScore(
+          model.itemStringIntMap.inverse(rating.product),
+          rating.rating) }
+
+        (ix, PredictedResult(itemScores = topItemScores))
+      }
+      // When user doesn't exist in training data
+      case (userIx, ((ix, query), None)) => {
+        require(userIx == -1)
+        (ix, PredictedResult(itemScores = Array.empty))
+      }
+    }
+  }
 }
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 43ac5f9..36111b7 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -13,16 +13,19 @@
 
 import grizzled.slf4j.Logger
 
-case class DataSourceParams(appName: String) extends Params
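+// Evaluation parameters: kFold is the number of cross-validation folds and
+// queryNum is the number of recommended items requested per evaluation query.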
+case class DataSourceEvalParams(kFold: Int, queryNum: Int)
+
+case class DataSourceParams(
+  appName: String,
+  evalParams: Option[DataSourceEvalParams]) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
+      EmptyEvaluationInfo, Query, ActualResult] {
 
   @transient lazy val logger = Logger[this.type]
 
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
+  def getRatings(sc: SparkContext): RDD[Rating] = {
 
     val eventsRDD: RDD[Event] = PEventStore.find(
       appName = dsp.appName,
@@ -51,7 +54,37 @@
       rating
     }.cache()
 
-    new TrainingData(ratingsRDD)
+    ratingsRDD
+  }
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    new TrainingData(getRatings(sc))
+  }
+
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(dsp.evalParams.nonEmpty, "Must specify evalParams")
+    val evalParams = dsp.evalParams.get
+
+    val kFold = evalParams.kFold
+    val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId
+    ratings.cache()
+
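+    // zipWithUniqueId tags each rating with a distinct Long id; taking that id
+    // modulo kFold splits the data into kFold disjoint folds, with fold idx
+    // serving as the test set and the rest as training data. The ids are unique
+    // but not consecutive, so the folds are only approximately equal in size.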
+    (0 until kFold).map { idx => {
+      val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1)
+      val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1)
+
+      val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user)
+
+      (new TrainingData(trainingRatings),
+        new EmptyEvaluationInfo(),
+        testingUsers.map {
+          case (user, userRatings) =>
+            (Query(user, evalParams.queryNum), ActualResult(userRatings.toArray))
+        }
+      )
+    }}
   }
 }
 
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
index 22add39..2f0ad39 100644
--- a/src/main/scala/Engine.scala
+++ b/src/main/scala/Engine.scala
@@ -12,6 +12,10 @@
   itemScores: Array[ItemScore]
 ) extends Serializable
 
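+// The ratings a user actually gave, used as ground truth by the evaluation
+// module when scoring a PredictedResult.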
+case class ActualResult(
+  ratings: Array[Rating]
+) extends Serializable
+
 case class ItemScore(
   item: String,
   score: Double
diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..f744677
--- /dev/null
+++ b/src/main/scala/Evaluation.scala
@@ -0,0 +1,89 @@
+package org.template.recommendation
+
+import io.prediction.controller.Evaluation
+import io.prediction.controller.OptionAverageMetric
+import io.prediction.controller.AverageMetric
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EngineParamsGenerator
+import io.prediction.controller.EngineParams
+import io.prediction.controller.MetricEvaluator
+
+// Usage:
+// $ pio eval org.template.recommendation.RecommendationEvaluation \
+//   org.template.recommendation.ParamsList
+
+case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
+    extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  require(k > 0, "k must be greater than 0")
+
+  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
+    val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet
+
+    // If there are no positive results, precision is undefined; we exclude such
+    // queries from the metric by returning None.
+    if (positives.isEmpty) {
+      return None
+    }
+
+    val tpCount: Int = p.itemScores.take(k).count(is => positives(is.item))
+
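+    // The denominator is min(k, #positives), so a query with fewer than k
+    // relevant items can still reach a score of 1.0. For example, with k = 3
+    // and positives = {a, b}, the prediction (a, c, b) gives tpCount = 2 and
+    // a score of 2.0 / min(3, 2) = 1.0.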
+    Some(tpCount.toDouble / math.min(k, positives.size))
+  }
+}
+
+case class PositiveCount(ratingThreshold: Double = 2.0)
+    extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
+  override def header = s"PositiveCount (threshold=$ratingThreshold)"
+
+  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = {
+    a.ratings.count(_.rating >= ratingThreshold)
+  }
+}
+
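+// The first metric is the one the evaluation uses to rank engine params;
+// otherMetrics are computed and reported alongside it.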
+object RecommendationEvaluation extends Evaluation {
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
+      otherMetrics = Seq(
+        PositiveCount(ratingThreshold = 4.0),
+        PrecisionAtK(k = 10, ratingThreshold = 2.0),
+        PositiveCount(ratingThreshold = 2.0),
+        PrecisionAtK(k = 10, ratingThreshold = 1.0),
+        PositiveCount(ratingThreshold = 1.0)
+      )))
+}
+
+
+object ComprehensiveRecommendationEvaluation extends Evaluation {
+  val ratingThresholds = Seq(0.0, 2.0, 4.0)
+  val ks = Seq(1, 3, 10)
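+  // i.e. 3 PositiveCount metrics plus 3 x 3 = 9 PrecisionAtK metrics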
+
+  engineEvaluator = (
+    RecommendationEngine(),
+    MetricEvaluator(
+      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
+      otherMetrics = (
+        (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++
+        (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r))
+      )))
+}
+
+
+trait BaseEngineParamsList extends EngineParamsGenerator {
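+  // "INVALID_APP_NAME" is a deliberate placeholder; replace it with an app name
+  // registered in PredictionIO before running the evaluation.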
+  protected val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(
+      appName = "INVALID_APP_NAME",
+      evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
+}
+
+object EngineParamsList extends BaseEngineParamsList {
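+  // A 3 x 3 grid search over rank and numIterations: 9 engine params in total,
+  // with lambda fixed at 0.01 and the random seed fixed at Some(3).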
+  engineParamsList = for (
+    rank <- Seq(5, 10, 20);
+    numIterations <- Seq(1, 5, 10))
+    yield baseEP.copy(
+      algorithmParamsList = Seq(
+        ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
+}