Merge branch 'develop'
diff --git a/README.md b/README.md
index 507ecf0..23eea3a 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,16 @@
## Versions
+### v0.4.0
+
+- Change from ALSAlgorithm.scala to ECommAlgorithm.scala
+
+ * return popular bought items when no information is found for the user.
+ * add "similarEvents" parameter to configure which user-to-item events are used for finding similar items
+ * re-structure the Algorithm code for easier customization and testing
+
+- add some unit tests for testing code that may be customized
+
### v0.3.1
- use INVALID_APP_NAME as default appName in engine.json
@@ -112,7 +122,11 @@
import some view events and try to get recommendation for x1 again.
```
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+accessKey=<YOUR_ACCESS_KEY>
+```
+
+```
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
-H "Content-Type: application/json" \
-d '{
"event" : "view",
@@ -123,7 +137,7 @@
"eventTime" : "2015-02-17T02:11:21.934Z"
}'
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
-H "Content-Type: application/json" \
-d '{
"event" : "view",
@@ -141,7 +155,7 @@
Set the following items as unavailable (need to specify complete list each time when this list is changed):
```
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
-H "Content-Type: application/json" \
-d '{
"event" : "$set",
@@ -157,7 +171,7 @@
Set empty list when no more items unavailable:
```
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
-H "Content-Type: application/json" \
-d '{
"event" : "$set",
diff --git a/build.sbt b/build.sbt
index 6de8bac..896f6db 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,10 @@
organization := "io.prediction"
+parallelExecution in Test := false
+
libraryDependencies ++= Seq(
"io.prediction" %% "core" % pioVersion.value % "provided",
"org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
- "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided")
+ "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided",
+ "org.scalatest" %% "scalatest" % "2.2.1" % "test")
diff --git a/engine.json b/engine.json
index 1da14e1..0ea26a0 100644
--- a/engine.json
+++ b/engine.json
@@ -9,11 +9,12 @@
},
"algorithms": [
{
- "name": "als",
+ "name": "ecomm",
"params": {
"appName": "INVALID_APP_NAME",
"unseenOnly": true,
"seenEvents": ["buy", "view"],
+ "similarEvents": ["view"],
"rank": 10,
"numIterations" : 20,
"lambda": 0.01,
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 18a6240..0000000
--- a/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,439 +0,0 @@
-package org.template.ecommercerecommendation
-
-import io.prediction.controller.P2LAlgorithm
-import io.prediction.controller.Params
-import io.prediction.data.storage.BiMap
-import io.prediction.data.storage.Event
-import io.prediction.data.store.LEventStore
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-import scala.concurrent.duration.Duration
-import scala.concurrent.ExecutionContext.Implicits.global
-
-case class ALSAlgorithmParams(
- appName: String,
- unseenOnly: Boolean,
- seenEvents: List[String],
- rank: Int,
- numIterations: Int,
- lambda: Double,
- seed: Option[Long]
-) extends Params
-
-class ALSModel(
- val rank: Int,
- val userFeatures: Map[Int, Array[Double]],
- val productFeatures: Map[Int, (Item, Option[Array[Double]])],
- val userStringIntMap: BiMap[String, Int],
- val itemStringIntMap: BiMap[String, Int]
-) extends Serializable {
-
- @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
- override def toString = {
- s" rank: ${rank}" +
- s" userFeatures: [${userFeatures.size}]" +
- s"(${userFeatures.take(2).toList}...)" +
- s" productFeatures: [${productFeatures.size}]" +
- s"(${productFeatures.take(2).toList}...)" +
- s" userStringIntMap: [${userStringIntMap.size}]" +
- s"(${userStringIntMap.take(2).toString}...)]" +
- s" itemStringIntMap: [${itemStringIntMap.size}]" +
- s"(${itemStringIntMap.take(2).toString}...)]"
- }
-}
-
-/**
- * Use ALS to build item x feature matrix
- */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
- extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
- @transient lazy val logger = Logger[this.type]
-
- def train(sc: SparkContext, data: PreparedData): ALSModel = {
- require(!data.viewEvents.take(1).isEmpty,
- s"viewEvents in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- require(!data.users.take(1).isEmpty,
- s"users in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- require(!data.items.take(1).isEmpty,
- s"items in PreparedData cannot be empty." +
- " Please check if DataSource generates TrainingData" +
- " and Preprator generates PreparedData correctly.")
- // create User and item's String ID to integer index BiMap
- val userStringIntMap = BiMap.stringInt(data.users.keys)
- val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
- val mllibRatings = data.viewEvents
- .map { r =>
- // Convert user and item String IDs to Int index for MLlib
- val uindex = userStringIntMap.getOrElse(r.user, -1)
- val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
- if (uindex == -1)
- logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
- + " to Int index.")
-
- if (iindex == -1)
- logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
- + " to Int index.")
-
- ((uindex, iindex), 1)
- }.filter { case ((u, i), v) =>
- // keep events with valid user and item index
- (u != -1) && (i != -1)
- }
- .reduceByKey(_ + _) // aggregate all view events of same user-item pair
- .map { case ((u, i), v) =>
- // MLlibRating requires integer index for user and item
- MLlibRating(u, i, v)
- }.cache()
-
- // MLLib ALS cannot handle empty training data.
- require(!mllibRatings.take(1).isEmpty,
- s"mllibRatings cannot be empty." +
- " Please check if your events contain valid user and item ID.")
-
- // seed for MLlib ALS
- val seed = ap.seed.getOrElse(System.nanoTime)
-
- val m = ALS.trainImplicit(
- ratings = mllibRatings,
- rank = ap.rank,
- iterations = ap.numIterations,
- lambda = ap.lambda,
- blocks = -1,
- alpha = 1.0,
- seed = seed)
-
- val userFeatures = m.userFeatures.collectAsMap.toMap
-
- // convert ID to Int index
- val items = data.items.map { case (id, item) =>
- (itemStringIntMap(id), item)
- }
-
- // join item with the trained productFeatures
- val productFeatures = items.leftOuterJoin(m.productFeatures)
- .collectAsMap.toMap
-
- new ALSModel(
- rank = m.rank,
- userFeatures = userFeatures,
- productFeatures = productFeatures,
- userStringIntMap = userStringIntMap,
- itemStringIntMap = itemStringIntMap
- )
- }
-
- def predict(model: ALSModel, query: Query): PredictedResult = {
-
- val userFeatures = model.userFeatures
- val productFeatures = model.productFeatures
-
- // convert whiteList's string ID to integer index
- val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
- set.map(model.itemStringIntMap.get(_)).flatten
- )
-
- val blackList: Set[String] = query.blackList.getOrElse(Set[String]())
-
- // if unseenOnly is True, get all seen items
- val seenItems: Set[String] = if (ap.unseenOnly) {
-
- // get all user item events which are considered as "seen" events
- val seenEvents: Iterator[Event] = try {
- LEventStore.findByEntity(
- appName = ap.appName,
- entityType = "user",
- entityId = query.user,
- eventNames = Some(ap.seenEvents),
- targetEntityType = Some(Some("item")),
- // set time limit to avoid super long DB access
- timeout = Duration(200, "millis")
- )
- } catch {
- case e: scala.concurrent.TimeoutException =>
- logger.error(s"Timeout when read seen events." +
- s" Empty list is used. ${e}")
- Iterator[Event]()
- case e: Exception =>
- logger.error(s"Error when read seen events: ${e}")
- throw e
- }
-
- seenEvents.map { event =>
- try {
- event.targetEntityId.get
- } catch {
- case e => {
- logger.error(s"Can't get targetEntityId of event ${event}.")
- throw e
- }
- }
- }.toSet
- } else {
- Set[String]()
- }
-
- // get the latest constraint unavailableItems $set event
- val unavailableItems: Set[String] = try {
- val constr = LEventStore.findByEntity(
- appName = ap.appName,
- entityType = "constraint",
- entityId = "unavailableItems",
- eventNames = Some(Seq("$set")),
- limit = Some(1),
- latest = true,
- timeout = Duration(200, "millis")
- )
- if (constr.hasNext) {
- constr.next.properties.get[Set[String]]("items")
- } else {
- Set[String]()
- }
- } catch {
- case e: scala.concurrent.TimeoutException =>
- logger.error(s"Timeout when read set unavailableItems event." +
- s" Empty list is used. ${e}")
- Set[String]()
- case e: Exception =>
- logger.error(s"Error when read set unavailableItems event: ${e}")
- throw e
- }
-
- // combine query's blackList,seenItems and unavailableItems
- // into final blackList.
- // convert seen Items list from String ID to interger Index
- val finalBlackList: Set[Int] = (blackList ++ seenItems ++
- unavailableItems).map( x => model.itemStringIntMap.get(x)).flatten
-
- val userFeature =
- model.userStringIntMap.get(query.user).map { userIndex =>
- userFeatures.get(userIndex)
- }
- // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
- .flatten
-
- val topScores = if (userFeature.isDefined) {
- // the user has feature vector
- val uf = userFeature.get
- val indexScores: Map[Int, Double] =
- productFeatures.par // convert to parallel collection
- .filter { case (i, (item, feature)) =>
- feature.isDefined &&
- isCandidateItem(
- i = i,
- item = item,
- categories = query.categories,
- whiteList = whiteList,
- blackList = finalBlackList
- )
- }
- .map { case (i, (item, feature)) =>
- // NOTE: feature must be defined, so can call .get
- val s = dotProduct(uf, feature.get)
- // Can adjust score here
- (i, s)
- }
- .filter(_._2 > 0) // only keep items with score > 0
- .seq // convert back to sequential collection
-
- val ord = Ordering.by[(Int, Double), Double](_._2).reverse
- val topScores = getTopN(indexScores, query.num)(ord).toArray
-
- topScores
-
- } else {
- // the user doesn't have feature vector.
- // For example, new user is created after model is trained.
- logger.info(s"No userFeature found for user ${query.user}.")
- predictNewUser(
- model = model,
- query = query,
- whiteList = whiteList,
- blackList = finalBlackList
- )
-
- }
-
- val itemScores = topScores.map { case (i, s) =>
- new ItemScore(
- // convert item int index back to string ID
- item = model.itemIntStringMap(i),
- score = s
- )
- }
-
- new PredictedResult(itemScores)
- }
-
- /** Get recently viewed item of the user and return top similar items */
- private
- def predictNewUser(
- model: ALSModel,
- query: Query,
- whiteList: Option[Set[Int]],
- blackList: Set[Int]): Array[(Int, Double)] = {
-
- val userFeatures = model.userFeatures
- val productFeatures = model.productFeatures
-
- // get latest 10 user view item events
- val recentEvents = try {
- LEventStore.findByEntity(
- appName = ap.appName,
- // entityType and entityId is specified for fast lookup
- entityType = "user",
- entityId = query.user,
- eventNames = Some(Seq("view")),
- targetEntityType = Some(Some("item")),
- limit = Some(10),
- latest = true,
- // set time limit to avoid super long DB access
- timeout = Duration(200, "millis")
- )
- } catch {
- case e: scala.concurrent.TimeoutException =>
- logger.error(s"Timeout when read recent events." +
- s" Empty list is used. ${e}")
- Iterator[Event]()
- case e: Exception =>
- logger.error(s"Error when read recent events: ${e}")
- throw e
- }
-
- val recentItems: Set[String] = recentEvents.map { event =>
- try {
- event.targetEntityId.get
- } catch {
- case e => {
- logger.error("Can't get targetEntityId of event ${event}.")
- throw e
- }
- }
- }.toSet
-
- val recentList: Set[Int] = recentItems.map (x =>
- model.itemStringIntMap.get(x)).flatten
-
- val recentFeatures: Vector[Array[Double]] = recentList.toVector
- // productFeatures may not contain the requested item
- .map { i =>
- productFeatures.get(i).map { case (item, f) => f }.flatten
- }.flatten
-
- val indexScores: Map[Int, Double] = if (recentFeatures.isEmpty) {
- logger.info(s"No productFeatures vector for recent items ${recentItems}.")
- Map[Int, Double]()
- } else {
- productFeatures.par // convert to parallel collection
- .filter { case (i, (item, feature)) =>
- feature.isDefined &&
- isCandidateItem(
- i = i,
- item = item,
- categories = query.categories,
- whiteList = whiteList,
- blackList = blackList
- )
- }
- .map { case (i, (item, feature)) =>
- val s = recentFeatures.map{ rf =>
- cosine(rf, feature.get) // feature is defined
- }.reduce(_ + _)
- // Can adjust score here
- (i, s)
- }
- .filter(_._2 > 0) // keep items with score > 0
- .seq // convert back to sequential collection
- }
-
- val ord = Ordering.by[(Int, Double), Double](_._2).reverse
- val topScores = getTopN(indexScores, query.num)(ord).toArray
-
- topScores
- }
-
- private
- def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
- val q = PriorityQueue()
-
- for (x <- s) {
- if (q.size < n)
- q.enqueue(x)
- else {
- // q is full
- if (ord.compare(x, q.head) < 0) {
- q.dequeue()
- q.enqueue(x)
- }
- }
- }
-
- q.dequeueAll.toSeq.reverse
- }
-
- private
- def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
- val size = v1.size
- var i = 0
- var d: Double = 0
- while (i < size) {
- d += v1(i) * v2(i)
- i += 1
- }
- d
- }
-
- private
- def cosine(v1: Array[Double], v2: Array[Double]): Double = {
- val size = v1.size
- var i = 0
- var n1: Double = 0
- var n2: Double = 0
- var d: Double = 0
- while (i < size) {
- n1 += v1(i) * v1(i)
- n2 += v2(i) * v2(i)
- d += v1(i) * v2(i)
- i += 1
- }
- val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
- if (n1n2 == 0) 0 else (d / n1n2)
- }
-
- private
- def isCandidateItem(
- i: Int,
- item: Item,
- categories: Option[Set[String]],
- whiteList: Option[Set[Int]],
- blackList: Set[Int]
- ): Boolean = {
- // can add other custom filtering here
- whiteList.map(_.contains(i)).getOrElse(true) &&
- !blackList.contains(i) &&
- // filter categories
- categories.map { cat =>
- item.categories.map { itemCat =>
- // keep this item if has ovelap categories with the query
- !(itemCat.toSet.intersect(cat).isEmpty)
- }.getOrElse(false) // discard this item if it has no categories
- }.getOrElse(true)
-
- }
-
-}
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 783b2bb..bb83a3d 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -59,37 +59,53 @@
(entityId, item)
}.cache()
- // get all "user" "view" "item" events
- val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+ val eventsRDD: RDD[Event] = PEventStore.find(
appName = dsp.appName,
entityType = Some("user"),
- eventNames = Some(List("view")),
+ eventNames = Some(List("view", "buy")),
// targetEntityType is optional field of an event.
targetEntityType = Some(Some("item")))(sc)
- // eventsDb.find() returns RDD[Event]
+ .cache()
+
+ val viewEventsRDD: RDD[ViewEvent] = eventsRDD
+ .filter { event => event.event == "view" }
.map { event =>
- val viewEvent = try {
- event.event match {
- case "view" => ViewEvent(
- user = event.entityId,
- item = event.targetEntityId.get,
- t = event.eventTime.getMillis)
- case _ => throw new Exception(s"Unexpected event ${event} is read.")
- }
+ try {
+ ViewEvent(
+ user = event.entityId,
+ item = event.targetEntityId.get,
+ t = event.eventTime.getMillis
+ )
} catch {
- case e: Exception => {
+ case e: Exception =>
logger.error(s"Cannot convert ${event} to ViewEvent." +
s" Exception: ${e}.")
throw e
- }
}
- viewEvent
- }.cache()
+ }
+
+ val buyEventsRDD: RDD[BuyEvent] = eventsRDD
+ .filter { event => event.event == "buy" }
+ .map { event =>
+ try {
+ BuyEvent(
+ user = event.entityId,
+ item = event.targetEntityId.get,
+ t = event.eventTime.getMillis
+ )
+ } catch {
+ case e: Exception =>
+ logger.error(s"Cannot convert ${event} to BuyEvent." +
+ s" Exception: ${e}.")
+ throw e
+ }
+ }
new TrainingData(
users = usersRDD,
items = itemsRDD,
- viewEvents = viewEventsRDD
+ viewEvents = viewEventsRDD,
+ buyEvents = buyEventsRDD
)
}
}
@@ -100,14 +116,18 @@
case class ViewEvent(user: String, item: String, t: Long)
+case class BuyEvent(user: String, item: String, t: Long)
+
class TrainingData(
val users: RDD[(String, User)],
val items: RDD[(String, Item)],
- val viewEvents: RDD[ViewEvent]
+ val viewEvents: RDD[ViewEvent],
+ val buyEvents: RDD[BuyEvent]
) extends Serializable {
override def toString = {
s"users: [${users.count()} (${users.take(2).toList}...)]" +
s"items: [${items.count()} (${items.take(2).toList}...)]" +
- s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
+ s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" +
+ s"buyEvents: [${buyEvents.count()}] (${buyEvents.take(2).toList}...)"
}
}
diff --git a/src/main/scala/ECommAlgorithm.scala b/src/main/scala/ECommAlgorithm.scala
new file mode 100644
index 0000000..674fd09
--- /dev/null
+++ b/src/main/scala/ECommAlgorithm.scala
@@ -0,0 +1,573 @@
+package org.template.ecommercerecommendation
+
+import io.prediction.controller.P2LAlgorithm
+import io.prediction.controller.Params
+import io.prediction.data.storage.BiMap
+import io.prediction.data.storage.Event
+import io.prediction.data.store.LEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+import scala.collection.mutable.PriorityQueue
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
+
+case class ECommAlgorithmParams(
+ appName: String, // app whose events are read from LEventStore at predict time
+ unseenOnly: Boolean, // when true, items the user has "seen" events on are blacklisted
+ seenEvents: List[String], // event names treated as "seen" (read only when unseenOnly is true)
+ similarEvents: List[String], // event names used to fetch the user's recent items
+ rank: Int, // passed to ALS.trainImplicit as rank
+ numIterations: Int, // passed to ALS.trainImplicit as iterations
+ lambda: Double, // passed to ALS.trainImplicit as lambda
+ seed: Option[Long] // ALS seed; System.nanoTime is used when None
+) extends Params
+
+
+case class ProductModel(
+ item: Item, // item properties, passed to isCandidateItem when filtering at predict time
+ features: Option[Array[Double]], // features by ALS; None when the join found none for the item
+ count: Int // popular count (number of buy events) used as the default score
+)
+
+class ECommModel(
+  val rank: Int, // rank reported by the trained ALS model
+  val userFeatures: Map[Int, Array[Double]], // user Int index -> ALS user feature vector
+  val productModels: Map[Int, ProductModel], // item Int index -> item, features and buy count
+  val userStringIntMap: BiMap[String, Int], // user String ID <-> Int index
+  val itemStringIntMap: BiMap[String, Int] // item String ID <-> Int index
+) extends Serializable {
+
+  // item Int index -> String ID; @transient lazy, so it is derived on first use
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+  // Short summary for logging: sizes plus the first two entries of each collection.
+  override def toString: String = {
+    s" rank: ${rank}" +
+    s" userFeatures: [${userFeatures.size}]" +
+    s"(${userFeatures.take(2).toList}...)" +
+    s" productModels: [${productModels.size}]" +
+    s"(${productModels.take(2).toList}...)" +
+    s" userStringIntMap: [${userStringIntMap.size}]" +
+    s"(${userStringIntMap.take(2).toString}...)" +
+    s" itemStringIntMap: [${itemStringIntMap.size}]" +
+    s"(${itemStringIntMap.take(2).toString}...)"
+  }
+}
+
+class ECommAlgorithm(val ap: ECommAlgorithmParams)
+ extends P2LAlgorithm[PreparedData, ECommModel, Query, PredictedResult] {
+
+ @transient lazy val logger = Logger[this.type]
+
+  /** Train ALS feature vectors and per-item buy counts, and pack them into an ECommModel. */
+  def train(sc: SparkContext, data: PreparedData): ECommModel = {
+    require(!data.viewEvents.take(1).isEmpty,
+      s"viewEvents in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    require(!data.users.take(1).isEmpty,
+      s"users in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    require(!data.items.take(1).isEmpty,
+      s"items in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    // create User and item's String ID to integer index BiMap
+    val userStringIntMap = BiMap.stringInt(data.users.keys)
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    val mllibRatings: RDD[MLlibRating] = genMLlibRating(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = data
+    )
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      s"mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // use ALS to train feature vectors
+    val m = ALS.trainImplicit(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      alpha = 1.0,
+      seed = seed)
+    val userFeatures = m.userFeatures.collectAsMap.toMap
+
+    // convert ID to Int index
+    val items = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }
+
+    // join item with the trained productFeatures
+    val productFeatures: Map[Int, (Item, Option[Array[Double]])] =
+      items.leftOuterJoin(m.productFeatures).collectAsMap.toMap
+
+    val popularCount = trainDefault(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = data
+    )
+
+    val productModels: Map[Int, ProductModel] = productFeatures
+      .map { case (index, (item, features)) =>
+        val pm = ProductModel(
+          item = item,
+          features = features,
+          // NOTE: use getOrElse because popularCount may not contain all items.
+          count = popularCount.getOrElse(index, 0)
+        )
+        (index, pm)
+      }
+
+    new ECommModel(
+      rank = m.rank,
+      userFeatures = userFeatures,
+      productModels = productModels,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap
+    )
+  }
+
+ /** Generate MLlibRating from PreparedData: one rating per (user, item) pair, valued by view count.
+ * You may customize this function to use different events or a different aggregation method.
+ */
+ def genMLlibRating(
+ userStringIntMap: BiMap[String, Int],
+ itemStringIntMap: BiMap[String, Int],
+ data: PreparedData): RDD[MLlibRating] = {
+
+ val mllibRatings = data.viewEvents
+ .map { r =>
+ // Convert user and item String IDs to Int index for MLlib
+ val uindex = userStringIntMap.getOrElse(r.user, -1)
+ val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+ if (uindex == -1)
+ logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+ + " to Int index.")
+
+ if (iindex == -1)
+ logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+ + " to Int index.")
+
+ ((uindex, iindex), 1)
+ }
+ .filter { case ((u, i), v) =>
+ // keep events with valid user and item index
+ (u != -1) && (i != -1)
+ }
+ .reduceByKey(_ + _) // aggregate all view events of same user-item pair
+ .map { case ((u, i), v) =>
+ // MLlibRating requires integer index for user and item
+ MLlibRating(u, i, v)
+ }
+ .cache() // train reads this RDD twice: the emptiness check and ALS.trainImplicit
+
+ mllibRatings
+ }
+
+ /** Train default model: the number of buy events per item index.
+ * You may customize this function to use different events, to count the
+ * "popular" score differently, or to return a different default score per item.
+ */
+ def trainDefault(
+ userStringIntMap: BiMap[String, Int],
+ itemStringIntMap: BiMap[String, Int],
+ data: PreparedData): Map[Int, Int] = {
+ // count number of buys
+ // (item index, count)
+ val buyCountsRDD: RDD[(Int, Int)] = data.buyEvents
+ .map { r =>
+ // Convert user and item String IDs to Int index
+ val uindex = userStringIntMap.getOrElse(r.user, -1)
+ val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+ if (uindex == -1)
+ logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+ + " to Int index.")
+
+ if (iindex == -1)
+ logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+ + " to Int index.")
+
+ (uindex, iindex, 1)
+ }
+ .filter { case (u, i, v) =>
+ // keep events with valid user and item index (buys by unknown users are not counted)
+ (u != -1) && (i != -1)
+ }
+ .map { case (u, i, v) => (i, 1) } // key is item
+ .reduceByKey{ case (a, b) => a + b } // count the occurrences of each item
+
+ buyCountsRDD.collectAsMap.toMap
+ }
+
+ def predict(model: ECommModel, query: Query): PredictedResult = {
+ // Strategy: known user -> ALS scores; otherwise similar-to-recent items; otherwise popular items.
+ val userFeatures = model.userFeatures
+ val productModels = model.productModels
+
+ // convert whiteList's string ID to integer index (IDs unknown to the model are dropped)
+ val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
+ set.flatMap(model.itemStringIntMap.get(_))
+ )
+
+ val finalBlackList: Set[Int] = genBlackList(query = query)
+ // convert blacklisted item String IDs to integer index (IDs unknown to the model are dropped)
+ .flatMap(x => model.itemStringIntMap.get(x))
+
+ val userFeature: Option[Array[Double]] =
+ model.userStringIntMap.get(query.user).flatMap { userIndex =>
+ userFeatures.get(userIndex)
+ }
+
+ val topScores: Array[(Int, Double)] = if (userFeature.isDefined) {
+ // the user has feature vector
+ predictKnownUser(
+ userFeature = userFeature.get,
+ productModels = productModels,
+ query = query,
+ whiteList = whiteList,
+ blackList = finalBlackList
+ )
+ } else {
+ // the user doesn't have feature vector.
+ // For example, new user is created after model is trained.
+ logger.info(s"No userFeature found for user ${query.user}.")
+
+ // check if the user has recent events on some items
+ val recentItems: Set[String] = getRecentItems(query)
+ val recentList: Set[Int] = recentItems.flatMap (x =>
+ model.itemStringIntMap.get(x))
+
+ val recentFeatures: Vector[Array[Double]] = recentList.toVector
+ // productModels may not contain the requested item, and an item may have no features
+ .map { i =>
+ productModels.get(i).flatMap { pm => pm.features }
+ }.flatten
+
+ if (recentFeatures.isEmpty) {
+ logger.info(s"No features vector for recent items ${recentItems}.")
+ predictDefault(
+ productModels = productModels,
+ query = query,
+ whiteList = whiteList,
+ blackList = finalBlackList
+ )
+ } else {
+ predictSimilar(
+ recentFeatures = recentFeatures,
+ productModels = productModels,
+ query = query,
+ whiteList = whiteList,
+ blackList = finalBlackList
+ )
+ }
+ }
+
+ val itemScores = topScores.map { case (i, s) =>
+ new ItemScore(
+ // convert item int index back to string ID
+ item = model.itemIntStringMap(i),
+ score = s
+ )
+ }
+
+ new PredictedResult(itemScores)
+ }
+
+  /** Generate final blackList: the query's blackList plus seen and unavailable items */
+  def genBlackList(query: Query): Set[String] = {
+    // if unseenOnly is True, get all seen items
+    val seenItems: Set[String] = if (ap.unseenOnly) {
+
+      // get all user item events which are considered as "seen" events
+      val seenEvents: Iterator[Event] = try {
+        LEventStore.findByEntity(
+          appName = ap.appName,
+          entityType = "user",
+          entityId = query.user,
+          eventNames = Some(ap.seenEvents),
+          targetEntityType = Some(Some("item")),
+          // set time limit to avoid super long DB access
+          timeout = Duration(200, "millis")
+        )
+      } catch {
+        case e: scala.concurrent.TimeoutException =>
+          logger.error(s"Timeout when read seen events." +
+            s" Empty list is used. ${e}")
+          Iterator[Event]()
+        case e: Exception =>
+          logger.error(s"Error when read seen events: ${e}")
+          throw e
+      }
+
+      seenEvents.map { event =>
+        try {
+          event.targetEntityId.get
+        } catch {
+          // match Exception only (as the other handlers here do) instead of every Throwable
+          case e: Exception =>
+            logger.error(s"Can't get targetEntityId of event ${event}.")
+            throw e
+        }
+      }.toSet
+    } else {
+      Set[String]()
+    }
+
+    // get the latest constraint unavailableItems $set event
+    val unavailableItems: Set[String] = try {
+      val constr = LEventStore.findByEntity(
+        appName = ap.appName,
+        entityType = "constraint",
+        entityId = "unavailableItems",
+        eventNames = Some(Seq("$set")),
+        limit = Some(1),
+        latest = true,
+        timeout = Duration(200, "millis")
+      )
+      if (constr.hasNext) {
+        constr.next.properties.get[Set[String]]("items")
+      } else {
+        Set[String]()
+      }
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read set unavailableItems event." +
+          s" Empty list is used. ${e}")
+        Set[String]()
+      case e: Exception =>
+        logger.error(s"Error when read set unavailableItems event: ${e}")
+        throw e
+    }
+
+    // combine query's blackList,seenItems and unavailableItems
+    // into final blackList.
+    query.blackList.getOrElse(Set[String]()) ++ seenItems ++ unavailableItems
+  }
+
+  /** Get recent events of the user on items for recommending similar items */
+  def getRecentItems(query: Query): Set[String] = {
+    // get the user's latest 10 item events whose names are listed in ap.similarEvents
+    val recentEvents = try {
+      LEventStore.findByEntity(
+        appName = ap.appName,
+        // entityType and entityId is specified for fast lookup
+        entityType = "user",
+        entityId = query.user,
+        eventNames = Some(ap.similarEvents),
+        targetEntityType = Some(Some("item")),
+        limit = Some(10),
+        latest = true,
+        // set time limit to avoid super long DB access
+        timeout = Duration(200, "millis")
+      )
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read recent events." +
+          s" Empty list is used. ${e}")
+        Iterator[Event]()
+      case e: Exception =>
+        logger.error(s"Error when read recent events: ${e}")
+        throw e
+    }
+
+    // collect the distinct target item IDs; an event without one is logged and rethrown
+    val recentItems: Set[String] = recentEvents.map { event =>
+      try {
+        event.targetEntityId.get
+      } catch {
+        case e: Exception =>
+          logger.error(s"Can't get targetEntityId of event ${event}.")
+          throw e
+      }
+    }.toSet
+
+    recentItems
+  }
+
+ /** Prediction for user with known feature vector: score = dot product of user and item features */
+ def predictKnownUser(
+ userFeature: Array[Double],
+ productModels: Map[Int, ProductModel],
+ query: Query,
+ whiteList: Option[Set[Int]],
+ blackList: Set[Int]
+ ): Array[(Int, Double)] = {
+ val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
+ .filter { case (i, pm) =>
+ pm.features.isDefined &&
+ isCandidateItem(
+ i = i,
+ item = pm.item,
+ categories = query.categories,
+ whiteList = whiteList,
+ blackList = blackList
+ )
+ }
+ .map { case (i, pm) =>
+ // NOTE: features must be defined (filtered above), so can call .get
+ val s = dotProduct(userFeature, pm.features.get)
+ // may customize here to further adjust score
+ (i, s)
+ }
+ .filter(_._2 > 0) // only keep items with score > 0
+ .seq // convert back to sequential collection
+
+ val ord = Ordering.by[(Int, Double), Double](_._2).reverse // reversed: highest score ranks first
+ val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+ topScores
+ }
+
+ /** Default prediction when nothing is known about the user: rank candidates by popular count */
+ def predictDefault(
+ productModels: Map[Int, ProductModel],
+ query: Query,
+ whiteList: Option[Set[Int]],
+ blackList: Set[Int]
+ ): Array[(Int, Double)] = {
+ val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
+ .filter { case (i, pm) =>
+ isCandidateItem(
+ i = i,
+ item = pm.item,
+ categories = query.categories,
+ whiteList = whiteList,
+ blackList = blackList
+ )
+ }
+ .map { case (i, pm) =>
+ // may customize here to further adjust score
+ (i, pm.count.toDouble)
+ }
+ .seq // convert back to sequential collection; no score > 0 filter here, so zero counts stay
+
+ val ord = Ordering.by[(Int, Double), Double](_._2).reverse // reversed: highest count ranks first
+ val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+ topScores
+ }
+
+  /** Return top similar items based on items user recently has action on */
+  def predictSimilar(
+    recentFeatures: Vector[Array[Double]],
+    productModels: Map[Int, ProductModel],
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Array[(Int, Double)] = {
+    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
+      .filter { case (i, pm) =>
+        pm.features.isDefined &&
+        isCandidateItem(
+          i = i,
+          item = pm.item,
+          categories = query.categories,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .map { case (i, pm) =>
+        // score = sum of cosine similarities to each recent item's features
+        val s = recentFeatures.map { rf =>
+          // pm.features must be defined because of filter logic above
+          cosine(rf, pm.features.get)
+        }.sum // unlike reduce, sum is 0.0 (then filtered out) for empty recentFeatures
+        // may customize here to further adjust score
+        (i, s)
+      }
+      .filter(_._2 > 0) // keep items with score > 0
+      .seq // convert back to sequential collection
+
+    // reversed score ordering, so getTopN returns the highest-scoring items first
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+    getTopN(indexScores, query.num)(ord).toArray
+  }
+
+  /** Return at most n elements of s that rank first under ord, sorted by ord. */
+  private
+  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
+    // max-heap under ord: q.head is the worst element currently kept
+    val q = PriorityQueue[T]()
+    // guard n <= 0: the queue then stays empty and reading q.head would throw
+    if (n > 0) {
+      for (x <- s) {
+        if (q.size < n) {
+          q.enqueue(x)
+        } else if (ord.compare(x, q.head) < 0) {
+          // q is full and x ranks before the current worst: replace it
+          q.dequeue()
+          q.enqueue(x)
+        }
+      }
+    }
+    q.dequeueAll.toSeq.reverse
+  }
+
+ private
+ def dotProduct(v1: Array[Double], v2: Array[Double]): Double = { // sum of v1(i) * v2(i)
+ val size = v1.size // the loop bound comes from v1 only; v2 is indexed with the same i
+ var i = 0
+ var d: Double = 0 // running sum
+ while (i < size) {
+ d += v1(i) * v2(i)
+ i += 1
+ }
+ d
+ }
+
+ private
+ def cosine(v1: Array[Double], v2: Array[Double]): Double = { // cosine similarity of v1 and v2
+ val size = v1.size // the loop bound comes from v1 only; v2 is indexed with the same i
+ var i = 0
+ var n1: Double = 0 // squared norm of v1
+ var n2: Double = 0 // squared norm of v2
+ var d: Double = 0 // dot product of v1 and v2
+ while (i < size) {
+ n1 += v1(i) * v1(i)
+ n2 += v2(i) * v2(i)
+ d += v1(i) * v2(i)
+ i += 1
+ }
+ val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
+ if (n1n2 == 0) 0 else (d / n1n2) // a zero-norm vector yields 0 instead of dividing by zero
+ }
+
+ private
+ def isCandidateItem(
+ i: Int,
+ item: Item,
+ categories: Option[Set[String]],
+ whiteList: Option[Set[Int]],
+ blackList: Set[Int]
+ ): Boolean = {
+ // can add other custom filtering here
+ whiteList.map(_.contains(i)).getOrElse(true) && // no whiteList: every index passes
+ !blackList.contains(i) &&
+ // filter categories (no categories in the query: every item passes)
+ categories.map { cat =>
+ item.categories.map { itemCat =>
+ // keep this item if it has overlapping categories with the query
+ !(itemCat.toSet.intersect(cat).isEmpty)
+ }.getOrElse(false) // discard this item if it has no categories
+ }.getOrElse(true)
+
+ }
+
+}
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
index 42ec4d4..47d9896 100644
--- a/src/main/scala/Engine.scala
+++ b/src/main/scala/Engine.scala
@@ -25,7 +25,7 @@
new Engine(
classOf[DataSource],
classOf[Preparator],
- Map("als" -> classOf[ALSAlgorithm]),
+ Map("ecomm" -> classOf[ECommAlgorithm]),
classOf[Serving])
}
}
diff --git a/src/main/scala/Preparator.scala b/src/main/scala/Preparator.scala
index 4dd45cf..ff82f80 100644
--- a/src/main/scala/Preparator.scala
+++ b/src/main/scala/Preparator.scala
@@ -13,12 +13,14 @@
new PreparedData(
users = trainingData.users,
items = trainingData.items,
- viewEvents = trainingData.viewEvents)
+ viewEvents = trainingData.viewEvents,
+ buyEvents = trainingData.buyEvents)
}
}
class PreparedData(
val users: RDD[(String, User)],
val items: RDD[(String, Item)],
- val viewEvents: RDD[ViewEvent]
+ val viewEvents: RDD[ViewEvent],
+ val buyEvents: RDD[BuyEvent]
) extends Serializable
diff --git a/src/test/scala/ECommAlgorithmTest.scala b/src/test/scala/ECommAlgorithmTest.scala
new file mode 100644
index 0000000..5ddac96
--- /dev/null
+++ b/src/test/scala/ECommAlgorithmTest.scala
@@ -0,0 +1,172 @@
+package org.template.ecommercerecommendation
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+import io.prediction.data.storage.BiMap
+
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+
+class ECommAlgorithmTest
+ extends FlatSpec with EngineTestSparkContext with Matchers {
+
+ val algorithmParams = new ECommAlgorithmParams( // parameters for the algorithm under test
+ appName = "test-app",
+ unseenOnly = true,
+ seenEvents = List("buy", "view"),
+ similarEvents = List("view"),
+ rank = 10,
+ numIterations = 20,
+ lambda = 0.01,
+ seed = Some(3) // fixed seed; presumably for reproducible training — confirm in ECommAlgorithm
+ )
+ val algorithm = new ECommAlgorithm(algorithmParams)
+
+ val userStringIntMap = BiMap(Map("u0" -> 0, "u1" -> 1)) // user id <-> integer index
+
+ val itemStringIntMap = BiMap(Map("i0" -> 0, "i1" -> 1, "i2" -> 2)) // item id <-> integer index
+
+ val users = Map("u0" -> User(), "u1" -> User())
+
+
+ val i0 = Item(categories = Some(List("c0", "c1")))
+ val i1 = Item(categories = None) // the only item without categories
+ val i2 = Item(categories = Some(List("c0", "c2")))
+
+ val items = Map(
+ "i0" -> i0,
+ "i1" -> i1,
+ "i2" -> i2
+ )
+
+ val view = Seq( // view events: (user id, item id, third argument presumably a timestamp)
+ ViewEvent("u0", "i0", 1000010),
+ ViewEvent("u0", "i1", 1000020),
+ ViewEvent("u0", "i1", 1000020), // same as the previous event: u0 has two views of i1
+ ViewEvent("u1", "i1", 1000030),
+ ViewEvent("u1", "i2", 1000040)
+ )
+
+ val buy = Seq( // buy events: i0 is bought once, i1 twice, i2 never
+ BuyEvent("u0", "i0", 1000020),
+ BuyEvent("u0", "i1", 1000030),
+ BuyEvent("u1", "i1", 1000040)
+ )
+
+
+ "ECommAlgorithm.genMLlibRating()" should "create RDD[MLlibRating] correctly" in {
+ // Turns the suite fixtures into RDDs and checks the ratings generated from them.
+ val preparedData = new PreparedData(
+ users = sc.parallelize(users.toSeq),
+ items = sc.parallelize(items.toSeq),
+ viewEvents = sc.parallelize(view.toSeq),
+ buyEvents = sc.parallelize(buy.toSeq)
+ )
+
+ val mllibRatings = algorithm.genMLlibRating(
+ userStringIntMap = userStringIntMap,
+ itemStringIntMap = itemStringIntMap,
+ data = preparedData
+ )
+
+ val expected = Seq( // (user index, item index, value); values equal the per-pair view counts
+ MLlibRating(0, 0, 1),
+ MLlibRating(0, 1, 2), // u0 has two view events for i1 in the `view` fixture
+ MLlibRating(1, 1, 1),
+ MLlibRating(1, 2, 1)
+ )
+
+ mllibRatings.collect should contain theSameElementsAs expected // order is not significant
+ }
+
+ "ECommAlgorithm.trainDefault()" should "return popular count for each item" in {
+ val preparedData = new PreparedData( // same fixtures as the other Spark-backed test
+ users = sc.parallelize(users.toSeq),
+ items = sc.parallelize(items.toSeq),
+ viewEvents = sc.parallelize(view.toSeq),
+ buyEvents = sc.parallelize(buy.toSeq)
+ )
+
+ val popCount = algorithm.trainDefault(
+ userStringIntMap = userStringIntMap,
+ itemStringIntMap = itemStringIntMap,
+ data = preparedData
+ )
+
+ val expected = Map(0 -> 1, 1 -> 2) // item index -> buy count in `buy`; i2 has none and is absent
+
+ popCount should contain theSameElementsAs expected
+ }
+
+ "ECommAlgorithm.predictKnownUser()" should "return top item" in {
+ // Scores three items for a known user while the query restricts results to category "c0".
+ val top = algorithm.predictKnownUser(
+ userFeature = Array(1.0, 2.0, 0.5),
+ productModels = Map(
+ 0 -> ProductModel(i0, Some(Array(2.0, 1.0, 2.0)), 3),
+ 1 -> ProductModel(i1, Some(Array(3.0, 0.5, 1.0)), 4), // i1 has no categories
+ 2 -> ProductModel(i2, Some(Array(1.0, 3.0, 1.0)), 1)
+ ),
+ query = Query(
+ user = "u0",
+ num = 5,
+ categories = Some(Set("c0")),
+ whiteList = None,
+ blackList = None),
+ whiteList = None,
+ blackList = Set()
+ )
+ // expected scores equal userFeature . item features: i2 = 1 + 6 + 0.5 = 7.5, i0 = 2 + 2 + 1 = 5.0
+ val expected = Array((2, 7.5), (0, 5.0)) // highest score first; i1 is not expected
+ top shouldBe expected
+ }
+
+ "ECommAlgorithm.predictDefault()" should "return top item" in {
+ // No user feature is supplied here and the query carries no category filter.
+ val top = algorithm.predictDefault(
+ productModels = Map(
+ 0 -> ProductModel(i0, Some(Array(2.0, 1.0, 2.0)), 3),
+ 1 -> ProductModel(i1, Some(Array(3.0, 0.5, 1.0)), 4),
+ 2 -> ProductModel(i2, Some(Array(1.0, 3.0, 1.0)), 1)
+ ),
+ query = Query(
+ user = "u0",
+ num = 5,
+ categories = None,
+ whiteList = None,
+ blackList = None),
+ whiteList = None,
+ blackList = Set()
+ )
+ // expected scores equal the third ProductModel argument (4, 3, 1), in descending order
+ val expected = Array((1, 4.0), (0, 3.0), (2, 1.0))
+ top shouldBe expected
+ }
+
+ "ECommAlgorithm.predictSimilar()" should "return top item" in {
+ // Scores items against two recent feature vectors; the query restricts results to "c0".
+ val top = algorithm.predictSimilar(
+ recentFeatures = Vector(Array(1.0, 2.0, 0.5), Array(1.0, 0.2, 0.3)),
+ productModels = Map(
+ 0 -> ProductModel(i0, Some(Array(2.0, 1.0, 2.0)), 3),
+ 1 -> ProductModel(i1, Some(Array(3.0, 0.5, 1.0)), 4), // i1 has no categories
+ 2 -> ProductModel(i2, Some(Array(1.0, 3.0, 1.0)), 1)
+ ),
+ query = Query(
+ user = "u0",
+ num = 5,
+ categories = Some(Set("c0")),
+ whiteList = None,
+ blackList = None),
+ whiteList = None,
+ blackList = Set()
+ )
+ // each expected score is the sum of the cosine similarities to the two recent vectors
+ val expected = Array((0, 1.605), (2, 1.525))
+ // item indices must match exactly; scores are compared with a 0.001 tolerance
+ top(0)._1 should be (expected(0)._1)
+ top(1)._1 should be (expected(1)._1)
+ top(0)._2 should be (expected(0)._2 plusOrMinus 0.001)
+ top(1)._2 should be (expected(1)._2 plusOrMinus 0.001)
+ }
+}
diff --git a/src/test/scala/EngineTestSparkContext.scala b/src/test/scala/EngineTestSparkContext.scala
new file mode 100644
index 0000000..2931403
--- /dev/null
+++ b/src/test/scala/EngineTestSparkContext.scala
@@ -0,0 +1,36 @@
+package org.template.ecommercerecommendation
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+
+ trait EngineTestSparkContext extends BeforeAndAfterAll {
+ self: Suite =>
+ @transient private var _sc: SparkContext = _ // assigned in beforeAll, reset to null in afterAll
+
+ def sc: SparkContext = _sc // null before beforeAll and after afterAll
+
+ var conf = new SparkConf(false) // read when beforeAll constructs the context
+
+ override def beforeAll(): Unit = {
+ _sc = new SparkContext("local", "test", conf)
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ LocalSparkContext.stop(_sc)
+
+ _sc = null
+ super.afterAll()
+ }
+ }
+
+ object LocalSparkContext {
+ def stop(sc: SparkContext): Unit = { // stops sc unless it is null, then clears the driver port
+ if (sc != null) {
+ sc.stop()
+ }
+ // To avoid Akka rebinding to the same port, since it doesn't unbind
+ // immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+ }
diff --git a/src/test/scala/PreparatorTest.scala b/src/test/scala/PreparatorTest.scala
new file mode 100644
index 0000000..ec421b3
--- /dev/null
+++ b/src/test/scala/PreparatorTest.scala
@@ -0,0 +1,49 @@
+package org.template.ecommercerecommendation
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+ class PreparatorTest
+ extends FlatSpec with EngineTestSparkContext with Matchers {
+
+ val preparator = new Preparator() // object under test
+ val users = Map(
+ "u0" -> User(),
+ "u1" -> User()
+ )
+
+ val items = Map(
+ "i0" -> Item(categories = Some(List("c0", "c1"))),
+ "i1" -> Item(categories = None)
+ )
+
+ val view = Seq( // view events: (user id, item id, third argument presumably a timestamp)
+ ViewEvent("u0", "i0", 1000010),
+ ViewEvent("u0", "i1", 1000020),
+ ViewEvent("u1", "i1", 1000030)
+ )
+
+ val buy = Seq( // buy events, same argument layout as the view events
+ BuyEvent("u0", "i0", 1000020),
+ BuyEvent("u0", "i1", 1000030),
+ BuyEvent("u1", "i1", 1000040)
+ )
+
+ // simple test for demonstration purpose
+ "Preparator" should "prepare PreparedData" in {
+
+ val trainingData = new TrainingData( // wrap the in-memory fixtures as RDDs
+ users = sc.parallelize(users.toSeq),
+ items = sc.parallelize(items.toSeq),
+ viewEvents = sc.parallelize(view.toSeq),
+ buyEvents = sc.parallelize(buy.toSeq)
+ )
+
+ val preparedData = preparator.prepare(sc, trainingData)
+ // every collection is expected to come back with the same elements it went in with
+ preparedData.users.collect should contain theSameElementsAs users
+ preparedData.items.collect should contain theSameElementsAs items
+ preparedData.viewEvents.collect should contain theSameElementsAs view
+ preparedData.buyEvents.collect should contain theSameElementsAs buy
+ }
+}