blob: ba5b3e6e25368d21c78e97215df683d5067389c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.amaterasu.leader.yarn
import com.importre.crayon.bold
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.leader.common.launcher.AmaOpts
import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.hadoop.fs.*
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.*
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.client.api.YarnClientApplication
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.hadoop.yarn.util.Apps
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.util.Records
import org.apache.log4j.LogManager
import org.slf4j.LoggerFactory
import javax.jms.*
import java.io.File
import java.io.FileInputStream
import java.io.IOException
import java.util.*
import java.lang.System.exit
/**
 * Command-line client that submits the Amaterasu ApplicationMaster to YARN and then
 * polls the submitted application until it reaches a terminal state.
 *
 * Unrecoverable errors do not propagate to the caller: each failing step logs the
 * problem and terminates the JVM with a step-specific exit status (see [run]).
 */
class Client {

    private val conf = YarnConfiguration()

    // Assigned by run() once the file system client has been created; read by
    // setLocalResourceFromPath().
    private var fs: FileSystem? = null

    // Holds the consumer returned by MessagingClientUtil.setupMessaging() in run().
    private lateinit var consumer: MessageConsumer

    /**
     * Builds a YARN [LocalResource] of type FILE with PUBLIC visibility that points at
     * [path], copying the size and modification time from the file's status.
     *
     * @param path location of the file on the file system held in [fs]
     * @return the populated local resource record
     * @throws IOException if the file status cannot be read
     * @throws IllegalStateException if called before [fs] has been initialised
     */
    @Throws(IOException::class)
    private fun setLocalResourceFromPath(path: Path): LocalResource {
        val fileSystem = checkNotNull(fs) { "File system client has not been initialised" }
        val stat = fileSystem.getFileStatus(path)
        val fileResource = Records.newRecord(LocalResource::class.java)
        fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
        fileResource.size = stat.len
        fileResource.timestamp = stat.modificationTime
        fileResource.type = LocalResourceType.FILE
        fileResource.visibility = LocalResourceVisibility.PUBLIC
        return fileResource
    }

    /**
     * Submits the ApplicationMaster described by [opts] to YARN and blocks until the
     * application is FINISHED, KILLED or FAILED. This method never returns normally:
     * it always ends by terminating the JVM.
     *
     * Exit statuses used:
     *  - 0: the application reached a terminal state
     *  - 1 / 2: creating the YARN application failed (YarnException / IOException)
     *  - 3: creating the file system client or uploading the home folder failed
     *  - 4: NullPointerException during upload, or the properties files could not be
     *       registered as local resources
     *  - 6 / 7: submitting the application failed (YarnException / IOException)
     *  - 8 / 9: fetching the application report failed (YarnException / IOException)
     *  - 137: interrupted while waiting between polls
     *
     * @param opts parsed launcher options; `home`, `jobId`, `env` and `name` are read here
     * @param args raw command-line arguments, forwarded verbatim to the ApplicationMaster
     */
    @Throws(Exception::class)
    fun run(opts: AmaOpts, args: Array<String>) {

        LogManager.resetConfiguration()

        val config = ClusterConfig()
        // `use` guarantees the stream is closed even if loading the properties throws.
        FileInputStream(opts.home + "/amaterasu.properties").use { config.load(it) }

        // Create yarnClient
        val yarnClient = YarnClient.createYarnClient()
        yarnClient.init(conf)
        yarnClient.start()

        // Create application via yarnClient
        val app = try {
            yarnClient.createApplication()
        } catch (e: YarnException) {
            LOGGER.error("Error initializing yarn application with yarn client.", e)
            terminate(1)
        } catch (e: IOException) {
            LOGGER.error("Error initializing yarn application with yarn client.", e)
            terminate(2)
        }

        // Setup jars on hdfs
        val hdfs = try {
            FileSystem.get(conf)
        } catch (e: IOException) {
            LOGGER.error("Eror creating HDFS client isntance.", e)
            terminate(3)
        }
        fs = hdfs

        val jarPath = Path(config.yarn().hdfsJarsPath())
        val jarPathQualified = hdfs.makeQualified(jarPath)
        val distPath = Path.mergePaths(jarPathQualified, Path("/dist/"))

        val appContext = app.applicationSubmissionContext

        // The generated id is only passed to the ApplicationMaster when the caller did
        // not supply a job id of its own.
        val newIdVal = appContext.applicationId.toString() + "-" + UUID.randomUUID().toString()
        val newId = if (opts.jobId.isEmpty()) "--new-job-id=$newIdVal" else ""

        val amCommand = "env AMA_NODE=" + System.getenv("AMA_NODE") +
                " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().userName +
                " \$JAVA_HOME/bin/java" +
                " -Dscala.usejavacp=false" +
                " -Xmx1G" +
                " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
                joinStrings(args) +
                newId +
                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
        val commands = listOf(amCommand)

        println("===> AM command $amCommand".bold())

        // Set up the container launch context for the application master
        val amContainer = Records.newRecord(ContainerLaunchContext::class.java)
        amContainer.commands = commands

        // Setup local ama folder on hdfs. Nothing is uploaded when the target folder
        // already exists.
        try {
            if (!hdfs.exists(jarPathQualified)) {
                val home = File(opts.home)
                hdfs.mkdirs(jarPathQualified)

                // File.listFiles() returns null when `home` is not a readable directory;
                // the resulting NullPointerException is handled below.
                for (f in home.listFiles()) {
                    hdfs.copyFromLocalFile(false, true, Path(f.absolutePath), jarPathQualified)
                }

                // setup frameworks
                val frameworkFactory = FrameworkProvidersFactory(opts.env, config)
                for (group in frameworkFactory.groups) {
                    val framework = frameworkFactory.getFramework(group)
                    for (file in framework.groupResources) {
                        if (file.exists()) {
                            val target = Path.mergePaths(distPath, Path(file.path))
                            hdfs.copyFromLocalFile(false, true, Path(file.path), target)
                        }
                    }
                }
            }
        } catch (e: IOException) {
            // Render the frames explicitly: concatenating the array itself would only
            // print its identity string.
            println("===> error " + e.message + "\n" + e.stackTrace.joinToString(separator = "\n"))
            LOGGER.error("Error uploading ama folder to HDFS.", e)
            terminate(3)
        } catch (ne: NullPointerException) {
            println("===> ne error " + ne.message)
            LOGGER.error("No files in home dir.", ne)
            terminate(4)
        }

        // get version of build
        val version = config.version()

        // get local resources pointers that will be set on the master container env
        val leaderJarPath = String.format("/bin/leader-%s-all.jar", version)
        LOGGER.info("Leader Jar path is: {}", leaderJarPath)
        val mergedPath = Path.mergePaths(jarPath, Path(leaderJarPath))
        LOGGER.info("Leader merged jar path is: {}", mergedPath)

        val (propFile, log4jPropFile) = try {
            val amaterasuProps = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/amaterasu.properties")))
            val log4jProps = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/log4j.properties")))
            Pair(amaterasuProps, log4jProps)
        } catch (e: IOException) {
            LOGGER.error("Error initializing yarn local resources.", e)
            terminate(4)
        }

        // set local resource on master container
        val localResources = HashMap<String, LocalResource>()

        // making the bin folder's content available to the appMaster
        val bin = hdfs.listFiles(Path.mergePaths(jarPath, Path("/bin")), true)
        while (bin.hasNext()) {
            val binFile = bin.next()
            localResources[binFile.path.name] = setLocalResourceFromPath(binFile.path)
        }

        localResources["amaterasu.properties"] = propFile
        localResources["log4j.properties"] = log4jPropFile
        amContainer.localResources = localResources

        // Setup CLASSPATH for ApplicationMaster
        val appMasterEnv = HashMap<String, String>()
        setupAppMasterEnv(appMasterEnv)
        appMasterEnv["AMA_CONF_PATH"] = String.format("%s/amaterasu.properties", config.YARN().hdfsJarsPath())
        amContainer.environment = appMasterEnv

        // Set up resource type requirements for ApplicationMaster
        val capability = Records.newRecord(Resource::class.java)
        capability.memorySize = config.YARN().master().memoryMB().toLong()
        capability.virtualCores = config.YARN().master().cores()

        // Finally, set-up ApplicationSubmissionContext for the application
        appContext.applicationName = "amaterasu-" + opts.name
        appContext.amContainerSpec = amContainer
        appContext.resource = capability
        appContext.queue = config.YARN().queue()
        appContext.priority = Priority.newInstance(1)

        // Submit application
        val appId = appContext.applicationId
        LOGGER.info("Submitting application {}", appId)

        try {
            yarnClient.submitApplication(appContext)
        } catch (e: YarnException) {
            LOGGER.error("Error submitting application.", e)
            terminate(6)
        } catch (e: IOException) {
            LOGGER.error("Error submitting application.", e)
            terminate(7)
        }

        val zkClient = CuratorFrameworkFactory.newClient(config.zk(),
                ExponentialBackoffRetry(1000, 3))
        zkClient.start()

        // Block on the barrier node before reading the "/<id>/broker" node.
        // NOTE(review): the barrier is created after the application was submitted and
        // is always keyed by the generated id, even when opts.jobId was supplied --
        // confirm the ApplicationMaster releases this exact path and cannot do so
        // before it has been set here.
        val reportBarrier = DistributedBarrier(zkClient, "/$newIdVal-report-barrier")
        reportBarrier.setBarrier()
        reportBarrier.waitOnBarrier()

        val address = String(zkClient.data.forPath("/$newIdVal/broker"))
        println("===> $address")
        consumer = MessagingClientUtil.setupMessaging(address)

        // Poll YARN until the application reaches a terminal state.
        while (true) {
            val appReport = try {
                yarnClient.getApplicationReport(appId)
            } catch (e: YarnException) {
                LOGGER.error("Error getting application report.", e)
                terminate(8)
            } catch (e: IOException) {
                LOGGER.error("Error getting application report.", e)
                terminate(9)
            }

            val appState = appReport.yarnApplicationState
            if (isAppFinished(appState)) {
                // Log before exiting; once the JVM is terminated nothing else runs.
                LOGGER.info("Application {} finished with state {}-{} at {}",
                        appId, appState, appReport.finalApplicationStatus, appReport.finishTime)
                // NOTE(review): status 0 is used for KILLED and FAILED as well -- confirm
                // whether callers rely on that before mapping failures to non-zero.
                terminate(0)
            }

            try {
                Thread.sleep(100)
            } catch (e: InterruptedException) {
                LOGGER.error("Interrupted while waiting for job completion.", e)
                terminate(137)
            }
        }
    }

    /** Returns true when [appState] is FINISHED, KILLED or FAILED. */
    private fun isAppFinished(appState: YarnApplicationState): Boolean {
        return appState == YarnApplicationState.FINISHED ||
                appState == YarnApplicationState.KILLED ||
                appState == YarnApplicationState.FAILED
    }

    /**
     * Adds the CLASSPATH entries for the ApplicationMaster to [appMasterEnv]: first
     * `$PWD/*`, then every entry of the configured YARN application classpath (falling
     * back to YARN's default classpath when none is configured).
     *
     * @param appMasterEnv environment map that is modified in place
     */
    private fun setupAppMasterEnv(appMasterEnv: MutableMap<String, String>) {
        val classpathKey = ApplicationConstants.Environment.CLASSPATH.name

        Apps.addToEnvironment(appMasterEnv,
                classpathKey,
                ApplicationConstants.Environment.PWD.`$`() + File.separator + "*", File.pathSeparator)

        for (c in conf.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                *YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            Apps.addToEnvironment(appMasterEnv, classpathKey,
                    c.trim { it <= ' ' }, File.pathSeparator)
        }
    }

    companion object {

        private val LOGGER = LoggerFactory.getLogger(Client::class.java)

        /** Process entry point; delegates argument handling to [ClientArgsParser]. */
        @Throws(Exception::class)
        @JvmStatic
        fun main(args: Array<String>) = ClientArgsParser(args).main(args)

        /**
         * Concatenates [str] into one string in which every element is followed by a
         * single space (so a non-empty result always ends with a space).
         */
        private fun joinStrings(str: Array<String>): String =
                str.joinToString(separator = "") { "$it " }

        /**
         * Terminates the JVM with [status]. Declared as returning [Nothing] so the
         * compiler knows that control flow ends here, which lets callers use
         * `try`/`catch` as an expression without nullable placeholders.
         */
        private fun terminate(status: Int): Nothing {
            exit(status)
            // System.exit does not return normally; this only satisfies the type system.
            throw IllegalStateException("System.exit($status) returned normally")
        }
    }
}