| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.planner.sql.handlers; |
| |
| import org.apache.drill.exec.planner.common.DrillSetOpRel; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableSet; |
| import com.fasterxml.jackson.annotation.JsonCreator; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.annotation.JsonTypeName; |
| import org.apache.calcite.plan.RelTraitSet; |
| import org.apache.calcite.rel.RelNode; |
| import org.apache.calcite.rel.RelShuttle; |
| import org.apache.calcite.rel.RelShuttleImpl; |
| import org.apache.calcite.rel.core.TableScan; |
| import org.apache.calcite.rel.logical.LogicalAggregate; |
| import org.apache.calcite.rel.logical.LogicalIntersect; |
| import org.apache.calcite.rel.logical.LogicalJoin; |
| import org.apache.calcite.rel.logical.LogicalMinus; |
| import org.apache.calcite.rel.logical.LogicalSort; |
| import org.apache.calcite.rel.logical.LogicalUnion; |
| import org.apache.calcite.rel.logical.LogicalValues; |
| import org.apache.calcite.rel.type.RelDataTypeField; |
| |
| import org.apache.calcite.rex.RexCall; |
| import org.apache.calcite.rex.RexLiteral; |
| import org.apache.calcite.rex.RexNode; |
| import org.apache.calcite.rex.RexShuttle; |
| import org.apache.calcite.sql.SqlOperator; |
| import org.apache.calcite.sql.type.SqlTypeName; |
| import org.apache.drill.common.exceptions.ExecutionSetupException; |
| import org.apache.drill.common.types.TypeProtos; |
| import org.apache.drill.exec.exception.SchemaChangeException; |
| import org.apache.drill.exec.expr.TypeHelper; |
| import org.apache.drill.exec.ops.OperatorContext; |
| import org.apache.drill.exec.physical.base.ScanStats; |
| import org.apache.drill.exec.physical.impl.OutputMutator; |
| import org.apache.drill.exec.planner.common.DrillRelOptUtil; |
| import org.apache.drill.exec.planner.logical.DrillDirectScanRel; |
| import org.apache.drill.exec.planner.logical.DrillLimitRel; |
| import org.apache.drill.exec.planner.logical.DrillRel; |
| import org.apache.drill.exec.planner.common.DrillProjectRelBase; |
| import org.apache.drill.exec.planner.sql.DrillSqlOperator; |
| import org.apache.drill.exec.planner.sql.TypeInferenceUtils; |
| import org.apache.drill.exec.record.MaterializedField; |
| import org.apache.drill.exec.store.AbstractRecordReader; |
| import org.apache.drill.exec.store.direct.DirectGroupScan; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import org.apache.calcite.rex.RexBuilder; |
| import org.apache.drill.exec.planner.common.DrillAggregateRelBase; |
| import org.apache.drill.exec.planner.common.DrillJoinRelBase; |
| import org.apache.drill.exec.util.Pointer; |
| |
| import java.math.BigDecimal; |
| import java.util.Set; |
| |
| /** |
| * Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we |
| * inform the planner settings that this plan should be run as a single node plan to reduce the overhead associated with |
| * executing a schema-only query. |
| */ |
| public class FindLimit0Visitor extends RelShuttleImpl { |
| |
| // Some types are excluded in this set: |
| // + VARBINARY is not fully tested. |
| // + MAP, ARRAY are currently not exposed to the planner. |
| // + TINYINT, SMALLINT are defined in the Drill type system but have been turned off for now. |
| // + SYMBOL, MULTISET, DISTINCT, STRUCTURED, ROW, OTHER, CURSOR, COLUMN_LIST are Calcite types |
| // currently not supported by Drill, nor defined in the Drill type list. |
| // + ANY is the late binding type. |
| private static final ImmutableSet<SqlTypeName> TYPES = |
| ImmutableSet.<SqlTypeName>builder() |
| .add(SqlTypeName.INTEGER, SqlTypeName.BIGINT, SqlTypeName.FLOAT, SqlTypeName.DOUBLE, |
| SqlTypeName.VARCHAR, SqlTypeName.BOOLEAN, SqlTypeName.DATE, SqlTypeName.TIME, |
| SqlTypeName.TIMESTAMP, SqlTypeName.INTERVAL_YEAR, SqlTypeName.INTERVAL_YEAR_MONTH, |
| SqlTypeName.INTERVAL_MONTH, SqlTypeName.INTERVAL_DAY, SqlTypeName.INTERVAL_DAY_HOUR, |
| SqlTypeName.INTERVAL_DAY_MINUTE, SqlTypeName.INTERVAL_DAY_SECOND, SqlTypeName.INTERVAL_HOUR, |
| SqlTypeName.INTERVAL_HOUR_MINUTE, SqlTypeName.INTERVAL_HOUR_SECOND, SqlTypeName.INTERVAL_MINUTE, |
| SqlTypeName.INTERVAL_MINUTE_SECOND, SqlTypeName.INTERVAL_SECOND, SqlTypeName.CHAR, SqlTypeName.DECIMAL) |
| .build(); |
| |
| private static final Set<String> unsupportedFunctions = ImmutableSet.<String>builder() |
| // see Mappify |
| .add("KVGEN") |
| .add("MAPPIFY") |
| // see DummyFlatten |
| .add("FLATTEN") |
| // see JsonConvertFrom |
| .add("CONVERT_FROMJSON") |
| // see JsonConvertTo class |
| .add("CONVERT_TOJSON") |
| .add("CONVERT_TOSIMPLEJSON") |
| .add("CONVERT_TOEXTENDEDJSON") |
| .build(); |
| |
| private boolean contains = false; |
| |
| private FindLimit0Visitor() { |
| } |
| |
| /** |
| * If all field types of the given node are {@link #TYPES recognized types} and honored by execution, then this |
| * method returns the tree: DrillDirectScanRel(field types). Otherwise, the method returns null. |
| * |
| * @param rel calcite logical rel tree |
| * @return drill logical rel tree |
| */ |
| public static DrillRel getDirectScanRelIfFullySchemaed(RelNode rel) { |
| final List<RelDataTypeField> fieldList = rel.getRowType().getFieldList(); |
| final List<TypeProtos.MajorType> columnTypes = new ArrayList<>(); |
| |
| for (final RelDataTypeField field : fieldList) { |
| final SqlTypeName sqlTypeName = field.getType().getSqlTypeName(); |
| if (!TYPES.contains(sqlTypeName)) { |
| return null; |
| } else { |
| final TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder() |
| .setMode(field.getType().isNullable() ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED) |
| .setMinorType(TypeInferenceUtils.getDrillTypeFromCalciteType(sqlTypeName)); |
| |
| if (sqlTypeName == SqlTypeName.DECIMAL) { |
| builder.setScale(field.getType().getScale()); |
| builder.setPrecision(field.getType().getPrecision()); |
| } else if (TypeInferenceUtils.isScalarStringType(sqlTypeName)) { |
| builder.setPrecision(field.getType().getPrecision()); |
| } |
| |
| columnTypes.add(builder.build()); |
| } |
| } |
| final RelTraitSet traits = rel.getTraitSet().plus(DrillRel.DRILL_LOGICAL); |
| final RelDataTypeReader reader = new RelDataTypeReader(rel.getRowType().getFieldNames(), columnTypes); |
| return new DrillDirectScanRel(rel.getCluster(), traits, |
| new DirectGroupScan(reader, ScanStats.ZERO_RECORD_TABLE), rel.getRowType()); |
| } |
| |
| /** |
| * Check if the root portion of the tree contains LIMIT(0). |
| * |
| * @param rel rel node tree |
| * @return true if the root portion of the tree contains LIMIT(0) |
| */ |
| public static boolean containsLimit0(final RelNode rel) { |
| FindLimit0Visitor visitor = new FindLimit0Visitor(); |
| rel.accept(visitor); |
| |
| return visitor.isContains(); |
| } |
| |
| public static DrillRel addLimitOnTopOfLeafNodes(final DrillRel rel) { |
| final Pointer<Boolean> isUnsupported = new Pointer<>(false); |
| |
| // to visit unsupported functions |
| final RexShuttle unsupportedFunctionsVisitor = new RexShuttle() { |
| @Override |
| public RexNode visitCall(RexCall call) { |
| final SqlOperator operator = call.getOperator(); |
| if (isUnsupportedScalarFunction(operator)) { |
| isUnsupported.value = true; |
| return call; |
| } |
| return super.visitCall(call); |
| } |
| }; |
| |
| // to visit unsupported operators |
| final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() { |
| @Override |
| public RelNode visit(RelNode other) { |
| if (other instanceof DrillSetOpRel) { |
| isUnsupported.value = true; |
| return other; |
| } else if (other instanceof DrillProjectRelBase) { |
| if (!isUnsupported.value) { |
| other.accept(unsupportedFunctionsVisitor); |
| } |
| if (isUnsupported.value) { |
| return other; |
| } |
| } |
| return super.visit(other); |
| } |
| }; |
| |
| rel.accept(unsupportedOperationsVisitor); |
| if (isUnsupported.value) { |
| return rel; |
| } |
| |
| // to add LIMIT (0) on top of leaf nodes |
| final RelShuttle addLimitOnScanVisitor = new RelShuttleImpl() { |
| |
| private RelNode addLimitAsParent(RelNode node) { |
| final RexBuilder builder = node.getCluster().getRexBuilder(); |
| final RexLiteral offset = builder.makeExactLiteral(BigDecimal.ZERO); |
| final RexLiteral fetch = builder.makeExactLiteral(BigDecimal.ZERO); |
| return new DrillLimitRel(node.getCluster(), node.getTraitSet(), node, offset, fetch); |
| } |
| |
| @Override |
| public RelNode visit(LogicalValues values) { |
| return addLimitAsParent(values); |
| } |
| |
| @Override |
| public RelNode visit(TableScan scan) { |
| return addLimitAsParent(scan); |
| } |
| |
| @Override |
| public RelNode visit(RelNode other) { |
| if (other.getInputs().isEmpty()) { // leaf operator |
| return addLimitAsParent(other); |
| } |
| return super.visit(other); |
| } |
| }; |
| |
| return (DrillRel) rel.accept(addLimitOnScanVisitor); |
| } |
| |
| private static boolean isUnsupportedScalarFunction(final SqlOperator operator) { |
| return operator instanceof DrillSqlOperator && |
| unsupportedFunctions.contains(operator.getName().toUpperCase()); |
| } |
| |
| boolean isContains() { |
| return contains; |
| } |
| |
| @Override |
| public RelNode visit(LogicalSort sort) { |
| if (DrillRelOptUtil.isLimit0(sort.fetch)) { |
| contains = true; |
| return sort; |
| } |
| |
| return super.visit(sort); |
| } |
| |
| @Override |
| public RelNode visit(RelNode other) { |
| if (other instanceof DrillJoinRelBase || |
| other instanceof DrillAggregateRelBase || |
| other instanceof DrillSetOpRel) { |
| return other; |
| } |
| if (other instanceof DrillLimitRel) { |
| if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) { |
| contains = true; |
| return other; |
| } |
| } |
| |
| return super.visit(other); |
| } |
| |
| // The following set of RelNodes should terminate a search for the limit 0 pattern as they want convey its meaning. |
| |
| @Override |
| public RelNode visit(LogicalAggregate aggregate) { |
| return aggregate; |
| } |
| |
| @Override |
| public RelNode visit(LogicalIntersect intersect) { |
| return intersect; |
| } |
| |
| @Override |
| public RelNode visit(LogicalJoin join) { |
| return join; |
| } |
| |
| @Override |
| public RelNode visit(LogicalMinus minus) { |
| return minus; |
| } |
| |
| @Override |
| public RelNode visit(LogicalUnion union) { |
| return union; |
| } |
| |
| /** |
| * Reader for column names and types. |
| */ |
| @JsonTypeName("RelDataTypeRecordReader") |
| public static class RelDataTypeReader extends AbstractRecordReader { |
| |
| public final List<String> columnNames; |
| public final List<TypeProtos.MajorType> columnTypes; |
| |
| @JsonCreator |
| public RelDataTypeReader(@JsonProperty("columnNames") List<String> columnNames, |
| @JsonProperty("columnTypes") List<TypeProtos.MajorType> columnTypes) { |
| Preconditions.checkArgument(columnNames.size() == columnTypes.size(), "Number of columns and their types should match"); |
| this.columnNames = columnNames; |
| this.columnTypes = columnTypes; |
| } |
| |
| @Override |
| public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { |
| for (int i = 0; i < columnNames.size(); i++) { |
| final TypeProtos.MajorType type = columnTypes.get(i); |
| final MaterializedField field = MaterializedField.create(columnNames.get(i), type); |
| final Class vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()); |
| try { |
| output.addField(field, vvClass); |
| } catch (SchemaChangeException e) { |
| throw new ExecutionSetupException(e); |
| } |
| } |
| } |
| |
| @Override |
| public int next() { |
| return 0; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| } |
| |
| /** |
| * Represents RelDataTypeReader content as string, used in query plan json. |
| * Example: RelDataTypeReader{columnNames=[col1], columnTypes=[INTERVALYEAR-OPTIONAL]} |
| * |
| * @return string representation of RelDataTypeReader content |
| */ |
| @Override |
| public String toString() { |
| StringBuilder builder = new StringBuilder(); |
| builder.append("RelDataTypeReader{columnNames="); |
| builder.append(columnNames).append(", columnTypes="); |
| List<String> columnTypesList = new ArrayList<>(columnTypes.size()); |
| for (TypeProtos.MajorType columnType : columnTypes) { |
| columnTypesList.add(columnType.getMinorType().toString() + "-" + columnType.getMode().toString()); |
| } |
| builder.append(columnTypesList); |
| builder.append("}"); |
| |
| return builder.toString(); |
| } |
| } |
| } |