/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.test.framework

import java.io._
import java.nio.charset.Charset
import java.nio.file.{Files, Paths}
import javax.servlet.http.HttpServletResponse

import scala.concurrent.duration._
import scala.language.postfixOps

import com.ning.http.client.AsyncHttpClient
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.apache.spark.launcher.SparkLauncher
import org.scalatest.concurrent.Eventually._

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.client.common.TestUtils
import org.apache.livy.server.LivyServer
private class MiniClusterConfig(val config: Map[String, String]) {
val nmCount = getInt("yarn.nm-count", 1)
val localDirCount = getInt("yarn.local-dir-count", 1)
val logDirCount = getInt("yarn.log-dir-count", 1)
val dnCount = getInt("hdfs.dn-count", 1)
private def getInt(key: String, default: Int): Int = {
config.get(key).map(_.toInt).getOrElse(default)
}
}
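// Illustrative usage (not part of the original file): overrides come from the
// cluster.conf written by MiniCluster.deploy(); any key that is absent falls
// back to the default declared above.
//
//   val cfg = new MiniClusterConfig(Map("yarn.nm-count" -> "2", "hdfs.dn-count" -> "3"))
//   // cfg.nmCount == 2, cfg.dnCount == 3, cfg.localDirCount == 1 (default)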
sealed trait MiniClusterUtils extends ClusterUtils {
protected def saveConfig(conf: Configuration, dest: File): Unit = {
val redacted = new Configuration(conf)
// This setting references a test class that is not available when using a real Spark
// installation, so remove it from client configs.
redacted.unset("net.topology.node.switch.mapping.impl")
val out = new FileOutputStream(dest)
try {
redacted.writeXml(out)
} finally {
out.close()
}
}
}
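// Hedged sketch (illustrative only): a client process can rebuild the same
// Configuration from the file written by saveConfig above (dest is saveConfig's
// destination file).
//
//   val conf = new Configuration(false)
//   conf.addResource(new Path(dest.toURI))
//   // conf now mirrors the saved settings, minus the redacted test-only key.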
sealed abstract class MiniClusterBase extends MiniClusterUtils with Logging {
def main(args: Array[String]): Unit = {
val klass = getClass().getSimpleName()
info(s"$klass is starting up.")
val Array(configPath) = args
val config = {
val file = new File(s"$configPath/cluster.conf")
val props = loadProperties(file)
new MiniClusterConfig(props)
}
start(config, configPath)
info(s"$klass running.")
while (true) synchronized {
wait()
}
}
protected def start(config: MiniClusterConfig, configPath: String): Unit
}
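// Illustrative launch (hedged; paths are examples only): MiniCluster.start() below
// runs each concrete object as its own JVM, passing the config directory as the sole
// program argument, e.g. MiniHdfsMain with args = Array("/tmp/livy-int-test/hadoop-conf").
// The process reads cluster.conf from that directory, starts its service, then blocks
// forever until killed.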
object MiniHdfsMain extends MiniClusterBase {
override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
val hadoopConf = new Configuration()
val hdfsCluster = new MiniDFSCluster.Builder(hadoopConf)
.numDataNodes(config.dnCount)
.format(true)
.waitSafeMode(true)
.build()
hdfsCluster.waitActive()
saveConfig(hadoopConf, new File(configPath + "/core-site.xml"))
}
}
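// Hedged sketch: once core-site.xml has been written, another process can obtain a
// FileSystem handle against the mini cluster (FileSystem.get is standard Hadoop API).
//
//   val conf = new Configuration(false)
//   conf.addResource(new Path(s"$configPath/core-site.xml"))
//   val fs = org.apache.hadoop.fs.FileSystem.get(conf)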
object MiniYarnMain extends MiniClusterBase {
override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
val baseConfig = new YarnConfiguration()
    val yarnCluster = new MiniYARNCluster(getClass().getName(), config.nmCount,
      config.localDirCount, config.logDirCount)
yarnCluster.init(baseConfig)
    // Install a shutdown hook to stop the service and kill all running applications.
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run(): Unit = yarnCluster.stop()
})
yarnCluster.start()
    // Workaround for YARN-2642: the RM can publish port 0 in its configuration before
    // it has actually bound a port, so wait until a real port appears.
val yarnConfig = yarnCluster.getConfig()
eventually(timeout(30 seconds), interval(100 millis)) {
      assert(yarnConfig.get(YarnConfiguration.RM_ADDRESS).split(":")(1) != "0",
        "RM not up yet.")
}
info(s"RM address in configuration is ${yarnConfig.get(YarnConfiguration.RM_ADDRESS)}")
saveConfig(yarnConfig, new File(configPath + "/yarn-site.xml"))
}
}
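// Hedged sketch: a test client can talk to this mini RM by loading the persisted
// yarn-site.xml (YarnClient.createYarnClient is standard YARN client API).
//
//   val conf = new YarnConfiguration()
//   conf.addResource(new Path(s"$configPath/yarn-site.xml"))
//   val yarnClient = org.apache.hadoop.yarn.client.api.YarnClient.createYarnClient()
//   yarnClient.init(conf)
//   yarnClient.start()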
object MiniLivyMain extends MiniClusterBase {
var livyUrl: Option[String] = None
  override protected def start(config: MiniClusterConfig, configPath: String): Unit = {
var livyConf = Map(
LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
LivyConf.RECOVERY_MODE.key -> "recovery",
LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store")
if (Cluster.isRunningOnTravis) {
livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
}
saveProperties(livyConf, new File(configPath + "/livy.conf"))
val server = new LivyServer()
server.start()
server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, false)
// Write a serverUrl.conf file to the conf directory with the location of the Livy
// server. Do it atomically since it's used by MiniCluster to detect when the Livy server
// is up and ready.
eventually(timeout(30 seconds), interval(1 second)) {
val serverUrlConf = Map("livy.server.server-url" -> server.serverUrl())
saveProperties(serverUrlConf, new File(configPath + "/serverUrl.conf"))
}
}
}
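// Illustrative consumer (grounded in runLivy() below): the parent MiniCluster polls
// for serverUrl.conf and reads the endpoint back with loadProperties, e.g.
//
//   val url = loadProperties(new File(s"$configPath/serverUrl.conf"))("livy.server.server-url")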
private case class ProcessInfo(process: Process, logFile: File)
/**
* Cluster implementation that uses HDFS / YARN mini clusters running as sub-processes locally.
* Launching Livy through this mini cluster results in three child processes:
*
 * - An HDFS mini cluster
* - A YARN mini cluster
* - The Livy server
*
* Each service will write its client configuration to a temporary directory managed by the
* framework, so that applications can connect to the services.
*
* TODO: add support for MiniKdc.
*/
class MiniCluster(config: Map[String, String]) extends Cluster with MiniClusterUtils with Logging {
private val tempDir = new File(s"${sys.props("java.io.tmpdir")}/livy-int-test")
private var sparkConfDir: File = _
private var _configDir: File = _
private var hdfs: Option[ProcessInfo] = None
private var yarn: Option[ProcessInfo] = None
private var livy: Option[ProcessInfo] = None
private var livyUrl: String = _
  private var _hdfsScratchDir: Path = _
override def configDir(): File = _configDir
  override def hdfsScratchDir(): Path = _hdfsScratchDir
override def doAsClusterUser[T](task: => T): T = task
// Explicitly remove the "test-lib" dependency from the classpath of child processes. We
// want tests to explicitly upload this jar when necessary, to test those code paths.
private val childClasspath = {
val cp = sys.props("java.class.path").split(File.pathSeparator)
val filtered = cp.filter { path => !new File(path).getName().startsWith("livy-test-lib-") }
assert(cp.size != filtered.size, "livy-test-lib jar not found in classpath!")
filtered.mkString(File.pathSeparator)
}
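  // Illustrative effect of the filter above (hypothetical jar names): a classpath of
  //   a.jar:livy-test-lib-0.5.jar:b.jar
  // becomes "a.jar:b.jar". The assert guards against a rename of the test-lib jar
  // silently turning the filter into a no-op.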
override def deploy(): Unit = {
if (tempDir.exists()) {
FileUtils.deleteQuietly(tempDir)
}
assert(tempDir.mkdir(), "Cannot create temp test dir.")
sparkConfDir = mkdir("spark-conf")
val sparkConf = Map(
"spark.executor.instances" -> "1",
"spark.scheduler.minRegisteredResourcesRatio" -> "0.0",
"spark.ui.enabled" -> "false",
SparkLauncher.DRIVER_MEMORY -> "512m",
SparkLauncher.EXECUTOR_MEMORY -> "512m",
SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console",
SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS -> "-Dtest.appender=console"
)
saveProperties(sparkConf, new File(sparkConfDir, "spark-defaults.conf"))
_configDir = mkdir("hadoop-conf")
saveProperties(config, new File(configDir, "cluster.conf"))
hdfs = Some(start(MiniHdfsMain.getClass, new File(configDir, "core-site.xml")))
yarn = Some(start(MiniYarnMain.getClass, new File(configDir, "yarn-site.xml")))
runLivy()
    _hdfsScratchDir = fs.makeQualified(new Path("/"))
}
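  // Typical lifecycle (hedged sketch; the surrounding test harness is assumed):
  //
  //   val cluster = new MiniCluster(Map("yarn.nm-count" -> "1"))
  //   cluster.deploy()           // starts HDFS, YARN, then Livy as child JVMs
  //   try { /* issue requests against cluster.livyEndpoint */ }
  //   finally cluster.cleanUp()  // stops all three child processes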
override def cleanUp(): Unit = {
Seq(hdfs, yarn, livy).flatten.foreach(stop)
hdfs = None
yarn = None
livy = None
}
def runLivy(): Unit = {
    assert(livy.isEmpty)
val confFile = new File(configDir, "serverUrl.conf")
val jacocoArgs = Option(TestUtils.getJacocoArgs())
.map { args =>
Seq(args, s"-Djacoco.args=$args")
}.getOrElse(Nil)
val localLivy = start(MiniLivyMain.getClass, confFile, extraJavaArgs = jacocoArgs)
val props = loadProperties(confFile)
livyUrl = props("livy.server.server-url")
// Wait until Livy server responds.
val httpClient = new AsyncHttpClient()
eventually(timeout(30 seconds), interval(1 second)) {
val res = httpClient.prepareGet(livyUrl + "/metrics").execute().get()
assert(res.getStatusCode() == HttpServletResponse.SC_OK)
}
livy = Some(localLivy)
}
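  // Readiness is defined as GET <livyUrl>/metrics returning HTTP 200; the same
  // AsyncHttpClient call can be reused as a generic health check, e.g. (sketch):
  //
  //   def isUp(url: String): Boolean =
  //     new AsyncHttpClient().prepareGet(url + "/metrics").execute().get()
  //       .getStatusCode() == HttpServletResponse.SC_OK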
def stopLivy(): Unit = {
assert(livy.isDefined)
livy.foreach(stop)
livyUrl = null
livy = None
}
def livyEndpoint: String = livyUrl
private def mkdir(name: String, parent: File = tempDir): File = {
val dir = new File(parent, name)
if (!dir.exists()) {
assert(dir.mkdir(), s"Failed to create directory $name.")
}
dir
}
private def start(
klass: Class[_],
configFile: File,
extraJavaArgs: Seq[String] = Nil): ProcessInfo = {
val simpleName = klass.getSimpleName().stripSuffix("$")
val procDir = mkdir(simpleName)
val procTmp = mkdir("tmp", parent = procDir)
    // Before starting anything, kill any leftover instance of this service from a
    // previous run.
    sys.process.Process(s"pkill -f $simpleName") !
    val java = sys.props("java.home") + "/bin/java"
    val cmd =
      Seq(
        java,
"-Dtest.appender=console",
"-Djava.io.tmpdir=" + procTmp.getAbsolutePath(),
"-cp", childClasspath + File.pathSeparator + configDir.getAbsolutePath(),
"-XX:MaxPermSize=256m") ++
extraJavaArgs ++
Seq(
klass.getName().stripSuffix("$"),
configDir.getAbsolutePath())
val logFile = new File(procDir, "output.log")
val pb = new ProcessBuilder(cmd.toArray: _*)
.directory(procDir)
.redirectErrorStream(true)
.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile))
pb.environment().put("LIVY_CONF_DIR", configDir.getAbsolutePath())
pb.environment().put("HADOOP_CONF_DIR", configDir.getAbsolutePath())
pb.environment().put("SPARK_CONF_DIR", sparkConfDir.getAbsolutePath())
pb.environment().put("SPARK_LOCAL_IP", "127.0.0.1")
val child = pb.start()
    // Wait for the config file to show up before returning, so that dependent services
    // can see the configuration. Exit early if the process dies.
eventually(timeout(30 seconds), interval(100 millis)) {
assert(configFile.isFile(), s"$simpleName hasn't started yet.")
try {
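        // exitValue() throws IllegalThreadStateException while the child is still
        // alive, so obtaining a value here means the process died prematurely.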
val exitCode = child.exitValue()
throw new IOException(s"Child process exited unexpectedly (exit code $exitCode)")
} catch {
case _: IllegalThreadStateException => // Try again.
}
}
ProcessInfo(child, logFile)
}
private def stop(svc: ProcessInfo): Unit = {
svc.process.destroy()
svc.process.waitFor()
}
}