Merge branch 'hotfix-0.6.2'
diff --git a/bin/common.sh b/bin/common.sh
index 9d29af8..5f08618 100644
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -2,7 +2,7 @@
 
 # This script should be sourced with $BASE set to the base of the repository
 
-VERSION=0.6.1
+VERSION=0.6.2
 
 # Play framework related
 PLAY_OPTS=
diff --git a/commons/build.sbt b/commons/build.sbt
index eb8b944..8b73e54 100644
--- a/commons/build.sbt
+++ b/commons/build.sbt
@@ -1,6 +1,6 @@
 name := "PredictionIO Commons"
 
-version := "0.6.1"
+version := "0.6.2"
 
 organization := "io.prediction"
 
diff --git a/commons/src/main/scala/io/prediction/commons/appdata/Items.scala b/commons/src/main/scala/io/prediction/commons/appdata/Items.scala
index 3e32178..036252b 100644
--- a/commons/src/main/scala/io/prediction/commons/appdata/Items.scala
+++ b/commons/src/main/scala/io/prediction/commons/appdata/Items.scala
@@ -41,6 +41,9 @@
   /** Find all items by App ID. */
   def getByAppid(appid: Int): Iterator[Item]
 
+  /** Find items by App ID sorted by geolocation distance. */
+  def getByAppidAndLatlng(appid: Int, latlng: Tuple2[Double, Double], within: Option[Double], unit: Option[String]): Iterator[Item]
+
   /** Get items by IDs. */
   def getByIds(appid: Int, ids: Seq[String]): Seq[Item]
 
diff --git a/commons/src/main/scala/io/prediction/commons/appdata/mongodb/MongoItems.scala b/commons/src/main/scala/io/prediction/commons/appdata/mongodb/MongoItems.scala
index 33a1d1a..87454a5 100644
--- a/commons/src/main/scala/io/prediction/commons/appdata/mongodb/MongoItems.scala
+++ b/commons/src/main/scala/io/prediction/commons/appdata/mongodb/MongoItems.scala
@@ -14,7 +14,9 @@
 
   /** Indices and hints. */
   val starttimeIndex = MongoDBObject("starttime" -> -1)
+  val lnglatIndex = MongoDBObject("lnglat" -> "2d")
   itemColl.ensureIndex(starttimeIndex)
+  itemColl.ensureIndex(lnglatIndex)
 
   RegisterJodaTimeConversionHelpers()
 
@@ -41,6 +43,22 @@
 
   def getByAppid(appid: Int) = new MongoItemsIterator(itemColl.find(MongoDBObject("appid" -> appid)))
 
+  def getByAppidAndLatlng(appid: Int, latlng: Tuple2[Double, Double], within: Option[Double], unit: Option[String]) = {
+    val earthRadiusInKm = 6371
+    val earthRadiusInMiles = 3959
+
+    val nearSphereObj = MongoDBObject("$nearSphere" -> MongoDBList(latlng._2, latlng._1))
+    val maxDistObj = within map { maxDist =>
+      unit match {
+        case Some("km") => MongoDBObject("$maxDistance" -> maxDist / earthRadiusInKm)
+        case Some("mi") => MongoDBObject("$maxDistance" -> maxDist / earthRadiusInMiles)
+        case _ => MongoDBObject("$maxDistance" -> maxDist / earthRadiusInKm)
+      }
+    } getOrElse emptyObj
+
+    new MongoItemsIterator(itemColl.find(MongoDBObject("appid" -> appid, "lnglat" -> (nearSphereObj ++ maxDistObj))))
+  }
+
   def getByIds(appid: Int, ids: Seq[String]) = {
     itemColl.find(MongoDBObject("_id" -> MongoDBObject("$in" -> ids.map(idWithAppid(appid, _))))).toList map { dbObjToItem(_) }
   }
diff --git a/commons/src/test/scala/io/prediction/commons/appdata/ItemsSpec.scala b/commons/src/test/scala/io/prediction/commons/appdata/ItemsSpec.scala
index 0aed578..e43f5b5 100644
--- a/commons/src/test/scala/io/prediction/commons/appdata/ItemsSpec.scala
+++ b/commons/src/test/scala/io/prediction/commons/appdata/ItemsSpec.scala
@@ -19,6 +19,7 @@
 
   def items(items: Items) = {                                                 t ^
     "inserting and getting an item"                                           ! insert(items) ^
+    "getting items by App ID and geo data"                                    ! getByAppidAndLatlng(items) ^
     "getting items by IDs"                                                    ! getByIds(items) ^
     "getting items by IDs sorted by start time"                               ! getRecentByIds(items) ^
     "updating an item"                                                        ! update(items) ^
@@ -67,6 +68,65 @@
       (items.get(appid, id2) must beSome(item2))
   }
 
+  def getByAppidAndLatlng(items: Items) = {
+    val id = "getByAppidAndLatlng"
+    val appid = 5
+    val dac = Item(
+      id         = id + "dac",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(14).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3197611, -122.0466141)),
+      inactive   = None,
+      attributes = Some(Map("foo" -> "bar", "foo2" -> "bar2")))
+    val hsh = Item(
+      id         = id + "hsh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(23).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3370801, -122.0493201)),
+      inactive   = None,
+      attributes = None)
+    val mvh = Item(
+      id         = id + "mvh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(17).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3154153, -122.0566829)),
+      inactive   = None,
+      attributes = Some(Map("foo3" -> "bar3")))
+    val lbh = Item(
+      id         = id + "lbh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(3).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.2997029, -122.0034684)),
+      inactive   = None,
+      attributes = Some(Map("foo4" -> "bar4", "foo5" -> "bar5")))
+    val allItems = Seq(dac, hsh, lbh, mvh)
+    allItems foreach { items.insert(_) }
+    (items.getByAppidAndLatlng(appid, (37.336402, -122.040467), None, None).toSeq must beEqualTo(Seq(hsh, dac, mvh, lbh))) and
+      (items.getByAppidAndLatlng(appid, (37.3229978, -122.0321823), None, None).toSeq must beEqualTo(Seq(dac, hsh, mvh, lbh))) and
+      (items.getByAppidAndLatlng(appid, (37.3229978, -122.0321823), Some(2.2), None).toSeq must beEqualTo(Seq(dac, hsh))) and
+      (items.getByAppidAndLatlng(appid, (37.3229978, -122.0321823), Some(2.2), Some("mi")).toSeq must beEqualTo(Seq(dac, hsh, mvh)))
+  }
+
   def getByIds(items: Items) = {
     val id = "getByIds"
     val appid = 4
diff --git a/dist/bin/backup b/dist/bin/backup
index 03c7745..c68b701 100755
--- a/dist/bin/backup
+++ b/dist/bin/backup
@@ -145,8 +145,8 @@
 JARS="${JARS}:${PROG_HOME}/lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/nscala-time_2.10-0.4.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scopt_2.10-3.1.0.jar"
diff --git a/dist/bin/common.sh b/dist/bin/common.sh
index 29ff9eb..7f111ed 100644
--- a/dist/bin/common.sh
+++ b/dist/bin/common.sh
@@ -2,7 +2,7 @@
 
 # This script should be sourced with $BASE set to the base of the repository
 
-VERSION=0.6.1
+VERSION=0.6.2
 
 # Play framework related
 PLAY_OPTS=""
diff --git a/dist/bin/conncheck b/dist/bin/conncheck
index ab0ae5d..da695f1 100755
--- a/dist/bin/conncheck
+++ b/dist/bin/conncheck
@@ -144,8 +144,8 @@
 JARS="${JARS}:${PROG_HOME}/lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/nscala-time_2.10-0.4.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-connection-check-tool_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-connection-check-tool_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/slf4j-api-1.6.0.jar"
diff --git a/dist/bin/migration/appdata b/dist/bin/migration/appdata
index ffdcea7..8a52a35 100755
--- a/dist/bin/migration/appdata
+++ b/dist/bin/migration/appdata
@@ -144,8 +144,8 @@
 JARS="${JARS}:${PROG_HOME}/../lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/../lib/nscala-time_2.10-0.2.0.jar"
 JARS="${JARS}:${PROG_HOME}/../lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/../lib/predictionio-0.4-to-0.5-appdata-migration_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/../lib/predictionio-commons_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/../lib/predictionio-0.4-to-0.5-appdata-migration_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/../lib/predictionio-commons_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/../lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/../lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/../lib/slf4j-api-1.6.0.jar"
diff --git a/dist/bin/restore b/dist/bin/restore
index 2b65cc4..7e5c3a7 100755
--- a/dist/bin/restore
+++ b/dist/bin/restore
@@ -145,8 +145,8 @@
 JARS="${JARS}:${PROG_HOME}/lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/nscala-time_2.10-0.4.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scopt_2.10-3.1.0.jar"
diff --git a/dist/bin/settingsinit b/dist/bin/settingsinit
index 74a2383..bac2ce2 100755
--- a/dist/bin/settingsinit
+++ b/dist/bin/settingsinit
@@ -144,8 +144,8 @@
 JARS="${JARS}:${PROG_HOME}/lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/nscala-time_2.10-0.4.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-settings-initialization_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-settings-initialization_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/slf4j-api-1.6.0.jar"
diff --git a/dist/bin/stop-all.sh b/dist/bin/stop-all.sh
index c343338..a9183d1 100755
--- a/dist/bin/stop-all.sh
+++ b/dist/bin/stop-all.sh
@@ -14,6 +14,7 @@
 BASE=`pwd`
 
 . "$BASE/bin/common.sh"
+. "$BASE/bin/vendors.sh"
 
 # Admin server
 $BASE/bin/stop-admin.sh
@@ -24,5 +25,18 @@
 # Scheduler server
 $BASE/bin/stop-scheduler.sh
 
+# Apache Hadoop
+if vendor_hadoop_exists ; then
+	echo ""
+	while true; do
+		read -p "Found Hadoop in vendors area. Do you want to stop it? [y/n] " yn
+		case $yn in
+			[Yy]* ) stop_hadoop; break;;
+			[Nn]* ) break;;
+			* ) echo "Please answer 'y' or 'n'.";;
+		esac
+	done
+fi
+
 echo ""
-echo "Note: You must stop any running MongoDB/Hadoop processes manually"
+echo "Note: You must stop any running MongoDB processes manually."
diff --git a/dist/bin/updatecheck b/dist/bin/updatecheck
index afd2eec..b432929 100755
--- a/dist/bin/updatecheck
+++ b/dist/bin/updatecheck
@@ -145,8 +145,8 @@
 JARS="${JARS}:${PROG_HOME}/lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/nscala-time_2.10-0.4.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scopt_2.10-3.1.0.jar"
diff --git a/dist/bin/upgrade b/dist/bin/upgrade
index e5a4ec8..4d76f3a 100755
--- a/dist/bin/upgrade
+++ b/dist/bin/upgrade
@@ -145,8 +145,8 @@
 JARS="${JARS}:${PROG_HOME}/lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/nscala-time_2.10-0.4.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-software-manager_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scopt_2.10-3.1.0.jar"
diff --git a/dist/bin/users b/dist/bin/users
index 74ae1ee..3d1adac 100755
--- a/dist/bin/users
+++ b/dist/bin/users
@@ -145,8 +145,8 @@
 JARS="${JARS}:${PROG_HOME}/lib/mongo-java-driver-2.11.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/nscala-time_2.10-0.4.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/objenesis-1.2.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.1.jar"
-JARS="${JARS}:${PROG_HOME}/lib/predictionio-users-tool_2.10-0.6.1.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-commons_2.10-0.6.2.jar"
+JARS="${JARS}:${PROG_HOME}/lib/predictionio-users-tool_2.10-0.6.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/reflectasm-1.07.jar"
 JARS="${JARS}:${PROG_HOME}/lib/scala-library-2.10.2.jar"
 JARS="${JARS}:${PROG_HOME}/lib/slf4j-api-1.6.0.jar"
diff --git a/dist/bin/vendors.sh b/dist/bin/vendors.sh
index 28d16fd..7706929 100644
--- a/dist/bin/vendors.sh
+++ b/dist/bin/vendors.sh
@@ -65,6 +65,11 @@
 	$VENDOR_HADOOP_PATH/bin/start-all.sh
 }
 
+stop_hadoop () {
+	echo "Going to stop Hadoop..."
+	$VENDOR_HADOOP_PATH/bin/stop-all.sh
+}
+
 vendor_mongodb_exists () {
 	[ -e "$VENDOR_MONGODB_PATH/bin/mongod" ]
 }
diff --git a/dist/conf/init.json b/dist/conf/init.json
index a9da410..b185967 100644
--- a/dist/conf/init.json
+++ b/dist/conf/init.json
@@ -1,7 +1,7 @@
 {
     "systeminfos": {
         "version": {
-            "value": "0.6.1",
+            "value": "0.6.2",
             "description": "PredictionIO version"
         }
     },
@@ -44,7 +44,7 @@
             }
         },
         "itemsim": {
-            "name": "Item Similarity Prediction Engine",
+            "name": "Item Similarity Engine",
             "description": "Discover similar items",
             "defaultalgoinfoid": "mahout-itemsimcf",
             "defaultsettings": {
diff --git a/dist/conf/predictionio.conf b/dist/conf/predictionio.conf
index 50b2a85..0811efa 100644
--- a/dist/conf/predictionio.conf
+++ b/dist/conf/predictionio.conf
@@ -75,36 +75,36 @@
 io.prediction.commons.modeldata.training.db.port=27017
 
 # PredictionIO Algorithms
-pdio-knnitembased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-latestrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-randomrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-itembased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-parallelals.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-knnuserbased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-thresholduserbased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-slopeone.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-alswr.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-svdsgd.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-svdplusplus.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
+pdio-knnitembased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-latestrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-randomrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-itembased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-parallelals.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-knnuserbased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-thresholduserbased.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-slopeone.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-alswr.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-svdsgd.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-svdplusplus.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
 
-pdio-itemsimcf.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-itemsimlatestrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-itemsimrandomrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-itemsimcf.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
+pdio-itemsimcf.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-itemsimlatestrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-itemsimrandomrank.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-itemsimcf.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
 
 # PredictionIO generic scalding job
-io.prediction.algorithms.scalding.itemrec.generic.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
+io.prediction.algorithms.scalding.itemrec.generic.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
 
 # Itemrec Scala Mahout Algorithms
-io.prediction.algorithms.mahout.itemrec.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Scala-Mahout-assembly-0.6.1.jar
+io.prediction.algorithms.mahout.itemrec.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Algorithms-Scala-Mahout-assembly-0.6.2.jar
 
 # Mahout core job
 io.prediction.algorithms.mahout-core-job.jar=${io.prediction.base}/vendors/mahout-distribution-0.8/mahout-core-0.8-job.jar
 
 # PredictionIO Offline Evaluation
-io.prediction.evaluations.scalding.itemrec.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-Hadoop-Scalding-assembly-0.6.1.jar
-io.prediction.evaluations.scalding.itemsim.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Evaluations-Hadoop-Scalding-assembly-0.6.1.jar
-io.prediction.evaluations.itemrec.topkitems.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-TopKItems-assembly-0.6.1.jar
-io.prediction.evaluations.itemrec.trainingtestsplit.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-Scala-TrainingTestSplitTime-assembly-0.6.1.jar
-io.prediction.evaluations.itemrec.paramgen.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-ParamGen-assembly-0.6.1.jar
-io.prediction.evaluations.itemsim.topkitems.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Evaluations-TopKItems-assembly-0.6.1.jar
+io.prediction.evaluations.scalding.itemrec.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-Hadoop-Scalding-assembly-0.6.2.jar
+io.prediction.evaluations.scalding.itemsim.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Evaluations-Hadoop-Scalding-assembly-0.6.2.jar
+io.prediction.evaluations.itemrec.topkitems.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-TopKItems-assembly-0.6.2.jar
+io.prediction.evaluations.itemrec.trainingtestsplit.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-Scala-TrainingTestSplitTime-assembly-0.6.2.jar
+io.prediction.evaluations.itemrec.paramgen.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemRec-Evaluations-ParamGen-assembly-0.6.2.jar
+io.prediction.evaluations.itemsim.topkitems.jar=${io.prediction.base}/lib/PredictionIO-Process-ItemSim-Evaluations-TopKItems-assembly-0.6.2.jar
diff --git a/output/build.sbt b/output/build.sbt
index fb86464..5a7e9dc 100644
--- a/output/build.sbt
+++ b/output/build.sbt
@@ -1,6 +1,6 @@
 name := "PredictionIO Output"
 
-version := "0.6.1"
+version := "0.6.2"
 
 organization := "io.prediction"
 
@@ -9,7 +9,7 @@
 scalacOptions in (Compile, doc) ++= Opts.doc.title("PredictionIO Output API Documentation")
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
   "com.github.nscala-time" %% "nscala-time" % "0.4.2",
   "org.specs2" %% "specs2" % "1.14" % "test"
 )
diff --git a/output/src/main/scala/io/prediction/output/AlgoOutputSelector.scala b/output/src/main/scala/io/prediction/output/AlgoOutputSelector.scala
index 9760742..1c1d87d 100644
--- a/output/src/main/scala/io/prediction/output/AlgoOutputSelector.scala
+++ b/output/src/main/scala/io/prediction/output/AlgoOutputSelector.scala
@@ -5,10 +5,10 @@
 class AlgoOutputSelector(algos: Algos) {
   val multipleAlgoErrorMsg = "Deploying multiple algorithms is not yet supported. No results can be returned."
 
-  def itemRecSelection(uid: String, n: Int, itypes: Option[Seq[String]])(implicit app: App, engine: Engine): Seq[String] = {
+  def itemRecSelection(uid: String, n: Int, itypes: Option[Seq[String]], latlng: Option[Tuple2[Double, Double]], within: Option[Double], unit: Option[String])(implicit app: App, engine: Engine): Seq[String] = {
     implicit val algo = itemRecAlgoSelection(engine)
 
-    itemrec.ItemRecAlgoOutput.output(uid, n, itypes)
+    itemrec.ItemRecAlgoOutput.output(uid, n, itypes, latlng, within, unit)
   }
 
   def itemRecAlgoSelection(engine: Engine): Algo = {
@@ -27,10 +27,10 @@
     algo
   }
 
-  def itemSimSelection(iid: String, n: Int, itypes: Option[Seq[String]])(implicit app: App, engine: Engine): Seq[String] = {
+  def itemSimSelection(iid: String, n: Int, itypes: Option[Seq[String]], latlng: Option[Tuple2[Double, Double]], within: Option[Double], unit: Option[String])(implicit app: App, engine: Engine): Seq[String] = {
     implicit val algo = itemSimAlgoSelection(engine)
 
-    itemsim.ItemSimAlgoOutput.output(iid, n, itypes)
+    itemsim.ItemSimAlgoOutput.output(iid, n, itypes, latlng, within, unit)
   }
 
   def itemSimAlgoSelection(engine: Engine): Algo = {
diff --git a/output/src/main/scala/io/prediction/output/itemrec/ItemRecAlgoOutput.scala b/output/src/main/scala/io/prediction/output/itemrec/ItemRecAlgoOutput.scala
index 9d85904..39a06fc 100644
--- a/output/src/main/scala/io/prediction/output/itemrec/ItemRecAlgoOutput.scala
+++ b/output/src/main/scala/io/prediction/output/itemrec/ItemRecAlgoOutput.scala
@@ -14,7 +14,7 @@
   val config = new Config
   val items = config.getAppdataItems
 
-  def output(uid: String, n: Int, itypes: Option[Seq[String]])(implicit app: App, engine: Engine, algo: Algo, offlineEval: Option[OfflineEval] = None): Seq[String] = {
+  def output(uid: String, n: Int, itypes: Option[Seq[String]], latlng: Option[Tuple2[Double, Double]], within: Option[Double], unit: Option[String])(implicit app: App, engine: Engine, algo: Algo, offlineEval: Option[OfflineEval] = None): Seq[String] = {
     /** Serendipity settings. */
     val serendipity = engine.settings.get("serendipity") map { _.asInstanceOf[Int] }
 
@@ -23,8 +23,36 @@
       */
     val finalN = serendipity map { s => n*(s+1) } getOrElse n
 
+    /** At the moment, PredictionIO depends only on MongoDB for its model data storage.
+      * Since we are still using the legacy longitude-latitude format, the maximum number
+      * of documents that can be returned from a query with geospatial constraint is 100.
+      * A "manual join" is still feasible with this size.
+      */
+    val outputBuffer = collection.mutable.ListBuffer[String]()
+
+    latlng map { ll =>
+      val geoItems = items.getByAppidAndLatlng(app.id, ll, within, unit).map(_.id).toSet
+      var stopMore = false
+      var after: Option[ItemRecScore] = None
+
+      while (outputBuffer.length < finalN && !stopMore) {
+        val moreItemRecScores = more(uid, finalN, itypes, after)
+        val moreIids = moreItemRecScores.map(_.iid).toSeq
+
+        /** Stop the loop if no more scores can be found. */
+        if (moreItemRecScores.length == 0)
+          stopMore = true
+        else {
+          outputBuffer ++= moreIids filter { geoItems(_) }
+          after = Some(moreItemRecScores.last)
+        }
+      }
+    } getOrElse {
+      outputBuffer ++= more(uid, finalN, itypes, None) map { _.iid }
+    }
+
     /** At this point "output" is guaranteed to have n*(s+1) items (seen or unseen) unless model data is exhausted. */
-    val output = more(uid, finalN, itypes, None) map { _.iid }
+    val output = outputBuffer.toSeq.take(finalN)
 
     /** Serendipity output. */
     val serendipityOutput = serendipity map { s =>
diff --git a/output/src/main/scala/io/prediction/output/itemsim/ItemSimAlgoOutput.scala b/output/src/main/scala/io/prediction/output/itemsim/ItemSimAlgoOutput.scala
index 1db7b3f..c2fd159 100644
--- a/output/src/main/scala/io/prediction/output/itemsim/ItemSimAlgoOutput.scala
+++ b/output/src/main/scala/io/prediction/output/itemsim/ItemSimAlgoOutput.scala
@@ -14,7 +14,7 @@
   val config = new Config
   val items = config.getAppdataItems
 
-  def output(iid: String, n: Int, itypes: Option[Seq[String]])(implicit app: App, engine: Engine, algo: Algo, offlineEval: Option[OfflineEval] = None): Seq[String] = {
+  def output(iid: String, n: Int, itypes: Option[Seq[String]], latlng: Option[Tuple2[Double, Double]], within: Option[Double], unit: Option[String])(implicit app: App, engine: Engine, algo: Algo, offlineEval: Option[OfflineEval] = None): Seq[String] = {
     /** Serendipity settings. */
     val serendipity = engine.settings.get("serendipity") map { _.asInstanceOf[Int] }
 
@@ -23,8 +23,36 @@
       */
     val finalN = serendipity map { s => n*(s+1) } getOrElse n
 
+    /** At the moment, PredictionIO depends only on MongoDB for its model data storage.
+      * Since we are still using the legacy longitude-latitude format, the maximum number
+      * of documents that can be returned from a query with geospatial constraint is 100.
+      * A "manual join" is still feasible with this size.
+      */
+    val outputBuffer = collection.mutable.ListBuffer[String]()
+
+    latlng map { ll =>
+      val geoItems = items.getByAppidAndLatlng(app.id, ll, within, unit).map(_.id).toSet
+      var stopMore = false
+      var after: Option[ItemSimScore] = None
+
+      while (outputBuffer.length < finalN && !stopMore) {
+        val moreItemSimScores = more(iid, finalN, itypes, after)
+        val moreSimiids = moreItemSimScores.map(_.simiid).toSeq
+
+        /** Stop the loop if no more scores can be found. */
+        if (moreItemSimScores.length == 0)
+          stopMore = true
+        else {
+          outputBuffer ++= moreSimiids filter { geoItems(_) }
+          after = Some(moreItemSimScores.last)
+        }
+      }
+    } getOrElse {
+      outputBuffer ++= more(iid, finalN, itypes, None) map { _.simiid }
+    }
+
     /** At this point "output" is guaranteed to have n*(s+1) items (seen or unseen) unless model data is exhausted. */
-    val output = more(iid, finalN, itypes, None) map { _.simiid }
+    val output = outputBuffer.toSeq.take(finalN)
 
     /** Serendipity output. */
     val serendipityOutput = serendipity map { s =>
diff --git a/output/src/test/scala/io/prediction/output/AlgoOutputSelectorSpec.scala b/output/src/test/scala/io/prediction/output/AlgoOutputSelectorSpec.scala
index c97388c..720f6e1 100644
--- a/output/src/test/scala/io/prediction/output/AlgoOutputSelectorSpec.scala
+++ b/output/src/test/scala/io/prediction/output/AlgoOutputSelectorSpec.scala
@@ -17,11 +17,13 @@
   "PredictionIO AlgoOutputSelector Specification"                             ^
                                                                               p ^
     "get itemrec output from a valid engine"                                  ! itemRecOutputSelection(algoOutputSelector) ^
+    "get itemrec output with geo from a valid engine"                         ! itemRecOutputSelectionWithLatlng(algoOutputSelector) ^
     //"get itemrec output from a valid engine without seen items"               ! itemRecOutputSelectionUnseenOnly(algoOutputSelector) ^
     //"get itemrec output from a valid engine with an unsupported algorithm"    ! itemRecOutputSelectionUnsupportedAlgo(algoOutputSelector) ^
     "get itemrec output from a valid engine with no algorithm"                ! itemRecOutputSelectionNoAlgo(algoOutputSelector) ^
     "get itemrec output from an invalid engine"                               ! itemRecOutputSelectionBadEngine(algoOutputSelector) ^
     "get itemsim output from a valid engine"                                  ! itemSimOutputSelection(algoOutputSelector) ^
+    "get itemsim output with geo from a valid engine"                         ! itemSimOutputSelectionWithLatlng(algoOutputSelector) ^
     //"get itemsim output from a valid engine with an unsupported algorithm"    ! itemSimOutputSelectionUnsupportedAlgo(algoOutputSelector) ^
     "get itemsim output from a valid engine with no algorithm"                ! itemSimOutputSelectionNoAlgo(algoOutputSelector) ^
     "get itemsim output from an invalid engine"                               ! itemSimOutputSelectionBadEngine(algoOutputSelector) ^
@@ -32,6 +34,7 @@
   val mongoDb = MongoConnection()(mongoDbName)
   val mongoEngines = new MongoEngines(mongoDb)
   val mongoAlgos = new MongoAlgos(mongoDb)
+  val mongoItems = new MongoItems(mongoDb)
   val mongoU2IActions = new MongoU2IActions(mongoDb)
   val mongoItemRecScores = new MongoItemRecScores(mongoDb)
   val mongoItemSimScores = new MongoItemSimScores(mongoDb)
@@ -106,7 +109,131 @@
       modelset = true
     ))
 
-    algoOutputSelector.itemRecSelection("user1", 10, Some(Seq("bar", "foo")))(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq("item_x", "item_y"))
+    algoOutputSelector.itemRecSelection("user1", 10, Some(Seq("bar", "foo")), None, None, None)(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq("item_x", "item_y"))
+  }
+
+  def itemRecOutputSelectionWithLatlng(algoOutputSelector: AlgoOutputSelector) = {
+    val appid = dummyApp.id
+    val engine = Engine(
+      id       = 0,
+      appid    = appid,
+      name     = "itemRecOutputSelectionWithLatlng",
+      infoid   = "itemrec",
+      itypes   = Some(Seq("foo", "bar")),
+      settings = Map()
+    )
+    val engineid = mongoEngines.insert(engine)
+
+    val algo = Algo(
+      id       = 0,
+      engineid = engineid,
+      name     = "itemRecOutputSelectionWithLatlng",
+      infoid   = "pdio-knnitembased",
+      command  = "itemRecOutputSelectionWithLatlng",
+      params   = Map("foo" -> "bar"),
+      settings = Map("dead" -> "beef"),
+      modelset = true,
+      createtime = DateTime.now,
+      updatetime = DateTime.now,
+      status = "deployed",
+      offlineevalid = None
+    )
+    val algoid = mongoAlgos.insert(algo)
+
+    val id = "itemRecOutputSelectionWithLatlng"
+
+    val dac = Item(
+      id         = id + "dac",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(14).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3197611, -122.0466141)),
+      inactive   = None,
+      attributes = Some(Map("foo" -> "bar", "foo2" -> "bar2")))
+    val hsh = Item(
+      id         = id + "hsh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(23).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3370801, -122.0493201)),
+      inactive   = None,
+      attributes = None)
+    val mvh = Item(
+      id         = id + "mvh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(17).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3154153, -122.0566829)),
+      inactive   = None,
+      attributes = Some(Map("foo3" -> "bar3")))
+    val lbh = Item(
+      id         = id + "lbh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(3).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.2997029, -122.0034684)),
+      inactive   = None,
+      attributes = Some(Map("foo4" -> "bar4", "foo5" -> "bar5")))
+    val allItems = Seq(dac, hsh, lbh, mvh)
+    allItems foreach { mongoItems.insert(_) }
+
+    mongoItemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = id + "dac",
+      score = 1,
+      itypes = Seq("bar"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    mongoItemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = id + "hsh",
+      score = 4,
+      itypes = Seq("foo"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    mongoItemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = id + "mvh",
+      score = 3,
+      itypes = Seq("unrelated"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    mongoItemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = id + "lbh",
+      score = 2,
+      itypes = Seq("unrelated"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    algoOutputSelector.itemRecSelection("user1", 10, None, Some((37.3229978, -122.0321823)), Some(2.2), None)(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq(id + "hsh", id + "dac"))
   }
 
   def itemRecOutputSelectionUnseenOnly(algoOutputSelector: AlgoOutputSelector) = {
@@ -217,7 +344,7 @@
       price = None
     ))
 
-    algoOutputSelector.itemRecSelection("user1", 5, Some(Seq("bar", "foo")))(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq("item_x", "item_y", "item_z", "item_c", "item_a"))
+    algoOutputSelector.itemRecSelection("user1", 5, Some(Seq("bar", "foo")), None, None, None)(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq("item_x", "item_y", "item_z", "item_c", "item_a"))
   }
 
   def itemRecOutputSelectionUnsupportedAlgo(algoOutputSelector: AlgoOutputSelector) = {
@@ -247,7 +374,7 @@
     )
     val algoid = mongoAlgos.insert(algo)
 
-    algoOutputSelector.itemRecSelection("", 10, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
+    algoOutputSelector.itemRecSelection("", 10, None, None, None, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
   }
 
   def itemRecOutputSelectionNoAlgo(algoOutputSelector: AlgoOutputSelector) = {
@@ -260,7 +387,7 @@
       settings = Map()
     )
     val engineid = mongoEngines.insert(engine)
-    algoOutputSelector.itemRecSelection("", 10, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
+    algoOutputSelector.itemRecSelection("", 10, None, None, None, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
   }
 
   def itemRecOutputSelectionBadEngine(algoOutputSelector: AlgoOutputSelector) = {
@@ -273,7 +400,7 @@
       settings = Map()
     )
     val engineid = mongoEngines.insert(engine)
-    algoOutputSelector.itemRecSelection("", 10, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
+    algoOutputSelector.itemRecSelection("", 10, None, None, None, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
   }
 
   /** ItemSim engine. */
@@ -334,7 +461,131 @@
       modelset = true
     ))
 
-    algoOutputSelector.itemSimSelection("user1", 10, Some(Seq("bar", "foo")))(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq("item_x", "item_y"))
+    algoOutputSelector.itemSimSelection("user1", 10, Some(Seq("bar", "foo")), None, None, None)(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq("item_x", "item_y"))
+  }
+
+  def itemSimOutputSelectionWithLatlng(algoOutputSelector: AlgoOutputSelector) = {
+    val appid = dummyApp.id
+    val engine = Engine(
+      id       = 0,
+      appid    = appid,
+      name     = "itemSimOutputSelectionWithLatlng",
+      infoid   = "itemsim",
+      itypes   = Some(Seq("foo", "bar")),
+      settings = Map()
+    )
+    val engineid = mongoEngines.insert(engine)
+
+    val algo = Algo(
+      id       = 0,
+      engineid = engineid,
+      name     = "itemRecOutputSelectionWithLatlng",
+      infoid   = "pdio-knnitembased",
+      command  = "itemRecOutputSelectionWithLatlng",
+      params   = Map("foo" -> "bar"),
+      settings = Map("dead" -> "beef"),
+      modelset = true,
+      createtime = DateTime.now,
+      updatetime = DateTime.now,
+      status = "deployed",
+      offlineevalid = None
+    )
+    val algoid = mongoAlgos.insert(algo)
+
+    val id = "itemRecOutputSelectionWithLatlng"
+
+    val dac = Item(
+      id         = id + "dac",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(14).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3197611, -122.0466141)),
+      inactive   = None,
+      attributes = Some(Map("foo" -> "bar", "foo2" -> "bar2")))
+    val hsh = Item(
+      id         = id + "hsh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(23).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3370801, -122.0493201)),
+      inactive   = None,
+      attributes = None)
+    val mvh = Item(
+      id         = id + "mvh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(17).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.3154153, -122.0566829)),
+      inactive   = None,
+      attributes = Some(Map("foo3" -> "bar3")))
+    val lbh = Item(
+      id         = id + "lbh",
+      appid      = appid,
+      ct         = DateTime.now,
+      itypes     = List("fresh", "meat"),
+      starttime  = Some(DateTime.now.hour(3).minute(13)),
+      endtime    = None,
+      price      = Some(49.394),
+      profit     = None,
+      latlng     = Some((37.2997029, -122.0034684)),
+      inactive   = None,
+      attributes = Some(Map("foo4" -> "bar4", "foo5" -> "bar5")))
+    val allItems = Seq(dac, hsh, lbh, mvh)
+    allItems foreach { mongoItems.insert(_) }
+
+    mongoItemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = id + "dac",
+      score = 1,
+      itypes = Seq("bar"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    mongoItemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = id + "hsh",
+      score = 4,
+      itypes = Seq("foo"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    mongoItemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = id + "mvh",
+      score = 3,
+      itypes = Seq("unrelated"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    mongoItemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = id + "lbh",
+      score = 2,
+      itypes = Seq("unrelated"),
+      appid = dummyApp.id,
+      algoid = algoid,
+      modelset = true
+    ))
+
+    algoOutputSelector.itemSimSelection("user1", 10, None, Some((37.3229978, -122.0321823)), Some(2.2), None)(dummyApp, engine.copy(id = engineid)) must beEqualTo(Seq(id + "hsh", id + "dac"))
   }
 
   def itemSimOutputSelectionUnsupportedAlgo(algoOutputSelector: AlgoOutputSelector) = {
@@ -364,7 +615,7 @@
     )
     val algoid = mongoAlgos.insert(algo)
 
-    algoOutputSelector.itemSimSelection("", 10, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
+    algoOutputSelector.itemSimSelection("", 10, None, None, None, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
   }
 
   def itemSimOutputSelectionNoAlgo(algoOutputSelector: AlgoOutputSelector) = {
@@ -377,7 +628,7 @@
       settings = Map()
     )
     val engineid = mongoEngines.insert(engine)
-    algoOutputSelector.itemSimSelection("", 10, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
+    algoOutputSelector.itemSimSelection("", 10, None, None, None, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
   }
 
   def itemSimOutputSelectionBadEngine(algoOutputSelector: AlgoOutputSelector) = {
@@ -390,6 +641,6 @@
       settings = Map()
     )
     val engineid = mongoEngines.insert(engine)
-    algoOutputSelector.itemSimSelection("", 10, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
+    algoOutputSelector.itemSimSelection("", 10, None, None, None, None)(dummyApp, engine.copy(id = engineid)) must throwA[RuntimeException]
   }
 }
diff --git a/process/commons/hadoop/scalding/build.sbt b/process/commons/hadoop/scalding/build.sbt
index 0c3e8a2..20030ed 100644
--- a/process/commons/hadoop/scalding/build.sbt
+++ b/process/commons/hadoop/scalding/build.sbt
@@ -2,7 +2,7 @@
 
 organization := "io.prediction"
 
-version := "0.6.1"
+version := "0.6.2"
 
 scalaVersion := "2.10.2"
 
@@ -33,7 +33,7 @@
 )
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1"
+  "io.prediction" %% "predictionio-commons" % "0.6.2"
 )
 
 resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
diff --git a/process/engines/itemrec/algorithms/hadoop/scalding/build.sbt b/process/engines/itemrec/algorithms/hadoop/scalding/build.sbt
index 955f423..0455824 100644
--- a/process/engines/itemrec/algorithms/hadoop/scalding/build.sbt
+++ b/process/engines/itemrec/algorithms/hadoop/scalding/build.sbt
@@ -4,7 +4,7 @@
 
 packageOptions in ThisBuild += Package.ManifestAttributes(java.util.jar.Attributes.Name.MAIN_CLASS -> "com.twitter.scalding.Tool")
 
-version in ThisBuild := "0.6.1"
+version in ThisBuild := "0.6.2"
 
 scalaVersion in ThisBuild := "2.10.2"
 
@@ -16,8 +16,8 @@
   "org.apache.hadoop" % "hadoop-core" % "1.0.4",
   "com.twitter" %% "scalding-core" % "0.8.6",
   "org.specs2" %% "specs2" % "1.14" % "test",
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
-  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
+  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.2",
   "org.slf4j" % "slf4j-log4j12" % "1.6.6")
 
 resolvers in ThisBuild ++= Seq(
diff --git a/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/main/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructor.scala b/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/main/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructor.scala
index 0488800..b8ffada 100644
--- a/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/main/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructor.scala
+++ b/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/main/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructor.scala
@@ -4,6 +4,7 @@
 
 import io.prediction.commons.filepath.{DataFile, AlgoFile}
 import io.prediction.commons.scalding.modeldata.ItemRecScores
+import cascading.pipe.joiner.LeftJoin
 
 /**
  * Source:
@@ -68,7 +69,7 @@
  
   val predicted = Tsv(AlgoFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "predicted.tsv"), ('uindex, 'predicted)).read
 
-  val ratingSource = Csv(DataFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "ratings.csv"), ",", ('uindex, 'iindex, 'rating))
+  val ratingSource = Csv(DataFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "ratings.csv"), ",", ('uindexR, 'iindexR, 'ratingR))
 
   val itemsIndex = Tsv(DataFile(hdfsRootArg, appidArg, engineidArg, algoidArg, evalidArg, "itemsIndex.tsv")).read
     .mapTo((0, 1, 2) -> ('iindexI, 'iidI, 'itypesI)) { fields: (String, String, String) =>
@@ -93,16 +94,24 @@
    * computation
    */
 
+  val seenRatings = ratingSource.read
+
+  // convert to (uindex, iindex, rating) format
+  // and filter seen items from predicted
   val predictedRating = predicted.flatMap('predicted -> ('iindex, 'rating)) { data: String => parsePredictedData(data) }
+    .joinWithSmaller(('uindex, 'iindex) -> ('uindexR, 'iindexR), seenRatings, joiner = new LeftJoin )
+    .filter('ratingR) { r: Double => (r == 0) } // if ratingR == 0, means unseen rating
     .project('uindex, 'iindex, 'rating)
 
-  val combinedRating = if (unseenOnlyArg) predictedRating else (predictedRating ++ (ratingSource.read))
+  val combinedRating = if (unseenOnlyArg) predictedRating else {
+
+    // rename for concatenation
+    val seenRatings2 = seenRatings.rename(('uindexR, 'iindexR, 'ratingR) -> ('uindex, 'iindex, 'rating))
+
+    predictedRating ++ seenRatings2
+  }
   
   combinedRating
-    // just in case, if there are duplicates for the same u-i pair, simply take 1
-    // But note this is not supposed to happen anyway, because
-    // Mahout only recommends items which are not in ratings set.
-    .groupBy('uindex, 'iindex) { _.take(1) }
     .groupBy('uindex) { _.sortBy('rating).reverse.take(numRecommendationsArg) }
     .joinWithSmaller('iindex -> 'iindexI, itemsIndex)
     .joinWithSmaller('uindex -> 'uindexU, usersIndex)
diff --git a/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/test/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructorTest.scala b/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/test/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructorTest.scala
index 2de7ad5..c99fc4f 100644
--- a/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/test/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructorTest.scala
+++ b/process/engines/itemrec/algorithms/hadoop/scalding/mahout/src/test/scala/io/prediction/algorithms/scalding/mahout/itemrec/ModelConstructorTest.scala
@@ -42,7 +42,7 @@
       .arg("unseenOnly", unseenOnly.toString)
       .arg("numRecommendations", numRecommendations.toString)
       .source(Tsv(AlgoFile(hdfsRoot, appid, engineid, algoid, evalid, "predicted.tsv"), new Fields("uindex", "predicted")), predicted)
-      .source(Csv(DataFile(hdfsRoot, appid, engineid, algoid, evalid, "ratings.csv"), ",", new Fields("uindex", "iindex", "rating")), ratings)
+      .source(Csv(DataFile(hdfsRoot, appid, engineid, algoid, evalid, "ratings.csv"), ",", new Fields("uindexR", "iindexR", "ratingR")), ratings)
       .source(Tsv(DataFile(hdfsRoot, appid, engineid, algoid, evalid, "itemsIndex.tsv")), items)
       .source(Tsv(DataFile(hdfsRoot, appid, engineid, algoid, evalid, "usersIndex.tsv")), users)
       .sink[(String, String, String, String, Int, Boolean)](ItemRecScores(dbType=dbType, dbName=dbName, dbHost=dbHost, dbPort=dbPort).getSource) { outputBuffer =>
@@ -60,6 +60,7 @@
   val test1Users = List(("0", "u0"), ("1", "u1"), ("2", "u2"), ("3", "u3"))
 
   val test1Predicted = List(("0", "[1:0.123,2:0.456]"), ("1", "[0:1.2]"))
+  val test1PredictedWithSeenItems = List(("0", "[1:0.123,2:0.456,0:4.321,3:1.234]"), ("1", "[0:1.2]"))
 
   val test1Ratings = List(("0", "0", "2.3"), ("0", "3", "4.56"))
     
@@ -108,4 +109,17 @@
 
   }
 
+  "mahout.itemrec.itembased ModelConstructor with unseenOnly=false, numRecommendations=100 and seen items in predicted results" should {
+
+    test(false, 100, test1Items, test1Users, test1PredictedWithSeenItems, test1Ratings, test1Output)
+
+  }
+
+  "mahout.itemrec.itembased ModelConstructor with unseenOnly=true, numRecommendations=100 and seen items in predicted results" should {
+
+    test(true, 100, test1Items, test1Users, test1PredictedWithSeenItems, test1Ratings, test1OutputUnseenOnly)
+
+  }
+
+
 }
diff --git a/process/engines/itemrec/algorithms/scala/mahout/build.sbt b/process/engines/itemrec/algorithms/scala/mahout/build.sbt
index 8d3788a..abe8004 100644
--- a/process/engines/itemrec/algorithms/scala/mahout/build.sbt
+++ b/process/engines/itemrec/algorithms/scala/mahout/build.sbt
@@ -4,7 +4,7 @@
 
 packageOptions += Package.ManifestAttributes(java.util.jar.Attributes.Name.MAIN_CLASS -> "io.prediction.commons.mahout.itemrec.MahoutJob")
 
-version in ThisBuild:= "0.6.1"
+version in ThisBuild:= "0.6.2"
 
 scalaVersion in ThisBuild:= "2.10.2"
 
diff --git a/process/engines/itemrec/algorithms/scala/mahout/commons/build.sbt b/process/engines/itemrec/algorithms/scala/mahout/commons/build.sbt
index 942129d..4c3db92 100644
--- a/process/engines/itemrec/algorithms/scala/mahout/commons/build.sbt
+++ b/process/engines/itemrec/algorithms/scala/mahout/commons/build.sbt
@@ -1,6 +1,6 @@
 name := "PredictionIO-Process-ItemRec-Algorithms-Scala-Mahout-Commons"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
   "org.apache.mahout" % "mahout-core" % "0.8"
 )
diff --git a/process/engines/itemrec/evaluations/hadoop/scalding/build.sbt b/process/engines/itemrec/evaluations/hadoop/scalding/build.sbt
index 76c8d2e..2e1829b 100644
--- a/process/engines/itemrec/evaluations/hadoop/scalding/build.sbt
+++ b/process/engines/itemrec/evaluations/hadoop/scalding/build.sbt
@@ -4,7 +4,7 @@
 
 packageOptions in ThisBuild += Package.ManifestAttributes(java.util.jar.Attributes.Name.MAIN_CLASS -> "com.twitter.scalding.Tool")
 
-version in ThisBuild := "0.6.1"
+version in ThisBuild := "0.6.2"
 
 scalaVersion in ThisBuild := "2.10.2"
 
@@ -16,8 +16,8 @@
   "org.apache.hadoop" % "hadoop-core" % "1.0.4",
   "com.twitter" %% "scalding-core" % "0.8.6",
   "org.specs2" %% "specs2" % "1.14" % "test",
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
-  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
+  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.2",
   "org.slf4j" % "slf4j-log4j12" % "1.6.6")
 
 resolvers in ThisBuild ++= Seq(
diff --git a/process/engines/itemrec/evaluations/scala/paramgen/build.sbt b/process/engines/itemrec/evaluations/scala/paramgen/build.sbt
index 3f467a4..b8f5824 100644
--- a/process/engines/itemrec/evaluations/scala/paramgen/build.sbt
+++ b/process/engines/itemrec/evaluations/scala/paramgen/build.sbt
@@ -4,12 +4,12 @@
 
 name := "PredictionIO-Process-ItemRec-Evaluations-ParamGen"
 
-version := "0.6.1"
+version := "0.6.2"
 
 scalaVersion := "2.10.2"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
   "ch.qos.logback" % "logback-classic" % "1.0.9",
   "ch.qos.logback" % "logback-core" % "1.0.9",
   "com.typesafe" % "config" % "1.0.0",
diff --git a/process/engines/itemrec/evaluations/scala/topkitems/build.sbt b/process/engines/itemrec/evaluations/scala/topkitems/build.sbt
index 67a2a1d..59e05b9 100644
--- a/process/engines/itemrec/evaluations/scala/topkitems/build.sbt
+++ b/process/engines/itemrec/evaluations/scala/topkitems/build.sbt
@@ -4,13 +4,13 @@
 
 name := "PredictionIO-Process-ItemRec-Evaluations-TopKItems"
 
-version := "0.6.1"
+version := "0.6.2"
 
 scalaVersion := "2.10.2"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
-  "io.prediction" %% "predictionio-output" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
+  "io.prediction" %% "predictionio-output" % "0.6.2",
   "ch.qos.logback" % "logback-classic" % "1.0.9",
   "ch.qos.logback" % "logback-core" % "1.0.9",
   "com.github.scala-incubator.io" %% "scala-io-core" % "0.4.2",
diff --git a/process/engines/itemrec/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemrec/topkitems/TopKItems.scala b/process/engines/itemrec/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemrec/topkitems/TopKItems.scala
index 7aa6177..3823971 100644
--- a/process/engines/itemrec/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemrec/topkitems/TopKItems.scala
+++ b/process/engines/itemrec/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemrec/topkitems/TopKItems.scala
@@ -45,7 +45,7 @@
 
     var userCount = 0
     users.getByAppid(evalid) foreach { u =>
-      val topKItems = ItemRecAlgoOutput.output(u.id, k, None)(app, engine, algo, Some(offlineEval))
+      val topKItems = ItemRecAlgoOutput.output(u.id, k, None, None, None, None)(app, engine, algo, Some(offlineEval))
       if (topKItems.length > 0) {
         userCount += 1
         output.write("%d_%s\t%s\n".format(evalid, u.id, topKItems.map(iid => "%d_%s".format(evalid, iid)).mkString(",")))
diff --git a/process/engines/itemrec/evaluations/scala/trainingtestsplit/build.sbt b/process/engines/itemrec/evaluations/scala/trainingtestsplit/build.sbt
index 076e1f0..d19b8d8 100644
--- a/process/engines/itemrec/evaluations/scala/trainingtestsplit/build.sbt
+++ b/process/engines/itemrec/evaluations/scala/trainingtestsplit/build.sbt
@@ -4,12 +4,12 @@
 
 name := "PredictionIO-Process-ItemRec-Evaluations-Scala-TrainingTestSplitTime"
 
-version := "0.6.1"
+version := "0.6.2"
 
 scalaVersion in ThisBuild := "2.10.2"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1"
+  "io.prediction" %% "predictionio-commons" % "0.6.2"
 )
 
 libraryDependencies += "com.twitter" %% "scalding-args" % "0.8.6"
diff --git a/process/engines/itemsim/algorithms/hadoop/scalding/build.sbt b/process/engines/itemsim/algorithms/hadoop/scalding/build.sbt
index d300cf3..83af744 100644
--- a/process/engines/itemsim/algorithms/hadoop/scalding/build.sbt
+++ b/process/engines/itemsim/algorithms/hadoop/scalding/build.sbt
@@ -4,7 +4,7 @@
 
 packageOptions in ThisBuild += Package.ManifestAttributes(java.util.jar.Attributes.Name.MAIN_CLASS -> "com.twitter.scalding.Tool")
 
-version in ThisBuild := "0.6.1"
+version in ThisBuild := "0.6.2"
 
 scalaVersion in ThisBuild := "2.10.2"
 
@@ -16,8 +16,8 @@
   "org.apache.hadoop" % "hadoop-core" % "1.0.4",
   "com.twitter" %% "scalding-core" % "0.8.6",
   "org.specs2" %% "specs2" % "1.14" % "test",
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
-  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
+  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.2",
   "org.slf4j" % "slf4j-log4j12" % "1.6.6")
 
 resolvers in ThisBuild ++= Seq(
diff --git a/process/engines/itemsim/evaluations/hadoop/scalding/build.sbt b/process/engines/itemsim/evaluations/hadoop/scalding/build.sbt
index 3611eeb..dbd14ee 100644
--- a/process/engines/itemsim/evaluations/hadoop/scalding/build.sbt
+++ b/process/engines/itemsim/evaluations/hadoop/scalding/build.sbt
@@ -4,7 +4,7 @@
 
 packageOptions in ThisBuild += Package.ManifestAttributes(java.util.jar.Attributes.Name.MAIN_CLASS -> "com.twitter.scalding.Tool")
 
-version in ThisBuild := "0.6.1"
+version in ThisBuild := "0.6.2"
 
 scalaVersion in ThisBuild := "2.10.2"
 
@@ -16,8 +16,8 @@
   "org.apache.hadoop" % "hadoop-core" % "1.0.4",
   "com.twitter" %% "scalding-core" % "0.8.6",
   "org.specs2" %% "specs2" % "1.14" % "test",
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
-  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
+  "io.prediction" %% "predictionio-process-commons-hadoop-scalding" % "0.6.2",
   "org.slf4j" % "slf4j-log4j12" % "1.6.6")
 
 resolvers in ThisBuild ++= Seq(
diff --git a/process/engines/itemsim/evaluations/scala/topkitems/build.sbt b/process/engines/itemsim/evaluations/scala/topkitems/build.sbt
index b5396d7..31ee6a1 100644
--- a/process/engines/itemsim/evaluations/scala/topkitems/build.sbt
+++ b/process/engines/itemsim/evaluations/scala/topkitems/build.sbt
@@ -4,13 +4,13 @@
 
 name := "PredictionIO-Process-ItemSim-Evaluations-TopKItems"
 
-version := "0.6.1"
+version := "0.6.2"
 
 scalaVersion := "2.10.2"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
-  "io.prediction" %% "predictionio-output" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
+  "io.prediction" %% "predictionio-output" % "0.6.2",
   "ch.qos.logback" % "logback-classic" % "1.0.9",
   "ch.qos.logback" % "logback-core" % "1.0.9",
   "com.github.scala-incubator.io" %% "scala-io-core" % "0.4.2",
diff --git a/process/engines/itemsim/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemsim/topkitems/TopKItems.scala b/process/engines/itemsim/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemsim/topkitems/TopKItems.scala
index e982706..a988b1d 100644
--- a/process/engines/itemsim/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemsim/topkitems/TopKItems.scala
+++ b/process/engines/itemsim/evaluations/scala/topkitems/src/main/scala/io/prediction/evaluations/itemsim/topkitems/TopKItems.scala
@@ -47,7 +47,7 @@
 
     var itemCount = 0
     items.getByAppid(evalid) foreach { i =>
-      val topKItems = ItemSimAlgoOutput.output(i.id, k, None)(app, engine, algo, Some(offlineEval))
+      val topKItems = ItemSimAlgoOutput.output(i.id, k, None, None, None, None)(app, engine, algo, Some(offlineEval))
       if (topKItems.length > 0) {
         itemCount += 1
         topKItems.zip(scores) foreach { tuple =>
diff --git a/servers/admin/app/controllers/Application.scala b/servers/admin/app/controllers/Application.scala
index be7c8c4..76de5f3 100644
--- a/servers/admin/app/controllers/Application.scala
+++ b/servers/admin/app/controllers/Application.scala
@@ -368,7 +368,7 @@
     						"""),
       Map(
         "id" -> "itemsim",
-        "enginetypeName" -> "Items Similarity Prediction Engine",
+        "enginetypeName" -> "Item Similarity Engine",
         "description" -> """
     		            	<h6>Discover similar items.</h6>
 				            <p>Sample Use Cases</p>
diff --git a/servers/admin/project/Build.scala b/servers/admin/project/Build.scala
index f3cffa2..11a3ea7 100644
--- a/servers/admin/project/Build.scala
+++ b/servers/admin/project/Build.scala
@@ -5,11 +5,11 @@
 object ApplicationBuild extends Build {
 
     val appName         = "predictionio-admin"
-    val appVersion      = "0.6.1"
+    val appVersion      = "0.6.2"
 
     val appDependencies = Seq(
-      "io.prediction" %% "predictionio-commons" % "0.6.1",
-      "io.prediction" %% "predictionio-output" % "0.6.1",
+      "io.prediction" %% "predictionio-commons" % "0.6.2",
+      "io.prediction" %% "predictionio-output" % "0.6.2",
       "com.github.nscala-time" %% "nscala-time" % "0.4.2",
       "commons-codec" % "commons-codec" % "1.8"
     )
diff --git a/servers/api/app/io/prediction/api/API.scala b/servers/api/app/io/prediction/api/API.scala
index f3669cc..23796d2 100644
--- a/servers/api/app/io/prediction/api/API.scala
+++ b/servers/api/app/io/prediction/api/API.scala
@@ -544,7 +544,13 @@
                 val res = algoOutputSelector.itemRecSelection(
                   uid = uid,
                   n = n,
-                  itypes = itypes map { _.split(",") }
+                  itypes = itypes map { _.split(",") },
+                  latlng = latlng map { latlng =>
+                    val ll = latlng.split(",")
+                    (ll(0).toDouble, ll(1).toDouble)
+                  },
+                  within = within map { _.toDouble},
+                  unit = unit
                 )
                 if (res.length > 0) {
                   val attributesToGet: Seq[String] = attributes map { _.split(",").toSeq } getOrElse Seq()
@@ -598,7 +604,13 @@
                 val res = algoOutputSelector.itemSimSelection(
                   iid = iid,
                   n = n,
-                  itypes = itypes map { _.split(",") }
+                  itypes = itypes map { _.split(",") },
+                  latlng = latlng map { latlng =>
+                    val ll = latlng.split(",")
+                    (ll(0).toDouble, ll(1).toDouble)
+                  },
+                  within = within map { _.toDouble },
+                  unit = unit
                 )
                 if (res.length > 0) {
                   val attributesToGet: Seq[String] = attributes map { _.split(",").toSeq } getOrElse Seq()
diff --git a/servers/api/conf/test.conf b/servers/api/conf/test.conf
new file mode 100644
index 0000000..2d69baa
--- /dev/null
+++ b/servers/api/conf/test.conf
@@ -0,0 +1,81 @@
+# This is the main configuration file for the application.
+# ~~~~~
+
+# Secret key
+# ~~~~~
+# The secret key is used to secure cryptographics functions.
+# If you deploy your application to several instances be sure to use the same key!
+application.secret="LXWfyDJiEh];Q]w;6W[97aRF;[TR[2Q0yZCrZP0pbpUC2KpNFov1w5u@bpl=4/Ck"
+
+# The application languages
+# ~~~~~
+application.langs="en"
+
+# Global object class
+# ~~~~~
+# Define the Global object class for this application.
+# Default to Global in the root package.
+# global=Global
+
+# Database configuration
+# ~~~~~
+# You can declare as many datasources as you want.
+# By convention, the default datasource is named `default`
+#
+# db.default.driver=org.h2.Driver
+# db.default.url="jdbc:h2:mem:play"
+# db.default.user=sa
+# db.default.password=
+
+# Evolutions
+# ~~~~~
+# You can disable evolutions if needed
+# evolutionplugin=disabled
+
+# Logger
+# ~~~~~
+# You can also configure logback (http://logback.qos.ch/), by providing a logger.xml file in the conf directory .
+
+# Root logger:
+logger.root=ERROR
+
+# Logger used by the framework:
+logger.play=INFO
+
+# Logger provided to your application:
+logger.application=DEBUG
+
+# PredictionIO Repository Base (For Development Only)
+io.prediction.base=../..
+
+# PredictionIO Commons Settings
+io.prediction.commons.settings.db.type=mongodb
+io.prediction.commons.settings.db.host=localhost
+io.prediction.commons.settings.db.port=27017
+io.prediction.commons.settings.db.name=test_api_predictionio
+
+io.prediction.commons.appdata.db.type=mongodb
+io.prediction.commons.appdata.db.host=localhost
+io.prediction.commons.appdata.db.port=27017
+io.prediction.commons.appdata.db.name=test_api_predictionio_appdata
+
+io.prediction.commons.appdata.test.db.type=mongodb
+io.prediction.commons.appdata.test.db.host=localhost
+io.prediction.commons.appdata.test.db.port=27017
+
+io.prediction.commons.appdata.training.db.type=mongodb
+io.prediction.commons.appdata.training.db.host=localhost
+io.prediction.commons.appdata.training.db.port=27017
+
+io.prediction.commons.appdata.validation.db.type=mongodb
+io.prediction.commons.appdata.validation.db.host=localhost
+io.prediction.commons.appdata.validation.db.port=27017
+
+io.prediction.commons.modeldata.db.type=mongodb
+io.prediction.commons.modeldata.db.host=localhost
+io.prediction.commons.modeldata.db.port=27017
+io.prediction.commons.modeldata.db.name=test_api_predictionio_modeldata
+
+io.prediction.commons.modeldata.training.db.type=mongodb
+io.prediction.commons.modeldata.training.db.host=localhost
+io.prediction.commons.modeldata.training.db.port=27017
diff --git a/servers/api/project/Build.scala b/servers/api/project/Build.scala
index 5016e57..151cfbb 100644
--- a/servers/api/project/Build.scala
+++ b/servers/api/project/Build.scala
@@ -5,14 +5,15 @@
 object ApplicationBuild extends Build {
 
   val appName         = "predictionio-api"
-  val appVersion      = "0.6.1"
+  val appVersion      = "0.6.2"
 
   val appDependencies = Seq(
-    "io.prediction" %% "predictionio-commons" % "0.6.1",
-    "io.prediction" %% "predictionio-output" % "0.6.1"
+    "io.prediction" %% "predictionio-commons" % "0.6.2",
+    "io.prediction" %% "predictionio-output" % "0.6.2"
   )
 
   val main = play.Project(appName, appVersion, appDependencies).settings(
+    javaOptions in Test += "-Dconfig.file=conf/test.conf",
     resolvers += (
       "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
     )
diff --git a/servers/api/test/APISpec.scala b/servers/api/test/APISpec.scala
new file mode 100644
index 0000000..b7eed00
--- /dev/null
+++ b/servers/api/test/APISpec.scala
@@ -0,0 +1,270 @@
+package io.prediction.scheduler
+
+import io.prediction.commons.Config
+import io.prediction.commons.settings._
+import io.prediction.commons.appdata._
+import io.prediction.commons.modeldata._
+
+import play.api.libs.json._
+import play.api.test._
+import play.api.test.Helpers._
+
+import org.specs2.mutable._
+//import org.specs2.specification.Step
+
+import com.mongodb.casbah.Imports._
+import com.github.nscala_time.time.Imports._
+
+class APISpec extends Specification {
+  "PredictionIO API Specification".txt
+
+  /** Setup test data. */
+  val config = new Config
+  val apps = config.getSettingsApps()
+  val engines = config.getSettingsEngines()
+  val algos = config.getSettingsAlgos()
+  val items = config.getAppdataItems()
+  val itemRecScores = config.getModeldataItemRecScores()
+  val itemSimScores = config.getModeldataItemSimScores()
+
+  val userid = 1
+
+  val appid = apps.insert(App(
+    id = 0,
+    userid = userid,
+    appkey = "appkey",
+    display = "",
+    url = None,
+    cat = None,
+    desc = None,
+    timezone = "UTC"))
+
+  val dac = Item(
+    id         = "dac",
+    appid      = appid,
+    ct         = DateTime.now,
+    itypes     = List("fresh", "meat"),
+    starttime  = Some(DateTime.now.hour(14).minute(13)),
+    endtime    = None,
+    price      = Some(49.394),
+    profit     = None,
+    latlng     = Some((37.3197611, -122.0466141)),
+    inactive   = None,
+    attributes = Some(Map("foo" -> "bar", "foo2" -> "bar2")))
+  val hsh = Item(
+    id         = "hsh",
+    appid      = appid,
+    ct         = DateTime.now,
+    itypes     = List("fresh", "meat"),
+    starttime  = Some(DateTime.now.hour(23).minute(13)),
+    endtime    = None,
+    price      = Some(49.394),
+    profit     = None,
+    latlng     = Some((37.3370801, -122.0493201)),
+    inactive   = None,
+    attributes = None)
+  val mvh = Item(
+    id         = "mvh",
+    appid      = appid,
+    ct         = DateTime.now,
+    itypes     = List("fresh", "meat"),
+    starttime  = Some(DateTime.now.hour(17).minute(13)),
+    endtime    = None,
+    price      = Some(49.394),
+    profit     = None,
+    latlng     = Some((37.3154153, -122.0566829)),
+    inactive   = None,
+    attributes = Some(Map("foo3" -> "bar3")))
+  val lbh = Item(
+    id         = "lbh",
+    appid      = appid,
+    ct         = DateTime.now,
+    itypes     = List("fresh", "meat"),
+    starttime  = Some(DateTime.now.hour(3).minute(13)),
+    endtime    = None,
+    price      = Some(49.394),
+    profit     = None,
+    latlng     = Some((37.2997029, -122.0034684)),
+    inactive   = None,
+    attributes = Some(Map("foo4" -> "bar4", "foo5" -> "bar5")))
+  val allItems = Seq(dac, hsh, lbh, mvh)
+  allItems foreach { items.insert(_) }
+
+  "ItemRec" should {
+    val enginename = "itemrec"
+
+    val engineid = engines.insert(Engine(
+      id       = 0,
+      appid    = appid,
+      name     = "itemrec",
+      infoid   = "itemrec",
+      itypes   = None,
+      settings = Map()))
+
+    val algoid = algos.insert(Algo(
+      id       = 0,
+      engineid = engineid,
+      name     = enginename,
+      infoid   = "pdio-knnitembased",
+      command  = "itemr",
+      params   = Map("foo" -> "bar"),
+      settings = Map("dead" -> "beef"),
+      modelset = true,
+      createtime = DateTime.now,
+      updatetime = DateTime.now,
+      status = "deployed",
+      offlineevalid = None))
+
+    itemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = "dac",
+      score = 1,
+      itypes = Seq("bar"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    itemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = "hsh",
+      score = 4,
+      itypes = Seq("foo"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    itemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = "mvh",
+      score = 3,
+      itypes = Seq("unrelated"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    itemRecScores.insert(ItemRecScore(
+      uid = "user1",
+      iid = "lbh",
+      score = 2,
+      itypes = Seq("unrelated"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    "get top N" in new WithServer {
+      val response = await(wsUrl(s"/engines/itemrec/${enginename}/topn.json")
+        .withQueryString(
+          "pio_appkey" -> "appkey",
+          "pio_uid"    -> "user1",
+          "pio_n"      -> "10")
+        .get())
+      response.status must beEqualTo(OK) and
+        (response.body must beEqualTo("""{"pio_iids":["hsh","mvh","lbh","dac"]}"""))
+    }
+
+    "get top N with geo" in new WithServer {
+      val response = await(wsUrl(s"/engines/itemrec/${enginename}/topn.json")
+        .withQueryString(
+          "pio_appkey" -> "appkey",
+          "pio_uid"    -> "user1",
+          "pio_n"      -> "10",
+          "pio_latlng" -> "37.3229978,-122.0321823",
+          "pio_within" -> "2.2")
+        .get())
+      response.status must beEqualTo(OK) and
+        (response.body must beEqualTo("""{"pio_iids":["hsh","dac"]}"""))
+    }
+  }
+
+  "ItemSim" should {
+    val enginename = "itemsim"
+
+    val engineid = engines.insert(Engine(
+      id       = 0,
+      appid    = appid,
+      name     = "itemsim",
+      infoid   = "itemsim",
+      itypes   = None,
+      settings = Map()))
+
+    val algoid = algos.insert(Algo(
+      id       = 0,
+      engineid = engineid,
+      name     = enginename,
+      infoid   = "pdio-itembasedcf",
+      command  = "items",
+      params   = Map("foo" -> "bar"),
+      settings = Map("dead" -> "beef"),
+      modelset = true,
+      createtime = DateTime.now,
+      updatetime = DateTime.now,
+      status = "deployed",
+      offlineevalid = None))
+
+    itemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = "dac",
+      score = 1,
+      itypes = Seq("bar"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    itemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = "hsh",
+      score = 4,
+      itypes = Seq("foo"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    itemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = "mvh",
+      score = 3,
+      itypes = Seq("unrelated"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    itemSimScores.insert(ItemSimScore(
+      iid = "user1",
+      simiid = "lbh",
+      score = 2,
+      itypes = Seq("unrelated"),
+      appid = appid,
+      algoid = algoid,
+      modelset = true))
+
+    "get top N" in new WithServer {
+      val response = await(wsUrl(s"/engines/itemsim/${enginename}/topn.json")
+        .withQueryString(
+          "pio_appkey" -> "appkey",
+          "pio_iid"    -> "user1",
+          "pio_n"      -> "10")
+        .get())
+      response.status must beEqualTo(OK) and
+        (response.body must beEqualTo("""{"pio_iids":["hsh","mvh","lbh","dac"]}"""))
+    }
+
+    "get top N with geo" in new WithServer {
+      val response = await(wsUrl(s"/engines/itemsim/${enginename}/topn.json")
+        .withQueryString(
+          "pio_appkey" -> "appkey",
+          "pio_iid"    -> "user1",
+          "pio_n"      -> "10",
+          "pio_latlng" -> "37.3229978,-122.0321823",
+          "pio_within" -> "2.2")
+        .get())
+      response.status must beEqualTo(OK) and
+        (response.body must beEqualTo("""{"pio_iids":["hsh","dac"]}"""))
+    }
+  }
+
+  step {
+    MongoConnection()(config.settingsDbName).dropDatabase()
+    MongoConnection()(config.appdataDbName).dropDatabase()
+    MongoConnection()(config.modeldataDbName).dropDatabase()
+  }
+}
diff --git a/servers/api/tests/basic.sh b/servers/api/test/basic.sh
similarity index 100%
rename from servers/api/tests/basic.sh
rename to servers/api/test/basic.sh
diff --git a/servers/api/tests/basic_itemrectopn.sh b/servers/api/test/basic_itemrectopn.sh
similarity index 100%
rename from servers/api/tests/basic_itemrectopn.sh
rename to servers/api/test/basic_itemrectopn.sh
diff --git a/servers/api/tests/basic_pixel.sh b/servers/api/test/basic_pixel.sh
similarity index 100%
rename from servers/api/tests/basic_pixel.sh
rename to servers/api/test/basic_pixel.sh
diff --git a/servers/api/tests/basic_pixel_legacy.sh b/servers/api/test/basic_pixel_legacy.sh
similarity index 100%
rename from servers/api/tests/basic_pixel_legacy.sh
rename to servers/api/test/basic_pixel_legacy.sh
diff --git a/servers/api/tests/simple_dataset.sh b/servers/api/test/simple_dataset.sh
similarity index 100%
rename from servers/api/tests/simple_dataset.sh
rename to servers/api/test/simple_dataset.sh
diff --git a/servers/scheduler/conf/application.conf b/servers/scheduler/conf/application.conf
index 719db4b..eef5fff 100644
--- a/servers/scheduler/conf/application.conf
+++ b/servers/scheduler/conf/application.conf
@@ -86,28 +86,28 @@
 io.prediction.commons.settings.db.port=27017
 
 # PredictionIO Algorithms
-pdio-knnitembased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-latestrank.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-randomrank.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-itembased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-parallelals.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-knnuserbased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-thresholduserbased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-slopeone.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-alswr.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-svdsgd.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-svdplusplus.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
+pdio-knnitembased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-latestrank.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-randomrank.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-itembased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-parallelals.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-knnuserbased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-thresholduserbased.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-slopeone.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-alswr.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-svdsgd.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-svdplusplus.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
 
-pdio-itemsimcf.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-itemsimlatestrank.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-pdio-itemsimrandomrank.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
-mahout-itemsimcf.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
+pdio-itemsimcf.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-itemsimlatestrank.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+pdio-itemsimrandomrank.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
+mahout-itemsimcf.jar=${io.prediction.itemsim.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemSim-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
 
 # PredictionIO generic scalding job
-io.prediction.algorithms.scalding.itemrec.generic.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.1.jar
+io.prediction.algorithms.scalding.itemrec.generic.jar=${io.prediction.itemrec.base}/algorithms/hadoop/scalding/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Hadoop-Scalding-assembly-0.6.2.jar
 
 # Itemrec Scala Mahout Algorithms
-io.prediction.algorithms.mahout.itemrec.jar=${io.prediction.itemrec.base}/algorithms/scala/mahout/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Scala-Mahout-assembly-0.6.1.jar
+io.prediction.algorithms.mahout.itemrec.jar=${io.prediction.itemrec.base}/algorithms/scala/mahout/target/scala-2.10/PredictionIO-Process-ItemRec-Algorithms-Scala-Mahout-assembly-0.6.2.jar
 
 # Mahout core job
 io.prediction.algorithms.mahout-core-job.jar=${io.prediction.base}/vendors/mahout-distribution-0.8/mahout-core-0.8-job.jar
diff --git a/servers/scheduler/project/Build.scala b/servers/scheduler/project/Build.scala
index a1ae4af..0f85580 100644
--- a/servers/scheduler/project/Build.scala
+++ b/servers/scheduler/project/Build.scala
@@ -5,11 +5,11 @@
 object ApplicationBuild extends Build {
 
     val appName         = "predictionio-scheduler"
-    val appVersion      = "0.6.1"
+    val appVersion      = "0.6.2"
 
     val appDependencies = Seq(
       "commons-io" % "commons-io" % "2.4",
-      "io.prediction" %% "predictionio-commons" % "0.6.1",
+      "io.prediction" %% "predictionio-commons" % "0.6.2",
       "mysql" % "mysql-connector-java" % "5.1.22",
       "org.clapper" %% "scalasti" % "1.0.0",
       "org.quartz-scheduler" % "quartz" % "2.1.7",
diff --git a/tools/conncheck/build.sbt b/tools/conncheck/build.sbt
index f9e8279..56cb40c 100644
--- a/tools/conncheck/build.sbt
+++ b/tools/conncheck/build.sbt
@@ -1,13 +1,13 @@
 name := "PredictionIO Connection Check Tool"
 
-version := "0.6.1"
+version := "0.6.2"
 
 organization := "io.prediction"
 
 scalaVersion := "2.10.2"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
   "org.slf4j" % "slf4j-nop" % "1.6.0"
 )
 
diff --git a/tools/migration/0.5/appdata/build.sbt b/tools/migration/0.5/appdata/build.sbt
index 7d5a5d6..8b2ba94 100644
--- a/tools/migration/0.5/appdata/build.sbt
+++ b/tools/migration/0.5/appdata/build.sbt
@@ -1,13 +1,13 @@
 name := "PredictionIO 0.4 to 0.5 appdata Migration"
 
-version := "0.6.1"
+version := "0.6.2"
 
 organization := "io.prediction"
 
 scalaVersion := "2.10.2"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
   "org.mongodb" %% "casbah" % "2.6.2",
   "org.slf4j" % "slf4j-nop" % "1.6.0"
 )
diff --git a/tools/settingsinit/build.sbt b/tools/settingsinit/build.sbt
index 3ba07e7..3dc973d 100644
--- a/tools/settingsinit/build.sbt
+++ b/tools/settingsinit/build.sbt
@@ -1,6 +1,6 @@
 name := "PredictionIO Settings Initialization"
 
-version := "0.6.1"
+version := "0.6.2"
 
 organization := "io.prediction"
 
@@ -9,7 +9,7 @@
 scalacOptions ++= Seq("-deprecation")
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1"
+  "io.prediction" %% "predictionio-commons" % "0.6.2"
 )
 
 resolvers ++= Seq(
diff --git a/tools/softwaremanager/build.sbt b/tools/softwaremanager/build.sbt
index 7b3f864..2fe581d 100644
--- a/tools/softwaremanager/build.sbt
+++ b/tools/softwaremanager/build.sbt
@@ -1,6 +1,6 @@
 name := "PredictionIO Software Manager"
 
-version := "0.6.1"
+version := "0.6.2"
 
 organization := "io.prediction"
 
@@ -9,7 +9,7 @@
 scalacOptions ++= Seq("-deprecation")
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
   "com.github.scopt" %% "scopt" % "3.1.0",
   "commons-io" % "commons-io" % "2.4",
   "org.slf4j" % "slf4j-nop" % "1.6.0"
diff --git a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Backup.scala b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Backup.scala
index bbcc0b1..6f9bfd7 100644
--- a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Backup.scala
+++ b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Backup.scala
@@ -27,7 +27,7 @@
 
   def main(args: Array[String]) {
     val parser = new scopt.OptionParser[BackupConfig]("backup") {
-      head("PredictionIO Backup Utility", "0.6.1")
+      head("PredictionIO Backup Utility", "0.6.2")
       help("help") text("prints this usage text")
       arg[String]("<backup directory>") action { (x, c) =>
         c.copy(backupDir = x)
diff --git a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Restore.scala b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Restore.scala
index 8eeac37..2edd930 100644
--- a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Restore.scala
+++ b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Restore.scala
@@ -27,7 +27,7 @@
 
   def main(args: Array[String]) {
     val parser = new scopt.OptionParser[RestoreConfig]("restore") {
-      head("PredictionIO Restore Utility", "0.6.1")
+      head("PredictionIO Restore Utility", "0.6.2")
       help("help") text("prints this usage text")
       opt[Unit]("upgrade") action { (_, c) =>
         c.copy(upgrade = true)
diff --git a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/UpdateCheck.scala b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/UpdateCheck.scala
index 1611fcf..1e42ddf 100644
--- a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/UpdateCheck.scala
+++ b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/UpdateCheck.scala
@@ -16,7 +16,7 @@
 
   def main(args: Array[String]) {
     val parser = new scopt.OptionParser[UpdateCheckConfig]("updatecheck") {
-      head("PredictionIO Update Checker", "0.6.1")
+      head("PredictionIO Update Checker", "0.6.2")
       help("help") text("prints this usage text")
       opt[String]("localVersion") action { (x, c) =>
         c.copy(localVersion = x)
diff --git a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Upgrade.scala b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Upgrade.scala
index 6040909..8ef3edd 100644
--- a/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Upgrade.scala
+++ b/tools/softwaremanager/src/main/scala/io/prediction/tools/softwaremanager/Upgrade.scala
@@ -17,7 +17,7 @@
 /** Upgrades previous version to current version. */
 object Upgrade {
   def main(args: Array[String]) {
-    val thisVersion = "0.6.1"
+    val thisVersion = "0.6.2"
     val parser = new scopt.OptionParser[UpgradeConfig]("upgrade") {
       head("PredictionIO Software Upgrade Utility", thisVersion)
       help("help") text("prints this usage text")
diff --git a/tools/users/build.sbt b/tools/users/build.sbt
index af42dfd..21a0637 100644
--- a/tools/users/build.sbt
+++ b/tools/users/build.sbt
@@ -1,13 +1,13 @@
 name := "PredictionIO Users Tool"
 
-version := "0.6.1"
+version := "0.6.2"
 
 organization := "io.prediction"
 
 scalaVersion := "2.10.2"
 
 libraryDependencies ++= Seq(
-  "io.prediction" %% "predictionio-commons" % "0.6.1",
+  "io.prediction" %% "predictionio-commons" % "0.6.2",
   "commons-codec" % "commons-codec" % "1.8",
   "jline" % "jline" % "2.9"
 )
diff --git a/tools/users/src/main/scala/io/prediction/tools/users/Users.scala b/tools/users/src/main/scala/io/prediction/tools/users/Users.scala
index 6ba6581..7ad511e 100644
--- a/tools/users/src/main/scala/io/prediction/tools/users/Users.scala
+++ b/tools/users/src/main/scala/io/prediction/tools/users/Users.scala
@@ -3,6 +3,7 @@
 import io.prediction.commons._
 import jline.console._
 import org.apache.commons.codec.digest.DigestUtils
+import util.control.Breaks._
 
 object Users {
   val config = new Config()
@@ -15,23 +16,46 @@
       println(s"Cannot connect to ${config.settingsDbType}://${config.settingsDbHost}:${config.settingsDbPort}/${config.settingsDbName}. Aborting.")
       sys.exit(1)
     }
+    println()
     println("PredictionIO CLI User Management")
     println()
-    println("This utility currently only support adding users.")
+    println("1 - Add a new user")
+    println("2 - Update email of an existing user")
+    println("3 - Change password of an existing user")
+    val choice = readLine("Please enter a choice (1-3): ")
+    choice match {
+      case "1" => adduser()
+      case "2" => updateEmail()
+      case "3" => changePassword()
+      case _ => println("Unknown choice")
+    }
     println()
-    adduser()
   }
 
   def adduser() = {
     val cr = new ConsoleReader()
-    println("Adding a confirmed user")
+    println("Adding a new user")
     val email = cr.readLine("Email: ")
-    val password = cr.readLine("Password: ", new java.lang.Character('*'));
-    val firstName = cr.readLine("First name: ")
-    val lastName = cr.readLine("Last name: ")
+    
     if (users.emailExists(email)) {
       println("Email already exists. Not adding.")
     } else {
+
+      var password = ""
+      var retyped = ""
+      breakable {
+        while (true) {
+          password = cr.readLine("Password: ", new java.lang.Character('*'));
+          retyped = cr.readLine("Retype password: ", new java.lang.Character('*'));
+          if (password == retyped)
+            break
+          println("Passwords do not match. Please enter again.")
+        }
+      }
+
+      val firstName = cr.readLine("First name: ")
+      val lastName = cr.readLine("Last name: ")
+    
       users.insert(
         email = email,
         password = md5password(password),
@@ -46,4 +70,47 @@
       println("User added")
     }
   }
+
+  def updateEmail() = {
+    val cr = new ConsoleReader()
+    println("Updating email of an existing user")
+    val email = cr.readLine("Current email: ")
+    val password = cr.readLine("Password: ", new java.lang.Character('*'));
+    users.authenticateByEmail(email, md5password(password)) map { id: Int =>
+      val newEmail = cr.readLine("New email: ")
+      if (users.emailExists(newEmail)) {
+        println("New email already exists. Not updating.")
+      } else {
+        users.updateEmail(id, newEmail)
+        println("Email updated.")
+      }
+    } getOrElse {
+      println("Invalid email or password. Please try again.")
+    }
+  }
+
+  def changePassword() = {
+    val cr = new ConsoleReader()
+    println("Changing password of an existing user")
+    val email = cr.readLine("Email: ")
+    val password = cr.readLine("Old password: ", new java.lang.Character('*'));
+    users.authenticateByEmail(email, md5password(password)) map { id: Int =>
+      var newPassword = ""
+      var retyped = ""
+      breakable {
+        while (true) {
+          newPassword = cr.readLine("New password: ", new java.lang.Character('*'));
+          retyped = cr.readLine("Retype new password: ", new java.lang.Character('*'));
+          if (newPassword == retyped)
+            break
+          println("New passwords do not match. Please enter again.")
+        }
+      }
+      users.updatePassword(id, md5password(newPassword))
+      println("Password updated.")
+    } getOrElse {
+      println("Invalid email or password. Please try again.")
+    }
+  }
+
 }