PDIO-617 update to use appName as param new PEventStore API
diff --git a/README.md b/README.md
index 4bd2564..bc53895 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,17 @@
 
 ## Versions
 
+### v0.3.0
+
+- update for PredictionIO 0.9.2, including:
+
+  - use new PEventStore API
+  - use appName in DataSource parameter
+
+
 ### v0.2.0
 
-- update for PredictionIO 0.9.2
+- update build.sbt and template.json for PredictionIO 0.9.2
 
 ### v0.1.3
 
diff --git a/engine.json b/engine.json
index c55849f..d1afffa 100644
--- a/engine.json
+++ b/engine.json
@@ -4,7 +4,7 @@
   "engineFactory": "org.template.similarproduct.SimilarProductEngine",
   "datasource": {
     "params" : {
-      "appId": 9
+      "appName": "INVALID_APP_NAME"
     }
   },
   "algorithms": [
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index bea337d..b302d1a 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -5,7 +5,7 @@
 import io.prediction.controller.EmptyActualResult
 import io.prediction.controller.Params
 import io.prediction.data.storage.Event
-import io.prediction.data.storage.Storage
+import io.prediction.data.store.PEventStore
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
@@ -13,7 +13,7 @@
 
 import grizzled.slf4j.Logger
 
-case class DataSourceParams(appId: Int) extends Params
+case class DataSourceParams(appName: String) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
@@ -23,11 +23,10 @@
 
   override
   def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
 
     // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
       entityType = "user"
     )(sc).map { case (entityId, properties) =>
       val user = try {
@@ -43,8 +42,8 @@
     }.cache()
 
     // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
+    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
       entityType = "item"
     )(sc).map { case (entityId, properties) =>
       val item = try {
@@ -61,8 +60,8 @@
     }.cache()
 
     // get all "user" "view" "item" events
-    val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
-      appId = dsp.appId,
+    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+      appName = dsp.appName,
       entityType = Some("user"),
       eventNames = Some(List("view")),
       // targetEntityType is optional field of an event.