Merge remote-tracking branch 'origin/livedoc' into develop
diff --git a/README.md b/README.md
index 52cf38d..252cb41 100644
--- a/README.md
+++ b/README.md
@@ -78,7 +78,7 @@
 
 *   Follow [@predictionio](https://twitter.com/predictionio) on Twitter.
 *   Read and subscribe to [the
-    Newsletter](http://prediction.us6.list-manage1.com/subscribe?u=d8c0435d851c1310fc64c6e26&id=8c6c1b46d0).
+    Newsletter](https://prediction.io/#newsletter).
 *   Join the [Community
     Forum](https://groups.google.com/forum/#!forum/predictionio-user).
 
diff --git a/bin/install.sh b/bin/install.sh
index f3d0bde..f54aac3 100755
--- a/bin/install.sh
+++ b/bin/install.sh
@@ -71,6 +71,7 @@
   echo -e "\033[1;33mForcing Docker defaults!\033[0m"
   pio_dir=${PIO_DIR}
   vendors_dir=${pio_dir}/vendors
+  source_setup=${ES_HB}
 
   spark_dir=${vendors_dir}/spark-${SPARK_VERSION}
   elasticsearch_dir=${vendors_dir}/elasticsearch-${ELASTICSEARCH_VERSION}
diff --git a/build.sbt b/build.sbt
index d39dc89..787ac3b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,7 +18,7 @@
 
 name := "pio"
 
-version in ThisBuild := "0.9.4"
+version in ThisBuild := "0.9.5-SNAPSHOT"
 
 organization in ThisBuild := "io.prediction"
 
diff --git a/core/src/main/scala/io/prediction/workflow/Workflow.scala b/core/src/main/scala/io/prediction/workflow/Workflow.scala
index 5b3390e..c0543ab 100644
--- a/core/src/main/scala/io/prediction/workflow/Workflow.scala
+++ b/core/src/main/scala/io/prediction/workflow/Workflow.scala
@@ -103,7 +103,7 @@
       evaluator: BaseEvaluator[EEI, EQ, EP, EA, ER],
       env: Map[String, String] = WorkflowUtils.pioEnvVars,
       params: WorkflowParams = WorkflowParams()) {
-    runEvaluation(
+    runEvaluationViaCoreWorkflow(
       evaluation = evaluation,
       engine = engine,
       engineParamsList = engineParamsList,
@@ -115,7 +115,7 @@
 
   /** :: Experimental :: */
   @Experimental
-  def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult](
+  def runEvaluationViaCoreWorkflow[EI, Q, P, A, R <: BaseEvaluatorResult](
       evaluation: Evaluation,
       engine: BaseEngine[EI, Q, P, A],
       engineParamsList: Seq[EngineParams],
diff --git a/data/src/main/scala/io/prediction/data/api/EventServer.scala b/data/src/main/scala/io/prediction/data/api/EventServer.scala
index 40bd37a..0ebc279 100644
--- a/data/src/main/scala/io/prediction/data/api/EventServer.scala
+++ b/data/src/main/scala/io/prediction/data/api/EventServer.scala
@@ -30,6 +30,7 @@
 import io.prediction.data.storage.DateTimeJson4sSupport
 import io.prediction.data.storage.Event
 import io.prediction.data.storage.EventJson4sSupport
+import io.prediction.data.storage.BatchEventsJson4sSupport
 import io.prediction.data.storage.LEvents
 import io.prediction.data.storage.Storage
 import org.json4s.DefaultFormats
@@ -46,6 +47,7 @@
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
+import scala.util.{Try, Success, Failure}
 
 class EventServiceActor(
     val eventClient: LEvents,
@@ -56,11 +58,14 @@
   object Json4sProtocol extends Json4sSupport {
     implicit def json4sFormats: Formats = DefaultFormats +
       new EventJson4sSupport.APISerializer +
+      new BatchEventsJson4sSupport.APISerializer +
       // NOTE: don't use Json4s JodaTimeSerializers since it has issues,
       // some format not converted, or timezone not correct
       new DateTimeJson4sSupport.Serializer
   }
 
+  val MaxNumberOfEventsPerBatchRequest = 50
+
   val log = Logging(context.system, this)
 
   // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
@@ -339,6 +344,73 @@
         }
       }
     } ~
+    path("batch" / "events.json") {
+
+      import Json4sProtocol._
+
+      post {
+        handleExceptions(Common.exceptionHandler) {
+          handleRejections(rejectionHandler) {
+            authenticate(withAccessKey) { authData =>
+              val appId = authData.appId
+              val channelId = authData.channelId
+              val allowedEvents = authData.events
+              val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
+                case Success(event) => {
+                  if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
+                    pluginContext.inputBlockers.values.foreach(
+                      _.process(EventInfo(
+                        appId = appId,
+                        channelId = channelId,
+                        event = event), pluginContext))
+                    val data = eventClient.futureInsert(event, appId, channelId).map { id =>
+                      pluginsActorRef ! EventInfo(
+                        appId = appId,
+                        channelId = channelId,
+                        event = event)
+                      val status = StatusCodes.Created
+                      val result = Map(
+                        "status" -> status.intValue,
+                        "eventId" -> s"${id}")
+                      if (config.stats) {
+                        statsActorRef ! Bookkeeping(appId, status, event)
+                      }
+                      result
+                    }.recover { case exception =>
+                      Map(
+                        "status" -> StatusCodes.InternalServerError.intValue,
+                        "message" -> s"${exception.getMessage()}")
+                    }
+                    data
+                  } else {
+                    Future.successful(Map(
+                      "status" -> StatusCodes.Forbidden.intValue,
+                      "message" -> s"${event.event} events are not allowed"))
+                  }
+                }
+                case Failure(exception) => {
+                  Future.successful(Map(
+                    "status" -> StatusCodes.BadRequest.intValue,
+                    "message" -> s"${exception.getMessage()}"))
+                }
+              }
+
+              entity(as[Seq[Try[Event]]]) { events =>
+                complete {
+                  if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+                    Future.traverse(events)(handleEvent)
+                  } else {
+                    (StatusCodes.BadRequest,
+                      Map("message" -> (s"Batch request must contain at most " +
+                        s"${MaxNumberOfEventsPerBatchRequest} events")))
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    } ~
     path("stats.json") {
 
       import Json4sProtocol._
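The new `/batch/events.json` route above handles each event in the batch independently: allowed events are inserted, while disallowed or malformed events produce a per-event error entry, and all per-event statuses are returned in the response body. Below is a minimal, self-contained sketch of that pattern (not PredictionIO code; the `Event` case class is a stand-in for `io.prediction.data.storage.Event`, and the numeric status codes simply mirror the route above):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Try, Success, Failure}

object BatchHandlingSketch extends App {
  // Stand-in for io.prediction.data.storage.Event; only the event name matters here.
  case class Event(event: String)

  // Mirrors the route's handleEvent: successes get a 201-style entry with an id,
  // failures get a 400-style entry, and neither fails the batch as a whole.
  def handleEvent(parsed: Try[Event]): Future[Map[String, Any]] = parsed match {
    case Success(e)  => Future.successful(Map("status" -> 201, "eventId" -> s"id-of-${e.event}"))
    case Failure(ex) => Future.successful(Map("status" -> 400, "message" -> ex.getMessage))
  }

  val batch: Seq[Try[Event]] = Seq(
    Success(Event("my_event_1")),
    Failure(new IllegalArgumentException("field event is missing")))

  // Future.traverse preserves order, so each result lines up with its submitted event.
  val results = Await.result(Future.traverse(batch)(handleEvent), 5.seconds)
  results.foreach(println)
}
```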
diff --git a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala b/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
index a1935d7..22243c2 100644
--- a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
+++ b/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
@@ -19,6 +19,7 @@
 import io.prediction.data.{Utils => DataUtils}
 import org.joda.time.DateTime
 import org.json4s._
+import scala.util.{Try, Success, Failure}
 
 /** :: DeveloperApi ::
   * Support library for dealing with [[Event]] and JSON4S
@@ -211,3 +212,25 @@
   class APISerializer extends CustomSerializer[Event](format => (
     readJson, writeJson))
 }
+
+
+@DeveloperApi
+object BatchEventsJson4sSupport {
+  implicit val formats = DefaultFormats
+
+  @DeveloperApi
+  def readJson: PartialFunction[JValue, Seq[Try[Event]]] = {
+    case JArray(events) => {
+      events.map { event =>
+        try {
+          Success(EventJson4sSupport.readJson(event))
+        } catch {
+          case e: Exception => Failure(e)
+        }
+      }
+    }
+  }
+
+  @DeveloperApi
+  class APISerializer extends CustomSerializer[Seq[Try[Event]]](format => (readJson, Map.empty))
+}
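`BatchEventsJson4sSupport.readJson` above converts each element of the incoming JSON array independently, so a single malformed event becomes a `Failure` instead of rejecting the whole batch. A small self-contained sketch of the same idea, assuming json4s-native on the classpath and using a simplified event type rather than the real `EventJson4sSupport.readJson`:

```scala
import org.json4s._
import org.json4s.native.JsonMethods.parse
import scala.util.{Failure, Try}

object BatchReadSketch extends App {
  // Simplified stand-in for the real Event; only the event name is read.
  case class SimpleEvent(event: String)

  // Throws when the mandatory "event" field is missing, like the real reader would.
  def readOne(jv: JValue): SimpleEvent = jv \ "event" match {
    case JString(name) => SimpleEvent(name)
    case _             => throw new IllegalArgumentException("field event is missing")
  }

  // Each array element is converted independently, so one bad element becomes
  // a Failure without rejecting the rest of the batch.
  def readBatch(jv: JValue): Seq[Try[SimpleEvent]] = jv match {
    case JArray(elements) => elements.map(e => Try(readOne(e)))
    case _                => Seq(Failure(new IllegalArgumentException("expected a JSON array")))
  }

  val json = parse("""[{"event" : "my_event_1"}, {"entityType" : "user"}]""")
  readBatch(json).foreach(println) // Success(SimpleEvent(my_event_1)), Failure(...)
}
```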
diff --git a/data/src/main/scala/io/prediction/data/storage/Storage.scala b/data/src/main/scala/io/prediction/data/storage/Storage.scala
index 4e0e57e..3ad1400 100644
--- a/data/src/main/scala/io/prediction/data/storage/Storage.scala
+++ b/data/src/main/scala/io/prediction/data/storage/Storage.scala
@@ -15,6 +15,8 @@
 
 package io.prediction.data.storage
 
+import java.lang.reflect.InvocationTargetException
+
 import grizzled.slf4j.Logging
 import io.prediction.annotation.DeveloperApi
 
@@ -208,8 +210,8 @@
   }
 
   private def getClient(
-      clientConfig: StorageClientConfig,
-      pkg: String): BaseStorageClient = {
+    clientConfig: StorageClientConfig,
+    pkg: String): BaseStorageClient = {
     val className = "io.prediction.data.storage." + pkg + ".StorageClient"
     try {
       Class.forName(className).getConstructors()(0).newInstance(clientConfig).
@@ -224,6 +226,14 @@
     }
   }
 
+  /** Get the StorageClient config data from PIO Framework's environment variables */
+  def getConfig(sourceName: String): Option[StorageClientConfig] = {
+    if (s2cm.contains(sourceName) && s2cm.get(sourceName).nonEmpty
+      && s2cm.get(sourceName).get.nonEmpty) {
+      Some(s2cm.get(sourceName).get.get.config)
+    } else None
+  }
+
   private def updateS2CM(k: String, parallel: Boolean, test: Boolean):
   Option[ClientMeta] = {
     try {
@@ -246,7 +256,7 @@
   }
 
   private[prediction]
-  def getDataObject[T](repo: String, test: Boolean = false)
+  def getDataObjectFromRepo[T](repo: String, test: Boolean = false)
     (implicit tag: TypeTag[T]): T = {
     val repoDOMeta = repositoriesToDataObjectMeta(repo)
     val repoDOSourceName = repoDOMeta.sourceName
@@ -348,31 +358,31 @@
   }
 
   private[prediction] def getMetaDataEngineManifests(): EngineManifests =
-    getDataObject[EngineManifests](MetaDataRepository)
+    getDataObjectFromRepo[EngineManifests](MetaDataRepository)
 
   private[prediction] def getMetaDataEngineInstances(): EngineInstances =
-    getDataObject[EngineInstances](MetaDataRepository)
+    getDataObjectFromRepo[EngineInstances](MetaDataRepository)
 
   private[prediction] def getMetaDataEvaluationInstances(): EvaluationInstances =
-    getDataObject[EvaluationInstances](MetaDataRepository)
+    getDataObjectFromRepo[EvaluationInstances](MetaDataRepository)
 
   private[prediction] def getMetaDataApps(): Apps =
-    getDataObject[Apps](MetaDataRepository)
+    getDataObjectFromRepo[Apps](MetaDataRepository)
 
   private[prediction] def getMetaDataAccessKeys(): AccessKeys =
-    getDataObject[AccessKeys](MetaDataRepository)
+    getDataObjectFromRepo[AccessKeys](MetaDataRepository)
 
   private[prediction] def getMetaDataChannels(): Channels =
-    getDataObject[Channels](MetaDataRepository)
+    getDataObjectFromRepo[Channels](MetaDataRepository)
 
   private[prediction] def getModelDataModels(): Models =
-    getDataObject[Models](ModelDataRepository)
+    getDataObjectFromRepo[Models](ModelDataRepository)
 
   /** Obtains a data access object that returns [[Event]] related local data
     * structure.
     */
   def getLEvents(test: Boolean = false): LEvents =
-    getDataObject[LEvents](EventDataRepository, test = test)
+    getDataObjectFromRepo[LEvents](EventDataRepository, test = test)
 
   /** Obtains a data access object that returns [[Event]] related RDD data
     * structure.
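The new `Storage.getConfig` helper exposes the `StorageClientConfig` of a configured source, if any. A hypothetical usage sketch (the source name `ELASTICSEARCH` is only an example; actual names depend on the `PIO_STORAGE_SOURCES_*` environment variables of the deployment):

```scala
import io.prediction.data.storage.Storage

object GetConfigSketch extends App {
  // Returns None when no storage source with this name is configured.
  Storage.getConfig("ELASTICSEARCH") match {
    case Some(config) => println(s"Storage client config: $config")
    case None         => println("Storage source ELASTICSEARCH is not configured")
  }
}
```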
diff --git a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala
index 0a34aeb..498dbc4 100644
--- a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala
+++ b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala
@@ -41,7 +41,7 @@
       """
     } else {
       sql"""
-      insert into $tableName values(${app.id}, ${app.name}, $app{description}())
+      insert into $tableName values(${app.id}, ${app.name}, ${app.description})
       """
     }
     Some(q.updateAndReturnGeneratedKey().apply().toInt)
diff --git a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala
index 2a15e8f..7a6de4e 100644
--- a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala
+++ b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala
@@ -146,8 +146,8 @@
     DB readOnly { implicit session =>
       val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
       val whereClause = sqls.toAndConditionOpt(
-        startTime.map(x => sqls"startTime >= $x"),
-        untilTime.map(x => sqls"endTime < $x"),
+        startTime.map(x => sqls"eventTime >= $x"),
+        untilTime.map(x => sqls"eventTime < $x"),
         entityType.map(x => sqls"entityType = $x"),
         entityId.map(x => sqls"entityId = $x"),
         eventNames.map(x =>
diff --git a/data/test.sh b/data/test.sh
index f882e50..7a71b07 100755
--- a/data/test.sh
+++ b/data/test.sh
@@ -442,3 +442,43 @@
 checkGET "/events.json?accessKey=$accessKey&untilTime=abc" 400
 
 checkGET "/events.json?accessKey=$accessKey&startTime=2004-12-13T21:39:45.618Z&untilTime=2004-12-15T21:39:45.618Z" 200
+
+# -----
+# batch request
+# -----
+
+# normal request
+testdata='[{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid",
+  "targetEntityType" : "item",
+  "targetEntityId" : "iid",
+  "properties" : {
+    "someProperty" : "value1",
+    "anotherProperty" : "value2"
+  },
+  "eventTime" : "2004-12-13T21:39:45.618Z"
+}]'
+
+checkPOST "/batch/events.json?accessKey=$accessKey" "$testdata" 200
+
+# request with a malformed event (2nd event)
+# the response code is successful, but the error for each malformed event is reflected in the response body.
+testdata='[{
+  "event" : "my_event_1",
+  "entityType" : "user",
+  "entityId" : "uid",
+  "eventTime" : "2004-12-13T21:39:45.618Z"
+}, {
+  "eve" : "my_event_2",
+  "entityType" : "user",
+  "entityId" : "uid",
+  "eventTime" : "2015-12-13T21:39:45.618Z"
+}]'
+
+checkPOST "/batch/events.json?accessKey=$accessKey" "$testdata" 200
+
+# request with too many events (more than 50)
+testdata=`cat data/very_long_batch_request.txt`
+checkPOST "/batch/events.json?accessKey=$accessKey" "$testdata" 400
diff --git a/data/very_long_batch_request.txt b/data/very_long_batch_request.txt
new file mode 100644
index 0000000..8fdd7e2
--- /dev/null
+++ b/data/very_long_batch_request.txt
@@ -0,0 +1,205 @@
+[{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+},{
+  "event" : "my_event",
+  "entityType" : "user",
+  "entityId" : "uid"
+}]
\ No newline at end of file
diff --git a/docs/manual/source/demo/textclassification.html.md b/docs/manual/source/demo/textclassification.html.md
index 8246cd0..7f8719b 100644
--- a/docs/manual/source/demo/textclassification.html.md
+++ b/docs/manual/source/demo/textclassification.html.md
@@ -500,7 +500,7 @@
     {
       "name": "regParam",
       "params": {
-        "regParam": 2,5
+        "regParam": 0.1
       }
     }
   ]
diff --git a/examples/experimental/scala-cleanup-app/README.md b/examples/experimental/scala-cleanup-app/README.md
new file mode 100644
index 0000000..98168dd
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/README.md
@@ -0,0 +1,12 @@
+# Removing old events from app
+
+## Documentation
+
+This example shows how to remove old events from a given app.
+
+The parameters in engine.json are appId and cutoffTime.
+All events in that app with an eventTime before cutoffTime are removed,
+including $set, $unset, and $delete events
+(adapt the code if you need to preserve these special events).
+
+To use it, edit `engine.json`, then run `pio build` followed by `pio train`.
diff --git a/examples/experimental/scala-cleanup-app/build.sbt b/examples/experimental/scala-cleanup-app/build.sbt
new file mode 100644
index 0000000..dc87bee
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/build.sbt
@@ -0,0 +1,12 @@
+import AssemblyKeys._
+
+assemblySettings
+
+name := "template-scala-parallel-vanilla"
+
+organization := "io.prediction"
+
+libraryDependencies ++= Seq(
+  "io.prediction"    %% "core"          % "0.9.4-SNAPSHOT" % "provided",
+  "org.apache.spark" %% "spark-core"    % "1.3.1" % "provided",
+  "org.apache.spark" %% "spark-mllib"   % "1.3.1" % "provided")
diff --git a/examples/experimental/scala-cleanup-app/engine.json b/examples/experimental/scala-cleanup-app/engine.json
new file mode 100644
index 0000000..ab467c9
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/engine.json
@@ -0,0 +1,11 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "io.prediction.examples.experimental.cleanupapp.VanillaEngine",
+  "datasource": {
+    "params" : {
+      "appId": 1000000000,
+      "cutoffTime": "2014-04-29T00:00:00.000Z"
+    }
+  }
+}
diff --git a/examples/experimental/scala-cleanup-app/project/assembly.sbt b/examples/experimental/scala-cleanup-app/project/assembly.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Algorithm.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Algorithm.scala
new file mode 100644
index 0000000..2b3bbab
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Algorithm.scala
@@ -0,0 +1,32 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.P2LAlgorithm
+import io.prediction.controller.Params
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+//case class AlgorithmParams(mult: Int) extends Params
+
+//class Algorithm(val ap: AlgorithmParams)
+class Algorithm
+  extends P2LAlgorithm[TrainingData, Model, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  def train(sc: SparkContext, data: TrainingData): Model = {
+    new Model
+  }
+
+  def predict(model: Model, query: Query): PredictedResult = {
+    // This cleanup app does not serve real predictions; return an empty result
+    PredictedResult(p = "")
+  }
+}
+
+class Model extends Serializable {
+  override def toString = "Model"
+}
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/DataSource.scala b/examples/experimental/scala-cleanup-app/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..41ce53e
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/DataSource.scala
@@ -0,0 +1,80 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.PDataSource
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EmptyActualResult
+import io.prediction.controller.Params
+import io.prediction.data.storage.Event
+import io.prediction.data.storage.Storage
+import io.prediction.workflow.StopAfterReadInterruption
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import com.github.nscala_time.time.Imports._
+
+import grizzled.slf4j.Logger
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, Future}
+
+case class DataSourceParams(
+  appId: Int,
+  cutoffTime: DateTime
+) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+    val eventsDb = Storage.getPEvents()
+    val lEventsDb = Storage.getLEvents()
+    logger.info(s"CleanupApp: $dsp")
+
+    val countBefore = eventsDb.find(
+      appId = dsp.appId
+    )(sc).count
+    logger.info(s"Event count before cleanup: $countBefore")
+
+    val countRemove = eventsDb.find(
+      appId = dsp.appId,
+      untilTime = Some(dsp.cutoffTime)
+    )(sc).count
+    logger.info(s"Number of events to remove: $countRemove")
+
+    logger.info(s"Remove events from appId ${dsp.appId}")
+    val eventsToRemove: Array[String] = eventsDb.find(
+      appId = dsp.appId,
+      untilTime = Some(dsp.cutoffTime)
+    )(sc).map { case e =>
+      e.eventId.getOrElse("")
+    }.collect
+
+    var lastFuture: Future[Boolean] = Future[Boolean] {true}
+    eventsToRemove.foreach { case eventId =>
+      if (eventId != "") {
+        lastFuture = lEventsDb.futureDelete(eventId, dsp.appId)
+      }
+    }
+    // Note: awaiting only the last future is not strictly correct; this
+    // program only demonstrates how to remove old events.
+    Await.result(lastFuture, scala.concurrent.duration.Duration(5, "minutes"))
+    logger.info(s"Finish cleaning up events to appId ${dsp.appId}")
+
+    val countAfter = eventsDb.find(
+      appId = dsp.appId
+    )(sc).count
+    logger.info(s"Event count after cleanup: $countAfter")
+
+    throw new StopAfterReadInterruption()
+  }
+}
+
+class TrainingData(
+) extends Serializable {
+  override def toString = ""
+}
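As the comment in `readTraining` notes, awaiting only the last `futureDelete` is not strictly correct, because earlier deletions may still be in flight. A sketch of how one could wait for every deletion instead, assuming the same `futureDelete(eventId, appId): Future[Boolean]` signature used above (an illustration only, not part of the example app):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitAllDeletionsSketch {
  def deleteAll(
      eventIds: Seq[String],
      appId: Int,
      delete: (String, Int) => Future[Boolean]): Boolean = {
    val deletions = eventIds.filter(_.nonEmpty).map(id => delete(id, appId))
    // Future.sequence fails if any deletion future fails; forall checks that
    // every deletion reported success.
    Await.result(Future.sequence(deletions), 5.minutes).forall(identity)
  }
}
```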
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Engine.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b7ac9dc
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Engine.scala
@@ -0,0 +1,19 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.IEngineFactory
+import io.prediction.controller.Engine
+import io.prediction.controller._
+
+case class Query(q: String) extends Serializable
+
+case class PredictedResult(p: String) extends Serializable
+
+object VanillaEngine extends IEngineFactory {
+  def apply() = {
+    new Engine(
+      classOf[DataSource],
+      PIdentityPreparator(classOf[DataSource]),
+      Map("" -> classOf[Algorithm]),
+      classOf[Serving])
+  }
+}
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Preparator.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..54bba9f
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Preparator.scala
@@ -0,0 +1,22 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.PPreparator
+import io.prediction.data.storage.Event
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/*
+class Preparator
+  extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+    new PreparedData(events = trainingData.events)
+  }
+}
+
+class PreparedData(
+  val events: RDD[Event]
+) extends Serializable
+*/
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Serving.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Serving.scala
new file mode 100644
index 0000000..9898307
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Serving.scala
@@ -0,0 +1,13 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.LServing
+
+class Serving
+  extends LServing[Query, PredictedResult] {
+
+  override
+  def serve(query: Query,
+    predictedResults: Seq[PredictedResult]): PredictedResult = {
+    predictedResults.head
+  }
+}
\ No newline at end of file
diff --git a/manifest.json b/manifest.json
index 65911fb..1423946 100644
--- a/manifest.json
+++ b/manifest.json
@@ -1 +1 @@
-{"id":"AQUacEPKfAHhmU7wmxHErpEKiHwCJ1X0","version":"1d33d77ca11aaf0658d2f32e95d881007b406818","name":"PredictionIO","description":"pio-autogen-manifest","files":[],"engineFactory":""}
\ No newline at end of file
+{"id":"ipNrDsdxAtsDjZHRSAqfgOTL0PztjuR3","version":"b6a10c9f995426f7ee739156066bc26b577f2f09","name":"pio","description":"pio-autogen-manifest","files":[],"engineFactory":""}
\ No newline at end of file
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index 09f151e..10eb7fe 100644
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -37,8 +37,8 @@
 
 acquire_sbt_jar () {
   SBT_VERSION=`awk -F "=" '/sbt\\.version/ {print $2}' $sbt_bin_dir/../project/build.properties`
-  URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
-  URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  URL1=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  URL2=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
   JAR=$sbt_bin_dir/sbt-launch-${SBT_VERSION}.jar
 
   sbt_jar=$JAR
@@ -50,9 +50,9 @@
     printf "Attempting to fetch sbt\n"
     JAR_DL=${JAR}.part
     if hash curl 2>/dev/null; then
-      (curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      (curl -L --verbose --progress-bar ${URL1} > ${JAR_DL} || curl -L --verbose --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
     elif hash wget 2>/dev/null; then
-      (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      (wget --verbose --progress=bar ${URL1} -O ${JAR_DL} || wget --verbose --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
     else
       printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
       exit -1
diff --git a/tools/build.sbt b/tools/build.sbt
index 4e330bb..865386f 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -22,7 +22,7 @@
   "com.github.scopt"       %% "scopt"          % "3.2.0",
   "io.spray"               %% "spray-can"      % "1.3.2",
   "io.spray"               %% "spray-routing"  % "1.3.2",
-  "me.lessis"              %% "semverfi"       % "0.1.3",
+  "me.lessis"              % "semverfi_2.10"  % "0.1.3",
   "org.apache.hadoop"       % "hadoop-common"  % "2.5.0",
   "org.apache.hadoop"       % "hadoop-hdfs"    % "2.5.0",
   "org.apache.spark"       %% "spark-core"     % sparkVersion.value % "provided",