blob: e035e358fc27b22223cc6b0f59ead4639f4358ea [file] [log] [blame]
package io.prediction.scheduler
import io.prediction.commons._
import collection.JavaConversions._
import play.api._
import play.api.libs.json._
import play.api.mvc._
import org.quartz.impl.matchers.GroupMatcher._;
import org.quartz.impl.StdSchedulerFactory
import org.quartz.JobBuilder.newJob
import org.quartz.JobKey.jobKey
import org.quartz.SimpleScheduleBuilder.simpleSchedule
import org.quartz.TriggerBuilder.newTrigger
import org.quartz.TriggerKey.triggerKey
import org.quartz.UnableToInterruptJobException
object Scheduler extends Controller {
/** Get settings. */
val config = new Config
val users = config.getSettingsUsers
val apps = config.getSettingsApps
val engines = config.getSettingsEngines
val engineInfos = config.getSettingsEngineInfos
val algos = config.getSettingsAlgos
val algoInfos = config.getSettingsAlgoInfos
val offlineEvals = config.getSettingsOfflineEvals
val offlineEvalSplitters = config.getSettingsOfflineEvalSplitters
val offlineEvalSplitterInfos = config.getSettingsOfflineEvalSplitterInfos
val offlineEvalMetrics = config.getSettingsOfflineEvalMetrics
val offlineEvalMetricInfos = config.getSettingsOfflineEvalMetricInfos
val offlineEvalResults = config.getSettingsOfflineEvalResults
val offlineTunes = config.getSettingsOfflineTunes
val paramGens = config.getSettingsParamGens
val paramGenInfos = config.getSettingsParamGenInfos
val systemInfos = config.getSettingsSystemInfos
val itemRecScores = config.getModeldataItemRecScores
val trainingItemRecScores = config.getModeldataTrainingItemRecScores
val itemSimScores = config.getModeldataItemSimScores
val trainingItemSimScores = config.getModeldataTrainingItemSimScores
val appdataTrainingUsers = config.getAppdataTrainingUsers()
val appdataTrainingItems = config.getAppdataTrainingItems()
val appdataTrainingU2IActions = config.getAppdataTrainingU2IActions()
val appdataTestUsers = config.getAppdataTestUsers()
val appdataTestItems = config.getAppdataTestItems()
val appdataTestU2IActions = config.getAppdataTestU2IActions()
val appdataValidationUsers = config.getAppdataValidationUsers()
val appdataValidationItems = config.getAppdataValidationItems()
val appdataValidationU2IActions = config.getAppdataValidationU2IActions()
val scheduler = StdSchedulerFactory.getDefaultScheduler()
val jobTree = new JobTreeJobListener("predictionio-algo")
scheduler.getListenerManager.addJobListener(jobTree)
/** Try search path if hadoop home is not set. */
val hadoopCommand = config.settingsHadoopHome map { h => h + "/bin/hadoop" } getOrElse { "hadoop" }
/** Schedule update check if enabled. */
if (config.settingsSchedulerUpdatecheck) {
val updateCheckJob = newJob(classOf[UpdateCheckJob]) withIdentity ("updatecheck", "updatecheck") build ()
val updateCheckTrigger = newTrigger() forJob (jobKey("updatecheck", "updatecheck")) withIdentity ("updatecheck", "updatecheck") startNow () withSchedule (simpleSchedule() withIntervalInHours (24) repeatForever ()) build ()
scheduler.scheduleJob(updateCheckJob, updateCheckTrigger)
}
/** Sync the scheduler once against settings database. */
def syncAllUsers() = {
users.getAll foreach { user =>
syncUserJobs(user.id)
}
}
def online() = Action { Ok("PredictionIO Scheduler is online.") }
def userSync(userid: Int) = Action {
try {
syncUserJobs(userid)
/** Complete synchronization. */
Ok(Json.obj("message" -> "Synchronized algorithms settings with scheduler successfully."))
} catch {
case e: RuntimeException =>
e.printStackTrace; NotFound(Json.obj("message" -> ("Synchronization failed: " + e.getMessage())))
case e: Exception => InternalServerError(Json.obj("message" -> ("Synchronization failed: " + e.getMessage())))
}
}
def syncUserJobs(userid: Int) = {
/** Remove jobs that do not correspond to an algo. */
scheduler.getJobKeys(groupEquals(Jobs.algoJobGroup)) foreach { jobKey =>
val algoid = jobKey.getName().toInt
algos.get(algoid) getOrElse {
Logger.info("Found job for algo ID " + algoid + " in scheduler but not in settings. Removing job from scheduler.")
scheduler.deleteJob(jobKey)
}
}
/** Synchronize every app of the user. */
Logger.info(s"User ID ${userid}: Synchronizing settings")
apps.getByUserid(userid) foreach { app =>
engines.getByAppid(app.id) foreach { engine =>
/** Algos. */
syncAlgoJobs(app, engine, false)
/** Offline evaluations. */
offlineEvals.getByEngineid(engine.id) foreach { offlineEval =>
/** Work on those that is not part of auto tuning */
offlineEval.tuneid getOrElse {
val offlineEvalid = offlineEval.id.toString
val triggerkey = triggerKey(offlineEvalid, Jobs.offlineEvalJobGroup)
offlineEval.createtime foreach { ct =>
if (scheduler.checkExists(triggerkey) == false) {
offlineEval.endtime getOrElse {
val offlineEvalJob = Jobs.offlineEvalJob(config, app, engine, offlineEval)
scheduler.addJob(offlineEvalJob, true)
val trigger = newTrigger() forJob (jobKey(offlineEvalid, Jobs.offlineEvalJobGroup)) withIdentity (offlineEvalid, Jobs.offlineEvalJobGroup) startNow () build ()
scheduler.scheduleJob(trigger)
}
}
}
}
}
/** Auto tunings. */
offlineTunes.getByEngineid(engine.id) foreach { offlineTune =>
/** Work on those that is not part of auto tuning */
val offlineTuneid = offlineTune.id.toString
val triggerkey = triggerKey(offlineTuneid, Jobs.offlineTuneJobGroup)
offlineTune.createtime foreach { ct =>
if (scheduler.checkExists(triggerkey) == false) {
offlineTune.endtime getOrElse {
val offlineTuneJob = Jobs.offlineTuneJob(config, app, engine, offlineTune)
scheduler.addJob(offlineTuneJob, true)
val trigger = newTrigger() forJob (jobKey(offlineTuneid, Jobs.offlineTuneJobGroup)) withIdentity (offlineTuneid, Jobs.offlineTuneJobGroup) startNow () build ()
scheduler.scheduleJob(trigger)
}
}
}
}
}
}
}
/** Run training of deployed algorithms immediately */
def syncAlgoJobs(app: settings.App, engine: settings.Engine, runoncenow: Boolean = false) = {
/** Algos. */
algos.getByEngineid(engine.id) foreach { algo =>
val logPrefix = s"Algo ID ${algo.id}: "
algoInfos.get(algo.infoid) map { algoinfo =>
val algoid = algo.id.toString
val triggerkey = triggerKey(algoid, Jobs.algoJobGroup)
if (algo.status == "deployed") {
/** Running once now is independent of whether the trigger exist or not */
if (runoncenow) {
Logger.info(s"${logPrefix}Setting up batch algo job (run once now)")
algoinfo.batchcommands map { batchcommands =>
val job = Jobs.algoJob(config, app, engine, algo, batchcommands)
scheduler.addJob(job, true)
val trigger = newTrigger() forJob (jobKey(algoid, Jobs.algoJobGroup)) withIdentity (s"${algoid}-runonce", Jobs.algoJobGroup) startNow () build ()
scheduler.scheduleJob(trigger)
} getOrElse {
Logger.info(s"${logPrefix}Giving up setting up batch algo job because it does not have any batch command")
}
} else if (scheduler.checkExists(triggerkey) == false) {
Logger.info(s"${logPrefix}Setting up batch algo job (run every hour from now)")
algoinfo.batchcommands map { batchcommands =>
val job = Jobs.algoJob(config, app, engine, algo, batchcommands)
scheduler.addJob(job, true)
val trigger = newTrigger() forJob (jobKey(algoid, Jobs.algoJobGroup)) withIdentity (algoid, Jobs.algoJobGroup) startNow () withSchedule (simpleSchedule() withIntervalInHours (1) repeatForever ()) build ()
scheduler.scheduleJob(trigger)
} getOrElse {
Logger.info(s"${logPrefix}Giving up setting up batch algo job because it does not have any batch command")
}
}
} else {
/** Stop any algo job if it is undeployed */
val algoJobKey = jobKey(algoid, Jobs.algoJobGroup)
if (scheduler.checkExists(algoJobKey)) {
/** The following checks only jobs in this particular scheduler node. */
/** TODO: Clustering support. */
try {
val running = scheduler.getCurrentlyExecutingJobs() map { context =>
val jobDetail = context.getJobDetail()
val jobKey = jobDetail.getKey()
jobKey.getName() == algoid
} reduce { (a, b) => a || b }
if (running) {
try {
scheduler.interrupt(algoJobKey)
Logger.info(s"${logPrefix}Stopping training")
} catch {
case e: UnableToInterruptJobException => Logger.warn(s"${logPrefix}Unable to stop training")
}
}
} catch {
case e: UnsupportedOperationException => Logger.info(s"${logPrefix}No training is running")
}
}
if (scheduler.checkExists(triggerkey) == true) {
scheduler.unscheduleJob(triggerkey)
}
}
} getOrElse {
Logger.info(s"${logPrefix}Skipping batch algo job setup because information about this algo (${algo.infoid}) cannot be found")
}
}
}
def trainEngineOnceNow(appid: Int, engineid: Int) = Action {
try {
apps.get(appid) map { app =>
engines.get(engineid) map { engine =>
syncAlgoJobs(app, engine, true)
Ok(Json.obj("message" -> "Immediate engine training request has been accepted."))
} getOrElse {
NotFound(Json.obj("message" -> s"Engine ID $engineid is invalid"))
}
} getOrElse {
NotFound(Json.obj("message" -> s"App ID $appid is invalid"))
}
} catch {
case e: RuntimeException =>
e.printStackTrace; NotFound(Json.obj("message" -> ("Request failed: " + e.getMessage())))
case e: Exception => InternalServerError(Json.obj("message" -> ("Request failed: " + e.getMessage())))
}
}
def algoStatus(appid: Int, engineid: Int, algoid: Int) = Action {
if (scheduler.checkExists(jobKey(algoid.toString(), Jobs.algoJobGroup))) {
/** The following checks only jobs in this particular scheduler node. */
/** TODO: Clustering support. */
try {
val running = scheduler.getCurrentlyExecutingJobs() map { context =>
val jobDetail = context.getJobDetail()
val jobKey = jobDetail.getKey()
jobKey.getName() == algoid.toString()
} reduce { (a, b) => a || b }
if (running)
Ok(Json.obj("algoid" -> algoid, "status" -> "jobrunning"))
else
Ok(Json.obj("algoid" -> algoid, "status" -> "jobnotrunning"))
} catch {
case e: UnsupportedOperationException => Ok(Json.obj("algoid" -> algoid, "status" -> "jobnotrunning"))
}
} else {
Ok(Json.obj("algoid" -> algoid, "status" -> "jobnotexist"))
}
}
def stopOfflineEval(appid: Int, engineid: Int, offlineevalid: Int) = Action {
val offlineEvalJobKey = jobKey(offlineevalid.toString(), Jobs.offlineEvalJobGroup)
if (scheduler.checkExists(offlineEvalJobKey)) {
/** The following checks only jobs in this particular scheduler node. */
/** TODO: Clustering support. */
try {
val running = scheduler.getCurrentlyExecutingJobs() map { context =>
val jobDetail = context.getJobDetail()
val jobKey = jobDetail.getKey()
jobKey.getName() == offlineevalid.toString()
} reduce { (a, b) => a || b }
if (running)
try {
scheduler.interrupt(offlineEvalJobKey)
Ok(Json.obj("offlineevalid" -> offlineevalid, "status" -> "jobkilled"))
} catch {
case e: UnableToInterruptJobException => Ok(Json.obj("offlineevalid" -> offlineevalid, "status" -> "jobnotkilled"))
}
else
Ok(Json.obj("offlineevalid" -> offlineevalid, "status" -> "jobnotrunning"))
} catch {
case e: UnsupportedOperationException => Ok(Json.obj("offlineevalid" -> offlineevalid, "status" -> "jobnotrunning"))
}
} else {
Ok(Json.obj("offlineevalid" -> offlineevalid, "status" -> "jobnotexist"))
}
}
def stopOfflineTune(appid: Int, engineid: Int, offlinetuneid: Int) = Action {
val offlineTuneJobKey = jobKey(offlinetuneid.toString(), Jobs.offlineTuneJobGroup)
try {
scheduler.interrupt(offlineTuneJobKey)
Ok(Json.obj("offlinetuneid" -> offlinetuneid, "status" -> "jobkilled"))
} catch {
case e: UnableToInterruptJobException => Ok(Json.obj("offlinetuneid" -> offlinetuneid, "status" -> "jobnotkilled"))
}
}
}