blob: d43125d254ced094960c5ba0209adb96171363b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.utils
import java.util.ArrayList
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus.UNDEFINED
import org.apache.hadoop.yarn.api.records.YarnApplicationState._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
import org.apache.hadoop.yarn.util.ConverterUtils
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.concurrent.Eventually
import org.scalatest.FunSpec
import org.scalatest.mock.MockitoSugar.mock
import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils}
import org.apache.livy.utils.SparkApp._
class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
private def cleanupThread(t: Thread)(f: => Unit) = {
try { f } finally { t.interrupt() }
}
private def mockSleep(ms: Long) = {
Thread.`yield`()
}
describe("SparkYarnApp") {
val TEST_TIMEOUT = 30 seconds
val appId = ConverterUtils.toApplicationId("application_1467912463905_0021")
val appIdOption = Some(appId.toString)
val appTag = "fakeTag"
val livyConf = new LivyConf()
livyConf.set(LivyConf.YARN_APP_LOOKUP_TIMEOUT, "30s")
it("should poll YARN state and terminate") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val mockAppListener = mock[SparkAppListener]
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
// Simulate YARN app state progression.
val applicationStateList = List(
ACCEPTED,
RUNNING,
FINISHED
)
val finalApplicationStatusList = List(
FinalApplicationStatus.UNDEFINED,
FinalApplicationStatus.UNDEFINED,
FinalApplicationStatus.SUCCEEDED
)
val stateIndex = new AtomicInteger(-1)
when(mockAppReport.getYarnApplicationState).thenAnswer(
// get and increment
new Answer[YarnApplicationState] {
override def answer(invocationOnMock: InvocationOnMock): YarnApplicationState = {
stateIndex.incrementAndGet match {
case i if i < applicationStateList.size =>
applicationStateList(i)
case _ =>
applicationStateList.last
}
}
}
)
when(mockAppReport.getFinalApplicationStatus).thenAnswer(
new Answer[FinalApplicationStatus] {
override def answer(invocationOnMock: InvocationOnMock): FinalApplicationStatus = {
// do not increment here, only get
stateIndex.get match {
case i if i < applicationStateList.size =>
finalApplicationStatusList(i)
case _ =>
finalApplicationStatusList.last
}
}
}
)
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
val app = new SparkYarnApp(
appTag,
appIdOption,
None,
Some(mockAppListener),
livyConf,
mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")
verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
verify(mockAppListener).stateChanged(State.STARTING, State.RUNNING)
verify(mockAppListener).stateChanged(State.RUNNING, State.FINISHED)
}
}
}
it("should kill yarn app") {
Clock.withSleepMethod(mockSleep) {
val diag = "DIAG"
val mockYarnClient = mock[YarnClient]
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getDiagnostics).thenReturn(diag)
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
var appKilled = false
when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
override def answer(invocation: InvocationOnMock): YarnApplicationState = {
if (!appKilled) {
RUNNING
} else {
KILLED
}
}
})
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
Utils.waitUntil({ () => app.isRunning }, Duration(10, TimeUnit.SECONDS))
cleanupThread(app.yarnAppMonitorThread) {
app.kill()
appKilled = true
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")
verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
verify(mockYarnClient).killApplication(appId)
assert(app.log().mkString.contains(diag))
}
}
}
it("should return spark-submit log") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val mockSparkSubmit = mock[LineBufferedProcess]
val sparkSubmitInfoLog = IndexedSeq("SPARK-SUBMIT", "LOG")
val sparkSubmitErrorLog = IndexedSeq("SPARK-SUBMIT", "error log")
val sparkSubmitLog = ("stdout: " +: sparkSubmitInfoLog) ++
("\nstderr: " +: sparkSubmitErrorLog) :+ "\nYARN Diagnostics: "
when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitInfoLog)
when(mockSparkSubmit.errorLines).thenReturn(sparkSubmitErrorLog)
val waitForCalledLatch = new CountDownLatch(1)
when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
override def answer(invocation: InvocationOnMock): Int = {
waitForCalledLatch.countDown()
0
}
})
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
when(mockAppReport.getDiagnostics).thenReturn(null)
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
val app = new SparkYarnApp(
appTag,
appIdOption,
Some(mockSparkSubmit),
None,
livyConf,
mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
waitForCalledLatch.await(TEST_TIMEOUT.toMillis, TimeUnit.MILLISECONDS)
assert(app.log() == sparkSubmitLog, "Expect spark-submit log")
}
}
}
it("can kill spark-submit while it's running") {
Clock.withSleepMethod(mockSleep) {
val diag = "DIAG"
val livyConf = new LivyConf()
livyConf.set(LivyConf.YARN_APP_LOOKUP_TIMEOUT, "0")
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getDiagnostics).thenReturn(diag)
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
when(mockAppReport.getYarnApplicationState).thenReturn(RUNNING)
val mockYarnClient = mock[YarnClient]
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
val mockSparkSubmit = mock[LineBufferedProcess]
val sparkSubmitRunningLatch = new CountDownLatch(1)
// Simulate a running spark-submit
when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
override def answer(invocation: InvocationOnMock): Int = {
sparkSubmitRunningLatch.await()
0
}
})
val app = new SparkYarnApp(
appTag,
appIdOption,
Some(mockSparkSubmit),
None,
livyConf,
mockYarnClient)
Eventually.eventually(Eventually.timeout(10 seconds), Eventually.interval(100 millis)) {
assert(app.isRunning)
cleanupThread(app.yarnAppMonitorThread) {
app.kill()
verify(mockSparkSubmit, times(1)).destroy()
sparkSubmitRunningLatch.countDown()
}
}
}
}
it("should end with state failed when spark submit start failed") {
Clock.withSleepMethod(mockSleep) {
val livyConf = new LivyConf()
val mockSparkSubmit = mock[LineBufferedProcess]
when(mockSparkSubmit.isAlive).thenReturn(false)
when(mockSparkSubmit.exitValue).thenReturn(-1)
val app = new SparkYarnApp(
appTag,
None,
Some(mockSparkSubmit),
None,
livyConf)
cleanupThread(app.yarnAppMonitorThread) {
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")
assert(app.state == SparkApp.State.FAILED,
"SparkYarnApp should end with state failed when spark submit start failed")
}
}
}
it("should map YARN state to SparkApp.State correctly") {
val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf)
cleanupThread(app.yarnAppMonitorThread) {
assert(app.mapYarnState(appId, NEW, UNDEFINED) == State.STARTING)
assert(app.mapYarnState(appId, NEW_SAVING, UNDEFINED) == State.STARTING)
assert(app.mapYarnState(appId, SUBMITTED, UNDEFINED) == State.STARTING)
assert(app.mapYarnState(appId, ACCEPTED, UNDEFINED) == State.STARTING)
assert(app.mapYarnState(appId, RUNNING, UNDEFINED) == State.RUNNING)
assert(
app.mapYarnState(appId, FINISHED, FinalApplicationStatus.SUCCEEDED) == State.FINISHED)
assert(app.mapYarnState(appId, FAILED, FinalApplicationStatus.FAILED) == State.FAILED)
assert(app.mapYarnState(appId, KILLED, FinalApplicationStatus.KILLED) == State.KILLED)
// none of the (state , finalStatus) combination below should happen
assert(app.mapYarnState(appId, FINISHED, UNDEFINED) == State.FAILED)
assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.FAILED) == State.FAILED)
assert(app.mapYarnState(appId, FINISHED, FinalApplicationStatus.KILLED) == State.FAILED)
assert(app.mapYarnState(appId, FAILED, UNDEFINED) == State.FAILED)
assert(app.mapYarnState(appId, KILLED, UNDEFINED) == State.FAILED)
assert(app.mapYarnState(appId, FAILED, FinalApplicationStatus.SUCCEEDED) == State.FAILED)
assert(app.mapYarnState(appId, KILLED, FinalApplicationStatus.SUCCEEDED) == State.FAILED)
}
}
it("should get App Id") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationTags).thenReturn(Set(appTag.toLowerCase).asJava)
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
when(mockAppReport.getYarnApplicationState).thenReturn(YarnApplicationState.FINISHED)
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
when(mockYarnClient.getApplications(Set("SPARK").asJava))
.thenReturn(List(mockAppReport).asJava)
val mockListener = mock[SparkAppListener]
val mockSparkSubmit = mock[LineBufferedProcess]
val app = new SparkYarnApp(
appTag, None, Some(mockSparkSubmit), Some(mockListener), livyConf, mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")
verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
verify(mockListener).appIdKnown(appId.toString)
}
}
}
it("should expose driver log url and Spark UI url") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val driverLogUrl = "DRIVER LOG URL"
val sparkUiUrl = "SPARK UI URL"
val mockApplicationAttemptId = mock[ApplicationAttemptId]
val mockAppReport = mock[ApplicationReport]
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getFinalApplicationStatus).thenReturn(FinalApplicationStatus.SUCCEEDED)
when(mockAppReport.getTrackingUrl).thenReturn(sparkUiUrl)
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
var done = false
when(mockAppReport.getYarnApplicationState).thenAnswer(new Answer[YarnApplicationState]() {
override def answer(invocation: InvocationOnMock): YarnApplicationState = {
if (!done) {
RUNNING
} else {
FINISHED
}
}
})
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
val mockAttemptReport = mock[ApplicationAttemptReport]
val mockContainerId = mock[ContainerId]
when(mockAttemptReport.getAMContainerId).thenReturn(mockContainerId)
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId))
.thenReturn(mockAttemptReport)
val mockContainerReport = mock[ContainerReport]
when(mockYarnClient.getContainerReport(mockContainerId)).thenReturn(mockContainerReport)
// Block test until getLogUrl is called 10 times.
val getLogUrlCountDown = new CountDownLatch(10)
when(mockContainerReport.getLogUrl).thenAnswer(new Answer[String] {
override def answer(invocation: InvocationOnMock): String = {
getLogUrlCountDown.countDown()
driverLogUrl
}
})
val mockListener = mock[SparkAppListener]
val app = new SparkYarnApp(
appTag, appIdOption, None, Some(mockListener), livyConf, mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
getLogUrlCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
done = true
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
assert(!app.yarnAppMonitorThread.isAlive,
"YarnAppMonitorThread should terminate after YARN app is finished.")
verify(mockYarnClient, atLeast(1)).getApplicationReport(appId)
verify(mockAppReport, atLeast(1)).getTrackingUrl()
verify(mockContainerReport, atLeast(1)).getLogUrl()
verify(mockListener).appIdKnown(appId.toString)
verify(mockListener).infoChanged(AppInfo(Some(driverLogUrl), Some(sparkUiUrl)))
}
}
}
it("should not die on YARN-4411") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
// Block test until getApplicationReport is called 10 times.
val pollCountDown = new CountDownLatch(10)
when(mockYarnClient.getApplicationReport(appId)).thenAnswer(new Answer[ApplicationReport] {
override def answer(invocation: InvocationOnMock): ApplicationReport = {
pollCountDown.countDown()
throw new IllegalArgumentException("No enum constant " +
"org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState.FINAL_SAVING")
}
})
val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
assert(app.state == SparkApp.State.STARTING)
app.state = SparkApp.State.FINISHED
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
}
}
}
it("should not die on ApplicationAttemptNotFoundException") {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val mockAppReport = mock[ApplicationReport]
val mockApplicationAttemptId = mock[ApplicationAttemptId]
val done = new AtomicBoolean(false)
when(mockAppReport.getApplicationId).thenReturn(appId)
when(mockAppReport.getYarnApplicationState).thenAnswer(
new Answer[YarnApplicationState]() {
override def answer(invocation: InvocationOnMock): YarnApplicationState = {
if (done.get()) {
FINISHED
} else {
RUNNING
}
}
})
when(mockAppReport.getFinalApplicationStatus).thenAnswer(
new Answer[FinalApplicationStatus]() {
override def answer(invocation: InvocationOnMock): FinalApplicationStatus = {
if (done.get()) {
FinalApplicationStatus.SUCCEEDED
} else {
FinalApplicationStatus.UNDEFINED
}
}
})
when(mockAppReport.getCurrentApplicationAttemptId).thenReturn(mockApplicationAttemptId)
when(mockYarnClient.getApplicationReport(appId)).thenReturn(mockAppReport)
// Block test until getApplicationReport is called 10 times.
val pollCountDown = new CountDownLatch(10)
when(mockYarnClient.getApplicationAttemptReport(mockApplicationAttemptId)).thenAnswer(
new Answer[ApplicationReport] {
override def answer(invocation: InvocationOnMock): ApplicationReport = {
pollCountDown.countDown()
throw new ApplicationAttemptNotFoundException("unit test")
}
})
val app = new SparkYarnApp(appTag, appIdOption, None, None, livyConf, mockYarnClient)
cleanupThread(app.yarnAppMonitorThread) {
pollCountDown.await(TEST_TIMEOUT.length, TEST_TIMEOUT.unit)
assert(app.state == SparkApp.State.RUNNING)
done.set(true)
app.yarnAppMonitorThread.join(TEST_TIMEOUT.toMillis)
}
}
}
it("should delete leak app when timeout") {
Clock.withSleepMethod(mockSleep) {
livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL, "100ms")
livyConf.set(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT, "1000ms")
val client = mock[YarnClient]
when(client.getApplications(SparkYarnApp.appType)).
thenReturn(new ArrayList[ApplicationReport]())
SparkYarnApp.init(livyConf, Some(client))
SparkYarnApp.leakedAppTags.clear()
SparkYarnApp.leakedAppTags.put("leakApp", System.currentTimeMillis())
Eventually.eventually(Eventually.timeout(TEST_TIMEOUT), Eventually.interval(100 millis)) {
assert(SparkYarnApp.leakedAppTags.size() == 0)
}
}
}
}
}