Emigration from core repo
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c89ee97
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+data/*.txt
+manifest.json
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..6105185
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,14 @@
+import AssemblyKeys._
+
+assemblySettings
+
+name := "template-scala-parallel-classification"
+
+organization := "io.prediction"
+
+libraryDependencies ++= Seq(
+  "io.prediction"    %% "core"          % "0.8.6-SNAPSHOT" % "provided",
+  "commons-io"        % "commons-io"    % "2.4",
+  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
+  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided",
+  "org.json4s"       %% "json4s-native" % "3.2.10")
diff --git a/data/import_eventserver.py b/data/import_eventserver.py
new file mode 100644
index 0000000..a35235d
--- /dev/null
+++ b/data/import_eventserver.py
@@ -0,0 +1,46 @@
+"""
+Import sample data for classification engine
+"""
+
+import predictionio
+import argparse
+
+def import_events(client, file):
+  f = open(file, 'r')
+  count = 0
+  print "Importing data..."
+  for line in f:
+    data = line.rstrip('\r\n').split(",")
+    plan = data[0]
+    attr = data[1].split(" ")
+    client.create_event(
+      event="$set",
+      entity_type="user",
+      entity_id=str(count), # use the count num as user ID
+      properties= {
+        "attr0" : int(attr[0]),
+        "attr1" : int(attr[1]),
+        "attr2" : int(attr[2]),
+        "plan" : int(plan)
+      }
+    )
+    count += 1
+  f.close()
+  print "%s events are imported." % count
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+    description="Import sample data for classification engine")
+  parser.add_argument('--access_key', default='invald_access_key')
+  parser.add_argument('--url', default="http://localhost:7070")
+  parser.add_argument('--file', default="./data/sample_naive_bayes_data.txt")
+
+  args = parser.parse_args()
+  print args
+
+  client = predictionio.EventClient(
+    access_key=args.access_key,
+    url=args.url,
+    threads=5,
+    qsize=500)
+  import_events(client, args.file)
diff --git a/engine.json b/engine.json
new file mode 100644
index 0000000..d20a2ad
--- /dev/null
+++ b/engine.json
@@ -0,0 +1,18 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.template.classification.ClassificationEngine",
+  "datasource": {
+    "params": {
+      "appId": 2
+    }
+  },
+  "algorithms": [
+    {
+      "name": "naive",
+      "params": {
+        "lambda": 1.0
+      }
+    }
+  ]
+}
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..c2a422d
--- /dev/null
+++ b/src/main/scala/DataSource.scala
@@ -0,0 +1,60 @@
+package org.template.classification
+
+import io.prediction.controller.PDataSource
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EmptyActualResult
+import io.prediction.controller.Params
+import io.prediction.data.storage.Event
+import io.prediction.data.storage.Storage
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
+
+import grizzled.slf4j.Logger
+
+case class DataSourceParams(appId: Int) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    val eventsDb = Storage.getPEvents()
+    val labeledPoints: RDD[LabeledPoint] = eventsDb.aggregateProperties(
+      appId = dsp.appId,
+      entityType = "user",
+      // only keep entities with these required properties defined
+      required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
+      // aggregateProperties() returns RDD pair of
+      // entity ID and its aggregated properties
+      .map { case (entityId, properties) =>
+        try {
+          LabeledPoint(properties.get[Double]("plan"),
+            Vectors.dense(Array(
+              properties.get[Double]("attr0"),
+              properties.get[Double]("attr1"),
+              properties.get[Double]("attr2")
+            ))
+          )
+        } catch {
+          case e: Exception => {
+            logger.error(s"Failed to get properties ${properties} of" +
+              s" ${entityId}. Exception: ${e}.")
+            throw e
+          }
+        }
+      }
+
+    new TrainingData(labeledPoints)
+  }
+}
+
+class TrainingData(
+  val labeledPoints: RDD[LabeledPoint]
+) extends Serializable
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2eb05e
--- /dev/null
+++ b/src/main/scala/Engine.scala
@@ -0,0 +1,22 @@
+package org.template.classification
+
+import io.prediction.controller.IEngineFactory
+import io.prediction.controller.Engine
+
+class Query(
+  val features: Array[Double]
+) extends Serializable
+
+class PredictedResult(
+  val label: Double
+) extends Serializable
+
+object ClassificationEngine extends IEngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map("naive" -> classOf[NaiveBayesAlgorithm]),
+      classOf[Serving])
+  }
+}
diff --git a/src/main/scala/NaiveBayesAlgorithm.scala b/src/main/scala/NaiveBayesAlgorithm.scala
new file mode 100644
index 0000000..f4e57a7
--- /dev/null
+++ b/src/main/scala/NaiveBayesAlgorithm.scala
@@ -0,0 +1,37 @@
+package org.template.classification
+
+import io.prediction.controller.P2LAlgorithm
+import io.prediction.controller.Params
+
+import org.apache.spark.mllib.classification.NaiveBayes
+import org.apache.spark.mllib.classification.NaiveBayesModel
+import org.apache.spark.mllib.linalg.Vectors
+
+import grizzled.slf4j.Logger
+
+case class AlgorithmParams(
+  lambda: Double
+) extends Params
+
+// extends P2LAlgorithm because the MLlib's NaiveBayesModel doesn't contain RDD.
+class NaiveBayesAlgorithm(val ap: AlgorithmParams)
+  extends P2LAlgorithm[PreparedData, NaiveBayesModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  def train(data: PreparedData): NaiveBayesModel = {
+    // MLLib NaiveBayes cannot handle empty training data.
+    require(!data.labeledPoints.take(1).isEmpty,
+      s"RDD[labeldPoints] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+
+    NaiveBayes.train(data.labeledPoints, ap.lambda)
+  }
+
+  def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
+    val label = model.predict(Vectors.dense(query.features))
+    new PredictedResult(label)
+  }
+
+}
diff --git a/src/main/scala/Preparator.scala b/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..4e581ea
--- /dev/null
+++ b/src/main/scala/Preparator.scala
@@ -0,0 +1,19 @@
+package org.template.classification
+
+import io.prediction.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.regression.LabeledPoint
+
+class PreparedData(
+  val labeledPoints: RDD[LabeledPoint]
+) extends Serializable
+
+class Preparator extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(trainingData.labeledPoints)
+  }
+}
diff --git a/src/main/scala/Serving.scala b/src/main/scala/Serving.scala
new file mode 100644
index 0000000..ef06088
--- /dev/null
+++ b/src/main/scala/Serving.scala
@@ -0,0 +1,12 @@
+package org.template.classification
+
+import io.prediction.controller.LServing
+
+class Serving extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+    predictedResults.head
+  }
+}