blob: f7d6c16b06f481741bd6740226b4904bdd05f527 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.thriftserver
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ConcurrentLinkedQueue, RejectedExecutionException}
import scala.collection.mutable
import scala.util.control.NonFatal
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.livy.Logging
import org.apache.livy.thriftserver.SessionStates._
import org.apache.livy.thriftserver.operation.Operation
import org.apache.livy.thriftserver.rpc.RpcClient
import org.apache.livy.thriftserver.serde.ThriftResultSet
import org.apache.livy.thriftserver.types.{BasicDataType, DataTypeUtils, Field, Schema}
class LivyExecuteStatementOperation(
sessionHandle: SessionHandle,
statement: String,
runInBackground: Boolean = true,
sessionManager: LivyThriftSessionManager)
extends Operation(sessionHandle, OperationType.EXECUTE_STATEMENT)
with Logging {
/**
* Contains the messages which have to be sent to the client.
*/
private val operationMessages = new ConcurrentLinkedQueue[String]
// The initialization need to be lazy in order not to block when the instance is created
private lazy val rpcClient = {
val sessionState = sessionManager.livySessionState(sessionHandle)
if (sessionState == CREATION_IN_PROGRESS) {
operationMessages.offer(
"Livy session has not yet started. Please wait for it to be ready...")
}
// This call is blocking, we are waiting for the session to be ready.
new RpcClient(sessionManager.getLivySession(sessionHandle))
}
private var rowOffset = 0L
private def statementId: String = opHandle.getHandleIdentifier.toString
private def rpcClientValid: Boolean =
sessionManager.livySessionState(sessionHandle) == CREATION_SUCCESS && rpcClient.isValid
override def getNextRowSet(order: FetchOrientation, maxRowsL: Long): ThriftResultSet = {
validateFetchOrientation(order)
assertState(Seq(OperationState.FINISHED))
setHasResultSet(true)
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
val resultSet = rpcClient.fetchResult(sessionHandle, statementId, maxRows).get()
val livyColumnResultSet = ThriftResultSet(resultSet)
livyColumnResultSet.setRowOffset(rowOffset)
rowOffset += livyColumnResultSet.numRows
livyColumnResultSet
}
override def runInternal(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true) // avoid no resultset for async run
if (!runInBackground) {
execute()
} else {
val livyServiceUGI = UserGroupInformation.getCurrentUser
// Runnable impl to call runInternal asynchronously,
// from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
val doAsAction = new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
execute()
} catch {
case e: HiveSQLException =>
setOperationException(e)
error("Error running hive query: ", e)
}
}
}
try {
livyServiceUGI.doAs(doAsAction)
} catch {
case e: Exception =>
setOperationException(new HiveSQLException(e))
error("Error running hive query as user : " +
livyServiceUGI.getShortUserName, e)
}
}
}
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle = sessionManager.submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
error(s"Error executing query in background", e)
setState(OperationState.ERROR)
throw e
}
}
}
protected def execute(): Unit = {
if (logger.isDebugEnabled) {
debug(s"Running query '$statement' with id $statementId (session = " +
s"${sessionHandle.getSessionId})")
}
setState(OperationState.RUNNING)
val before = System.currentTimeMillis()
try {
rpcClient.executeSql(sessionHandle, statementId, statement).get()
} catch {
case e: Throwable =>
val currentState = getStatus.state
info(s"Error executing query, currentState $currentState, ", e)
setState(OperationState.ERROR)
throw new HiveSQLException(e)
}
setState(OperationState.FINISHED)
val sessionInfo = sessionManager.getSessionInfo(sessionHandle)
val after = System.currentTimeMillis()
ThriftServerAudit.audit(sessionInfo.username, sessionInfo.ipAddress, statement, before, after)
}
def close(): Unit = {
info(s"Close $statementId")
cleanup(OperationState.CLOSED)
}
override def cancel(state: OperationState): Unit = {
info(s"Cancel $statementId with state $state")
cleanup(state)
}
override def shouldRunAsync: Boolean = runInBackground
override def getResultSetSchema: Schema = {
val tableSchema = DataTypeUtils.schemaFromSparkJson(
rpcClient.fetchResultSchema(sessionHandle, statementId).get())
// Workaround for operations returning an empty schema (eg. CREATE, INSERT, ...)
if (!tableSchema.fields.isEmpty) {
tableSchema
} else {
Schema(Field("Result", BasicDataType("string"), ""))
}
}
private def cleanup(state: OperationState) {
if (statementId != null && rpcClientValid) {
val cleaned = rpcClient.cleanupStatement(sessionHandle, statementId).get()
if (!cleaned) {
warn(s"Fail to cleanup query $statementId (session = ${sessionHandle.getSessionId}), " +
"this message can be ignored if the query failed.")
}
}
setState(state)
}
/**
* Returns the messages that should be sent to the client and removes them from the queue in
* order not to send them twice.
*/
def getOperationMessages: Seq[String] = {
def fetchNext(acc: mutable.ListBuffer[String]): Boolean = {
val m = operationMessages.poll()
if (m == null) {
false
} else {
acc += m
true
}
}
val res = new mutable.ListBuffer[String]
while (fetchNext(res)) {}
res
}
}