Migration from core repo
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c89ee97
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+data/*.txt
+manifest.json
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..6105185
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,14 @@
+import AssemblyKeys._
+
+assemblySettings
+
+name := "template-scala-parallel-classification"
+
+organization := "io.prediction"
+
+libraryDependencies ++= Seq(
+ "io.prediction" %% "core" % "0.8.6-SNAPSHOT" % "provided",
+ "commons-io" % "commons-io" % "2.4",
+ "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
+ "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided",
+ "org.json4s" %% "json4s-native" % "3.2.10")
diff --git a/data/import_eventserver.py b/data/import_eventserver.py
new file mode 100644
index 0000000..a35235d
--- /dev/null
+++ b/data/import_eventserver.py
@@ -0,0 +1,46 @@
+"""
+Import sample data for classification engine
+"""
+
+import predictionio
+import argparse
+
+def import_events(client, file):
+    f = open(file, 'r')
+    count = 0
+    print("Importing data...")
+    for line in f:
+        data = line.rstrip('\r\n').split(",")
+        plan = data[0]
+        attr = data[1].split(" ")
+        client.create_event(
+            event="$set",
+            entity_type="user",
+            entity_id=str(count),  # use the line count as the user ID
+            properties={
+                "attr0": int(attr[0]),
+                "attr1": int(attr[1]),
+                "attr2": int(attr[2]),
+                "plan": int(plan)
+            }
+        )
+        count += 1
+    f.close()
+    print("%s events are imported." % count)
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description="Import sample data for classification engine")
+    parser.add_argument('--access_key', default='invalid_access_key')
+    parser.add_argument('--url', default="http://localhost:7070")
+    parser.add_argument('--file', default="./data/sample_naive_bayes_data.txt")
+
+    args = parser.parse_args()
+    print(args)
+
+    client = predictionio.EventClient(
+        access_key=args.access_key,
+        url=args.url,
+        threads=5,
+        qsize=500)
+    import_events(client, args.file)
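
Note: a minimal way to load the sample data, assuming the PredictionIO Python SDK is installed (pip install predictionio), the event server started by pio eventserver is listening on its default port 7070, and <your_access_key> is the access key of the target app:

    python data/import_eventserver.py --access_key <your_access_key>

The script sends one $set event per line of the input file, so each row becomes one "user" entity carrying the attr0, attr1, attr2, and plan properties.
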
diff --git a/engine.json b/engine.json
new file mode 100644
index 0000000..d20a2ad
--- /dev/null
+++ b/engine.json
@@ -0,0 +1,18 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.template.classification.ClassificationEngine",
+  "datasource": {
+    "params": {
+      "appId": 2
+    }
+  },
+  "algorithms": [
+    {
+      "name": "naive",
+      "params": {
+        "lambda": 1.0
+      }
+    }
+  ]
+}
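
Note: appId must match the App ID of the application whose access key was used by data/import_eventserver.py; the value 2 is only a placeholder. Assuming a standard PredictionIO installation, the ID can be looked up with:

    pio app list
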
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..c2a422d
--- /dev/null
+++ b/src/main/scala/DataSource.scala
@@ -0,0 +1,60 @@
+package org.template.classification
+
+import io.prediction.controller.PDataSource
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EmptyActualResult
+import io.prediction.controller.Params
+import io.prediction.data.storage.Event
+import io.prediction.data.storage.Storage
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
+
+import grizzled.slf4j.Logger
+
+case class DataSourceParams(appId: Int) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    val eventsDb = Storage.getPEvents()
+    val labeledPoints: RDD[LabeledPoint] = eventsDb.aggregateProperties(
+      appId = dsp.appId,
+      entityType = "user",
+      // only keep entities with these required properties defined
+      required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
+      // aggregateProperties() returns RDD pair of
+      // entity ID and its aggregated properties
+      .map { case (entityId, properties) =>
+        try {
+          LabeledPoint(properties.get[Double]("plan"),
+            Vectors.dense(Array(
+              properties.get[Double]("attr0"),
+              properties.get[Double]("attr1"),
+              properties.get[Double]("attr2")
+            ))
+          )
+        } catch {
+          case e: Exception => {
+            logger.error(s"Failed to get properties ${properties} of" +
+              s" ${entityId}. Exception: ${e}.")
+            throw e
+          }
+        }
+      }
+
+    new TrainingData(labeledPoints)
+  }
+}
+
+class TrainingData(
+  val labeledPoints: RDD[LabeledPoint]
+) extends Serializable
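
Note: a worked example of the mapping above, using a hypothetical input row in the expected "plan,attr0 attr1 attr2" format. A line such as "2,0 1 0" is imported as a user entity with plan=2, attr0=0, attr1=1, attr2=0, and readTraining turns its aggregated properties into:

    LabeledPoint(2.0, Vectors.dense(Array(0.0, 1.0, 0.0)))
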
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b2eb05e
--- /dev/null
+++ b/src/main/scala/Engine.scala
@@ -0,0 +1,22 @@
+package org.template.classification
+
+import io.prediction.controller.IEngineFactory
+import io.prediction.controller.Engine
+
+class Query(
+  val features: Array[Double]
+) extends Serializable
+
+class PredictedResult(
+  val label: Double
+) extends Serializable
+
+object ClassificationEngine extends IEngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map("naive" -> classOf[NaiveBayesAlgorithm]),
+      classOf[Serving])
+  }
+}
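
Note: once deployed, the engine deserializes incoming JSON into Query and serializes PredictedResult back to JSON, so an illustrative request/response pair (the values are hypothetical) looks like:

    query:    {"features": [2, 0, 0]}
    response: {"label": 2.0}
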
diff --git a/src/main/scala/NaiveBayesAlgorithm.scala b/src/main/scala/NaiveBayesAlgorithm.scala
new file mode 100644
index 0000000..f4e57a7
--- /dev/null
+++ b/src/main/scala/NaiveBayesAlgorithm.scala
@@ -0,0 +1,37 @@
+package org.template.classification
+
+import io.prediction.controller.P2LAlgorithm
+import io.prediction.controller.Params
+
+import org.apache.spark.mllib.classification.NaiveBayes
+import org.apache.spark.mllib.classification.NaiveBayesModel
+import org.apache.spark.mllib.linalg.Vectors
+
+import grizzled.slf4j.Logger
+
+case class AlgorithmParams(
+  lambda: Double
+) extends Params
+
+// extends P2LAlgorithm because MLlib's NaiveBayesModel doesn't contain an RDD.
+class NaiveBayesAlgorithm(val ap: AlgorithmParams)
+  extends P2LAlgorithm[PreparedData, NaiveBayesModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  def train(data: PreparedData): NaiveBayesModel = {
+    // MLlib NaiveBayes cannot handle empty training data.
+    require(!data.labeledPoints.take(1).isEmpty,
+      s"RDD[labeledPoints] in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+
+    NaiveBayes.train(data.labeledPoints, ap.lambda)
+  }
+
+  def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
+    val label = model.predict(Vectors.dense(query.features))
+    new PredictedResult(label)
+  }
+
+}
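
Note: ap.lambda is passed straight to MLlib's NaiveBayes.train as the additive (Laplace) smoothing parameter, so tuning it only requires editing the "lambda" value under algorithms.params in engine.json; for example, "lambda": 1.0 becomes AlgorithmParams(lambda = 1.0) and NaiveBayes.train(data.labeledPoints, 1.0).
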
diff --git a/src/main/scala/Preparator.scala b/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..4e581ea
--- /dev/null
+++ b/src/main/scala/Preparator.scala
@@ -0,0 +1,19 @@
+package org.template.classification
+
+import io.prediction.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.regression.LabeledPoint
+
+class PreparedData(
+  val labeledPoints: RDD[LabeledPoint]
+) extends Serializable
+
+class Preparator extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(trainingData.labeledPoints)
+  }
+}
diff --git a/src/main/scala/Serving.scala b/src/main/scala/Serving.scala
new file mode 100644
index 0000000..ef06088
--- /dev/null
+++ b/src/main/scala/Serving.scala
@@ -0,0 +1,12 @@
+package org.template.classification
+
+import io.prediction.controller.LServing
+
+class Serving extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+    predictedResults.head
+  }
+}
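
Note: a minimal end-to-end flow for this template, assuming a standard PredictionIO 0.8.x setup (the deployed engine listens on port 8000 by default):

    pio build
    pio train
    pio deploy
    curl -H "Content-Type: application/json" \
      -d '{ "features": [2, 0, 0] }' \
      http://localhost:8000/queries.json

Serving simply returns the head of the single algorithm's results, so the response is the predicted "plan" label as a JSON object.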