| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.amaterasu.leader.yarn |
| |
| import com.fasterxml.jackson.databind.ObjectMapper |
| import com.fasterxml.jackson.dataformat.yaml.YAMLFactory |
| import com.fasterxml.jackson.module.kotlin.KotlinModule |
| |
| import kotlinx.coroutines.runBlocking |
| |
| import org.apache.activemq.broker.BrokerService |
| |
| import org.apache.amaterasu.common.configuration.ClusterConfig |
| import org.apache.amaterasu.common.dataobjects.ActionData |
| import org.apache.amaterasu.common.logging.KLogging |
| import org.apache.amaterasu.common.utils.ActiveNotifier |
| import org.apache.amaterasu.common.configuration.ConfigManager |
| import org.apache.amaterasu.leader.common.execution.JobLoader |
| import org.apache.amaterasu.leader.common.execution.JobManager |
| import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory |
| import org.apache.amaterasu.leader.common.launcher.AmaOpts |
| import org.apache.amaterasu.leader.common.utilities.DataLoader |
| import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil |
| import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider |
| import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider |
| import org.apache.commons.lang.exception.ExceptionUtils |
| |
| import org.apache.curator.framework.CuratorFramework |
| import org.apache.curator.framework.CuratorFrameworkFactory |
| import org.apache.curator.framework.recipes.barriers.DistributedBarrier |
| import org.apache.curator.retry.ExponentialBackoffRetry |
| |
| import org.apache.hadoop.fs.FileSystem |
| import org.apache.hadoop.fs.Path |
| import org.apache.hadoop.io.DataOutputBuffer |
| import org.apache.hadoop.security.UserGroupInformation |
| import org.apache.hadoop.yarn.api.records.* |
| import org.apache.hadoop.yarn.client.api.AMRMClient |
| import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync |
| import org.apache.hadoop.yarn.client.api.async.NMClientAsync |
| import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl |
| import org.apache.hadoop.yarn.conf.YarnConfiguration |
| import org.apache.hadoop.yarn.exceptions.YarnException |
| import org.apache.hadoop.yarn.security.AMRMTokenIdentifier |
| import org.apache.hadoop.yarn.util.Records |
| |
| import org.apache.zookeeper.CreateMode |
| import java.io.* |
| |
| import java.nio.ByteBuffer |
| import java.util.concurrent.ConcurrentHashMap |
| import java.util.concurrent.ConcurrentLinkedQueue |
| import java.util.concurrent.LinkedBlockingQueue |
| |
| import javax.jms.MessageConsumer |
| |
| class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler { |
| |
| |
| lateinit var address: String |
| |
| val broker: BrokerService = BrokerService() |
| private val conf = YarnConfiguration() |
| |
| |
| private val actionsBuffer = ConcurrentLinkedQueue<ActionData>() |
| private val completedContainersAndTaskIds = ConcurrentHashMap<Long, String>() |
| private val containersIdsToTask = ConcurrentHashMap<Long, ActionData>() |
| private val yamlMapper = ObjectMapper(YAMLFactory()) |
| |
| private lateinit var propPath: String |
| private lateinit var props: FileInputStream |
| private lateinit var jobManager: JobManager |
| private lateinit var zkClient: CuratorFramework |
| private lateinit var env: String |
| private lateinit var rmClient: AMRMClientAsync<AMRMClient.ContainerRequest> |
| private lateinit var frameworkFactory: FrameworkProvidersFactory |
| private lateinit var config: ClusterConfig |
| private lateinit var fs: FileSystem |
| private lateinit var consumer: MessageConsumer |
| private lateinit var configManager: ConfigManager |
| private lateinit var notifier: ActiveNotifier |
| private lateinit var nmClient: NMClientAsync |
| |
| init { |
| yamlMapper.registerModule(KotlinModule()) |
| } |
| |
| fun execute(opts: AmaOpts) { |
| |
| propPath = System.getenv("PWD") + "/amaterasu.properties" |
| props = FileInputStream(File(propPath)) |
| |
| // no need for HDFS double check (nod to Aaron Rodgers) |
| // jars on HDFS should have been verified by the YARN zkClient |
| config = ClusterConfig.apply(props) |
| fs = FileSystem.get(conf) |
| |
| initJob(opts) |
| |
| // now that the job was initiated, the curator zkClient is Started and we can |
| // register the broker's address |
| zkClient.create().withMode(CreateMode.PERSISTENT).forPath("/${jobManager.jobId}/broker") |
| zkClient.setData().forPath("/${jobManager.jobId}/broker", address.toByteArray()) |
| |
| // once the broker is registered, we can remove the barrier so clients can connect |
| log.info("/${jobManager.jobId}-report-barrier") |
| val barrier = DistributedBarrier(zkClient, "/${jobManager.jobId}-report-barrier") |
| barrier.removeBarrier() |
| |
| consumer = MessagingClientUtil.setupMessaging(address) |
| notifier = ActiveNotifier(address) |
| |
| log.info("number of messages ${broker.adminView.totalMessageCount}") |
| |
| // Initialize clients to ResourceManager and NodeManagers |
| nmClient = NMClientAsyncImpl(YarnNMCallbackHandler(notifier)) |
| nmClient.init(conf) |
| nmClient.start() |
| |
| rmClient = startRMClient() |
| |
| val items = mutableListOf<FrameworkSetupProvider>() |
| |
| for (p in frameworkFactory.providers().values()) { |
| items.add(p) |
| } |
| val configItems = items.flatMap { it.configurationItems.asIterable() } |
| configManager = ConfigManager(env, "repo", configItems) |
| |
| |
| val registrationResponse = rmClient.registerApplicationMaster("", 0, "") |
| val maxMem = registrationResponse.maximumResourceCapability.memorySize |
| val maxVCores = registrationResponse.maximumResourceCapability.virtualCores |
| |
| while (!jobManager.outOfActions) { |
| val capability = Records.newRecord(Resource::class.java) |
| |
| val actionData = jobManager.nextActionData |
| if (actionData != null) { |
| |
| notifier.info("requesting container fo ${actionData.name}") |
| val frameworkProvider = frameworkFactory.getFramework(actionData.groupId) |
| val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager) |
| |
| var mem: Long = driverConfiguration.memory.toLong() |
| mem = Math.min(mem, maxMem) |
| capability.memorySize = mem |
| |
| var cpu = driverConfiguration.cpus |
| cpu = Math.min(cpu, maxVCores) |
| capability.virtualCores = cpu |
| |
| createTaskConfiguration(actionData) |
| requestContainer(actionData, capability) |
| |
| } |
| } |
| |
| log.info("Finished requesting containers") |
| readLine() |
| } |
| |
| private fun initJob(opts: AmaOpts) { |
| |
| this.env = opts.env |
| frameworkFactory = FrameworkProvidersFactory.apply(env, config) |
| |
| try { |
| val retryPolicy = ExponentialBackoffRetry(1000, 3) |
| zkClient = CuratorFrameworkFactory.newClient(config.zk(), retryPolicy) |
| zkClient.start() |
| } catch (e: Exception) { |
| log.error("Error connecting to zookeeper", e) |
| throw e |
| } |
| |
| val zkPath = zkClient.checkExists().forPath("/${opts.newJobId}") |
| |
| log.info("zkPath is $zkPath") |
| if (zkPath != null) { |
| log.info("resuming job" + opts.newJobId) |
| jobManager = JobLoader.reloadJob( |
| opts.newJobId, |
| opts.userName, |
| opts.password, |
| zkClient, |
| config.Jobs().tasks().attempts(), |
| LinkedBlockingQueue<ActionData>()) |
| |
| } else { |
| log.info("new job is being created") |
| try { |
| |
| jobManager = JobLoader.loadJob( |
| opts.repo, |
| opts.branch, |
| opts.newJobId, |
| opts.userName, |
| opts.password, |
| zkClient, |
| config.Jobs().tasks().attempts(), |
| LinkedBlockingQueue<ActionData>()) |
| } catch (e: Exception) { |
| log.error("Error creating JobManager.", e) |
| throw e |
| } |
| |
| } |
| |
| jobManager.start() |
| log.info("Started jobManager") |
| } |
| |
| override fun onContainersAllocated(containers: MutableList<Container>?) = runBlocking { |
| containers?.let { |
| for (container in it) { |
| |
| log.info("container ${container.id} allocated") |
| if (actionsBuffer.isNotEmpty()) { |
| val actionData = actionsBuffer.poll() |
| //val cd = async { |
| try { |
| log.info("container ${container.id} allocated") |
| |
| val framework = frameworkFactory.getFramework(actionData.groupId) |
| val runnerProvider = framework.getRunnerProvider(actionData.typeId) |
| val ctx = Records.newRecord(ContainerLaunchContext::class.java) |
| |
| val envConf = configManager.getActionConfiguration(actionData.name, actionData.config) |
| val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, envConf, "${actionData.id}-${container.id.containerId}", address)) |
| |
| notifier.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}") |
| ctx.commands = commands |
| ctx.tokens = allTokens() |
| |
| ctx.localResources = setupContainerResources(framework, runnerProvider, actionData) |
| ctx.environment = framework.environmentVariables |
| |
| nmClient.startContainerAsync(container, ctx) |
| |
| jobManager.actionStarted(actionData.id) |
| containersIdsToTask[container.id.containerId] = actionData |
| notifier.info("created container for ${actionData.name} created") |
| |
| log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}") |
| } catch (e: Exception) { |
| notifier.error("", "error launching container with ${e.message} in ${ExceptionUtils.getStackTrace(e)}") |
| requestContainer(actionData, container.resource) |
| } |
| } |
| } |
| } |
| }!! |
| |
| private fun allTokens(): ByteBuffer { |
| // creating the credentials for container execution |
| val credentials = UserGroupInformation.getCurrentUser().credentials |
| val dob = DataOutputBuffer() |
| credentials.writeTokenStorageToStream(dob) |
| |
| // removing the AM->RM token so that containers cannot access it. |
| val iter = credentials.allTokens.iterator() |
| log.info("Executing with tokens:") |
| for (token in iter) { |
| log.info(token.toString()) |
| if (token.kind == AMRMTokenIdentifier.KIND_NAME) iter.remove() |
| } |
| return ByteBuffer.wrap(dob.data, 0, dob.length) |
| } |
| |
| /** |
| * Creates the map of resources to be copied into the container |
| * @framework The frameworkSetupProvider for the action |
| * @runnerProvider the actions runner provider |
| */ |
| private fun setupContainerResources(framework: FrameworkSetupProvider, runnerProvider: RunnerSetupProvider, actionData: ActionData): Map<String, LocalResource> { |
| |
| val yarnJarPath = Path(config.yarn().hdfsJarsPath()) |
| |
| // Getting framework (group) resources |
| val result = framework.groupResources.map { it.path to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it.path))) }.toMap().toMutableMap() |
| |
| // Getting runner resources |
| result.putAll(runnerProvider.runnerResources.map { it to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) }.toMap()) |
| |
| // getting the action specific resources |
| result.putAll(runnerProvider.getActionResources(jobManager.jobId, actionData).map { it.removePrefix("${jobManager.jobId}/${actionData.name}/") to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) }) |
| |
| // getting the action specific dependencies |
| runnerProvider.getActionDependencies(jobManager.jobId, actionData).forEach { distributeFile(it, "${jobManager.jobId}/${actionData.name}/") } |
| |
| result.putAll(runnerProvider.getActionDependencies(jobManager.jobId, actionData).map { File(it).name to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$it"))) }) |
| |
| // Adding the Amaterasu configuration files |
| result["amaterasu.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/amaterasu.properties"))) |
| result["log4j.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/log4j.properties"))) |
| |
| // getting the action executable |
| val executable = runnerProvider.getActionExecutable(jobManager.jobId, actionData) |
| |
| // setting the action executable |
| distributeFile(executable, "${jobManager.jobId}/${actionData.name}/") |
| result[File(executable).name] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$executable"))) |
| |
| result.forEach { log.debug("entry ${it.key} with value ${it.value}") } |
| |
| return result.map { x -> x.key.removePrefix("/") to x.value }.toMap() |
| } |
| |
| private fun createTaskConfiguration(actionData: ActionData) { |
| |
| // setting up the configuration files for the container |
| val envYaml = configManager.getActionConfigContent(actionData.name, actionData.config) |
| writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml") |
| |
| val dataStores = DataLoader.getTaskData(actionData, env).exports |
| val dataStoresYaml = yamlMapper.writeValueAsString(dataStores) |
| writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml") |
| |
| val datasets = DataLoader.getDatasets(env) |
| writeConfigFile(datasets, jobManager.jobId, actionData.name, "datasets.yaml") |
| |
| writeConfigFile("jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml") |
| |
| } |
| |
| private fun writeConfigFile(content: String, jobId: String, actionName: String, fileName: String) { |
| |
| val actionDistPath = createDistPath("$jobId/$actionName/$fileName") |
| val yarnJarPath = Path(config.yarn().hdfsJarsPath()) |
| val targetPath = Path.mergePaths(yarnJarPath, actionDistPath) |
| |
| val outputStream = fs.create(targetPath) |
| outputStream.write(content.toByteArray()) |
| outputStream.close() |
| log.info("written file $targetPath") |
| |
| } |
| |
| private fun distributeFile(file: String, distributionPath: String) { |
| |
| |
| val actionDistPath = createDistPath("$distributionPath/$file") |
| val yarnJarPath = Path(config.yarn().hdfsJarsPath()) |
| val targetPath = Path.mergePaths(yarnJarPath, actionDistPath) |
| |
| notifier.info("copying file $file, file status ${File(file).exists()} to $targetPath") |
| |
| log.info("target is $targetPath") |
| |
| fs.copyFromLocalFile(false, true, Path(file), targetPath) |
| |
| } |
| |
| private fun createDistPath(path: String): Path = Path("/dist/$path") |
| |
| private fun startRMClient(): AMRMClientAsync<AMRMClient.ContainerRequest> { |
| val client = AMRMClientAsync.createAMRMClientAsync<AMRMClient.ContainerRequest>(1000, this) |
| client.init(conf) |
| client.start() |
| return client |
| } |
| |
| private fun createLocalResourceFromPath(path: Path): LocalResource { |
| |
| val stat = fs.getFileStatus(path) |
| val fileResource = Records.newRecord(LocalResource::class.java) |
| |
| fileResource.shouldBeUploadedToSharedCache = true |
| fileResource.visibility = LocalResourceVisibility.PUBLIC |
| fileResource.resource = URL.fromPath(path) |
| fileResource.size = stat.len |
| fileResource.timestamp = stat.modificationTime |
| fileResource.type = LocalResourceType.FILE |
| fileResource.visibility = LocalResourceVisibility.PUBLIC |
| return fileResource |
| |
| } |
| |
| private fun requestContainer(actionData: ActionData, capability: Resource) { |
| |
| actionsBuffer.add(actionData) |
| log.info("About to ask container for action ${actionData.id} with mem ${capability.memorySize} and cores ${capability.virtualCores}. Action buffer size is: ${actionsBuffer.size}") |
| |
| // we have an action to schedule, let's request a container |
| val priority: Priority = Records.newRecord(Priority::class.java) |
| priority.priority = 1 |
| val containerReq = AMRMClient.ContainerRequest(capability, null, null, priority) |
| rmClient.addContainerRequest(containerReq) |
| log.info("Asked container for action ${actionData.id}") |
| |
| } |
| |
| override fun onNodesUpdated(updatedNodes: MutableList<NodeReport>?) { |
| log.info("Nodes change. Nothing to report.") |
| } |
| |
| override fun onShutdownRequest() { |
| log.error("Shutdown requested.") |
| stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested") |
| } |
| |
| override fun getProgress(): Float { |
| return jobManager.registeredActions.size.toFloat() / completedContainersAndTaskIds.size |
| } |
| |
| override fun onError(e: Throwable?) { |
| notifier.error("Error running a container ${e!!.message!!}", ExceptionUtils.getStackTrace(e)) |
| stopApplication(FinalApplicationStatus.FAILED, "Error on AM") |
| } |
| |
| override fun onContainersCompleted(statuses: MutableList<ContainerStatus>?) { |
| for (status in statuses!!) { |
| if (status.state == ContainerState.COMPLETE) { |
| |
| val containerId = status.containerId.containerId |
| val task = containersIdsToTask[containerId] |
| rmClient.releaseAssignedContainer(status.containerId) |
| |
| val taskId = task!!.id |
| if (status.exitStatus == 0) { |
| |
| //completedContainersAndTaskIds.put(containerId, task.id) |
| jobManager.actionComplete(taskId) |
| notifier.info("Container $containerId Complete with task $taskId with success.") |
| } else { |
| // TODO: Check the getDiagnostics value and see if appropriate |
| jobManager.actionFailed(taskId, status.diagnostics) |
| notifier.error("Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})", status.diagnostics) |
| } |
| } |
| } |
| |
| if (jobManager.outOfActions) { |
| log.info("Finished all tasks successfully! Wow!") |
| jobManager.actionsCount() |
| stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS") |
| } else { |
| log.info("jobManager.registeredActions.size: ${jobManager.registeredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}") |
| } |
| } |
| |
| private fun stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String) { |
| |
| try { |
| rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null) |
| } catch (ex: YarnException) { |
| |
| log.error("Failed to unregister application", ex) |
| } catch (e: IOException) { |
| log.error("Failed to unregister application", e) |
| } |
| rmClient.stop() |
| nmClient.stop() |
| } |
| |
| companion object { |
| @JvmStatic |
| fun main(args: Array<String>) = AppMasterArgsParser().main(args) |
| |
| } |
| } |