/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.driver.hive;
import static org.apache.lens.server.api.error.LensDriverErrorCode.*;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lens.api.LensConf;
import org.apache.lens.api.LensSessionHandle;
import org.apache.lens.api.Priority;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.query.QueryPrepareHandle;
import org.apache.lens.cube.query.cost.FactPartitionBasedQueryCostCalculator;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.driver.*;
import org.apache.lens.server.api.driver.DriverQueryStatus.DriverQueryState;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.LensEventListener;
import org.apache.lens.server.api.query.AbstractQueryContext;
import org.apache.lens.server.api.query.PreparedQueryContext;
import org.apache.lens.server.api.query.QueryContext;
import org.apache.lens.server.api.query.cost.*;
import org.apache.lens.server.api.query.priority.CostRangePriorityDecider;
import org.apache.lens.server.api.query.priority.CostToPriorityRangeConf;
import org.apache.lens.server.api.query.priority.QueryPriorityDecider;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryDisplay.TaskDisplay;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.*;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.rpc.thrift.TSessionHandle;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* The Class HiveDriver.
*/
@Slf4j
public class HiveDriver extends AbstractLensDriver {
/** The Constant HIVE_CONNECTION_CLASS. */
public static final String HIVE_CONNECTION_CLASS = "lens.driver.hive.connection.class";
/** The Constant HS2_CONNECTION_EXPIRY_DELAY. */
public static final String HS2_CONNECTION_EXPIRY_DELAY = "lens.driver.hive.hs2.connection.expiry.delay";
public static final String HS2_CALCULATE_PRIORITY = "lens.driver.hive.calculate.priority";
public static final String HS2_COST_CALCULATOR = "lens.driver.hive.cost.calculator.class";
/**
* Config param for defining priority ranges.
*/
public static final String HS2_PRIORITY_RANGES = "lens.driver.hive.priority.ranges";
// Default values of conf params
public static final long DEFAULT_EXPIRY_DELAY = 600 * 1000; // 10 minutes
public static final String HS2_PRIORITY_DEFAULT_RANGES = "VERY_HIGH,7.0,HIGH,30.0,NORMAL,90,LOW";
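// The ranges string alternates priority names with cost thresholds. A rough reading of the default
// above (a sketch of how CostToPriorityRangeConf interprets it, not normative):
//   cost <= 7.0          -> VERY_HIGH
//   7.0  < cost <= 30.0  -> HIGH
//   30.0 < cost <= 90    -> NORMAL
//   cost >  90           -> LOW
// A deployment could override the ranges through configuration, for example:
//   conf.set(HS2_PRIORITY_RANGES, "VERY_HIGH,10.0,HIGH,50.0,NORMAL,200.0,LOW");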
public static final String SESSION_KEY_DELIMITER = ".";
/** The HiveConf - used for connecting to hive server and metastore */
private HiveConf hiveConf;
/** The hive handles. */
@Getter
private final Map<QueryHandle, OperationHandle> hiveHandles = new ConcurrentHashMap<QueryHandle, OperationHandle>();
/** The orphaned hive sessions. */
private ConcurrentLinkedQueue<SessionHandle> orphanedHiveSessions;
/** The opHandle to hive session map. */
private Map<OperationHandle, SessionHandle> opHandleToSession;
/** The session lock. */
private final Lock sessionLock;
// connections need to be separate for each user and each thread
/** The thread connections. */
private final Map<String, ExpirableConnection> threadConnections =
new ConcurrentHashMap<String, ExpirableConnection>();
/** The thrift conn expiry queue. */
private final DelayQueue<ExpirableConnection> thriftConnExpiryQueue = new DelayQueue<ExpirableConnection>();
/** The connection expiry thread. */
private final Thread connectionExpiryThread = new Thread(new ConnectionExpiryRunnable());
// assigned only in case of embedded connection
/** The embedded connection. */
private ThriftConnection embeddedConnection;
// Store mapping of Lens session ID to Hive session identifier
/** The lens to hive session. */
private Map<String, SessionHandle> lensToHiveSession;
/** Keep track of resources added to the Hive session */
private Map<SessionHandle, Boolean> resourcesAddedForSession;
/** The driver listeners. */
private List<LensEventListener<DriverEvent>> driverListeners;
QueryCostCalculator queryCostCalculator;
QueryPriorityDecider queryPriorityDecider;
// Package-local so test cases can change it.
boolean whetherCalculatePriority;
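// Session conf used when opening Hive sessions. HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT is kept
// false so the Hive session survives client disconnects; the driver reuses one Hive session per
// (lens session, database) pair.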
private static final Map<String, String> SESSION_CONF = new HashMap<String, String>() {
{
put(HiveConf.ConfVars.HIVE_SERVER2_CLOSE_SESSION_ON_DISCONNECT.varname, "false");
}
};
private String sessionDbKey(String sessionHandle, String database) {
return sessionHandle + SESSION_KEY_DELIMITER + database;
}
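// For example, sessionDbKey("a12b-34cd", "default") yields "a12b-34cd.default" (handle value illustrative).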
/**
* Return true if resources have been added to this Hive session
* @param sessionHandle lens session identifier
* @param database lens database
* @return true if resources have been already added to this session + db pair
*/
public boolean areDBResourcesAddedForSession(String sessionHandle, String database) {
String key = sessionDbKey(sessionHandle, database);
SessionHandle hiveSession = lensToHiveSession.get(key);
return hiveSession != null
&& resourcesAddedForSession.containsKey(hiveSession)
&& resourcesAddedForSession.get(hiveSession);
}
/**
* Tell Hive driver that resources have been added for this session and for the given database
* @param sessionHandle lens session identifier
* @param database lens database
*/
public void setResourcesAddedForSession(String sessionHandle, String database) {
SessionHandle hiveSession = lensToHiveSession.get(sessionDbKey(sessionHandle, database));
resourcesAddedForSession.put(hiveSession, Boolean.TRUE);
}
/**
* The Class ConnectionExpiryRunnable.
*/
class ConnectionExpiryRunnable implements Runnable {
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
while (true) {
ExpirableConnection expired = thriftConnExpiryQueue.take();
expired.setExpired();
ThriftConnection thConn = expired.getConnection();
if (thConn != null) {
try {
log.info("Closed connection: {}", expired.getConnId());
thConn.close();
} catch (IOException e) {
log.error("Error closing connection", e);
}
}
}
} catch (InterruptedException intr) {
log.warn("Connection expiry thread interrupted", intr);
return;
}
}
}
/** The Constant CONNECTION_COUNTER. */
private static final AtomicInteger CONNECTION_COUNTER = new AtomicInteger();
/**
* The Class ExpirableConnection.
*/
static class ExpirableConnection implements Delayed {
/** The access time. */
long accessTime;
/** The conn. */
private final ThriftConnection conn;
/** The timeout. */
private final long timeout;
/** The expired. */
private volatile boolean expired;
/** The conn id. */
private final int connId;
/**
* Instantiates a new expirable connection.
*
* @param conn the conn
* @param timeout the timeout
*/
ExpirableConnection(ThriftConnection conn, long timeout) {
this.conn = conn;
this.timeout = timeout;
connId = CONNECTION_COUNTER.incrementAndGet();
accessTime = System.currentTimeMillis();
}
private ThriftConnection getConnection() {
accessTime = System.currentTimeMillis();
return conn;
}
private boolean isExpired() {
return expired;
}
/**
* Sets the expired.
*/
private void setExpired() {
expired = true;
}
private int getConnId() {
return connId;
}
/*
* (non-Javadoc)
*
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@Override
public int compareTo(Delayed other) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
}
/*
* (non-Javadoc)
*
* @see java.util.concurrent.Delayed#getDelay(java.util.concurrent.TimeUnit)
*/
@Override
public long getDelay(TimeUnit unit) {
long age = System.currentTimeMillis() - accessTime;
return unit.convert(timeout - age, TimeUnit.MILLISECONDS);
}
}
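// Note: a connection becomes eligible for expiry once connectionExpiryTimeout milliseconds have
// passed since its last access (getDelay() turns negative); ConnectionExpiryRunnable then takes it
// off thriftConnExpiryQueue and closes it.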
/**
* Open connections.
*
* @return the int
*/
int openConnections() {
return thriftConnExpiryQueue.size();
}
/** The connection class. */
private Class<? extends ThriftConnection> connectionClass;
/** The is embedded. */
private boolean isEmbedded;
/** The connection expiry timeout. */
private long connectionExpiryTimeout;
/**
* Instantiates a new hive driver.
*
* @throws LensException the lens exception
*/
public HiveDriver() throws LensException {
this.sessionLock = new ReentrantLock();
lensToHiveSession = new HashMap<>();
opHandleToSession = new ConcurrentHashMap<>();
orphanedHiveSessions = new ConcurrentLinkedQueue<>();
resourcesAddedForSession = new HashMap<>();
connectionExpiryThread.setDaemon(true);
connectionExpiryThread.setName("HiveDriver-ConnectionExpiryThread");
connectionExpiryThread.start();
driverListeners = new ArrayList<LensEventListener<DriverEvent>>();
log.info("Hive driver inited");
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#configure(org.apache.hadoop.conf.Configuration)
*/
@Override
public void configure(Configuration conf, String driverType, String driverName) throws LensException {
super.configure(conf, driverType, driverName);
this.hiveConf = new HiveConf(conf, HiveDriver.class);
this.hiveConf.addResource(getConf());
connectionClass = getConf().getClass(HIVE_CONNECTION_CLASS, EmbeddedThriftConnection.class,
ThriftConnection.class);
isEmbedded = (connectionClass.getName().equals(EmbeddedThriftConnection.class.getName()));
connectionExpiryTimeout = getConf().getLong(HS2_CONNECTION_EXPIRY_DELAY, DEFAULT_EXPIRY_DELAY);
whetherCalculatePriority = getConf().getBoolean(HS2_CALCULATE_PRIORITY, true);
Class<? extends QueryCostCalculator> queryCostCalculatorClass = getConf().getClass(HS2_COST_CALCULATOR,
FactPartitionBasedQueryCostCalculator.class, QueryCostCalculator.class);
try {
queryCostCalculator = queryCostCalculatorClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new LensException("Can't instantiate query cost calculator of class: " + queryCostCalculatorClass, e);
}
// Initialize the cost calculator instance.
queryCostCalculator.init(this);
queryPriorityDecider = new CostRangePriorityDecider(
new CostToPriorityRangeConf(getConf().get(HS2_PRIORITY_RANGES, HS2_PRIORITY_DEFAULT_RANGES))
);
log.info("Hive driver {} configured successfully", getFullyQualifiedName());
}
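// A minimal usage sketch for configuring this driver (values below are illustrative, not required defaults):
//   Configuration conf = new Configuration();
//   conf.set(HIVE_CONNECTION_CLASS, EmbeddedThriftConnection.class.getName());
//   conf.setBoolean(HS2_CALCULATE_PRIORITY, true);
//   HiveDriver driver = new HiveDriver();
//   driver.configure(conf, "hive", "hive1"); // driverType, driverName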
private QueryCost calculateQueryCost(AbstractQueryContext qctx) throws LensException {
if (qctx.isOlapQuery()) {
QueryCost cost = queryCostCalculator.calculateCost(qctx, this);
if (cost != null) {
return cost;
}
}
return new FactPartitionBasedQueryCost(Double.MAX_VALUE);
}
@Override
public QueryCost estimate(AbstractQueryContext qctx) throws LensException {
log.info("{} Estimate: {}", getFullyQualifiedName(), qctx.getDriverQuery(this));
if (qctx.getDriverQuery(this) == null) {
throw new NullPointerException("Null driver query for " + qctx.getUserQuery());
}
if (qctx.getDriverContext().getDriverQueryCost(this) != null) {
// estimate was already called; return the cached cost
return qctx.getDriverContext().getDriverQueryCost(this);
}
if (qctx.isOlapQuery()) {
// For OLAP queries, rewriting has already taken care of semantic validation,
// so estimate only calculates the cost of the query.
// The cost calculation is done only for cube queries;
// for all other native table queries, the cost will be maximum.
return calculateQueryCost(qctx);
} else {
// It's a native table query: validate it and return the cost.
return explain(qctx).getCost();
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#explain(java.lang.String, org.apache.hadoop.conf.Configuration)
*/
@Override
public HiveQueryPlan explain(AbstractQueryContext explainCtx) throws LensException {
if (explainCtx.getDriverQuery(this) == null) {
throw new NullPointerException("Null driver query for " + explainCtx.getUserQuery());
}
if (explainCtx.getDriverContext().getDriverQueryPlan(this) != null) {
// explain was already called; return the cached plan
return (HiveQueryPlan) explainCtx.getDriverContext().getDriverQueryPlan(this);
}
log.info("{} Explain: {}", getFullyQualifiedName(), explainCtx.getDriverQuery(this));
Configuration explainConf = new Configuration(explainCtx.getDriverConf(this));
explainConf.setClassLoader(explainCtx.getConf().getClassLoader());
explainConf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false);
final String explainQuery = "EXPLAIN EXTENDED " + explainCtx.getDriverQuery(this);
QueryContext explainQueryCtx = QueryContext.createContextWithSingleDriver(explainQuery,
explainCtx.getSubmittedUser(), new LensConf(), explainConf, this, explainCtx.getLensSessionIdentifier(), false);
// Get result set of explain
InMemoryResultSet inMemoryResultSet = (InMemoryResultSet) execute(explainQueryCtx);
List<String> explainOutput = new ArrayList<>();
while (inMemoryResultSet.hasNext()) {
explainOutput.add((String) inMemoryResultSet.next().getValues().get(0));
}
closeQuery(explainQueryCtx.getQueryHandle());
try {
hiveConf.setClassLoader(explainCtx.getConf().getClassLoader());
HiveQueryPlan hqp = new HiveQueryPlan(explainOutput, null, hiveConf, calculateQueryCost(explainCtx));
explainCtx.getDriverContext().setDriverQueryPlan(this, hqp);
return hqp;
} catch (HiveException e) {
throw new LensException("Unable to create hive query plan", e);
}
}
// this is used for tests
int getHiveHandleSize() {
return hiveHandles.size();
}
/*
* (non-Javadoc)
*
* @see
* org.apache.lens.server.api.driver.LensDriver#explainAndPrepare
* (org.apache.lens.server.api.query.PreparedQueryContext)
*/
@Override
public DriverQueryPlan explainAndPrepare(PreparedQueryContext pContext) throws LensException {
DriverQueryPlan plan = explain(pContext);
plan.setPrepareHandle(pContext.getPrepareHandle());
return plan;
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#prepare(org.apache.lens.server.api.query.PreparedQueryContext)
*/
@Override
public void prepare(PreparedQueryContext pContext) throws LensException {
// NO OP
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#closePreparedQuery(org.apache.lens.api.query.QueryPrepareHandle)
*/
@Override
public void closePreparedQuery(QueryPrepareHandle handle) throws LensException {
// NO OP
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#execute(org.apache.lens.server.api.query.QueryContext)
*/
// assuming this is only called for executing explain/insert/set/delete/etc. queries which don't ask to fetch data
@Override
public LensResultSet execute(QueryContext ctx) throws LensException {
OperationHandle op = null;
LensResultSet result = null;
try {
addPersistentPath(ctx);
Configuration qdconf = ctx.getDriverConf(this);
qdconf.set("mapred.job.name", ctx.getQueryHandle().toString());
SessionHandle sessionHandle = getSession(ctx);
op = getClient().executeStatement(sessionHandle, ctx.getSelectedDriverQuery(), qdconf.getValByRegex(".*"));
log.info("The hive operation handle: {}", op);
ctx.setDriverOpHandle(op.toString());
hiveHandles.put(ctx.getQueryHandle(), op);
opHandleToSession.put(op, sessionHandle);
updateStatus(ctx);
OperationStatus status = getClient().getOperationStatus(op);
if (status.getState() == OperationState.ERROR) {
throw new LensException("Unknown error while running query " + ctx.getUserQuery());
}
result = createResultSet(ctx, true);
// close the query immediately if the result is not inmemory result set
if (result == null || !(result instanceof InMemoryResultSet)) {
closeQuery(ctx.getQueryHandle());
}
// remove query handle from hiveHandles even in case of inmemory result set
hiveHandles.remove(ctx.getQueryHandle());
} catch (IOException e) {
throw new LensException("Error adding persistent path", e);
} catch (HiveSQLException hiveErr) {
handleHiveServerError(ctx, hiveErr);
handleHiveSQLException(hiveErr);
} finally {
if (null != op) {
opHandleToSession.remove(op);
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#executeAsync(org.apache.lens.server.api.query.QueryContext)
*/
@Override
public void executeAsync(QueryContext ctx) throws LensException {
try {
addPersistentPath(ctx);
Configuration qdconf = ctx.getDriverConf(this);
qdconf.set("mapred.job.name", ctx.getQueryHandle().toString());
decidePriority(ctx);
SessionHandle sessionHandle = getSession(ctx);
OperationHandle op = getClient().executeStatementAsync(sessionHandle, ctx.getSelectedDriverQuery(),
qdconf.getValByRegex(".*"));
ctx.setDriverOpHandle(op.toString());
log.info("QueryHandle: {} HiveHandle:{}", ctx.getQueryHandle(), op);
hiveHandles.put(ctx.getQueryHandle(), op);
opHandleToSession.put(op, sessionHandle);
} catch (IOException e) {
throw new LensException("Error adding persistent path", e);
} catch (HiveSQLException e) {
handleHiveServerError(ctx, e);
handleHiveSQLException(e);
}
}
private LensException handleHiveSQLException(HiveSQLException ex) throws LensException {
if (ex.getMessage() != null && ex.getMessage().contains("SemanticException")) {
throw new LensException(SEMANTIC_ERROR.getLensErrorInfo(), ex, ex.getMessage());
}
throw new LensException(DRIVER_ERROR.getLensErrorInfo(), ex, ex.getMessage());
}
private DriverQueryStatus updateDriverStateFromOperationStatus(OperationHandle handle, DriverQueryStatus status)
throws LensException, HiveSQLException, IOException {
if (status == null) {
status = new DriverQueryStatus();
}
OperationStatus opStatus = getClient().getOperationStatus(handle);
log.debug("GetStatus on hiveHandle: {} returned state:", handle, opStatus.getState().name());
switch (opStatus.getState()) {
case CANCELED:
status.setState(DriverQueryState.CANCELED);
status.setStatusMessage("Query has been cancelled!");
break;
case CLOSED:
status.setState(DriverQueryState.CLOSED);
status.setStatusMessage("Query has been closed!");
break;
case ERROR:
status.setState(DriverQueryState.FAILED);
status.setStatusMessage("Query execution failed!");
status.setErrorMessage(
"Query failed with errorCode:" + opStatus.getOperationException().getErrorCode() + " with errorMessage: "
+ opStatus.getOperationException().getMessage());
break;
case FINISHED:
status.setState(DriverQueryState.SUCCESSFUL);
status.setStatusMessage("Query is successful!");
status.setResultSetAvailable(handle.hasResultSet());
break;
case INITIALIZED:
status.setState(DriverQueryState.INITIALIZED);
status.setStatusMessage("Query is initiazed in HiveServer!");
break;
case RUNNING:
status.setState(DriverQueryState.RUNNING);
status.setStatusMessage("Query is running in HiveServer!");
break;
case PENDING:
status.setState(DriverQueryState.PENDING);
status.setStatusMessage("Query is pending in HiveServer");
break;
case UNKNOWN:
default:
throw new LensException("Query is in unknown state at HiveServer");
}
float progress = 0f;
String jsonTaskStatus = opStatus.getTaskStatus();
String errorMsg = null;
if (StringUtils.isNotBlank(jsonTaskStatus)) {
try (ByteArrayInputStream in = new ByteArrayInputStream(jsonTaskStatus.getBytes("UTF-8"))) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
List<TaskDisplay> taskStatuses = mapper.readValue(in, new TypeReference<List<TaskDisplay>>() {
});
int completedTasks = 0;
StringBuilder errorMessage = new StringBuilder();
for (TaskDisplay taskStat : taskStatuses) {
Task.TaskState tstate = taskStat.taskState;
if (tstate == Task.TaskState.FINISHED) {
completedTasks++;
}
if ((taskStat.getReturnValue() != null && taskStat.getReturnValue() != 0) || taskStat.getErrorMsg() != null) {
appendTaskIds(errorMessage, taskStat);
errorMessage.append(" has failed! ");
}
}
progress = taskStatuses.size() == 0 ? 0 : (float) completedTasks / taskStatuses.size();
errorMsg = errorMessage.toString();
}
} else {
log.warn("Empty task statuses");
}
String error = null;
if (StringUtils.isNotBlank(errorMsg)) {
error = errorMsg;
} else if (status.getState().equals(DriverQueryState.FAILED)) {
error = status.getErrorMessage();
}
status.setErrorMessage(error);
status.setProgressMessage(jsonTaskStatus);
status.setProgress(progress);
status.setDriverStartTime(opStatus.getOperationStarted());
status.setDriverFinishTime(opStatus.getOperationCompleted());
return status;
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#updateStatus(org.apache.lens.server.api.query.QueryContext)
*/
@Override
public void updateStatus(QueryContext context) throws LensException {
log.debug("GetStatus: {}", context.getQueryHandle());
if (context.getDriverStatus().isFinished()) {
return;
}
OperationHandle hiveHandle = getHiveHandle(context.getQueryHandle());
try {
// Get operation status from hive server
log.debug("GetStatus hiveHandle: {}", hiveHandle);
fetchLogs(hiveHandle);
updateDriverStateFromOperationStatus(hiveHandle, context.getDriverStatus());
} catch (Exception e) {
log.error("Error getting query status", e);
handleHiveServerError(context, e);
throw new LensException("Error getting query status", e);
}
}
private void fetchLogs(OperationHandle opHandle) throws LensException {
try {
for (Object[] o : getClient().fetchResults(opHandle, FetchOrientation.FETCH_NEXT, -1, FetchType.LOG)) {
for (Object logLine : o) {
log.info("Update from hive: " + String.valueOf(logLine));
}
}
} catch (HiveSQLException e) {
log.error("Error fetching hive operation logs for {}", opHandle, e);
}
}
/**
* Append task ids.
*
* @param message the message
* @param taskStat the task stat
*/
private void appendTaskIds(StringBuilder message, TaskDisplay taskStat) {
message.append(taskStat.getTaskId()).append("(");
message.append(taskStat.getTaskType()).append("):");
if (taskStat.getExternalHandle() != null) {
message.append(taskStat.getExternalHandle()).append(":");
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#closeResultSet(org.apache.lens.api.query.QueryHandle)
*/
@Override
public void closeResultSet(QueryHandle handle) throws LensException {
// NO OP ?
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#closeQuery(org.apache.lens.api.query.QueryHandle)
*/
@Override
public void closeQuery(QueryHandle handle) throws LensException {
if (handle == null) {
return;
}
log.info("CloseQuery: {}", handle);
OperationHandle opHandle = hiveHandles.remove(handle);
if (opHandle != null) {
log.info("CloseQuery hiveHandle: {}", opHandle);
try {
fetchLogs(opHandle);
getClient().closeOperation(opHandle);
} catch (HiveSQLException e) {
checkInvalidOperation(handle, e);
throw new LensException("Unable to close query", e);
} finally {
SessionHandle hiveSession = opHandleToSession.remove(opHandle);
if (null != hiveSession && !opHandleToSession.containsValue(hiveSession)
&& orphanedHiveSessions.contains(hiveSession)) {
orphanedHiveSessions.remove(hiveSession);
try {
getClient().closeSession(hiveSession);
log.info("Closed orphaned hive session : {}", hiveSession.getHandleIdentifier());
} catch (HiveSQLException e) {
log.warn("Error closing orphan hive session : {} ", hiveSession.getHandleIdentifier(), e);
}
}
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#cancelQuery(org.apache.lens.api.query.QueryHandle)
*/
@Override
public boolean cancelQuery(QueryHandle handle) throws LensException {
log.info("CancelQuery: {}", handle);
OperationHandle hiveHandle = getHiveHandle(handle);
opHandleToSession.remove(hiveHandle);
try {
log.info("CancelQuery hiveHandle: {}", hiveHandle);
getClient().cancelOperation(hiveHandle);
return true;
} catch (HiveSQLException e) {
checkInvalidOperation(handle, e);
throw new LensException("Unable to cancel query " + handle, e);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#close()
*/
@Override
public void close() {
log.info("CloseDriver {}", getFullyQualifiedName());
// Close this driver
sessionLock.lock();
try {
lensToHiveSession.clear();
orphanedHiveSessions.clear();
} finally {
sessionLock.unlock();
}
}
/**
* Add a listener for driver events.
*
* @param driverEventListener the driver event listener
*/
@Override
public void registerDriverEventListener(LensEventListener<DriverEvent> driverEventListener) {
driverListeners.add(driverEventListener);
}
@Override
public Priority decidePriority(AbstractQueryContext ctx) {
return decidePriority(ctx, queryPriorityDecider);
}
Priority decidePriority(AbstractQueryContext ctx, QueryPriorityDecider queryPriorityDecider) {
if (whetherCalculatePriority && ctx.getPriority() == null) {
try {
// On-demand re-computation of cost, in case it's not already set by a previous estimate call.
// In driver test cases, estimate doesn't happen. Hence this code path ensures cost is computed and
// priority is set based on correct cost.
if (ctx.getDriverQueryCost(this) == null) {
ctx.setDriverCost(this, this.estimate(ctx));
}
// Inside try since non-data fetching queries can also be executed by async method.
Priority priority = queryPriorityDecider.decidePriority(ctx.getDriverQueryCost(this));
String priorityStr = priority.toString();
ctx.getDriverConf(this).set("mapred.job.priority", priorityStr);
Map<String, String> confUpdate = new HashMap<>();
confUpdate.put("mapred.job.priority", priorityStr);
ctx.updateConf(confUpdate);
log.info("set priority to {}", priority);
return priority;
} catch (Exception e) {
// Do not fail query launch when setting the priority fails;
// the priority will usually default to NORMAL in the underlying system.
log.error("could not set priority for lens session id:{} User query: {}", ctx.getLensSessionIdentifier(),
ctx.getUserQuery(), e);
return null;
}
}
return null;
}
protected CLIServiceClient getClient() throws LensException {
if (isEmbedded) {
if (embeddedConnection == null) {
try {
embeddedConnection = connectionClass.newInstance();
embeddedConnection.init(hiveConf, null);
} catch (Exception e) {
throw new LensException(e);
}
log.info("New thrift connection {}", connectionClass);
}
return embeddedConnection.getClient();
} else {
String user = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_USER);
if (SessionState.get() != null && SessionState.get().getUserName() != null) {
user = SessionState.get().getUserName();
}
String connectionKey = user.toLowerCase() + Thread.currentThread().getId();
ExpirableConnection connection = threadConnections.get(connectionKey);
if (connection == null || connection.isExpired()) {
try {
ThriftConnection tconn = connectionClass.newInstance();
tconn.init(hiveConf, user);
connection = new ExpirableConnection(tconn, connectionExpiryTimeout);
thriftConnExpiryQueue.offer(connection);
threadConnections.put(connectionKey, connection);
log.info("New thrift connection {} for thread: {} for user: {} connection ID={} on driver:{}",
connectionClass, Thread.currentThread().getId(), user, connection.getConnId(), getFullyQualifiedName());
} catch (Exception e) {
throw new LensException(e);
}
} else {
synchronized (thriftConnExpiryQueue) {
thriftConnExpiryQueue.remove(connection);
thriftConnExpiryQueue.offer(connection);
}
}
return connection.getConnection().getClient();
}
}
@Override
protected LensResultSet createResultSet(QueryContext context) throws LensException {
return createResultSet(context, false);
}
/**
* Creates the result set.
*
* @param context the context
* @param closeAfterFetch the close after fetch
* @return the lens result set
* @throws LensException the lens exception
*/
private LensResultSet createResultSet(QueryContext context, boolean closeAfterFetch) throws LensException {
OperationHandle op = getHiveHandle(context.getQueryHandle());
log.info("Creating result set for hiveHandle:{}", op);
try {
if (context.isDriverPersistent()) {
return new HivePersistentResultSet(new Path(context.getDriverResultPath()), op, getClient());
} else if (op.hasResultSet()) {
return new HiveInMemoryResultSet(op, getClient(), closeAfterFetch);
} else {
// queries that do not have result
return null;
}
} catch (HiveSQLException hiveErr) {
handleHiveServerError(context, hiveErr);
throw new LensException("Error creating result set", hiveErr);
}
}
/**
* Adds the persistent path.
*
* @param context the context
* @throws IOException Signals that an I/O exception has occurred.
*/
void addPersistentPath(QueryContext context) throws IOException {
String hiveQuery;
Configuration qdconf = context.getDriverConf(this);
boolean addInsertOverwrite = qdconf.getBoolean(
LensConfConstants.QUERY_ADD_INSERT_OVEWRITE, LensConfConstants.DEFAULT_ADD_INSERT_OVEWRITE);
if (context.isDriverPersistent() && addInsertOverwrite
&& (context.getSelectedDriverQuery().startsWith("SELECT")
|| context.getSelectedDriverQuery().startsWith("select"))) {
// store persistent data into user specified location
// If absent, take default home directory
Path resultSetPath = context.getHDFSResultDir();
// create query
StringBuilder builder = new StringBuilder("INSERT OVERWRITE DIRECTORY ");
context.setDriverResultPath(
resultSetPath.makeQualified(resultSetPath.getFileSystem(context.getConf())).toString());
builder.append('"').append(resultSetPath).append("\" ");
String outputDirFormat = qdconf.get(LensConfConstants.QUERY_OUTPUT_DIRECTORY_FORMAT);
if (outputDirFormat != null) {
builder.append(outputDirFormat);
}
builder.append(' ').append(context.getSelectedDriverQuery()).append(' ');
hiveQuery = builder.toString();
} else {
context.unSetDriverPersistent();
hiveQuery = context.getSelectedDriverQuery();
}
log.info("Hive driver {} query:{}", getFullyQualifiedName(), hiveQuery);
context.setSelectedDriverQuery(hiveQuery);
}
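// For a persistent SELECT, the driver query is wrapped roughly as (path and output format illustrative):
//   INSERT OVERWRITE DIRECTORY "hdfs://.../<query-handle>" <output-dir-format> SELECT ...
// Non-SELECT queries, or queries with driver persistence disabled, pass through unchanged.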
/**
* Gets the session.
*
* @param ctx the ctx
* @return the session
* @throws LensException the lens exception
*/
private SessionHandle getSession(QueryContext ctx) throws LensException {
sessionLock.lock();
try {
String lensSession = ctx.getLensSessionIdentifier();
String sessionDbKey = sessionDbKey(lensSession, ctx.getDatabase());
if (lensSession == null && SessionState.get() != null) {
lensSession = SessionState.get().getSessionId();
}
if (lensSession == null) {
throw new IllegalStateException("Current session state does not have a Lens session id");
}
SessionHandle hiveSession;
if (!lensToHiveSession.containsKey(sessionDbKey)) {
try {
if (ctx.getHiveConf().getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION)
.equals(HiveAuthFactory.AuthTypes.KERBEROS.toString())) {
String user = UserGroupInformation.getLoginUser().getUserName();
hiveSession = getClient().openSession(user, "", SESSION_CONF);
} else {
hiveSession = getClient().openSession(ctx.getClusterUser(), "", SESSION_CONF);
}
lensToHiveSession.put(sessionDbKey, hiveSession);
log.info("New hive session for user: {} , lens session: {} , hive session handle: {} , driver : {}",
ctx.getClusterUser(), sessionDbKey, hiveSession.getHandleIdentifier(), getFullyQualifiedName());
for (LensEventListener<DriverEvent> eventListener : driverListeners) {
try {
eventListener.onEvent(new DriverSessionStarted(System.currentTimeMillis(), this, lensSession, hiveSession
.getSessionId().toString()));
} catch (Exception exc) {
log.error("Error sending driver {} start event to listener {}", getFullyQualifiedName(), eventListener,
exc);
}
}
} catch (Exception e) {
throw new LensException(e);
}
} else {
hiveSession = lensToHiveSession.get(sessionDbKey);
}
return hiveSession;
} finally {
sessionLock.unlock();
}
}
/**
* Gets the hive handle.
*
* @param handle the handle
* @return the hive handle
* @throws LensException the lens exception
*/
private OperationHandle getHiveHandle(QueryHandle handle) throws LensException {
OperationHandle opHandle = hiveHandles.get(handle);
if (opHandle == null) {
throw new LensException("Query not found " + handle);
}
return opHandle;
}
/**
* The Class QueryCompletionNotifier.
*/
private class QueryCompletionNotifier implements Runnable {
/** The poll interval. */
long pollInterval;
/** The hive handle. */
OperationHandle hiveHandle;
/** The timeout millis. */
long timeoutMillis;
/** The listener. */
QueryCompletionListener listener;
/** The handle. */
QueryHandle handle;
/**
* Instantiates a new query completion notifier.
*
* @param handle the handle
* @param timeoutMillis the timeout millis
* @param listener the listener
* @throws LensException the lens exception
*/
QueryCompletionNotifier(QueryHandle handle, long timeoutMillis, QueryCompletionListener listener) {
this.handle = handle;
this.timeoutMillis = timeoutMillis;
this.listener = listener;
this.pollInterval = timeoutMillis / 10;
if (pollInterval < 5000) {
pollInterval = 5000; //minimum poll interval is 5 secs
}
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
// Poll until the query is complete or the timeout is reached
long timeSpent = 0;
String error;
try {
while (timeSpent <= timeoutMillis) {
try {
hiveHandle = getHiveHandle(handle);
if (isFinished(hiveHandle)) {
listener.onDriverStatusUpdated(handle, updateDriverStateFromOperationStatus(hiveHandle, null));
return;
}
} catch (LensException e) {
log.debug("query handle: {} Not yet launched on driver {}", handle, getFullyQualifiedName());
}
Thread.sleep(pollInterval);
timeSpent += pollInterval;
}
error = "timedout";
} catch (Exception e) {
log.warn("Error while polling for status", e);
error = "error polling";
}
listener.onError(handle, error);
}
/**
* Checks if is finished.
*
* @param hiveHandle the hive handle
* @return true, if is finished
* @throws LensException the lens exception
*/
private boolean isFinished(OperationHandle hiveHandle) throws LensException {
OperationState state;
try {
state = getClient().getOperationStatus(hiveHandle).getState();
} catch (HiveSQLException e) {
throw new LensException("Could not get Status", e);
}
if (state.equals(OperationState.FINISHED) || state.equals(OperationState.CANCELED)
|| state.equals(OperationState.ERROR) || state.equals(OperationState.CLOSED)) {
return true;
}
return false;
}
}
/*
* (non-Javadoc)
*
* @see
* org.apache.lens.server.api.driver.LensDriver#registerForCompletionNotification
* (org.apache.lens.api.query.QueryHandle, long, org.apache.lens.server.api.driver.QueryDriverStatusUpdateListener)
*/
@Override
public void registerForCompletionNotification(
QueryContext context, long timeoutMillis, QueryCompletionListener listener) {
Thread th = new Thread(new QueryCompletionNotifier(context.getQueryHandle(), timeoutMillis, listener));
th.start();
}
/*
* (non-Javadoc)
*
* @see java.io.Externalizable#readExternal(java.io.ObjectInput)
*/
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
synchronized (hiveHandles) {
int numHiveHandles = in.readInt();
for (int i = 0; i < numHiveHandles; i++) {
QueryHandle qhandle = (QueryHandle) in.readObject();
OperationHandle opHandle = new OperationHandle((TOperationHandle) in.readObject());
hiveHandles.put(qhandle, opHandle);
log.debug("Hive driver {} recovered {}:{}", getFullyQualifiedName(), qhandle, opHandle);
}
log.info("Hive driver {} recovered {} queries", getFullyQualifiedName(), hiveHandles.size());
int numSessions = in.readInt();
for (int i = 0; i < numSessions; i++) {
String lensId = in.readUTF();
SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(),
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
lensToHiveSession.put(lensId, sHandle);
}
log.info("Hive driver {} recovered {} sessions", getFullyQualifiedName(), lensToHiveSession.size());
}
int numOpHandles = in.readInt();
for (int i = 0; i < numOpHandles; i++) {
OperationHandle opHandle = new OperationHandle((TOperationHandle) in.readObject());
SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(),
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
opHandleToSession.put(opHandle, sHandle);
}
log.info("Hive driver {} recovered {} operation handles", getFullyQualifiedName(), opHandleToSession.size());
int numOrphanedSessions = in.readInt();
for (int i = 0; i < numOrphanedSessions; i++) {
SessionHandle sHandle = new SessionHandle((TSessionHandle) in.readObject(),
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6);
orphanedHiveSessions.add(sHandle);
}
log.info("Hive driver {} recovered {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size());
}
/*
* (non-Javadoc)
*
* @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
*/
@Override
public void writeExternal(ObjectOutput out) throws IOException {
// Write the query handle to hive handle map to output
synchronized (hiveHandles) {
out.writeInt(hiveHandles.size());
for (Map.Entry<QueryHandle, OperationHandle> entry : hiveHandles.entrySet()) {
out.writeObject(entry.getKey());
out.writeObject(entry.getValue().toTOperationHandle());
log.debug("Hive driver {} persisted {}:{}", getFullyQualifiedName(), entry.getKey(), entry.getValue());
}
log.info("Hive driver {} persisted {} queries ", getFullyQualifiedName(), hiveHandles.size());
out.writeInt(lensToHiveSession.size());
for (Map.Entry<String, SessionHandle> entry : lensToHiveSession.entrySet()) {
out.writeUTF(entry.getKey());
out.writeObject(entry.getValue().toTSessionHandle());
}
log.info("Hive driver {} persisted {} sessions", getFullyQualifiedName(), lensToHiveSession.size());
out.writeInt(opHandleToSession.size());
for (Map.Entry<OperationHandle, SessionHandle> entry : opHandleToSession.entrySet()) {
out.writeObject(entry.getKey().toTOperationHandle());
out.writeObject(entry.getValue().toTSessionHandle());
}
log.info("Hive driver {} persisted {} operation handles", getFullyQualifiedName(), opHandleToSession.size());
out.writeInt(orphanedHiveSessions.size());
for (SessionHandle sessionHandle : orphanedHiveSessions) {
out.writeObject(sessionHandle.toTSessionHandle());
}
log.info("Hive driver {} persisted {} orphaned sessions", getFullyQualifiedName(), orphanedHiveSessions.size());
}
}
/**
* Checks if is session invalid.
*
* @param exc the exc
* @param sessionHandle the session handle
* @return true, if is session invalid
*/
protected boolean isSessionInvalid(HiveSQLException exc, SessionHandle sessionHandle) {
if (exc.getMessage().contains("Invalid SessionHandle") && exc.getMessage().contains(sessionHandle.toString())) {
return true;
}
// Check if there is underlying cause
if (exc.getCause() instanceof HiveSQLException) {
return isSessionInvalid((HiveSQLException) exc.getCause(), sessionHandle);
}
return false;
}
/**
* Check invalid session.
*
* @param e the e
*/
protected void checkInvalidSession(Exception e) {
if (!(e instanceof HiveSQLException)) {
return;
}
HiveSQLException exc = (HiveSQLException) e;
String lensSession = null;
if (SessionState.get() != null) {
lensSession = SessionState.get().getSessionId();
}
if (lensSession == null) {
return;
}
// Get all hive sessions corresponding to the lens session and check if
// any of those sessions have become invalid
List<String> sessionKeys = new ArrayList<String>(lensToHiveSession.keySet());
List<SessionHandle> hiveSessionsToCheck = new ArrayList<SessionHandle>();
sessionLock.lock();
try {
for (String key : sessionKeys) {
if (key.startsWith(lensSession)) {
hiveSessionsToCheck.add(lensToHiveSession.get(key));
}
}
} finally {
sessionLock.unlock();
}
for (SessionHandle session : hiveSessionsToCheck) {
if (isSessionInvalid(exc, session)) {
// We have to expire previous session
log.info("{} Hive server session {} for lens session {} has become invalid", getFullyQualifiedName(), session,
lensSession);
sessionLock.lock();
try {
// We should close all connections and clear the session map since
// most likely all sessions are gone
closeAllConnections();
lensToHiveSession.clear();
log.info("{} Cleared all sessions", getFullyQualifiedName());
} finally {
sessionLock.unlock();
}
}
}
}
/**
* Check invalid operation.
*
* @param queryHandle the query handle
* @param exc the exc
*/
protected void checkInvalidOperation(QueryHandle queryHandle, HiveSQLException exc) {
final OperationHandle operation = hiveHandles.get(queryHandle);
if (operation == null) {
log.info("No hive operation available for {}", queryHandle);
return;
}
if (exc.getMessage() != null && exc.getMessage().contains("Invalid OperationHandle:")
&& exc.getMessage().contains(operation.toString())) {
log.info("Hive operation {} for query {} has become invalid", operation, queryHandle);
hiveHandles.remove(queryHandle);
return;
}
if (exc.getCause() instanceof HiveSQLException) {
checkInvalidOperation(queryHandle, (HiveSQLException) exc.getCause());
}
return;
}
/**
* Handle hive server error.
*
* @param ctx the ctx
* @param exc the exc
*/
protected void handleHiveServerError(QueryContext ctx, Exception exc) {
if (exc instanceof HiveSQLException) {
if (ctx != null) {
checkInvalidOperation(ctx.getQueryHandle(), (HiveSQLException) exc);
}
checkInvalidSession((HiveSQLException) exc);
}
}
/**
* Close session.
*
* @param sessionHandle the session handle
*/
public void closeSession(LensSessionHandle sessionHandle) {
String sessionIdentifier = sessionHandle.getPublicId().toString();
sessionLock.lock();
try {
for (String sessionDbKey : new ArrayList<String>(lensToHiveSession.keySet())) {
if (sessionDbKey.startsWith(sessionIdentifier)) {
SessionHandle hiveSession = lensToHiveSession.remove(sessionDbKey);
if (hiveSession != null) {
try {
if (isSessionClosable(hiveSession)) {
getClient().closeSession(hiveSession);
log.info("Closed Hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
sessionDbKey);
} else {
log.info("Skipped closing hive session {} for lens session {} due to active operations",
hiveSession.getHandleIdentifier(), sessionDbKey);
orphanedHiveSessions.add(hiveSession);
}
} catch (Exception e) {
log.error("Error closing hive session {} for lens session {}", hiveSession.getHandleIdentifier(),
sessionDbKey, e);
}
resourcesAddedForSession.remove(hiveSession);
}
}
}
} finally {
sessionLock.unlock();
}
}
private boolean isSessionClosable(SessionHandle hiveSession) {
return !opHandleToSession.containsValue(hiveSession);
}
/**
* Close all connections.
*/
private void closeAllConnections() {
synchronized (thriftConnExpiryQueue) {
for (ExpirableConnection connection : threadConnections.values()) {
try {
connection.getConnection().close();
} catch (Exception ce) {
log.warn("Error closing connection to hive server");
}
}
threadConnections.clear();
}
}
// For test
/**
* Checks for lens session.
*
* @param session the session
* @return true, if successful
*/
public boolean hasLensSession(LensSessionHandle session) {
return lensToHiveSession.containsKey(session.getPublicId().toString());
}
}