// blob: e17b29bb76126efcb5cf3aae5ea2f7f1f3219af1 [file] [log] [blame]
package io.prediction.scheduler
import io.prediction.commons._
import io.prediction.commons.filepath._
import io.prediction.commons.settings.{ Algo, App, Engine, OfflineEval, OfflineEvalMetric, OfflineTune }
import com.github.nscala_time.time.Imports._
import org.clapper.scalasti.StringTemplate
import org.quartz.{ DisallowConcurrentExecution, PersistJobDataAfterExecution }
import org.quartz.{ InterruptableJob, Job, JobDetail, JobExecutionContext }
import org.quartz.JobBuilder.newJob
import org.quartz.JobKey.jobKey
import org.quartz.jobs.NativeJob
import play.api.Logger
import scala.collection.mutable.{ HashMap, Map, SynchronizedMap }
import scala.concurrent.future
import scala.sys.process._
import Contexts.stepExecutionContext
/**
 * Builders for the scheduler's Quartz job definitions, plus the routine that
 * populates a command template with the attributes shared by every job type.
 */
object Jobs {
  /** Quartz group names used as the second half of each job's identity. */
  val algoJobGroup = "predictionio-algo"
  val offlineEvalJobGroup = "predictionio-offlineeval"
  val offlineTuneJobGroup = "predictionio-offlinetune"

  /**
   * Builds a durable job definition for [[AlgoJob]], identified by the algo ID
   * within `algoJobGroup`.
   *
   * Only the job definition is created here; no trigger is built in this method.
   * `config`, `app` and `batchcommands` are accepted but not read by this method.
   *
   * @return the job definition carrying "algoid" and "engineinfoid" in its data map
   */
  def algoJob(config: Config, app: App, engine: Engine, algo: Algo, batchcommands: Seq[String]) = {
    val job = newJob(classOf[AlgoJob]) withIdentity (algo.id.toString, algoJobGroup) storeDurably (true) build ()
    job.getJobDataMap().put("algoid", algo.id)
    job.getJobDataMap().put("engineinfoid", engine.infoid)
    job
  }

  /**
   * Builds a durable job definition for [[OfflineEvalJob]], identified by the
   * offline evaluation ID within `offlineEvalJobGroup`.
   *
   * Offline Evaluation Flow
   *
   * 1. Iterate the following for a specified number of times
   *    1. Perform data splitting
   *    2. For each algo to be evaluated
   *       1. Run algo on training set
   *       2. Run all metrics on model data from the above against test set
   * 2. Mark offline evaluation as finished
   *
   * Only the job definition is created here; no trigger is built in this method.
   *
   * @return the job definition carrying "evalid" in its data map
   */
  def offlineEvalJob(config: Config, app: App, engine: Engine, offlineEval: OfflineEval) = {
    val job = newJob(classOf[OfflineEvalJob]) withIdentity (offlineEval.id.toString, offlineEvalJobGroup) storeDurably (true) build ()
    job.getJobDataMap().put("evalid", offlineEval.id)
    job
  }

  /**
   * Builds a durable job definition for [[OfflineTuneJob]], identified by the
   * offline tune ID within `offlineTuneJobGroup`.
   *
   * Offline Tuning Flow
   *
   * 1. Perform multiple set of data splitting
   * 2. Train and evaluate baseline algo against data sets from 1.
   * 3. For a specified number of iterations:
   *    1. Parameter generator generates new parameters based on previous evaluation results.
   *    2. Train algo and run metrics against both validation and test sets.
   * 4. Mark offline evaluation as finished
   *
   * Only the job definition is created here; no trigger is built in this method.
   *
   * @return the job definition carrying "tuneid" in its data map
   */
  def offlineTuneJob(config: Config, app: App, engine: Engine, offlineTune: OfflineTune) = {
    val job = newJob(classOf[OfflineTuneJob]) withIdentity (offlineTune.id.toString, offlineTuneJobGroup) storeDurably (true) build ()
    job.getJobDataMap().put("tuneid", offlineTune.id)
    job
  }

  /**
   * Populates `command` with the attributes needed to render a command line.
   *
   * Attributes are written in this order: custom `params`, metric-specific,
   * offline-eval-specific, algo-specific, engine-specific, then common
   * configuration values. A few attributes ("iteration", "splitset" and the
   * "appdataTestDb*" family) are only written when not already present, so
   * values supplied through `params` take effect for those names.
   *
   * NOTE(review): how a repeated write to an existing attribute name behaves is
   * defined by StringTemplate and is not visible here; keep the statement order
   * intact when editing.
   *
   * @param command     template to populate (mutated in place)
   * @param config      scheduler configuration providing paths, DB settings and JAR names
   * @param app         app whose ID is exposed as "appid"
   * @param engine      engine whose parameters and item types are exposed
   * @param algo        optional algo; adds algo parameters and algo-specific paths
   * @param offlineEval optional offline evaluation; adds "evalid" and forces "modelset" to "false"
   * @param metric      optional metric; adds metric parameters and defaults
   * @param params      optional custom attributes applied before everything else
   * @return the value of the final `setAttribute` call; callers invoke `toString` on it
   */
  def setSharedAttributes(command: StringTemplate, config: Config, app: App,
    engine: Engine, algo: Option[Algo], offlineEval: Option[OfflineEval],
    metric: Option[OfflineEvalMetric],
    params: Option[collection.immutable.Map[String, Any]] = None) = {
    /** Custom attributes */
    params foreach { custom => command.setAttributes(custom) }

    /** OfflineEvalMetric-specific attributes */
    metric foreach { met =>
      command.setAttributes(command.attributes ++ met.params)
      command.setAttribute("metricid", met.id)
      // Defaults only; do not clobber values supplied through params or metric params.
      if (!command.attributes.contains("iteration")) command.setAttribute("iteration", 0)
      if (!command.attributes.contains("splitset")) command.setAttribute("splitset", "test")
    }

    /** OfflineEval-specific attributes */
    offlineEval foreach { oe =>
      command.setAttribute("evalid", oe.id)
      command.setAttribute("modelset", "false")
    }

    /** Algo-specific attributes */
    algo foreach { alg =>
      // Algo info defaults first, then the algo's own parameters on top.
      val defaultParams = Scheduler.algoInfos.get(alg.infoid) map { _.params.mapValues(_.defaultvalue) } getOrElse Map[String, String]()
      command.setAttributes(command.attributes ++ defaultParams ++ alg.params)
      command.setAttribute("algoid", alg.id)
      command.setAttribute("localTempDir", BaseDir.algoDir(config.settingsLocalTempRoot, app.id, engine.id, alg.id, offlineEval.map(_.id)))
      command.setAttribute("mahoutTempDir", BaseDir.algoDir(config.settingsHdfsRoot + "mahout_temp/", app.id, engine.id, alg.id, offlineEval.map(_.id)))
      command.setAttribute("algoDir", BaseDir.algoDir(config.settingsHdfsRoot, app.id, engine.id, alg.id, offlineEval.map(_.id)))
      command.setAttribute("dataFilePrefix", DataFile(config.settingsHdfsRoot, app.id, engine.id, alg.id, offlineEval.map(_.id), ""))
      command.setAttribute("algoFilePrefix", AlgoFile(config.settingsHdfsRoot, app.id, engine.id, alg.id, offlineEval.map(_.id), ""))
    }

    /** Engine-specific attributes */
    val engineDefaultParams = Scheduler.engineInfos.get(engine.infoid) map {
      _.params.mapValues(_.defaultvalue)
    } getOrElse Map[String, String]()
    command.setAttributes(command.attributes ++ engineDefaultParams ++ engine.params)

    /** Common attributes */
    val appdataItems = config.getAppdataItems
    command.setAttribute("base", config.base)
    command.setAttribute("hadoop", Scheduler.hadoopCommand)
    command.setAttribute("itemCount", appdataItems.countByAppid(app.id))

    /**
     * Locate JAR names
     * Use those from config file first, then override with SystemInfos.
     */
    config.jars foreach { kv => command.setAttribute(kv._1, kv._2) }
    val systemInfosJarsR = """^jars\.(.*)""".r
    config.getSettingsSystemInfos.getAll foreach { e =>
      systemInfosJarsR findFirstIn e.id match {
        case Some(systemInfosJarsR(jarKey)) => command.setAttribute(jarKey, e.value)
        case None => ()
      }
    }

    command.setAttribute("configFile", Option(System.getProperty("config.file")).map(c => "-Dconfig.file=" + c).getOrElse("-Dconfig.file=conf/application.conf"))
    command.setAttribute("appid", app.id)
    command.setAttribute("engineid", engine.id)
    command.setAttribute("hdfsRoot", config.settingsHdfsRoot)
    // Unset options contribute an empty string, so the joined value may contain extra spaces.
    command.setAttribute("hadoopOptions", Seq(
      config.schedulerMapredMinSplitSize map { x => s"-Dmapred.min.split.size=${x}" } getOrElse "",
      config.schedulerMapredMapTasks map { x => s"-Dmapred.map.tasks=${x}" } getOrElse "",
      config.schedulerMapredReduceTasks map { x => s"-Dmapred.reduce.tasks=${x}" } getOrElse "").mkString(" "))
    command.setAttribute("localTempRoot", config.settingsLocalTempRoot)
    command.setAttribute("javaOpts", config.schedulerChildJavaOpts)
    command.setAttribute("settingsDbType", config.settingsDbType)
    command.setAttribute("settingsDbName", config.settingsDbName)
    command.setAttribute("settingsDbHost", config.settingsDbHost.mkString(" "))
    command.setAttribute("settingsDbPort", config.settingsDbPort.mkString(" "))
    command.setAttribute("appdataDbType", config.appdataDbType)
    command.setAttribute("appdataDbName", config.appdataDbName)
    command.setAttribute("appdataDbHost", config.appdataDbHost.mkString(" "))
    command.setAttribute("appdataDbPort", config.appdataDbPort.mkString(" "))
    command.setAttribute("appdataTrainingDbType", config.appdataTrainingDbType)
    command.setAttribute("appdataTrainingDbName", config.appdataTrainingDbName)
    command.setAttribute("appdataTrainingDbHost", config.appdataTrainingDbHost.mkString(" "))
    command.setAttribute("appdataTrainingDbPort", config.appdataTrainingDbPort.mkString(" "))
    command.setAttribute("appdataValidationDbType", config.appdataValidationDbType)
    command.setAttribute("appdataValidationDbName", config.appdataValidationDbName)
    command.setAttribute("appdataValidationDbHost", config.appdataValidationDbHost.mkString(" "))
    command.setAttribute("appdataValidationDbPort", config.appdataValidationDbPort.mkString(" "))
    // Test DB attributes are defaults only: a caller may have pointed them elsewhere through params.
    if (!command.attributes.contains("appdataTestDbType")) command.setAttribute("appdataTestDbType", config.appdataTestDbType)
    if (!command.attributes.contains("appdataTestDbName")) command.setAttribute("appdataTestDbName", config.appdataTestDbName)
    if (!command.attributes.contains("appdataTestDbHost")) command.setAttribute("appdataTestDbHost", config.appdataTestDbHost.mkString(" "))
    if (!command.attributes.contains("appdataTestDbPort")) command.setAttribute("appdataTestDbPort", config.appdataTestDbPort.mkString(" "))
    command.setAttribute("modeldataDbType", config.modeldataDbType)
    command.setAttribute("modeldataDbName", config.modeldataDbName)
    command.setAttribute("modeldataDbHost", config.modeldataDbHost.mkString(" "))
    command.setAttribute("modeldataDbPort", config.modeldataDbPort.mkString(" "))
    command.setAttribute("modeldataTrainingDbType", config.modeldataTrainingDbType)
    command.setAttribute("modeldataTrainingDbName", config.modeldataTrainingDbName)
    command.setAttribute("modeldataTrainingDbHost", config.modeldataTrainingDbHost.mkString(" "))
    command.setAttribute("modeldataTrainingDbPort", config.modeldataTrainingDbPort.mkString(" "))
    engine.itypes foreach { it =>
      command.setAttribute("itypes", "--itypes " + it.mkString(" ")) // NOTE: a space ' ' is necessary after --itypes
      command.setAttribute("itypesCSV", "--itypes " + it.mkString(","))
    }
    command.setAttribute("unseenOnly", engine.params.getOrElse("unseenonly", false))
    // Must stay last: its result is the method's return value.
    command.setAttribute("recommendationTime", System.currentTimeMillis)
  }
}
/**
 * Quartz job that runs the `bin/updatecheck` script, answering "n" to its prompt.
 */
class UpdateCheckJob extends Job {
  /**
   * Runs the update check script and waits for it to exit.
   *
   * A non-zero exit code is logged as a warning rather than silently dropped.
   *
   * @param context Quartz execution context (not read)
   */
  override def execute(context: JobExecutionContext): Unit = {
    Logger.info("Checking for new release...")
    val exitCode = "bin/updatecheck --answer n".!
    if (exitCode != 0) {
      Logger.warn(s"Update check exited with non-zero exit code ${exitCode}")
    }
  }
}
/**
 * Quartz job that trains one algorithm by running its batch commands in order
 * and, on success, flips the algorithm's active model set.
 */
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
class AlgoJob extends InterruptableJob {
  /** Set by `interrupt()`; once true no further command is started. */
  @volatile
  var kill = false
  /** Exit code of the most recently awaited sub-process. */
  var exitCode: Int = 0
  /** Set to true once the command loop has been left. */
  var finishFlag: Boolean = false
  /** The most recently started sub-process, if any. */
  @volatile
  var proc: Option[Process] = None

  /**
   * Looks up the algo, its engine, app and algo info, then runs every batch
   * command sequentially. Stops starting commands after a kill request or a
   * non-zero exit code. Logs a warning and does nothing else when any of the
   * required records (or the batch commands) is missing.
   *
   * @param context Quartz execution context supplying "algoid" and "engineinfoid"
   */
  override def execute(context: JobExecutionContext) = {
    val jobDataMap = context.getMergedJobDataMap
    val algoid = jobDataMap.getInt("algoid")
    val engineinfoid = jobDataMap.getString("engineinfoid")
    val config = Scheduler.config
    val apps = Scheduler.apps
    val engines = Scheduler.engines
    val algos = Scheduler.algos
    val algoInfos = Scheduler.algoInfos
    val logPrefix = s"Algo ID ${algoid}: "

    // Resolve every prerequisite in turn; the first missing one yields its warning text.
    val prerequisites = for {
      algo <- algos.get(algoid).toRight(
        s"${logPrefix}Not starting algorithm training because the algorithm cannot be found from the database").right
      engine <- engines.get(algo.engineid).toRight(
        s"${logPrefix}Not starting algorithm training because the engine that owns this algorithm cannot be found from the database").right
      app <- apps.get(engine.appid).toRight(
        s"${logPrefix}Not starting algorithm training because the app that owns this algorithm cannot be found from the database").right
      info <- algoInfos.get(algo.infoid).toRight(
        s"${logPrefix}Not starting algorithm training because information of this algorithm cannot be found from the database").right
      batchcommands <- info.batchcommands.toRight(
        s"${logPrefix}Not starting algorithm training because this algorithm has no command").right
    } yield (algo, engine, app, batchcommands)

    prerequisites match {
      case Left(reason) =>
        Logger.warn(reason)

      case Right((algo, engine, app, batchcommands)) =>
        Logger.info(s"${logPrefix}Current model set is ${algo.modelset}")
        Logger.info(s"${logPrefix}Running database specific before-logic for model set ${!algo.modelset}")
        val modelData = config.getModeldata(engine.infoid)
        modelData.before(algo.id, !algo.modelset)
        Logger.info(s"${logPrefix}Launching algo job for model set ${!algo.modelset}")

        // Render every command up front against the model set that is about to be built.
        val commands = batchcommands map { template =>
          val attributes = Some(collection.immutable.Map("modelset" -> !algo.modelset))
          Jobs.setSharedAttributes(new StringTemplate(template), config, app, engine, Some(algo), None, None, attributes).toString
        }

        for (c <- commands.map(_.trim)) {
          // Re-evaluated on each use: kill may flip between launching and waiting.
          def mayProceed = !kill && !c.isEmpty && exitCode == 0

          // Launch under the same lock as interrupt() so a kill cannot miss a fresh process.
          this.synchronized {
            if (mayProceed) {
              Logger.info(s"${logPrefix}Going to run: $c")
              proc = Some(Process(c, None, "JVM_OPT" -> config.schedulerChildJavaOpts).run)
              Logger.info(s"${logPrefix}Scheduler waiting for sub-process to finish")
            }
          }
          if (mayProceed) {
            exitCode = proc.get.exitValue
            Logger.info(s"${logPrefix}Sub-process has finished with exit code ${exitCode}")
          }
        }
        finishFlag = true

        /** Display completion information */
        if (kill) {
          Logger.info(s"${logPrefix}Sub-process was killed upon request")
          Logger.info(s"${logPrefix}Not flipping model set flag because the algo job was killed")
        } else if (exitCode != 0) {
          Logger.warn(s"${logPrefix}Not flipping model set flag because the algo job returned non-zero exit code")
        } else {
          Logger.info(s"${logPrefix}Running database specific after-logic for model set ${!algo.modelset}")
          modelData.after(algo.id, !algo.modelset)
          Logger.info(s"${logPrefix}Flipping model set flag to ${!algo.modelset}")
          algos.update(algo.copy(modelset = !algo.modelset, lasttraintime = Some(DateTime.now)))
          Logger.info(s"${logPrefix}Running database specific deletion for model set ${algo.modelset}")
          modelData.delete(algo.id, algo.modelset)
          Logger.info(s"${logPrefix}Job completed")
        }
    }
  }

  /** Requests termination: marks the job as killed and destroys the current sub-process, if any. */
  override def interrupt() = {
    this.synchronized {
      kill = true
      proc foreach { running => running.destroy }
    }
  }
}
/**
 * Quartz job that runs an offline evaluation.
 *
 * Every step (split, training, metric, iteration) is started as a future and
 * coordinates with the others through the shared `exitCodes` and `finishFlags`
 * maps, which are polled once per second.
 */
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
class OfflineEvalJob extends InterruptableJob {
  /** Set by `interrupt()`; once true no further command is started. */
  @volatile
  var kill = false
  /** Exit code per step key; 0 means success so far. */
  val exitCodes: Map[String, Int] = new HashMap[String, Int] with SynchronizedMap[String, Int]
  /** Completion flag per step key; dependants poll these. */
  val finishFlags: Map[String, Boolean] = new HashMap[String, Boolean] with SynchronizedMap[String, Boolean]
  /** Most recently started sub-process per step key, so `interrupt()` can destroy it. */
  val procs: Map[String, Process] = new HashMap[String, Process] with SynchronizedMap[String, Process]

  /**
   * Runs one step asynchronously: waits for the steps it depends on, then runs
   * `commands` sequentially until one fails, the job is killed, or the step is
   * aborted because a prerequisite failed.
   *
   * The finish flag for the step is always set when the step ends, including
   * when it fails with an exception, so that dependants and `execute` never
   * wait forever. An unexpected failure is recorded as exit code 1.
   *
   * @param evalid    offline evaluation ID (used for logging and data cleanup)
   * @param iteration 1-based iteration number
   * @param steptype  one of "split", "training", "metric", "iteration"; any other value skips the waiting phase
   * @param commands  rendered command lines to run in order
   * @param algoid    required for "training" and "metric" steps
   * @param metricid  required for "metric" steps
   * @param algoids   required for "iteration" steps
   * @param metricids required for "iteration" steps
   * @return a future completing when the step has ended
   */
  def step(evalid: Int, iteration: Int, steptype: String, commands: Seq[String], algoid: Option[Int] = None, metricid: Option[Int] = None, algoids: Option[Seq[Int]] = None, metricids: Option[Seq[Int]] = None) = future {
    val logPrefix = s"OfflineEval ID $evalid: Iteration ${iteration}: " + algoid.map(id => s"Algo ID ${id}: ").getOrElse("") + metricid.map(id => s"Metric ID ${id}: ").getOrElse("")
    val key = s"${steptype}-${iteration}" + algoid.map(id => s"-${id}").getOrElse("") + metricid.map(id => s"-${id}").getOrElse("")
    var abort = false

    try {
      /** Wait for prerequisites */
      steptype match {
        case "split" => {
          // Iterations run one after another: wait for the previous one to finish.
          if (iteration > 1) {
            val iterationkey = s"iteration-${iteration - 1}"
            while (!finishFlags(iterationkey)) {
              Thread.sleep(1000)
            }
          }

          /** Delete old model data, if any (for recovering from an incomplete run, and clean old score for multi-iterations) */
          Scheduler.offlineEvals.get(evalid) foreach { offlineEval =>
            Scheduler.engines.get(offlineEval.engineid) foreach { engine =>
              val algosToRun = Scheduler.algos.getByOfflineEvalid(offlineEval.id).toSeq
              val modelData = Scheduler.config.getModeldataTraining(engine.infoid)
              algosToRun foreach { algo =>
                Logger.info(s"${logPrefix}Algo ID ${algo.id}: Deleting any old model data")
                modelData.delete(algo.id, false)
              }
              Logger.info(s"${logPrefix}Deleting any old user-to-item actions")
              Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
              Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
            }
          }
        }
        case "training" => {
          val splitkey = s"split-${iteration}"
          val trainingkey = s"training-${iteration}-${algoid.get}"
          while (!finishFlags(splitkey)) {
            Thread.sleep(1000)
          }
          if (exitCodes(splitkey) != 0) {
            abort = true
            exitCodes(trainingkey) = 1
            Logger.info(s"${logPrefix}(${steptype}) Aborted due to split error")
          }
        }
        case "metric" => {
          val trainingkey = s"training-${iteration}-${algoid.get}"
          val metrickey = s"metric-${iteration}-${algoid.get}-${metricid.get}"
          while (!finishFlags(trainingkey)) {
            Thread.sleep(1000)
          }
          if (exitCodes(trainingkey) != 0) {
            abort = true
            exitCodes(metrickey) = 1
            Logger.info(s"${logPrefix}(${steptype}) Aborted due to training error")
          }
        }
        case "iteration" => {
          val keys = for {
            aid <- algoids.get
            mid <- metricids.get
          } yield s"metric-${iteration}-${aid}-${mid}"
          // forall is true for an empty set of metric steps, where reduce would throw.
          while (!finishFlags.filterKeys(keys.contains(_)).values.forall(done => done)) {
            Thread.sleep(1000)
          }
          Logger.info(s"${logPrefix}(${steptype}) Finished iteration")
        }
        case _ => ()
      }

      /** Run commands sequentially */
      commands map { _.trim } foreach { c =>
        var started: Option[Process] = None
        var startFailed = false

        // Launch under the same lock as interrupt() so a kill cannot miss a fresh process.
        this.synchronized {
          if (!kill && !abort && !c.isEmpty && exitCodes(key) == 0) {
            Logger.info(s"${logPrefix}(${steptype}) Going to run: $c")
            try {
              val launched = Process(c).run
              procs(key) = launched
              started = Some(launched)
              Logger.info(s"${logPrefix}(${steptype}) Scheduler waiting for sub-process to finish")
            } catch {
              case e: java.io.IOException => {
                startFailed = true
                Logger.info(s"${logPrefix}(${steptype}) ${e.getMessage}")
              }
            }
          }
        }

        if (startFailed) {
          // Record the failure even when no earlier process exists for this step.
          exitCodes(key) = 1
          Logger.info(s"${logPrefix}(${steptype}) Exception trying to run sub-process")
        } else {
          started foreach { p =>
            val exitCode = p.exitValue
            /** Save completion information for global access */
            exitCodes(key) = exitCode
            Logger.info(s"${logPrefix}(${steptype}) Sub-process has finished with exit code ${exitCode}")
          }
        }
      }
    } catch {
      case scala.util.control.NonFatal(e) => {
        exitCodes(key) = 1
        Logger.error(s"${logPrefix}(${steptype}) Step failed unexpectedly: ${e.getMessage}", e)
      }
    } finally {
      // Always release dependants, even after a failure.
      finishFlags(key) = true
    }

    /** Display completion information */
    if (kill) Logger.info(s"${logPrefix}(${steptype}) Sub-process was killed upon request")
  }

  /**
   * Schedules all steps of every iteration, blocks until the last iteration has
   * finished, cleans up the data used by the evaluation when it ended normally
   * or was killed, logs an exit code summary and records the end time.
   *
   * @param context Quartz execution context supplying "evalid"
   */
  override def execute(context: JobExecutionContext): Unit = {
    val jobDataMap = context.getMergedJobDataMap
    val evalid = jobDataMap.getInt("evalid")
    val config = Scheduler.config
    val apps = Scheduler.apps
    val engines = Scheduler.engines
    val algos = Scheduler.algos
    val algoInfos = Scheduler.algoInfos
    val offlineEvals = Scheduler.offlineEvals
    val offlineEvalSplitters = Scheduler.offlineEvalSplitters
    val offlineEvalSplitterInfos = Scheduler.offlineEvalSplitterInfos
    val offlineEvalMetrics = Scheduler.offlineEvalMetrics
    val offlineEvalMetricInfos = Scheduler.offlineEvalMetricInfos
    val logPrefix = s"OfflineEval ID $evalid: "

    offlineEvals.get(evalid) map { offlineEval =>
      engines.get(offlineEval.engineid) map { engine =>
        apps.get(engine.appid) map { app =>
          val totalIterations = offlineEval.iterations
          val splittersToRun = offlineEvalSplitters.getByEvalid(offlineEval.id).toSeq
          val algosToRun = algos.getByOfflineEvalid(offlineEval.id).toSeq
          val metricsToRun = offlineEvalMetrics.getByEvalid(offlineEval.id).toSeq
          val algoids = algosToRun map { _.id }
          val metricids = metricsToRun map { _.id }

          Logger.info(s"${logPrefix}Starting offline evaluation with ${totalIterations} iteration(s)")

          /** Mark the start time */
          val offlineEvalWithStartTime = offlineEval.copy(starttime = Some(DateTime.now))
          offlineEvals.update(offlineEvalWithStartTime)

          for (currentIteration <- 1 to totalIterations) {
            val iterationParam = collection.immutable.Map("iteration" -> currentIteration)

            /** Splitters setup (support 1 splitter for now) */
            // The split step is always registered: training steps wait on its finish flag.
            val splitkey = s"split-${currentIteration}"
            exitCodes(splitkey) = 0
            finishFlags(splitkey) = false
            splittersToRun.headOption match {
              case Some(splitter) =>
                offlineEvalSplitterInfos.get(splitter.infoid) map { splitterInfo =>
                  splitterInfo.commands map { commands =>
                    val splitterCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, None, Some(offlineEval), None, Some(splitterInfo.params.mapValues(_.defaultvalue) ++ splitter.settings ++ iterationParam)).toString }
                    step(evalid, currentIteration, "split", splitterCommands)
                  } getOrElse {
                    Logger.warn(s"${logPrefix}Not doing data split because splitter information for ${splitter.infoid} contains no command")
                    step(evalid, currentIteration, "split", Seq())
                  }
                } getOrElse {
                  Logger.warn(s"${logPrefix}Not doing data split because splitter information for ${splitter.infoid} is missing")
                  step(evalid, currentIteration, "split", Seq())
                }
              case None =>
                Logger.warn(s"${logPrefix}Not doing data split because no splitter is configured for this offline evaluation")
                step(evalid, currentIteration, "split", Seq())
            }

            /** Training and metric setup */
            algosToRun foreach { algo =>
              val trainingkey = s"training-${currentIteration}-${algo.id}"
              exitCodes(trainingkey) = 0
              finishFlags(trainingkey) = false
              algoInfos.get(algo.infoid) map { algoInfo =>
                algoInfo.offlineevalcommands map { commands =>
                  val trainingCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, Some(algo), Some(offlineEval), None, Some(iterationParam)).toString }
                  step(evalid, currentIteration, "training", trainingCommands, Some(algo.id))
                } getOrElse {
                  Logger.warn(s"${logPrefix}Algo ID ${algo.id}: Not doing training because algo information for ${algo.infoid} contains no command for offline evaluation")
                  step(evalid, currentIteration, "training", Seq(), Some(algo.id))
                }
              } getOrElse {
                Logger.warn(s"${logPrefix}Algo ID ${algo.id}: Not doing training because algo information for ${algo.infoid} is missing")
                step(evalid, currentIteration, "training", Seq(), Some(algo.id))
              }

              /** Run metrics */
              metricsToRun foreach { metric =>
                val metrickey = s"metric-${currentIteration}-${algo.id}-${metric.id}"
                exitCodes(metrickey) = 0
                finishFlags(metrickey) = false
                offlineEvalMetricInfos.get(metric.infoid) map { metricInfo =>
                  metricInfo.commands map { commands =>
                    val metricCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, Some(algo), Some(offlineEval), Some(metric), Some(iterationParam)).toString }
                    step(evalid, currentIteration, "metric", metricCommands, Some(algo.id), Some(metric.id))
                  } getOrElse {
                    Logger.warn(s"${logPrefix}Algo ID ${algo.id}: Metric ID ${metric.id}: Not running metric because metric information for ${metric.infoid} contains no command")
                    step(evalid, currentIteration, "metric", Seq(), Some(algo.id), Some(metric.id))
                  }
                } getOrElse {
                  Logger.warn(s"${logPrefix}Algo ID ${algo.id}: Metric ID ${metric.id}: Not running metric because metric information for ${metric.infoid} is missing")
                  step(evalid, currentIteration, "metric", Seq(), Some(algo.id), Some(metric.id))
                }
              }
            }

            val iterationkey = s"iteration-${currentIteration}"
            exitCodes(iterationkey) = 0
            finishFlags(iterationkey) = false
            step(evalid, currentIteration, "iteration", Seq(), None, None, Some(algoids), Some(metricids))
          }

          /** Block on the last iteration */
          while (!finishFlags(s"iteration-${totalIterations}")) {
            Thread.sleep(1000)
          }

          /** Clean up if ended normally or killed */
          val sumExitCodes = exitCodes.values.sum
          if (kill || sumExitCodes == 0) {
            val modelData = config.getModeldataTraining(engine.infoid)
            algosToRun foreach { algo =>
              Logger.info(s"${logPrefix}Algo ID ${algo.id}: Deleting used model data")
              modelData.delete(algo.id, false)
            }
            Logger.info(s"${logPrefix}Deleting used app data")
            Scheduler.appdataTrainingUsers.deleteByAppid(offlineEval.id)
            Scheduler.appdataTrainingItems.deleteByAppid(offlineEval.id)
            Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
            Scheduler.appdataTestUsers.deleteByAppid(offlineEval.id)
            Scheduler.appdataTestItems.deleteByAppid(offlineEval.id)
            Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
          }

          /** Check for errors from metric */
          Logger.info(s"${logPrefix}Exit code summary:")
          for (currentIteration <- 1 to totalIterations) {
            Logger.info(s"${logPrefix}Iteration ${currentIteration}:")
            Logger.info(s"${logPrefix}  Split: " + exitCodes(s"split-${currentIteration}"))
            algoids foreach { algoid =>
              Logger.info(s"${logPrefix}  Algo ID ${algoid}: " + exitCodes(s"training-${currentIteration}-${algoid}"))
              metricids foreach { metricid =>
                Logger.info(s"${logPrefix}    Metric ID ${metricid}: " + exitCodes(s"metric-${currentIteration}-${algoid}-${metricid}"))
              }
            }
          }

          if (sumExitCodes != 0)
            Logger.warn(s"${logPrefix}Offline evaluation completed with error(s)")
          else
            Logger.info(s"${logPrefix}Offline evaluation completed")

          /** Mark the end time since this is used to determine whether the run has finished */
          offlineEvals.update(offlineEvalWithStartTime.copy(endtime = Some(DateTime.now)))
        } getOrElse {
          Logger.warn(s"${logPrefix}Not starting offline evaluation because the app that owns this offline evaluation cannot be found from the database")
        }
      } getOrElse {
        Logger.warn(s"${logPrefix}Not starting offline evaluation because the engine that owns this offline evaluation cannot be found from the database")
      }
    } getOrElse {
      Logger.warn(s"${logPrefix}Not starting offline evaluation because the offline evaluation cannot be found from the database")
    }
  }

  /** Requests termination: marks the job as killed and destroys every recorded sub-process. */
  override def interrupt() = {
    this.synchronized {
      kill = true
      procs.values foreach { _.destroy }
    }
  }
}
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
class OfflineTuneJob extends InterruptableJob {
@volatile
var kill = false
val exitCodes: Map[String, Int] = new HashMap[String, Int] with SynchronizedMap[String, Int]
val finishFlags: Map[String, Boolean] = new HashMap[String, Boolean] with SynchronizedMap[String, Boolean]
val procs: Map[String, Process] = new HashMap[String, Process] with SynchronizedMap[String, Process]
/**
 * Runs one offline tuning step asynchronously: waits for the steps it depends
 * on, then runs `commands` sequentially until one fails, the job is killed, or
 * the step is aborted because a prerequisite failed.
 *
 * The finish flag for the step is always set when the step ends, including when
 * it fails with an exception, so that dependants never wait forever. An aborted
 * or unexpectedly failed step is recorded with exit code 1, matching
 * OfflineEvalJob.step.
 *
 * @param tuneid    offline tune ID
 * @param evalid    offline evaluation ID this step belongs to
 * @param iteration iteration number; "training" waits for the split only at iteration 0
 * @param steptype  one of "split", "paramgen", "training", "metric", "iteration"
 * @param commands  rendered command lines to run in order
 * @param algoid    required for "metric" steps; part of the step key when given
 * @param metricid  part of the step key when given
 * @param algoids   required for "iteration" steps
 * @param metricids required for "iteration" steps
 * @return a future completing when the step has ended
 */
def step(tuneid: Int, evalid: Int, iteration: Int, steptype: String, commands: Seq[String], algoid: Option[Int] = None, metricid: Option[Int] = None, algoids: Option[Seq[Int]] = None, metricids: Option[Seq[Int]] = None) = future {
  val logPrefix = s"OfflineTune ID $tuneid: OfflineEval ID $evalid: Iteration ${iteration}: " + algoid.map(id => s"Algo ID ${id}: ").getOrElse("") + metricid.map(id => s"Metric ID ${id}: ").getOrElse("")
  val key = s"${steptype}-${evalid}-${iteration}" + algoid.map(id => s"-${id}").getOrElse("") + metricid.map(id => s"-${id}").getOrElse("")
  var abort = false

  try {
    /** Wait for prerequisites */
    steptype match {
      case "split" => {
        /**
         * Waiting for the previous iteration is currently disabled:
         *
         * if (iteration > 1) {
         *   val iterationkey = s"iteration-${iteration-1}"
         *   while (!finishFlags(iterationkey)) {
         *     Thread.sleep(1000)
         *   }
         * }
         */
      }
      case "paramgen" => {
        // Wait for the previous iteration of every offline evaluation of this tune.
        val keys = Scheduler.offlineEvals.getByTuneid(tuneid).toSeq.map(oe => s"iteration-${oe.id}-${iteration - 1}")
        // forall is true for an empty set of flags, where reduce would throw.
        while (!finishFlags.filterKeys(keys.contains(_)).values.forall(done => done)) {
          Thread.sleep(1000)
        }
      }
      case "training" => {
        if (iteration == 0) {
          val splitkey = s"split-${evalid}-${iteration}"
          while (!finishFlags(splitkey)) {
            Thread.sleep(1000)
          }
          if (exitCodes(splitkey) != 0) {
            abort = true
            // Propagate the failure so the metric steps do not run without a trained model.
            exitCodes(key) = 1
            Logger.info(s"${logPrefix}(${steptype}) Aborted due to split error")
          }
        }
      }
      case "metric" => {
        val trainingkey = s"training-${evalid}-${iteration}-${algoid.get}"
        while (!finishFlags(trainingkey)) {
          Thread.sleep(1000)
        }
        if (exitCodes(trainingkey) != 0) {
          abort = true
          exitCodes(key) = 1
          Logger.info(s"${logPrefix}(${steptype}) Aborted due to training error")
        }
      }
      case "iteration" => {
        val keys = for {
          aid <- algoids.get
          mid <- metricids.get
        } yield s"metric-${evalid}-${iteration}-${aid}-${mid}"
        while (!finishFlags.filterKeys(keys.contains(_)).values.forall(done => done)) {
          Thread.sleep(1000)
        }
        Logger.info(s"${logPrefix}(${steptype}) Finished iteration")
      }
    }

    /** Run commands sequentially */
    commands map { _.trim } foreach { c =>
      var started: Option[Process] = None
      var startFailed = false

      // Launch under the job's lock so a concurrent kill cannot miss a fresh process.
      this.synchronized {
        if (!kill && !abort && !c.isEmpty && exitCodes(key) == 0) {
          Logger.info(s"${logPrefix}(${steptype}) Going to run: $c")
          try {
            val launched = Process(c).run
            procs(key) = launched
            started = Some(launched)
            Logger.info(s"${logPrefix}(${steptype}) Scheduler waiting for sub-process to finish")
          } catch {
            case e: java.io.IOException => {
              startFailed = true
              Logger.info(s"${logPrefix}(${steptype}) ${e.getMessage}")
            }
          }
        }
      }

      if (startFailed) {
        exitCodes(key) = 1
        Logger.info(s"${logPrefix}(${steptype}) Exception trying to run sub-process")
      } else {
        started foreach { p =>
          val exitCode = p.exitValue
          /** Save completion information for global access */
          exitCodes(key) = exitCode
          Logger.info(s"${logPrefix}(${steptype}) Sub-process has finished with exit code ${exitCode}")
        }
      }
    }
  } catch {
    case scala.util.control.NonFatal(e) => {
      exitCodes(key) = 1
      Logger.error(s"${logPrefix}(${steptype}) Step failed unexpectedly: ${e.getMessage}", e)
    }
  } finally {
    // Always release dependants, even after a failure.
    finishFlags(key) = true
  }

  /** Display completion information */
  if (kill) Logger.info(s"${logPrefix}(${steptype}) Sub-process was killed upon request")
}
/**
 * Run a complete offline tuning, identified by the "tuneid" entry of the job data map.
 *
 * Flow:
 *  1. Baseline (iteration 0), for every offline evaluation of the tune:
 *     delete leftovers of a previous run, split the data, then train and
 *     measure every algo whose loop is 0.
 *  2. For every loop from 1 to the configured number of loops: run the
 *     parameter generator once, wait for it, then train and measure the
 *     generated algos of every offline evaluation.
 *  3. Wait for the last iteration of every evaluation, log an exit code
 *     summary, mark the end time, mark the subject algo as "tuned" and
 *     delete the model data and app data that were used.
 *
 * All work is delegated to step(), which runs asynchronously; ordering between
 * steps is achieved through the keys registered in exitCodes and finishFlags.
 *
 * When the offline tune, its engine or its app is missing, a warning is logged
 * and nothing is run. When the algo to be tuned, the parameter generator or the
 * offline evaluations are missing, a warning is logged and the tuning loops,
 * the summary and the clean-up are skipped; steps already scheduled for the
 * baseline keep running.
 *
 * @param context Quartz execution context carrying the "tuneid" job data
 */
override def execute(context: JobExecutionContext): Unit = {
  val jobDataMap = context.getMergedJobDataMap
  val tuneid = jobDataMap.getInt("tuneid")
  val config = Scheduler.config
  val apps = Scheduler.apps
  val engines = Scheduler.engines
  val algos = Scheduler.algos
  val algoInfos = Scheduler.algoInfos
  val offlineEvals = Scheduler.offlineEvals
  val offlineEvalSplitters = Scheduler.offlineEvalSplitters
  val offlineEvalSplitterInfos = Scheduler.offlineEvalSplitterInfos
  val offlineEvalMetrics = Scheduler.offlineEvalMetrics
  val offlineEvalMetricInfos = Scheduler.offlineEvalMetricInfos
  val offlineTunes = Scheduler.offlineTunes
  val paramGens = Scheduler.paramGens
  val paramGenInfos = Scheduler.paramGenInfos
  val logPrefix = s"OfflineTune ID $tuneid: "
  val testMetricParams = collection.immutable.Map("splitset" -> "test")
  /** Metrics on the validation set read the validation database in place of the test database */
  val validationMetricParams = collection.immutable.Map(
    "splitset" -> "validation",
    "appdataTestDbType" -> config.appdataValidationDbType,
    "appdataTestDbName" -> config.appdataValidationDbName,
    "appdataTestDbHost" -> config.appdataValidationDbHost,
    "appdataTestDbPort" -> config.appdataValidationDbPort)
  offlineTunes.get(tuneid) map { offlineTune =>
    engines.get(offlineTune.engineid) map { engine =>
      val modelData = config.getModeldataTraining(engine.infoid)
      apps.get(engine.appid) map { app =>
        val totalLoops = offlineTune.loops
        val offlineEvalsToRun = offlineEvals.getByTuneid(offlineTune.id).toSeq
        Logger.info(s"${logPrefix}Starting offline tuning with ${offlineEvalsToRun.size} data set(s) and ${totalLoops} iteration(s) of parameter generation")
        /** Mark the start time */
        val offlineTuneWithStartTime = offlineTune.copy(starttime = Some(DateTime.now))
        offlineTunes.update(offlineTuneWithStartTime)
        /** Data splitting (done only once for each evaluation), and baseline algo evaluation */
        offlineEvalsToRun foreach { offlineEval =>
          val splittersToRun = offlineEvalSplitters.getByEvalid(offlineEval.id).toSeq
          val algosToRun = algos.getByOfflineEvalid(offlineEval.id).toSeq.filter(_.loop.map(_ == 0).getOrElse(false))
          val metricsToRun = offlineEvalMetrics.getByEvalid(offlineEval.id).toSeq
          val algoids = algosToRun map { _.id }
          val metricids = metricsToRun map { _.id }
          /** Delete old model data, if any (usually recovering from an incomplete run) */
          val algosToClean = algos.getByOfflineEvalid(offlineEval.id).toSeq.filter(_.loop.map(_ != 0).getOrElse(false))
          algosToClean foreach { algo =>
            Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting any old model data")
            modelData.delete(algo.id, false)
            algos.delete(algo.id)
          }
          /** The offline evaluation ID is passed as the app ID of the evaluation data sets */
          Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Deleting any old app data")
          Scheduler.appdataTrainingUsers.deleteByAppid(offlineEval.id)
          Scheduler.appdataTrainingItems.deleteByAppid(offlineEval.id)
          Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
          Scheduler.appdataTestUsers.deleteByAppid(offlineEval.id)
          Scheduler.appdataTestItems.deleteByAppid(offlineEval.id)
          Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
          Scheduler.appdataValidationUsers.deleteByAppid(offlineEval.id)
          Scheduler.appdataValidationItems.deleteByAppid(offlineEval.id)
          Scheduler.appdataValidationU2IActions.deleteByAppid(offlineEval.id)
          val currentIteration = 0
          val iterationParam = collection.immutable.Map("iteration" -> currentIteration)
          val splitIterationParam = collection.immutable.Map("iteration" -> 1)
          /**
           * Splitters setup (support 1 splitter for now).
           * The split step is always registered, even without any splitter,
           * because baseline training waits on its finish flag.
           */
          val splitkey = s"split-${offlineEval.id}-${currentIteration}"
          exitCodes(splitkey) = 0
          finishFlags(splitkey) = false
          splittersToRun.headOption map { splitter =>
            offlineEvalSplitterInfos.get(splitter.infoid) map { splitterInfo =>
              splitterInfo.commands map { commands =>
                val splitterCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, None, Some(offlineEval), None, Some(splitterInfo.params.mapValues(_.defaultvalue) ++ splitter.settings ++ splitIterationParam)).toString }
                step(tuneid, offlineEval.id, currentIteration, "split", splitterCommands)
              } getOrElse {
                Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Not doing data split because splitter information for ${splitter.infoid} contains no command")
                step(tuneid, offlineEval.id, currentIteration, "split", Seq())
              }
            } getOrElse {
              Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Not doing data split because splitter information for ${splitter.infoid} is missing")
              step(tuneid, offlineEval.id, currentIteration, "split", Seq())
            }
          } getOrElse {
            Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Not doing data split because no splitter is configured")
            step(tuneid, offlineEval.id, currentIteration, "split", Seq())
          }
          algosToRun foreach { algo =>
            val trainingkey = s"training-${offlineEval.id}-${currentIteration}-${algo.id}"
            exitCodes(trainingkey) = 0
            finishFlags(trainingkey) = false
            algoInfos.get(algo.infoid) map { algoInfo =>
              algoInfo.offlineevalcommands map { commands =>
                val trainingCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, Some(algo), Some(offlineEval), None, Some(iterationParam)).toString }
                step(tuneid, offlineEval.id, currentIteration, "training", trainingCommands, Some(algo.id))
              } getOrElse {
                Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Not doing training because algo information for ${algo.infoid} contains no command for offline evaluation")
                step(tuneid, offlineEval.id, currentIteration, "training", Seq(), Some(algo.id))
              }
            } getOrElse {
              Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Not doing training because algo information for ${algo.infoid} is missing")
              step(tuneid, offlineEval.id, currentIteration, "training", Seq(), Some(algo.id))
            }
            /** Run metrics (the baseline is measured against the test set only) */
            metricsToRun foreach { metric =>
              val metrickey = s"metric-${offlineEval.id}-${currentIteration}-${algo.id}-${metric.id}"
              exitCodes(metrickey) = 0
              finishFlags(metrickey) = false
              offlineEvalMetricInfos.get(metric.infoid) map { metricInfo =>
                metricInfo.commands map { commands =>
                  val metricCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, Some(algo), Some(offlineEval), Some(metric), Some(testMetricParams ++ iterationParam)).toString }
                  step(tuneid, offlineEval.id, currentIteration, "metric", metricCommands, Some(algo.id), Some(metric.id))
                } getOrElse {
                  Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Metric ID ${metric.id}: Not running metric because metric information for ${metric.infoid} contains no command")
                  step(tuneid, offlineEval.id, currentIteration, "metric", Seq(), Some(algo.id), Some(metric.id))
                }
              } getOrElse {
                Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Metric ID ${metric.id}: Not running metric because metric information for ${metric.infoid} is missing")
                step(tuneid, offlineEval.id, currentIteration, "metric", Seq(), Some(algo.id), Some(metric.id))
              }
            }
          }
          /** Block on baseline training and metric */
          val iterationkey = s"iteration-${offlineEval.id}-${currentIteration}"
          exitCodes(iterationkey) = 0
          finishFlags(iterationkey) = false
          step(tuneid, offlineEval.id, currentIteration, "iteration", Seq(), None, None, Some(algoids), Some(metricids))
        }
        /** Start iterative tuning */
        /** Parameter generator setup (support 1 param gen for now) */
        /** Parameter generator is run once for all offline evaluations */
        val tuneSubjectOpt = algos.getTuneSubjectByOfflineTuneid(tuneid)
        val paramGenOpt = paramGens.getByTuneid(tuneid).toSeq.headOption
        (tuneSubjectOpt, paramGenOpt) match {
          case (Some(tuneSubject), Some(paramGen)) if offlineEvalsToRun.nonEmpty => {
            val paramGenInfoOpt = paramGenInfos.get(paramGen.infoid)
            val paramGenParams = collection.immutable.Map("algoid" -> tuneSubject.id, "paramsets" -> 1, "evalids" -> offlineEvalsToRun.map(_.id.toString).mkString(","))
            for (currentLoop <- 1 to totalLoops) {
              val iterationParam = collection.immutable.Map("iteration" -> currentLoop)
              val loopParam = collection.immutable.Map("loop" -> currentLoop)
              val paramGenKey = s"paramgen-0-${currentLoop}"
              exitCodes(paramGenKey) = 0
              finishFlags(paramGenKey) = false
              paramGenInfoOpt map { paramGenInfo =>
                paramGenInfo.commands map { commands =>
                  val paramGenCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, None, None, None, Some(paramGenInfo.paramdefaults ++ paramGen.params ++ loopParam ++ paramGenParams)).toString }
                  step(tuneid, 0, currentLoop, "paramgen", paramGenCommands)
                } getOrElse {
                  Logger.warn(s"${logPrefix}Not generating parameters because generator information for ${paramGen.infoid} contains no command")
                  step(tuneid, 0, currentLoop, "paramgen", Seq())
                }
              } getOrElse {
                Logger.warn(s"${logPrefix}Not generating parameters because generator information for ${paramGen.infoid} is missing")
                step(tuneid, 0, currentLoop, "paramgen", Seq())
              }
              /** Block on param gen, because the algos to evaluate are read from the database right after */
              while (!finishFlags(paramGenKey)) {
                Thread.sleep(1000)
              }
              /** Evaluate generated algos */
              offlineEvalsToRun foreach { offlineEval =>
                /** Only 1 param set for now */
                val algosToRun = algos.getByOfflineEvalid(offlineEval.id, Some(currentLoop), Some(1)).toSeq
                val metricsToRun = offlineEvalMetrics.getByEvalid(offlineEval.id).toSeq
                val algoids = algosToRun map { _.id }
                val metricids = metricsToRun map { _.id }
                algosToRun foreach { algo =>
                  val trainingkey = s"training-${offlineEval.id}-${currentLoop}-${algo.id}"
                  exitCodes(trainingkey) = 0
                  finishFlags(trainingkey) = false
                  algoInfos.get(algo.infoid) map { algoInfo =>
                    algoInfo.offlineevalcommands map { commands =>
                      val trainingCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, Some(algo), Some(offlineEval), None, Some(loopParam)).toString }
                      step(tuneid, offlineEval.id, currentLoop, "training", trainingCommands, Some(algo.id))
                    } getOrElse {
                      Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Not doing training because algo information for ${algo.infoid} contains no command for offline evaluation")
                      step(tuneid, offlineEval.id, currentLoop, "training", Seq(), Some(algo.id))
                    }
                  } getOrElse {
                    Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Not doing training because algo information for ${algo.infoid} is missing")
                    step(tuneid, offlineEval.id, currentLoop, "training", Seq(), Some(algo.id))
                  }
                  /** Run metrics (validation set first, then test set, within the same step) */
                  metricsToRun foreach { metric =>
                    val metrickey = s"metric-${offlineEval.id}-${currentLoop}-${algo.id}-${metric.id}"
                    exitCodes(metrickey) = 0
                    finishFlags(metrickey) = false
                    offlineEvalMetricInfos.get(metric.infoid) map { metricInfo =>
                      metricInfo.commands map { commands =>
                        val validationMetricCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, Some(algo), Some(offlineEval), Some(metric), Some(validationMetricParams ++ iterationParam)).toString }
                        val testMetricCommands = commands map { c => Jobs.setSharedAttributes(new StringTemplate(c), config, app, engine, Some(algo), Some(offlineEval), Some(metric), Some(testMetricParams ++ iterationParam)).toString }
                        step(tuneid, offlineEval.id, currentLoop, "metric", validationMetricCommands ++ testMetricCommands, Some(algo.id), Some(metric.id))
                      } getOrElse {
                        Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Metric ID ${metric.id}: Not running metric because metric information for ${metric.infoid} contains no command")
                        step(tuneid, offlineEval.id, currentLoop, "metric", Seq(), Some(algo.id), Some(metric.id))
                      }
                    } getOrElse {
                      Logger.warn(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Metric ID ${metric.id}: Not running metric because metric information for ${metric.infoid} is missing")
                      step(tuneid, offlineEval.id, currentLoop, "metric", Seq(), Some(algo.id), Some(metric.id))
                    }
                  }
                }
                /** Block on training and metric */
                val iterationkey = s"iteration-${offlineEval.id}-${currentLoop}"
                exitCodes(iterationkey) = 0
                finishFlags(iterationkey) = false
                step(tuneid, offlineEval.id, currentLoop, "iteration", Seq(), None, None, Some(algoids), Some(metricids))
              }
            }
            /** Block on last iterations of all offline evaluations */
            val offlineEvalKeys = offlineEvalsToRun map { o => s"iteration-${o.id}-${totalLoops}" }
            while (!finishFlags.filterKeys(offlineEvalKeys.contains(_)).values.forall(identity)) {
              Thread.sleep(1000)
            }
            /** Check for errors from metric */
            Logger.info(s"${logPrefix}Exit code summary:")
            Logger.info(s"${logPrefix}Iteration 0:")
            offlineEvalsToRun foreach { offlineEval =>
              val currentIteration = 0
              val algosToRun = algos.getByOfflineEvalid(offlineEval.id).toSeq.filter(_.loop.map(_ == currentIteration).getOrElse(false))
              val metricsToRun = offlineEvalMetrics.getByEvalid(offlineEval.id).toSeq
              val algoids = algosToRun map { _.id }
              val metricids = metricsToRun map { _.id }
              Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Split: " + exitCodes(s"split-${offlineEval.id}-${currentIteration}"))
              algoids foreach { algoid =>
                Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algoid}: " + exitCodes(s"training-${offlineEval.id}-${currentIteration}-${algoid}"))
                metricids foreach { metricid =>
                  Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Metric ID ${metricid}: " + exitCodes(s"metric-${offlineEval.id}-${currentIteration}-${algoid}-${metricid}"))
                }
              }
            }
            for (currentLoop <- 1 to totalLoops) {
              Logger.info(s"${logPrefix}Iteration ${currentLoop}:")
              offlineEvalsToRun foreach { offlineEval =>
                val algosToRun = algos.getByOfflineEvalid(offlineEval.id, Some(currentLoop), Some(1)).toSeq
                val metricsToRun = offlineEvalMetrics.getByEvalid(offlineEval.id).toSeq
                val algoids = algosToRun map { _.id }
                val metricids = metricsToRun map { _.id }
                algoids foreach { algoid =>
                  Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algoid}: " + exitCodes(s"training-${offlineEval.id}-${currentLoop}-${algoid}"))
                  metricids foreach { metricid =>
                    Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Metric ID ${metricid}: " + exitCodes(s"metric-${offlineEval.id}-${currentLoop}-${algoid}-${metricid}"))
                  }
                }
              }
            }
            /** Any non-zero exit code is an error (summing could let codes cancel each other out) */
            if (exitCodes.values.exists(_ != 0))
              Logger.warn(s"${logPrefix}Offline tuning completed with error(s)")
            else
              Logger.info(s"${logPrefix}Offline tuning completed")
            /** Mark the end time since this is used to determine whether the run has finished */
            offlineTunes.update(offlineTuneWithStartTime.copy(endtime = Some(DateTime.now)))
            /** Mark subject algo as tuned */
            algos.update(tuneSubject.copy(status = "tuned"))
            /** Clean up */
            offlineEvalsToRun foreach { offlineEval =>
              val algosToClean = algos.getByOfflineEvalid(offlineEval.id).toSeq.filter(_.loop.map(_ != 0).getOrElse(false))
              algosToClean foreach { algo =>
                Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting used model data")
                modelData.delete(algo.id, false)
              }
              Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Deleting used app data")
              Scheduler.appdataTrainingUsers.deleteByAppid(offlineEval.id)
              Scheduler.appdataTrainingItems.deleteByAppid(offlineEval.id)
              Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
              Scheduler.appdataTestUsers.deleteByAppid(offlineEval.id)
              Scheduler.appdataTestItems.deleteByAppid(offlineEval.id)
              Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
              Scheduler.appdataValidationUsers.deleteByAppid(offlineEval.id)
              Scheduler.appdataValidationItems.deleteByAppid(offlineEval.id)
              Scheduler.appdataValidationU2IActions.deleteByAppid(offlineEval.id)
            }
          }
          case (None, _) =>
            Logger.warn(s"${logPrefix}Not continuing offline tuning because the algo to be tuned cannot be found from the database")
          case (_, None) =>
            Logger.warn(s"${logPrefix}Not continuing offline tuning because no parameter generator is configured for it")
          case _ =>
            Logger.warn(s"${logPrefix}Not continuing offline tuning because it has no offline evaluation (data set) to run")
        }
      } getOrElse {
        Logger.warn(s"${logPrefix}Not starting offline tuning because the app that owns this offline tuning cannot be found from the database")
      }
    } getOrElse {
      Logger.warn(s"${logPrefix}Not starting offline tuning because the engine that owns this offline tuning cannot be found from the database")
    }
  } getOrElse {
    Logger.warn(s"${logPrefix}Not starting offline tuning because the offline tuning cannot be found from the database")
  }
}
/**
 * Request termination of the job: raise the kill flag so that step() starts
 * no further command, and destroy every sub-process registered in procs.
 * Runs under the same lock step() holds while starting a process, so a
 * process is either registered before the sweep or never started.
 */
override def interrupt(): Unit = {
  this.synchronized {
    kill = true
    procs.values foreach { _.destroy() }
  }
}
}