Merge branch 'develop'
diff --git a/.gitignore b/.gitignore
index edc8287..7c6771b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 data/*.txt
 manifest.json
 pio.log
+/pio.sbt
 target/
diff --git a/README.md b/README.md
index a369500..6f0eb4c 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,10 @@
 
 ### develop
 
+### v0.1.2
+
+- update for PredictionIO 0.9.0
+- add sample evaluation
 
 ### v0.1.1
 
diff --git a/build.sbt b/build.sbt
index a54ce50..a7e7858 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,8 +7,6 @@
 organization := "io.prediction"
 
 libraryDependencies ++= Seq(
-  "io.prediction"    %% "core"          % "0.8.6" % "provided",
-  "commons-io"        % "commons-io"    % "2.4",
+  "io.prediction"    %% "core"          % pioVersion.value % "provided",
   "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided",
-  "org.json4s"       %% "json4s-native" % "3.2.10")
+  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")
diff --git a/data/import_eventserver.py b/data/import_eventserver.py
index a35235d..7e808e3 100644
--- a/data/import_eventserver.py
+++ b/data/import_eventserver.py
@@ -33,7 +33,7 @@
     description="Import sample data for classification engine")
   parser.add_argument('--access_key', default='invald_access_key')
   parser.add_argument('--url', default="http://localhost:7070")
-  parser.add_argument('--file', default="./data/sample_naive_bayes_data.txt")
+  parser.add_argument('--file', default="./data/data.txt")
 
   args = parser.parse_args()
   print args
diff --git a/project/pio-build.sbt b/project/pio-build.sbt
new file mode 100644
index 0000000..8346a96
--- /dev/null
+++ b/project/pio-build.sbt
@@ -0,0 +1 @@
+addSbtPlugin("io.prediction" % "pio-build" % "0.9.0")
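
The pio-build plugin added here is what provides the pioVersion sbt setting that build.sbt now reads through pioVersion.value, so the template no longer hard-codes the version of the core dependency. The concrete value comes from a small sbt file that the pio tool generates in the project root at build time, which is why /pio.sbt is added to .gitignore above. A minimal sketch of what that generated file is assumed to contain:

```scala
// pio.sbt: assumed shape of the file generated by `pio build`. It pins the
// PredictionIO version consumed by the pio-build plugin and is regenerated on
// every build, hence the new /pio.sbt entry in .gitignore.
pioVersion := "0.9.0"
```
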
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 166bcaa..c9b8b82 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -15,11 +15,14 @@
 
 import grizzled.slf4j.Logger
 
-case class DataSourceParams(appId: Int) extends Params
+case class DataSourceParams(
+  appId: Int,
+  evalK: Option[Int]  // k for the k-fold cross-validation in readEval
+) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
+      EmptyEvaluationInfo, Query, ActualResult] {
 
   @transient lazy val logger = Logger[this.type]
 
@@ -53,6 +56,61 @@
 
     new TrainingData(labeledPoints)
   }
+
+  override
+  def readEval(sc: SparkContext)
+  : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+    require(!dsp.evalK.isEmpty, "DataSourceParams.evalK must not be None")
+
+    // The following code reads the data from the data store and is
+    // equivalent to the readTraining method. The exact code is copied here
+    // for illustration purposes; a recommended approach (sketched after this
+    // diff) is to factor this logic out into a helper function and have both
+    // readTraining and readEval call it.
+    val eventsDb = Storage.getPEvents()
+    val labeledPoints: RDD[LabeledPoint] = eventsDb.aggregateProperties(
+      appId = dsp.appId,
+      entityType = "user",
+      // only keep entities with these required properties defined
+      required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
+      // aggregateProperties() returns an RDD of
+      // (entityId, aggregated properties) pairs
+      .map { case (entityId, properties) =>
+        try {
+          LabeledPoint(properties.get[Double]("plan"),
+            Vectors.dense(Array(
+              properties.get[Double]("attr0"),
+              properties.get[Double]("attr1"),
+              properties.get[Double]("attr2")
+            ))
+          )
+        } catch {
+          case e: Exception => {
+            logger.error(s"Failed to get properties ${properties} of" +
+              s" ${entityId}. Exception: ${e}.")
+            throw e
+          }
+        }
+      }.cache()
+    // End of reading from data store
+
+    // K-fold splitting
+    val evalK = dsp.evalK.get
+    val indexedPoints: RDD[(LabeledPoint, Long)] = labeledPoints.zipWithIndex
+
+    (0 until evalK).map { idx => 
+      val trainingPoints = indexedPoints.filter(_._2 % evalK != idx).map(_._1)
+      val testingPoints = indexedPoints.filter(_._2 % evalK == idx).map(_._1)
+
+      (
+        new TrainingData(trainingPoints),
+        new EmptyEvaluationInfo(),
+        testingPoints.map { 
+          p => (new Query(p.features.toArray), new ActualResult(p.label)) 
+        }
+      )
+    }
+  }
 }
 
 class TrainingData(
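
As the comment in readEval notes, the data-reading code is duplicated from readTraining for illustration only. A minimal sketch of the suggested refactor, assuming a hypothetical readLabeledPoints helper inside the DataSource class above (imports and the original error handling are omitted for brevity):

```scala
// Hypothetical shared helper: the same aggregateProperties query used by both
// readTraining and readEval above.
private def readLabeledPoints(sc: SparkContext): RDD[LabeledPoint] = {
  Storage.getPEvents().aggregateProperties(
    appId = dsp.appId,
    entityType = "user",
    required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
    .map { case (_, properties) =>
      LabeledPoint(properties.get[Double]("plan"),
        Vectors.dense(Array(
          properties.get[Double]("attr0"),
          properties.get[Double]("attr1"),
          properties.get[Double]("attr2"))))
    }.cache()
}

override def readTraining(sc: SparkContext): TrainingData =
  new TrainingData(readLabeledPoints(sc))

// readEval would call the same helper and then apply the k-fold split shown
// in the diff above.
```
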
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
index b2eb05e..e7134d0 100644
--- a/src/main/scala/Engine.scala
+++ b/src/main/scala/Engine.scala
@@ -11,6 +11,10 @@
   val label: Double
 ) extends Serializable
 
+class ActualResult(
+  val label: Double
+) extends Serializable
+
 object ClassificationEngine extends IEngineFactory {
   def apply() = {
     new Engine(
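
The new ActualResult class mirrors PredictedResult and carries the observed label for a query, which is what DataSource.readEval pairs with each Query. For context, the three serving classes in Engine.scala are assumed to line up as follows (Query and PredictedResult are reconstructed from the surrounding context lines and may differ slightly from the actual file):

```scala
// Assumed shape of the serving classes after this change: Query carries the
// feature vector, PredictedResult the predicted label, and ActualResult the
// observed label used only during evaluation.
class Query(
  val features: Array[Double]
) extends Serializable

class PredictedResult(
  val label: Double
) extends Serializable

class ActualResult(
  val label: Double
) extends Serializable
```
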
diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..8a903ee
--- /dev/null
+++ b/src/main/scala/Evaluation.scala
@@ -0,0 +1,41 @@
+package org.template.classification
+
+import io.prediction.controller.AverageMetric
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EngineParams
+import io.prediction.controller.EngineParamsGenerator
+import io.prediction.controller.Evaluation
+import io.prediction.controller.Workflow
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+case class Precision()
+  extends AverageMetric[EmptyEvaluationInfo, 
+      Query, PredictedResult, ActualResult] {
+  def calculate(query: Query, predicted: PredictedResult, actual: ActualResult)
+  : Double = (if (predicted.label == actual.label) 1.0 else 0.0)
+}
+
+object PrecisionEvaluation extends Evaluation {
+  // Define Engine and Metric used in Evaluation
+  engineMetric = (ClassificationEngine(), new Precision())
+}
+
+object EngineParamsList extends EngineParamsGenerator {
+  // Define list of EngineParams used in Evaluation
+
+  // First, we define the base engine params. It specifies the appId from
+  // which the data is read, and the evalK parameter that controls the
+  // k-fold cross-validation.
+  private[this] val baseEP = EngineParams(
+    dataSourceParams = DataSourceParams(appId = 18, evalK = Some(5)))
+
+  // Second, we specify the engine params list by explicitly listing all
+  // algorithm parameters. In this case, we evaluate 3 engine params, each with
+  // a different algorithm params value.
+  engineParamsList = Seq(
+    baseEP.copy(algorithmParamsList = Seq(("naive", AlgorithmParams(10.0)))),
+    baseEP.copy(algorithmParamsList = Seq(("naive", AlgorithmParams(100.0)))),
+    baseEP.copy(algorithmParamsList = Seq(("naive", AlgorithmParams(1000.0)))))
+}
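
Precision here is an AverageMetric, so the reported score is the mean of the per-query calculate values, i.e. the fraction of test queries whose predicted label equals the actual label. The evaluation is typically launched with the pio CLI, e.g. `pio eval org.template.classification.PrecisionEvaluation org.template.classification.EngineParamsList`, which runs the engine once per entry in engineParamsList and reports the metric for each. As a small illustration of what the metric reduces to, over a hypothetical hard-coded result set (assuming the class shapes sketched after the Engine.scala diff above):

```scala
// Illustration only: what AverageMetric computes for the Precision metric,
// written out over three hypothetical (query, predicted, actual) triples.
val results: Seq[(Query, PredictedResult, ActualResult)] = Seq(
  (new Query(Array(1.0, 0.0, 0.0)), new PredictedResult(1.0), new ActualResult(1.0)),
  (new Query(Array(0.0, 1.0, 0.0)), new PredictedResult(2.0), new ActualResult(1.0)),
  (new Query(Array(0.0, 0.0, 1.0)), new PredictedResult(3.0), new ActualResult(3.0)))
val score = results.map { case (_, predicted, actual) =>
  if (predicted.label == actual.label) 1.0 else 0.0
}.sum / results.size  // 2 of the 3 labels match, so the score is 0.666...
```
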
diff --git a/src/main/scala/NaiveBayesAlgorithm.scala b/src/main/scala/NaiveBayesAlgorithm.scala
index f4e57a7..205c8c3 100644
--- a/src/main/scala/NaiveBayesAlgorithm.scala
+++ b/src/main/scala/NaiveBayesAlgorithm.scala
@@ -6,6 +6,7 @@
 import org.apache.spark.mllib.classification.NaiveBayes
 import org.apache.spark.mllib.classification.NaiveBayesModel
 import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.SparkContext
 
 import grizzled.slf4j.Logger
 
@@ -19,7 +20,7 @@
 
   @transient lazy val logger = Logger[this.type]
 
-  def train(data: PreparedData): NaiveBayesModel = {
+  def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
     // MLLib NaiveBayes cannot handle empty training data.
     require(!data.labeledPoints.take(1).isEmpty,
       s"RDD[labeldPoints] in PreparedData cannot be empty." +
diff --git a/template.json b/template.json
new file mode 100644
index 0000000..932e603
--- /dev/null
+++ b/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.9.0" }}}