blob: 4d614f41b9a0d2b0c07010d94bac16bb12067d14 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.server.interactive
import java.net.URI
import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._
import org.json4s.jackson.Json4sScalaModule
import org.scalatra._
import org.scalatra.servlet.FileUploadSupport
import org.apache.livy.{CompletionRequest, ExecuteRequest, JobHandle, LivyConf, Logging}
import org.apache.livy.client.common.HttpMessages
import org.apache.livy.client.common.HttpMessages._
import org.apache.livy.server.{AccessManager, SessionServlet}
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions._
// Companion object of the servlet class below; it only mixes in Logging and declares no
// members of its own here.
object InteractiveSessionServlet extends Logging
class InteractiveSessionServlet(
sessionManager: InteractiveSessionManager,
sessionStore: SessionStore,
livyConf: LivyConf,
accessManager: AccessManager)
extends SessionServlet(sessionManager, livyConf, accessManager)
with SessionHeartbeatNotifier[InteractiveSession, InteractiveRecoveryMetadata]
with FileUploadSupport
{
// Registers two extra modules on `mapper` at construction time: SessionKindModule and the
// json4s Scala module. `mapper` is defined outside this file — presumably the JSON object
// mapper inherited from the servlet base class (TODO confirm).
mapper.registerModule(new SessionKindModule())
.registerModule(new Json4sScalaModule())
/**
 * Builds a new [[InteractiveSession]] from the body of a create request.
 *
 * The body is decoded as a `CreateInteractiveRequest`. The proxy user it names is passed
 * through `checkImpersonation` together with the HTTP request, and the value returned by
 * that check is the proxy user handed to `InteractiveSession.create`.
 *
 * @param req the incoming HTTP request carrying the creation payload
 * @return the session returned by `InteractiveSession.create`
 */
override protected def createSession(req: HttpServletRequest): InteractiveSession = {
  val payload = bodyAs[CreateInteractiveRequest](req)
  val impersonatedUser = checkImpersonation(payload.proxyUser, req)
  // The id is allocated only after the payload was decoded and impersonation was checked.
  val sessionId = sessionManager.nextId()
  val owner = remoteUser(req)
  InteractiveSession.create(
    sessionId, owner, impersonatedUser, livyConf, payload, sessionStore)
}
/**
 * Builds the `SessionInfo` object that describes a session to a client.
 *
 * Log lines are included only when `hasViewAccess` accepts the request for the session's
 * owner; in that case at most the last 10 lines are returned. A `null` result from
 * `session.logLines()` is treated the same as having no log lines.
 *
 * @param session the session to describe
 * @param req the HTTP request used for the view-access check
 * @return a `SessionInfo` carrying id, app id, owner, proxy user, state, kind, app info
 *         and the selected log lines
 */
override protected[interactive] def clientSessionView(
    session: InteractiveSession,
    req: HttpServletRequest): Any = {
  val maxLogLines = 10
  val recentLogs =
    if (!hasViewAccess(session.owner, req)) {
      Nil
    } else {
      Option(session.logLines())
        .map { allLines =>
          // Window over the tail of the log; an end index past the last line is tolerated
          // by the view, so short logs are returned in full.
          val start = math.max(0, allLines.length - maxLogLines)
          allLines.view(start, start + maxLogLines)
        }
        .getOrElse(Nil)
    }

  new SessionInfo(
    session.id,
    session.appId.orNull,
    session.owner,
    session.proxyUser.orNull,
    session.state.toString,
    session.kind.toString,
    session.appInfo.asJavaMap,
    recentLogs.asJava)
}
// Stops the session resolved by withModifyAccessSession. The handler blocks, without a
// timeout, until the value returned by `stop()` has completed and then answers NoContent.
post("/:id/stop") {
  withModifyAccessSession { target =>
    val pendingStop = target.stop()
    Await.ready(pendingStop, Duration.Inf)
    NoContent()
  }
}
// Interrupts the session resolved by withModifyAccessSession. The handler blocks, without a
// timeout, until the value returned by `interrupt()` has completed and then answers Ok with
// a small confirmation message.
post("/:id/interrupt") {
  withModifyAccessSession { target =>
    val pendingInterrupt = target.interrupt()
    Await.ready(pendingInterrupt, Duration.Inf)
    val confirmation = Map("msg" -> "interrupted")
    Ok(confirmation)
  }
}
// Lists the statements of a session. Two optional request parameters select a window:
// `from` (start index, default 0) and `size` (number of statements, default: all of them).
// The response contains the total number of statements and the requested window.
get("/:id/statements") {
  withViewAccessSession { session =>
    val statements = session.statements
    val from = params.get("from").map(_.toInt).getOrElse(0)
    val size = params.get("size").map(_.toInt).getOrElse(statements.length)
    // Compute the exclusive end index in Long arithmetic and clamp it to the Int range.
    // A plain `from + size` wraps around for inputs such as from=1, size=Int.MaxValue,
    // which would yield a negative bound and silently return an empty window.
    val unclampedEnd = from.toLong + size.toLong
    val until = math.max(Int.MinValue.toLong, math.min(Int.MaxValue.toLong, unclampedEnd)).toInt
    Map(
      "total_statements" -> statements.length,
      "statements" -> statements.view(from, until)
    )
  }
}
// Returns one statement of a session, or a NotFound response when the lookup yields nothing.
// The route is kept in a val because the statement-creation route passes it to `url(...)`
// to build the Location header.
val getStatement = get("/:id/statements/:statementId") {
  withViewAccessSession { session =>
    val requestedId = params("statementId").toInt
    val lookup = session.getStatement(requestedId)
    lookup.getOrElse(NotFound("Statement not found"))
  }
}
// Runs the statement described by the request body in the session and answers Created with
// the resulting statement. The Location header points at the getStatement route for the new
// statement.
jpost[ExecuteRequest]("/:id/statements") { req =>
  withModifyAccessSession { session =>
    val created = session.executeStatement(req)
    val location = url(getStatement,
      "id" -> session.id.toString,
      "statementId" -> created.id.toString)
    Created(created, headers = Map("Location" -> location))
  }
}
// Asks the session for completions for the request body and answers Ok with the candidates
// found in the session's result.
jpost[CompletionRequest]("/:id/completion") { req =>
  withModifyAccessSession { session =>
    val candidates = session.completion(req).candidates
    Ok(Map("candidates" -> candidates))
  }
}
// Cancels the statement whose numeric id is given in the path and answers Ok with a small
// confirmation message.
post("/:id/statements/:statementId/cancel") {
  withModifyAccessSession { session =>
    val targetId = params("statementId").toInt
    session.cancelStatement(targetId)
    val confirmation = Map("msg" -> "canceled")
    Ok(confirmation)
  }
}
// The client-http module calls this endpoint to "connect" to an already existing session and
// refresh its last activity time. Authorization checks ensure the caller has access to the
// session, so although the returned data matches get("/:id"), the behavior is not the same.
post("/:id/connect") {
  withModifyAccessSession { session =>
    session.recordActivity()
    val view = clientSessionView(session, request)
    Ok(view)
  }
}
// Submits the serialized job from the request body to the session via `submitJob` and
// answers Created with a JobStatus in the SENT state. A null or empty job payload fails the
// `require` check, which throws IllegalArgumentException with the message "no job provided.".
//
// Exceptions are intentionally not caught here: the previous catch-and-rethrow of Throwable
// had no effect, and leaving it out keeps this route in line with run-job.
jpost[SerializedJob]("/:id/submit-job") { req =>
  withModifyAccessSession { session =>
    require(req.job != null && req.job.length > 0, "no job provided.")
    val jobId = session.submitJob(req.job, req.jobType)
    Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
  }
}
// Hands the serialized job from the request body to the session via `runJob` and answers
// Created with a JobStatus in the SENT state. A null or empty job payload fails the
// `require` check with the message "no job provided.".
jpost[SerializedJob]("/:id/run-job") { req =>
  withModifyAccessSession { session =>
    val hasPayload = req.job != null && req.job.length > 0
    require(hasPayload, "no job provided.")
    val jobId = session.runJob(req.job, req.jobType)
    val status = new JobStatus(jobId, JobHandle.State.SENT, null, null)
    Created(status)
  }
}
// Accepts an uploaded file in the "jar" form field and passes its stream and name to the
// session's `addJar`. When the field is missing the route answers BadRequest.
post("/:id/upload-jar") {
  withModifyAccessSession { session =>
    fileParams.get("jar") match {
      case None =>
        BadRequest("No jar sent!")
      case Some(upload) =>
        session.addJar(upload.getInputStream, upload.name)
    }
  }
}
// Accepts an uploaded file in the "file" form field and passes its stream and name to the
// session's `addJar`; python files go through the same session call as jars, matching what
// addJarOrPyFile does for URI-based resources. When the field is missing the route answers
// BadRequest.
post("/:id/upload-pyfile") {
  withModifyAccessSession { session =>
    fileParams.get("file") match {
      case None =>
        BadRequest("No file sent!")
      case Some(upload) =>
        session.addJar(upload.getInputStream, upload.name)
    }
  }
}
// Accepts an uploaded file in the "file" form field and passes its stream and name to the
// session's `addFile`. When the field is missing the route answers BadRequest.
post("/:id/upload-file") {
  withModifyAccessSession { session =>
    fileParams.get("file") match {
      case None =>
        BadRequest("No file sent!")
      case Some(upload) =>
        session.addFile(upload.getInputStream, upload.name)
    }
  }
}
// Adds a jar, referenced by the URI in the request body, to the session. The work is
// delegated to addJarOrPyFile.
jpost[AddResource]("/:id/add-jar") { resource =>
  withModifyAccessSession { session =>
    addJarOrPyFile(resource, session)
  }
}
// Adds a python file, referenced by the URI in the request body, to the session. The work
// is delegated to addJarOrPyFile, the same helper the add-jar route uses.
jpost[AddResource]("/:id/add-pyfile") { resource =>
  withModifyAccessSession { session =>
    addJarOrPyFile(resource, session)
  }
}
// Adds a file, referenced by the URI in the request body, to the session. The `uri` string
// of the request is parsed into a java.net.URI before it is passed to `addFile`.
jpost[AddResource]("/:id/add-file") { resource =>
  withModifyAccessSession { session =>
    session.addFile(new URI(resource.uri))
  }
}
// Returns the status of the job whose numeric id is given in the path, wrapped in Ok.
get("/:id/jobs/:jobid") {
  withViewAccessSession { session =>
    val requestedJob = params("jobid").toLong
    val status = session.jobStatus(requestedJob)
    Ok(status)
  }
}
// Cancels the job whose numeric id is given in the path. The result of the session's
// `cancelJob` call is the result of the route.
post("/:id/jobs/:jobid/cancel") {
  withModifyAccessSession { session =>
    val targetJob = params("jobid").toLong
    session.cancelJob(targetJob)
  }
}
/**
 * Adds the resource referenced by `req.uri` to the session through `session.addJar`.
 * Used by both the add-jar and the add-pyfile routes, so jars and python files take the
 * same path.
 *
 * @param req request whose `uri` string is parsed into a [[java.net.URI]]
 * @param session session that receives the resource
 */
private def addJarOrPyFile(req: HttpMessages.AddResource, session: InteractiveSession): Unit =
  session.addJar(new URI(req.uri))
}