// blob: 267d588bf8977d7513b40f027168c82eaaad7e96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.experiments.yarn.client
import java.io.{File, IOException, OutputStreamWriter}
import java.net.InetAddress
import java.util.zip.ZipInputStream
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigValueFactory}
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.experiments.yarn.Constants
import org.apache.gearpump.experiments.yarn.Constants._
import org.apache.gearpump.experiments.yarn.appmaster.AppMasterCommand
import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
import org.apache.gearpump.util.ActorUtil.askActor
import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
import org.slf4j.Logger
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
/**
 * Launches Gearpump on YARN.
 *
 * An instance validates the distribution package, uploads the configuration files
 * to the remote file system, submits the application master to YARN, and can
 * afterwards download the active configuration from the running application master.
 *
 * @param akka              configuration from which the app master queue, memory and vcores
 *                          are read; it is also passed to [[AppMasterCommand]] and uploaded
 *                          (minus defaults) as `gear.conf`
 * @param yarnConf          YARN configuration, uploaded as `yarn-site.xml`
 * @param yarnClient        client used to create, submit and await the application
 * @param fs                file system holding the package and receiving the config files
 * @param actorSystem       supplies the execution context for the futures created here
 * @param appMasterResolver resolves the application master actor for an application id
 * @param version           local version string that must occur in the name of the package's
 *                          root entry; defaults to `Util.version`
 */
class LaunchCluster(
    akka: Config,
    yarnConf: YarnConfig,
    yarnClient: YarnClient,
    fs: FileSystem,
    actorSystem: ActorSystem,
    appMasterResolver: AppMasterResolver,
    version: String = Util.version) {
  import org.apache.gearpump.experiments.yarn.Constants._

  // Explicitly typed: implicit definitions should not rely on type inference.
  private implicit val dispatcher: scala.concurrent.ExecutionContext = actorSystem.dispatcher
  private val LOG: Logger = LogUtil.getLogger(getClass)
  private val host = InetAddress.getLocalHost.getHostName
  private val queue = akka.getString(APPMASTER_QUEUE)
  private val memory = akka.getString(APPMASTER_MEMORY).toInt
  private val vcore = akka.getString(APPMASTER_VCORES).toInt

  /**
   * Validates the package, uploads the configuration and submits the application to YARN.
   *
   * @param appName     application name handed to YARN
   * @param packagePath path of the Gearpump `.zip` distribution on the remote file system
   * @return the id of the application created for this submission
   * @throws IOException if `packagePath` does not end with `.zip`, does not exist, is an
   *                     archive without a usable root directory entry, or its root entry
   *                     name does not contain `version`
   */
  def submit(appName: String, packagePath: String): ApplicationId = {
    LOG.info("Starting AM")

    // Only zip distributions are accepted.
    if (!packagePath.endsWith(".zip")) {
      throw new IOException(s"YarnClient only support .zip distribution package," +
        s" now it is ${packagePath}. Please download the zip " +
        "package from website or use sbt assembly packArchiveZip to build one.")
    }

    if (!fs.exists(packagePath)) {
      throw new IOException(s"Cannot find package ${packagePath} on HDFS ${fs.name}. ")
    }

    // Check the version, to make sure the local version matches the remote package:
    // the name of the archive's root entry has to contain the local version string.
    val rootEntry = rootEntryPath(zip = packagePath)
    if (!rootEntry.contains(version)) {
      throw new IOException(s"Check version failed! Local gearpump binary" +
        s" version $version doesn't match with remote path $packagePath")
    }

    val resource = Resource.newInstance(memory, vcore)
    val appId = yarnClient.createApplication

    // Uploads the configs to HDFS home directory of current user.
    val configPath = uploadConfigToHDFS(appId)

    val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
      s"-package $packagePath"))

    yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)

    LOG.info("Waiting application to finish...")
    val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
    LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
      s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")

    // scalastyle:off println
    Console.println("================================================")
    Console.println("==Application Id: " + appId)
    // scalastyle:on println
    appId
  }

  /**
   * Asks the application master for its active configuration and saves it locally.
   *
   * @param appId  application whose app master is queried
   * @param output local path of the file to write
   * @return a future completed with the written file
   */
  def saveConfig(appId: ApplicationId, output: String): Future[File] = {
    LOG.info(s"Trying to download active configuration to output path: $output")
    LOG.info(s"Resolving YarnAppMaster ActorRef for application $appId")
    val appMaster = appMasterResolver.resolve(appId)
    LOG.info(s"appMaster=${appMaster.path} host=$host")

    askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map { active =>
      val out = new File(output)
      ClusterConfig.saveConfig(active.config, out)
      out
    }
  }

  /**
   * Writes `gear.conf`, `yarn-site.xml` and `log4j.properties` into a per-application
   * configuration directory on the remote file system.
   *
   * @param appId application the configuration belongs to
   * @return the remote configuration directory (with a trailing `/`)
   */
  private def uploadConfigToHDFS(appId: ApplicationId): String = {
    // Uses personal home directory so that it will not conflict with other users
    // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
    val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
    LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")

    // NOTE(review): confDir already ends with '/', so the file paths below contain a
    // doubled separator. They are kept exactly as before so the remote layout is unchanged.

    // Copies config from local to remote.
    writeRemoteFile(s"$confDir/gear.conf") { writer =>
      val cleanedConfig = ClusterConfig.filterOutDefaultConfig(akka)
      writer.write(cleanedConfig.root().render())
    }

    // Saves yarn-site.xml to remote
    writeRemoteFile(s"$confDir/yarn-site.xml") { writer =>
      yarnConf.writeXml(writer)
    }

    // Saves log4j.properties to remote
    val log4j = LogUtil.loadConfiguration
    writeRemoteFile(s"$confDir/log4j.properties") { writer =>
      log4j.store(writer, "gearpump on yarn")
    }

    confDir
  }

  /**
   * Creates `path` on the remote file system, lets `write` fill it, and always closes
   * the writer (and with it the underlying stream), even when `write` throws.
   */
  private def writeRemoteFile(path: String)(write: OutputStreamWriter => Unit): Unit = {
    val writer = new OutputStreamWriter(fs.create(path))
    try {
      write(writer)
    } finally {
      writer.close()
    }
  }

  /**
   * Returns the top-level directory name of the first entry in the zip archive.
   *
   * @param zip path of the archive on the remote file system
   * @throws IOException if the archive has no entries, or its first entry is not
   *                     located inside a directory
   */
  private def rootEntryPath(zip: String): String = {
    val stream = new ZipInputStream(fs.open(zip))
    try {
      // getNextEntry returns null when the archive contains no entries.
      val name = Option(stream.getNextEntry()).map(_.getName).getOrElse {
        throw new IOException(s"Package $zip does not contain any zip entry.")
      }
      val separator = name.indexOf("/")
      if (separator < 0) {
        throw new IOException(
          s"First entry '$name' of package $zip is not inside a root directory.")
      }
      name.substring(0, separator)
    } finally {
      stream.close()
    }
  }
}
/**
 * Command line entry point that submits Gearpump to YARN and optionally saves the
 * active configuration of the launched application to a local file.
 */
object LaunchCluster extends AkkaApp with ArgumentsParser {

  // Names of the supported command line options.
  val PACKAGE = "package"
  val NAME = "name"
  val VERBOSE = "verbose"
  val OUTPUT = "output"

  override protected def akkaConfig: Config = {
    ClusterConfig.default()
  }

  override val options: Array[(String, CLIOption[Any])] = Array(
    PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
      "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
    NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
      defaultValue = Some("Gearpump")),
    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
      defaultValue = Some(false)),
    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
      defaultValue = None)
  )

  // Timeout used both when awaiting the submitted application and when awaiting the
  // download of the active configuration.
  private val TIMEOUT_MILLISECONDS = 30 * 1000

  /**
   * Submits the application and, when the `output` option is given, stores the active
   * configuration at that path. The YARN client and the actor system are shut down
   * afterwards, whether or not the submission succeeded.
   *
   * @param inputAkkaConf configuration supplied by [[AkkaApp]]
   * @param args          raw command line arguments
   */
  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
    val parsed = parse(args)
    if (parsed.getBoolean(VERBOSE)) {
      LogUtil.verboseLogToConsole()
    }

    // Exposes the configured container user through the HADOOP_USER_NAME system property.
    if (inputAkkaConf.hasPath(CONTAINER_USER)) {
      val userName = inputAkkaConf.getString(CONTAINER_USER)
      System.setProperty("HADOOP_USER_NAME", userName)
    }

    val yarnConfig = new YarnConfig()
    val fs = new FileSystem(yarnConfig)
    val yarnClient = new YarnClient(yarnConfig)
    val akkaConf = updateConf(inputAkkaConf, parsed)
    val actorSystem = ActorSystem("launchCluster", akkaConf)

    try {
      val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
      val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
        actorSystem, appMasterResolver)

      val name = parsed.getString(NAME)
      val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))

      if (parsed.exists(OUTPUT)) {
        import scala.concurrent.duration._
        Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
          TIMEOUT_MILLISECONDS.milliseconds)
      }
    } finally {
      // Release both resources even when submission or the config download failed;
      // previously a failure skipped the shutdown entirely. Same order as before:
      // YARN client first, then the actor system.
      try {
        yarnClient.stop()
      } finally {
        actorSystem.terminate()
        Await.result(actorSystem.whenTerminated, Duration.Inf)
      }
    }
  }

  /**
   * Returns `akka` with the package path overridden by the `package` command line
   * option when that option is present; otherwise returns `akka` unchanged.
   */
  private def updateConf(akka: Config, parsed: ParseResult): Config = {
    if (parsed.exists(PACKAGE)) {
      akka.withValue(Constants.PACKAGE_PATH,
        ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
    } else {
      akka
    }
  }
}