v0.1.0 release
Merge remote-tracking branch 'origin/develop'
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..64fa18b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+manifest.json
+target/
+pio.log
diff --git a/README.md b/README.md
index e1e0199..33436a5 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,155 @@
-# template-scala-parallel-ecommercerecommendation
-PredictionIO E-Commerce Recommendation engine template (Scala-based parallelized engine) 
+# E-Commerce Recommendation Template
+
+## Documentation
+
+Please refer to http://docs.prediction.io/templates/ecommercerecommendation/quickstart/
+
+## Versions
+
+### develop
+
+- TBD
+
+### v0.1.0
+
+- initial version
+
+
+## Development Notes
+
+### import sample data
+
+```
+$ python data/import_eventserver.py --access_key <your_access_key>
+```
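+
+The script registers 10 users (u1 to u10) and 50 items (i1 to i50) with 1 to 4 random categories, then generates view and buy events. It talks to the event server through the Python SDK; a minimal sketch of sending one extra event by hand, mirroring data/import_eventserver.py (the placeholder access key is an assumption):
+
+```
+import predictionio
+
+client = predictionio.EventClient(
+    access_key="<your_access_key>",  # assumption: replace with your app's key
+    url="http://localhost:7070")
+
+# register one extra user the same way the import script does
+client.create_event(
+    event="$set",
+    entity_type="user",
+    entity_id="u11")
+```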
+
+### query
+
+normal:
+
+```
+$ curl -H "Content-Type: application/json" \
+-d '{
+  "user" : "u1",
+  "num" : 10 }' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+```
+$ curl -H "Content-Type: application/json" \
+-d '{
+  "user" : "u1",
+  "num": 10,
+  "categories" : ["c4", "c3"]
+}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+```
+$ curl -H "Content-Type: application/json" \
+-d '{
+  "user" : "u1",
+  "num": 10,
+  "whiteList": ["i21", "i26", "i40"]
+}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+```
+$ curl -H "Content-Type: application/json" \
+-d '{
+  "user" : "u1",
+  "num": 10,
+  "blackList": ["i21", "i26", "i40"]
+}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+unknown user:
+
+```
+$ curl -H "Content-Type: application/json" \
+-d '{
+  "user" : "unk1",
+  "num": 10}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
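+
+The same queries can be sent through the Python SDK (see data/send_query.py); a minimal sketch combining the optional fields shown above:
+
+```
+import predictionio
+
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+
+# normal query
+print engine_client.send_query({"user": "u1", "num": 10})
+
+# query with optional category and blackList filters
+print engine_client.send_query({
+    "user": "u1",
+    "num": 10,
+    "categories": ["c4", "c3"],
+    "blackList": ["i21", "i26", "i40"]})
+```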
+
+### handle new user
+
+new user:
+
+```
+$ curl -H "Content-Type: application/json" \
+-d '{
+  "user" : "x1",
+  "num": 10}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+Import some view events and try to get recommendations for x1 again.
+
+```
+$ curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+-H "Content-Type: application/json" \
+-d '{
+  "event" : "view",
+  "entityType" : "user"
+  "entityId" : "x1",
+  "targetEntityType" : "item",
+  "targetEntityId" : "i2",
+  "eventTime" : "2015-02-17T02:11:21.934Z"
+}'
+
+$ curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+-H "Content-Type: application/json" \
+-d '{
+  "event" : "view",
+  "entityType" : "user"
+  "entityId" : "x1",
+  "targetEntityType" : "item",
+  "targetEntityId" : "i3",
+  "eventTime" : "2015-02-17T02:12:21.934Z"
+}'
+
+```
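+
+Equivalently through the SDK; a minimal sketch (the placeholder access key is an assumption):
+
+```
+import predictionio
+
+client = predictionio.EventClient(
+    access_key="<your_access_key>",  # assumption: replace with your app's key
+    url="http://localhost:7070")
+
+# record that new user "x1" viewed items i2 and i3
+for item_id in ["i2", "i3"]:
+    client.create_event(
+        event="view",
+        entity_type="user",
+        entity_id="x1",
+        target_entity_type="item",
+        target_entity_id=item_id)
+```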
+
+### handle unavailable items
+
+Set the following items as unavailable (the complete list must be specified each time it changes):
+
+```
+$ curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+-H "Content-Type: application/json" \
+-d '{
+  "event" : "$set",
+  "entityType" : "constraint"
+  "entityId" : "unavailableItems",
+  "properties" : {
+    "items": ["i43", "i20", "i37", "i3", "i4", "i5"],
+  }
+  "eventTime" : "2015-02-17T02:11:21.934Z"
+}'
+```
+
+Set an empty list when no items are unavailable:
+
+```
+$ curl -i -X POST http://localhost:7070/events.json?accessKey=zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4 \
+-H "Content-Type: application/json" \
+-d '{
+  "event" : "$set",
+  "entityType" : "constraint"
+  "entityId" : "unavailableItems",
+  "properties" : {
+    "items": [],
+  }
+  "eventTime" : "2015-02-18T02:11:21.934Z"
+}'
+```
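+
+The constraint can also be set through the SDK, since create_event() accepts a properties dict; a minimal sketch (the placeholder access key and item IDs are assumptions):
+
+```
+import predictionio
+
+client = predictionio.EventClient(
+    access_key="<your_access_key>",  # assumption: replace with your app's key
+    url="http://localhost:7070")
+
+# overwrite the complete list of unavailable items
+client.create_event(
+    event="$set",
+    entity_type="constraint",
+    entity_id="unavailableItems",
+    properties={"items": ["i43", "i20"]})
+```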
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..575ccf2
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,12 @@
+import AssemblyKeys._
+
+assemblySettings
+
+name := "template-scala-parallel-ecommercerecommendation"
+
+organization := "io.prediction"
+
+libraryDependencies ++= Seq(
+  "io.prediction"    %% "core"          % "0.8.7-SNAPSHOT" % "provided",
+  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
+  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")
diff --git a/data/import_eventserver.py b/data/import_eventserver.py
new file mode 100644
index 0000000..5c49837
--- /dev/null
+++ b/data/import_eventserver.py
@@ -0,0 +1,84 @@
+"""
+Import sample data for E-Commerce Recommendation Engine Template
+"""
+
+import predictionio
+import argparse
+import random
+
+SEED = 3
+
+def import_events(client):
+  random.seed(SEED)
+  count = 0
+  print client.get_status()
+  print "Importing data..."
+
+  # generate 10 users, with user ids u1,u2,....,u10
+  user_ids = ["u%s" % i for i in range(1, 11)]
+  for user_id in user_ids:
+    print "Set user", user_id
+    client.create_event(
+      event="$set",
+      entity_type="user",
+      entity_id=user_id
+    )
+    count += 1
+
+  # generate 50 items, with item ids i1,i2,....,i50
+  # randomly assign 1 to 4 categories among c1-c6 to each item
+  categories = ["c%s" % i for i in range(1, 7)]
+  item_ids = ["i%s" % i for i in range(1, 51)]
+  for item_id in item_ids:
+    print "Set item", item_id
+    client.create_event(
+      event="$set",
+      entity_type="item",
+      entity_id=item_id,
+      properties={
+        "categories" : random.sample(categories, random.randint(1, 4))
+      }
+    )
+    count += 1
+
+  # each user randomly viewed 10 items
+  for user_id in user_ids:
+    for viewed_item in random.sample(item_ids, 10):
+      print "User", user_id ,"views item", viewed_item
+      client.create_event(
+        event="view",
+        entity_type="user",
+        entity_id=user_id,
+        target_entity_type="item",
+        target_entity_id=viewed_item
+      )
+      count += 1
+      # randomly buy some of the viewed items
+      if random.choice([True, False]):
+        print "User", user_id ,"buys item", viewed_item
+        client.create_event(
+          event="buy",
+          entity_type="user",
+          entity_id=user_id,
+          target_entity_type="item",
+          target_entity_id=viewed_item
+        )
+        count += 1
+
+  print "%s events are imported." % count
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+    description="Import sample data for e-commerce recommendation engine")
+  parser.add_argument('--access_key', default='invalid_access_key')
+  parser.add_argument('--url', default="http://localhost:7070")
+
+  args = parser.parse_args()
+  print args
+
+  client = predictionio.EventClient(
+    access_key=args.access_key,
+    url=args.url,
+    threads=5,
+    qsize=500)
+  import_events(client)
diff --git a/data/send_query.py b/data/send_query.py
new file mode 100644
index 0000000..b0eb651
--- /dev/null
+++ b/data/send_query.py
@@ -0,0 +1,7 @@
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000")
+print engine_client.send_query({"user": "u1", "num": 4})
diff --git a/engine.json b/engine.json
new file mode 100644
index 0000000..b8103a6
--- /dev/null
+++ b/engine.json
@@ -0,0 +1,24 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.template.ecommercerecommendation.ECommerceRecommendationEngine",
+  "datasource": {
+    "params" : {
+      "appId": 17
+    }
+  },
+  "algorithms": [
+    {
+      "name": "als",
+      "params": {
+        "appId": 17,
+        "unseenOnly": true,
+        "seenEvents": ["buy", "view"],
+        "rank": 10,
+        "numIterations" : 20,
+        "lambda": 0.01,
+        "seed": 3
+      }
+    }
+  ]
+}
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..67dc32a
--- /dev/null
+++ b/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,430 @@
+package org.template.ecommercerecommendation
+
+import io.prediction.controller.P2LAlgorithm
+import io.prediction.controller.Params
+import io.prediction.data.storage.BiMap
+import io.prediction.data.storage.Event
+import io.prediction.data.storage.Storage
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+
+import grizzled.slf4j.Logger
+
+import scala.collection.mutable.PriorityQueue
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext.Implicits.global
+
+case class ALSAlgorithmParams(
+  appId: Int,
+  unseenOnly: Boolean,
+  seenEvents: List[String],
+  rank: Int,
+  numIterations: Int,
+  lambda: Double,
+  seed: Option[Long]
+) extends Params
+
+class ALSModel(
+  val rank: Int,
+  val userFeatures: Map[Int, Array[Double]],
+  val productFeatures: Map[Int, (Item, Option[Array[Double]])],
+  val userStringIntMap: BiMap[String, Int],
+  val itemStringIntMap: BiMap[String, Int]
+) extends Serializable {
+
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  override def toString = {
+    s" rank: ${rank}" +
+    s" userFeatures: [${userFeatures.size}]" +
+    s"(${userFeatures.take(2).toList}...)" +
+    s" productFeatures: [${productFeatures.size}]" +
+    s"(${productFeatures.take(2).toList}...)" +
+    s" userStringIntMap: [${userStringIntMap.size}]" +
+    s"(${userStringIntMap.take(2).toString}...)]" +
+    s" itemStringIntMap: [${itemStringIntMap.size}]" +
+    s"(${itemStringIntMap.take(2).toString}...)]"
+  }
+}
+
+/**
+  * Use ALS to build item x feature matrix
+  */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+  // NOTE: use getLEvents() for local access
+  @transient lazy val lEventsDb = Storage.getLEvents()
+
+  def train(data: PreparedData): ALSModel = {
+    require(!data.viewEvents.take(1).isEmpty,
+      s"viewEvents in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    require(!data.users.take(1).isEmpty,
+      s"users in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    require(!data.items.take(1).isEmpty,
+      s"items in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly.")
+    // create User and item's String ID to integer index BiMap
+    val userStringIntMap = BiMap.stringInt(data.users.keys)
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    val mllibRatings = data.viewEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index for MLlib
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        ((uindex, iindex), 1)
+      }.filter { case ((u, i), v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }
+      .reduceByKey(_ + _) // aggregate all view events of same user-item pair
+      .map { case ((u, i), v) =>
+        // MLlibRating requires integer index for user and item
+        MLlibRating(u, i, v)
+      }.cache()
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      s"mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
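+    // trainImplicit treats each rating value (here the aggregated view
+    // count of a user-item pair) as an implicit-feedback confidence
+    // weight rather than an explicit rating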
+    val m = ALS.trainImplicit(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      alpha = 1.0,
+      seed = seed)
+
+    val userFeatures = m.userFeatures.collectAsMap.toMap
+
+    // convert ID to Int index
+    val items = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }
+
+    // join item with the trained productFeatures
+    val productFeatures = items.leftOuterJoin(m.productFeatures)
+      .collectAsMap.toMap
+
+    new ALSModel(
+      rank = m.rank,
+      userFeatures = userFeatures,
+      productFeatures = productFeatures,
+      userStringIntMap = userStringIntMap,
+      itemStringIntMap = itemStringIntMap
+    )
+  }
+
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+
+    val userFeatures = model.userFeatures
+    val productFeatures = model.productFeatures
+
+    // convert whiteList's string ID to integer index
+    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
+      set.map(model.itemStringIntMap.get(_)).flatten
+    )
+
+    val blackList: Set[String] = query.blackList.getOrElse(Set[String]())
+
+    // if unseenOnly is True, get all seen items
+    val seenItems: Set[String] = if (ap.unseenOnly) {
+
+      // get all user item events which are considered as "seen" events
+      val seenEvents: Iterator[Event] = lEventsDb.findSingleEntity(
+        appId = ap.appId,
+        entityType = "user",
+        entityId = query.user,
+        eventNames = Some(ap.seenEvents),
+        targetEntityType = Some(Some("item")),
+        // set time limit to avoid super long DB access
+        timeout = Duration(200, "millis")
+      ) match {
+        case Right(x) => x
+        case Left(e) => {
+          logger.error(s"Error when read seen events: ${e}")
+          Iterator[Event]()
+        }
+      }
+
+      seenEvents.map { event =>
+        try {
+          event.targetEntityId.get
+        } catch {
+          case e => {
+            logger.error(s"Can't get targetEntityId of event ${event}.")
+            throw e
+          }
+        }
+      }.toSet
+    } else {
+      Set[String]()
+    }
+
+    // get the latest constraint unavailableItems $set event
+    val unavailableItems: Set[String] = lEventsDb.findSingleEntity(
+      appId = ap.appId,
+      entityType = "constraint",
+      entityId = "unavailableItems",
+      eventNames = Some(Seq("$set")),
+      limit = Some(1),
+      latest = true,
+      timeout = Duration(200, "millis")
+    ) match {
+      case Right(x) => {
+        if (x.hasNext) {
+          x.next.properties.get[Set[String]]("items")
+        } else {
+          Set[String]()
+        }
+      }
+      case Left(e) => {
+        logger.error(s"Error when read set unavailableItems event: ${e}")
+        Set[String]()
+      }
+    }
+
+    // combine query's blackList, seenItems and unavailableItems
+    // into the final blackList.
+    // convert seen items list from String ID to integer index
+    val finalBlackList: Set[Int] = (blackList ++ seenItems ++
+      unavailableItems).map( x => model.itemStringIntMap.get(x)).flatten
+
+    val userFeature =
+      model.userStringIntMap.get(query.user).map { userIndex =>
+        userFeatures.get(userIndex)
+      }
+      // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
+      .flatten
+
+    val topScores = if (userFeature.isDefined) {
+      // the user has feature vector
+      val uf = userFeature.get
+      val indexScores: Map[Int, Double] =
+        productFeatures.par // convert to parallel collection
+          .filter { case (i, (item, feature)) =>
+            feature.isDefined &&
+            isCandidateItem(
+              i = i,
+              item = item,
+              categories = query.categories,
+              whiteList = whiteList,
+              blackList = finalBlackList
+            )
+          }
+          .map { case (i, (item, feature)) =>
+            // NOTE: feature must be defined, so can call .get
+            val s = dotProduct(uf, feature.get)
+            // Can adjust score here
+            (i, s)
+          }
+          .filter(_._2 > 0) // only keep items with score > 0
+          .seq // convert back to sequential collection
+
+      val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+      val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+      topScores
+
+    } else {
+      // the user doesn't have feature vector.
+      // For example, new user is created after model is trained.
+      logger.info(s"No userFeature found for user ${query.user}.")
+      predictNewUser(
+        model = model,
+        query = query,
+        whiteList = whiteList,
+        blackList = finalBlackList
+      )
+
+    }
+
+    val itemScores = topScores.map { case (i, s) =>
+      new ItemScore(
+        // convert item int index back to string ID
+        item = model.itemIntStringMap(i),
+        score = s
+      )
+    }
+
+    new PredictedResult(itemScores)
+  }
+
+  /** Get recently viewed item of the user and return top similar items */
+  private
+  def predictNewUser(
+    model: ALSModel,
+    query: Query,
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]): Array[(Int, Double)] = {
+
+    val userFeatures = model.userFeatures
+    val productFeatures = model.productFeatures
+
+    // get latest 10 user view item events
+    val recentEvents = lEventsDb.findSingleEntity(
+      appId = ap.appId,
+      // entityType and entityId is specified for fast lookup
+      entityType = "user",
+      entityId = query.user,
+      eventNames = Some(Seq("view")),
+      targetEntityType = Some(Some("item")),
+      limit = Some(10),
+      latest = true,
+      // set time limit to avoid super long DB access
+      timeout = Duration(200, "millis")
+    ) match {
+      case Right(x) => x
+      case Left(e) => {
+        logger.error(s"Error when read recent events: ${e}")
+        Iterator[Event]()
+      }
+    }
+
+    val recentItems: Set[String] = recentEvents.map { event =>
+      try {
+        event.targetEntityId.get
+      } catch {
+        case e => {
+          logger.error("Can't get targetEntityId of event ${event}.")
+          throw e
+        }
+      }
+    }.toSet
+
+    val recentList: Set[Int] = recentItems.map (x =>
+      model.itemStringIntMap.get(x)).flatten
+
+    val recentFeatures: Vector[Array[Double]] = recentList.toVector
+      // productFeatures may not contain the requested item
+      .map { i =>
+        productFeatures.get(i).map { case (item, f) => f }.flatten
+      }.flatten
+
+    val indexScores: Map[Int, Double] = if (recentFeatures.isEmpty) {
+      logger.info(s"No productFeatures vector for recent items ${recentItems}.")
+      Map[Int, Double]()
+    } else {
+      productFeatures.par // convert to parallel collection
+        .filter { case (i, (item, feature)) =>
+          feature.isDefined &&
+          isCandidateItem(
+            i = i,
+            item = item,
+            categories = query.categories,
+            whiteList = whiteList,
+            blackList = blackList
+          )
+        }
+        .map { case (i, (item, feature)) =>
+          val s = recentFeatures.map{ rf =>
+            cosine(rf, feature.get) // feature is defined
+          }.reduce(_ + _)
+          // Can adjust score here
+          (i, s)
+        }
+        .filter(_._2 > 0) // keep items with score > 0
+        .seq // convert back to sequential collection
+    }
+
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+    val topScores = getTopN(indexScores, query.num)(ord).toArray
+
+    topScores
+  }
+
+  private
+  def getTopN[T](s: Iterable[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
+
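+    // bounded priority queue of size n: the caller passes a reversed
+    // ordering, so q.head is the lowest-scoring item kept so far and is
+    // evicted whenever a better item arrives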
+    val q = PriorityQueue[T]()
+
+    for (x <- s) {
+      if (q.size < n)
+        q.enqueue(x)
+      else {
+        // q is full
+        if (ord.compare(x, q.head) < 0) {
+          q.dequeue()
+          q.enqueue(x)
+        }
+      }
+    }
+
+    q.dequeueAll.toSeq.reverse
+  }
+
+  private
+  def dotProduct(v1: Array[Double], v2: Array[Double]): Double = {
+    val size = v1.size
+    var i = 0
+    var d: Double = 0
+    while (i < size) {
+      d += v1(i) * v2(i)
+      i += 1
+    }
+    d
+  }
+
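+  // cosine similarity: dot(v1, v2) / (||v1|| * ||v2||); returns 0 if
+  // either vector has zero norm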
+  private
+  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
+    val size = v1.size
+    var i = 0
+    var n1: Double = 0
+    var n2: Double = 0
+    var d: Double = 0
+    while (i < size) {
+      n1 += v1(i) * v1(i)
+      n2 += v2(i) * v2(i)
+      d += v1(i) * v2(i)
+      i += 1
+    }
+    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
+    if (n1n2 == 0) 0 else (d / n1n2)
+  }
+
+  private
+  def isCandidateItem(
+    i: Int,
+    item: Item,
+    categories: Option[Set[String]],
+    whiteList: Option[Set[Int]],
+    blackList: Set[Int]
+  ): Boolean = {
+    // can add other custom filtering here
+    whiteList.map(_.contains(i)).getOrElse(true) &&
+    !blackList.contains(i) &&
+    // filter categories
+    categories.map { cat =>
+      item.categories.map { itemCat =>
+        // keep this item if it has overlapping categories with the query
+        !(itemCat.toSet.intersect(cat).isEmpty)
+      }.getOrElse(false) // discard this item if it has no categories
+    }.getOrElse(true)
+
+  }
+
+}
diff --git a/src/main/scala/DataSource.scala b/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..c102b72
--- /dev/null
+++ b/src/main/scala/DataSource.scala
@@ -0,0 +1,114 @@
+package org.template.ecommercerecommendation
+
+import io.prediction.controller.PDataSource
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EmptyActualResult
+import io.prediction.controller.Params
+import io.prediction.data.storage.Event
+import io.prediction.data.storage.Storage
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+case class DataSourceParams(appId: Int) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    val eventsDb = Storage.getPEvents()
+
+    // create a RDD of (entityID, User)
+    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
+      appId = dsp.appId,
+      entityType = "user"
+    )(sc).map { case (entityId, properties) =>
+      val user = try {
+        User()
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" user ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, user)
+    }.cache()
+
+    // create a RDD of (entityID, Item)
+    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
+      appId = dsp.appId,
+      entityType = "item"
+    )(sc).map { case (entityId, properties) =>
+      val item = try {
+        // Assume categories is an optional property of item.
+        Item(categories = properties.getOpt[List[String]]("categories"))
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" item ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, item)
+    }.cache()
+
+    // get all "user" "view" "item" events
+    val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
+      appId = dsp.appId,
+      entityType = Some("user"),
+      eventNames = Some(List("view")),
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+      // eventsDb.find() returns RDD[Event]
+      .map { event =>
+        val viewEvent = try {
+          event.event match {
+            case "view" => ViewEvent(
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              t = event.eventTime.getMillis)
+            case _ => throw new Exception(s"Unexpected event ${event} is read.")
+          }
+        } catch {
+          case e: Exception => {
+            logger.error(s"Cannot convert ${event} to ViewEvent." +
+              s" Exception: ${e}.")
+            throw e
+          }
+        }
+        viewEvent
+      }.cache()
+
+    new TrainingData(
+      users = usersRDD,
+      items = itemsRDD,
+      viewEvents = viewEventsRDD
+    )
+  }
+}
+
+case class User()
+
+case class Item(categories: Option[List[String]])
+
+case class ViewEvent(user: String, item: String, t: Long)
+
+class TrainingData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val viewEvents: RDD[ViewEvent]
+) extends Serializable {
+  override def toString = {
+    s"users: [${users.count()} (${users.take(2).toList}...)]" +
+    s"items: [${items.count()} (${items.take(2).toList}...)]" +
+    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
+  }
+}
diff --git a/src/main/scala/Engine.scala b/src/main/scala/Engine.scala
new file mode 100644
index 0000000..42ec4d4
--- /dev/null
+++ b/src/main/scala/Engine.scala
@@ -0,0 +1,31 @@
+package org.template.ecommercerecommendation
+
+import io.prediction.controller.IEngineFactory
+import io.prediction.controller.Engine
+
+case class Query(
+  user: String,
+  num: Int,
+  categories: Option[Set[String]],
+  whiteList: Option[Set[String]],
+  blackList: Option[Set[String]]
+) extends Serializable
+
+case class PredictedResult(
+  itemScores: Array[ItemScore]
+) extends Serializable
+
+case class ItemScore(
+  item: String,
+  score: Double
+) extends Serializable
+
+object ECommerceRecommendationEngine extends IEngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map("als" -> classOf[ALSAlgorithm]),
+      classOf[Serving])
+  }
+}
diff --git a/src/main/scala/Preparator.scala b/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..4dd45cf
--- /dev/null
+++ b/src/main/scala/Preparator.scala
@@ -0,0 +1,24 @@
+package org.template.ecommercerecommendation
+
+import io.prediction.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class Preparator
+  extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(
+      users = trainingData.users,
+      items = trainingData.items,
+      viewEvents = trainingData.viewEvents)
+  }
+}
+
+class PreparedData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val viewEvents: RDD[ViewEvent]
+) extends Serializable
diff --git a/src/main/scala/Serving.scala b/src/main/scala/Serving.scala
new file mode 100644
index 0000000..21cf2df
--- /dev/null
+++ b/src/main/scala/Serving.scala
@@ -0,0 +1,13 @@
+package org.template.ecommercerecommendation
+
+import io.prediction.controller.LServing
+
+class Serving
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
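+    // only one algorithm ("als") is configured in the engine factory,
+    // so serve its result directly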
+    predictedResults.head
+  }
+}