/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.livy.test

import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.scalatest.concurrent.Eventually._
import org.scalatest.OptionValues._
import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.livy.rsc.RSCConf
import org.apache.livy.sessions._
import org.apache.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}

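/**
 * End-to-end tests for Livy interactive sessions (Scala, Python, and R) running against a real
 * cluster. Each test starts a session through the REST API, submits statements, and verifies the
 * interpreter output and the resulting YARN application state.
 */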
class InteractiveIT extends BaseIntegrationTestSuite {
test("basic interactive session") {
withNewSession(Spark()) { s =>
s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
s.run("1+1").verifyResult("res0: Int = 2")
s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
s.run("val sql = new org.apache.spark.sql.SQLContext(sc)").verifyResult(
".*" + Pattern.quote(
"sql: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext") + ".*")
s.run("abcde").verifyError(evalue = ".*?:[0-9]+: error: not found: value abcde.*")
s.run("throw new IllegalStateException()")
.verifyError(evalue = ".*java\\.lang\\.IllegalStateException.*")
// Verify Livy internal configurations are not exposed.
// TODO separate all these checks to different sub tests after merging new IT code.
s.run("""sc.getConf.getAll.exists(_._1.startsWith("spark.__livy__."))""")
.verifyResult(".*false")
s.run("""sys.props.exists(_._1.startsWith("spark.__livy__."))""").verifyResult(".*false")
s.run("""val str = "str"""")
s.complete("str.", "scala", 4).verifyContaining(List("compare", "contains"))
s.complete("str2.", "scala", 5).verifyNone()
// Make sure appInfo is reported correctly.
val state = s.snapshot()
state.appInfo.driverLogUrl.value should include ("containerlogs")
state.appInfo.sparkUiUrl.value should startWith ("http")
// Stop session and verify the YARN app state is finished.
// This is important because if YARN app state is killed, Spark history is not archived.
val appId = s.appId()
s.stop()
val appReport = cluster.yarnClient.getApplicationReport(appId)
appReport.getYarnApplicationState() shouldEqual YarnApplicationState.FINISHED
}
}
pytest("pyspark interactive session") {
withNewSession(PySpark()) { s =>
s.run("1+1").verifyResult("2")
s.run("sqlContext").verifyResult(startsWith("<pyspark.sql.context.HiveContext"))
s.run("sc.parallelize(range(100)).map(lambda x: x * 2).reduce(lambda x, y: x + y)")
.verifyResult("9900")
s.run("from pyspark.sql.types import Row").verifyResult("")
s.run("x = [Row(age=1, name=u'a'), Row(age=2, name=u'b'), Row(age=3, name=u'c')]")
.verifyResult("")
s.run("%table x").verifyResult(".*headers.*type.*name.*data.*")
s.run("abcde").verifyError(ename = "NameError", evalue = "name 'abcde' is not defined")
s.run("raise KeyError, 'foo'").verifyError(ename = "KeyError", evalue = "'foo'")
}
}
rtest("R interactive session") {
withNewSession(SparkR()) { s =>
// R's output sometimes includes the count of statements, which makes it annoying to test
// things. This helps a bit.
val curr = new AtomicInteger()
def count: Int = curr.incrementAndGet()
s.run("1+1").verifyResult(startsWith(s"[$count] 2"))
s.run("sqlContext <- sparkRSQL.init(sc)").verifyResult(null)
s.run("hiveContext <- sparkRHive.init(sc)").verifyResult(null)
s.run("""localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))""")
.verifyResult(null)
s.run("df <- createDataFrame(sqlContext, localDF)").verifyResult(null)
s.run("printSchema(df)").verifyResult(literal(
"""|root
| |-- name: string (nullable = true)
| |-- age: double (nullable = true)""".stripMargin))
}
}
test("application kills session") {
withNewSession(Spark()) { s =>
s.runFatalStatement("System.exit(0)")
}
}
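  // The next two tests flip internal RSCConf test flags to simulate a driver that gets stuck, and
  // then verify that the corresponding YARN application ends up KILLED rather than FINISHED.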
test("should kill RSCDriver if it doesn't respond to end session") {
val testConfName = s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_END_SESSION.key()}"
withNewSession(Spark(), Map(testConfName -> "true")) { s =>
val appId = s.appId()
s.stop()
val appReport = cluster.yarnClient.getApplicationReport(appId)
appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
}
}
test("should kill RSCDriver if it didn't register itself in time") {
val testConfName =
s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_START_DRIVER.key()}"
withNewSession(Spark(), Map(testConfName -> "true"), false) { s =>
eventually(timeout(2 minutes), interval(5 seconds)) {
val appId = s.appId()
appId should not be null
val appReport = cluster.yarnClient.getApplicationReport(appId)
appReport.getYarnApplicationState() shouldBe YarnApplicationState.KILLED
}
}
}
test("user jars are properly imported in Scala interactive sessions") {
// Include a popular Java library to test importing user jars.
val sparkConf = Map("spark.jars.packages" -> "org.codehaus.plexus:plexus-utils:3.0.24")
withNewSession(Spark(), sparkConf) { s =>
// Check is the library loaded in JVM in the proper class loader.
s.run("Thread.currentThread.getContextClassLoader.loadClass" +
"""("org.codehaus.plexus.util.FileUtils")""")
.verifyResult(".*Class\\[_\\] = class org.codehaus.plexus.util.FileUtils")
// Check does Scala interpreter see the library.
s.run("import org.codehaus.plexus.util._").verifyResult("import org.codehaus.plexus.util._")
// Check does SparkContext see classes defined by Scala interpreter.
s.run("case class Item(i: Int)").verifyResult("defined class Item")
s.run("val rdd = sc.parallelize(Array.fill(10){new Item(scala.util.Random.nextInt(1000))})")
.verifyResult("rdd.*")
s.run("rdd.count()").verifyResult(".*= 10")
}
}
test("heartbeat should kill expired session") {
// Set it to 2s because verifySessionIdle() is calling GET every second.
val heartbeatTimeout = Duration.create("2s")
withNewSession(Spark(), Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
// If the test reaches here, that means verifySessionIdle() is successfully keeping the
// session alive. Now verify heartbeat is killing expired session.
Thread.sleep(heartbeatTimeout.toMillis * 2)
s.verifySessionDoesNotExist()
}
}
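  // Restarting the Livy server in the middle of the test exercises session recovery: the session,
  // its statements, and their results must survive the restart, a stopped session must not
  // reappear after recovery, and its id must not be reused.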
test("recover interactive session") {
withNewSession(Spark()) { s =>
val stmt1 = s.run("1")
stmt1.verifyResult("res0: Int = 1")
restartLivy()
// Verify session still exists.
s.verifySessionIdle()
s.run("2").verifyResult("res1: Int = 2")
// Verify statement result is preserved.
stmt1.verifyResult("res0: Int = 1")
s.stop()
restartLivy()
// Verify deleted session doesn't show up after recovery.
s.verifySessionDoesNotExist()
// Verify new session doesn't reuse old session id.
withNewSession(Spark(), Map.empty, false) { s1 =>
s1.id should be > s.id
}
}
}
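  /**
   * Starts a session of the given kind with the given Spark configuration, optionally waits for
   * it to become idle, and runs the test body against it, delegating session cleanup to
   * withSession() once the body finishes.
   */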
  private def withNewSession[R](
      kind: Kind,
      sparkConf: Map[String, String] = Map.empty,
      waitForIdle: Boolean = true,
      heartbeatTimeoutInSecond: Int = 0)
    (f: (LivyRestClient#InteractiveSession) => R): R = {
    withSession(livyClient.startSession(kind, sparkConf, heartbeatTimeoutInSecond)) { s =>
      if (waitForIdle) {
        s.verifySessionIdle()
      }
      f(s)
    }
  }
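  // Regex helpers for verifyResult(): startsWith() matches output beginning with the given
  // literal prefix, while literal() treats the expected string as a literal pattern rather than
  // a regex.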
  private def startsWith(result: String): String = Pattern.quote(result) + ".*"

  private def literal(result: String): String = Pattern.quote(result)
}