Use appName as a parameter and switch to the new PEventStore and LEventStore APIs
diff --git a/README.md b/README.md
index 8a975a6..bbcb2fb 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,17 @@
## Versions
+### v0.3.0
+
+- update for PredictionIO 0.9.2, including:
+
+ - use new PEventStore and LEventStore API
+ - use appName in DataSource and Algorithm parameters
+
+
### v0.2.0
-- update for PredictionIO 0.9.2
+- update build.sbt and template.json for PredictionIO 0.9.2
### v0.1.1
diff --git a/engine.json b/engine.json
index b8103a6..10b284f 100644
--- a/engine.json
+++ b/engine.json
@@ -4,14 +4,14 @@
"engineFactory": "org.template.ecommercerecommendation.ECommerceRecommendationEngine",
"datasource": {
"params" : {
- "appId": 17
+ "appName": ""
}
},
"algorithms": [
{
"name": "als",
"params": {
- "appId": 17,
+ "appName": "",
"unseenOnly": true,
"seenEvents": ["buy", "view"],
"rank": 10,
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
index e28c4fe..18a6240 100644
--- a/src/main/scala/ALSAlgorithm.scala
+++ b/src/main/scala/ALSAlgorithm.scala
@@ -4,7 +4,7 @@
import io.prediction.controller.Params
import io.prediction.data.storage.BiMap
import io.prediction.data.storage.Event
-import io.prediction.data.storage.Storage
+import io.prediction.data.store.LEventStore
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
@@ -18,7 +18,7 @@
import scala.concurrent.ExecutionContext.Implicits.global
case class ALSAlgorithmParams(
- appId: Int,
+ appName: String,
unseenOnly: Boolean,
seenEvents: List[String],
rank: Int,
@@ -57,8 +57,6 @@
extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
@transient lazy val logger = Logger[this.type]
- // NOTE: use getLEvents() for local access
- @transient lazy val lEventsDb = Storage.getLEvents()
def train(sc: SparkContext, data: PreparedData): ALSModel = {
require(!data.viewEvents.take(1).isEmpty,
@@ -155,20 +153,24 @@
val seenItems: Set[String] = if (ap.unseenOnly) {
// get all user item events which are considered as "seen" events
- val seenEvents: Iterator[Event] = lEventsDb.findSingleEntity(
- appId = ap.appId,
- entityType = "user",
- entityId = query.user,
- eventNames = Some(ap.seenEvents),
- targetEntityType = Some(Some("item")),
- // set time limit to avoid super long DB access
- timeout = Duration(200, "millis")
- ) match {
- case Right(x) => x
- case Left(e) => {
- logger.error(s"Error when read seen events: ${e}")
+ val seenEvents: Iterator[Event] = try {
+ LEventStore.findByEntity(
+ appName = ap.appName,
+ entityType = "user",
+ entityId = query.user,
+ eventNames = Some(ap.seenEvents),
+ targetEntityType = Some(Some("item")),
+ // set time limit to avoid super long DB access
+ timeout = Duration(200, "millis")
+ )
+ } catch {
+ case e: scala.concurrent.TimeoutException =>
+ logger.error(s"Timeout when read seen events." +
+ s" Empty list is used. ${e}")
Iterator[Event]()
- }
+ case e: Exception =>
+ logger.error(s"Error when read seen events: ${e}")
+ throw e
}
seenEvents.map { event =>
@@ -186,26 +188,29 @@
}
// get the latest constraint unavailableItems $set event
- val unavailableItems: Set[String] = lEventsDb.findSingleEntity(
- appId = ap.appId,
- entityType = "constraint",
- entityId = "unavailableItems",
- eventNames = Some(Seq("$set")),
- limit = Some(1),
- latest = true,
- timeout = Duration(200, "millis")
- ) match {
- case Right(x) => {
- if (x.hasNext) {
- x.next.properties.get[Set[String]]("items")
- } else {
- Set[String]()
- }
- }
- case Left(e) => {
- logger.error(s"Error when read set unavailableItems event: ${e}")
+ val unavailableItems: Set[String] = try {
+ val constr = LEventStore.findByEntity(
+ appName = ap.appName,
+ entityType = "constraint",
+ entityId = "unavailableItems",
+ eventNames = Some(Seq("$set")),
+ limit = Some(1),
+ latest = true,
+ timeout = Duration(200, "millis")
+ )
+ if (constr.hasNext) {
+ constr.next.properties.get[Set[String]]("items")
+ } else {
Set[String]()
}
+ } catch {
+ case e: scala.concurrent.TimeoutException =>
+ logger.error(s"Timeout when read set unavailableItems event." +
+ s" Empty list is used. ${e}")
+ Set[String]()
+ case e: Exception =>
+ logger.error(s"Error when read set unavailableItems event: ${e}")
+ throw e
}
// combine query's blackList,seenItems and unavailableItems
@@ -286,23 +291,27 @@
val productFeatures = model.productFeatures
// get latest 10 user view item events
- val recentEvents = lEventsDb.findSingleEntity(
- appId = ap.appId,
- // entityType and entityId is specified for fast lookup
- entityType = "user",
- entityId = query.user,
- eventNames = Some(Seq("view")),
- targetEntityType = Some(Some("item")),
- limit = Some(10),
- latest = true,
- // set time limit to avoid super long DB access
- timeout = Duration(200, "millis")
- ) match {
- case Right(x) => x
- case Left(e) => {
- logger.error(s"Error when read recent events: ${e}")
+ val recentEvents = try {
+ LEventStore.findByEntity(
+ appName = ap.appName,
+ // entityType and entityId are specified for fast lookup
+ entityType = "user",
+ entityId = query.user,
+ eventNames = Some(Seq("view")),
+ targetEntityType = Some(Some("item")),
+ limit = Some(10),
+ latest = true,
+ // set time limit to avoid super long DB access
+ timeout = Duration(200, "millis")
+ )
+ } catch {
+ case e: scala.concurrent.TimeoutException =>
+ logger.error(s"Timeout when read recent events." +
+ s" Empty list is used. ${e}")
Iterator[Event]()
- }
+ case e: Exception =>
+ logger.error(s"Error when read recent events: ${e}")
+ throw e
}
val recentItems: Set[String] = recentEvents.map { event =>
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index c102b72..783b2bb 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -5,7 +5,7 @@
import io.prediction.controller.EmptyActualResult
import io.prediction.controller.Params
import io.prediction.data.storage.Event
-import io.prediction.data.storage.Storage
+import io.prediction.data.store.PEventStore
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
@@ -13,7 +13,7 @@
import grizzled.slf4j.Logger
-case class DataSourceParams(appId: Int) extends Params
+case class DataSourceParams(appName: String) extends Params
class DataSource(val dsp: DataSourceParams)
extends PDataSource[TrainingData,
@@ -23,11 +23,10 @@
override
def readTraining(sc: SparkContext): TrainingData = {
- val eventsDb = Storage.getPEvents()
// create a RDD of (entityID, User)
- val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
- appId = dsp.appId,
+ val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+ appName = dsp.appName,
entityType = "user"
)(sc).map { case (entityId, properties) =>
val user = try {
@@ -43,8 +42,8 @@
}.cache()
// create a RDD of (entityID, Item)
- val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
- appId = dsp.appId,
+ val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+ appName = dsp.appName,
entityType = "item"
)(sc).map { case (entityId, properties) =>
val item = try {
@@ -61,8 +60,8 @@
}.cache()
// get all "user" "view" "item" events
- val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
- appId = dsp.appId,
+ val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+ appName = dsp.appName,
entityType = Some("user"),
eventNames = Some(List("view")),
// targetEntityType is optional field of an event.