/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.sessions

import java.io.InputStream
import java.net.{URI, URISyntaxException}
import java.security.PrivilegedExceptionAction
import java.util.UUID

import scala.concurrent.{ExecutionContext, Future}

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.security.UserGroupInformation

import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.utils.AppInfo

object Session {
trait RecoveryMetadata { val id: Int }
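// Spark configuration keys that users are not allowed to set, loaded from the
// spark-blacklist.conf resource on the classpath (empty if the resource is not present).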
lazy val configBlackList: Set[String] = {
val url = getClass.getResource("/spark-blacklist.conf")
if (url != null) Utils.loadProperties(url).keySet else Set()
}
/**
* Validates and prepares a user-provided configuration for submission.
*
* - Verifies that no blacklisted configurations are provided.
* - Merges file lists in the configuration with the explicit lists provided in the request.
* - Resolves file URIs to make sure they reference the default FS.
* - Verifies that file URIs don't reference non-whitelisted local resources.
*
* An illustrative sketch of the resulting configuration follows this method.
*/
def prepareConf(
conf: Map[String, String],
jars: Seq[String],
files: Seq[String],
archives: Seq[String],
pyFiles: Seq[String],
livyConf: LivyConf): Map[String, String] = {
if (conf == null) {
return Map()
}
val errors = conf.keySet.filter(configBlackList.contains)
if (errors.nonEmpty) {
throw new IllegalArgumentException(
"Blacklisted configuration values in session config: " + errors.mkString(", "))
}
val confLists: Map[String, Seq[String]] = livyConf.sparkFileLists
.map { key => (key -> Nil) }.toMap
val userLists = confLists ++ Map(
LivyConf.SPARK_JARS -> jars,
LivyConf.SPARK_FILES -> files,
LivyConf.SPARK_ARCHIVES -> archives,
LivyConf.SPARK_PY_FILES -> pyFiles)
// For each file-list key, merge the list given explicitly in the request with any list
// already present in the user's conf, resolving every entry against the default FS.
val merged = userLists.flatMap { case (key, list) =>
val confList = conf.get(key)
.map { confValue =>
resolveURIs(confValue.split("[, ]+").toSeq, livyConf)
}
.getOrElse(Nil)
val userList = resolveURIs(list, livyConf)
if (confList.nonEmpty || userList.nonEmpty) {
Some(key -> (userList ++ confList).mkString(","))
} else {
None
}
}
val masterConfList = Map(LivyConf.SPARK_MASTER -> livyConf.sparkMaster()) ++
livyConf.sparkDeployMode().map(LivyConf.SPARK_DEPLOY_MODE -> _).toMap
conf ++ masterConfList ++ merged
}
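// Illustrative sketch only (the values and the defaultFS below are hypothetical, not part of
// this file): with fs.defaultFS set to hdfs://namenode:8020, a call such as
//
//   prepareConf(
//     conf = Map("spark.executor.memory" -> "2g"),
//     jars = Seq("/user/alice/app.jar"),
//     files = Nil,
//     archives = Nil,
//     pyFiles = Nil,
//     livyConf = livyConf)
//
// keeps "spark.executor.memory", adds the configured Spark master (and deploy mode, if set),
// and rewrites the jar entry to hdfs://namenode:8020/user/alice/app.jar. Passing any key
// listed in spark-blacklist.conf would throw an IllegalArgumentException instead.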
/**
* Prepends the value of the "fs.defaultFS" configuration to any URIs that do not have a
* scheme. URIs are required to at least be absolute paths.
*
* @throws IllegalArgumentException If an invalid URI is found in the given list.
*/
def resolveURIs(uris: Seq[String], livyConf: LivyConf): Seq[String] = {
uris.filter(_.nonEmpty).map { _uri =>
val uri = try {
new URI(_uri)
} catch {
case e: URISyntaxException => throw new IllegalArgumentException(e)
}
resolveURI(uri, livyConf).toString()
}
}
def resolveURI(uri: URI, livyConf: LivyConf): URI = {
val defaultFS = livyConf.hadoopConf.get("fs.defaultFS").stripSuffix("/")
val resolved =
if (uri.getScheme() == null) {
val pathWithSegment =
if (uri.getFragment() != null) uri.getPath() + '#' + uri.getFragment() else uri.getPath()
require(pathWithSegment.startsWith("/"), s"Path '${uri.getPath()}' is not absolute.")
new URI(defaultFS + pathWithSegment)
} else {
uri
}
if (resolved.getScheme() == "file") {
// Make sure the location is whitelisted before allowing local files to be added.
require(livyConf.localFsWhitelist.exists(resolved.getPath().startsWith),
s"Local path ${uri.getPath()} cannot be added to user sessions.")
}
resolved
}
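// Illustrative behavior sketch (the defaultFS and paths below are hypothetical): with
// fs.defaultFS set to hdfs://namenode:8020,
//
//   resolveURIs(Seq("/data/lib.jar"), livyConf)        // -> Seq("hdfs://namenode:8020/data/lib.jar")
//   resolveURIs(Seq("s3a://bucket/lib.jar"), livyConf)  // unchanged: the URI already has a scheme
//   resolveURIs(Seq("data/lib.jar"), livyConf)          // fails: schemeless paths must be absolute
//   resolveURIs(Seq("file:///tmp/lib.jar"), livyConf)   // rejected unless /tmp is covered by
//                                                       // livyConf.localFsWhitelist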
}
abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf)
extends Logging {
import Session._
protected implicit val executionContext: ExecutionContext = ExecutionContext.global
protected var _appId: Option[String] = None
private var _lastActivity = System.nanoTime()
// Directory where the session's staging files are created. The directory is only accessible
// to the session's effective user.
private var stagingDir: Path = null
def appId: Option[String] = _appId
var appInfo: AppInfo = AppInfo()
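// For sessions in a terminal state (error, dead, success), report the time that state was
// entered; otherwise report the most recently recorded activity.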
def lastActivity: Long = state match {
case SessionState.Error(time) => time
case SessionState.Dead(time) => time
case SessionState.Success(time) => time
case _ => _lastActivity
}
def logLines(): IndexedSeq[String]
def recordActivity(): Unit = {
_lastActivity = System.nanoTime()
}
def recoveryMetadata: RecoveryMetadata
def state: SessionState
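/**
* Stops the session and then deletes its staging directory (if one was created) as the
* session's effective user. Failures in either step are logged rather than rethrown.
*/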
def stop(): Future[Unit] = Future {
try {
info(s"Stopping $this...")
stopSession()
info(s"Stopped $this.")
} catch {
case e: Exception =>
warn(s"Error stopping session $id.", e)
}
try {
if (stagingDir != null) {
debug(s"Deleting session $id staging directory $stagingDir")
doAsOwner {
val fs = FileSystem.newInstance(livyConf.hadoopConf)
try {
fs.delete(stagingDir, true)
} finally {
fs.close()
}
}
}
} catch {
case e: Exception =>
warn(s"Error cleaning up session $id staging dir.", e)
}
}
override def toString(): String = s"${this.getClass.getSimpleName} $id"
protected def stopSession(): Unit
protected val proxyUser: Option[String]
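/**
* Runs `fn` as the session's effective user: the proxy user if one was requested, otherwise
* the session owner. With Hadoop security enabled, a proxy UGI is used when impersonation is
* enabled and the server's own UGI otherwise; without security, a simple remote user is used.
* If no user is known, `fn` is run directly.
*/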
protected def doAsOwner[T](fn: => T): T = {
val user = proxyUser.getOrElse(owner)
if (user != null) {
val ugi = if (UserGroupInformation.isSecurityEnabled) {
if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
UserGroupInformation.createProxyUser(user, UserGroupInformation.getCurrentUser())
} else {
UserGroupInformation.getCurrentUser()
}
} else {
UserGroupInformation.createRemoteUser(user)
}
ugi.doAs(new PrivilegedExceptionAction[T] {
override def run(): T = fn
})
} else {
fn
}
}
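/**
* Streams `dataStream` into a file called `name` under the session's staging directory,
* running as the session's effective user, and returns the URI of the uploaded file.
*/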
protected def copyResourceToHDFS(dataStream: InputStream, name: String): URI = doAsOwner {
val fs = FileSystem.newInstance(livyConf.hadoopConf)
try {
val filePath = new Path(getStagingDir(fs), name)
debug(s"Uploading user file to $filePath")
val outFile = fs.create(filePath, true)
val buffer = new Array[Byte](512 * 1024)
var read = -1
try {
while ({read = dataStream.read(buffer); read != -1}) {
outFile.write(buffer, 0, read)
}
} finally {
outFile.close()
}
filePath.toUri
} finally {
fs.close()
}
}
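/**
* Lazily creates the session's staging directory: under the configured staging root if set,
* otherwise under `.livy-sessions` in the filesystem home directory, with permissions
* restricted to the effective user (700).
*/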
private def getStagingDir(fs: FileSystem): Path = synchronized {
if (stagingDir == null) {
val stagingRoot = Option(livyConf.get(LivyConf.SESSION_STAGING_DIR)).getOrElse {
new Path(fs.getHomeDirectory(), ".livy-sessions").toString()
}
val sessionDir = new Path(stagingRoot, UUID.randomUUID().toString())
fs.mkdirs(sessionDir)
fs.setPermission(sessionDir, new FsPermission("700"))
stagingDir = sessionDir
debug(s"Session $id staging directory is $stagingDir")
}
stagingDir
}
}