blob: de74ebe225f3a0e9ec3a15c36af0269e0acecf41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.server.batch
import java.io.FileWriter
import java.nio.file.{Files, Path}
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse._
import scala.concurrent.duration.Duration
import org.mockito.Mockito._
import org.scalatestplus.mockito.MockitoSugar.mock
import org.apache.livy.{LivyConf, Utils}
import org.apache.livy.server.{AccessManager, BaseSessionServletSpec}
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.{BatchSessionManager, SessionState}
import org.apache.livy.utils.AppInfo
class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecoveryMetadata] {
val script: Path = {
val script = Files.createTempFile("livy-test", ".py")
script.toFile.deleteOnExit()
val writer = new FileWriter(script.toFile)
try {
writer.write(
"""
|print("hello world")
""".stripMargin)
} finally {
writer.close()
}
script
}
override def createServlet(): BatchSessionServlet = {
val livyConf = createConf()
val sessionStore = mock[SessionStore]
val accessManager = new AccessManager(livyConf)
new BatchSessionServlet(
new BatchSessionManager(livyConf, sessionStore, Some(Seq.empty)),
sessionStore,
livyConf,
accessManager)
}
def testShowSessionProperties(name: Option[String]): Unit = {
val id = 0
val state = SessionState.Running
val appId = "appid"
val owner = "owner"
val proxyUser = "proxyUser"
val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
val log = IndexedSeq[String]("log1", "log2")
val session = mock[BatchSession]
when(session.id).thenReturn(id)
when(session.name).thenReturn(name)
when(session.state).thenReturn(state)
when(session.appId).thenReturn(Some(appId))
when(session.appInfo).thenReturn(appInfo)
when(session.logLines()).thenReturn(log)
when(session.owner).thenReturn(owner)
when(session.proxyUser).thenReturn(Some(proxyUser))
val req = mock[HttpServletRequest]
val view = servlet.asInstanceOf[BatchSessionServlet].clientSessionView(session, req)
.asInstanceOf[BatchSessionView]
view.id shouldEqual id
view.name shouldEqual name
view.state shouldEqual state.toString
view.appId shouldEqual Some(appId)
view.owner shouldEqual owner
view.proxyUser shouldEqual Some(proxyUser)
view.appInfo shouldEqual appInfo
view.log shouldEqual log
}
describe("Batch Servlet") {
it("should create and tear down a batch") {
jget[Map[String, Any]]("/") { data =>
data("sessions") should equal (Seq())
}
val createRequest = new CreateBatchRequest()
createRequest.file = script.toString
createRequest.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))
jpost[Map[String, Any]]("/", createRequest) { data =>
header("Location") should equal("/0")
data("id") should equal (0)
val batch = servlet.sessionManager.get(0)
batch should be (defined)
}
// Wait for the process to finish.
{
val batch = servlet.sessionManager.get(0).get
Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS))
(batch.state match {
case SessionState.Success(_) => true
case _ => false
}) should be (true)
}
jget[Map[String, Any]]("/0") { data =>
data("id") should equal (0)
data("state") should equal ("success")
val batch = servlet.sessionManager.get(0)
batch should be (defined)
}
jget[Map[String, Any]]("/0/log?size=1000") { data =>
data("id") should equal (0)
data("log").asInstanceOf[Seq[String]] should contain ("hello world")
val batch = servlet.sessionManager.get(0)
batch should be (defined)
}
jdelete[Map[String, Any]]("/0") { data =>
data should equal (Map("msg" -> "deleted"))
val batch = servlet.sessionManager.get(0)
batch should not be defined
}
}
it("should respect config black list") {
val createRequest = new CreateBatchRequest()
createRequest.name = Some("TEST-BatchServletSpec-Session-0")
createRequest.file = script.toString
createRequest.conf = BLACKLISTED_CONFIG
jpost[Map[String, Any]]("/", createRequest, expectedStatus = SC_BAD_REQUEST) { _ => }
}
Seq(None, Some("TEST-batch-session"))
.foreach { name =>
it(s"should show session properties (name = $name)") {
testShowSessionProperties(name)
}
}
it("should fail session creation when max session creation is hit") {
val createRequest = new CreateBatchRequest()
createRequest.file = script.toString
createRequest.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))
jpost[Map[String, Any]]("/", createRequest) { data =>
header("Location") should equal("/2")
data("id") should equal (2)
val batch = servlet.sessionManager.get(2)
batch should be (defined)
}
servlet.livyConf.set(LivyConf.SESSION_MAX_CREATION, 1)
jpost[Map[String, Any]]("/", createRequest, SC_BAD_REQUEST) { data => None }
jdelete[Map[String, Any]]("/2") { data =>
data should equal (Map("msg" -> "deleted"))
val batch = servlet.sessionManager.get(2)
batch should not be defined
}
}
}
}