blob: d34c1c0e8e838d6b81eb7aa137745424a933f969 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.thriftserver
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hive.service.server.HiveServer2
import org.scalatra.ScalatraServlet
import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.interactive.InteractiveSession
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.InteractiveSessionManager
/**
* The main entry point for the Livy thrift server leveraging HiveServer2. Starts up a
* `HiveThriftServer2` thrift server.
*/
object LivyThriftServer extends Logging {
// Visible for testing
private[thriftserver] var thriftServerThread: Thread = _
private var thriftServer: LivyThriftServer = _
private def hiveConf(livyConf: LivyConf): HiveConf = {
val conf = new HiveConf()
// Remove all configs coming from hive-site.xml which may be in the classpath for the Spark
// applications to run.
conf.getAllProperties.asScala.filter(_._1.startsWith("hive.")).foreach { case (key, _) =>
conf.unset(key)
}
livyConf.asScala.foreach {
case nameAndValue if nameAndValue.getKey.startsWith("livy.hive") =>
conf.set(nameAndValue.getKey.stripPrefix("livy."), nameAndValue.getValue)
case _ => // Ignore
}
conf
}
def start(
livyConf: LivyConf,
livySessionManager: InteractiveSessionManager,
sessionStore: SessionStore,
accessManager: AccessManager): Unit = synchronized {
if (thriftServerThread == null) {
info("Starting LivyThriftServer")
val runThriftServer = new Runnable {
override def run(): Unit = {
try {
thriftServer = new LivyThriftServer(
livyConf,
livySessionManager,
sessionStore,
accessManager)
thriftServer.init(hiveConf(livyConf))
thriftServer.start()
info("LivyThriftServer started")
} catch {
case e: Exception =>
error("Error starting LivyThriftServer", e)
}
}
}
thriftServerThread =
new Thread(new ThreadGroup("thriftserver"), runThriftServer, "Livy-Thriftserver")
thriftServerThread.start()
} else {
error("Livy Thriftserver is already started")
}
}
private[thriftserver] def getInstance: Option[LivyThriftServer] = {
Option(thriftServer)
}
// Used in testing
def stopServer(): Unit = {
if (thriftServerThread != null) {
thriftServerThread.join()
}
thriftServerThread = null
thriftServer.stop()
thriftServer = null
}
}
class LivyThriftServer(
private[thriftserver] val livyConf: LivyConf,
private[thriftserver] val livySessionManager: InteractiveSessionManager,
private[thriftserver] val sessionStore: SessionStore,
private val accessManager: AccessManager) extends HiveServer2 {
override def init(hiveConf: HiveConf): Unit = {
this.cliService = new LivyCLIService(this)
super.init(hiveConf)
}
private[thriftserver] def getSessionManager(): LivyThriftSessionManager = {
this.cliService.asInstanceOf[LivyCLIService].getSessionManager
}
def isAllowedToUse(user: String, session: InteractiveSession): Boolean = {
session.owner == user || accessManager.checkModifyPermissions(user)
}
}