package io.prediction.examples.experimental.trimapp

import io.prediction.controller.PDataSource
import io.prediction.controller.EmptyEvaluationInfo
import io.prediction.controller.EmptyActualResult
import io.prediction.controller.Params
import io.prediction.data.storage.Event
import io.prediction.data.storage.Storage
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import com.github.nscala_time.time.Imports._
import grizzled.slf4j.Logger
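
/** Parameters for this data source.
  *
  * srcAppId: app whose events are read.
  * dstAppId: app the events are copied into; it must contain no events yet.
  * startTime / untilTime: optional bounds restricting which source events are copied.
  */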
case class DataSourceParams(
  srcAppId: Int,
  dstAppId: Int,
  startTime: Option[DateTime],
  untilTime: Option[DateTime]
) extends Params
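
/** Copies events from the source app to the destination app.
  *
  * readTraining performs the copy as a side effect and returns an empty
  * TrainingData; the destination app is checked to be empty before anything
  * is written.
  */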
class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {
    val eventsDb = Storage.getPEvents()

    logger.info(s"TrimApp: $dsp")
    logger.info(s"Read events from appId ${dsp.srcAppId}")
    val srcEvents: RDD[Event] = eventsDb.find(
      appId = dsp.srcAppId,
      startTime = dsp.startTime,
      untilTime = dsp.untilTime
    )(sc)
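
    // Peek at the destination app: abort if it already contains any events,
    // so existing data is never mixed with the copied events.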
    val dstEvents: Array[Event] = eventsDb.find(appId = dsp.dstAppId)(sc).take(1)
    if (dstEvents.nonEmpty) {
      throw new Exception(s"DstApp ${dsp.dstAppId} is not empty. Quitting.")
    }

    logger.info(s"Write events to appId ${dsp.dstAppId}")
    eventsDb.write(srcEvents, dsp.dstAppId)(sc)
    logger.info(s"Finished writing events to appId ${dsp.dstAppId}")

    new TrainingData()
  }
}
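
// TrainingData is intentionally empty: this data source only copies events
// and produces nothing for an algorithm to train on.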
class TrainingData() extends Serializable {
  override def toString = ""
}
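
// For illustration only: DataSourceParams is typically bound from the
// "datasource" section of engine.json. Depending on the PredictionIO
// version, the params for this example might look roughly like:
//
//   "datasource": {
//     "params": {
//       "srcAppId": 1,
//       "dstAppId": 2
//     }
//   }
//
// The srcAppId/dstAppId values above are placeholders; startTime and
// untilTime can be omitted to copy all of the source app's events.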