blob: d35737880cc95bdc396aec2f94a84f61524f1939 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.amaterasu.leader.mesos.schedulers
import java.io.{File, PrintWriter, StringWriter}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util
import java.util.{Collections, UUID}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.configuration.enums.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.execution.actions.Notification
import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
import org.apache.amaterasu.leader.common.configuration.ConfigManager
import org.apache.amaterasu.leader.common.execution.JobManager
import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
import org.apache.amaterasu.leader.common.utilities.DataLoader
import org.apache.amaterasu.leader.execution.JobLoader
import org.apache.amaterasu.leader.utilities.HttpServer
import org.apache.commons.io.FileUtils
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.LogManager
import org.apache.mesos.Protos.CommandInfo.URI
import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos._
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Protos, SchedulerDriver}
import scala.collection.JavaConverters._
import scala.collection.concurrent
import scala.collection.concurrent.TrieMap
/**
* The JobScheduler is a Mesos implementation. It is in charge of scheduling the execution of
* Amaterasu actions for a specific job
*/
class JobScheduler extends AmaterasuScheduler {
/*private val props: Properties = new Properties(new File(""))
private val version = props.getProperty("version")
println(s"===> version $version")*/
// Drop any log4j configuration inherited from the launching process; logging is
// reconfigured by the surrounding leader infrastructure.
LogManager.resetConfiguration()
// Providers for the pluggable frameworks (e.g. spark) available on this cluster; set in registered().
private var frameworkFactory: FrameworkProvidersFactory = _
// Resolves per-action configuration from the job repo; set in registered().
private var configManager: ConfigManager = _
// Drives the job's DAG of actions (next action, started/complete/failed transitions); set in registered().
private var jobManager: JobManager = _
// ZooKeeper client used by JobLoader to persist/restore job state.
private var client: CuratorFramework = _
private var config: ClusterConfig = _
// Job repository source address and branch (populated by the companion factory).
private var src: String = _
private var env: String = _
private var branch: String = _
// When true, registered() reloads an existing job instead of loading a fresh one.
private var resume: Boolean = false
// Verbosity threshold for forwarding executor notifications to stdout.
private var reportLevel: NotificationLevel = _
// Location of this jar on disk, used to derive the amaterasu home and dist directories.
private val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
private val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
// ExecutorInfo cached per slave id (written in resourceOffers; read externally).
val slavesExecutors = new TrieMap[String, ExecutorInfo]
// "env AWS_ACCESS_KEY_ID=... env AWS_SECRET_ACCESS_KEY=..." prefix when AWS creds are present, else "".
private var awsEnv: String = ""
// this map holds the following structure:
// slaveId
// |
// +-> taskId, actionStatus)
private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala
// Serializes action assignment in resourceOffers so the same action is not handed to two offers.
private val lock = new ReentrantLock()
// offerId.value -> taskId.value, used to re-queue an action if its offer is rescinded.
private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala
// TaskID -> action name, used for human-readable notifications in statusUpdate.
private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala
// JSON mapper for deserializing executor notifications (frameworkMessage).
private val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
// YAML mapper for writing the per-action container config files (datastores.yaml, etc.).
private val yamlMapper = new ObjectMapper(new YAMLFactory())
yamlMapper.registerModule(DefaultScalaModule)
// Mesos callback: framework-level error. Only logged; no recovery is attempted here.
def error(driver: SchedulerDriver, message: String): Unit = {
log.error(s"===> $message")
}
// Mesos callbacks intentionally ignored by this scheduler.
def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {}
def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {}
def disconnected(driver: SchedulerDriver) {}
// Mesos callback: a message sent by one of our executors. The payload is a JSON-serialized
// Notification; it is printed or suppressed according to the configured reportLevel.
def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
val notification = mapper.readValue(data, classOf[Notification])
reportLevel match {
case NotificationLevel.Code => printNotification(notification)
case NotificationLevel.Execution =>
// At Execution level, suppress the finer-grained Code notifications.
if (notification.getNotLevel != NotificationLevel.Code)
printNotification(notification)
case _ =>
}
}
// Mesos callback: task state transition. Propagates the transition into the JobManager
// (started / complete / failed) and emits a user-facing notification.
// NOTE(review): taskIdsToActions(status.getTaskId) throws NoSuchElementException for a task id
// this scheduler never registered — presumably impossible in practice, but unguarded.
def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
val actionName = taskIdsToActions(status.getTaskId)
status.getState match {
case TaskState.TASK_STARTING => log.info("Task starting ...")
case TaskState.TASK_RUNNING => {
jobManager.actionStarted(status.getTaskId.getValue)
// NOTE(review): message reads "created container for X created" — duplicated word in the literal.
printNotification(new Notification("", s"created container for $actionName created", NotificationType.Info, NotificationLevel.Execution))
}
case TaskState.TASK_FINISHED => {
jobManager.actionComplete(status.getTaskId.getValue)
printNotification(new Notification("", s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
}
case TaskState.TASK_FAILED |
TaskState.TASK_KILLED |
TaskState.TASK_ERROR |
TaskState.TASK_LOST => {
// All terminal failure states funnel into actionFailed, which may re-queue the action
// depending on the configured attempt count.
jobManager.actionFailed(status.getTaskId.getValue, status.getMessage)
printNotification(new Notification("", s"error launching container with ${status.getMessage} in ${status.getData}", NotificationType.Error, NotificationLevel.Execution))
}
case _ => log.warn("WTF? just got unexpected task state: " + status.getState)
}
}
// Returns true when the offer carries at least the per-task cpu and memory required by config.
// (count(...) > 0 is used as an "exists" check over the offer's scalar resources.)
def validateOffer(offer: Offer): Boolean = {
val resources = offer.getResourcesList.asScala
resources.count(r => r.getName == "cpus" && r.getScalar.getValue >= config.jobs.tasks.cpus) > 0 &&
resources.count(r => r.getName == "mem" && r.getScalar.getValue >= config.jobs.tasks.mem) > 0
}
// Mesos callback: an offer we acted on was withdrawn — put the affected action back on the queue.
// NOTE(review): offersToTaskIds(offerId.getValue) throws if the offer was never mapped (e.g.
// rescinded before we accepted it); unguarded lookup.
def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {
val actionId = offersToTaskIds(offerId.getValue)
jobManager.reQueueAction(actionId)
}
// Mesos callback: the core scheduling loop. For each adequate offer, atomically pulls the next
// ready action, materializes its container configuration files under dist/, builds the Mesos
// CommandInfo (download URIs served by the leader's HTTP server), and launches the task either
// under a custom executor or as a bare command. When the job has no actions left, tears the
// framework down and exits the process.
def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
println(jobManager.toString)
for (offer <- offers.asScala) {
if (validateOffer(offer)) {
log.info(s"Accepting offer, id=${offer.getId}")
// this is done to avoid the processing the same action
// multiple times
lock.lock()
try {
val actionData = jobManager.getNextActionData
if (actionData != null) {
val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
taskIdsToActions.put(taskId, actionData.getName)
// setting up the configuration files for the container
val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
val envConf = configManager.getActionConfiguration(actionData.getName, actionData.getConfig)
// Serialize the action's exported datastores to YAML for the container.
val dataStores = DataLoader.getTaskData(actionData, env).getExports
val writer = new StringWriter()
yamlMapper.writeValue(writer, dataStores)
val dataStoresYaml = writer.toString
writeConfigFile(dataStoresYaml, jobManager.getJobId, actionData.getName, "datastores.yaml")
writeConfigFile(s"jobId: ${jobManager.getJobId}\nactionName: ${actionData.getName}", jobManager.getJobId, actionData.getName, "runtime.yaml")
val datasets = DataLoader.getDatasets(env)
writeConfigFile(datasets, jobManager.getJobId, actionData.getName, "datasets.yaml")
// Remember the offer->task mapping so offerRescinded can re-queue this action.
offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
// atomically adding a record for the slave, I'm storing all the actions
// on a slave level to efficiently handle slave loses
executionMap.putIfAbsent(offer.getSlaveId.toString, new ConcurrentHashMap[String, ActionStatus].asScala)
val slaveActions = executionMap(offer.getSlaveId.toString)
slaveActions.put(taskId.getValue, ActionStatus.Started)
// Resolve the framework (group) and runner providers for this action's type.
log.info(s">>>> Framework: ${actionData.getGroupId}")
val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
log.info(s">>>> Runner: ${actionData.getTypeId}")
val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
// searching for an executor that already exist on the slave, if non exist
// we create a new one
var executor: ExecutorInfo = null
// val slaveId = offer.getSlaveId.getValue
// slavesExecutors.synchronized {
val execData = DataLoader.getExecutorDataBytes(env, config)
val executorId = taskId.getValue + "-" + UUID.randomUUID()
//creating the command
// // TODO: move this into the runner provider somehow
// if(!actionData.getSrc.isEmpty){
// copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
// }
val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "")
printNotification(new Notification("", s"container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
val command = CommandInfo
.newBuilder
.setValue(commandStr)
// .addUris(URI.newBuilder
// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/executor-${config.version}-all.jar")
// .setExecutable(false)
// .setExtract(false)
// .build())
// Getting framework (group) resources
// All artifact URIs below are served by the leader's embedded HTTP server on AMA_NODE.
log.info(s"===> groupResources: ${frameworkProvider.getGroupResources}")
frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
.setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/${f.getName}")
.setExecutable(false)
.setExtract(true)
.build()
))
// Getting runner resources
runnerProvider.getRunnerResources.foreach(r => {
command.addUris(URI.newBuilder
.setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
.setExecutable(false)
.setExtract(false)
.build())
}
)
// Getting action dependencies
runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
// Copy each dependency into dist/ so the HTTP server can serve it.
FileUtils.copyFile(new File(r), new File(s"dist/$r"))
command.addUris(URI.newBuilder
.setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
.setExecutable(false)
.setExtract(false)
.build())
})
// Getting action specific resources
runnerProvider.getActionResources(jobManager.getJobId, actionData).foreach(r => command.addUris(URI.newBuilder
.setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
.setExecutable(false)
.setExtract(false)
.build()))
// setting up action executable
val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
var executable: Path = null
if (actionData.getHasArtifact) {
// Artifact already lives under dist/ — compute its path relative to the dist root.
// NOTE(review): the relativize/subpath arithmetic here is fragile with respect to
// absolute vs. relative sourcePath — confirm against a real artifact layout.
val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
} else {
// Non-artifact executable: copy it under dist/<jobId>/ so it is downloadable.
val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
FileUtils.copyFile(sourcePath, dest)
executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
}
println(s"===> executable $executable")
command.addUris(URI.newBuilder
.setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable")
.setExecutable(false)
.setExtract(false)
.build())
command
.addUris(URI.newBuilder()
.setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/amaterasu.properties")
.setExecutable(false)
.setExtract(false)
.build())
// setting the processes environment variables
val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
// NOTE(review): despite the "reuse executor" comment above, a new ExecutorInfo is always
// built here; the slavesExecutors cache is written but never consulted in this method.
executor = ExecutorInfo
.newBuilder
.setData(ByteString.copyFrom(execData))
.setName(taskId.getValue)
.setExecutorId(ExecutorID.newBuilder().setValue(executorId))
.setCommand(command)
.build()
slavesExecutors.put(offer.getSlaveId.getValue, executor)
val driverConfiguration = frameworkProvider.getDriverConfiguration
var actionTask: TaskInfo = null
if (runnerProvider.getHasExecutor) {
// Runner supplies its own executor: attach it plus the serialized task payload.
actionTask = TaskInfo
.newBuilder
.setName(taskId.getValue)
.setTaskId(taskId)
.setExecutor(executor)
.setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
.addResources(createScalarResource("cpus", driverConfiguration.getCpus))
.addResources(createScalarResource("mem", driverConfiguration.getMemory))
.addResources(createScalarResource("disk", config.jobs.repoSize))
.setSlaveId(offer.getSlaveId)
.build()
//driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
}
else {
// No custom executor: run the command directly via the Mesos command executor.
actionTask = TaskInfo
.newBuilder
.setName(taskId.getValue)
.setTaskId(taskId)
.setCommand(command)
//.setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
.addResources(createScalarResource("cpus", driverConfiguration.getCpus))
.addResources(createScalarResource("mem", driverConfiguration.getMemory))
.addResources(createScalarResource("disk", config.jobs.repoSize))
.setSlaveId(offer.getSlaveId)
.build()
//driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
}
printNotification(new Notification("", s"requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
}
else if (jobManager.getOutOfActions) {
// No more actions will ever be produced: clean up and terminate the whole process.
log.info(s"framework ${jobManager.getJobId} execution finished")
val repo = new File("repo/")
// NOTE(review): File.delete() fails (returns false) on a non-empty directory; if repo/
// still has contents this is a silent no-op — FileUtils.deleteDirectory would remove it.
repo.delete()
HttpServer.stop()
driver.declineOffer(offer.getId)
driver.stop()
sys.exit()
}
else {
log.info("Declining offer, no action ready for execution")
driver.declineOffer(offer.getId)
}
}
finally {
lock.unlock()
}
}
else {
log.info("Declining offer, no sufficient resources")
driver.declineOffer(offer.getId)
}
}
}
// Mesos callback: invoked once the framework is registered with the master. Loads (or, on
// resume, reloads) the job definition from ZooKeeper/the repo, wires up the framework
// providers and config manager, then starts the job and creates its dist directory.
// NOTE(review): in the resume branch the result of JobLoader.reloadJob is discarded, so
// jobManager remains null and jobManager.start() below would NPE — confirm; this looks like
// a missing assignment (jobManager = JobLoader.reloadJob(...)).
def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
if (!resume) {
jobManager = JobLoader.loadJob(
src,
branch,
frameworkId.getValue,
client,
config.jobs.tasks.attempts,
new LinkedBlockingQueue[ActionData]()
)
}
else {
JobLoader.reloadJob(
frameworkId.getValue,
client,
config.jobs.tasks.attempts,
new LinkedBlockingQueue[ActionData]()
)
}
frameworkFactory = FrameworkProvidersFactory(env, config)
val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
configManager = new ConfigManager(env, "repo", items)
jobManager.start()
createJobDir(jobManager.getJobId)
}
def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {}
// Prints a notification to stdout, color-coded by type (white=info, green=success, red=error).
// Info prints the message; Success/Error print the offending line, and Error adds the message.
def printNotification(notification: Notification): Unit = {
var color = Console.WHITE
notification.getNotType match {
case NotificationType.Info =>
color = Console.WHITE
println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
case NotificationType.Success =>
color = Console.GREEN
println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
case NotificationType.Error =>
color = Console.RED
println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
}
}
// Creates <amaHome>/dist/<jobId>/ if it does not exist.
// NOTE(review): uses mkdir (not mkdirs) — relies on <amaHome>/dist already existing.
private def createJobDir(jobId: String): Unit = {
val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
val amaHome = new File(jarFile.getParent).getParent
val jobDir = s"$amaHome/dist/$jobId/"
val dir = new File(jobDir)
if (!dir.exists()) {
dir.mkdir()
}
}
/**
* Creates an action-specific configuration file in the dist folder with the following path:
* dist/{jobId}/{actionName}/{fileName}, to be downloaded into the container.
* Parent directories are created as needed; an existing file is overwritten.
*
* @param configuration A YAML string to be written to the config file
* @param jobId the jobId
* @param actionName the name of the action
* @param fileName the name of the file to create under the action's directory
*/
def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = {
val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
val amaHome = new File(jarFile.getParent).getParent
val envLocation = s"$amaHome/dist/$jobId/$actionName/"
val dir = new File(envLocation)
if (!dir.exists()) {
dir.mkdirs()
}
// Anonymous-subclass idiom: write and close in the constructor body.
// NOTE(review): PrintWriter(String) uses the platform default charset — confirm that is intended.
new PrintWriter(s"$envLocation/$fileName") {
write(configuration)
close
}
}
}
/**
 * Factory for [[JobScheduler]]. Builds a fully wired scheduler: starts the leader's
 * embedded HTTP server (which serves artifacts to the Mesos slaves), captures AWS
 * credentials from the environment when present, and opens a started Curator/ZooKeeper
 * client with exponential-backoff retries.
 */
object JobScheduler {
  def apply(src: String,
            branch: String,
            env: String,
            resume: Boolean,
            config: ClusterConfig,
            report: String,
            home: String): JobScheduler = {
    // Discard any log4j setup inherited from the launcher.
    LogManager.resetConfiguration()

    val scheduler = new JobScheduler()

    // Serve the dist/webserver root so containers can download their artifacts.
    HttpServer.start(config.webserver.Port, s"$home/${config.webserver.Root}")

    // Forward AWS credentials to containers only when both halves are present.
    (sys.env.get("AWS_ACCESS_KEY_ID"), sys.env.get("AWS_SECRET_ACCESS_KEY")) match {
      case (Some(accessKey), Some(secretKey)) =>
        scheduler.awsEnv = s"env AWS_ACCESS_KEY_ID=$accessKey env AWS_SECRET_ACCESS_KEY=$secretKey"
      case _ => // leave awsEnv empty
    }

    scheduler.resume = resume
    scheduler.src = src
    scheduler.branch = branch
    scheduler.env = env
    // report arrives lower-cased from the CLI; NotificationLevel names are capitalized.
    scheduler.reportLevel = NotificationLevel.valueOf(report.capitalize)

    // ZooKeeper client with exponential backoff: base 1000ms, up to 3 retries.
    val zkRetryPolicy = new ExponentialBackoffRetry(1000, 3)
    scheduler.client = CuratorFrameworkFactory.newClient(config.zk, zkRetryPolicy)
    scheduler.client.start()

    scheduler.config = config
    scheduler
  }
}