blob: 474fc1de11f06eeb201bb958a2dea6ce74827595 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql.handlers;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalIntersect;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalMinus;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.planner.common.DrillAggregateRelBase;
import org.apache.drill.exec.planner.common.DrillJoinRelBase;
import org.apache.drill.exec.planner.common.DrillProjectRelBase;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.common.DrillSetOpRel;
import org.apache.drill.exec.planner.logical.DrillDirectScanRel;
import org.apache.drill.exec.planner.logical.DrillLimitRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.sql.DrillSqlOperator;
import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.direct.DirectGroupScan;
import org.apache.drill.exec.util.Pointer;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
/**
 * Visitor that will identify whether the root portion of the RelNode tree contains a limit 0 pattern. In this case, we
 * inform the planner settings that this plan should be run as a single node plan to reduce the overhead associated with
 * executing a schema-only query.
 *
 * <p>The class also offers two static helpers that rewrite a tree for schema-only execution:
 * {@link #getDirectScanRelIfFullySchemaed(RelNode)} and {@link #addLimitOnTopOfLeafNodes(DrillRel)}.
 */
public class FindLimit0Visitor extends RelShuttleImpl {

  // Some types are excluded in this set:
  // + VARBINARY is not fully tested.
  // + MAP, ARRAY are currently not exposed to the planner.
  // + TINYINT, SMALLINT are defined in the Drill type system but have been turned off for now.
  // + SYMBOL, MULTISET, DISTINCT, STRUCTURED, ROW, OTHER, CURSOR, COLUMN_LIST are Calcite types
  // currently not supported by Drill, nor defined in the Drill type list.
  // + ANY is the late binding type.
  private static final ImmutableSet<SqlTypeName> TYPES =
      ImmutableSet.<SqlTypeName>builder()
          .add(SqlTypeName.INTEGER, SqlTypeName.BIGINT, SqlTypeName.FLOAT, SqlTypeName.DOUBLE,
              SqlTypeName.VARCHAR, SqlTypeName.BOOLEAN, SqlTypeName.DATE, SqlTypeName.TIME,
              SqlTypeName.TIMESTAMP, SqlTypeName.INTERVAL_YEAR, SqlTypeName.INTERVAL_YEAR_MONTH,
              SqlTypeName.INTERVAL_MONTH, SqlTypeName.INTERVAL_DAY, SqlTypeName.INTERVAL_DAY_HOUR,
              SqlTypeName.INTERVAL_DAY_MINUTE, SqlTypeName.INTERVAL_DAY_SECOND, SqlTypeName.INTERVAL_HOUR,
              SqlTypeName.INTERVAL_HOUR_MINUTE, SqlTypeName.INTERVAL_HOUR_SECOND, SqlTypeName.INTERVAL_MINUTE,
              SqlTypeName.INTERVAL_MINUTE_SECOND, SqlTypeName.INTERVAL_SECOND, SqlTypeName.CHAR, SqlTypeName.DECIMAL)
          .build();

  /**
   * Upper-case names of the functions that make {@link #addLimitOnTopOfLeafNodes(DrillRel)} leave a tree untouched
   * when one of them appears in a project. Lookups must upper-case the candidate name with {@link Locale#ROOT}.
   */
  private static final Set<String> UNSUPPORTED_FUNCTIONS = ImmutableSet.<String>builder()
      // see Mappify
      .add("KVGEN")
      .add("MAPPIFY")
      // see DummyFlatten
      .add("FLATTEN")
      // see JsonConvertFrom
      .add("CONVERT_FROMJSON")
      // see JsonConvertTo class
      .add("CONVERT_TOJSON")
      .add("CONVERT_TOSIMPLEJSON")
      .add("CONVERT_TOEXTENDEDJSON")
      .build();

  /** Becomes {@code true} as soon as a LIMIT 0 is met while visiting the root portion of a tree. */
  private boolean contains = false;

  /** Instances are only created by {@link #containsLimit0(RelNode)}. */
  private FindLimit0Visitor() {
  }

  /**
   * If all field types of the given node are {@link #TYPES recognized types} and honored by execution, then this
   * method returns the tree: DrillDirectScanRel(field types). Otherwise, the method returns null.
   *
   * @param rel calcite logical rel tree
   * @return drill logical rel tree, or {@code null} when at least one field type is not in {@link #TYPES}
   */
  public static DrillRel getDirectScanRelIfFullySchemaed(RelNode rel) {
    final List<RelDataTypeField> fieldList = rel.getRowType().getFieldList();
    final List<TypeProtos.MajorType> columnTypes = new ArrayList<>(fieldList.size());

    for (final RelDataTypeField field : fieldList) {
      final SqlTypeName sqlTypeName = field.getType().getSqlTypeName();
      if (!TYPES.contains(sqlTypeName)) {
        // a single unrecognized type is enough to give up on the direct scan
        return null;
      }

      final TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder()
          .setMode(field.getType().isNullable() ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED)
          .setMinorType(TypeInferenceUtils.getDrillTypeFromCalciteType(sqlTypeName));

      // decimals carry scale and precision, scalar string types carry precision only
      if (sqlTypeName == SqlTypeName.DECIMAL) {
        builder.setScale(field.getType().getScale());
        builder.setPrecision(field.getType().getPrecision());
      } else if (TypeInferenceUtils.isScalarStringType(sqlTypeName)) {
        builder.setPrecision(field.getType().getPrecision());
      }
      columnTypes.add(builder.build());
    }

    final RelTraitSet traits = rel.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
    final RelDataTypeReader reader = new RelDataTypeReader(rel.getRowType().getFieldNames(), columnTypes);
    return new DrillDirectScanRel(rel.getCluster(), traits,
        new DirectGroupScan(reader, ScanStats.ZERO_RECORD_TABLE), rel.getRowType());
  }

  /**
   * Check if the root portion of the tree contains LIMIT(0).
   *
   * @param rel rel node tree
   * @return true if the root portion of the tree contains LIMIT(0)
   */
  public static boolean containsLimit0(final RelNode rel) {
    FindLimit0Visitor visitor = new FindLimit0Visitor();
    rel.accept(visitor);
    return visitor.isContains();
  }

  /**
   * Puts a LIMIT 0 (offset 0, fetch 0) on top of every leaf node of the given tree.
   *
   * <p>The tree is returned unchanged when it contains a {@link DrillSetOpRel}, or a project that calls one of the
   * {@link #UNSUPPORTED_FUNCTIONS unsupported functions}.
   *
   * @param rel drill logical rel tree
   * @return the rewritten tree, or {@code rel} itself when the rewrite is not applicable
   */
  public static DrillRel addLimitOnTopOfLeafNodes(final DrillRel rel) {
    final Pointer<Boolean> isUnsupported = new Pointer<>(false);

    // to visit unsupported functions
    final RexShuttle unsupportedFunctionsVisitor = new RexShuttle() {
      @Override
      public RexNode visitCall(RexCall call) {
        final SqlOperator operator = call.getOperator();
        if (isUnsupportedScalarFunction(operator)) {
          isUnsupported.value = true;
          // no need to look at the operands once the flag is raised
          return call;
        }
        return super.visitCall(call);
      }
    };

    // to visit unsupported operators
    final RelShuttle unsupportedOperationsVisitor = new RelShuttleImpl() {
      @Override
      public RelNode visit(RelNode other) {
        if (other instanceof DrillSetOpRel) {
          isUnsupported.value = true;
          return other;
        } else if (other instanceof DrillProjectRelBase) {
          if (!isUnsupported.value) {
            other.accept(unsupportedFunctionsVisitor);
          }
          if (isUnsupported.value) {
            // stop descending, the outcome is already known
            return other;
          }
        }
        return super.visit(other);
      }
    };

    rel.accept(unsupportedOperationsVisitor);
    if (isUnsupported.value) {
      return rel;
    }

    // to add LIMIT (0) on top of leaf nodes
    final RelShuttle addLimitOnScanVisitor = new RelShuttleImpl() {

      private RelNode addLimitAsParent(RelNode node) {
        final RexBuilder builder = node.getCluster().getRexBuilder();
        final RexLiteral offset = builder.makeExactLiteral(BigDecimal.ZERO);
        final RexLiteral fetch = builder.makeExactLiteral(BigDecimal.ZERO);
        return new DrillLimitRel(node.getCluster(), node.getTraitSet(), node, offset, fetch);
      }

      @Override
      public RelNode visit(LogicalValues values) {
        return addLimitAsParent(values);
      }

      @Override
      public RelNode visit(TableScan scan) {
        return addLimitAsParent(scan);
      }

      @Override
      public RelNode visit(RelNode other) {
        if (other.getInputs().isEmpty()) { // leaf operator
          return addLimitAsParent(other);
        }
        return super.visit(other);
      }
    };

    return (DrillRel) rel.accept(addLimitOnScanVisitor);
  }

  /**
   * Tells whether the operator is a {@link DrillSqlOperator} whose name is one of the
   * {@link #UNSUPPORTED_FUNCTIONS unsupported functions}, ignoring case.
   *
   * @param operator operator of the call being inspected
   * @return true if the operator is an unsupported function
   */
  private static boolean isUnsupportedScalarFunction(final SqlOperator operator) {
    // Locale.ROOT keeps the comparison independent of the JVM default locale: with a Turkish locale
    // "mappify" would otherwise be upper-cased with a dotted capital I and never match "MAPPIFY".
    return operator instanceof DrillSqlOperator &&
        UNSUPPORTED_FUNCTIONS.contains(operator.getName().toUpperCase(Locale.ROOT));
  }

  /**
   * @return true if a LIMIT 0 has been found by this visitor
   */
  boolean isContains() {
    return contains;
  }

  /**
   * Records a LIMIT 0 carried by the sort and stops there; any other sort is traversed normally.
   */
  @Override
  public RelNode visit(LogicalSort sort) {
    if (DrillRelOptUtil.isLimit0(sort.fetch)) {
      contains = true;
      return sort;
    }
    return super.visit(sort);
  }

  /**
   * Handles the Drill rel nodes: joins, aggregates and set operations end the search, a {@link DrillLimitRel} with
   * a LIMIT 0 fetch is recorded, and every other node is traversed normally.
   */
  @Override
  public RelNode visit(RelNode other) {
    if (other instanceof DrillJoinRelBase ||
        other instanceof DrillAggregateRelBase ||
        other instanceof DrillSetOpRel) {
      return other;
    }

    if (other instanceof DrillLimitRel) {
      if (DrillRelOptUtil.isLimit0(((DrillLimitRel) other).getFetch())) {
        contains = true;
        return other;
      }
    }

    return super.visit(other);
  }

  // The following set of RelNodes should terminate a search for the limit 0 pattern as they won't convey its meaning.

  @Override
  public RelNode visit(LogicalAggregate aggregate) {
    return aggregate;
  }

  @Override
  public RelNode visit(LogicalIntersect intersect) {
    return intersect;
  }

  @Override
  public RelNode visit(LogicalJoin join) {
    return join;
  }

  @Override
  public RelNode visit(LogicalMinus minus) {
    return minus;
  }

  @Override
  public RelNode visit(LogicalUnion union) {
    return union;
  }

  /**
   * Reader for column names and types.
   */
  @JsonTypeName("RelDataTypeRecordReader")
  public static class RelDataTypeReader extends AbstractRecordReader {

    public final List<String> columnNames;
    public final List<TypeProtos.MajorType> columnTypes;

    /**
     * @param columnNames names of the columns
     * @param columnTypes types of the columns, in the same order as {@code columnNames}
     * @throws IllegalArgumentException if the two lists differ in size
     */
    @JsonCreator
    public RelDataTypeReader(@JsonProperty("columnNames") List<String> columnNames,
                             @JsonProperty("columnTypes") List<TypeProtos.MajorType> columnTypes) {
      Preconditions.checkArgument(columnNames.size() == columnTypes.size(), "Number of columns and their types should match");
      this.columnNames = columnNames;
      this.columnTypes = columnTypes;
    }

    /**
     * Adds one field per column to the output mutator, using the value vector class that {@link TypeHelper}
     * returns for the column's minor type and mode.
     *
     * @throws ExecutionSetupException wrapping the {@link SchemaChangeException} raised while adding a field
     */
    @Override
    public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
      for (int i = 0; i < columnNames.size(); i++) {
        final TypeProtos.MajorType type = columnTypes.get(i);
        final MaterializedField field = MaterializedField.create(columnNames.get(i), type);
        final Class vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
        try {
          output.addField(field, vvClass);
        } catch (SchemaChangeException e) {
          throw new ExecutionSetupException(e);
        }
      }
    }

    /**
     * @return always 0, this reader never produces records
     */
    @Override
    public int next() {
      return 0;
    }

    /** Does nothing. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Represents RelDataTypeReader content as string, used in query plan json.
     * Example: RelDataTypeReader{columnNames=[col1], columnTypes=[INTERVALYEAR-OPTIONAL]}
     *
     * @return string representation of RelDataTypeReader content
     */
    @Override
    public String toString() {
      // each type is rendered as <minor type>-<mode>
      List<String> columnTypesList = new ArrayList<>(columnTypes.size());
      for (TypeProtos.MajorType columnType : columnTypes) {
        columnTypesList.add(columnType.getMinorType().toString() + "-" + columnType.getMode().toString());
      }

      StringBuilder builder = new StringBuilder();
      builder.append("RelDataTypeReader{columnNames=");
      builder.append(columnNames).append(", columnTypes=");
      builder.append(columnTypesList);
      builder.append("}");
      return builder.toString();
    }
  }
}