// * Licensed to the Apache Software Foundation (ASF) under one or more
// * contributor license agreements. See the NOTICE file distributed with
// * this work for additional information regarding copyright ownership.
// * The ASF licenses this file to You under the Apache License, Version 2.0
// * (the "License"); you may not use this file except in compliance with
// * the License. You may obtain a copy of the License at
// *
// *
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//package org.apache.amaterasu.leader.yarn
//import{File, FileInputStream, InputStream}
//import{InetAddress, ServerSocket}
//import java.nio.ByteBuffer
//import java.util
//import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
//import javax.jms.MessageConsumer
//import org.apache.amaterasu.common.configuration.ClusterConfig
//import org.apache.amaterasu.common.dataobjects.ActionData
//import org.apache.amaterasu.common.logging.Logging
//import org.apache.amaterasu.leader.common.execution.JobManager
//import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
//import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
//import org.apache.amaterasu.leader.execution.JobLoader
//import org.apache.amaterasu.leader.utilities.Args
//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
//import org.apache.curator.retry.ExponentialBackoffRetry
//import org.apache.hadoop.fs.{FileSystem, Path}
//import org.apache.hadoop.yarn.api.records._
//import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
//import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
//import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
//import org.apache.hadoop.yarn.conf.YarnConfiguration
//import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
//import org.apache.zookeeper.CreateMode
//import scala.collection.JavaConversions._
//import scala.collection.JavaConverters._
//import scala.collection.{concurrent, mutable}
//import scala.concurrent.Future
//import scala.util.{Failure, Success}
//class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
// var capability: Resource = _
//"ApplicationMaster start")
// private var jobManager: JobManager = _
// private var client: CuratorFramework = _
// private var config: ClusterConfig = _
// private var env: String = _
// private var branch: String = _
// private var fs: FileSystem = _
// private var conf: YarnConfiguration = _
// private var propPath: String = ""
// private var props: InputStream = _
// private var jarPath: Path = _
// private var executorPath: Path = _
// private var executorJar: LocalResource = _
// private var propFile: LocalResource = _
// private var log4jPropFile: LocalResource = _
// private var nmClient: NMClientAsync = _
// private var allocListener: YarnRMCallbackHandler = _
// private var rmClient: AMRMClientAsync[ContainerRequest] = _
// private var address: String = _
// private var consumer: MessageConsumer = _
// private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala
// private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
// private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]()
// private val host: String = InetAddress.getLocalHost.getHostName
// private val broker: BrokerService = new BrokerService()
// def setLocalResourceFromPath(path: Path): LocalResource = {
// val stat = fs.getFileStatus(path)
// val fileResource = Records.newRecord(classOf[LocalResource])
// fileResource.setShouldBeUploadedToSharedCache(true)
// fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
// fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
// fileResource.setSize(stat.getLen)
// fileResource.setTimestamp(stat.getModificationTime)
// fileResource.setType(LocalResourceType.FILE)
// fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
// fileResource
// }
// def execute(arguments: Args): Unit = {
//"Started AM with args $arguments")
// propPath = System.getenv("PWD") + "/"
// props = new FileInputStream(new File(propPath))
// // no need for hdfs double check (nod to Aaron Rodgers)
// // jars on HDFS should have been verified by the YARN client
// conf = new YarnConfiguration()
// fs = FileSystem.get(conf)
// config = ClusterConfig(props)
// try {
// initJob(arguments)
// } catch {
// case e: Exception => log.error("error initializing ", e.getMessage)
// }
// // now that the job was initiated, the curator client is Started and we can
// // register the broker's address
// client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker")
// client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes)
// // once the broker is registered, we can remove the barrier so clients can connect
// val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier")
// barrier.removeBarrier()
// consumer = MessagingClientUtil.setupMessaging(address)
//"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions")
// jarPath = new Path(config.YARN.hdfsJarsPath)
// // TODO: change this to read all dist folder and add to exec path
// executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar"))
//"Executor jar path is {}", executorPath)
// executorJar = setLocalResourceFromPath(executorPath)
// propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/")))
// log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/")))
//"Started execute")
// nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler())
// // Initialize clients to ResourceManager and NodeManagers
// nmClient.init(conf)
// nmClient.start()
// // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
// allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
// rmClient = startRMClient()
// val registrationResponse = registerAppMaster("", 0, "")
// val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
//"Max mem capability of resources in this cluster " + maxMem)
// val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
//"Max vcores capability of resources in this cluster " + maxVCores)
//"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}")
// // Resource requirements for worker containers
// this.capability = Records.newRecord(classOf[Resource])
// val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
// while (!jobManager.getOutOfActions) {
// val actionData = jobManager.getNextActionData
// if (actionData != null) {
// val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
// val driverConfiguration = frameworkProvider.getDriverConfiguration
// var mem: Int = driverConfiguration.getMemory
// mem = Math.min(mem, maxMem)
// this.capability.setMemory(mem)
// var cpu = driverConfiguration.getCpus
// cpu = Math.min(cpu, maxVCores)
// this.capability.setVirtualCores(cpu)
// askContainer(actionData)
// }
// }
//"Finished asking for containers")
// }
// private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
// val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
// client.init(conf)
// client.start()
// client
// }
// private def registerAppMaster(host: String, port: Int, url: String) = {
// // Register with ResourceManager
//"Registering application")
// val registrationResponse = rmClient.registerApplicationMaster(host, port, url)
//"Registered application")
// registrationResponse
// }
//// private def setupMessaging(jobId: String): Unit = {
//// val cf = new ActiveMQConnectionFactory(address)
//// val conn = cf.createConnection()
//// conn.start()
//// val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
//// //TODO: move to a const in common
//// val destination = session.createTopic("JOB.REPORT")
//// val consumer = session.createConsumer(destination)
//// consumer.setMessageListener(new ActiveReportListener)
//// }
// private def askContainer(actionData: ActionData): Unit = {
// actionsBuffer.add(actionData)
//"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}")
// // we have an action to schedule, let's request a container
// val priority: Priority = Records.newRecord(classOf[Priority])
// priority.setPriority(1)
// val containerReq = new ContainerRequest(capability, null, null, priority)
// rmClient.addContainerRequest(containerReq)
//"Asked container for action ${actionData.getId}")
// }
// override def onContainersAllocated(containers: util.List[Container]): Unit = {
//"${containers.size()} Containers allocated")
// for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
// if (actionsBuffer.isEmpty) {
// log.warn(s"Why actionBuffer empty and i was called?. Container ids: ${ => c.getId.getContainerId)}")
// return
// }
// val actionData = actionsBuffer.poll()
// val containerTask = Future[ActionData] {
// val frameworkFactory = FrameworkProvidersFactory(env, config)
// val framework = frameworkFactory.getFramework(actionData.getGroupId)
// val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
// val ctx = Records.newRecord(classOf[ContainerLaunchContext])
// val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
//"Running container id {}.", container.getId.getContainerId)
//"Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
// ctx.setCommands(commands)
// ctx.setTokens(allTokens)
// val yarnJarPath = new Path(config.YARN.hdfsJarsPath)
// //TODO Eyal - Remove the hardcoding of the dist path
// /* val resources = mutable.Map[String, LocalResource]()
// val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
// while (binaryFileIter.hasNext) {
// val eachFile =
// resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
// }
// resources("") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/")))
// resources ("") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/")))*/
// val resources = mutable.Map[String, LocalResource](
// "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
// "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
// "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
// "" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/"))),
// "" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/"))),
// // TODO: Nadav/Eyal all of these should move to the executor resource setup
// "" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/"))),
// "" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/"))),
// "" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/"))),
// "" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/"))),
// "" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/"))))
// //adding the framework and executor resources
// setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
// setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
// ctx.setLocalResources(resources)
// ctx.setEnvironment(Map[String, String](
// "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
// "YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
// "AMA_NODE" -> sys.env("AMA_NODE"),
// "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
// ))
//"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
// nmClient.startContainerAsync(container, ctx)
// actionData
// }
// containerTask onComplete {
// case Failure(t) =>
// log.error(s"launching container Failed", t)
// askContainer(actionData)
// case Success(requestedActionData) =>
// jobManager.actionStarted(requestedActionData.getId)
// containersIdsToTask.put(container.getId.getContainerId, requestedActionData)
//"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}")
// }
// }
// }
// private def allTokens: ByteBuffer = {
// // creating the credentials for container execution
// val credentials = UserGroupInformation.getCurrentUser.getCredentials
// val dob = new DataOutputBuffer
// credentials.writeTokenStorageToStream(dob)
// // removing the AM->RM token so that containers cannot access it.
// val iter = credentials.getAllTokens.iterator
//"Executing with tokens:")
// for (token <- iter) {
// if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
// }
// ByteBuffer.wrap(dob.getData, 0, dob.getLength)
// }
// private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
// val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
// if (fs.exists(sourcePath)) {
// val files = fs.listFiles(sourcePath, true)
// while (files.hasNext) {
// val res =
// val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "")
// countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath))
// }
// }
// }
// def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = {
// import
// import org.apache.hadoop.yarn.exceptions.YarnException
// try
// rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
// catch {
// case ex: YarnException =>
// log.error("Failed to unregister application", ex)
// case e: IOException =>
// log.error("Failed to unregister application", e)
// }
// rmClient.stop()
// nmClient.stop()
// }
// override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
// for (status <- statuses.asScala) {
// if (status.getState == ContainerState.COMPLETE) {
// val containerId = status.getContainerId.getContainerId
// val task = containersIdsToTask(containerId)
// rmClient.releaseAssignedContainer(status.getContainerId)
// val taskId = task.getId
// if (status.getExitStatus == 0) {
// //completedContainersAndTaskIds.put(containerId,
// jobManager.actionComplete(taskId)
//"Container $containerId Complete with task ${taskId} with success.")
// } else {
// // TODO: Check the getDiagnostics value and see if appropriate
// jobManager.actionFailed(taskId, status.getDiagnostics)
// log.warn(s"Container $containerId Complete with task ${taskId} with Failed status code (${status.getExitStatus})")
// }
// }
// }
// if (jobManager.getOutOfActions) {
//"Finished all tasks successfully! Wow!")
// jobManager.actionsCount()
// stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
// } else {
//"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
// }
// }
// override def getProgress: Float = {
// jobManager.getRegisteredActions.size.toFloat / completedContainersAndTaskIds.size
// }
// override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
//"Nodes change. Nothing to report.")
// }
// override def onShutdownRequest(): Unit = {
// log.error("Shutdown requested.")
// stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
// }
// override def onError(e: Throwable): Unit = {
// log.error("Error on AM", e)
// stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
// }
// def initJob(args: Args): Unit = {
// this.env = args.env
// this.branch = args.branch
// try {
// val retryPolicy = new ExponentialBackoffRetry(1000, 3)
// client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
// client.start()
// } catch {
// case e: Exception =>
// log.error("Error connecting to zookeeper", e)
// throw e
// }
// if (args.jobId != null && !args.jobId.isEmpty) {
//"resuming job" + args.jobId)
// jobManager = JobLoader.reloadJob(
// args.jobId,
// client,
// config.Jobs.Tasks.attempts,
// new LinkedBlockingQueue[ActionData])
// } else {
//"new job is being created")
// try {
// jobManager = JobLoader.loadJob(
// args.repo,
// args.branch,
// args.newJobId,
// client,
// config.Jobs.Tasks.attempts,
// new LinkedBlockingQueue[ActionData])
// } catch {
// case e: Exception =>
// log.error("Error creating JobManager.", e)
// throw e
// }
// }
// jobManager.start()
//"Started jobManager")
// }
//object ApplicationMaster extends Logging with App {
// val parser = Args.getParser
// parser.parse(args, Args()) match {
// case Some(arguments: Args) =>
// val appMaster = new ApplicationMaster()
// appMaster.address = MessagingClientUtil.getBorkerAddress
//"broker Started with address ${appMaster.address}")
// appMaster.execute(arguments)
// case None =>
// }