blob: 3e156545544c0ccb548f582d071ac0722cef91a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.planner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.interpreter.BindableRel;
import org.apache.calcite.interpreter.Bindables;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlExplain;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.sql.calcite.rel.DruidConvention;
import org.apache.druid.sql.calcite.rel.DruidRel;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class DruidPlanner implements Closeable
{
private final FrameworkConfig frameworkConfig;
private final Planner planner;
private final PlannerContext plannerContext;
private RexBuilder rexBuilder;
public DruidPlanner(
final FrameworkConfig frameworkConfig,
final PlannerContext plannerContext
)
{
this.frameworkConfig = frameworkConfig;
this.planner = Frameworks.getPlanner(frameworkConfig);
this.plannerContext = plannerContext;
}
public PrepareResult prepare(final String sql) throws SqlParseException, ValidationException, RelConversionException
{
SqlNode parsed = planner.parse(sql);
SqlExplain explain = null;
if (parsed.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) parsed;
parsed = explain.getExplicandum();
}
final SqlNode validated = planner.validate(parsed);
RelRoot root = planner.rel(validated);
RelDataType rowType = root.validatedRowType;
// this is sort of lame, planner won't cough up its validator, it is private and has no accessors, so make another
// one so we can get the parameter types... but i suppose beats creating our own Prepare and Planner implementations
SqlValidator validator = getValidator();
RelDataType parameterTypes = validator.getParameterRowType(validator.validate(parsed));
if (explain != null) {
final RelDataTypeFactory typeFactory = root.rel.getCluster().getTypeFactory();
return new PrepareResult(getExplainStructType(typeFactory), parameterTypes);
}
return new PrepareResult(rowType, parameterTypes);
}
public PlannerResult plan(final String sql)
throws SqlParseException, ValidationException, RelConversionException
{
SqlExplain explain = null;
SqlNode parsed = planner.parse(sql);
if (parsed.getKind() == SqlKind.EXPLAIN) {
explain = (SqlExplain) parsed;
parsed = explain.getExplicandum();
}
// the planner's type factory is not available until after parsing
this.rexBuilder = new RexBuilder(planner.getTypeFactory());
SqlParameterizerShuttle sshuttle = new SqlParameterizerShuttle(plannerContext);
SqlNode parametized = parsed.accept(sshuttle);
final SqlNode validated = planner.validate(parametized);
final RelRoot root = planner.rel(validated);
try {
return planWithDruidConvention(explain, root);
}
catch (RelOptPlanner.CannotPlanException e) {
// Try again with BINDABLE convention. Used for querying Values and metadata tables.
try {
return planWithBindableConvention(explain, root);
}
catch (Exception e2) {
e.addSuppressed(e2);
throw e;
}
}
}
public PlannerContext getPlannerContext()
{
return plannerContext;
}
@Override
public void close()
{
planner.close();
}
private SqlValidator getValidator()
{
Preconditions.checkNotNull(planner.getTypeFactory());
final CalciteConnectionConfig connectionConfig;
if (frameworkConfig.getContext() != null) {
connectionConfig = frameworkConfig.getContext().unwrap(CalciteConnectionConfig.class);
} else {
Properties properties = new Properties();
properties.setProperty(
CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
String.valueOf(PlannerFactory.PARSER_CONFIG.caseSensitive())
);
connectionConfig = new CalciteConnectionConfigImpl(properties);
}
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
CalciteSchema.from(frameworkConfig.getDefaultSchema().getParentSchema()),
CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null),
planner.getTypeFactory(),
connectionConfig
);
return SqlValidatorUtil.newValidator(
frameworkConfig.getOperatorTable(),
catalogReader,
planner.getTypeFactory(),
DruidConformance.instance()
);
}
private PlannerResult planWithDruidConvention(
final SqlExplain explain,
final RelRoot root
) throws RelConversionException
{
final RelNode possiblyWrappedRootRel = possiblyWrapRootWithOuterLimitFromContext(root);
RelParameterizerShuttle parametizer = new RelParameterizerShuttle(plannerContext);
RelNode parametized = possiblyWrappedRootRel.accept(parametizer);
final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
Rules.DRUID_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(DruidConvention.instance())
.plus(root.collation),
parametized
);
final Set<String> dataSourceNames = ImmutableSet.copyOf(druidRel.getDataSourceNames());
if (explain != null) {
return planExplanation(druidRel, explain, dataSourceNames);
} else {
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
if (root.isRefTrivial()) {
return druidRel.runQuery();
} else {
// Add a mapping on top to accommodate root.fields.
return Sequences.map(
druidRel.runQuery(),
input -> {
final Object[] retVal = new Object[root.fields.size()];
for (int i = 0; i < root.fields.size(); i++) {
retVal[i] = input[root.fields.get(i).getKey()];
}
return retVal;
}
);
}
};
return new PlannerResult(resultsSupplier, root.validatedRowType, dataSourceNames);
}
}
private PlannerResult planWithBindableConvention(
final SqlExplain explain,
final RelRoot root
) throws RelConversionException
{
BindableRel bindableRel = (BindableRel) planner.transform(
Rules.BINDABLE_CONVENTION_RULES,
planner.getEmptyTraitSet()
.replace(BindableConvention.INSTANCE)
.plus(root.collation),
root.rel
);
if (!root.isRefTrivial()) {
// Add a projection on top to accommodate root.fields.
final List<RexNode> projects = new ArrayList<>();
final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
for (int field : Pair.left(root.fields)) {
projects.add(rexBuilder.makeInputRef(bindableRel, field));
}
bindableRel = new Bindables.BindableProject(
bindableRel.getCluster(),
bindableRel.getTraitSet(),
bindableRel,
projects,
root.validatedRowType
);
}
if (explain != null) {
return planExplanation(bindableRel, explain, ImmutableSet.of());
} else {
final BindableRel theRel = bindableRel;
final DataContext dataContext = plannerContext.createDataContext(
(JavaTypeFactory) planner.getTypeFactory(),
plannerContext.getParameters()
);
final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
final Enumerable enumerable = theRel.bind(dataContext);
final Enumerator enumerator = enumerable.enumerator();
return Sequences.withBaggage(new BaseSequence<>(
new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
{
@Override
public EnumeratorIterator<Object[]> make()
{
return new EnumeratorIterator<>(new Iterator<Object[]>()
{
@Override
public boolean hasNext()
{
return enumerator.moveNext();
}
@Override
public Object[] next()
{
return (Object[]) enumerator.current();
}
});
}
@Override
public void cleanup(EnumeratorIterator iterFromMake)
{
}
}
), enumerator::close);
};
return new PlannerResult(resultsSupplier, root.validatedRowType, ImmutableSet.of());
}
}
/**
* This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
* is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
* {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
*
* The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
* the web console, allowing it to apply a limit to queries without rewriting the original SQL.
*
* @param root root node
*
* @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
*/
@Nullable
private RelNode possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
{
Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
if (outerLimit == null) {
return root.rel;
}
if (root.rel instanceof Sort) {
Sort sort = (Sort) root.rel;
final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
if (newOffsetLimit.equals(originalOffsetLimit)) {
// nothing to do, don't bother to make a new sort
return root.rel;
}
return LogicalSort.create(
sort.getInput(),
sort.collation,
newOffsetLimit.getOffsetAsRexNode(rexBuilder),
newOffsetLimit.getLimitAsRexNode(rexBuilder)
);
} else {
return LogicalSort.create(
root.rel,
root.collation,
null,
new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
);
}
}
private PlannerResult planExplanation(
final RelNode rel,
final SqlExplain explain,
final Set<String> datasourceNames
)
{
final String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
final Supplier<Sequence<Object[]>> resultsSupplier = Suppliers.ofInstance(
Sequences.simple(ImmutableList.of(new Object[]{explanation})));
return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()), datasourceNames);
}
private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
{
return typeFactory.createStructType(
ImmutableList.of(Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)),
ImmutableList.of("PLAN")
);
}
private static class EnumeratorIterator<T> implements Iterator<T>
{
private final Iterator<T> it;
EnumeratorIterator(Iterator<T> it)
{
this.it = it;
}
@Override
public boolean hasNext()
{
return it.hasNext();
}
@Override
public T next()
{
return it.next();
}
}
}