Merge branch 'develop'
diff --git a/README.md b/README.md
index 507ecf0..23eea3a 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,16 @@
 
 ## Versions
 
+### v0.4.0
+
+- Change from ALSAlgorithm.scala to ECommAlgorithm.scala
+
+  * return popular bought items when no information is found for the user.
+  * add "similarEvents" parameter to configure which user-to-item events are used for finding similar items
+  * re-structure the Algorithm code for easier customization and testing
+
+- add some unit tests for testing code that may be customized
+
 ### v0.3.1
 
 - use INVALID_APP_NAME as default appName in engine.json
@@ -112,7 +122,11 @@
 import some view events and try to get recommendation for x1 again.
 
 ```
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+accessKey=<YOUR_ACCESS_KEY>
+```
+
+```
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
 -H "Content-Type: application/json" \
 -d '{
   "event" : "view",
@@ -123,7 +137,7 @@
   "eventTime" : "2015-02-17T02:11:21.934Z"
 }'
 
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
 -H "Content-Type: application/json" \
 -d '{
   "event" : "view",
@@ -141,7 +155,7 @@
 Set the following items as unavailable (need to specify complete list each time when this list is changed):
 
 ```
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
 -H "Content-Type: application/json" \
 -d '{
   "event" : "$set",
@@ -157,7 +171,7 @@
 Set empty list when no more items unavailable:
 
 ```
-curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+curl -i -X POST http://localhost:7070/events.json?accessKey=$accessKey \
 -H "Content-Type: application/json" \
 -d '{
   "event" : "$set",
diff --git a/build.sbt b/build.sbt
index 6de8bac..896f6db 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,10 @@
 
 organization := "io.prediction"
 
+parallelExecution in Test := false
+
 libraryDependencies ++= Seq(
   "io.prediction"    %% "core"          % pioVersion.value  % "provided",
   "org.apache.spark" %% "spark-core"    % "1.3.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.3.0" % "provided")
+  "org.apache.spark" %% "spark-mllib"   % "1.3.0" % "provided",
+  "org.scalatest"    %% "scalatest"     % "2.2.1" % "test")
diff --git a/engine.json b/engine.json
index 1da14e1..0ea26a0 100644
--- a/engine.json
+++ b/engine.json
@@ -9,11 +9,12 @@
   },
   "algorithms": [
     {
-      "name": "als",
+      "name": "ecomm",
       "params": {
         "appName": "INVALID_APP_NAME",
         "unseenOnly": true,
         "seenEvents": ["buy", "view"],
+        "similarEvents": ["view"],
         "rank": 10,
         "numIterations" : 20,
         "lambda": 0.01,
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 18a6240..0000000
--- a/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,439 +0,0 @@
-package org.template.ecommercerecommendation
-
-import io.prediction.controller.P2LAlgorithm
-import io.prediction.controller.Params
-import io.prediction.data.storage.BiMap
-import io.prediction.data.storage.Event
-import io.prediction.data.store.LEventStore
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-import scala.concurrent.duration.Duration
-import scala.concurrent.ExecutionContext.Implicits.global
-
-case class ALSAlgorithmParams(
-  appName: String,
-  unseenOnly: Boolean,
-  seenEvents: List[String],
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]
-) extends Params
-
-class ALSModel(
-  val rank: Int,
-  val userFeatures: Map[Int, Array[Double]],
-  val productFeatures: Map[Int, (Item, Option[Array[Double]])],
-  val userStringIntMap: BiMap[String, Int],
-  val itemStringIntMap: BiMap[String, Int]
-) extends Serializable {
-
-  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
-  override def toString = {
-    s" rank: ${rank}" +
-    s" userFeatures: [${userFeatures.size}]" +
-    s"(${userFeatures.take(2).toList}...)" +
-    s" productFeatures: [${productFeatures.size}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" userStringIntMap: [${userStringIntMap.size}]" +
-    s"(${userStringIntMap.take(2).toString}...)]" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2).toString}...)]"
-  }
-}
-
-/**
-  * Use ALS to build item x feature matrix
-  */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    require(!data.viewEvents.take(1).isEmpty,
-      s"viewEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.users.keys)
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    val mllibRatings = data.viewEvents
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        ((uindex, iindex), 1)
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }
-      .reduceByKey(_ + _) // aggregate all view events of same user-item pair
-      .map { case ((u, i), v) =>
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, v)
-      }.cache()
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.trainImplicit(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      alpha = 1.0,
-      seed = seed)
-
-    val userFeatures = m.userFeatures.collectAsMap.toMap
-
-    // convert ID to Int index
-    val items = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }
-
-    // join item with the trained productFeatures
-    val productFeatures = items.leftOuterJoin(m.productFeatures)
-      .collectAsMap.toMap
-
-    new ALSModel(
-      rank = m.rank,
-      userFeatures = userFeatures,
-      productFeatures = productFeatures,
-      userStringIntMap = userStringIntMap,
-      itemStringIntMap = itemStringIntMap
-    )
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-
-    val userFeatures = model.userFeatures
-    val productFeatures = model.productFeatures
-
-    // convert whiteList's string ID to integer index
-    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-
-    val blackList: Set[String] = query.blackList.getOrElse(Set[String]())
-
-    // if unseenOnly is True, get all seen items
-    val seenItems: Set[String] = if (ap.unseenOnly) {
-
-      // get all user item events which are considered as "seen" events
-      val seenEvents: Iterator[Event] = try {
-        LEventStore.findByEntity(
-          appName = ap.appName,
-          entityType = "user",
-          entityId = query.user,
-          eventNames = Some(ap.seenEvents),
-          targetEntityType = Some(Some("item")),
-          // set time limit to avoid super long DB access
-          timeout = Duration(200, "millis")
-        )
-      } catch {
-        case e: scala.concurrent.TimeoutException =>
-          logger.error(s"Timeout when read seen events." +
-            s" Empty list is used. ${e}")
-          Iterator[Event]()
-        case e: Exception =>
-          logger.error(s"Error when read seen events: ${e}")
-          throw e
-      }
-
-      seenEvents.map { event =>
-        try {
-          event.targetEntityId.get
-        } catch {
-          case e => {
-            logger.error(s"Can't get targetEntityId of event ${event}.")
-            throw e
-          }
-        }
-      }.toSet
-    } else {
-      Set[String]()
-    }
-
-    // get the latest constraint unavailableItems $set event
-    val unavailableItems: Set[String] = try {
-      val constr = LEventStore.findByEntity(
-        appName = ap.appName,
-        entityType = "constraint",
-        entityId = "unavailableItems",
-        eventNames = Some(Seq("$set")),
-        limit = Some(1),
-        latest = true,
-        timeout = Duration(200, "millis")
-      )
-      if (constr.hasNext) {
-        constr.next.properties.get[Set[String]]("items")
-      } else {
-        Set[String]()
-      }
-    } catch {
-      case e: scala.concurrent.TimeoutException =>
-        logger.error(s"Timeout when read set unavailableItems event." +
-          s" Empty list is used. ${e}")
-        Set[String]()
-      case e: Exception =>
-        logger.error(s"Error when read set unavailableItems event: ${e}")
-        throw e
-    }
-
-    // combine query's blackList,seenItems and unavailableItems
-    // into final blackList.
-    // convert seen Items list from String ID to interger Index
-    val finalBlackList: Set[Int] = (blackList ++ seenItems ++
-      unavailableItems).map( x => model.itemStringIntMap.get(x)).flatten
-
-    val userFeature =
-      model.userStringIntMap.get(query.user).map { userIndex =>
-        userFeatures.get(userIndex)
-      }
-      // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
-      .flatten
-
-    val topScores = if (userFeature.isDefined) {
-      // the user has feature vector
-      val uf = userFeature.get
-      val indexScores: Map[Int, Double] =
-        productFeatures.par // convert to parallel collection
-          .filter { case (i, (item, feature)) =>
-            feature.isDefined &&
-            isCandidateItem(
-              i = i,
-              item = item,
-              categories = query.categories,
-              whiteList = whiteList,
-              blackList = finalBlackList
-            )
-          }
-          .map { case (i, (item, feature)) =>
-            // NOTE: feature must be defined, so can call .get
-            val s = dotProduct(uf, feature.get)
-            // Can adjust score here
-            (i, s)
-          }
-          .filter(_._2 > 0) // only keep items with score > 0
-          .seq // convert back to sequential collection
-
-      val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-      val topScores = getTopN(indexScores, query.num)(ord).toArray
-
-      topScores
-
-    } else {
-      // the user doesn't have feature vector.
-      // For example, new user is created after model is trained.
-      logger.info(s"No userFeature found for user ${query.user}.")
-      predictNewUser(
-        model = model,
-        query = query,
-        whiteList = whiteList,
-        blackList = finalBlackList
-      )
-
-    }
-
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        // convert item int index back to string ID
-        item = model.itemIntStringMap(i),
-        score = s
-      )
-    }
-
-    new PredictedResult(itemScores)
-  }
-
-  /** Get recently viewed item of the user and return top similar items */
-  private
-  def predictNewUser(
-    model: ALSModel,
-    query: Query,
-    whiteList: Option[Set[Int]],
-    blackList: Set[Int]): Array[(Int, Double)] = {
-
-    val userFeatures = model.userFeatures
-    val productFeatures = model.productFeatures
-
-    // get latest 10 user view item events
-    val recentEvents = try {
-      LEventStore.findByEntity(
-        appName = ap.appName,
-        // entityType and entityId is specified for fast lookup
-        entityType = "user",
-        entityId = query.user,
-        eventNames = Some(Seq("view")),
-        targetEntityType = Some(Some("item")),
-        limit = Some(10),
-        latest = true,
-        // set time limit to avoid super long DB access
-        timeout = Duration(200, "millis")
-      )
-    } catch {
-      case e: scala.concurrent.TimeoutException =>
-        logger.error(s"Timeout when read recent events." +
-          s" Empty list is used. ${e}")
-        Iterator[Event]()
-      case e: Exception =>
-        logger.error(s"Error when read recent events: ${e}")
-        throw e
-    }
-
-    val recentItems: Set[String] = recentEvents.map { event =>
-      try {
-        event.targetEntityId.get
-      } catch {
-        case e => {
-          logger.error("Can't get targetEntityId of event ${event}.")
-          throw e
-        }
-      }
-    }.toSet
-
-    val recentList: Set[Int] = recentItems.map (x =>
-      model.itemStringIntMap.get(x)).flatten
-
-    val recentFeatures: Vector[Array[Double]] = recentList.toVector
-      // productFeatures may not contain the requested item
-      .map { i =>
-        productFeatures.get(i).map { case (item, f) => f }.flatten
-      }.flatten
-
-    val indexScores: Map[Int, Double] = if (recentFeatures.isEmpty) {
-      logger.info(s"No productFeatures vector for recent items ${recentItems}.")
-      Map[Int, Double]()
-    } else {
-      productFeatures.par // convert to parallel collection
-        .filter { case (i, (item, feature)) =>
-          feature.isDefined &&
-          isCandidateItem(
-            i = i,
-            item = item,
-            categories = query.categories,
-            whiteList = whiteList,
-            blackList = blackList
-          )
-        }
-        .map { case (i, (item, feature)) =>
-          val s = recentFeatures.map{ rf =>
-            cosine(rf, feature.get) // feature is defined
-          }.reduce(_ + _)
-          // Can adjust score here
-          (i, s)
-        }
-        .filter(_._2 > 0) // keep items with score > 0
-        .seq // convert back to sequential collection
-    }
-
-    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-    val topScores = getTopN(indexScores, query.num)(ord).toArray
-
-    topScores
-  }
-
-  private
-  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
-    val q = PriorityQueue()
-
-    for (x <- s) {
-      if (q.size < n)
-        q.enqueue(x)
-      else {
-        // q is full
-        if (ord.compare(x, q.head) < 0) {
-          q.dequeue()
-          q.enqueue(x)
-        }
-      }
-    }
-
-    q.dequeueAll.toSeq.reverse
-  }
-
-  private
-  def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var d: Double = 0
-    while (i < size) {
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    d
-  }
-
-  private
-  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var n1: Double = 0
-    var n2: Double = 0
-    var d: Double = 0
-    while (i < size) {
-      n1 += v1(i) * v1(i)
-      n2 += v2(i) * v2(i)
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
-    if (n1n2 == 0) 0 else (d / n1n2)
-  }
-
-  private
-  def isCandidateItem(
-    i: Int,
-    item: Item,
-    categories: Option[Set[String]],
-    whiteList: Option[Set[Int]],
-    blackList: Set[Int]
-  ): Boolean = {
-    // can add other custom filtering here
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    !blackList.contains(i) &&
-    // filter categories
-    categories.map { cat =>
-      item.categories.map { itemCat =>
-        // keep this item if has ovelap categories with the query
-        !(itemCat.toSet.intersect(cat).isEmpty)
-      }.getOrElse(false) // discard this item if it has no categories
-    }.getOrElse(true)
-
-  }
-
-}
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
index 783b2bb..bb83a3d 100644
--- a/src/main/scala/DataSource.scala
+++ b/src/main/scala/DataSource.scala
@@ -59,37 +59,53 @@
       (entityId, item)
     }.cache()
 
-    // get all "user" "view" "item" events
-    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
+    val eventsRDD: RDD[Event] = PEventStore.find(
       appName = dsp.appName,
       entityType = Some("user"),
-      eventNames = Some(List("view")),
+      eventNames = Some(List("view", "buy")),
       // targetEntityType is optional field of an event.
       targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
+      .cache()
+
+    val viewEventsRDD: RDD[ViewEvent] = eventsRDD
+      .filter { event => event.event == "view" }
       .map { event =>
-        val viewEvent = try {
-          event.event match {
-            case "view" => ViewEvent(
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
+        try {
+          ViewEvent(
+            user = event.entityId,
+            item = event.targetEntityId.get,
+            t = event.eventTime.getMillis
+          )
         } catch {
-          case e: Exception => {
+          case e: Exception =>
             logger.error(s"Cannot convert ${event} to ViewEvent." +
               s" Exception: ${e}.")
             throw e
-          }
         }
-        viewEvent
-      }.cache()
+      }
+
+    val buyEventsRDD: RDD[BuyEvent] = eventsRDD
+      .filter { event => event.event == "buy" }
+      .map { event =>
+        try {
+          BuyEvent(
+            user = event.entityId,
+            item = event.targetEntityId.get,
+            t = event.eventTime.getMillis
+          )
+        } catch {
+          case e: Exception =>
+            logger.error(s"Cannot convert ${event} to BuyEvent." +
+              s" Exception: ${e}.")
+            throw e
+        }
+      }
 
     new TrainingData(
       users = usersRDD,
       items = itemsRDD,
-      viewEvents = viewEventsRDD
+      viewEvents = viewEventsRDD,
+      buyEvents = buyEventsRDD
     )
   }
 }
@@ -100,14 +116,18 @@
 
 case class ViewEvent(user: String, item: String, t: Long)
 
+case class BuyEvent(user: String, item: String, t: Long)
+
 class TrainingData(
   val users: RDD[(String, User)],
   val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
+  val viewEvents: RDD[ViewEvent],
+  val buyEvents: RDD[BuyEvent]
 ) extends Serializable {
   override def toString = {
     s"users: [${users.count()} (${users.take(2).toList}...)]" +
     s"items: [${items.count()} (${items.take(2).toList}...)]" +
-    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
+    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" +
+    s"buyEvents: [${buyEvents.count()}] (${buyEvents.take(2).toList}...)"
   }
 }
diff --git a/src/main/scala/ECommAlgorithm.scala b/src/main/scala/ECommAlgorithm.scala
new file mode 100644
index 0000000..674fd09
--- /dev/null
+++ b/src/main/scala/ECommAlgorithm.scala
@@ -0,0 +1,573 @@
+package org.template.ecommercerecommendation
+
+import io.prediction.controller.P2LAlgorithm
+import io.prediction.controller.Params
+import io.prediction.data.storage.BiMap
+import io.prediction.data.storage.Event
+import io.prediction.data.store.LEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+import scala.collection.mutable.PriorityQueue
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
+
+case class ECommAlgorithmParams(
+  appName: String,
+  unseenOnly: Boolean,
+  seenEvents: List[String],
+  similarEvents: List[String],
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]
+) extends Params
+
+
+case class ProductModel(
+  item: Item,
+  features: Option[Array[Double]], // features by ALS
+  count: Int // popular count for default score
+)
+
+class ECommModel(
+  val rank: Int,
+  val userFeatures: Map[Int, Array[Double]],
+  val productModels: Map[Int, ProductModel],
+  val userStringIntMap: BiMap[String, Int],
+  val itemStringIntMap: BiMap[String, Int]
+) extends Serializable {
+
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  override def toString = {
+    s" rank: ${rank}" +
+    s" userFeatures: [${userFeatures.size}]" +
+    s"(${userFeatures.take(2).toList}...)" +
+    s" productModels: [${productModels.size}]" +
+    s"(${productModels.take(2).toList}...)" +
+    s" userStringIntMap: [${userStringIntMap.size}]" +
+    s"(${userStringIntMap.take(2).toString}...)]" +
+    s" itemStringIntMap: [${itemStringIntMap.size}]" +
+    s"(${itemStringIntMap.take(2).toString}...)]"
+  }
+}
+
+class ECommAlgorithm(val ap: ECommAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, ECommModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  def train(sc: SparkContext, data: PreparedData): ECommModel = {
+    require(!data.viewEvents.take(1).isEmpty,
+      s"viewEvents in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    require(!data.users.take(1).isEmpty,
+      s"users in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    require(!data.items.take(1).isEmpty,
+      s"items in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preprator generates PreparedData correctly.")
+    // create User and item's String ID to integer index BiMap
+    val userStringIntMap = BiMap.stringInt(data.users.keys)
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    val mllibRatings: RDD[MLlibRating] = genMLlibRating(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = data
+    )
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      s"mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    // use ALS to train feature vectors
+    val m = ALS.trainImplicit(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      alpha = 1.0,
+      seed = seed)
+
+    val userFeatures = m.userFeatures.collectAsMap.toMap
+
+    // convert ID to Int index
+    val items = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }
+
+    // join item with the trained productFeatures
+    val productFeatures: Map[Int, (Item, Option[Array[Double]])] =
+      items.leftOuterJoin(m.productFeatures).collectAsMap.toMap
+
+    val popularCount = trainDefault(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = data
+    )
+
+    val productModels: Map[Int, ProductModel] = productFeatures
+      .map { case (index, (item, features)) =>
+        val pm = ProductModel(
+          item = item,
+          features = features,
+          // NOTE: use getOrElse because popularCount may not contain all items.
+          count = popularCount.getOrElse(index, 0)
+        )
+        (index, pm)
+      }
+
+    new ECommModel(
+      rank = m.rank,
+      userFeatures = userFeatures,
+      productModels = productModels,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap
+    )
+  }
+
+  /** Generate MLlibRating from PreparedData.
+    * You may customize this function if use different events or different aggregation method
+    */
+  def genMLlibRating(
+    userStringIntMap: BiMap[String, Int],
+    itemStringIntMap: BiMap[String, Int],
+    data: PreparedData): RDD[MLlibRating] = {
+
+    val mllibRatings = data.viewEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index for MLlib
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        ((uindex, iindex), 1)
+      }
+      .filter { case ((u, i), v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }
+      .reduceByKey(_ + _) // aggregate all view events of same user-item pair
+      .map { case ((u, i), v) =>
+        // MLlibRating requires integer index for user and item
+        MLlibRating(u, i, v)
+      }
+      .cache()
+
+    mllibRatings
+  }
+
+  /** Train default model.
+    * You may customize this function if use different events or
+    * need different ways to count "popular" score or return default score for item.
+    */
+  def trainDefault(
+    userStringIntMap: BiMap[String, Int],
+    itemStringIntMap: BiMap[String, Int],
+    data: PreparedData): Map[Int, Int] = {
+    // count number of buys
+    // (item index, count)
+    val buyCountsRDD: RDD[(Int, Int)] = data.buyEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        (uindex, iindex, 1)
+      }
+      .filter { case (u, i, v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }
+      .map { case (u, i, v) => (i, 1) } // key is item
+      .reduceByKey{ case (a, b) => a + b } // count number of items occurrence
+
+    buyCountsRDD.collectAsMap.toMap
+  }
+
+  def predict(model: ECommModel, query: Query): PredictedResult = {
+
+    val userFeatures = model.userFeatures
+    val productModels = model.productModels
+
+    // convert whiteList's string ID to integer index
+    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
+      set.flatMap(model.itemStringIntMap.get(_))
+    )
+
+    val finalBlackList: Set[Int] = genBlackList(query = query)
+      // convert seen Items list from String ID to interger Index
+      .flatMap(x => model.itemStringIntMap.get(x))
+
+    val userFeature: Option[Array[Double]] =
+      model.userStringIntMap.get(query.user).flatMap { userIndex =>
+        userFeatures.get(userIndex)
+      }
+
+    val topScores: Array[(Int, Double)] = if (userFeature.isDefined) {
+      // the user has feature vector
+      predictKnownUser(
+        userFeature = userFeature.get,
+        productModels = productModels,
+        query = query,
+        whiteList = whiteList,
+        blackList = finalBlackList
+      )
+    } else {
+      // the user doesn't have feature vector.
+      // For example, new user is created after model is trained.
+      logger.info(s"No userFeature found for user ${query.user}.")
+
+      // check if the user has recent events on some items
+      val recentItems: Set[String] = getRecentItems(query)
+      val recentList: Set[Int] = recentItems.flatMap (x =>
+        model.itemStringIntMap.get(x))
+
+      val recentFeatures: Vector[Array[Double]] = recentList.toVector
+        // productModels may not contain the requested item
+        .map { i =>
+          productModels.get(i).flatMap { pm => pm.features }
+        }.flatten
+
+      if (recentFeatures.isEmpty) {
+        logger.info(s"No features vector for recent items ${recentItems}.")
+        predictDefault(
+          productModels = productModels,
+          query = query,
+          whiteList = whiteList,
+          blackList = finalBlackList
+        )
+      } else {
+        predictSimilar(
+          recentFeatures = recentFeatures,
+          productModels = productModels,
+          query = query,
+          whiteList = whiteList,
+          blackList = finalBlackList
+        )
+      }
+    }
+
+    val itemScores = topScores.map { case (i, s) =>
+      new ItemScore(
+        // convert item int index back to string ID
+        item = model.itemIntStringMap(i),
+        score = s
+      )
+    }
+
+    new PredictedResult(itemScores)
+  }
+
+  /** Generate final blackList: the query's blackList plus the user's seen
+    * items (only if ap.unseenOnly) and the latest "unavailableItems" list. */
+  def genBlackList(query: Query): Set[String] = {
+    // if unseenOnly is True, get all seen items
+    val seenItems: Set[String] = if (ap.unseenOnly) {
+
+      // get all user item events which are considered as "seen" events
+      val seenEvents: Iterator[Event] = try {
+        LEventStore.findByEntity(
+          appName = ap.appName,
+          entityType = "user",
+          entityId = query.user,
+          eventNames = Some(ap.seenEvents),
+          targetEntityType = Some(Some("item")),
+          // set time limit to avoid super long DB access
+          timeout = Duration(200, "millis")
+        )
+      } catch {
+        case e: scala.concurrent.TimeoutException =>
+          logger.error(s"Timeout when read seen events." +
+            s" Empty list is used. ${e}")
+          Iterator[Event]()
+        case e: Exception =>
+          logger.error(s"Error when read seen events: ${e}")
+          throw e
+      }
+
+      seenEvents.map { event =>
+        try {
+          event.targetEntityId.get
+        } catch {
+          case e: Exception =>
+            logger.error(s"Can't get targetEntityId of event ${event}.")
+            throw e
+        }
+      }.toSet
+    } else {
+      Set[String]()
+    }
+
+    // get the latest constraint unavailableItems $set event
+    val unavailableItems: Set[String] = try {
+      val constr = LEventStore.findByEntity(
+        appName = ap.appName,
+        entityType = "constraint",
+        entityId = "unavailableItems",
+        eventNames = Some(Seq("$set")),
+        limit = Some(1),
+        latest = true,
+        timeout = Duration(200, "millis")
+      )
+      if (constr.hasNext) {
+        constr.next.properties.get[Set[String]]("items")
+      } else {
+        Set[String]()
+      }
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read set unavailableItems event." +
+          s" Empty list is used. ${e}")
+        Set[String]()
+      case e: Exception =>
+        logger.error(s"Error when read set unavailableItems event: ${e}")
+        throw e
+    }
+
+    // combine query's blackList,seenItems and unavailableItems
+    // into final blackList.
+    query.blackList.getOrElse(Set[String]()) ++ seenItems ++ unavailableItems
+  }
+
+  /** Get IDs of the items targeted by the user's latest similarEvents (at most 10 events) */
+  def getRecentItems(query: Query): Set[String] = {
+    // get the latest 10 user-to-item events whose names are listed in ap.similarEvents
+    val recentEvents = try {
+      LEventStore.findByEntity(
+        appName = ap.appName,
+        // entityType and entityId is specified for fast lookup
+        entityType = "user",
+        entityId = query.user,
+        eventNames = Some(ap.similarEvents),
+        targetEntityType = Some(Some("item")),
+        limit = Some(10),
+        latest = true,
+        // set time limit to avoid super long DB access
+        timeout = Duration(200, "millis")
+      )
+    } catch {
+      case e: scala.concurrent.TimeoutException =>
+        logger.error(s"Timeout when read recent events." +
+          s" Empty list is used. ${e}")
+        Iterator[Event]()
+      case e: Exception =>
+        logger.error(s"Error when read recent events: ${e}")
+        throw e
+    }
+    // duplicates collapse in the Set, so fewer than 10 IDs may be returned
+    val recentItems: Set[String] = recentEvents.map { event =>
+      try {
+        event.targetEntityId.get
+      } catch {
+        case e => {
+          // the s interpolator is required, otherwise "${event}" is logged literally
+          logger.error(s"Can't get targetEntityId of event ${event}.")
+          throw e
+        }
+      }
+    }.toSet
+    recentItems
+  }
+
+  /** Top query.num candidate items for a known user, ranked by dot-product score */
+  def predictKnownUser(
+    userFeature: Array[Double],
+    productModels: Map[Int, ProductModel],
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Array[(Int, Double)] = {
+    // an item qualifies when it has a feature vector and passes the filters
+    val candidates = productModels.par.filter { case (idx, model) =>
+      model.features.isDefined &&
+      isCandidateItem(
+        i = idx,
+        item = model.item,
+        categories = query.categories,
+        whiteList = whiteList,
+        blackList = blackList
+      )
+    }
+
+    val indexScores: Map[Int, Double] = candidates
+      .map { case (idx, model) =>
+        // features are guaranteed by the filter above, so .get is safe
+        // may customize here to further adjust score
+        (idx, dotProduct(userFeature, model.features.get))
+      }
+      .filter { case (_, score) => score > 0 } // only keep items with score > 0
+      .seq // convert back to sequential collection
+
+    // larger score ranks first
+    val byScoreDesc = Ordering.by[(Int, Double), Double](_._2).reverse
+    getTopN(indexScores, query.num)(byScoreDesc).toArray
+  }
+
+  /** Default prediction when nothing is known about the user: rank candidates by pm.count */
+  def predictDefault(
+    productModels: Map[Int, ProductModel],
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Array[(Int, Double)] = {
+    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
+      .filter { case (i, pm) =>
+        isCandidateItem(
+          i = i,
+          item = pm.item,
+          categories = query.categories,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .map { case (i, pm) =>
+        // may customize here to further adjust score
+        (i, pm.count.toDouble)
+      }
+      .seq // convert back to sequential collection
+
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse // larger count ranks first
+    val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+    topScores
+  }
+
+  /** Return top query.num items ranked by summed cosine similarity to the recent items */
+  def predictSimilar(
+    recentFeatures: Vector[Array[Double]],
+    productModels: Map[Int, ProductModel],
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Array[(Int, Double)] = {
+    val indexScores: Map[Int, Double] = productModels.par // convert to parallel collection
+      .filter { case (i, pm) =>
+        pm.features.isDefined &&
+        isCandidateItem(
+          i = i,
+          item = pm.item,
+          categories = query.categories,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .map { case (i, pm) =>
+        val s = recentFeatures.map{ rf =>
+          // pm.features must be defined because of filter logic above
+          cosine(rf, pm.features.get)
+        }.sum // unlike reduce, sum gives 0 instead of throwing when recentFeatures is empty
+        // may customize here to further adjust score
+        (i, s)
+      }
+      .filter(_._2 > 0) // keep items with score > 0
+      .seq // convert back to sequential collection
+
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse // larger score ranks first
+    val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+    topScores
+  }
+
+  private
+  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
+    // Bounded heap under ord: q.head is the ord-largest (worst) element kept so far.
+    val q = PriorityQueue[T]()
+
+    for (x <- s) {
+      if (q.size < n)
+        q.enqueue(x)
+      else {
+        // q is full; nonEmpty guards n <= 0, where head would throw on an empty q
+        if (q.nonEmpty && ord.compare(x, q.head) < 0) {
+          q.dequeue()
+          q.enqueue(x)
+        }
+      }
+    }
+    // dequeueAll yields the ord-largest first, so reverse to return ord-smallest first
+    q.dequeueAll.toSeq.reverse
+  }
+
+  private
+  def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
+    // sum of element-wise products, walking the indices of v1
+    var acc: Double = 0
+    var k = 0
+    while (k < v1.length) {
+      acc += v1(k) * v2(k)
+      k += 1
+    }
+    acc
+  }
+
+  private
+  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
+    // one pass collects both squared norms and the dot product
+    var sq1: Double = 0
+    var sq2: Double = 0
+    var dot: Double = 0
+    var k = 0
+    while (k < v1.length) {
+      sq1 += v1(k) * v1(k)
+      sq2 += v2(k) * v2(k)
+      dot += v1(k) * v2(k)
+      k += 1
+    }
+    val norms = math.sqrt(sq1) * math.sqrt(sq2)
+    if (norms == 0) 0 else dot / norms // a zero-length vector scores 0
+  }
+
+  private
+  def isCandidateItem(
+    i: Int,
+    item: Item,
+    categories: Option[Set[String]],
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Boolean = {
+    // can add other custom filtering here
+
+    // an absent whiteList admits every item
+    val whiteListed = whiteList.forall(_.contains(i))
+    // when the query names categories the item must share at least one of them;
+    // an item without categories is then discarded
+    def categoryMatched: Boolean = categories.forall { wanted =>
+      item.categories.exists(_.toSet.intersect(wanted).nonEmpty)
+    }
+
+    whiteListed && !blackList.contains(i) && categoryMatched
+  }
+
+}
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
index 42ec4d4..47d9896 100644
--- a/src/main/scala/Engine.scala
+++ b/src/main/scala/Engine.scala
@@ -25,7 +25,7 @@
     new Engine(
       classOf[DataSource],
       classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
+      Map("ecomm" -> classOf[ECommAlgorithm]),
       classOf[Serving])
   }
 }
diff --git a/src/main/scala/Preparator.scala b/src/main/scala/Preparator.scala
index 4dd45cf..ff82f80 100644
--- a/src/main/scala/Preparator.scala
+++ b/src/main/scala/Preparator.scala
@@ -13,12 +13,14 @@
     new PreparedData(
       users = trainingData.users,
       items = trainingData.items,
-      viewEvents = trainingData.viewEvents)
+      viewEvents = trainingData.viewEvents,
+      buyEvents = trainingData.buyEvents)
   }
 }
 
 class PreparedData(
   val users: RDD[(String, User)],
   val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
+  val viewEvents: RDD[ViewEvent],
+  val buyEvents: RDD[BuyEvent]
 ) extends Serializable
diff --git a/src/test/scala/ECommAlgorithmTest.scala b/src/test/scala/ECommAlgorithmTest.scala
new file mode 100644
index 0000000..5ddac96
--- /dev/null
+++ b/src/test/scala/ECommAlgorithmTest.scala
@@ -0,0 +1,172 @@
+package org.template.ecommercerecommendation
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+import io.prediction.data.storage.BiMap
+
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+
+class ECommAlgorithmTest
+  extends FlatSpec with EngineTestSparkContext with Matchers {
+  // Covers genMLlibRating, trainDefault and the three predict* methods on small fixtures.
+  val algorithmParams = new ECommAlgorithmParams(
+    appName = "test-app",
+    unseenOnly = true,
+    seenEvents = List("buy", "view"),
+    similarEvents = List("view"),
+    rank = 10,
+    numIterations = 20,
+    lambda = 0.01,
+    seed = Some(3)
+  )
+  val algorithm = new ECommAlgorithm(algorithmParams)
+
+  val userStringIntMap = BiMap(Map("u0" -> 0, "u1" -> 1))
+
+  val itemStringIntMap = BiMap(Map("i0" -> 0, "i1" -> 1, "i2" -> 2))
+
+  val users = Map("u0" -> User(), "u1" -> User())
+
+  // i1 has no categories, so the category-filtered queries below exclude it
+  val i0 = Item(categories = Some(List("c0", "c1")))
+  val i1 = Item(categories = None)
+  val i2 = Item(categories = Some(List("c0", "c2")))
+
+  val items = Map(
+    "i0" -> i0,
+    "i1" -> i1,
+    "i2" -> i2
+  )
+
+  val view = Seq(
+    ViewEvent("u0", "i0", 1000010),
+    ViewEvent("u0", "i1", 1000020),
+    ViewEvent("u0", "i1", 1000020), // deliberate duplicate: expected rating 2 below
+    ViewEvent("u1", "i1", 1000030),
+    ViewEvent("u1", "i2", 1000040)
+  )
+
+  val buy = Seq(
+    BuyEvent("u0", "i0", 1000020),
+    BuyEvent("u0", "i1", 1000030),
+    BuyEvent("u1", "i1", 1000040)
+  )
+
+
+  "ECommAlgorithm.genMLlibRating()" should "create RDD[MLlibRating] correctly" in {
+
+    val preparedData = new PreparedData(
+      users = sc.parallelize(users.toSeq),
+      items = sc.parallelize(items.toSeq),
+      viewEvents = sc.parallelize(view.toSeq),
+      buyEvents = sc.parallelize(buy.toSeq)
+    )
+
+    val mllibRatings = algorithm.genMLlibRating(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = preparedData
+    )
+
+    val expected = Seq(
+      MLlibRating(0, 0, 1),
+      MLlibRating(0, 1, 2), // u0 viewed i1 twice
+      MLlibRating(1, 1, 1),
+      MLlibRating(1, 2, 1)
+    )
+
+    mllibRatings.collect should contain theSameElementsAs expected
+  }
+
+  "ECommAlgorithm.trainDefault()" should "return popular count for each item" in {
+    val preparedData = new PreparedData(
+      users = sc.parallelize(users.toSeq),
+      items = sc.parallelize(items.toSeq),
+      viewEvents = sc.parallelize(view.toSeq),
+      buyEvents = sc.parallelize(buy.toSeq)
+    )
+
+    val popCount = algorithm.trainDefault(
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap,
+      data = preparedData
+    )
+    // buy events per item index in the fixture: i0 once, i1 twice, i2 never
+    val expected = Map(0 -> 1, 1 -> 2)
+
+    popCount should contain theSameElementsAs expected
+  }
+
+  "ECommAlgorithm.predictKnownUser()" should "return top item" in {
+
+    val top = algorithm.predictKnownUser(
+      userFeature = Array(1.0, 2.0, 0.5),
+      productModels = Map(
+        0 -> ProductModel(i0, Some(Array(2.0, 1.0, 2.0)), 3),
+        1 -> ProductModel(i1, Some(Array(3.0, 0.5, 1.0)), 4),
+        2 -> ProductModel(i2, Some(Array(1.0, 3.0, 1.0)), 1)
+      ),
+      query = Query(
+        user = "u0",
+        num = 5,
+        categories = Some(Set("c0")),
+        whiteList = None,
+        blackList = None),
+      whiteList = None,
+      blackList = Set()
+    )
+    // dot products with userFeature: i2 = 7.5, i0 = 5.0; i1 fails the category filter
+    val expected = Array((2, 7.5), (0, 5.0))
+    top shouldBe expected
+  }
+
+  "ECommAlgorithm.predictDefault()" should "return top item" in {
+
+    val top = algorithm.predictDefault(
+      productModels = Map(
+        0 -> ProductModel(i0, Some(Array(2.0, 1.0, 2.0)), 3),
+        1 -> ProductModel(i1, Some(Array(3.0, 0.5, 1.0)), 4),
+        2 -> ProductModel(i2, Some(Array(1.0, 3.0, 1.0)), 1)
+      ),
+      query = Query(
+        user = "u0",
+        num = 5,
+        categories = None,
+        whiteList = None,
+        blackList = None),
+      whiteList = None,
+      blackList = Set()
+    )
+    // expected scores equal the third ProductModel argument (used as count), highest first
+    val expected = Array((1, 4.0), (0, 3.0), (2, 1.0))
+    top shouldBe expected
+  }
+
+  "ECommAlgorithm.predictSimilar()" should "return top item" in {
+
+    val top = algorithm.predictSimilar(
+      recentFeatures = Vector(Array(1.0, 2.0, 0.5), Array(1.0, 0.2, 0.3)),
+      productModels = Map(
+        0 -> ProductModel(i0, Some(Array(2.0, 1.0, 2.0)), 3),
+        1 -> ProductModel(i1, Some(Array(3.0, 0.5, 1.0)), 4),
+        2 -> ProductModel(i2, Some(Array(1.0, 3.0, 1.0)), 1)
+      ),
+      query = Query(
+        user = "u0",
+        num = 5,
+        categories = Some(Set("c0")),
+        whiteList = None,
+        blackList = None),
+      whiteList = None,
+      blackList = Set()
+    )
+    // each expected score is the sum of cosine similarities to both recent vectors
+    val expected = Array((0, 1.605), (2, 1.525))
+
+    top(0)._1 should be (expected(0)._1)
+    top(1)._1 should be (expected(1)._1)
+    top(0)._2 should be (expected(0)._2 plusOrMinus 0.001)
+    top(1)._2 should be (expected(1)._2 plusOrMinus 0.001)
+  }
+}
diff --git a/src/test/scala/EngineTestSparkContext.scala b/src/test/scala/EngineTestSparkContext.scala
new file mode 100644
index 0000000..2931403
--- /dev/null
+++ b/src/test/scala/EngineTestSparkContext.scala
@@ -0,0 +1,36 @@
+package org.template.ecommercerecommendation
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+
+trait EngineTestSparkContext extends BeforeAndAfterAll {
+  self: Suite =>
+  @transient private var sparkContext: SparkContext = _
+  // context created in beforeAll; null before it runs and after afterAll
+  def sc: SparkContext = sparkContext
+  // configuration handed to the SparkContext constructor in beforeAll
+  var conf = new SparkConf(false)
+
+  override def beforeAll(): Unit = {
+    sparkContext = new SparkContext("local", "test", conf)
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    LocalSparkContext.stop(sparkContext)
+    // drop the reference to the stopped context
+    sparkContext = null
+    super.afterAll()
+  }
+}
+
+object LocalSparkContext {
+  /** Stops `sc` when it is non-null, then clears the "spark.driver.port" property. */
+  def stop(sc: SparkContext): Unit = {
+    // Option(null) is None, so a null context is skipped
+    Option(sc).foreach(_.stop())
+    // To avoid Akka rebinding to the same port, since it doesn't unbind
+    // immediately on shutdown
+    System.clearProperty("spark.driver.port")
+  }
+}
diff --git a/src/test/scala/PreparatorTest.scala b/src/test/scala/PreparatorTest.scala
new file mode 100644
index 0000000..ec421b3
--- /dev/null
+++ b/src/test/scala/PreparatorTest.scala
@@ -0,0 +1,49 @@
+package org.template.ecommercerecommendation
+
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+
+class PreparatorTest
+  extends FlatSpec with EngineTestSparkContext with Matchers {
+
+  // Fixtures: two users, two items (i1 has no categories), and a few
+  // view/buy events linking them.
+  val preparator = new Preparator()
+  val users = Map("u0" -> User(), "u1" -> User())
+
+  val items = Map(
+    "i0" -> Item(categories = Some(List("c0", "c1"))),
+    "i1" -> Item(categories = None)
+  )
+
+  val view = Seq(
+    ViewEvent("u0", "i0", 1000010),
+    ViewEvent("u0", "i1", 1000020),
+    ViewEvent("u1", "i1", 1000030)
+  )
+
+  val buy = Seq(
+    BuyEvent("u0", "i0", 1000020),
+    BuyEvent("u0", "i1", 1000030),
+    BuyEvent("u1", "i1", 1000040)
+  )
+
+  // simple test for demonstration purpose: every field handed to the
+  // Preparator is expected to come back with the same elements
+  "Preparator" should "prepare PreparedData" in {
+
+    val input = new TrainingData(
+      users = sc.parallelize(users.toSeq),
+      items = sc.parallelize(items.toSeq),
+      viewEvents = sc.parallelize(view),
+      buyEvents = sc.parallelize(buy)
+    )
+
+    val prepared = preparator.prepare(sc, input)
+
+    prepared.users.collect should contain theSameElementsAs users
+    prepared.items.collect should contain theSameElementsAs items
+    prepared.viewEvents.collect should contain theSameElementsAs view
+    prepared.buyEvents.collect should contain theSameElementsAs buy
+  }
+}