Merge branch 'develop'
diff --git a/.gitignore b/.gitignore
index edc8287..7c6771b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
data/*.txt
manifest.json
pio.log
+/pio.sbt
target/
diff --git a/README.md b/README.md
index a369500..6f0eb4c 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,10 @@
### develop
+### v0.1.2
+
+- update for PredictionIO 0.9.0
+- add sample evaluation
### v0.1.1
diff --git a/build.sbt b/build.sbt
index a54ce50..a7e7858 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,8 +7,6 @@
organization := "io.prediction"
libraryDependencies ++= Seq(
- "io.prediction" %% "core" % "0.8.6" % "provided",
- "commons-io" % "commons-io" % "2.4",
+ "io.prediction" %% "core" % pioVersion.value % "provided",
"org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
- "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided",
- "org.json4s" %% "json4s-native" % "3.2.10")
+ "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")
diff --git a/data/import_eventserver.py b/data/import_eventserver.py
index a35235d..7e808e3 100644
--- a/data/import_eventserver.py
+++ b/data/import_eventserver.py
@@ -33,7 +33,7 @@
description="Import sample data for classification engine")
parser.add_argument('--access_key', default='invalid_access_key')
parser.add_argument('--url', default="http://localhost:7070")
- parser.add_argument('--file', default="./data/sample_naive_bayes_data.txt")
+ parser.add_argument('--file', default="./data/data.txt")
args = parser.parse_args()
print args
diff --git a/project/pio-build.sbt b/project/pio-build.sbt
new file mode 100644
index 0000000..8346a96
--- /dev/null
+++ b/project/pio-build.sbt
@@ -0,0 +1 @@
+addSbtPlugin("io.prediction" % "pio-build" % "0.9.0")
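Note: the pio-build sbt plugin added here supplies the pioVersion setting consumed in build.sbt above, which is why the hard-coded "0.8.6" core dependency could be dropped. A minimal sketch of the resulting build.sbt under that assumption (the project name is an assumption; the dependencies are the ones from the build.sbt hunk):

    // build.sbt -- sketch; pioVersion is supplied by the pio-build plugin
    name := "template-scala-parallel-classification"  // assumed project name
    organization := "io.prediction"
    libraryDependencies ++= Seq(
      "io.prediction"    %% "core"        % pioVersion.value % "provided",
      "org.apache.spark" %% "spark-core"  % "1.2.0"          % "provided",
      "org.apache.spark" %% "spark-mllib" % "1.2.0"          % "provided")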
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 166bcaa..c9b8b82 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -15,11 +15,14 @@
import grizzled.slf4j.Logger
-case class DataSourceParams(appId: Int) extends Params
+case class DataSourceParams(
+ appId: Int,
+ evalK: Option[Int] // k value for k-fold cross-validation; None outside evaluation
+) extends Params
class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData,
- EmptyEvaluationInfo, Query, EmptyActualResult] {
+ EmptyEvaluationInfo, Query, ActualResult] {
@transient lazy val logger = Logger[this.type]
@@ -53,6 +56,61 @@
new TrainingData(labeledPoints)
}
+
+ override
+ def readEval(sc: SparkContext)
+ : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = {
+ require(!dsp.evalK.isEmpty, "DataSourceParams.evalK must not be None")
+
+ // The following code reads the data from the data store. It is equivalent
+ // to the readTraining method. We copy and paste the exact code here for
+ // illustration purposes; the recommended approach is to factor this logic
+ // out into a helper function and have both readTraining and readEval call
+ // the helper.
+ val eventsDb = Storage.getPEvents()
+ val labeledPoints: RDD[LabeledPoint] = eventsDb.aggregateProperties(
+ appId = dsp.appId,
+ entityType = "user",
+ // only keep entities with these required properties defined
+ required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
+ // aggregateProperties() returns an RDD of
+ // (entity ID, aggregated properties) pairs
+ .map { case (entityId, properties) =>
+ try {
+ LabeledPoint(properties.get[Double]("plan"),
+ Vectors.dense(Array(
+ properties.get[Double]("attr0"),
+ properties.get[Double]("attr1"),
+ properties.get[Double]("attr2")
+ ))
+ )
+ } catch {
+ case e: Exception => {
+ logger.error(s"Failed to get properties ${properties} of" +
+ s" ${entityId}. Exception: ${e}.")
+ throw e
+ }
+ }
+ }.cache()
+ // End of reading from data store
+
+ // K-fold splitting
+ val evalK = dsp.evalK.get
+ val indexedPoints: RDD[(LabeledPoint, Long)] = labeledPoints.zipWithIndex
+
+ (0 until evalK).map { idx =>
+ val trainingPoints = indexedPoints.filter(_._2 % evalK != idx).map(_._1)
+ val testingPoints = indexedPoints.filter(_._2 % evalK == idx).map(_._1)
+
+ (
+ new TrainingData(trainingPoints),
+ new EmptyEvaluationInfo(),
+ testingPoints.map {
+ p => (new Query(p.features.toArray), new ActualResult(p.label))
+ }
+ )
+ }
+ }
}
class TrainingData(
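The readEval method above implements k-fold cross-validation by bucketing the indexed points on index modulo evalK: fold idx tests on every point whose index is congruent to idx and trains on the rest, so each point appears in exactly one test set. A self-contained sketch of the same split on a plain List (hypothetical values, no Spark required):

    // Modulo-based k-fold split as in readEval, on a List instead of an RDD.
    // With evalK = 3, element i lands in the test set of fold i % 3.
    val evalK = 3
    val points = (0 until 9).toList
    val folds = (0 until evalK).map { idx =>
      val training = points.zipWithIndex.collect { case (p, i) if i % evalK != idx => p }
      val testing  = points.zipWithIndex.collect { case (p, i) if i % evalK == idx => p }
      (training, testing)
    }
    // folds(0) == (List(1, 2, 4, 5, 7, 8), List(0, 3, 6))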
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
index b2eb05e..e7134d0 100644
--- a/src/main/scala/Engine.scala
+++ b/src/main/scala/Engine.scala
@@ -11,6 +11,10 @@
val label: Double
) extends Serializable
+class ActualResult(
+ val label: Double
+) extends Serializable
+
object ClassificationEngine extends IEngineFactory {
def apply() = {
new Engine(
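The hunk cuts off inside the factory. For context, a sketch of how the full ClassificationEngine factory typically looks in this template (the Preparator and Serving class names are assumptions; "naive" matches the algorithm name used in Evaluation.scala below):

    object ClassificationEngine extends IEngineFactory {
      def apply() = {
        new Engine(
          classOf[DataSource],
          classOf[Preparator],  // assumed class name
          Map("naive" -> classOf[NaiveBayesAlgorithm]),
          classOf[Serving])     // assumed class name
      }
    }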
diff --git a/src/main/scala/Evaluation.scala b/src/main/scala/Evaluation.scala
new file mode 100644
index 0000000..8a903ee
--- /dev/null
+++ b/src/main/scala/Evaluation.scala
@@ -0,0 +1,41 @@
+package org.template.classification
+
+import io.prediction.controller.AverageMetric
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EngineParams
+import io.prediction.controller.EngineParamsGenerator
+import io.prediction.controller.Evaluation
+import io.prediction.controller.Workflow
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+case class Precision()
+ extends AverageMetric[EmptyEvaluationInfo,
+ Query, PredictedResult, ActualResult] {
+ def calculate(query: Query, predicted: PredictedResult, actual: ActualResult)
+ : Double = (if (predicted.label == actual.label) 1.0 else 0.0)
+}
+
+object PrecisionEvaluation extends Evaluation {
+ // Define Engine and Metric used in Evaluation
+ engineMetric = (ClassificationEngine(), new Precision())
+}
+
+object EngineParamsList extends EngineParamsGenerator {
+ // Define list of EngineParams used in Evaluation
+
+ // First, we define the base engine params. It specifies the appId from which
+ // the data is read, and an evalK parameter that defines the k-fold
+ // cross-validation.
+ private[this] val baseEP = EngineParams(
+ dataSourceParams = DataSourceParams(appId = 18, evalK = Some(5)))
+
+ // Second, we specify the engine params list by explicitly listing all
+ // algorithm parameters. In this case, we evaluate 3 engine params, each with
+ // a different algorithm parameter value.
+ engineParamsList = Seq(
+ baseEP.copy(algorithmParamsList = Seq(("naive", AlgorithmParams(10.0)))),
+ baseEP.copy(algorithmParamsList = Seq(("naive", AlgorithmParams(100.0)))),
+ baseEP.copy(algorithmParamsList = Seq(("naive", AlgorithmParams(1000.0)))))
+}
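The three engine params above differ only in the value passed to AlgorithmParams (the Naive Bayes smoothing parameter in this template). An equivalent, more compact way to build the same list, sketched against the same baseEP:

    // Sketch: derive the three engine params by mapping over the candidate
    // smoothing values instead of writing out each copy by hand.
    engineParamsList = Seq(10.0, 100.0, 1000.0).map { lambda =>
      baseEP.copy(algorithmParamsList = Seq(("naive", AlgorithmParams(lambda))))
    }

The evaluation can then be run with PredictionIO's pio eval command, passing PrecisionEvaluation and EngineParamsList by their fully qualified names.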
diff --git a/src/main/scala/NaiveBayesAlgorithm.scala b/src/main/scala/NaiveBayesAlgorithm.scala
index f4e57a7..205c8c3 100644
--- a/src/main/scala/NaiveBayesAlgorithm.scala
+++ b/src/main/scala/NaiveBayesAlgorithm.scala
@@ -6,6 +6,7 @@
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.SparkContext
import grizzled.slf4j.Logger
@@ -19,7 +20,7 @@
@transient lazy val logger = Logger[this.type]
- def train(data: PreparedData): NaiveBayesModel = {
+ def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
// MLlib NaiveBayes cannot handle empty training data.
require(!data.labeledPoints.take(1).isEmpty,
s"RDD[labeledPoints] in PreparedData cannot be empty." +
diff --git a/template.json b/template.json
new file mode 100644
index 0000000..932e603
--- /dev/null
+++ b/template.json
@@ -0,0 +1 @@
+{"pio": {"version": { "min": "0.9.0" }}}