| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.planner.sql; |
| |
| import java.io.IOException; |
| |
| import org.apache.calcite.sql.SqlDescribeSchema; |
| import org.apache.calcite.sql.SqlKind; |
| import org.apache.calcite.sql.SqlLiteral; |
| import org.apache.calcite.sql.SqlNode; |
| import org.apache.calcite.sql.SqlNodeList; |
| import org.apache.calcite.sql.SqlNumericLiteral; |
| import org.apache.calcite.sql.SqlOrderBy; |
| import org.apache.calcite.sql.parser.SqlParserPos; |
| import org.apache.calcite.tools.RelConversionException; |
| import org.apache.calcite.tools.ValidationException; |
| import org.apache.drill.common.exceptions.UserException; |
| import org.apache.drill.exec.ExecConstants; |
| import org.apache.drill.exec.exception.MetadataException; |
| import org.apache.drill.exec.ops.QueryContext; |
| import org.apache.drill.exec.ops.QueryContext.SqlStatementType; |
| import org.apache.drill.exec.physical.PhysicalPlan; |
| import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; |
| import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler; |
| import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; |
| import org.apache.drill.exec.planner.sql.handlers.DescribeSchemaHandler; |
| import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler; |
| import org.apache.drill.exec.planner.sql.handlers.ExplainHandler; |
| import org.apache.drill.exec.planner.sql.handlers.InsertHandler; |
| import org.apache.drill.exec.planner.sql.handlers.MetastoreAnalyzeTableHandler; |
| import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler; |
| import org.apache.drill.exec.planner.sql.handlers.ResetOptionHandler; |
| import org.apache.drill.exec.planner.sql.handlers.SchemaHandler; |
| import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler; |
| import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; |
| import org.apache.drill.exec.planner.sql.parser.DrillSqlCall; |
| import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable; |
| import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption; |
| import org.apache.drill.exec.planner.sql.parser.SqlSchema; |
| import org.apache.drill.exec.planner.sql.conversion.SqlConverter; |
| import org.apache.drill.exec.proto.UserBitShared.DrillPBError; |
| import org.apache.drill.exec.testing.ControlsInjector; |
| import org.apache.drill.exec.testing.ControlsInjectorFactory; |
| import org.apache.drill.exec.util.Pointer; |
| import org.apache.drill.exec.work.foreman.ForemanSetupException; |
| import org.apache.drill.exec.work.foreman.SqlUnsupportedException; |
| import org.apache.drill.shaded.guava.com.google.common.base.Throwables; |
| import org.apache.hadoop.security.AccessControlException; |
| |
| public class DrillSqlWorker { |
| private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class); |
| private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(DrillSqlWorker.class); |
| |
| private DrillSqlWorker() { |
| } |
| |
| /** |
| * Converts sql query string into query physical plan. |
| * |
| * @param context query context |
| * @param sql sql query |
| * @return query physical plan |
| */ |
| public static PhysicalPlan getPlan(QueryContext context, String sql) throws ForemanSetupException { |
| return getPlan(context, sql, null); |
| } |
| |
| /** |
| * Converts sql query string into query physical plan. |
| * Catches various exceptions and converts them into user exception when possible. |
| * |
| * @param context query context |
| * @param sql sql query |
| * @param textPlan text plan |
| * @return query physical plan |
| */ |
| public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException { |
| try { |
| return convertPlan(context, sql, textPlan); |
| } catch (ValidationException e) { |
| String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); |
| throw UserException.validationError(e) |
| .message(errorMessage) |
| .build(logger); |
| } catch (AccessControlException e) { |
| throw UserException.permissionError(e) |
| .build(logger); |
| } catch (SqlUnsupportedException e) { |
| throw UserException.unsupportedError(e) |
| .build(logger); |
| } catch (IOException | RelConversionException e) { |
| throw new QueryInputException("Failure handling SQL.", e); |
| } |
| } |
| |
| /** |
| * Converts sql query string into query physical plan. |
| * In case of any errors (that might occur due to missing function implementation), |
| * checks if local function registry should be synchronized with remote function registry. |
| * If sync took place, reloads drill operator table |
| * (since functions were added to / removed from local function registry) |
| * and attempts to converts sql query string into query physical plan one more time. |
| * |
| * @param context query context |
| * @param sql sql query |
| * @param textPlan text plan |
| * @return query physical plan |
| */ |
| private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointer<String> textPlan) |
| throws ForemanSetupException, RelConversionException, IOException, ValidationException { |
| Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value); |
| long retryAttempts = context.getOption(ExecConstants.METASTORE_RETRIEVAL_RETRY_ATTEMPTS).num_val; |
| try { |
| return getPhysicalPlan(context, sql, textPlan, retryAttempts); |
| } catch (Exception e) { |
| logger.trace("There was an error during conversion into physical plan.", e); |
| |
| // It is prohibited to retry query planning for ANALYZE statement since it changes |
| // query-level option values and will fail when rerunning with updated values |
| boolean syncFuncsAndRetry = context.getSQLStatementType() != SqlStatementType.ANALYZE; |
| |
| // If an error has occurred in a plugin then it will not help to look for a missing |
| // function again. |
| syncFuncsAndRetry &= !(e instanceof UserException && |
| ((UserException) e).getErrorType() == DrillPBError.ErrorType.PLUGIN); |
| |
| // Attempt a function registry sync |
| logger.trace("Will sync remote and local function registries if needed."); |
| int funcRegistryVer = context.getDrillOperatorTable().getFunctionRegistryVersion(); |
| syncFuncsAndRetry &= context.getFunctionRegistry().syncWithRemoteRegistry(funcRegistryVer); |
| |
| if (!syncFuncsAndRetry) { |
| // We don't want to or could not sync the function registry, return to |
| // raising the original error. |
| throw e; |
| } |
| |
| context.reloadDrillOperatorTable(); |
| logger.trace( |
| "Local function registry was synchronized with remote. " + |
| "Trying to find function one more time." |
| ); |
| return getPhysicalPlan(context, sql, textPlanCopy, retryAttempts); |
| } |
| } |
| |
| /** |
| * Converts sql query string into query physical plan. |
| * For the case when {@link MetadataException} was thrown during query planning, |
| * attempts to convert sql query string again, until number of attempts |
| * exceeds {@code metastore.retrieval.retry_attempts}. |
| * If number of attempts exceeds {@code metastore.retrieval.retry_attempts}, |
| * query will be converted into physical plan without metastore usage. |
| * |
| * @param context query context |
| * @param sql sql query |
| * @param textPlan text plan |
| * @param retryAttempts number of attempts to convert sql query string into query physical plan |
| * @return query physical plan |
| */ |
| private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Pointer<String> textPlan, |
| long retryAttempts) throws ForemanSetupException, RelConversionException, IOException, ValidationException { |
| try { |
| return getQueryPlan(context, sql, textPlan); |
| } catch (Exception e) { |
| Throwable rootCause = Throwables.getRootCause(e); |
| // Calcite wraps exceptions thrown during planning, so checks whether original exception is OutdatedMetadataException |
| if (rootCause instanceof MetadataException) { |
| // resets SqlStatementType to avoid errors when it is set during further attempts |
| context.clearSQLStatementType(); |
| switch (((MetadataException) rootCause).getExceptionType()) { |
| case OUTDATED_METADATA: |
| logger.warn("Metastore table metadata is outdated. " + |
| "Retrying to obtain query plan without Metastore usage."); |
| break; |
| case INCONSISTENT_METADATA: |
| if (retryAttempts > 0) { |
| logger.debug("Table metadata was changed during query planning. " + |
| "Retrying to obtain query plan using updated metadata."); |
| return getPhysicalPlan(context, sql, textPlan, --retryAttempts); |
| } |
| logger.warn("Table metadata was changing during query planning for all `metastore.retrieval.retry_attempts` = {} attempts.", |
| context.getOption(ExecConstants.METASTORE_RETRIEVAL_RETRY_ATTEMPTS).num_val); |
| break; |
| default: |
| logger.error("Exception happened during query planning using Metastore: {}", rootCause.getMessage(), rootCause); |
| } |
| logger.warn("Retrying to obtain query plan without Metastore usage."); |
| context.getOptions().setLocalOption(ExecConstants.METASTORE_ENABLED, false); |
| return getQueryPlan(context, sql, textPlan); |
| } |
| throw e; |
| } |
| } |
| |
| /** |
| * Converts sql query string into query physical plan. |
| * |
| * @param context query context |
| * @param sql sql query |
| * @param textPlan text plan |
| * @return query physical plan |
| */ |
| private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan) |
| throws ForemanSetupException, RelConversionException, IOException, ValidationException { |
| |
| final SqlConverter parser = new SqlConverter(context); |
| injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class); |
| final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql); |
| final AbstractSqlHandler handler; |
| final SqlHandlerConfig config = new SqlHandlerConfig(context, parser); |
| |
| switch (sqlNode.getKind()) { |
| case EXPLAIN: |
| handler = new ExplainHandler(config, textPlan); |
| context.setSQLStatementType(SqlStatementType.EXPLAIN); |
| break; |
| case SET_OPTION: |
| handler = sqlNode instanceof DrillSqlResetOption |
| ? new ResetOptionHandler(context) |
| : new SetOptionHandler(context); |
| context.setSQLStatementType(SqlStatementType.SETOPTION); |
| break; |
| case DESCRIBE_TABLE: |
| if (sqlNode instanceof DrillSqlDescribeTable) { |
| handler = new DescribeTableHandler(config); |
| context.setSQLStatementType(SqlStatementType.DESCRIBE_TABLE); |
| break; |
| } |
| case DESCRIBE_SCHEMA: |
| if (sqlNode instanceof SqlDescribeSchema) { |
| handler = new DescribeSchemaHandler(config); |
| context.setSQLStatementType(SqlStatementType.DESCRIBE_SCHEMA); |
| break; |
| } |
| if (sqlNode instanceof SqlSchema.Describe) { |
| handler = new SchemaHandler.Describe(config); |
| context.setSQLStatementType(SqlStatementType.DESCRIBE_SCHEMA); |
| break; |
| } |
| case CREATE_TABLE: |
| handler = ((DrillSqlCall) sqlNode).getSqlHandler(config, textPlan); |
| context.setSQLStatementType(SqlStatementType.CTAS); |
| break; |
| case INSERT: |
| handler = new InsertHandler(config, textPlan); |
| context.setSQLStatementType(SqlStatementType.INSERT); |
| break; |
| case SELECT: |
| handler = new DefaultSqlHandler(config, textPlan); |
| context.setSQLStatementType(SqlStatementType.SELECT); |
| break; |
| case DROP_TABLE: |
| case CREATE_VIEW: |
| case DROP_VIEW: |
| case OTHER_DDL: |
| case OTHER: |
| if (sqlNode instanceof DrillSqlCall) { |
| handler = ((DrillSqlCall) sqlNode).getSqlHandler(config); |
| if (handler instanceof AnalyzeTableHandler || handler instanceof MetastoreAnalyzeTableHandler) { |
| context.setSQLStatementType(SqlStatementType.ANALYZE); |
| } else if (handler instanceof RefreshMetadataHandler) { |
| context.setSQLStatementType(SqlStatementType.REFRESH); |
| } else { |
| context.setSQLStatementType(SqlStatementType.OTHER); |
| } |
| break; |
| } |
| // fallthrough |
| default: |
| handler = new DefaultSqlHandler(config, textPlan); |
| context.setSQLStatementType(SqlStatementType.OTHER); |
| } |
| |
| // Determines whether result set should be returned for the query based on return result set option and sql node kind. |
| // Overrides the option on a query level if it differs from the current value. |
| boolean currentReturnResultValue = context.getOptions().getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL); |
| boolean newReturnResultSetValue = currentReturnResultValue || !SqlKind.DDL.contains(sqlNode.getKind()); |
| if (newReturnResultSetValue != currentReturnResultValue) { |
| context.getOptions().setLocalOption(ExecConstants.RETURN_RESULT_SET_FOR_DDL, true); |
| } |
| |
| return handler.getPlan(sqlNode); |
| } |
| |
| private static boolean isAutoLimitShouldBeApplied(SqlNode sqlNode, int queryMaxRows) { |
| return (queryMaxRows > 0) && sqlNode.getKind().belongsTo(SqlKind.QUERY) |
| && (sqlNode.getKind() != SqlKind.ORDER_BY || isAutoLimitLessThanOrderByFetch((SqlOrderBy) sqlNode, queryMaxRows)); |
| } |
| |
| private static SqlNode checkAndApplyAutoLimit(SqlConverter parser, QueryContext context, String sql) { |
| SqlNode sqlNode = parser.parse(sql); |
| int queryMaxRows = context.getOptions().getOption(ExecConstants.QUERY_MAX_ROWS).num_val.intValue(); |
| if (isAutoLimitShouldBeApplied(sqlNode, queryMaxRows)) { |
| sqlNode = wrapWithAutoLimit(sqlNode, queryMaxRows); |
| } else { |
| //Force setting to zero IFF autoLimit was intended to be set originally but is inapplicable |
| if (queryMaxRows > 0) { |
| context.getOptions().setLocalOption(ExecConstants.QUERY_MAX_ROWS, 0); |
| } |
| } |
| return sqlNode; |
| } |
| |
| private static boolean isAutoLimitLessThanOrderByFetch(SqlOrderBy orderBy, int queryMaxRows) { |
| return orderBy.fetch == null || Integer.parseInt(orderBy.fetch.toString()) > queryMaxRows; |
| } |
| |
| private static SqlNode wrapWithAutoLimit(SqlNode sqlNode, int queryMaxRows) { |
| SqlNumericLiteral autoLimitLiteral = SqlLiteral.createExactNumeric(String.valueOf(queryMaxRows), SqlParserPos.ZERO); |
| if (sqlNode.getKind() == SqlKind.ORDER_BY) { |
| SqlOrderBy orderBy = (SqlOrderBy) sqlNode; |
| return new SqlOrderBy(orderBy.getParserPosition(), orderBy.query, orderBy.orderList, orderBy.offset, autoLimitLiteral); |
| } |
| return new SqlOrderBy(SqlParserPos.ZERO, sqlNode, SqlNodeList.EMPTY, null, autoLimitLiteral); |
| } |
| } |