| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.amaterasu.leader.yarn |
| |
| import java.io.{File, FileInputStream, IOException, InputStream} |
| import java.net.{InetAddress, ServerSocket} |
| import java.nio.ByteBuffer |
| import java.util |
| import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} |
| |
| import javax.jms.MessageConsumer |
| import org.apache.activemq.broker.BrokerService |
| import org.apache.amaterasu.common.configuration.ClusterConfig |
| import org.apache.amaterasu.common.dataobjects.ActionData |
| import org.apache.amaterasu.common.logging.Logging |
| import org.apache.amaterasu.leader.common.execution.JobManager |
| import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory |
| import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil |
| import org.apache.amaterasu.leader.execution.JobLoader |
| import org.apache.amaterasu.leader.utilities.Args |
| import org.apache.curator.framework.recipes.barriers.DistributedBarrier |
| import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} |
| import org.apache.curator.retry.ExponentialBackoffRetry |
| import org.apache.hadoop.fs.{FileSystem, Path} |
| import org.apache.hadoop.io.DataOutputBuffer |
| import org.apache.hadoop.security.UserGroupInformation |
| import org.apache.hadoop.yarn.api.records._ |
| import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest |
| import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl |
| import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync} |
| import org.apache.hadoop.yarn.conf.YarnConfiguration |
| import org.apache.hadoop.yarn.exceptions.YarnException |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier |
| import org.apache.hadoop.yarn.util.{ConverterUtils, Records} |
| import org.apache.zookeeper.CreateMode |
| |
| import scala.collection.JavaConversions._ |
| import scala.collection.JavaConverters._ |
| import scala.collection.{concurrent, mutable} |
| import scala.concurrent.ExecutionContext.Implicits.global |
| import scala.concurrent.Future |
| import scala.util.{Failure, Success} |
| |
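| /** |
| * YARN ApplicationMaster for Amaterasu. It requests one container per action, |
| * localizes the executor and framework resources into each container, launches |
| * the containers, and reports action completion back to the JobManager. |
| */ |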
| class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler { |
| |
| var capability: Resource = _ |
| |
| log.info("ApplicationMaster start") |
| |
| private var jobManager: JobManager = _ |
| private var client: CuratorFramework = _ |
| private var config: ClusterConfig = _ |
| private var env: String = _ |
| private var branch: String = _ |
| private var fs: FileSystem = _ |
| private var conf: YarnConfiguration = _ |
| private var propPath: String = "" |
| private var props: InputStream = _ |
| private var jarPath: Path = _ |
| private var executorPath: Path = _ |
| private var executorJar: LocalResource = _ |
| private var propFile: LocalResource = _ |
| private var log4jPropFile: LocalResource = _ |
| private var nmClient: NMClientAsync = _ |
| private var allocListener: YarnRMCallbackHandler = _ |
| private var rmClient: AMRMClientAsync[ContainerRequest] = _ |
| private var address: String = _ |
| private var consumer: MessageConsumer = _ |
| |
| private val containerIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala |
| private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala |
| private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]() |
| private val host: String = InetAddress.getLocalHost.getHostName |
| private val broker: BrokerService = new BrokerService() |
| |
| |
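| // builds a YARN LocalResource descriptor for a file already present on HDFS, |
| // so it can be localized into a container's working directory |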
| def setLocalResourceFromPath(path: Path): LocalResource = { |
| |
| val stat = fs.getFileStatus(path) |
| val fileResource = Records.newRecord(classOf[LocalResource]) |
| |
| fileResource.setShouldBeUploadedToSharedCache(true) |
| fileResource.setVisibility(LocalResourceVisibility.PUBLIC) |
| fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path)) |
| fileResource.setSize(stat.getLen) |
| fileResource.setTimestamp(stat.getModificationTime) |
| fileResource.setType(LocalResourceType.FILE) |
| fileResource.setVisibility(LocalResourceVisibility.PUBLIC) |
| fileResource |
| |
| } |
| |
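| // main AM flow: load configuration, initialize the job, publish the broker |
| // address to ZooKeeper, register with the ResourceManager, and request a |
| // container for every action the JobManager schedules |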
| def execute(arguments: Args): Unit = { |
| |
| log.info(s"Started AM with args $arguments") |
| |
| propPath = System.getenv("PWD") + "/amaterasu.properties" |
| props = new FileInputStream(new File(propPath)) |
| |
| // no need to double-check HDFS (nod to Aaron Rodgers) |
| // jars on HDFS should have been verified by the YARN client |
| conf = new YarnConfiguration() |
| fs = FileSystem.get(conf) |
| |
| config = ClusterConfig(props) |
| |
| try { |
| initJob(arguments) |
| } catch { |
| case e: Exception => log.error("error initializing ", e.getMessage) |
| } |
| |
| // now that the job has been initiated, the Curator client is started and we can |
| // register the broker's address |
| client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker") |
| client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes) |
| |
| // once the broker is registered, we can remove the barrier so clients can connect |
| log.info(s"/${jobManager.getJobId}-report-barrier") |
| val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier") |
| barrier.removeBarrier() |
| |
| consumer = MessagingClientUtil.setupMessaging(address) |
| |
| log.info(s"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions") |
| |
| jarPath = new Path(config.YARNConf.hdfsJarsPath) |
| |
| // TODO: change this to read all dist folder and add to exec path |
| executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar")) |
| log.info("Executor jar path is {}", executorPath) |
| executorJar = setLocalResourceFromPath(executorPath) |
| propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties"))) |
| log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties"))) |
| |
| log.info("Started execute") |
| |
| nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler()) |
| |
| // Initialize clients to ResourceManager and NodeManagers |
| nmClient.init(conf) |
| nmClient.start() |
| |
| // TODO: awsEnv is currently set to an empty string; it should be changed to read values from (where?) |
| allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar) |
| |
| rmClient = startRMClient() |
| val registrationResponse = registerAppMaster("", 0, "") |
| val maxMem = registrationResponse.getMaximumResourceCapability.getMemory |
| log.info("Max mem capability of resources in this cluster " + maxMem) |
| val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores |
| log.info("Max vcores capability of resources in this cluster " + maxVCores) |
| log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}") |
| |
| // Resource requirements for worker containers |
| this.capability = Records.newRecord(classOf[Resource]) |
| val frameworkFactory = FrameworkProvidersFactory(env, config) |
| |
| while (!jobManager.getOutOfActions) { |
| val actionData = jobManager.getNextActionData |
| if (actionData != null) { |
| |
| val frameworkProvider = frameworkFactory.providers(actionData.getGroupId) |
| val driverConfiguration = frameworkProvider.getDriverConfiguration |
| |
| var mem: Int = driverConfiguration.getMemory |
| mem = Math.min(mem, maxMem) |
| this.capability.setMemory(mem) |
| |
| var cpu = driverConfiguration.getCpus |
| cpu = Math.min(cpu, maxVCores) |
| this.capability.setVirtualCores(cpu) |
| |
| askContainer(actionData) |
| } else { |
| // avoid busy-spinning while the JobManager has no action ready yet |
| Thread.sleep(100) |
| } |
| } |
| |
| log.info("Finished asking for containers") |
| } |
| |
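| // creates and starts the asynchronous AM-RM client, using this class as the callback handler with a 1s heartbeat interval |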
| private def startRMClient(): AMRMClientAsync[ContainerRequest] = { |
| val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this) |
| client.init(conf) |
| client.start() |
| client |
| } |
| |
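| // registers this AM with the ResourceManager; the response carries the cluster's maximum resource capabilities |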
| private def registerAppMaster(host: String, port: Int, url: String) = { |
| // Register with ResourceManager |
| log.info("Registering application") |
| val registrationResponse = rmClient.registerApplicationMaster(host, port, url) |
| log.info("Registered application") |
| registrationResponse |
| } |
| |
| |
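| // buffers the action and submits a matching container request to the ResourceManager |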
| private def askContainer(actionData: ActionData): Unit = { |
| |
| actionsBuffer.add(actionData) |
| log.info(s"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}") |
| |
| // we have an action to schedule, let's request a container |
| val priority: Priority = Records.newRecord(classOf[Priority]) |
| priority.setPriority(1) |
| val containerReq = new ContainerRequest(capability, null, null, priority) |
| rmClient.addContainerRequest(containerReq) |
| log.info(s"Asked container for action ${actionData.getId}") |
| |
| } |
| |
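| // RM callback: pairs each allocated container with the next buffered action, builds its |
| // ContainerLaunchContext (command, tokens, local resources, environment) and launches it asynchronously |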
| override def onContainersAllocated(containers: util.List[Container]): Unit = { |
| |
| log.info(s"${containers.size()} Containers allocated") |
| for (container <- containers.asScala) { // launch each container by creating a ContainerLaunchContext |
| if (actionsBuffer.isEmpty) { |
| log.warn(s"Why actionBuffer empty and i was called?. Container ids: ${containers.map(c => c.getId.getContainerId)}") |
| return |
| } |
| |
| val actionData = actionsBuffer.poll() |
| val containerTask = Future[ActionData] { |
| |
| val frameworkFactory = FrameworkProvidersFactory(env, config) |
| val framework = frameworkFactory.getFramework(actionData.getGroupId) |
| val runnerProvider = framework.getRunnerProvider(actionData.getTypeId) |
| val ctx = Records.newRecord(classOf[ContainerLaunchContext]) |
| val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address)) |
| |
| log.info("Running container id {}.", container.getId.getContainerId) |
| log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last) |
| |
| ctx.setCommands(commands) |
| ctx.setTokens(allTokens) |
| |
| val yarnJarPath = new Path(config.YARNConf.hdfsJarsPath) |
| |
| //TODO Eyal - Remove the hardcoding of the dist path |
| |
| val resources = mutable.Map[String, LocalResource]( |
| "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))), |
| "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))), |
| "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))), |
| "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))), |
| "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))), |
| // TODO: Nadav/Eyal all of these should move to the executor resource setup |
| "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))), |
| "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))), |
| "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))), |
| "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))), |
| "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py")))) |
| |
| // adding the framework group and runner-specific resources |
| setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier) |
| setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}") |
| |
| ctx.setLocalResources(resources) |
| |
| ctx.setEnvironment(Map[String, String]( |
| "HADOOP_CONF_DIR" -> s"${config.YARNConf.hadoopHomeDir}/conf/", |
| "YARN_CONF_DIR" -> s"${config.YARNConf.hadoopHomeDir}/conf/", |
| "AMA_NODE" -> sys.env("AMA_NODE"), |
| "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName |
| )) |
| |
| log.info(s"hadoop conf dir is ${config.YARNConf.hadoopHomeDir}/conf/") |
| nmClient.startContainerAsync(container, ctx) |
| actionData |
| } |
| |
| containerTask onComplete { |
| case Failure(t) => |
| log.error(s"launching container Failed", t) |
| askContainer(actionData) |
| |
| case Success(requestedActionData) => |
| jobManager.actionStarted(requestedActionData.getId) |
| containerIdsToTask.put(container.getId.getContainerId, requestedActionData) |
| log.info(s"Launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}") |
| |
| } |
| } |
| } |
| |
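| // serializes the current user's credentials for the containers, stripping the AM->RM token |
| // so that worker containers cannot act on behalf of the ApplicationMaster |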
| private def allTokens: ByteBuffer = { |
| // creating the credentials for container execution |
| val credentials = UserGroupInformation.getCurrentUser.getCredentials |
| val dob = new DataOutputBuffer |
| credentials.writeTokenStorageToStream(dob) |
| |
| // removing the AM->RM token so that containers cannot access it. |
| val iter = credentials.getAllTokens.iterator |
| log.info("Executing with tokens:") |
| for (token <- iter) { |
| log.info(token.toString) |
| if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove() |
| } |
| ByteBuffer.wrap(dob.getData, 0, dob.getLength) |
| } |
| |
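| // registers every file found under yarnJarPath/<resourcesPath> on HDFS as a LocalResource for the container |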
| private def setupResources(yarnJarPath: Path, frameworkPath: String, containerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = { |
| |
| val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath")) |
| |
| if (fs.exists(sourcePath)) { |
| |
| val files = fs.listFiles(sourcePath, true) |
| |
| while (files.hasNext) { |
| val res = files.next() |
| val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "") |
| containerResources.put(containerPath, setLocalResourceFromPath(res.getPath)) |
| } |
| } |
| } |
| |
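| // unregisters the AM from the ResourceManager with the given final status and stops the RM and NM clients |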
| def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = { |
| try { |
| rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null) |
| } catch { |
| case ex: YarnException => |
| log.error("Failed to unregister application", ex) |
| case e: IOException => |
| log.error("Failed to unregister application", e) |
| } |
| rmClient.stop() |
| nmClient.stop() |
| } |
| |
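| // RM callback: marks each completed container's action as succeeded or failed in the |
| // JobManager and unregisters the application once no actions remain |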
| override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = { |
| |
| for (status <- statuses.asScala) { |
| |
| if (status.getState == ContainerState.COMPLETE) { |
| |
| val containerId = status.getContainerId.getContainerId |
| val task = containerIdsToTask(containerId) |
| rmClient.releaseAssignedContainer(status.getContainerId) |
| |
| val taskId = task.getId |
| if (status.getExitStatus == 0) { |
| |
| completedContainersAndTaskIds.put(containerId, taskId) |
| jobManager.actionComplete(taskId) |
| log.info(s"Container $containerId completed task $taskId successfully.") |
| } else { |
| // TODO: Check the getDiagnostics value and see if appropriate |
| jobManager.actionFailed(taskId, status.getDiagnostics) |
| log.warn(s"Container $containerId Complete with task ${taskId} with Failed status code (${status.getExitStatus})") |
| } |
| } |
| } |
| |
| if (jobManager.getOutOfActions) { |
| log.info("Finished all tasks successfully! Wow!") |
| jobManager.actionsCount() |
| stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS") |
| } else { |
| log.info(s"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}") |
| } |
| } |
| |
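| // reports the fraction of registered actions whose containers have completed, guarding against division by zero early in the run |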
| override def getProgress: Float = { |
| val registered = jobManager.getRegisteredActions.size |
| if (registered == 0) 0f else completedContainersAndTaskIds.size.toFloat / registered |
| } |
| |
| override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = { |
| log.info("Nodes change. Nothing to report.") |
| } |
| |
| override def onShutdownRequest(): Unit = { |
| log.error("Shutdown requested.") |
| stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested") |
| } |
| |
| override def onError(e: Throwable): Unit = { |
| log.error("Error on AM", e) |
| stopApplication(FinalApplicationStatus.FAILED, "Error on AM") |
| } |
| |
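| // connects to ZooKeeper via Curator, then either resumes an existing job (when a jobId |
| // is supplied) or loads a new one from the given repository and branch |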
| def initJob(args: Args): Unit = { |
| |
| this.env = args.env |
| this.branch = args.branch |
| try { |
| val retryPolicy = new ExponentialBackoffRetry(1000, 3) |
| client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy) |
| client.start() |
| } catch { |
| case e: Exception => |
| log.error("Error connecting to zookeeper", e) |
| throw e |
| } |
| if (args.jobId != null && !args.jobId.isEmpty) { |
| log.info("resuming job" + args.jobId) |
| jobManager = JobLoader.reloadJob( |
| args.jobId, |
| client, |
| config.Jobs.Tasks.attempts, |
| new LinkedBlockingQueue[ActionData]) |
| |
| } else { |
| log.info("new job is being created") |
| try { |
| |
| jobManager = JobLoader.loadJob( |
| args.repo, |
| args.branch, |
| args.newJobId, |
| client, |
| config.Jobs.Tasks.attempts, |
| new LinkedBlockingQueue[ActionData]) |
| } catch { |
| case e: Exception => |
| log.error("Error creating JobManager.", e) |
| throw e |
| } |
| |
| } |
| |
| jobManager.start() |
| log.info("Started jobManager") |
| } |
| } |
| |
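| // companion entry point: parses the CLI arguments, starts the embedded ActiveMQ broker, and runs the AM |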
| object ApplicationMaster extends Logging with App { |
| |
| |
| val parser = Args.getParser |
| parser.parse(args, Args()) match { |
| |
| case Some(arguments: Args) => |
| val appMaster = new ApplicationMaster() |
| |
| appMaster.address = MessagingClientUtil.getBorkerAddress |
| appMaster.broker.addConnector(appMaster.address) |
| appMaster.broker.start() |
| |
| log.info(s"broker Started with address ${appMaster.address}") |
| appMaster.execute(arguments) |
| |
| case None => |
| } |
| |
| |
| } |