Use local model for faster serving
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..631c094
--- /dev/null
+++ b/README.md
@@ -0,0 +1,73 @@
+# Similar Product Template
+
+## Development Notes
+
+### Import sample data
+
+```
+$ python data/import_eventserver.py --access_key <your_access_key>
+```
+
+### Query
+
+normal:
+
+```
+curl -H "Content-Type: application/json" \
+-d '{ "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"], "num": 10}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+```
+curl -H "Content-Type: application/json" \
+-d '{
+  "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"],
+  "num": 10,
+  "categories" : ["c4", "c3"]
+}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+```
+curl -H "Content-Type: application/json" \
+-d '{
+  "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"],
+  "num": 10,
+  "whiteList": ["i21", "i26", "i40"]
+}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+```
+curl -H "Content-Type: application/json" \
+-d '{
+  "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"],
+  "num": 10,
+  "blackList": ["i21", "i26", "i40"]
+}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+unknown item:
+
+```
+curl -H "Content-Type: application/json" \
+-d '{ "items": ["unk1", "i3", "i10", "i2", "i5", "i31", "i9"], "num": 10}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
+
+
+
+all unknown items:
+
+```
+curl -H "Content-Type: application/json" \
+-d '{ "items": ["unk1", "unk2", "unk3", "unk4"], "num": 10}' \
+http://localhost:8000/queries.json \
+-w %{time_connect}:%{time_starttransfer}:%{time_total}
+```
diff --git a/src/main/scala/ALSAlgorithm.scala b/src/main/scala/ALSAlgorithm.scala
index 52a3ed2..a4dfcbf 100644
--- a/src/main/scala/ALSAlgorithm.scala
+++ b/src/main/scala/ALSAlgorithm.scala
@@ -1,14 +1,11 @@
 package org.template.similarproduct
 
-import io.prediction.controller.PAlgorithm
+import io.prediction.controller.P2LAlgorithm
 import io.prediction.controller.Params
-import io.prediction.controller.IPersistentModel
-import io.prediction.controller.IPersistentModelLoader
 import io.prediction.data.storage.BiMap
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.recommendation.ALS
 import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
 
@@ -23,26 +20,15 @@
   seed: Option[Long]) extends Params
 
 class ALSModel(
-  val productFeatures: RDD[(Int, Array[Double])],
+  val productFeatures: Map[Int, Array[Double]],
   val itemStringIntMap: BiMap[String, Int],
   val items: Map[Int, Item]
-) extends IPersistentModel[ALSAlgorithmParams] with Serializable {
+) extends Serializable {
 
   @transient lazy val itemIntStringMap = itemStringIntMap.inverse
 
-  def save(id: String, params: ALSAlgorithmParams,
-    sc: SparkContext): Boolean = {
-
-    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
-    sc.parallelize(Seq(itemStringIntMap))
-      .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
-    sc.parallelize(Seq(items))
-      .saveAsObjectFile(s"/tmp/${id}/items")
-    true
-  }
-
   override def toString = {
-    s" productFeatures: [${productFeatures.count()}]" +
+    s" productFeatures: [${productFeatures.size}]" +
     s"(${productFeatures.take(2).toList}...)" +
     s" itemStringIntMap: [${itemStringIntMap.size}]" +
     s"(${itemStringIntMap.take(2).toString}...)]" +
@@ -51,26 +37,11 @@
   }
 }
 
-object ALSModel
-  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
-  def apply(id: String, params: ALSAlgorithmParams,
-    sc: Option[SparkContext]) = {
-    new ALSModel(
-      productFeatures = sc.get
-        .objectFile(s"/tmp/${id}/productFeatures")
-        .cache(), // persist to memory for fast multiple accesses
-      itemStringIntMap = sc.get
-        .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first,
-      items = sc.get
-        .objectFile[Map[Int, Item]](s"/tmp/${id}/items").first)
-  }
-}
-
 /**
   * Use ALS to build item x feature matrix
   */
 class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
 
   @transient lazy val logger = Logger[this.type]
 
@@ -138,7 +109,7 @@
       seed = seed)
 
     new ALSModel(
-      productFeatures = m.productFeatures,
+      productFeatures = m.productFeatures.collectAsMap.toMap,
       itemStringIntMap = itemStringIntMap,
       items = items
     )
@@ -146,17 +117,16 @@
 
   def predict(model: ALSModel, query: Query): PredictedResult = {
 
+    val productFeatures = model.productFeatures
+
     // convert items to Int index
     val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
       .flatten.toSet
 
-    val queryFeatures: Vector[Array[Double]] = queryList.toVector.par
-      .map { item =>
-        // productFeatures may not contain the requested item
-        val qf: Option[Array[Double]] = model.productFeatures
-          .lookup(item).headOption
-        qf
-      }.seq.flatten
+    val queryFeatures: Vector[Array[Double]] = queryList.toVector
+      // productFeatures may not contain the requested item
+      .map { item => productFeatures.get(item) }
+      .flatten
 
     val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
       set.map(model.itemStringIntMap.get(_)).flatten
@@ -171,14 +141,15 @@
       logger.info(s"No productFeatures vector for query items ${query.items}.")
       Array[(Int, Double)]()
     } else {
-      model.productFeatures
+      productFeatures.par // convert to parallel collection
         .mapValues { f =>
           queryFeatures.map{ qf =>
             cosine(qf, f)
           }.reduce(_ + _)
         }
         .filter(_._2 > 0) // keep items with score > 0
-        .collect()
+        .seq // convert back to sequential collection
+        .toArray
     }
 
     val filteredScore = indexScores.view.filter { case (i, v) =>