blob: 081edc0d0aeb1868126620862295981698efa0c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.query.engine;
import static org.apache.kylin.query.relnode.OLAPContext.clearThreadLocalContexts;
import static org.apache.kylin.query.util.DefaultQueryTransformer.S0;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.prepare.CalcitePrepareImpl;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryTrace;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exception.CalciteNotSupportException;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.NewQueryRefuseException;
import org.apache.kylin.common.exception.TargetSegmentNotFoundException;
import org.apache.kylin.common.persistence.transaction.TransactionException;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
import org.apache.kylin.common.util.DBUtils;
import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.metadata.project.NProjectLoader;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.query.NativeQueryRealization;
import org.apache.kylin.metadata.query.StructField;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
import org.apache.kylin.metadata.realization.NoStreamingRealizationFoundException;
import org.apache.kylin.query.engine.data.QueryResult;
import org.apache.kylin.query.exception.BusyQueryException;
import org.apache.kylin.query.exception.NotSupportedSQLException;
import org.apache.kylin.query.mask.QueryResultMasks;
import org.apache.kylin.query.relnode.OLAPContext;
import org.apache.kylin.query.util.PushDownQueryRequestLimits;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.query.util.QueryInterruptChecker;
import org.apache.kylin.query.util.QueryParams;
import org.apache.kylin.query.util.QueryUtil;
import org.apache.kylin.source.adhocquery.PushdownResult;
import org.apache.spark.SparkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.val;
public class QueryRoutingEngine {
private static final Logger logger = LoggerFactory.getLogger(QueryRoutingEngine.class);
// reference org.apache.spark.deploy.yarn.YarnAllocator.memLimitExceededLogMessage
public static final String SPARK_MEM_LIMIT_EXCEEDED = "Container killed by YARN for exceeding memory limits";
public static final String SPARK_JOB_FAILED = "Job aborted due to stage failure";
private static final Pattern PTN_SUM_LC = Pattern.compile(
S0 + "\\bSUM_LC" + S0 + "[(]" + S0 + ".*\\.?.*" + S0 + "[,]" + S0 + ".*\\.?.*" + S0 + "[)]" + S0,
Pattern.CASE_INSENSITIVE);
public QueryResult queryWithSqlMassage(QueryParams queryParams) throws Exception {
QueryContext.current().setAclInfo(queryParams.getAclInfo());
KylinConfig projectKylinConfig = NProjectManager.getProjectConfig(queryParams.getProject());
QueryExec queryExec = new QueryExec(queryParams.getProject(), projectKylinConfig, true);
queryParams.setDefaultSchema(queryExec.getDefaultSchemaName());
if (queryParams.isForcedToPushDown()) {
checkContainsSumLC(queryParams);
return pushDownQuery(null, queryParams);
}
try {
return doTransactionEnabled(() -> {
if (projectKylinConfig.enableReplaceDynamicParams() && queryParams.isPrepareStatementWithParams()) {
queryParams.setSql(queryParams.getPrepareSql());
}
String correctedSql = QueryUtil.massageSql(queryParams);
//CAUTION: should not change sqlRequest content!
QueryContext.current().getMetrics().setCorrectedSql(correctedSql);
QueryContext.current().setPartialMatchIndex(queryParams.isPartialMatchIndex());
logger.info("The corrected query: {}", correctedSql);
// add extra parameters into olap context, like acceptPartial
Map<String, String> parameters = new HashMap<>();
parameters.put(OLAPContext.PRM_ACCEPT_PARTIAL_RESULT, String.valueOf(queryParams.isAcceptPartial()));
OLAPContext.setParameters(parameters);
// force clear the query context before a new query
clearThreadLocalContexts();
// skip masking if executing user has admin permission
if (!queryParams.isACLDisabledOrAdmin()) {
QueryResultMasks.init(queryParams.getProject(),
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
.getProject(queryParams.getProject()).getConfig());
}
// special case for prepare query.
if (BackdoorToggles.getPrepareOnly()) {
return prepareOnly(correctedSql, queryExec, Lists.newArrayList(), Lists.newArrayList());
}
if (queryParams.isPrepareStatementWithParams()) {
for (int i = 0; i < queryParams.getParams().length; i++) {
setParam(queryExec, i, queryParams.getParams()[i]);
}
}
// execute query and get result from kylin layouts
return execute(correctedSql, queryExec);
}, queryParams.getProject());
} catch (TransactionException e) {
return handleTransactionException(queryParams, e);
} catch (SQLException e) {
return handleSqlException(queryParams, e);
} catch (AssertionError e) {
return handleAssertionError(queryParams, e);
} finally {
QueryResultMasks.remove();
}
}
private QueryResult handleTransactionException(QueryParams queryParams, TransactionException e)
throws SQLException {
Throwable cause = e.getCause();
if (cause instanceof SQLException && cause.getCause() instanceof KylinException) {
throw (SQLException) cause;
}
if (!(cause instanceof SQLException)) {
throw e;
}
if (shouldPushdown(cause, queryParams)) {
return pushDownQuery((SQLException) cause, queryParams);
} else {
throw e;
}
}
private QueryResult handleSqlException(QueryParams queryParams, SQLException e) throws Exception {
if (e.getCause() instanceof KylinException) {
if (checkIfRetryQuery(e.getCause())) {
NProjectLoader.removeCache();
return queryWithSqlMassage(queryParams);
} else {
if (e.getCause() instanceof NewQueryRefuseException && shouldPushdown(e, queryParams)) {
return pushDownQuery(e, queryParams);
} else {
throw e;
}
}
}
if (shouldPushdown(e, queryParams)) {
return pushDownQuery(e, queryParams);
}
throw e;
}
private QueryResult handleAssertionError(QueryParams queryParams, AssertionError e) throws SQLException {
// for example: split('abc', 'b') will jump into this AssertionError
if (e.getMessage().equals("OTHER")) {
SQLException ex = new SQLException(e.getMessage(), new CalciteNotSupportException());
return pushDownQuery(ex, queryParams);
}
throw e;
}
public boolean checkIfRetryQuery(Throwable cause) {
if (TargetSegmentNotFoundException.causedBySegmentNotFound(cause)
&& QueryContext.current().getMetrics().getRetryTimes() == 0) {
logger.info("Retry current query, retry times:{}, cause:{}",
QueryContext.current().getMetrics().getRetryTimes(), cause.getMessage());
QueryContext.current().getMetrics().addRetryTimes();
return true;
}
return false;
}
private void checkContainsSumLC(QueryParams queryParams) {
if (PTN_SUM_LC.matcher(queryParams.getSql()).find()) {
String message = "sum_lc() function now is not supported by other query engine";
throw new NotSupportedSQLException(message);
}
}
protected boolean shouldPushdown(Throwable e, QueryParams queryParams) {
if (queryParams.isForcedToIndex()) {
return false;
}
if (e.getCause() instanceof NoStreamingRealizationFoundException) {
return false;
}
if (e.getCause() instanceof SparkException && e.getCause().getMessage().contains(SPARK_JOB_FAILED)) {
return false;
}
if (e.getCause() instanceof NewQueryRefuseException) {
return checkBigQueryPushDown(queryParams);
}
if (PTN_SUM_LC.matcher(queryParams.getSql()).find()) {
return false;
}
return e instanceof SQLException && !e.getMessage().contains(SPARK_MEM_LIMIT_EXCEEDED);
}
private <T> T doTransactionEnabled(UnitOfWork.Callback<T> f, String project) throws Exception {
val kylinConfig = KylinConfig.getInstanceFromEnv();
if (kylinConfig.isTransactionEnabledInQuery()) {
return UnitOfWork.doInTransactionWithRetry(
UnitOfWorkParams.<T> builder().unitName(project).readonly(true).processor(f).build());
} else {
return f.process();
}
}
@VisibleForTesting
public QueryResult execute(String correctedSql, QueryExec queryExec) throws Exception {
QueryResult queryResult = queryExec.executeQuery(correctedSql);
List<QueryContext.NativeQueryRealization> nativeQueryRealizationList = Lists.newArrayList();
for (NativeQueryRealization nqReal : OLAPContext.getNativeRealizations()) {
nativeQueryRealizationList.add(new QueryContext.NativeQueryRealization(nqReal.getModelId(),
nqReal.getModelAlias(), nqReal.getLayoutId(), nqReal.getIndexType(), nqReal.isPartialMatchModel(),
nqReal.isValid(), nqReal.isLayoutExist(), nqReal.isStreamingLayout(), nqReal.getSnapshots()));
}
QueryContext.current().setNativeQueryRealizationList(nativeQueryRealizationList);
return queryResult;
}
private boolean checkBigQueryPushDown(QueryParams queryParams) {
KylinConfig kylinConfig = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
.getProject(queryParams.getProject()).getConfig();
boolean isPush = QueryUtil.isBigQueryPushDownCapable(kylinConfig);
if (isPush) {
logger.info("Big query route to pushdown.");
}
return isPush;
}
private QueryResult pushDownQuery(SQLException sqlException, QueryParams queryParams) throws SQLException {
QueryContext.current().getMetrics().setOlapCause(sqlException);
QueryContext.current().getQueryTagInfo().setPushdown(true);
PushdownResult result = null;
try {
result = tryPushDownSelectQuery(queryParams, sqlException, BackdoorToggles.getPrepareOnly());
} catch (KylinException e) {
logger.error("Pushdown failed with kylin exception ", e);
throw e;
} catch (Exception e2) {
logger.error("Pushdown engine failed current query too", e2);
//exception in pushdown, throw it instead of exception in calcite
throw new RuntimeException(
"[" + QueryContext.current().getPushdownEngine() + " Exception] " + e2.getMessage());
}
if (result == null)
throw sqlException;
return new QueryResult(result.getRows(), result.getSize(), null, result.getColumnMetas());
}
public PushdownResult tryPushDownSelectQuery(QueryParams queryParams, SQLException sqlException, boolean isPrepare)
throws Exception {
QueryContext.currentTrace().startSpan(QueryTrace.SQL_PUSHDOWN_TRANSFORMATION);
Semaphore semaphore = PushDownQueryRequestLimits.getSingletonInstance();
logger.info("Query: {} Before the current push down counter {}.", QueryContext.current().getQueryId(),
semaphore.availablePermits());
boolean acquired = false;
boolean asyncQuery = QueryContext.current().getQueryTagInfo().isAsyncQuery();
KylinConfig projectConfig = NProjectManager.getProjectConfig(queryParams.getProject());
try {
//SlowQueryDetector query timeout period is system-level
int queryTimeout = KylinConfig.getInstanceFromEnv().getQueryTimeoutSeconds();
if (!asyncQuery && projectConfig.isPushDownEnabled()) {
acquired = semaphore.tryAcquire(queryTimeout, TimeUnit.SECONDS);
if (!acquired) {
logger.info("query: {} failed to get acquire.", QueryContext.current().getQueryId());
throw new BusyQueryException("Query rejected. Caused by PushDown query server is too busy.");
} else {
logger.info("query: {} success to get acquire.", QueryContext.current().getQueryId());
}
}
String sqlString = queryParams.getSql();
if (isPrepareStatementWithParams(queryParams)) {
sqlString = queryParams.getPrepareSql();
}
if (BackdoorToggles.getPrepareOnly()) {
sqlString = QueryUtil.addLimit(sqlString);
}
String massagedSql = QueryUtil.appendLimitOffset(queryParams.getProject(), sqlString,
queryParams.getLimit(), queryParams.getOffset());
if (isPrepareStatementWithParams(queryParams)) {
QueryContext.current().getMetrics().setCorrectedSql(massagedSql);
}
queryParams.setSql(massagedSql);
queryParams.setSqlException(sqlException);
queryParams.setPrepare(isPrepare);
return PushDownUtil.tryIterQuery(queryParams);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
QueryInterruptChecker.checkThreadInterrupted("Interrupted sql push down at the stage of QueryRoutingEngine",
"Current step: try push down select query");
throw e;
} finally {
if (acquired) {
semaphore.release();
logger.info("Query: {} success to release acquire", QueryContext.current().getQueryId());
}
logger.info("Query: {} After the current push down counter {}.", QueryContext.current().getQueryId(),
semaphore.availablePermits());
}
}
private boolean isPrepareStatementWithParams(QueryParams queryParams) {
KylinConfig projectConfig = NProjectManager.getProjectConfig(queryParams.getProject());
return (KapConfig.getInstanceFromEnv().enablePushdownPrepareStatementWithParams()
|| projectConfig.enableReplaceDynamicParams()) && queryParams.isPrepareStatementWithParams();
}
private QueryResult prepareOnly(String correctedSql, QueryExec queryExec, List<List<String>> results,
List<SelectedColumnMeta> columnMetas) throws SQLException {
CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(true);
PreparedStatement preparedStatement = null;
try {
List<StructField> fields = queryExec.getColumnMetaData(correctedSql);
for (int i = 0; i < fields.size(); ++i) {
StructField field = fields.get(i);
String columnName = field.getName();
if (columnName.startsWith("_KY_")) {
continue;
}
columnMetas.add(new SelectedColumnMeta(false, false, false, false, field.isNullable() ? 1 : 0, true,
field.getPrecision(), columnName, columnName, null, null, null, field.getPrecision(),
Math.max(field.getScale(), 0), field.getDataType(), field.getDataTypeName(), true, false,
false));
}
return new QueryResult(new LinkedList<>(), 0, fields, columnMetas);
} finally {
CalcitePrepareImpl.KYLIN_ONLY_PREPARE.set(false);
DBUtils.closeQuietly(preparedStatement);
}
}
/**
* @param queryExec
* @param index 0 based index
* @param param
* @throws SQLException
*/
@SuppressWarnings("squid:S3776")
private void setParam(QueryExec queryExec, int index, PrepareSqlStateParam param) {
boolean isNull = (null == param.getValue());
Class<?> clazz;
try {
clazz = Class.forName(param.getClassName());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(e);
}
ColumnMetaData.Rep rep = ColumnMetaData.Rep.of(clazz);
switch (rep) {
case PRIMITIVE_CHAR:
case CHARACTER:
case STRING:
queryExec.setPrepareParam(index, isNull ? null : String.valueOf(param.getValue()));
break;
case PRIMITIVE_INT:
case INTEGER:
queryExec.setPrepareParam(index, isNull ? 0 : Integer.parseInt(param.getValue()));
break;
case PRIMITIVE_SHORT:
case SHORT:
queryExec.setPrepareParam(index, isNull ? 0 : Short.parseShort(param.getValue()));
break;
case PRIMITIVE_LONG:
case LONG:
queryExec.setPrepareParam(index, isNull ? 0 : Long.parseLong(param.getValue()));
break;
case PRIMITIVE_FLOAT:
case FLOAT:
queryExec.setPrepareParam(index, isNull ? 0 : Float.parseFloat(param.getValue()));
break;
case PRIMITIVE_DOUBLE:
case DOUBLE:
queryExec.setPrepareParam(index, isNull ? 0 : Double.parseDouble(param.getValue()));
break;
case PRIMITIVE_BOOLEAN:
case BOOLEAN:
queryExec.setPrepareParam(index, !isNull && Boolean.parseBoolean(param.getValue()));
break;
case PRIMITIVE_BYTE:
case BYTE:
queryExec.setPrepareParam(index, isNull ? 0 : Byte.parseByte(param.getValue()));
break;
case JAVA_UTIL_DATE:
case JAVA_SQL_DATE:
queryExec.setPrepareParam(index, isNull ? null : java.sql.Date.valueOf(param.getValue()));
break;
case JAVA_SQL_TIME:
queryExec.setPrepareParam(index, isNull ? null : Time.valueOf(param.getValue()));
break;
case JAVA_SQL_TIMESTAMP:
queryExec.setPrepareParam(index, isNull ? null : Timestamp.valueOf(param.getValue()));
break;
default:
queryExec.setPrepareParam(index, isNull ? null : param.getValue());
}
}
}