blob: ca973b9076f9cd1b35d61a17df5fa7e4bb04c07a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lens.driver.jdbc;
import static java.lang.Integer.parseInt;
import static java.util.Arrays.asList;
import static org.apache.lens.driver.jdbc.JDBCDriverConfConstants.*;
import static org.apache.lens.driver.jdbc.JDBCDriverConfConstants.ConnectionPoolProperties.*;
import static com.google.common.base.Preconditions.checkState;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lens.api.LensConf;
import org.apache.lens.api.query.QueryHandle;
import org.apache.lens.api.query.QueryPrepareHandle;
import org.apache.lens.cube.parse.HQLParser;
import org.apache.lens.cube.query.cost.StaticCostCalculator;
import org.apache.lens.server.api.driver.*;
import org.apache.lens.server.api.driver.DriverQueryStatus.DriverQueryState;
import org.apache.lens.server.api.error.LensDriverErrorCode;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.LensEventListener;
import org.apache.lens.server.api.metrics.MethodMetricsContext;
import org.apache.lens.server.api.metrics.MethodMetricsFactory;
import org.apache.lens.server.api.query.AbstractQueryContext;
import org.apache.lens.server.api.query.PreparedQueryContext;
import org.apache.lens.server.api.query.QueryContext;
import org.apache.lens.server.api.query.constraint.MaxConcurrentDriverQueriesConstraintFactory;
import org.apache.lens.server.api.query.cost.*;
import org.apache.lens.server.api.query.rewrite.QueryRewriter;
import org.apache.lens.server.api.util.LensUtil;
import org.apache.lens.server.model.LogSegregationContext;
import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
/**
* This driver is responsible for running queries against databases which can be queried using the JDBC API.
*/
@Slf4j
public class JDBCDriver extends AbstractLensDriver {
/** The Constant THID. */
public static final AtomicInteger THID = new AtomicInteger();
/** The connection provider. */
private ConnectionProvider connectionProvider;
/** The configured. */
boolean configured = false;
/** The async query pool. */
private ExecutorService asyncQueryPool;
/** The query context map. */
@Getter
private ConcurrentHashMap<QueryHandle, JdbcQueryContext> queryContextMap;
/** Configuration for estimate connection pool */
private Configuration estimateConf;
/** Estimate connection provider */
private ConnectionProvider estimateConnectionProvider;
private LogSegregationContext logSegregationContext;
private boolean isStatementCancelSupported;
QueryCostCalculator queryCostCalculator;
/**
* Data related to a query submitted to JDBCDriver.
*/
@Data
protected class JdbcQueryContext {
/** The lens context. */
private final QueryContext lensContext;
/** The result future. */
private Future<QueryResult> resultFuture;
/** The rewritten query. */
private String rewrittenQuery;
/** The is prepared. */
private boolean isPrepared;
/** The is closed. */
private boolean isClosed;
/** The query result. */
private QueryResult queryResult;
/**
* Close result.
*/
public void closeResult() {
if (queryResult != null) {
queryResult.close();
}
isClosed = true;
}
public boolean cancel() {
boolean ret;
log.debug("Canceling resultFuture object");
ret = resultFuture.cancel(true);
log.debug("Done resultFuture cancel!");
// queryResult object would be null if query is not yet launched - since we did future.cancel, no other cancel is
// required.
if (queryResult != null && queryResult.stmt != null && isStatementCancelSupported) {
log.debug("Cancelling query through statement cancel");
try {
queryResult.stmt.cancel();
log.debug("Done statement cancel!");
ret = true;
} catch (SQLFeatureNotSupportedException se) {
log.warn("Statement cancel not supported", se);
} catch(SQLException e) {
log.warn("Statement cancel failed", e);
ret = false;
}
}
return ret;
}
public String getQueryHandleString() {
return this.lensContext.getQueryHandleString();
}
}
/**
* Result of a query and associated resources like statement and connection. After the results are consumed, close()
* should be called to close the statement and connection
*/
protected class QueryResult {
/** The result set. */
private ResultSet resultSet;
/** The error. */
private Throwable error;
/** The conn. */
private Connection conn;
/** The stmt. */
private Statement stmt;
/** The is closed. */
private boolean isClosed;
/** The lens result set. */
private InMemoryResultSet lensResultSet;
/**
* Close.
*/
protected synchronized void close() {
if (isClosed) {
return;
}
try {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.error("Error closing SQL statement", e);
}
}
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
log.error("Error closing SQL Connection", e);
}
}
}
isClosed = true;
}
/**
* Gets the lens result set.
*
* @param closeAfterFetch the close after fetch
* @return the lens result set
* @throws LensException the lens exception
*/
protected synchronized LensResultSet getLensResultSet(boolean closeAfterFetch) throws LensException {
if (error != null) {
throw new LensException("Query failed!", error);
}
if (lensResultSet == null) {
lensResultSet = new JDBCResultSet(this, resultSet, closeAfterFetch);
}
return lensResultSet;
}
}
/**
* Callabled that returns query result after running the query. This is used for async queries.
*/
protected class QueryCallable implements Callable<QueryResult> {
/** The query context. */
private final JdbcQueryContext queryContext;
private final LogSegregationContext logSegregationContext;
/**
* Instantiates a new query callable.
*
* @param queryContext the query context
*/
public QueryCallable(JdbcQueryContext queryContext, @NonNull LogSegregationContext logSegregationContext) {
this.queryContext = queryContext;
this.logSegregationContext = logSegregationContext;
queryContext.getLensContext().setDriverStatus(DriverQueryState.INITIALIZED);
}
/*
* (non-Javadoc)
*
* @see java.util.concurrent.Callable#call()
*/
@Override
public QueryResult call() {
logSegregationContext.setLogSegragationAndQueryId(this.queryContext.getQueryHandleString());
queryContext.getLensContext().setDriverStatus(DriverQueryState.RUNNING);
Statement stmt;
Connection conn = null;
QueryResult result = new QueryResult();
try {
queryContext.setQueryResult(result);
try {
conn = getConnection();
result.conn = conn;
} catch (LensException e) {
log.error("Error obtaining connection: ", e);
result.error = e;
}
if (conn != null) {
try {
stmt = createStatement(conn);
result.stmt = stmt;
Boolean isResultAvailable = stmt.execute(queryContext.getRewrittenQuery());
if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
return result;
}
if (isResultAvailable) {
result.resultSet = stmt.getResultSet();
}
queryContext.getLensContext().getDriverStatus().setResultSetAvailable(isResultAvailable);
queryContext.getLensContext().setDriverStatus(DriverQueryState.SUCCESSFUL);
} catch (Exception e) {
if (queryContext.getLensContext().getDriverStatus().isCanceled()) {
return result;
}
if (queryContext.isClosed()) {
log.info("Ignored exception on already closed query : {} - {}",
queryContext.getLensContext().getQueryHandle(), e.getMessage(), e);
} else {
log.error("Error executing SQL query: {} reason: {}", queryContext.getLensContext().getQueryHandle(),
e.getMessage(), e);
result.error = e;
queryContext.getLensContext().setDriverStatus(DriverQueryState.FAILED, e.getMessage());
// Close connection in case of failed queries. For successful queries, connection is closed
// When result set is closed or driver.closeQuery is called
result.close();
}
}
}
} finally {
Long endTime = queryContext.getLensContext().getDriverStatus().getDriverFinishTime();
if (endTime == null || endTime <= 0) {
queryContext.getLensContext().getDriverStatus().setDriverFinishTime(System.currentTimeMillis());
}
}
return result;
}
/**
* Create statement used to issue the query
*
* @param conn pre created SQL Connection object
* @return statement
* @throws SQLException the SQL exception
*/
public Statement createStatement(Connection conn) throws SQLException {
Statement stmt;
boolean enabledRowRetrieval = queryContext.getLensContext().getSelectedDriverConf().getBoolean(
JDBCDriverConfConstants.JDBC_ENABLE_RESULTSET_STREAMING_RETRIEVAL,
JDBCDriverConfConstants.DEFAULT_JDBC_ENABLE_RESULTSET_STREAMING_RETRIEVAL);
if (enabledRowRetrieval) {
log.info("JDBC streaming retrieval is enabled for {}", queryContext.getLensContext().getQueryHandle());
if (queryContext.isPrepared()) {
stmt = conn.prepareStatement(queryContext.getRewrittenQuery(),
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
} else {
stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
}
stmt.setFetchSize(Integer.MIN_VALUE);
} else {
stmt = queryContext.isPrepared() ? conn.prepareStatement(queryContext.getRewrittenQuery())
: conn.createStatement();
// Get default fetch size from conf if not overridden in query conf
int fetchSize = queryContext.getLensContext().getSelectedDriverConf().getInt(
JDBCDriverConfConstants.JDBC_FETCH_SIZE, JDBCDriverConfConstants.DEFAULT_JDBC_FETCH_SIZE);
stmt.setFetchSize(fetchSize);
}
stmt.setFetchDirection(ResultSet.FETCH_FORWARD);
return stmt;
}
}
/**
* The Class DummyQueryRewriter.
*/
public static class DummyQueryRewriter implements QueryRewriter {
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.query.QueryRewriter#rewrite
* (java.lang.String, org.apache.hadoop.conf.Configuration)
*/
@Override
public String rewrite(String query, Configuration queryConf, HiveConf metastoreConf) throws LensException {
return query;
}
@Override
public void init(Configuration rewriteConf) {
}
}
/*
* (non-Javadoc)
*
* @see org.apache.lens.server.api.driver.LensDriver#configure(org.apache.hadoop.conf.Configuration)
*/
@Override
public void configure(Configuration conf, String driverType, String driverName) throws LensException {
super.configure(conf, driverType, driverName);
init();
configured = true;
Class<? extends QueryCostCalculator> queryCostCalculatorClass = getConf().getClass(JDBC_COST_CALCULATOR,
StaticCostCalculator.class, QueryCostCalculator.class);
try {
queryCostCalculator = queryCostCalculatorClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new LensException("Can't instantiate query cost calculator of class: " + queryCostCalculatorClass, e);
}
//For initializing the decider class instance
queryCostCalculator.init(this);
log.info("JDBC Driver {} configured", getFullyQualifiedName());
}
/**
* Inits the.
*
* @throws LensException the lens exception
*/
public void init() throws LensException {
final int maxPoolSize = parseInt(getConf().get(JDBC_POOL_MAX_SIZE.getConfigKey()));
final int maxConcurrentQueries
= parseInt(getConf().get(MaxConcurrentDriverQueriesConstraintFactory.MAX_CONCURRENT_QUERIES_KEY));
checkState(maxPoolSize >= maxConcurrentQueries, "maxPoolSize:" + maxPoolSize + " maxConcurrentQueries:"
+ maxConcurrentQueries);
queryContextMap = new ConcurrentHashMap<>();
asyncQueryPool = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread th = new Thread(runnable);
th.setName("lens-driver-jdbc-" + THID.incrementAndGet());
return th;
}
});
Class<? extends ConnectionProvider> cpClass = getConf().getClass(JDBC_CONNECTION_PROVIDER,
DataSourceConnectionProvider.class, ConnectionProvider.class);
try {
connectionProvider = cpClass.newInstance();
estimateConnectionProvider = cpClass.newInstance();
} catch (Exception e) {
log.error("Error initializing connection provider: ", e);
throw new LensException(e);
}
this.logSegregationContext = new MappedDiagnosticLogSegregationContext();
this.isStatementCancelSupported = getConf().getBoolean(STATEMENT_CANCEL_SUPPORTED,
DEFAULT_STATEMENT_CANCEL_SUPPORTED);
}
public QueryCost calculateQueryCost(AbstractQueryContext qctx) throws LensException {
return queryCostCalculator.calculateCost(qctx, this);
}
/**
* Check configured.
*
* @throws IllegalStateException the illegal state exception
*/
protected void checkConfigured() throws IllegalStateException {
if (!configured) {
throw new IllegalStateException("JDBC Driver is not configured!");
}
}
protected synchronized Connection getConnection() throws LensException {
try {
// Add here to cover the path when the queries are executed it does not
// use the driver conf
return connectionProvider.getConnection(getConf());
} catch (SQLException e) {
throw new LensException(e);
}
}
/**
* Gets the query rewriter.
*
* @return the query rewriter
* @throws LensException the lens exception
*/
protected QueryRewriter getQueryRewriter() throws LensException {
QueryRewriter rewriter;
Class<? extends QueryRewriter> queryRewriterClass = getConf().getClass(JDBC_QUERY_REWRITER_CLASS,
DummyQueryRewriter.class, QueryRewriter.class);
try {
rewriter = queryRewriterClass.newInstance();
log.info("{} Initialized :{}", getFullyQualifiedName(), queryRewriterClass);
} catch (Exception e) {
log.error("{} Unable to create rewriter object", getFullyQualifiedName(), e);
throw new LensException(e);
}
rewriter.init(getConf());
return rewriter;
}
/**
* Gets the query context.
*
* @param handle the handle
* @return the query context
* @throws LensException the lens exception
*/
protected JdbcQueryContext getQueryContext(QueryHandle handle) throws LensException {
JdbcQueryContext ctx = queryContextMap.get(handle);
if (ctx == null) {
throw new LensException("Query not found:" + handle.getHandleId());
}
return ctx;
}
/**
* Rewrite query.
*
* @return the string
* @throws LensException the lens exception
*/
protected String rewriteQuery(AbstractQueryContext ctx) throws LensException {
if (ctx.getFinalDriverQuery(this) != null) {
return ctx.getFinalDriverQuery(this);
}
String query = ctx.getDriverQuery(this);
Configuration driverQueryConf = ctx.getDriverConf(this);
MethodMetricsContext checkForAllowedQuery = MethodMetricsFactory.createMethodGauge(driverQueryConf, true,
CHECK_ALLOWED_QUERY);
// check if it is select query
ASTNode ast = HQLParser.parseHQL(query, ctx.getHiveConf());
if (ast.getToken().getType() != HiveParser.TOK_QUERY) {
throw new LensException("Not allowed statement:" + query);
} else {
// check for insert clause
ASTNode dest = HQLParser.findNodeByPath(ast, HiveParser.TOK_INSERT);
if (dest != null
&& ((ASTNode) (dest.getChild(0).getChild(0).getChild(0))).getToken().getType() != HiveParser.TOK_TMP_FILE) {
throw new LensException("Not allowed statement:" + query);
}
}
checkForAllowedQuery.markSuccess();
QueryRewriter rewriter = getQueryRewriter();
String rewrittenQuery = rewriter.rewrite(query, driverQueryConf, ctx.getHiveConf());
ctx.setFinalDriverQuery(this, rewrittenQuery);
return rewrittenQuery;
}
/**
* Dummy JDBC query Plan class to get min cost selector working.
*/
private static class JDBCQueryPlan extends DriverQueryPlan {
@Getter
private final QueryCost cost;
JDBCQueryPlan(QueryCost cost){
this.cost = cost;
}
@Override
public String getPlan() {
return "";
}
}
private static final String VALIDATE_GAUGE = "validate-thru-prepare";
private static final String COLUMNAR_SQL_REWRITE_GAUGE = "columnar-sql-rewrite";
private static final String JDBC_PREPARE_GAUGE = "jdbc-prepare-statement";
private static final String CHECK_ALLOWED_QUERY = "jdbc-check-allowed-query";
@Override
public QueryCost estimate(AbstractQueryContext qctx) throws LensException {
String rewrittenQuery = rewriteQuery(qctx);
qctx.setSelectedDriverQuery(rewrittenQuery);
MethodMetricsContext validateGauge = MethodMetricsFactory.createMethodGauge(qctx.getDriverConf(this), true,
VALIDATE_GAUGE);
validate(qctx);
validateGauge.markSuccess();
return calculateQueryCost(qctx);
}
/**
* Explain the given query.
*
* @param explainCtx The explain context
* @return The query plan object;
* @throws LensException the lens exception
*/
@Override
public DriverQueryPlan explain(AbstractQueryContext explainCtx) throws LensException {
if (explainCtx.getDriverQuery(this) == null) {
throw new NullPointerException("Null driver query for " + explainCtx.getUserQuery());
}
if (explainCtx.getDriverContext().getDriverQueryPlan(this) != null) {
// explain called again and again
return explainCtx.getDriverContext().getDriverQueryPlan(this);
}
checkConfigured();
String explainQuery;
String rewrittenQuery = rewriteQuery(explainCtx);
Configuration explainConf = explainCtx.getDriverConf(this);
String explainKeyword = explainConf.get(JDBC_EXPLAIN_KEYWORD_PARAM,
DEFAULT_JDBC_EXPLAIN_KEYWORD);
boolean explainBeforeSelect = explainConf.getBoolean(JDBC_EXPLAIN_KEYWORD_BEFORE_SELECT,
DEFAULT_JDBC_EXPLAIN_KEYWORD_BEFORE_SELECT);
if (explainBeforeSelect) {
explainQuery = explainKeyword + " " + rewrittenQuery;
} else {
explainQuery = rewrittenQuery.replaceAll("select ", "select "
+ explainKeyword + " ");
}
log.info("{} Explain Query : {}", getFullyQualifiedName(), explainQuery);
QueryContext explainQueryCtx = QueryContext.createContextWithSingleDriver(explainQuery, null,
new LensConf(), explainConf, this, explainCtx.getLensSessionIdentifier(), false);
QueryResult result = null;
try {
result = executeInternal(explainQueryCtx, explainQuery);
if (result.error != null) {
throw new LensException("Query explain failed!", result.error);
}
} finally {
if (result != null) {
result.close();
}
}
JDBCQueryPlan jqp = new JDBCQueryPlan(calculateQueryCost(explainCtx));
explainCtx.getDriverContext().setDriverQueryPlan(this, jqp);
return jqp;
}
/**
* Validate query using prepare
*
* @param pContext context to validate
* @throws LensException
*/
public void validate(AbstractQueryContext pContext) throws LensException {
if (pContext.getDriverQuery(this) == null) {
throw new NullPointerException("Null driver query for " + pContext.getUserQuery());
}
boolean validateThroughPrepare = pContext.getDriverConf(this).getBoolean(JDBC_VALIDATE_THROUGH_PREPARE,
DEFAULT_JDBC_VALIDATE_THROUGH_PREPARE);
if (validateThroughPrepare) {
PreparedStatement stmt;
// Estimate queries need to get connection from estimate pool to make sure
// we are not blocked by data queries.
stmt = prepareInternal(pContext, true, true, "validate-");
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
throw new LensException();
}
}
}
}
// Get key used for estimate key config
protected String getEstimateKey(String jdbcKey) {
return JDBC_DRIVER_PFX + "estimate." + jdbcKey.substring(JDBC_DRIVER_PFX.length());
}
// If any 'key' in 'keys' is set in conf, return its value.
private static String getKeyOrFallBack(Configuration conf, String... keys) {
for (String key : keys) {
String val = conf.get(key);
if (StringUtils.isNotBlank(val)) {
return val;
}
}
return null;
}
// Get connection config used by estimate pool.
protected final Configuration getEstimateConnectionConf() {
if (estimateConf == null) {
Configuration tmpConf = new Configuration(getConf());
// Override JDBC settings in estimate conf, if set by user explicitly. Otherwise fall back to default JDBC pool
// config
for (String key : asList(JDBC_CONNECTION_PROPERTIES, JDBC_DB_URI, JDBC_DRIVER_CLASS, JDBC_USER, JDBC_PASSWORD,
JDBC_POOL_MAX_SIZE.getConfigKey(), JDBC_POOL_IDLE_TIME.getConfigKey(),
JDBC_MAX_IDLE_TIME_EXCESS_CONNECTIONS.getConfigKey(),
JDBC_MAX_STATEMENTS_PER_CONNECTION.getConfigKey(), JDBC_GET_CONNECTION_TIMEOUT.getConfigKey())) {
String val = getKeyOrFallBack(tmpConf, getEstimateKey(key), key);
if (val != null) {
tmpConf.set(key, val);
}
}
/* We need to set password as empty string if it is not provided. Setting null on conf is not allowed */
if (tmpConf.get(JDBC_PASSWORD) == null) {
tmpConf.set(JDBC_PASSWORD, "");
}
estimateConf = tmpConf;
}
return estimateConf;
}
protected final Connection getEstimateConnection() throws SQLException {
return estimateConnectionProvider.getConnection(getEstimateConnectionConf());
}
// For tests
protected final ConnectionProvider getEstimateConnectionProvider() {
return estimateConnectionProvider;
}
// For tests
protected final ConnectionProvider getConnectionProvider() {
return connectionProvider;
}
private final Map<QueryPrepareHandle, PreparedStatement> preparedQueries = new HashMap<>();
/**
* Internally prepare the query
*
* @param pContext prepare context
* @return prepared statement of the query
* @throws LensException
*/
private PreparedStatement prepareInternal(AbstractQueryContext pContext) throws LensException {
if (pContext.getDriverQuery(this) == null) {
throw new NullPointerException("Null driver query for " + pContext.getUserQuery());
}
checkConfigured();
return prepareInternal(pContext, false, false, "prepare-");
}
/**
* Prepare statment on the database server
* @param pContext query context
* @param calledForEstimate set this to true if this call will use the estimate connection pool
* @param checkConfigured set this to true if this call needs to check whether JDBC driver is configured
* @param metricCallStack stack for metrics API
* @return prepared statement
* @throws LensException
*/
private PreparedStatement prepareInternal(AbstractQueryContext pContext,
boolean calledForEstimate,
boolean checkConfigured,
String metricCallStack) throws LensException {
// Caller might have already verified configured status and driver query, so we don't have
// to do this check twice. Caller must set checkConfigured to false in that case.
if (checkConfigured) {
if (pContext.getDriverQuery(this) == null) {
throw new NullPointerException("Null driver query for " + pContext.getUserQuery());
}
checkConfigured();
}
// Only create a prepared statement and then close it
MethodMetricsContext sqlRewriteGauge = MethodMetricsFactory.createMethodGauge(pContext.getDriverConf(this), true,
metricCallStack + COLUMNAR_SQL_REWRITE_GAUGE);
String rewrittenQuery = pContext.getSelectedDriverQuery();
sqlRewriteGauge.markSuccess();
MethodMetricsContext jdbcPrepareGauge = MethodMetricsFactory.createMethodGauge(pContext.getDriverConf(this), true,
metricCallStack + JDBC_PREPARE_GAUGE);
PreparedStatement stmt = null;
Connection conn = null;
try {
conn = calledForEstimate ? getEstimateConnection() : getConnection();
stmt = conn.prepareStatement(rewrittenQuery);
if (!pContext.getDriverConf(this).getBoolean(JDBC_VALIDATE_SKIP_WARNINGS,
DEFAULT_JDBC_VALIDATE_SKIP_WARNINGS) && stmt.getWarnings() != null) {
throw new LensException(stmt.getWarnings());
}
} catch (SQLException sql) {
handleJDBCSQLException(sql);
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
log.error("Error closing connection: {}", rewrittenQuery, e);
}
}
jdbcPrepareGauge.markSuccess();
}
log.info("Prepared: {}", rewrittenQuery);
return stmt;
}
/**
* Handle sql exception
*
* @param sqlex SQLException
* @throws LensException
*/
private LensException handleJDBCSQLException(SQLException sqlex) throws LensException {
String cause = LensUtil.getCauseMessage(sqlex);
if (getSqlSynataxExceptions(sqlex).contains("SyntaxError")) {
throw new LensException(LensDriverErrorCode.SEMANTIC_ERROR.getLensErrorInfo(), sqlex, cause);
}
throw new LensException(LensDriverErrorCode.DRIVER_ERROR.getLensErrorInfo(), sqlex, cause);
}
private String getSqlSynataxExceptions(Throwable e) {
String exp = null;
if (e.getCause() != null) {
exp = e.getClass() + getSqlSynataxExceptions(e.getCause());
}
return exp;
}
/**
* Prepare the given query.
*
* @param pContext
* the context
* @throws LensException
* the lens exception
*/
@Override
public void prepare(PreparedQueryContext pContext) throws LensException {
if (preparedQueries.containsKey(pContext.getPrepareHandle())) {
// already prepared
return;
}
PreparedStatement stmt = prepareInternal(pContext);
if (stmt != null) {
preparedQueries.put(pContext.getPrepareHandle(), stmt);
}
}
/**
* Explain and prepare the given query.
*
* @param pContext the context
* @return The query plan object;
* @throws LensException the lens exception
*/
@Override
public DriverQueryPlan explainAndPrepare(PreparedQueryContext pContext) throws LensException {
checkConfigured();
prepare(pContext);
return new JDBCQueryPlan(calculateQueryCost(pContext));
}
/**
* Close the prepare query specified by the prepared handle, releases all the resources held by the prepared query.
*
* @param handle The query handle
* @throws LensException the lens exception
*/
@Override
public void closePreparedQuery(QueryPrepareHandle handle) throws LensException {
checkConfigured();
try {
if (preparedQueries.get(handle) != null) {
preparedQueries.get(handle).close();
}
} catch (SQLException e) {
throw new LensException(e);
}
}
/**
* Blocking execute of the query.
*
* @param context the context
* @return returns the result set
* @throws LensException the lens exception
*/
@Override
public LensResultSet execute(QueryContext context) throws LensException {
checkConfigured();
String rewrittenQuery = rewriteQuery(context);
log.info("{} Execute {}", getFullyQualifiedName(), context.getQueryHandle());
QueryResult result = executeInternal(context, rewrittenQuery);
return result.getLensResultSet(true);
}
/**
* Internally executing query.
*
* @param context the context
* @param rewrittenQuery the rewritten query
* @return returns the result set
* @throws LensException the lens exception
*/
private QueryResult executeInternal(QueryContext context, String rewrittenQuery) throws LensException {
JdbcQueryContext queryContext = new JdbcQueryContext(context);
queryContext.setPrepared(false);
queryContext.setRewrittenQuery(rewrittenQuery);
return new QueryCallable(queryContext, logSegregationContext).call();
}
/**
* Asynchronously execute the query.
*
* @param context The query context
* @throws LensException the lens exception
*/
@Override
public void executeAsync(QueryContext context) throws LensException {
checkConfigured();
// Always use the driver rewritten query not user query. Since the
// conf we are passing here is query context conf, we need to add jdbc xml in resource path
String rewrittenQuery = rewriteQuery(context);
JdbcQueryContext jdbcCtx = new JdbcQueryContext(context);
jdbcCtx.setRewrittenQuery(rewrittenQuery);
try {
Future<QueryResult> future = asyncQueryPool.submit(new QueryCallable(jdbcCtx, logSegregationContext));
jdbcCtx.setResultFuture(future);
} catch (RejectedExecutionException e) {
log.error("Query execution rejected: {} reason:{}", context.getQueryHandle(), e.getMessage(), e);
throw new LensException("Query execution rejected: " + context.getQueryHandle() + " reason:" + e.getMessage(), e);
}
queryContextMap.put(context.getQueryHandle(), jdbcCtx);
log.info("{} ExecuteAsync: {}", getFullyQualifiedName(), context.getQueryHandle());
}
/**
* Get status of the query, specified by the handle.
*
* @param context The query handle
* @throws LensException the lens exception
*/
@Override
public void updateStatus(QueryContext context) throws LensException {
checkConfigured();
JdbcQueryContext ctx = getQueryContext(context.getQueryHandle());
if (ctx.getLensContext().getDriverStatus().isFinished()) {
// terminal state. No updates can be done.
return;
}
if (ctx.getResultFuture().isCancelled()) {
if (!context.getDriverStatus().isCanceled()) {
context.getDriverStatus().setProgress(1.0);
context.getDriverStatus().setState(DriverQueryState.CANCELED);
context.getDriverStatus().setStatusMessage("Query Canceled");
}
} else if (ctx.getResultFuture().isDone()) {
context.getDriverStatus().setProgress(1.0);
// Since future is already done, this call should not block
if (ctx.getQueryResult() != null && ctx.getQueryResult().error != null) {
if (!context.getDriverStatus().isFailed()) {
context.getDriverStatus().setState(DriverQueryState.FAILED);
context.getDriverStatus().setStatusMessage("Query execution failed!");
context.getDriverStatus().setErrorMessage(ctx.getQueryResult().error.getMessage());
}
} else {
if (!context.getDriverStatus().isFinished()) {
// assuming successful
context.getDriverStatus().setState(DriverQueryState.SUCCESSFUL);
context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " successful");
context.getDriverStatus().setResultSetAvailable(true);
}
}
} else {
if (!context.getDriverStatus().isRunning()) {
context.getDriverStatus().setState(DriverQueryState.RUNNING);
context.getDriverStatus().setStatusMessage(context.getQueryHandle() + " is running");
}
}
}
@Override
protected LensResultSet createResultSet(QueryContext ctx) throws LensException {
checkConfigured();
return getQueryContext(ctx.getQueryHandle()).getQueryResult().getLensResultSet(true);
}
/**
* Close the resultset for the query.
*
* @param handle The query handle
* @throws LensException the lens exception
*/
@Override
public void closeResultSet(QueryHandle handle) throws LensException {
checkConfigured();
getQueryContext(handle).closeResult();
}
/**
* Cancel the execution of the query, specified by the handle.
*
* @param handle The query handle.
* @return true if cancel was successful, false otherwise
* @throws LensException the lens exception
*/
@Override
public boolean cancelQuery(QueryHandle handle) throws LensException {
checkConfigured();
JdbcQueryContext context = getQueryContext(handle);
log.info("{} cancel request on query {}", getFullyQualifiedName(), handle);
boolean cancelResult = context.cancel();
if (cancelResult) {
context.getLensContext().setDriverStatus(DriverQueryState.CANCELED);
context.closeResult();
log.info("{} Canceled query : {}", getFullyQualifiedName(), handle);
}
return cancelResult;
}
/**
* Close the query specified by the handle, releases all the resources held by the query.
*
* @param handle The query handle
* @throws LensException the lens exception
*/
@Override
public void closeQuery(QueryHandle handle) throws LensException {
checkConfigured();
try {
JdbcQueryContext ctx = getQueryContext(handle);
if (ctx != null) {
ctx.getResultFuture().cancel(true);
ctx.closeResult();
}
} catch (LensException exc) {
log.error("{} Failed to close query {}", getFullyQualifiedName(), handle.getHandleId());
} finally {
queryContextMap.remove(handle);
}
log.info("{} Closed query {}", getFullyQualifiedName(), handle.getHandleId());
}
/**
* Close the driver, releasing all resouces used up by the driver.
*
* @throws LensException the lens exception
*/
@Override
public void close() throws LensException {
checkConfigured();
try {
for (QueryHandle query : new ArrayList<>(queryContextMap.keySet())) {
try {
closeQuery(query);
} catch (LensException e) {
log.warn("{} Error closing query : {}", getFullyQualifiedName(), query.getHandleId(), e);
}
}
for (QueryPrepareHandle query : preparedQueries.keySet()) {
try {
try {
preparedQueries.get(query).close();
} catch (SQLException e) {
throw new LensException();
}
} catch (LensException e) {
log.warn("{} Error closing prapared query : {}", getFullyQualifiedName(), query, e);
}
}
} finally {
queryContextMap.clear();
}
}
/**
* Add a listener for driver events.
*
* @param driverEventListener the driver event listener
*/
@Override
public void registerDriverEventListener(LensEventListener<DriverEvent> driverEventListener) {
}
/*
* (non-Javadoc)
*
* @see java.io.Externalizable#readExternal(java.io.ObjectInput)
*/
@Override
public void readExternal(ObjectInput arg0) throws IOException, ClassNotFoundException {
// TODO Auto-generated method stub
}
/*
* (non-Javadoc)
*
* @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
*/
@Override
public void writeExternal(ObjectOutput arg0) throws IOException {
// TODO Auto-generated method stub
}
@Override
public StatusUpdateMethod getStatusUpdateMethod() {
return StatusUpdateMethod.PUSH;
}
}