blob: 52893546021f48fbc66b67d167a00e0d1aedf71b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.livy.thriftserver
import java.util
import java.util.concurrent.{CancellationException, ExecutionException, TimeoutException, TimeUnit}
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.common.log.ProgressMonitor
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.parse.ParseUtils
import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.{CompositeService, ServiceException}
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.Operation
import org.apache.hive.service.rpc.thrift.{TOperationHandle, TProtocolVersion}
import org.apache.livy.{LIVY_VERSION, Logging}
/**
 * CLI service of the Livy thrift server. It is registered as a `CompositeService` under its own
 * class name and implements `ICLIService`.
 *
 * @param server the owning `LivyThriftServer`; it is passed to the session manager in `init`
 */
class LivyCLIService(server: LivyThriftServer)
extends CompositeService(classOf[LivyCLIService].getName) with ICLIService with Logging {
import LivyCLIService._
// Created in `init`; session and operation calls below are routed through it.
private var sessionManager: LivyThriftSessionManager = _
// Row count passed by the single-argument `fetchResults`; assigned in `init`.
private var defaultFetchRows: Int = _
// Assigned from `Utils.getUGI` in `init` when Hadoop security is enabled.
private var serviceUGI: UserGroupInformation = _
// Assigned in `init` from the SPNego keytab login when principal and keytab are configured.
private var httpUGI: UserGroupInformation = _
/**
 * Initializes the service: creates the session manager, obtains the service UGI when Hadoop
 * security is enabled, and attempts to create a UGI for the SPNego principal.
 *
 * @param hiveConf configuration handed to the session manager and read for the SPNego
 *                 principal and keytab settings
 * @throws ServiceException if obtaining the service UGI fails with an `IOException` or a
 *                          `LoginException`
 */
override def init(hiveConf: HiveConf): Unit = {
sessionManager = new LivyThriftSessionManager(server, hiveConf)
// NOTE(review): the value assigned to `defaultFetchRows` is not visible here — confirm
// against the full file.
defaultFetchRows =
// If the hadoop cluster is secure, do a kerberos login for the service from the keytab
if (UserGroupInformation.isSecurityEnabled) {
try {
serviceUGI = Utils.getUGI
} catch {
// Both failure types are wrapped in a ServiceException carrying the same message.
case e: IOException =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
case e: LoginException =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
// Also try creating a UGI object for the SPNego principal
val principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL)
val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB)
// The SPNego login is skipped when either the principal or the keytab setting is empty.
if (principal.isEmpty || keyTabFile.isEmpty) {
// NOTE(review): the log message spells "ketabFile"; this looks like a typo for "keytabFile".
info(s"SPNego httpUGI not created, SPNegoPrincipal: $principal, ketabFile: $keyTabFile")
} else try {
httpUGI = HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf)
info("SPNego httpUGI successfully created.")
} catch {
// A failed SPNego login is only logged as a warning; it is not rethrown.
case e: IOException =>
warn("SPNego httpUGI creation failed: ", e)
/** Returns the service UGI field; it is only assigned inside `init`. */
def getServiceUGI: UserGroupInformation = serviceUGI
/** Returns the SPNego UGI field; it is only assigned inside `init`. */
def getHttpUGI: UserGroupInformation = httpUGI
/** Returns the session manager field created in `init`. */
def getSessionManager: LivyThriftSessionManager = sessionManager
/**
 * Returns the server information value for the requested info type.
 *
 * @param sessionHandle handle of the calling session; not referenced in the visible body
 * @param getInfoType the kind of information requested
 * @return a `GetInfoValue` wrapping the name, version, length limit or keyword list
 * @throws HiveSQLException if `getInfoType` is not one of the handled types
 */
override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
getInfoType match {
case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Livy JDBC")
case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Livy JDBC")
case GetInfoType.CLI_DBMS_VER => new GetInfoValue(LIVY_VERSION)
// below values are copied from Hive
case GetInfoType.CLI_MAX_COLUMN_NAME_LEN => new GetInfoValue(128)
case GetInfoType.CLI_MAX_SCHEMA_NAME_LEN => new GetInfoValue(128)
case GetInfoType.CLI_MAX_TABLE_NAME_LEN => new GetInfoValue(128)
case GetInfoType.CLI_ODBC_KEYWORDS =>
new GetInfoValue(ParseUtils.getKeywords(LivyCLIService.ODBC_KEYWORDS))
// Every other info type is rejected instead of returning a default value.
case _ => throw new HiveSQLException(s"Unrecognized GetInfoType value: $getInfoType")
/**
 * Opens a session through the session manager using the given protocol version and client
 * address. The call passes `false` and a `null` delegation token — presumably the
 * "no impersonation" variant; confirm against `LivyThriftSessionManager.openSession`.
 *
 * NOTE(review): the expression returning the declared `SessionHandle` is not visible here.
 */
def openSession(
protocol: TProtocolVersion,
username: String,
password: String,
ipAddress: String,
configuration: util.Map[String, String]): SessionHandle = {
val sessionHandle = sessionManager.openSession(
protocol, username, password, ipAddress, configuration, false, null)
debug(sessionHandle + ": openSession()")
/**
 * Opens a session through the session manager using the given protocol version and client
 * address, passing `true` and the supplied delegation token — presumably enabling
 * impersonation; confirm against `LivyThriftSessionManager.openSession`.
 *
 * NOTE(review): the debug message reads "openSession()" although this method is
 * `openSessionWithImpersonation`; the expression returning the `SessionHandle` is not
 * visible here.
 */
def openSessionWithImpersonation(
protocol: TProtocolVersion,
username: String,
password: String,
ipAddress: String,
configuration: util.Map[String, String],
delegationToken: String): SessionHandle = {
val sessionHandle = sessionManager.openSession(
protocol, username, password, ipAddress, configuration, true, delegationToken)
debug(sessionHandle + ": openSession()")
/**
 * Opens a session with `SERVER_VERSION` as the protocol and a `null` client address, passing
 * `false` and a `null` delegation token to the session manager.
 *
 * NOTE(review): the expression returning the declared `SessionHandle` is not visible here.
 */
override def openSession(
username: String,
password: String,
configuration: util.Map[String, String]): SessionHandle = {
val sessionHandle = sessionManager.openSession(
SERVER_VERSION, username, password, null, configuration, false, null)
debug(sessionHandle + ": openSession()")
/**
 * Opens a session with `SERVER_VERSION` as the protocol and a `null` client address, passing
 * `true` and the supplied delegation token to the session manager.
 *
 * NOTE(review): the debug message reads "openSession()" although this method is
 * `openSessionWithImpersonation`; the expression returning the `SessionHandle` is not
 * visible here.
 */
override def openSessionWithImpersonation(
username: String,
password: String,
configuration: util.Map[String, String], delegationToken: String): SessionHandle = {
val sessionHandle = sessionManager.openSession(
SERVER_VERSION, username, password, null, configuration, true, delegationToken)
debug(sessionHandle + ": openSession()")
/**
 * Handles a close request for the given session.
 *
 * NOTE(review): only the debug log is visible here; no call that actually closes the
 * session appears in this body — confirm against the full file.
 */
override def closeSession(sessionHandle: SessionHandle): Unit = {
debug(sessionHandle + ": closeSession()")
/**
 * Executes a statement by delegating to the four-argument overload with a query timeout
 * of 0.
 */
override def executeStatement(
sessionHandle: SessionHandle,
statement: String,
confOverlay: util.Map[String, String]): OperationHandle = {
executeStatement(sessionHandle, statement, confOverlay, 0)
* Execute statement on the server with a timeout. This is a blocking call.
/**
 * Submits the statement to the operation manager with `runAsync = false` and the given
 * query timeout.
 *
 * NOTE(review): the expression returning `opHandle` is not visible here.
 */
override def executeStatement(
sessionHandle: SessionHandle,
statement: String,
confOverlay: util.Map[String, String],
queryTimeout: Long): OperationHandle = {
val opHandle: OperationHandle = sessionManager.operationManager.executeStatement(
sessionHandle, statement, confOverlay, runAsync = false, queryTimeout)
debug(sessionHandle + ": executeStatement()")
/**
 * Executes a statement asynchronously by delegating to the four-argument overload with a
 * query timeout of 0.
 */
override def executeStatementAsync(
sessionHandle: SessionHandle,
statement: String,
confOverlay: util.Map[String, String]): OperationHandle = {
executeStatementAsync(sessionHandle, statement, confOverlay, 0)
* Execute statement asynchronously on the server with a timeout. This is a non-blocking call.
/**
 * Submits the statement to the operation manager with `runAsync = true` and the given
 * query timeout.
 *
 * NOTE(review): the expression returning `opHandle` is not visible here.
 */
override def executeStatementAsync(
sessionHandle: SessionHandle,
statement: String,
confOverlay: util.Map[String, String],
queryTimeout: Long): OperationHandle = {
val opHandle = sessionManager.operationManager.executeStatement(
sessionHandle, statement, confOverlay, runAsync = true, queryTimeout)
debug(sessionHandle + ": executeStatementAsync()")
/**
 * Handles a type-info request for the given session.
 *
 * NOTE(review): only the debug log is visible; no statement producing the declared
 * `OperationHandle` appears in this body — confirm against the full file.
 */
override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
debug(sessionHandle + ": getTypeInfo()")
/**
 * Handles a catalog-listing request for the given session.
 *
 * NOTE(review): only the debug log is visible; no statement producing the declared
 * `OperationHandle` appears in this body — confirm against the full file.
 */
override def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
debug(sessionHandle + ": getCatalogs()")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, stating that GET_SCHEMAS is not yet supported
 */
override def getSchemas(
sessionHandle: SessionHandle,
catalogName: String,
schemaName: String): OperationHandle = {
throw new HiveSQLException("Operation GET_SCHEMAS is not yet supported")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, stating that GET_TABLES is not yet supported
 */
override def getTables(
sessionHandle: SessionHandle,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: util.List[String]): OperationHandle = {
throw new HiveSQLException("Operation GET_TABLES is not yet supported")
/**
 * Handles a table-types request for the given session.
 *
 * NOTE(review): only the debug log is visible; no statement producing the declared
 * `OperationHandle` appears in this body — confirm against the full file.
 */
override def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
debug(sessionHandle + ": getTableTypes()")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, stating that GET_COLUMNS is not yet supported
 */
override def getColumns(
sessionHandle: SessionHandle,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): OperationHandle = {
throw new HiveSQLException("Operation GET_COLUMNS is not yet supported")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, stating that GET_FUNCTIONS is not yet supported
 */
override def getFunctions(
sessionHandle: SessionHandle,
catalogName: String,
schemaName: String,
functionName: String): OperationHandle = {
throw new HiveSQLException("Operation GET_FUNCTIONS is not yet supported")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, stating that GET_PRIMARY_KEYS is not yet supported
 */
override def getPrimaryKeys(
sessionHandle: SessionHandle,
catalog: String,
schema: String,
table: String): OperationHandle = {
throw new HiveSQLException("Operation GET_PRIMARY_KEYS is not yet supported")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, stating that GET_CROSS_REFERENCE is not yet supported
 */
override def getCrossReference(
sessionHandle: SessionHandle,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle = {
throw new HiveSQLException("Operation GET_CROSS_REFERENCE is not yet supported")
/**
 * Looks up the operation for `opHandle` and reads its status. For operations that run
 * asynchronously it first waits on the background handle for a bounded time (long polling).
 *
 * @param opHandle handle of the operation to inspect
 * @param getProgressUpdate not referenced in the visible body; the progress update is always
 *                          set to one built from `ProgressMonitor.NULL`
 *
 * NOTE(review): the expression returning `opStatus` is not visible here.
 */
override def getOperationStatus(
opHandle: OperationHandle,
getProgressUpdate: Boolean): OperationStatus = {
val operation: Operation = sessionManager.operationManager.getOperation(opHandle)
* If this is a background operation run asynchronously,
* we block for a duration determined by a step function, before we return
* However, if the background operation is complete, we return immediately.
if (operation.shouldRunAsync) {
// NOTE(review): the arguments of this `getTimeVar` call are not visible here; the comment
// below suggests it reads HIVE_SERVER2_LONG_POLLING_TIMEOUT — confirm.
val maxTimeout: Long = HiveConf.getTimeVar(
val elapsed: Long = System.currentTimeMillis - operation.getBeginTime
// A step function to increase the polling timeout by 500 ms every 10 sec,
// starting from 500 ms up to HIVE_SERVER2_LONG_POLLING_TIMEOUT
val timeout: Long = Math.min(maxTimeout, (elapsed / TimeUnit.SECONDS.toMillis(10) + 1) * 500)
try {
// Wait for the background task for at most `timeout` milliseconds.
operation.getBackgroundHandle.get(timeout, TimeUnit.MILLISECONDS)
} catch {
case e: TimeoutException =>
// No Op, return to the caller since long polling timeout has expired
trace(opHandle + ": Long polling timed out")
case e: CancellationException =>
// The background operation thread was cancelled
trace(opHandle + ": The background operation was cancelled", e)
case e: ExecutionException =>
// Note: Hive ops do not use the normal Future failure path, so this will not happen
// in case of actual failure; the Future will just be done.
// The background operation thread was aborted
warn(opHandle + ": The background operation was aborted", e)
case _: InterruptedException =>
// No op, this thread was interrupted
// In this case, the call might return sooner than long polling timeout
// NOTE(review): the interrupt is swallowed without re-setting the thread's interrupt
// flag — confirm this is intended.
val opStatus: OperationStatus = operation.getStatus
debug(opHandle + ": getOperationStatus()")
opStatus.setJobProgressUpdate(new JobProgressUpdate(ProgressMonitor.NULL))
/**
 * Handles a cancel request for the given operation.
 *
 * NOTE(review): only the debug log is visible; no call that actually cancels the operation
 * appears in this body — confirm against the full file.
 */
override def cancelOperation(opHandle: OperationHandle): Unit = {
debug(opHandle + ": cancelOperation()")
/**
 * Handles a close request for the given operation.
 *
 * NOTE(review): only the debug log is visible; no call that actually closes the operation
 * appears in this body. The log text also lacks the "()" suffix used by the other methods.
 */
override def closeOperation(opHandle: OperationHandle): Unit = {
debug(opHandle + ": closeOperation")
/**
 * Handles a result-set metadata request for the given operation.
 *
 * NOTE(review): only the debug log is visible; no statement producing the declared
 * `TableSchema` appears in this body — confirm against the full file.
 */
override def getResultSetMetadata(opHandle: OperationHandle): TableSchema = {
debug(opHandle + ": getResultSetMetadata()")
/**
 * Fetches query output for the operation with default settings: the default fetch
 * orientation, `defaultFetchRows` rows, and `FetchType.QUERY_OUTPUT`.
 *
 * NOTE(review): the head of the call that receives these arguments is not visible here;
 * presumably it is the four-argument `fetchResults` — confirm.
 */
override def fetchResults(opHandle: OperationHandle): RowSet = {
opHandle, Operation.DEFAULT_FETCH_ORIENTATION, defaultFetchRows, FetchType.QUERY_OUTPUT)
/**
 * Logs the request and forwards it unchanged to the operation manager's `fetchResults`.
 *
 * @param opHandle handle of the operation whose results are fetched
 * @param orientation fetch orientation, passed through as-is
 * @param maxRows row limit, passed through as-is
 * @param fetchType kind of output requested, passed through as-is
 * @return the `RowSet` returned by the operation manager
 */
override def fetchResults(
opHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Long,
fetchType: FetchType): RowSet = {
debug(opHandle + ": fetchResults()")
sessionManager.operationManager.fetchResults(opHandle, orientation, maxRows, fetchType)
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, with the message "Operation not yet supported."
 */
override def getDelegationToken(
sessionHandle: SessionHandle,
authFactory: HiveAuthFactory,
owner: String,
renewer: String): String = {
throw new HiveSQLException("Operation not yet supported.")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, with the message "Operation not yet supported."
 */
override def setApplicationName(sh: SessionHandle, value: String): Unit = {
throw new HiveSQLException("Operation not yet supported.")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, with the message "Operation not yet supported."
 */
override def cancelDelegationToken(
sessionHandle: SessionHandle,
authFactory: HiveAuthFactory,
tokenStr: String): Unit = {
throw new HiveSQLException("Operation not yet supported.")
/**
 * Not implemented: always throws, regardless of the arguments.
 *
 * @throws HiveSQLException always, with the message "Operation not yet supported."
 */
override def renewDelegationToken(
sessionHandle: SessionHandle,
authFactory: HiveAuthFactory,
tokenStr: String): Unit = {
throw new HiveSQLException("Operation not yet supported.")
/**
 * Not implemented: always throws, regardless of the argument.
 *
 * @throws HiveSQLException always, with the message "Operation not yet supported."
 */
override def getQueryId(opHandle: TOperationHandle): String = {
throw new HiveSQLException("Operation not yet supported.")
/** Companion object holding constants used by the `LivyCLIService` class. */
object LivyCLIService {
// Protocol version passed by the `openSession` overloads that take no explicit protocol:
// the last element returned by `TProtocolVersion.values()`.
val SERVER_VERSION: TProtocolVersion = TProtocolVersion.values().last
// scalastyle:off line.size.limit
// From
// scalastyle:on line.size.limit
// NOTE(review): only the tail of this expression is visible; presumably it ends the keyword
// list referenced as `LivyCLIService.ODBC_KEYWORDS` in `getInfo`, converted to a Java
// collection via `asJava` — confirm against the full file.
"WITH", "WORK", "WRITE", "YEAR", "ZONE").asJava