Use appName as param and new PEventStore and LEventStore API
diff --git a/README.md b/README.md
index 8a975a6..bbcb2fb 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,17 @@
 
 ## Versions
 
+### v0.3.0
+
+- update for PredictionIO 0.9.2, including:
+
+  - use new PEventStore and LEventStore API
+  - use appName in DataSource and Algorithm parameters
+
+
 ### v0.2.0
 
-- update for PredictionIO 0.9.2
+- update build.sbt and template.json for PredictionIO 0.9.2
 
 ### v0.1.1
 
diff --git a/engine.json b/engine.json
index b8103a6..10b284f 100644
--- a/engine.json
+++ b/engine.json
@@ -4,14 +4,14 @@
   "engineFactory": "org.template.ecommercerecommendation.ECommerceRecommendationEngine",
   "datasource": {
     "params" : {
-      "appId": 17
+      "appName": ""
     }
   },
   "algorithms": [
     {
       "name": "als",
       "params": {
-        "appId": 17,
+        "appName": "",
         "unseenOnly": true,
         "seenEvents": ["buy", "view"],
         "rank": 10,
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
index e28c4fe..18a6240 100644
--- a/src/main/scala/ALSAlgorithm.scala
+++ b/src/main/scala/ALSAlgorithm.scala
@@ -4,7 +4,7 @@
 import io.prediction.controller.Params
 import io.prediction.data.storage.BiMap
 import io.prediction.data.storage.Event
-import io.prediction.data.storage.Storage
+import io.prediction.data.store.LEventStore
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
@@ -18,7 +18,7 @@
 import scala.concurrent.ExecutionContext.Implicits.global
 
 case class ALSAlgorithmParams(
-  appId: Int,
+  appName: String,
   unseenOnly: Boolean,
   seenEvents: List[String],
   rank: Int,
@@ -57,8 +57,6 @@
   extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
 
   @transient lazy val logger = Logger[this.type]
-  // NOTE: use getLEvents() for local access
-  @transient lazy val lEventsDb = Storage.getLEvents()
 
   def train(sc: SparkContext, data: PreparedData): ALSModel = {
     require(!data.viewEvents.take(1).isEmpty,
@@ -155,20 +153,24 @@
     val seenItems: Set[String] = if (ap.unseenOnly) {
 
       // get all user item events which are considered as "seen" events
-      val seenEvents: Iterator[Event] = lEventsDb.findSingleEntity(
-        appId = ap.appId,
-        entityType = "user",
-        entityId = query.user,
-        eventNames = Some(ap.seenEvents),
-        targetEntityType = Some(Some("item")),
-        // set time limit to avoid super long DB access
-        timeout = Duration(200, "millis")
-      ) match {
-        case Right(x) => x
-        case Left(e) => {
-          logger.error(s"Error when read seen events: ${e}")
+      val seenEvents: Iterator[Event] = try {
+        LEventStore.findByEntity(
+          appName = ap.appName,
+          entityType = "user",
+          entityId = query.user,
+          eventNames = Some(ap.seenEvents),
+          targetEntityType = Some(Some("item")),
+          // set time limit to avoid super long DB access
+          timeout = Duration(200, "millis")
+        )
+      } catch {
+        case e: scala.concurrent.TimeoutException =>
+          logger.error(s"Timeout when read seen events." +
+            s" Empty list is used. ${e}")
           Iterator[Event]()
-        }
+        case e: Exception =>
+          logger.error(s"Error when read seen events: ${e}")
+          throw e
       }
 
       seenEvents.map { event =>
@@ -186,26 +188,29 @@
     }
 
     // get the latest constraint unavailableItems $set event
-    val unavailableItems: Set[String] = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      entityType = "constraint",
-      entityId = "unavailableItems",
-      eventNames = Some(Seq("$set")),
-      limit = Some(1),
-      latest = true,
-      timeout = Duration(200, "millis")
-    ) match {
-      case Right(x) => {
-        if (x.hasNext) {
-          x.next.properties.get[Set[String]]("items")
-        } else {
-          Set[String]()
-        }
-      }
-      case Left(e) => {
-        logger.error(s"Error when read set unavailableItems event: ${e}")
+    val unavailableItems: Set[String] = try {
+      val constr = LEventStore.findByEntity(
+        appName = ap.appName,
+        entityType = "constraint",
+        entityId = "unavailableItems",
+        eventNames = Some(Seq("$set")),
+        limit = Some(1),
+        latest = true,
+        timeout = Duration(200, "millis")
+      )
+      if (constr.hasNext) {
+        constr.next.properties.get[Set[String]]("items")
+      } else {
         Set[String]()
       }
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read set unavailableItems event." +
+          s" Empty list is used. ${e}")
+        Set[String]()
+      case e: Exception =>
+        logger.error(s"Error when read set unavailableItems event: ${e}")
+        throw e
     }
 
     // combine query's blackList,seenItems and unavailableItems
@@ -286,23 +291,27 @@
     val productFeatures = model.productFeatures
 
     // get latest 10 user view item events
-    val recentEvents = lEventsDb.findSingleEntity(
-      appId = ap.appId,
-      // entityType and entityId is specified for fast lookup
-      entityType = "user",
-      entityId = query.user,
-      eventNames = Some(Seq("view")),
-      targetEntityType = Some(Some("item")),
-      limit = Some(10),
-      latest = true,
-      // set time limit to avoid super long DB access
-      timeout = Duration(200, "millis")
-    ) match {
-      case Right(x) => x
-      case Left(e) => {
-        logger.error(s"Error when read recent events: ${e}")
+    val recentEvents = try {
+      LEventStore.findByEntity(
+        appName = ap.appName,
+        // entityType and entityId is specified for fast lookup
+        entityType = "user",
+        entityId = query.user,
+        eventNames = Some(Seq("view")),
+        targetEntityType = Some(Some("item")),
+        limit = Some(10),
+        latest = true,
+        // set time limit to avoid super long DB access
+        timeout = Duration(200, "millis")
+      )
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read recent events." +
+          s" Empty list is used. ${e}")
         Iterator[Event]()
-      }
+      case e: Exception =>
+        logger.error(s"Error when read recent events: ${e}")
+        throw e
     }
 
     val recentItems: Set[String] = recentEvents.map { event =>
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index c102b72..783b2bb 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -5,7 +5,7 @@
 import io.prediction.controller.EmptyActualResult
 import io.prediction.controller.Params
 import io.prediction.data.storage.Event
-import io.prediction.data.storage.Storage
+import io.prediction.data.store.PEventStore
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
@@ -13,7 +13,7 @@
 
 import grizzled.slf4j.Logger
 
-case class DataSourceParams(appId: Int) extends Params
+case class DataSourceParams(appName: String) extends Params
 
 class DataSource(val dsp: DataSourceParams)
   extends PDataSource[TrainingData,
@@ -23,11 +23,10 @@
 
   override
   def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
 
     // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
       entityType = "user"
     )(sc).map { case (entityId, properties) =>
       val user = try {
@@ -43,8 +42,8 @@
     }.cache()
 
     // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
+    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
       entityType = "item"
     )(sc).map { case (entityId, properties) =>
       val item = try {
@@ -61,8 +60,8 @@
     }.cache()
 
     // get all "user" "view" "item" events
-    val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
-      appId = dsp.appId,
+    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+      appName = dsp.appName,
       entityType = Some("user"),
       eventNames = Some(List("view")),
       // targetEntityType is optional field of an event.