// blob: 52893546021f48fbc66b67d167a00e0d1aedf71b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.thriftserver
import java.io.IOException
import java.util
import java.util.concurrent.{CancellationException, ExecutionException, TimeoutException, TimeUnit}
import javax.security.auth.login.LoginException
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.common.log.ProgressMonitor
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.parse.ParseUtils
import org.apache.hadoop.hive.shims.Utils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.{CompositeService, ServiceException}
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.Operation
import org.apache.hive.service.rpc.thrift.{TOperationHandle, TProtocolVersion}
import org.apache.livy.{LIVY_VERSION, Logging}
/**
 * CLI service of the Livy thrift server. It implements Hive's `ICLIService` and forwards
 * session and operation requests to a [[LivyThriftSessionManager]] and to that manager's
 * operation manager.
 *
 * @param server the thrift server instance handed to the session manager created in `init`
 */
class LivyCLIService(server: LivyThriftServer)
extends CompositeService(classOf[LivyCLIService].getName) with ICLIService with Logging {
import LivyCLIService._
// Created in `init` and registered there as a sub-service of this composite service.
private var sessionManager: LivyThriftSessionManager = _
// Row count used by the single-argument `fetchResults`; read from the Hive conf in `init`.
private var defaultFetchRows: Int = _
// Only assigned in `init` when Hadoop security is enabled; otherwise it keeps its default.
private var serviceUGI: UserGroupInformation = _
// Only assigned in `init` when security is enabled and the SPNego principal and keytab are
// both configured and the SPNego login succeeds; otherwise it keeps its default.
private var httpUGI: UserGroupInformation = _
/**
 * Initializes the service.
 *
 * Creates the session manager and registers it as a sub-service, reads the default fetch size
 * and, when Hadoop security is enabled, obtains the service UGI and (if configured) the
 * SPNego UGI. Finally delegates to `super.init`.
 *
 * @param hiveConf configuration the settings are read from
 * @throws ServiceException if security is enabled and the service UGI cannot be obtained
 */
override def init(hiveConf: HiveConf): Unit = {
  sessionManager = new LivyThriftSessionManager(server, hiveConf)
  addService(sessionManager)
  defaultFetchRows =
    hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE)
  // If the hadoop cluster is secure, do a kerberos login for the service from the keytab
  if (UserGroupInformation.isSecurityEnabled) {
    try {
      serviceUGI = Utils.getUGI
    } catch {
      // Both failure types are reported identically, keeping the original exception as cause.
      case e @ (_: IOException | _: LoginException) =>
        throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
    }
    // Also try creating a UGI object for the SPNego principal
    val principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_PRINCIPAL)
    val keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_SPNEGO_KEYTAB)
    if (principal.isEmpty || keyTabFile.isEmpty) {
      info(s"SPNego httpUGI not created, SPNegoPrincipal: $principal, keytabFile: $keyTabFile")
    } else {
      try {
        httpUGI = HiveAuthFactory.loginFromSpnegoKeytabAndReturnUGI(hiveConf)
        info("SPNego httpUGI successfully created.")
      } catch {
        // A failed SPNego login is only logged; initialization continues without httpUGI.
        case e: IOException =>
          warn("SPNego httpUGI creation failed: ", e)
      }
    }
  }
  super.init(hiveConf)
}
/** Returns the service UGI field; `init` assigns it only when Hadoop security is enabled. */
def getServiceUGI: UserGroupInformation = serviceUGI

/** Returns the SPNego UGI field; `init` assigns it only after a successful SPNego login. */
def getHttpUGI: UserGroupInformation = httpUGI

/** Returns the session manager created in `init`. */
def getSessionManager: LivyThriftSessionManager = this.sessionManager
/**
 * Answers a server information request.
 *
 * @param sessionHandle not read by this implementation
 * @param getInfoType the piece of information being asked for
 * @return the value associated with the requested info type
 * @throws HiveSQLException if the info type is not one of the handled values
 */
@throws[HiveSQLException]
override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
  getInfoType match {
    // Server name and DBMS name share the same answer.
    case GetInfoType.CLI_SERVER_NAME | GetInfoType.CLI_DBMS_NAME =>
      new GetInfoValue("Livy JDBC")
    case GetInfoType.CLI_DBMS_VER =>
      new GetInfoValue(LIVY_VERSION)
    // below values are copied from Hive
    case GetInfoType.CLI_MAX_COLUMN_NAME_LEN |
        GetInfoType.CLI_MAX_SCHEMA_NAME_LEN |
        GetInfoType.CLI_MAX_TABLE_NAME_LEN =>
      new GetInfoValue(128)
    case GetInfoType.CLI_ODBC_KEYWORDS =>
      val keywords = ParseUtils.getKeywords(LivyCLIService.ODBC_KEYWORDS)
      new GetInfoValue(keywords)
    case _ =>
      throw new HiveSQLException(s"Unrecognized GetInfoType value: $getInfoType")
  }
}
/**
 * Opens a session through the session manager using the given protocol version and client
 * address, passing `false` and a null delegation token for the last two arguments.
 *
 * @return the handle of the newly opened session
 */
@throws[HiveSQLException]
def openSession(
    protocol: TProtocolVersion,
    username: String,
    password: String,
    ipAddress: String,
    configuration: util.Map[String, String]): SessionHandle = {
  val handle = sessionManager.openSession(
    protocol, username, password, ipAddress, configuration, false, null)
  debug(s"$handle: openSession()")
  handle
}
/**
 * Opens a session through the session manager using the given protocol version and client
 * address, passing `true` (presumably the impersonation flag; confirm against
 * `LivyThriftSessionManager.openSession`) together with the delegation token.
 *
 * @return the handle of the newly opened session
 */
@throws[HiveSQLException]
def openSessionWithImpersonation(
    protocol: TProtocolVersion,
    username: String,
    password: String,
    ipAddress: String,
    configuration: util.Map[String, String],
    delegationToken: String): SessionHandle = {
  val sessionHandle = sessionManager.openSession(
    protocol, username, password, ipAddress, configuration, true, delegationToken)
  // Log under this method's own name so it can be told apart from plain openSession() calls.
  debug(s"$sessionHandle: openSessionWithImpersonation()")
  sessionHandle
}
/**
 * Opens a session with `SERVER_VERSION` as protocol version and no client address, passing
 * `false` and a null delegation token for the last two session manager arguments.
 *
 * @return the handle of the newly opened session
 */
@throws[HiveSQLException]
override def openSession(
    username: String,
    password: String,
    configuration: util.Map[String, String]): SessionHandle = {
  val handle = sessionManager.openSession(
    SERVER_VERSION, username, password, null, configuration, false, null)
  debug(s"$handle: openSession()")
  handle
}
/**
 * Opens a session with `SERVER_VERSION` as protocol version and no client address, passing
 * `true` (presumably the impersonation flag; confirm against
 * `LivyThriftSessionManager.openSession`) together with the delegation token.
 *
 * @return the handle of the newly opened session
 */
@throws[HiveSQLException]
override def openSessionWithImpersonation(
    username: String,
    password: String,
    configuration: util.Map[String, String],
    delegationToken: String): SessionHandle = {
  val sessionHandle = sessionManager.openSession(
    SERVER_VERSION, username, password, null, configuration, true, delegationToken)
  // Log under this method's own name so it can be told apart from plain openSession() calls.
  debug(s"$sessionHandle: openSessionWithImpersonation()")
  sessionHandle
}
/**
 * Closes the given session through the session manager and logs the call afterwards.
 *
 * @param sessionHandle handle of the session to close
 */
@throws[HiveSQLException]
override def closeSession(sessionHandle: SessionHandle): Unit = {
  sessionManager.closeSession(sessionHandle)
  debug(s"$sessionHandle: closeSession()")
}
/**
 * Executes a statement by delegating to the four-argument overload with a query timeout of 0.
 *
 * @return the handle of the operation created for the statement
 */
@throws[HiveSQLException]
override def executeStatement(
    sessionHandle: SessionHandle,
    statement: String,
    confOverlay: util.Map[String, String]): OperationHandle = {
  val zeroTimeout = 0L
  executeStatement(sessionHandle, statement, confOverlay, zeroTimeout)
}
/**
 * Execute statement on the server with a timeout. This is a blocking call.
 *
 * The statement is handed to the operation manager with `runAsync = false`.
 *
 * @return the handle of the operation created for the statement
 */
@throws[HiveSQLException]
override def executeStatement(
    sessionHandle: SessionHandle,
    statement: String,
    confOverlay: util.Map[String, String],
    queryTimeout: Long): OperationHandle = {
  val handle: OperationHandle = sessionManager.operationManager.executeStatement(
    sessionHandle, statement, confOverlay, runAsync = false, queryTimeout)
  debug(s"$sessionHandle: executeStatement()")
  handle
}
/**
 * Executes a statement asynchronously by delegating to the four-argument overload with a
 * query timeout of 0.
 *
 * @return the handle of the operation created for the statement
 */
@throws[HiveSQLException]
override def executeStatementAsync(
    sessionHandle: SessionHandle,
    statement: String,
    confOverlay: util.Map[String, String]): OperationHandle = {
  val zeroTimeout = 0L
  executeStatementAsync(sessionHandle, statement, confOverlay, zeroTimeout)
}
/**
 * Execute statement asynchronously on the server with a timeout. This is a non-blocking call.
 *
 * The statement is handed to the operation manager with `runAsync = true`.
 *
 * @return the handle of the operation created for the statement
 */
@throws[HiveSQLException]
override def executeStatementAsync(
    sessionHandle: SessionHandle,
    statement: String,
    confOverlay: util.Map[String, String],
    queryTimeout: Long): OperationHandle = {
  val handle = sessionManager.operationManager.executeStatement(
    sessionHandle, statement, confOverlay, runAsync = true, queryTimeout)
  debug(s"$sessionHandle: executeStatementAsync()")
  handle
}
/**
 * Logs the call, then asks the operation manager for the type info operation of the session.
 *
 * @return the handle returned by the operation manager
 */
@throws[HiveSQLException]
override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
  debug(s"$sessionHandle: getTypeInfo()")
  val operations = sessionManager.operationManager
  operations.getTypeInfo(sessionHandle)
}
/**
 * Logs the call, then asks the operation manager for the catalogs operation of the session.
 *
 * @return the handle returned by the operation manager
 */
@throws[HiveSQLException]
override def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
  debug(s"$sessionHandle: getCatalogs()")
  val operations = sessionManager.operationManager
  operations.getCatalogs(sessionHandle)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getSchemas(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String): OperationHandle = {
  // TODO
  val reason = "Operation GET_SCHEMAS is not yet supported"
  throw new HiveSQLException(reason)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getTables(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    tableName: String,
    tableTypes: util.List[String]): OperationHandle = {
  // TODO
  val reason = "Operation GET_TABLES is not yet supported"
  throw new HiveSQLException(reason)
}
/**
 * Logs the call, then asks the operation manager for the table types operation of the
 * session.
 *
 * @return the handle returned by the operation manager
 */
@throws[HiveSQLException]
override def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
  debug(s"$sessionHandle: getTableTypes()")
  val operations = sessionManager.operationManager
  operations.getTableTypes(sessionHandle)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getColumns(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    tableName: String,
    columnName: String): OperationHandle = {
  // TODO
  val reason = "Operation GET_COLUMNS is not yet supported"
  throw new HiveSQLException(reason)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getFunctions(
    sessionHandle: SessionHandle,
    catalogName: String,
    schemaName: String,
    functionName: String): OperationHandle = {
  // TODO
  val reason = "Operation GET_FUNCTIONS is not yet supported"
  throw new HiveSQLException(reason)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getPrimaryKeys(
    sessionHandle: SessionHandle,
    catalog: String,
    schema: String,
    table: String): OperationHandle = {
  // TODO
  val reason = "Operation GET_PRIMARY_KEYS is not yet supported"
  throw new HiveSQLException(reason)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getCrossReference(
    sessionHandle: SessionHandle,
    primaryCatalog: String,
    primarySchema: String,
    primaryTable: String,
    foreignCatalog: String,
    foreignSchema: String,
    foreignTable: String): OperationHandle = {
  // TODO
  val reason = "Operation GET_CROSS_REFERENCE is not yet supported"
  throw new HiveSQLException(reason)
}
/**
 * Returns the current status of an operation.
 *
 * When the operation reports `shouldRunAsync`, the call first waits on its background handle
 * for a bounded time (long polling) and returns as soon as the handle completes. The wait
 * starts at 500 ms and grows by 500 ms for every 10 seconds elapsed since the operation's
 * begin time, capped by `HIVE_SERVER2_LONG_POLLING_TIMEOUT`.
 *
 * @param opHandle handle of the operation to inspect
 * @param getProgressUpdate not read by this implementation
 * @return the operation status, carrying a job progress update built from
 *         `ProgressMonitor.NULL`
 */
@throws[HiveSQLException]
override def getOperationStatus(
    opHandle: OperationHandle,
    getProgressUpdate: Boolean): OperationStatus = {
  val op: Operation = sessionManager.operationManager.getOperation(opHandle)
  if (op.shouldRunAsync) {
    val pollingCap: Long = HiveConf.getTimeVar(
      getHiveConf, HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS)
    val runningFor: Long = System.currentTimeMillis - op.getBeginTime
    // Step function: one extra 500 ms slice per completed 10 second window, up to the cap.
    val completedWindows: Long = runningFor / TimeUnit.SECONDS.toMillis(10)
    val waitMillis: Long = math.min(pollingCap, (completedWindows + 1) * 500)
    try {
      op.getBackgroundHandle.get(waitMillis, TimeUnit.MILLISECONDS)
    } catch {
      case _: InterruptedException =>
        // Nothing to do: this thread was interrupted, so the call may simply return
        // sooner than the long polling timeout.
      case _: TimeoutException =>
        // The long polling timeout expired; just report the current status to the caller.
        trace(s"$opHandle: Long polling timed out")
      case cancelled: CancellationException =>
        // The background operation thread was cancelled
        trace(s"$opHandle: The background operation was cancelled", cancelled)
      case aborted: ExecutionException =>
        // Note: Hive ops do not use the normal Future failure path, so this will not happen
        // in case of actual failure; the Future will just be done.
        warn(s"$opHandle: The background operation was aborted", aborted)
    }
  }
  val status: OperationStatus = op.getStatus
  debug(s"$opHandle: getOperationStatus()")
  status.setJobProgressUpdate(new JobProgressUpdate(ProgressMonitor.NULL))
  status
}
/**
 * Cancels the given operation through the operation manager and logs the call afterwards.
 *
 * @param opHandle handle of the operation to cancel
 */
@throws[HiveSQLException]
override def cancelOperation(opHandle: OperationHandle): Unit = {
  val operations = sessionManager.operationManager
  operations.cancelOperation(opHandle)
  debug(s"$opHandle: cancelOperation()")
}
/**
 * Closes the given operation through the operation manager and logs the call afterwards.
 *
 * @param opHandle handle of the operation to close
 */
@throws[HiveSQLException]
override def closeOperation(opHandle: OperationHandle): Unit = {
  sessionManager.operationManager.closeOperation(opHandle)
  // Same "<handle>: <method>()" shape as the debug messages of the sibling methods.
  debug(s"$opHandle: closeOperation()")
}
/**
 * Logs the call, then returns the result set schema of the operation behind the handle.
 *
 * @param opHandle handle of the operation whose schema is requested
 * @return the schema reported by the operation
 */
@throws[HiveSQLException]
override def getResultSetMetadata(opHandle: OperationHandle): TableSchema = {
  debug(s"$opHandle: getResultSetMetadata()")
  val operation = sessionManager.operationManager.getOperation(opHandle)
  operation.getResultSetSchema
}
/**
 * Fetches query output by delegating to the four-argument overload with
 * `Operation.DEFAULT_FETCH_ORIENTATION` and the default fetch size read in `init`.
 *
 * @param opHandle handle of the operation to fetch from
 * @return the fetched rows
 */
@throws[HiveSQLException]
override def fetchResults(opHandle: OperationHandle): RowSet = {
  val orientation = Operation.DEFAULT_FETCH_ORIENTATION
  val rowLimit: Long = defaultFetchRows
  fetchResults(opHandle, orientation, rowLimit, FetchType.QUERY_OUTPUT)
}
/**
 * Logs the call, then forwards the fetch request unchanged to the operation manager.
 *
 * @param opHandle handle of the operation to fetch from
 * @param orientation fetch orientation passed through to the operation manager
 * @param maxRows row limit passed through to the operation manager
 * @param fetchType fetch type passed through to the operation manager
 * @return the rows returned by the operation manager
 */
@throws[HiveSQLException]
override def fetchResults(
    opHandle: OperationHandle,
    orientation: FetchOrientation,
    maxRows: Long,
    fetchType: FetchType): RowSet = {
  debug(s"$opHandle: fetchResults()")
  val operations = sessionManager.operationManager
  operations.fetchResults(opHandle, orientation, maxRows, fetchType)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getDelegationToken(
    sessionHandle: SessionHandle,
    authFactory: HiveAuthFactory,
    owner: String,
    renewer: String): String = {
  val reason = "Operation not yet supported."
  throw new HiveSQLException(reason)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def setApplicationName(sh: SessionHandle, value: String): Unit = {
  val reason = "Operation not yet supported."
  throw new HiveSQLException(reason)
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
// Declared like the sibling methods so the checked exception is visible to Java callers.
@throws[HiveSQLException]
override def cancelDelegationToken(
    sessionHandle: SessionHandle,
    authFactory: HiveAuthFactory,
    tokenStr: String): Unit = {
  throw new HiveSQLException("Operation not yet supported.")
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
// Declared like the sibling methods so the checked exception is visible to Java callers.
@throws[HiveSQLException]
override def renewDelegationToken(
    sessionHandle: SessionHandle,
    authFactory: HiveAuthFactory,
    tokenStr: String): Unit = {
  throw new HiveSQLException("Operation not yet supported.")
}
/**
 * Not implemented yet: every call fails.
 *
 * @throws HiveSQLException always
 */
@throws[HiveSQLException]
override def getQueryId(opHandle: TOperationHandle): String = {
  val reason = "Operation not yet supported."
  throw new HiveSQLException(reason)
}
}
/**
 * Companion object holding the constants used by the `LivyCLIService` class.
 */
object LivyCLIService {
// Last constant declared by the TProtocolVersion enum; used as the protocol version by the
// openSession overloads that do not receive one from the caller.
val SERVER_VERSION: TProtocolVersion = TProtocolVersion.values().last
// scalastyle:off line.size.limit
// From https://docs.microsoft.com/en-us/sql/t-sql/language-elements/reserved-keywords-transact-sql#odbc-reserved-keywords
// scalastyle:on line.size.limit
// ODBC reserved keywords, converted to a Java set; passed to ParseUtils.getKeywords when
// answering the CLI_ODBC_KEYWORDS info request.
private val ODBC_KEYWORDS = Set("ABSOLUTE", "ACTION", "ADA", "ADD", "ALL", "ALLOCATE", "ALTER",
"AND", "ANY", "ARE", "AS", "ASC", "ASSERTION", "AT", "AUTHORIZATION", "AVG", "BEGIN",
"BETWEEN", "BIT_LENGTH", "BIT", "BOTH", "BY", "CASCADE", "CASCADED", "CASE", "CAST", "CATALOG",
"CHAR_LENGTH", "CHAR", "CHARACTER_LENGTH", "CHARACTER", "CHECK", "CLOSE", "COALESCE",
"COLLATE", "COLLATION", "COLUMN", "COMMIT", "CONNECT", "CONNECTION", "CONSTRAINT",
"CONSTRAINTS", "CONTINUE", "CONVERT", "CORRESPONDING", "COUNT", "CREATE", "CROSS",
"CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", "CURRENT_USER", "CURRENT", "CURSOR",
"DATE", "DAY", "DEALLOCATE", "DEC", "DECIMAL", "DECLARE", "DEFAULT", "DEFERRABLE", "DEFERRED",
"DELETE", "DESC", "DESCRIBE", "DESCRIPTOR", "DIAGNOSTICS", "DISCONNECT", "DISTINCT", "DOMAIN",
"DOUBLE", "DROP", "ELSE", "END", "ESCAPE", "EXCEPT", "EXCEPTION", "EXEC", "EXECUTE", "EXISTS",
"EXTERNAL", "EXTRACT", "FALSE", "FETCH", "FIRST", "FLOAT", "FOR", "FOREIGN", "FORTRAN",
"FOUND", "FROM", "FULL", "GET", "GLOBAL", "GO", "GOTO", "GRANT", "GROUP", "HAVING", "HOUR",
"IDENTITY", "IMMEDIATE", "IN", "INCLUDE", "INDEX", "INDICATOR", "INITIALLY", "INNER", "INPUT",
"INSENSITIVE", "INSERT", "INT", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "IS", "ISOLATION",
"JOIN", "KEY", "LANGUAGE", "LAST", "LEADING", "LEFT", "LEVEL", "LIKE", "LOCAL", "LOWER",
"MATCH", "MAX", "MIN", "MINUTE", "MODULE", "MONTH", "NAMES", "NATIONAL", "NATURAL", "NCHAR",
"NEXT", "NO", "NONE", "NOT", "NULL", "NULLIF", "NUMERIC", "OCTET_LENGTH", "OF", "ON", "ONLY",
"OPEN", "OPTION", "OR", "ORDER", "OUTER", "OUTPUT", "OVERLAPS", "PAD", "PARTIAL", "PASCAL",
"POSITION", "PRECISION", "PREPARE", "PRESERVE", "PRIMARY", "PRIOR", "PRIVILEGES", "PROCEDURE",
"PUBLIC", "READ", "REAL", "REFERENCES", "RELATIVE", "RESTRICT", "REVOKE", "RIGHT", "ROLLBACK",
"ROWS", "SCHEMA", "SCROLL", "SECOND", "SECTION", "SELECT", "SESSION_USER", "SESSION", "SET",
"SIZE", "SMALLINT", "SOME", "SPACE", "SQL", "SQLCA", "SQLCODE", "SQLERROR", "SQLSTATE",
"SQLWARNING", "SUBSTRING", "SUM", "SYSTEM_USER", "TABLE", "TEMPORARY", "THEN", "TIME",
"TIMESTAMP", "TIMEZONE_HOUR", "TIMEZONE_MINUTE", "TO", "TRAILING", "TRANSACTION", "TRANSLATE",
"TRANSLATION", "TRIM", "TRUE", "UNION", "UNIQUE", "UNKNOWN", "UPDATE", "UPPER", "USAGE",
"USER", "USING", "VALUE", "VALUES", "VARCHAR", "VARYING", "VIEW", "WHEN", "WHENEVER", "WHERE",
"WITH", "WORK", "WRITE", "YEAR", "ZONE").asJava
}