//package org.apache.amaterasu.leader.mesos.schedulers
//import{File, PrintWriter, StringWriter}
//import java.nio.file.{Files, Path, Paths, StandardCopyOption}
//import java.util
//import java.util.{Collections, UUID}
//import java.util.concurrent.locks.ReentrantLock
//import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
//import com.fasterxml.jackson.databind.ObjectMapper
//import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
//import com.fasterxml.jackson.module.scala.DefaultScalaModule
//import org.apache.amaterasu.common.configuration.{ClusterConfig, ConfigManager}
//import org.apache.amaterasu.common.configuration.enums.ActionStatus
//import org.apache.amaterasu.common.dataobjects.ActionData
//import org.apache.amaterasu.common.execution.actions.Notification
//import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
//import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
//import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
//import org.apache.amaterasu.leader.common.utilities.DataLoader
//import org.apache.amaterasu.leader.utilities.HttpServer
//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
//import org.apache.curator.retry.ExponentialBackoffRetry
//import org.apache.log4j.LogManager
//import org.apache.mesos.Protos.CommandInfo.URI
//import org.apache.mesos.Protos.Environment.Variable
//import org.apache.mesos.Protos._
//import org.apache.mesos.protobuf.ByteString
//import org.apache.mesos.{Protos, SchedulerDriver}
//import scala.collection.JavaConverters._
//import scala.collection.concurrent
//import scala.collection.concurrent.TrieMap
// /**
// * The JobScheduler is a Mesos scheduler implementation. It is in charge of scheduling the execution of
// * Amaterasu actions for a specific job.
// */
//class JobScheduler extends AmaterasuScheduler {
// /*private val props: Properties = new Properties(new File(""))
// private val version = props.getProperty("version")
// println(s"===> version $version")*/
// LogManager.resetConfiguration()
// private var frameworkFactory: FrameworkProvidersFactory = _
// private var configManager: ConfigManager = _
// private var jobManager: JobManager = _
// private var client: CuratorFramework = _
// private var config: ClusterConfig = _
// private var src: String = _
// private var env: String = _
// private var branch: String = _
// private var userName: String = _
// private var password: String = _
// private var resume: Boolean = false
// private var reportLevel: NotificationLevel = _
// private val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
// private val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
// val slavesExecutors = new TrieMap[String, ExecutorInfo]
// private var awsEnv: String = ""
// // this map holds the following structure:
// // slaveId
// // |
// // +-> (taskId, actionStatus)
// private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala
// private val lock = new ReentrantLock()
// private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala
// private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala
// private val mapper = new ObjectMapper()
// mapper.registerModule(DefaultScalaModule)
// private val yamlMapper = new ObjectMapper(new YAMLFactory())
// yamlMapper.registerModule(DefaultScalaModule)
// def error(driver: SchedulerDriver, message: String): Unit = {
// log.error(s"===> $message")
// }
// def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {}
// def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {}
// def disconnected(driver: SchedulerDriver) {}
// def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
// val notification = mapper.readValue(data, classOf[Notification])
// reportLevel match {
// case NotificationLevel.Code => printNotification(notification)
// case NotificationLevel.Execution =>
// if (notification.getNotLevel != NotificationLevel.Code)
// printNotification(notification)
// case _ =>
// }
// }
// def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
// val actionName = taskIdsToActions(status.getTaskId)
// status.getState match {
// case TaskState.TASK_STARTING =>"Task starting ...")
// case TaskState.TASK_RUNNING => {
// jobManager.actionStarted(status.getTaskId.getValue)
// printNotification(new Notification("", s"created container for $actionName created", NotificationType.Info, NotificationLevel.Execution))
// }
// case TaskState.TASK_FINISHED => {
// jobManager.actionComplete(status.getTaskId.getValue)
// printNotification(new Notification("", s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
// }
// case TaskState.TASK_FAILED |
// TaskState.TASK_KILLED |
// TaskState.TASK_ERROR |
// TaskState.TASK_LOST => {
// jobManager.actionFailed(status.getTaskId.getValue, status.getMessage)
// printNotification(new Notification("", s"error launching container with ${status.getMessage} in ${status.getData.toStringUtf8}", NotificationType.Error, NotificationLevel.Execution))
// }
// case _ => log.warn("WTF? just got unexpected task state: " + status.getState)
// }
// }
// def validateOffer(offer: Offer): Boolean = {
// val resources = offer.getResourcesList.asScala
// resources.count(r => r.getName == "cpus" && r.getScalar.getValue >= > 0 &&
// resources.count(r => r.getName == "mem" && r.getScalar.getValue >= > 0
// }
// def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {
// val actionId = offersToTaskIds(offerId.getValue)
// jobManager.reQueueAction(actionId)
// }
// def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
// println(jobManager.toString)
// for (offer <- offers.asScala) {
// if (validateOffer(offer)) {
//"Accepting offer, id=${offer.getId}")
// // this is done to avoid processing the same action
// // multiple times
// lock.lock()
// try {
// val actionData = jobManager.getNextActionData
// if (actionData != null) {
// frameworkFactory = FrameworkProvidersFactory(env, config)
// val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
// configManager = new ConfigManager(env, "repo", items)
// val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
// taskIdsToActions.put(taskId, actionData.getName)
// // setting up the configuration files for the container
// val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
// writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
// val envConf = configManager.getActionConfiguration(actionData.getName, actionData.getConfig)
// val dataStores = DataLoader.getTaskData(actionData, env).getExports
// val writer = new StringWriter()
// yamlMapper.writeValue(writer, dataStores)
// val dataStoresYaml = writer.toString
// writeConfigFile(dataStoresYaml, jobManager.getJobId, actionData.getName, "datastores.yaml")
// writeConfigFile(s"jobId: ${jobManager.getJobId}\nactionName: ${actionData.getName}", jobManager.getJobId, actionData.getName, "runtime.yaml")
// val datasets = DataLoader.getDatasets(env)
// writeConfigFile(datasets, jobManager.getJobId, actionData.getName, "datasets.yaml")
// offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
// // atomically adding a record for the slave; all the actions are stored
// // at the slave level to efficiently handle slave losses
// executionMap.putIfAbsent(offer.getSlaveId.toString, new ConcurrentHashMap[String, ActionStatus].asScala)
// val slaveActions = executionMap(offer.getSlaveId.toString)
// slaveActions.put(taskId.getValue, ActionStatus.Started)
// val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
// val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
// printNotification(new Notification("", s"provider ${runnerProvider.getClass.getName}", NotificationType.Info, NotificationLevel.Execution))
// // searching for an executor that already exists on the slave; if none exists
// // we create a new one
// var executor: ExecutorInfo = null
// // val slaveId = offer.getSlaveId.getValue
// // slavesExecutors.synchronized {
// val execData = DataLoader.getExecutorDataBytes(env, config)
// val executorId = taskId.getValue + "-" + UUID.randomUUID()
// //creating the command
// // // TODO: move this into the runner provider somehow
// // if(!actionData.getSrc.isEmpty){
// // copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
// // }
// val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "")
// printNotification(new Notification("", s"container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
// val command = CommandInfo
// .newBuilder
// .setValue(commandStr)
// // .addUris(URI.newBuilder
// // .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/executor-${config.version}-all.jar")
// // .setExecutable(false)
// // .setExtract(false)
// // .build())
// // Getting framework (group) resources
//"===> groupResources: ${frameworkProvider.getGroupResources}")
// frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/${f.getName}")
// .setExecutable(false)
// .setExtract(true)
// .build()
// ))
// // Getting runner resources
// runnerProvider.getRunnerResources.foreach(r => {
// command.addUris(URI.newBuilder
// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
// .setExecutable(false)
// .setExtract(false)
// .build())
// })
// // Getting action dependencies
// runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
// FileUtils.copyFile(new File(r), new File(s"dist/$r"))
// command.addUris(URI.newBuilder
// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
// .setExecutable(false)
// .setExtract(false)
// .build())
// })
// // Getting action specific resources
// runnerProvider.getActionResources(jobManager.getJobId, actionData).foreach(r => command.addUris(URI.newBuilder
// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
// .setExecutable(false)
// .setExtract(false)
// .build()))
// // setting up action executable
// val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
// var executable: Path = null
// if (actionData.getHasArtifact) {
// val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
// executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
// } else {
// val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
// FileUtils.copyFile(sourcePath, dest)
// executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
// }
// println(s"===> executable $executable")
// command.addUris(URI.newBuilder
// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable")
// .setExecutable(false)
// .setExtract(false)
// .build())
// command
// .addUris(URI.newBuilder()
// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/")
// .setExecutable(false)
// .setExtract(false)
// .build())
// // setting the processes environment variables
// val envVarsList = => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
// command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
// executor = ExecutorInfo
// .newBuilder
// .setData(ByteString.copyFrom(execData))
// .setName(taskId.getValue)
// .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
// .setCommand(command)
// .build()
// slavesExecutors.put(offer.getSlaveId.getValue, executor)
// val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
// var actionTask: TaskInfo = null
// if (runnerProvider.getHasExecutor) {
// actionTask = TaskInfo
// .newBuilder
// .setName(taskId.getValue)
// .setTaskId(taskId)
// .setExecutor(executor)
// .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
// .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
// .addResources(createScalarResource("mem", driverConfiguration.getMemory))
// .addResources(createScalarResource("disk",
// .setSlaveId(offer.getSlaveId)
// .build()
// //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
// }
// else {
// actionTask = TaskInfo
// .newBuilder
// .setName(taskId.getValue)
// .setTaskId(taskId)
// .setCommand(command)
// //.setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
// .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
// .addResources(createScalarResource("mem", driverConfiguration.getMemory))
// .addResources(createScalarResource("disk",
// .setSlaveId(offer.getSlaveId)
// .build()
// //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
// }
// printNotification(new Notification("", s"requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
// driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
// }
// else if (jobManager.getOutOfActions) {
//"framework ${jobManager.getJobId} execution finished")
// val repo = new File("repo/")
// repo.delete()
// HttpServer.stop()
// driver.declineOffer(offer.getId)
// driver.stop()
// sys.exit()
// }
// else {
//"Declining offer, no action ready for execution")
// driver.declineOffer(offer.getId)
// }
// }
// finally {
// lock.unlock()
// }
// }
// else {
//"Declining offer, no sufficient resources")
// driver.declineOffer(offer.getId)
// }
// }
// }
// def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
// if (!resume) {
// jobManager = JobLoader.loadJob(
// src,
// branch,
// frameworkId.getValue,
// userName,
// password,
// client,
// new LinkedBlockingQueue[ActionData]()
// )
// }
// else {
// JobLoader.reloadJob(
// frameworkId.getValue,
// userName,
// password,
// client,
// new LinkedBlockingQueue[ActionData]()
// )
// }
// jobManager.start()
// createJobDir(jobManager.getJobId)
// }
// def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {}
// def printNotification(notification: Notification): Unit = {
// var color = Console.WHITE
// notification.getNotType match {
// case NotificationType.Info =>
// color = Console.WHITE
// println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
// case NotificationType.Success =>
// color = Console.GREEN
// println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
// case NotificationType.Error =>
// color = Console.RED
// println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
// println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
// }
// }
// private def createJobDir(jobId: String): Unit = {
// val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
// val amaHome = new File(jarFile.getParent).getParent
// val jobDir = s"$amaHome/dist/$jobId/"
// val dir = new File(jobDir)
// if (!dir.exists()) {
// dir.mkdir()
// }
// }
// /**
// * This function writes an action-specific configuration file into the dist folder at the following path:
// * dist/{jobId}/{actionName}/{fileName}, to be added to the container
// *
// * @param configuration the content (e.g. a YAML string) to be written to the file
// * @param jobId the jobId
// * @param actionName the name of the action
// * @param fileName the name of the file to create, e.g. env.yaml
// */
// def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = {
// val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
// val amaHome = new File(jarFile.getParent).getParent
// val envLocation = s"$amaHome/dist/$jobId/$actionName/"
// val dir = new File(envLocation)
// if (!dir.exists()) {
// dir.mkdirs()
// }
// new PrintWriter(s"$envLocation/$fileName") {
// write(configuration)
// close
// }
// }
//object JobScheduler {
// def apply(src: String,
// branch: String,
// username: String,
// password: String,
// env: String,
// resume: Boolean,
// config: ClusterConfig,
// report: String,
// home: String): JobScheduler = {
// LogManager.resetConfiguration()
// val scheduler = new JobScheduler()
// HttpServer.start(config.webserver.Port, s"$home/${config.webserver.Root}")
// if (sys.env.get("AWS_ACCESS_KEY_ID").isDefined &&
// sys.env.get("AWS_SECRET_ACCESS_KEY").isDefined) {
// scheduler.awsEnv = s"env AWS_ACCESS_KEY_ID=${sys.env("AWS_ACCESS_KEY_ID")} env AWS_SECRET_ACCESS_KEY=${sys.env("AWS_SECRET_ACCESS_KEY")}"
// }
// scheduler.resume = resume
// scheduler.src = src
// scheduler.branch = branch
// scheduler.userName = username
// scheduler.password = password
// scheduler.env = env
// scheduler.reportLevel = NotificationLevel.valueOf(report.capitalize)
// val retryPolicy = new ExponentialBackoffRetry(1000, 3)
// scheduler.client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
// scheduler.client.start()
// scheduler.config = config
// scheduler
// }