/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.livy.server.interactive

import java.net.URI
import javax.servlet.http.HttpServletRequest

import scala.collection.JavaConverters._
import scala.concurrent._
import scala.concurrent.duration._

import org.json4s.jackson.Json4sScalaModule
import org.scalatra._
import org.scalatra.servlet.FileUploadSupport

import org.apache.livy.{CompletionRequest, ExecuteRequest, JobHandle, LivyConf, Logging}
import org.apache.livy.client.common.HttpMessages
import org.apache.livy.client.common.HttpMessages._
import org.apache.livy.server.{AccessManager, SessionServlet}
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions._

object InteractiveSessionServlet extends Logging

class InteractiveSessionServlet(
    sessionManager: InteractiveSessionManager,
    sessionStore: SessionStore,
    livyConf: LivyConf,
    accessManager: AccessManager)
  extends SessionServlet(sessionManager, livyConf, accessManager)
  with SessionHeartbeatNotifier[InteractiveSession, InteractiveRecoveryMetadata]
  with FileUploadSupport
{

  mapper.registerModule(new SessionKindModule())
    .registerModule(new Json4sScalaModule())

  override protected def createSession(req: HttpServletRequest): InteractiveSession = {
    val createRequest = bodyAs[CreateInteractiveRequest](req)
    val proxyUser = checkImpersonation(createRequest.proxyUser, req)
    InteractiveSession.create(
      sessionManager.nextId(),
      remoteUser(req),
      proxyUser,
      livyConf,
      createRequest,
      sessionStore)
  }

  override protected[interactive] def clientSessionView(
      session: InteractiveSession,
      req: HttpServletRequest): Any = {
    val logs =
      if (hasViewAccess(session.owner, req)) {
        Option(session.logLines())
          .map { lines =>
            val size = 10
            val from = math.max(0, lines.length - size)
            val until = from + size

            lines.view(from, until)
          }
          .getOrElse(Nil)
      } else {
        Nil
      }

    new SessionInfo(session.id, session.appId.orNull, session.owner, session.proxyUser.orNull,
      session.state.toString, session.kind.toString, session.appInfo.asJavaMap, logs.asJava)
  }

  post("/:id/stop") {
    withModifyAccessSession { session =>
      Await.ready(session.stop(), Duration.Inf)
      NoContent()
    }
  }

  post("/:id/interrupt") {
    withModifyAccessSession { session =>
      Await.ready(session.interrupt(), Duration.Inf)
      Ok(Map("msg" -> "interrupted"))
    }
  }

  get("/:id/statements") {
    withViewAccessSession { session =>
      val statements = session.statements
      val from = params.get("from").map(_.toInt).getOrElse(0)
      val size = params.get("size").map(_.toInt).getOrElse(statements.length)

      Map(
        "total_statements" -> statements.length,
        "statements" -> statements.view(from, from + size)
      )
    }
  }

  val getStatement = get("/:id/statements/:statementId") {
    withViewAccessSession { session =>
      val statementId = params("statementId").toInt

      session.getStatement(statementId).getOrElse(NotFound("Statement not found"))
    }
  }

  jpost[ExecuteRequest]("/:id/statements") { req =>
    withModifyAccessSession { session =>
      val statement = session.executeStatement(req)

      Created(statement,
        headers = Map(
          "Location" -> url(getStatement,
            "id" -> session.id.toString,
            "statementId" -> statement.id.toString)))
    }
  }

  jpost[CompletionRequest]("/:id/completion") { req =>
    withModifyAccessSession { session =>
      val compl = session.completion(req)
      Ok(Map("candidates" -> compl.candidates))
    }
  }

  post("/:id/statements/:statementId/cancel") {
    withModifyAccessSession { session =>
      val statementId = params("statementId")
      session.cancelStatement(statementId.toInt)
      Ok(Map("msg" -> "canceled"))
    }
  }
  // This endpoint is used by the client-http module to "connect" to an existing session and
  // update its last activity time. It performs authorization checks to make sure the caller
  // has access to the session, so even though it returns the same data, it behaves differently
  // from get("/:id").
  post("/:id/connect") {
    withModifyAccessSession { session =>
      session.recordActivity()
      Ok(clientSessionView(session, request))
    }
  }

  jpost[SerializedJob]("/:id/submit-job") { req =>
    withModifyAccessSession { session =>
      try {
      require(req.job != null && req.job.length > 0, "no job provided.")
      val jobId = session.submitJob(req.job, req.jobType)
      Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
      } catch {
        case e: Throwable =>
        throw e
      }
    }
  }

  jpost[SerializedJob]("/:id/run-job") { req =>
    withModifyAccessSession { session =>
      require(req.job != null && req.job.length > 0, "no job provided.")
      val jobId = session.runJob(req.job, req.jobType)
      Created(new JobStatus(jobId, JobHandle.State.SENT, null, null))
    }
  }

  post("/:id/upload-jar") {
    withModifyAccessSession { lsession =>
      fileParams.get("jar") match {
        case Some(file) =>
          lsession.addJar(file.getInputStream, file.name)
        case None =>
          BadRequest("No jar sent!")
      }
    }
  }

  post("/:id/upload-pyfile") {
    withModifyAccessSession { lsession =>
      fileParams.get("file") match {
        case Some(file) =>
          lsession.addJar(file.getInputStream, file.name)
        case None =>
          BadRequest("No file sent!")
      }
    }
  }

  post("/:id/upload-file") {
    withModifyAccessSession { lsession =>
      fileParams.get("file") match {
        case Some(file) =>
          lsession.addFile(file.getInputStream, file.name)
        case None =>
          BadRequest("No file sent!")
      }
    }
  }

  jpost[AddResource]("/:id/add-jar") { req =>
    withModifyAccessSession { lsession =>
      addJarOrPyFile(req, lsession)
    }
  }

  jpost[AddResource]("/:id/add-pyfile") { req =>
    withModifyAccessSession { lsession =>
      addJarOrPyFile(req, lsession)
    }
  }

  jpost[AddResource]("/:id/add-file") { req =>
    withModifyAccessSession { lsession =>
      val uri = new URI(req.uri)
      lsession.addFile(uri)
    }
  }

  get("/:id/jobs/:jobid") {
    withViewAccessSession { lsession =>
      val jobId = params("jobid").toLong
      Ok(lsession.jobStatus(jobId))
    }
  }

  post("/:id/jobs/:jobid/cancel") {
    withModifyAccessSession { lsession =>
      val jobId = params("jobid").toLong
      lsession.cancelJob(jobId)
    }
  }

  private def addJarOrPyFile(req: HttpMessages.AddResource, session: InteractiveSession): Unit = {
    val uri = new URI(req.uri)
    session.addJar(uri)
  }
}
