Merge remote-tracking branch 'origin/livedoc' into develop
diff --git a/README.md b/README.md
index 52cf38d..252cb41 100644
--- a/README.md
+++ b/README.md
@@ -78,7 +78,7 @@
* Follow [@predictionio](https://twitter.com/predictionio) on Twitter.
* Read and subscribe to [the
- Newsletter](http://prediction.us6.list-manage1.com/subscribe?u=d8c0435d851c1310fc64c6e26&id=8c6c1b46d0).
+ Newsletter](https://prediction.io/#newsletter).
* Join the [Community
Forum](https://groups.google.com/forum/#!forum/predictionio-user).
diff --git a/bin/install.sh b/bin/install.sh
index f3d0bde..f54aac3 100755
--- a/bin/install.sh
+++ b/bin/install.sh
@@ -71,6 +71,7 @@
echo -e "\033[1;33mForcing Docker defaults!\033[0m"
pio_dir=${PIO_DIR}
vendors_dir=${pio_dir}/vendors
+ source_setup=${ES_HB}
spark_dir=${vendors_dir}/spark-${SPARK_VERSION}
elasticsearch_dir=${vendors_dir}/elasticsearch-${ELASTICSEARCH_VERSION}
diff --git a/build.sbt b/build.sbt
index d39dc89..787ac3b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -18,7 +18,7 @@
name := "pio"
-version in ThisBuild := "0.9.4"
+version in ThisBuild := "0.9.5-SNAPSHOT"
organization in ThisBuild := "io.prediction"
diff --git a/core/src/main/scala/io/prediction/workflow/Workflow.scala b/core/src/main/scala/io/prediction/workflow/Workflow.scala
index 5b3390e..c0543ab 100644
--- a/core/src/main/scala/io/prediction/workflow/Workflow.scala
+++ b/core/src/main/scala/io/prediction/workflow/Workflow.scala
@@ -103,7 +103,7 @@
evaluator: BaseEvaluator[EEI, EQ, EP, EA, ER],
env: Map[String, String] = WorkflowUtils.pioEnvVars,
params: WorkflowParams = WorkflowParams()) {
- runEvaluation(
+ runEvaluationViaCoreWorkflow(
evaluation = evaluation,
engine = engine,
engineParamsList = engineParamsList,
@@ -115,7 +115,7 @@
/** :: Experimental :: */
@Experimental
- def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult](
+ def runEvaluationViaCoreWorkflow[EI, Q, P, A, R <: BaseEvaluatorResult](
evaluation: Evaluation,
engine: BaseEngine[EI, Q, P, A],
engineParamsList: Seq[EngineParams],
diff --git a/data/src/main/scala/io/prediction/data/api/EventServer.scala b/data/src/main/scala/io/prediction/data/api/EventServer.scala
index 40bd37a..0ebc279 100644
--- a/data/src/main/scala/io/prediction/data/api/EventServer.scala
+++ b/data/src/main/scala/io/prediction/data/api/EventServer.scala
@@ -30,6 +30,7 @@
import io.prediction.data.storage.DateTimeJson4sSupport
import io.prediction.data.storage.Event
import io.prediction.data.storage.EventJson4sSupport
+import io.prediction.data.storage.BatchEventsJson4sSupport
import io.prediction.data.storage.LEvents
import io.prediction.data.storage.Storage
import org.json4s.DefaultFormats
@@ -46,6 +47,7 @@
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
+import scala.util.{Try, Success, Failure}
class EventServiceActor(
val eventClient: LEvents,
@@ -56,11 +58,14 @@
object Json4sProtocol extends Json4sSupport {
implicit def json4sFormats: Formats = DefaultFormats +
new EventJson4sSupport.APISerializer +
+ new BatchEventsJson4sSupport.APISerializer +
// NOTE: don't use Json4s JodaTimeSerializers since it has issues,
// some format not converted, or timezone not correct
new DateTimeJson4sSupport.Serializer
}
+ val MaxNumberOfEventsPerBatchRequest = 50
+
val log = Logging(context.system, this)
// we use the enclosing ActorContext's or ActorSystem's dispatcher for our
@@ -339,6 +344,73 @@
}
}
} ~
+ path("batch" / "events.json") {
+
+ import Json4sProtocol._
+
+ post {
+ handleExceptions(Common.exceptionHandler) {
+ handleRejections(rejectionHandler) {
+ authenticate(withAccessKey) { authData =>
+ val appId = authData.appId
+ val channelId = authData.channelId
+ val allowedEvents = authData.events
+ val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
+ case Success(event) => {
+ if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
+ pluginContext.inputBlockers.values.foreach(
+ _.process(EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event), pluginContext))
+ val data = eventClient.futureInsert(event, appId, channelId).map { id =>
+ pluginsActorRef ! EventInfo(
+ appId = appId,
+ channelId = channelId,
+ event = event)
+ val status = StatusCodes.Created
+ val result = Map(
+ "status" -> status.intValue,
+ "eventId" -> s"${id}")
+ if (config.stats) {
+ statsActorRef ! Bookkeeping(appId, status, event)
+ }
+ result
+ }.recover { case exception =>
+ Map(
+ "status" -> StatusCodes.InternalServerError.intValue,
+ "message" -> s"${exception.getMessage()}")
+ }
+ data
+ } else {
+ Future.successful(Map(
+ "status" -> StatusCodes.Forbidden.intValue,
+ "message" -> s"${event.event} events are not allowed"))
+ }
+ }
+ case Failure(exception) => {
+ Future.successful(Map(
+ "status" -> StatusCodes.BadRequest.intValue,
+ "message" -> s"${exception.getMessage()}"))
+ }
+ }
+
+ entity(as[Seq[Try[Event]]]) { events =>
+ complete {
+ if (events.length <= MaxNumberOfEventsPerBatchRequest) {
+ Future.traverse(events)(handleEvent)
+ } else {
+ (StatusCodes.BadRequest,
+ Map("message" -> (s"Batch request must have less than or equal to " +
+ s"${MaxNumberOfEventsPerBatchRequest} events")))
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ } ~
path("stats.json") {
import Json4sProtocol._
diff --git a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala b/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
index a1935d7..22243c2 100644
--- a/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
+++ b/data/src/main/scala/io/prediction/data/storage/EventJson4sSupport.scala
@@ -19,6 +19,7 @@
import io.prediction.data.{Utils => DataUtils}
import org.joda.time.DateTime
import org.json4s._
+import scala.util.{Try, Success, Failure}
/** :: DeveloperApi ::
* Support library for dealing with [[Event]] and JSON4S
@@ -211,3 +212,25 @@
class APISerializer extends CustomSerializer[Event](format => (
readJson, writeJson))
}
+
+
+@DeveloperApi
+object BatchEventsJson4sSupport {
+ implicit val formats = DefaultFormats
+
+ @DeveloperApi
+ def readJson: PartialFunction[JValue, Seq[Try[Event]]] = {
+ case JArray(events) => {
+ events.map { event =>
+ try {
+ Success(EventJson4sSupport.readJson(event))
+ } catch {
+ case e: Exception => Failure(e)
+ }
+ }
+ }
+ }
+
+ @DeveloperApi
+ class APISerializer extends CustomSerializer[Seq[Try[Event]]](format => (readJson, Map.empty))
+}
diff --git a/data/src/main/scala/io/prediction/data/storage/Storage.scala b/data/src/main/scala/io/prediction/data/storage/Storage.scala
index 4e0e57e..3ad1400 100644
--- a/data/src/main/scala/io/prediction/data/storage/Storage.scala
+++ b/data/src/main/scala/io/prediction/data/storage/Storage.scala
@@ -15,6 +15,8 @@
package io.prediction.data.storage
+import java.lang.reflect.InvocationTargetException
+
import grizzled.slf4j.Logging
import io.prediction.annotation.DeveloperApi
@@ -208,8 +210,8 @@
}
private def getClient(
- clientConfig: StorageClientConfig,
- pkg: String): BaseStorageClient = {
+ clientConfig: StorageClientConfig,
+ pkg: String): BaseStorageClient = {
val className = "io.prediction.data.storage." + pkg + ".StorageClient"
try {
Class.forName(className).getConstructors()(0).newInstance(clientConfig).
@@ -224,6 +226,14 @@
}
}
+ /** Get the StorageClient config data from PIO Framework's environment variables */
+ def getConfig(sourceName: String): Option[StorageClientConfig] = {
+ if (s2cm.contains(sourceName) && s2cm.get(sourceName).nonEmpty
+ && s2cm.get(sourceName).get.nonEmpty) {
+ Some(s2cm.get(sourceName).get.get.config)
+ } else None
+ }
+
private def updateS2CM(k: String, parallel: Boolean, test: Boolean):
Option[ClientMeta] = {
try {
@@ -246,7 +256,7 @@
}
private[prediction]
- def getDataObject[T](repo: String, test: Boolean = false)
+ def getDataObjectFromRepo[T](repo: String, test: Boolean = false)
(implicit tag: TypeTag[T]): T = {
val repoDOMeta = repositoriesToDataObjectMeta(repo)
val repoDOSourceName = repoDOMeta.sourceName
@@ -348,31 +358,31 @@
}
private[prediction] def getMetaDataEngineManifests(): EngineManifests =
- getDataObject[EngineManifests](MetaDataRepository)
+ getDataObjectFromRepo[EngineManifests](MetaDataRepository)
private[prediction] def getMetaDataEngineInstances(): EngineInstances =
- getDataObject[EngineInstances](MetaDataRepository)
+ getDataObjectFromRepo[EngineInstances](MetaDataRepository)
private[prediction] def getMetaDataEvaluationInstances(): EvaluationInstances =
- getDataObject[EvaluationInstances](MetaDataRepository)
+ getDataObjectFromRepo[EvaluationInstances](MetaDataRepository)
private[prediction] def getMetaDataApps(): Apps =
- getDataObject[Apps](MetaDataRepository)
+ getDataObjectFromRepo[Apps](MetaDataRepository)
private[prediction] def getMetaDataAccessKeys(): AccessKeys =
- getDataObject[AccessKeys](MetaDataRepository)
+ getDataObjectFromRepo[AccessKeys](MetaDataRepository)
private[prediction] def getMetaDataChannels(): Channels =
- getDataObject[Channels](MetaDataRepository)
+ getDataObjectFromRepo[Channels](MetaDataRepository)
private[prediction] def getModelDataModels(): Models =
- getDataObject[Models](ModelDataRepository)
+ getDataObjectFromRepo[Models](ModelDataRepository)
/** Obtains a data access object that returns [[Event]] related local data
* structure.
*/
def getLEvents(test: Boolean = false): LEvents =
- getDataObject[LEvents](EventDataRepository, test = test)
+ getDataObjectFromRepo[LEvents](EventDataRepository, test = test)
/** Obtains a data access object that returns [[Event]] related RDD data
* structure.
diff --git a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala
index 0a34aeb..498dbc4 100644
--- a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala
+++ b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCApps.scala
@@ -41,7 +41,7 @@
"""
} else {
sql"""
- insert into $tableName values(${app.id}, ${app.name}, $app{description}())
+ insert into $tableName values(${app.id}, ${app.name}, ${app.description})
"""
}
Some(q.updateAndReturnGeneratedKey().apply().toInt)
diff --git a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala
index 2a15e8f..7a6de4e 100644
--- a/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala
+++ b/data/src/main/scala/io/prediction/data/storage/jdbc/JDBCLEvents.scala
@@ -146,8 +146,8 @@
DB readOnly { implicit session =>
val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
val whereClause = sqls.toAndConditionOpt(
- startTime.map(x => sqls"startTime >= $x"),
- untilTime.map(x => sqls"endTime < $x"),
+ startTime.map(x => sqls"eventTime >= $x"),
+ untilTime.map(x => sqls"eventTime < $x"),
entityType.map(x => sqls"entityType = $x"),
entityId.map(x => sqls"entityId = $x"),
eventNames.map(x =>
diff --git a/data/test.sh b/data/test.sh
index f882e50..7a71b07 100755
--- a/data/test.sh
+++ b/data/test.sh
@@ -442,3 +442,43 @@
checkGET "/events.json?accessKey=$accessKey&untilTime=abc" 400
checkGET "/events.json?accessKey=$accessKey&startTime=2004-12-13T21:39:45.618Z&untilTime=2004-12-15T21:39:45.618Z" 200
+
+# -----
+# batch request
+# ----
+
+# normal request
+testdata='[{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+ "targetEntityType" : "item",
+ "targetEntityId" : "iid",
+ "properties" : {
+ "someProperty" : "value1",
+ "anotherProperty" : "value2"
+ },
+ "eventTime" : "2004-12-13T21:39:45.618Z"
+}]'
+
+checkPOST "/batch/events.json?accessKey=$accessKey" "$testdata" 200
+
+# request with a malformed event (2nd event)
+# the response code is successful but the error for each individual event is reflected in the response's body.
+testdata='[{
+ "event" : "my_event_1",
+ "entityType" : "user",
+ "entityId" : "uid",
+ "eventTime" : "2004-12-13T21:39:45.618Z"
+}, {
+ "eve" : "my_event_2",
+ "entityType" : "user",
+ "entityId" : "uid",
+ "eventTime" : "2015-12-13T21:39:45.618Z"
+}]'
+
+checkPOST "/batch/events.json?accessKey=$accessKey" "$testdata" 200
+
+# request with too many events (more than 50)
+testdata=`cat data/very_long_batch_request.txt`
+checkPOST "/batch/events.json?accessKey=$accessKey" "$testdata" 400
diff --git a/data/very_long_batch_request.txt b/data/very_long_batch_request.txt
new file mode 100644
index 0000000..8fdd7e2
--- /dev/null
+++ b/data/very_long_batch_request.txt
@@ -0,0 +1,205 @@
+[{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+},{
+ "event" : "my_event",
+ "entityType" : "user",
+ "entityId" : "uid",
+}]
\ No newline at end of file
diff --git a/docs/manual/source/demo/textclassification.html.md b/docs/manual/source/demo/textclassification.html.md
index 8246cd0..7f8719b 100644
--- a/docs/manual/source/demo/textclassification.html.md
+++ b/docs/manual/source/demo/textclassification.html.md
@@ -500,7 +500,7 @@
{
"name": "regParam",
"params": {
- "regParam": 2,5
+ "regParam": 0.1
}
}
]
diff --git a/examples/experimental/scala-cleanup-app/README.md b/examples/experimental/scala-cleanup-app/README.md
new file mode 100644
index 0000000..98168dd
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/README.md
@@ -0,0 +1,12 @@
+# Removing old events from app
+
+## Documentation
+
+This shows how to remove old events from a given app.
+
+Parameters in engine.json are appId and cutoffTime.
+All events in that appId before the cutoffTime are removed,
+including $set, $unset and $delete
+(so please adapt it for use when you want to preserve these special events).
+
+To use, edit `engine.json`, run `pio build` then `pio train`.
diff --git a/examples/experimental/scala-cleanup-app/build.sbt b/examples/experimental/scala-cleanup-app/build.sbt
new file mode 100644
index 0000000..dc87bee
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/build.sbt
@@ -0,0 +1,12 @@
+import AssemblyKeys._
+
+assemblySettings
+
+name := "template-scala-parallel-vanilla"
+
+organization := "io.prediction"
+
+libraryDependencies ++= Seq(
+ "io.prediction" %% "core" % "0.9.4-SNAPSHOT" % "provided",
+ "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
+ "org.apache.spark" %% "spark-mllib" % "1.3.1" % "provided")
diff --git a/examples/experimental/scala-cleanup-app/engine.json b/examples/experimental/scala-cleanup-app/engine.json
new file mode 100644
index 0000000..ab467c9
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/engine.json
@@ -0,0 +1,11 @@
+{
+ "id": "default",
+ "description": "Default settings",
+ "engineFactory": "io.prediction.examples.experimental.cleanupapp.VanillaEngine",
+ "datasource": {
+ "params" : {
+ "appId": 1000000000,
+ "cutoffTime": "2014-04-29T00:00:00.000Z"
+ }
+ }
+}
diff --git a/examples/experimental/scala-cleanup-app/project/assembly.sbt b/examples/experimental/scala-cleanup-app/project/assembly.sbt
new file mode 100644
index 0000000..54c3252
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Algorithm.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Algorithm.scala
new file mode 100644
index 0000000..2b3bbab
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Algorithm.scala
@@ -0,0 +1,32 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.P2LAlgorithm
+import io.prediction.controller.Params
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+//case class AlgorithmParams(mult: Int) extends Params
+
+//class Algorithm(val ap: AlgorithmParams)
+class Algorithm
+ extends P2LAlgorithm[TrainingData, Model, Query, PredictedResult] {
+
+ @transient lazy val logger = Logger[this.type]
+
+ def train(sc: SparkContext, data: TrainingData): Model = {
+ new Model
+ }
+
+ def predict(model: Model, query: Query): PredictedResult = {
+ // Prefix the query with the model data
+ PredictedResult(p = "")
+ }
+}
+
+class Model extends Serializable {
+ override def toString = "Model"
+}
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/DataSource.scala b/examples/experimental/scala-cleanup-app/src/main/scala/DataSource.scala
new file mode 100644
index 0000000..41ce53e
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/DataSource.scala
@@ -0,0 +1,80 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.PDataSource
+import io.prediction.controller.EmptyEvaluationInfo
+import io.prediction.controller.EmptyActualResult
+import io.prediction.controller.Params
+import io.prediction.data.storage.Event
+import io.prediction.data.storage.Storage
+import io.prediction.workflow.StopAfterReadInterruption
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import com.github.nscala_time.time.Imports._
+
+import grizzled.slf4j.Logger
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, Future}
+
+case class DataSourceParams(
+ appId: Int,
+ cutoffTime: DateTime
+) extends Params
+
+class DataSource(val dsp: DataSourceParams)
+ extends PDataSource[TrainingData,
+ EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+ @transient lazy val logger = Logger[this.type]
+
+ override
+ def readTraining(sc: SparkContext): TrainingData = {
+ val eventsDb = Storage.getPEvents()
+ val lEventsDb = Storage.getLEvents()
+ logger.info(s"CleanupApp: $dsp")
+
+ val countBefore = eventsDb.find(
+ appId = dsp.appId
+ )(sc).count
+ logger.info(s"Event count before cleanup: $countBefore")
+
+ val countRemove = eventsDb.find(
+ appId = dsp.appId,
+ untilTime = Some(dsp.cutoffTime)
+ )(sc).count
+ logger.info(s"Number of events to remove: $countRemove")
+
+ logger.info(s"Remove events from appId ${dsp.appId}")
+ val eventsToRemove: Array[String] = eventsDb.find(
+ appId = dsp.appId,
+ untilTime = Some(dsp.cutoffTime)
+ )(sc).map { case e =>
+ e.eventId.getOrElse("")
+ }.collect
+
+ var lastFuture: Future[Boolean] = Future[Boolean] {true}
+ eventsToRemove.foreach { case eventId =>
+ if (eventId != "") {
+ lastFuture = lEventsDb.futureDelete(eventId, dsp.appId)
+ }
+ }
+ // No, it's not correct to just wait for the last result.
+ // This program only demonstrates how to remove old events.
+ Await.result(lastFuture, scala.concurrent.duration.Duration(5, "minutes"))
+ logger.info(s"Finish cleaning up events to appId ${dsp.appId}")
+
+ val countAfter = eventsDb.find(
+ appId = dsp.appId
+ )(sc).count
+ logger.info(s"Event count after cleanup: $countAfter")
+
+ throw new StopAfterReadInterruption()
+ }
+}
+
+class TrainingData(
+) extends Serializable {
+ override def toString = ""
+}
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Engine.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Engine.scala
new file mode 100644
index 0000000..b7ac9dc
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Engine.scala
@@ -0,0 +1,19 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.IEngineFactory
+import io.prediction.controller.Engine
+import io.prediction.controller._
+
+case class Query(q: String) extends Serializable
+
+case class PredictedResult(p: String) extends Serializable
+
+object VanillaEngine extends IEngineFactory {
+ def apply() = {
+ new Engine(
+ classOf[DataSource],
+ PIdentityPreparator(classOf[DataSource]),
+ Map("" -> classOf[Algorithm]),
+ classOf[Serving])
+ }
+}
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Preparator.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Preparator.scala
new file mode 100644
index 0000000..54bba9f
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Preparator.scala
@@ -0,0 +1,22 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.PPreparator
+import io.prediction.data.storage.Event
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/*
+class Preparator
+ extends PPreparator[TrainingData, PreparedData] {
+
+ def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
+ new PreparedData(events = trainingData.events)
+ }
+}
+
+class PreparedData(
+ val events: RDD[Event]
+) extends Serializable
+*/
diff --git a/examples/experimental/scala-cleanup-app/src/main/scala/Serving.scala b/examples/experimental/scala-cleanup-app/src/main/scala/Serving.scala
new file mode 100644
index 0000000..9898307
--- /dev/null
+++ b/examples/experimental/scala-cleanup-app/src/main/scala/Serving.scala
@@ -0,0 +1,13 @@
+package io.prediction.examples.experimental.cleanupapp
+
+import io.prediction.controller.LServing
+
+class Serving
+ extends LServing[Query, PredictedResult] {
+
+ override
+ def serve(query: Query,
+ predictedResults: Seq[PredictedResult]): PredictedResult = {
+ predictedResults.head
+ }
+}
\ No newline at end of file
diff --git a/manifest.json b/manifest.json
index 65911fb..1423946 100644
--- a/manifest.json
+++ b/manifest.json
@@ -1 +1 @@
-{"id":"AQUacEPKfAHhmU7wmxHErpEKiHwCJ1X0","version":"1d33d77ca11aaf0658d2f32e95d881007b406818","name":"PredictionIO","description":"pio-autogen-manifest","files":[],"engineFactory":""}
\ No newline at end of file
+{"id":"ipNrDsdxAtsDjZHRSAqfgOTL0PztjuR3","version":"b6a10c9f995426f7ee739156066bc26b577f2f09","name":"pio","description":"pio-autogen-manifest","files":[],"engineFactory":""}
\ No newline at end of file
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index 09f151e..10eb7fe 100644
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -37,8 +37,8 @@
acquire_sbt_jar () {
SBT_VERSION=`awk -F "=" '/sbt\\.version/ {print $2}' $sbt_bin_dir/../project/build.properties`
- URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
- URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+ URL1=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+ URL2=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
JAR=$sbt_bin_dir/sbt-launch-${SBT_VERSION}.jar
sbt_jar=$JAR
@@ -50,9 +50,9 @@
printf "Attempting to fetch sbt\n"
JAR_DL=${JAR}.part
if hash curl 2>/dev/null; then
- (curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+ (curl -L --verbose --progress-bar ${URL1} > ${JAR_DL} || curl -L --verbose --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
elif hash wget 2>/dev/null; then
- (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+ (wget --verbose --progress=bar ${URL1} -O ${JAR_DL} || wget --verbose --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
else
printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
exit -1
diff --git a/tools/build.sbt b/tools/build.sbt
index 4e330bb..865386f 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -22,7 +22,7 @@
"com.github.scopt" %% "scopt" % "3.2.0",
"io.spray" %% "spray-can" % "1.3.2",
"io.spray" %% "spray-routing" % "1.3.2",
- "me.lessis" %% "semverfi" % "0.1.3",
+ "me.lessis" % "semverfi_2.10" % "0.1.3",
"org.apache.hadoop" % "hadoop-common" % "2.5.0",
"org.apache.hadoop" % "hadoop-hdfs" % "2.5.0",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",