Merge branch 'develop'
diff --git a/README.md b/README.md
index 4bd2564..bc53895 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,17 @@
## Versions
+### v0.3.0
+
+- update for PredictionIO 0.9.2, including:
+
+ - use new PEventStore API
+ - use appName in DataSource parameter
+
+
### v0.2.0
-- update for PredictionIO 0.9.2
+- update build.sbt and template.json for PredictionIO 0.9.2
### v0.1.3
diff --git a/engine.json b/engine.json
index c55849f..d1afffa 100644
--- a/engine.json
+++ b/engine.json
@@ -4,7 +4,7 @@
"engineFactory": "org.template.similarproduct.SimilarProductEngine",
"datasource": {
"params" : {
- "appId": 9
+ "appName": "INVALID_APP_NAME"
}
},
"algorithms": [
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index bea337d..b302d1a 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -5,7 +5,7 @@
import io.prediction.controller.EmptyActualResult
import io.prediction.controller.Params
import io.prediction.data.storage.Event
-import io.prediction.data.storage.Storage
+import io.prediction.data.store.PEventStore
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
@@ -13,7 +13,7 @@
import grizzled.slf4j.Logger
-case class DataSourceParams(appId: Int) extends Params
+case class DataSourceParams(appName: String) extends Params
class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData,
@@ -23,11 +23,10 @@
override
def readTraining(sc: SparkContext): TrainingData = {
- val eventsDb = Storage.getPEvents()
// create a RDD of (entityID, User)
- val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
- appId = dsp.appId,
+ val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+ appName = dsp.appName,
entityType = "user"
)(sc).map { case (entityId, properties) =>
val user = try {
@@ -43,8 +42,8 @@
}.cache()
// create a RDD of (entityID, Item)
- val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
- appId = dsp.appId,
+ val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+ appName = dsp.appName,
entityType = "item"
)(sc).map { case (entityId, properties) =>
val item = try {
@@ -61,8 +60,8 @@
}.cache()
// get all "user" "view" "item" events
- val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
- appId = dsp.appId,
+ val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+ appName = dsp.appName,
entityType = Some("user"),
eventNames = Some(List("view")),
// targetEntityType is optional field of an event.