blob: c71171a1f7298d533e32a261f415f793e1c5ad5c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.thriftserver
import java.util
import java.util.{Map => JMap}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{GetCatalogsOperation, GetTableTypesOperation, GetTypeInfoOperation, Operation}
import org.apache.livy.Logging
class LivyOperationManager(val livyThriftSessionManager: LivyThriftSessionManager)
extends Logging {
private val handleToOperation = new ConcurrentHashMap[OperationHandle, Operation]()
private val sessionToOperationHandles =
new mutable.HashMap[SessionHandle, mutable.Set[OperationHandle]]()
private def addOperation(operation: Operation, sessionHandle: SessionHandle): Unit = {
handleToOperation.put(operation.getHandle, operation)
sessionToOperationHandles.synchronized {
val set = sessionToOperationHandles.getOrElseUpdate(sessionHandle,
new mutable.HashSet[OperationHandle])
set += operation.getHandle
}
}
@throws[HiveSQLException]
private def removeOperation(operationHandle: OperationHandle): Operation = {
val operation = handleToOperation.remove(operationHandle)
if (operation == null) {
throw new HiveSQLException(s"Operation does not exist: $operationHandle")
}
val sessionHandle = operation.getSessionHandle
sessionToOperationHandles.synchronized {
sessionToOperationHandles(sessionHandle) -= operationHandle
if (sessionToOperationHandles(sessionHandle).isEmpty) {
sessionToOperationHandles.remove(sessionHandle)
}
}
operation
}
def getOperations(sessionHandle: SessionHandle): Set[OperationHandle] = {
sessionToOperationHandles.synchronized {
sessionToOperationHandles(sessionHandle).toSet
}
}
def getTimedOutOperations(sessionHandle: SessionHandle): Set[Operation] = {
val opHandles = getOperations(sessionHandle)
val currentTime = System.currentTimeMillis()
opHandles.flatMap { handle =>
// Some operations may have finished and been removed since we got them.
Option(handleToOperation.get(handle))
}.filter(_.isTimedOut(currentTime))
}
@throws[HiveSQLException]
def getOperation(operationHandle: OperationHandle): Operation = {
val operation = handleToOperation.get(operationHandle)
if (operation == null) {
throw new HiveSQLException(s"Invalid OperationHandle: $operationHandle")
}
operation
}
def newExecuteStatementOperation(
sessionHandle: SessionHandle,
statement: String,
confOverlay: JMap[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
val op = new LivyExecuteStatementOperation(
sessionHandle,
statement,
confOverlay,
runAsync,
livyThriftSessionManager)
addOperation(op, sessionHandle)
debug(s"Created Operation for $statement with session=$sessionHandle, " +
s"runInBackground=$runAsync")
op
}
def getOperationLogRowSet(
opHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Long): RowSet = {
val tableSchema = new TableSchema(LivyOperationManager.LOG_SCHEMA)
val session = livyThriftSessionManager.getSessionInfo(getOperation(opHandle).getSessionHandle)
val logs = RowSetFactory.create(tableSchema, session.protocolVersion, false)
if (!livyThriftSessionManager.getHiveConf.getBoolVar(
ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
warn("Try to get operation log when " +
ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname +
" is false, no log will be returned. ")
} else {
// Get the operation log. This is implemented only for LivyExecuteStatementOperation
val operation = getOperation(opHandle)
if (operation.isInstanceOf[LivyExecuteStatementOperation]) {
val op = getOperation(opHandle).asInstanceOf[LivyExecuteStatementOperation]
op.getOperationMessages.foreach { l =>
logs.addRow(Array(l))
}
}
}
logs
}
@throws[HiveSQLException]
def executeStatement(
sessionHandle: SessionHandle,
statement: String,
confOverlay: util.Map[String, String],
runAsync: Boolean,
queryTimeout: Long): OperationHandle = {
executeOperation(sessionHandle, {
newExecuteStatementOperation(sessionHandle, statement, confOverlay, runAsync, queryTimeout)
})
}
@throws[HiveSQLException]
private def executeOperation(
sessionHandle: SessionHandle,
operationCreator: => Operation): OperationHandle = {
var opHandle: OperationHandle = null
try {
val operation = operationCreator
opHandle = operation.getHandle
operation.run()
opHandle
} catch {
case e: HiveSQLException =>
if (opHandle != null) {
closeOperation(opHandle)
}
throw e
}
}
@throws[HiveSQLException]
def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
executeOperation(sessionHandle, {
val op = new GetTypeInfoOperation(sessionHandle)
addOperation(op, sessionHandle)
op
})
}
@throws[HiveSQLException]
def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
executeOperation(sessionHandle, {
val op = new GetCatalogsOperation(sessionHandle)
addOperation(op, sessionHandle)
op
})
}
@throws[HiveSQLException]
def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
executeOperation(sessionHandle, {
val op = new GetTableTypesOperation(sessionHandle)
addOperation(op, sessionHandle)
op
})
}
/**
* Cancel the running operation unless it is already in a terminal state
*/
@throws[HiveSQLException]
def cancelOperation(opHandle: OperationHandle, errMsg: String): Unit = {
val operation = getOperation(opHandle)
val opState = operation.getStatus.getState
if (opState.isTerminal) {
// Cancel should be a no-op
debug(s"$opHandle: Operation is already aborted in state - $opState")
} else {
debug(s"$opHandle: Attempting to cancel from state - $opState")
val operationState = OperationState.CANCELED
operationState.setErrorMessage(errMsg)
operation.cancel(operationState)
}
}
@throws[HiveSQLException]
def cancelOperation(opHandle: OperationHandle): Unit = {
cancelOperation(opHandle, "")
}
@throws[HiveSQLException]
def closeOperation(opHandle: OperationHandle): Unit = {
info("Closing operation: " + opHandle)
val operation = removeOperation(opHandle)
operation.close()
}
@throws[HiveSQLException]
def fetchResults(
opHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Long,
fetchType: FetchType): RowSet = {
if (fetchType == FetchType.QUERY_OUTPUT) {
getOperation(opHandle).getNextRowSet(orientation, maxRows)
} else {
getOperationLogRowSet(opHandle, orientation, maxRows)
}
}
}
object LivyOperationManager {
val LOG_SCHEMA: Schema = {
val schema = new Schema
val fieldSchema = new FieldSchema
fieldSchema.setName("operation_log")
fieldSchema.setType("string")
schema.addToFieldSchemas(fieldSchema)
schema
}
}