/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql.handlers;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ser.PropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter;
import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider;
import org.apache.drill.exec.util.Utilities;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.calcite.plan.RelOptCostImpl;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttleImpl;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.Programs;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.ValidationException;
import org.apache.calcite.util.Pair;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder;
import org.apache.drill.common.logical.PlanProperties.PlanType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.planner.PlannerType;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.DrillScreenRel;
import org.apache.drill.exec.planner.logical.PreProcessLogicalRel;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
import org.apache.drill.exec.planner.physical.PhysicalPlanCreator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.Prel;
import org.apache.drill.exec.planner.physical.explain.PrelSequencer;
import org.apache.drill.exec.planner.physical.visitor.AdjustOperatorsSchemaVisitor;
import org.apache.drill.exec.planner.physical.visitor.ComplexToJsonPrelVisitor;
import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
import org.apache.drill.exec.planner.physical.visitor.InsertLocalExchangeVisitor;
import org.apache.drill.exec.planner.physical.visitor.LateralUnnestRowIDVisitor;
import org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor;
import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten;
import org.apache.drill.exec.planner.physical.visitor.RuntimeFilterVisitor;
import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor;
import org.apache.drill.exec.planner.physical.visitor.SplitUpComplexExpressions;
import org.apache.drill.exec.planner.physical.visitor.StarColumnConverter;
import org.apache.drill.exec.planner.physical.visitor.SwapHashJoinVisitor;
import org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor;
import org.apache.drill.exec.planner.sql.parser.FindLimit0SqlVisitor;
import org.apache.drill.exec.planner.sql.parser.UnsupportedOperatorsVisitor;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
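/**
 * Default handler that turns a validated SQL statement into a Drill physical
 * plan: the parse tree is validated, converted to a Calcite RelNode tree,
 * lowered to Drill logical rels (drels), then to Drill physical rels (prels),
 * and finally serialized as a physical operator plan. Statement-specific
 * handlers (for example CTAS or EXPLAIN) extend this class and override
 * individual steps such as {@link #rewrite} or {@link #convertToDrel}.
 */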
public class DefaultSqlHandler extends AbstractSqlHandler {
private static final Logger logger = LoggerFactory.getLogger(DefaultSqlHandler.class);
private final Pointer<String> textPlan;
private final long targetSliceSize;
protected final SqlHandlerConfig config;
protected final QueryContext context;
public DefaultSqlHandler(SqlHandlerConfig config) {
this(config, null);
}
public DefaultSqlHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
super();
this.config = config;
this.context = config.getContext();
this.textPlan = textPlan;
this.targetSliceSize = config.getContext().getOptions().getOption(ExecConstants.SLICE_TARGET_OPTION);
}
protected void log(final PlannerType plannerType, final PlannerPhase phase, final RelNode node, final Logger logger,
Stopwatch watch) {
if (logger.isDebugEnabled()) {
log(plannerType.name() + ":" + phase.description, node, logger, watch);
}
}
protected void log(final String description, final RelNode node, final Logger logger, Stopwatch watch) {
if (logger.isDebugEnabled()) {
final String plan = RelOptUtil.toString(node, SqlExplainLevel.ALL_ATTRIBUTES);
final String time = watch == null ? "" : String.format(" (%dms)", watch.elapsed(TimeUnit.MILLISECONDS));
logger.debug(String.format("%s%s:\n%s", description, time, plan));
}
}
protected void logAndSetTextPlan(final String description, final Prel prel, final Logger logger) {
final String plan = PrelSequencer.printWithIds(prel, SqlExplainLevel.ALL_ATTRIBUTES);
if (textPlan != null) {
textPlan.value = plan;
}
if (logger.isDebugEnabled()) {
logger.debug(String.format("%s:\n%s", description, plan));
}
}
protected void log(final String name, final PhysicalPlan plan, final Logger logger) {
if (logger.isDebugEnabled()) {
PropertyFilter filter = new SimpleBeanPropertyFilter.SerializeExceptFilter(Sets.newHashSet("password"));
String planText = plan.unparse(context.getLpPersistence().getMapper()
.writer(new SimpleFilterProvider().addFilter("passwordFilter", filter)));
logger.debug(name + " : \n" + planText);
}
}
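  /**
   * Builds the physical plan for the given validated SQL parse tree by running
   * the full pipeline: validate and convert to a Calcite RelNode, convert to a
   * Drill logical rel, convert to a Drill physical rel, then generate the
   * physical operator tree and wrap it in a {@link PhysicalPlan}.
   */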
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
final ConvertedRelNode convertedRelNode = validateAndConvert(sqlNode);
final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
final RelNode queryRelNode = convertedRelNode.getConvertedNode();
final DrillRel drel = convertToDrel(queryRelNode);
final Prel prel = convertToPrel(drel, validatedRowType);
logAndSetTextPlan("Drill Physical", prel, logger);
final PhysicalOperator pop = convertToPop(prel);
final PhysicalPlan plan = convertToPlan(pop, queryRelNode);
log("Drill Plan", plan, logger);
return plan;
}
/**
   * Rewrite the parse tree. Used before validating the parse tree. Useful if a
   * particular statement needs to be converted into another statement.
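   * <p>
   * For example, a subclass could replace one statement kind with an equivalent
   * SELECT before validation (a sketch; the statement type and helper shown are
   * illustrative, not an actual override in this class):
   * <pre>{@code
   * protected SqlNode rewrite(SqlNode node) {
   *   if (node instanceof SqlShowTables) {
   *     // hypothetical helper that builds a SELECT over INFORMATION_SCHEMA
   *     return convertShowTablesToSelect((SqlShowTables) node);
   *   }
   *   return node;
   * }
   * }</pre>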
*
   * @param node SQL parse tree to be rewritten
   * @return the rewritten SQL parse tree
*/
protected SqlNode rewrite(SqlNode node) throws RelConversionException, ForemanSetupException {
return node;
}
protected ConvertedRelNode validateAndConvert(SqlNode sqlNode) throws ForemanSetupException, RelConversionException, ValidationException {
final SqlNode rewrittenSqlNode = rewrite(sqlNode);
final Pair<SqlNode, RelDataType> validatedTypedSqlNode = validateNode(rewrittenSqlNode);
final SqlNode validated = validatedTypedSqlNode.getKey();
RelNode rel = convertToRel(validated);
rel = preprocessNode(rel);
return new ConvertedRelNode(rel, validatedTypedSqlNode.getValue());
}
/**
   * Given a RelNode tree for a SELECT statement, converts it to a Drill logical RelNode tree.
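   * <p>
   * Planning proceeds through several phases: HEP directory pruning, Volcano
   * logical planning, HEP transitive-closure predicate push, optional HEP
   * partition pruning, HEP join planning, optional row-key join conversion,
   * and finally SUM to $SUM0 conversion.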
*
* @param relNode relational node
* @return Drill Logical RelNode tree
* @throws SqlUnsupportedException if query cannot be planned
*/
protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupportedException {
if (context.getOptions().getOption(ExecConstants.EARLY_LIMIT0_OPT) &&
context.getPlannerSettings().isTypeInferenceEnabled() &&
FindLimit0Visitor.containsLimit0(relNode)) {
      // if the schema is fully known at planning time, answer the query directly from the schema
final DrillRel shorterPlan;
if ((shorterPlan = FindLimit0Visitor.getDirectScanRelIfFullySchemaed(relNode)) != null) {
return shorterPlan;
}
if (FindHardDistributionScans.canForceSingleMode(relNode)) {
// disable distributed mode
context.getPlannerSettings().forceSingleMode();
}
}
try {
// HEP Directory pruning.
final RelNode pruned = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.DIRECTORY_PRUNING, relNode);
final RelTraitSet logicalTraits = pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
final RelNode convertedRelNode;
if (!context.getPlannerSettings().isHepOptEnabled()) {
// hep is disabled, use volcano
convertedRelNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE_AND_JOIN, pruned, logicalTraits);
} else {
final RelNode intermediateNode2;
final RelNode intermediateNode3;
if (context.getPlannerSettings().isHepPartitionPruningEnabled()) {
final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, pruned, logicalTraits);
// HEP Join Push Transitive Predicates
final RelNode transitiveClosureNode =
transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
          // hep and hep partition pruning are both enabled: prune partitions with the HEP planner.
intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
} else {
// Only hep is enabled
final RelNode intermediateNode =
transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL_PRUNE, pruned, logicalTraits);
// HEP Join Push Transitive Predicates
intermediateNode2 = transform(PlannerType.HEP, PlannerPhase.TRANSITIVE_CLOSURE, intermediateNode);
}
// Do Join Planning.
intermediateNode3 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.JOIN_PLANNING, intermediateNode2);
if (context.getPlannerSettings().isRowKeyJoinConversionEnabled()) {
          // Convert Join to RowKeyJoin, where applicable.
convertedRelNode = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.ROWKEYJOIN_CONVERSION, intermediateNode3);
} else {
convertedRelNode = intermediateNode3;
}
}
// Convert SUM to $SUM0
final RelNode convertedRelNodeWithSum0 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.SUM_CONVERSION, convertedRelNode);
DrillRel drillRel = (DrillRel) convertedRelNodeWithSum0;
// If the query contains a limit 0 clause, disable distributed mode since it is overkill for determining schema.
if (FindLimit0Visitor.containsLimit0(convertedRelNodeWithSum0) &&
FindHardDistributionScans.canForceSingleMode(convertedRelNodeWithSum0)) {
context.getPlannerSettings().forceSingleMode();
if (context.getOptions().getOption(ExecConstants.LATE_LIMIT0_OPT)) {
drillRel = FindLimit0Visitor.addLimitOnTopOfLeafNodes(drillRel);
}
}
return drillRel;
} catch (RelOptPlanner.CannotPlanException ex) {
logger.error(ex.getMessage());
if (JoinUtils.checkCartesianJoin(relNode)) {
throw JoinUtils.cartesianJoinPlanningException();
} else {
throw ex;
}
}
}
/**
   * Returns the Drill logical RelNode tree for a SELECT statement, when it is executed / explained directly.
   * Adds a screen operator on top of the converted node.
   *
   * @param relNode root of the Calcite logical RelNode tree
* @return Drill Logical RelNode tree
* @throws SqlUnsupportedException if query cannot be planned
*/
protected DrillRel convertToDrel(RelNode relNode) throws SqlUnsupportedException {
final DrillRel convertedRelNode = convertToRawDrel(relNode);
return new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(),
convertedRelNode);
}
/**
* Finalize all RelNodes.
*/
private static class PrelFinalizer extends RelShuttleImpl {
@Override
public RelNode visit(RelNode other) {
if (other instanceof PrelFinalizable) {
return ((PrelFinalizable) other).finalizeRel();
} else {
return super.visit(other);
}
}
}
/**
   * Transforms a RelNode into a new RelNode without changing any traits. Also logs the outcome.
*
* @param plannerType The type of Planner to use.
* @param phase The transformation phase we're running.
* @param input The original RelNode
   * @return the transformed RelNode
*/
private RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode input) {
return transform(plannerType, phase, input, input.getTraitSet());
}
/**
   * Transforms a RelNode into a new RelNode, targeting the provided set of traits. Also logs the outcome.
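   * <p>
   * For example, logical planning in {@link #convertToRawDrel} invokes the
   * Volcano planner while requesting the Drill logical convention:
   * <pre>{@code
   * RelTraitSet logicalTraits = pruned.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
   * RelNode logical = transform(PlannerType.VOLCANO,
   *     PlannerPhase.LOGICAL_PRUNE_AND_JOIN, pruned, logicalTraits);
   * }</pre>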
*
* @param plannerType The type of Planner to use.
* @param phase The transformation phase we're running.
* @param input The original RelNode
* @param targetTraits The traits we are targeting for output.
   * @return the transformed RelNode
*/
protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode input, RelTraitSet targetTraits) {
return transform(plannerType, phase, input, targetTraits, true);
}
/**
   * Transforms a RelNode into a new RelNode, targeting the provided set of traits. Also logs the outcome if requested.
*
* @param plannerType The type of Planner to use.
* @param phase The transformation phase we're running.
* @param input The original RelNode
* @param targetTraits The traits we are targeting for output.
* @param log Whether to log the planning phase.
   * @return the transformed RelNode
*/
protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode input, RelTraitSet targetTraits,
boolean log) {
final Stopwatch watch = Stopwatch.createStarted();
final RuleSet rules = config.getRules(phase, input);
final RelTraitSet toTraits = targetTraits.simplify();
final RelNode output;
switch (plannerType) {
case HEP_BOTTOM_UP:
case HEP: {
final HepProgramBuilder hepPgmBldr = new HepProgramBuilder();
if (plannerType == PlannerType.HEP_BOTTOM_UP) {
hepPgmBldr.addMatchOrder(HepMatchOrder.BOTTOM_UP);
}
for (RelOptRule rule : rules) {
hepPgmBldr.addRuleInstance(rule);
}
        // Set noDag = true to avoid caching problems which can lead to incorrect results in Drill.
final HepPlanner planner = new HepPlanner(hepPgmBldr.build(), context.getPlannerSettings(), true, null,
RelOptCostImpl.FACTORY);
JaninoRelMetadataProvider relMetadataProvider = Utilities.registerJaninoRelMetadataProvider();
        // Update the RelMetadataProvider for every RelNode in the rel tree.
input.accept(new MetaDataProviderModifier(relMetadataProvider));
planner.setRoot(input);
if (!input.getTraitSet().equals(targetTraits)) {
planner.changeTraits(input, toTraits);
}
output = planner.findBestExp();
break;
}
case VOLCANO:
default: {
// as weird as it seems, the cluster's only planner is the volcano planner.
final RelOptPlanner planner = input.getCluster().getPlanner();
final Program program = Programs.of(rules);
Preconditions.checkArgument(planner instanceof VolcanoPlanner,
"Cluster is expected to be constructed using VolcanoPlanner. Was actually of type %s.", planner.getClass()
.getName());
output = program.run(planner, input, toTraits,
ImmutableList.of(), ImmutableList.of());
break;
}
}
if (log) {
log(plannerType, phase, output, logger, watch);
}
return output;
}
/**
   * Applies physical rules and certain transformations to convert a Drill logical relational node into a physical one.
*
* @param drel relational node
* @param validatedRowType final output row type
* @return physical relational node
   * @throws RelConversionException if conversion to a physical rel fails
   * @throws SqlUnsupportedException if the query cannot be planned
*/
protected Prel convertToPrel(RelNode drel, RelDataType validatedRowType)
throws RelConversionException, SqlUnsupportedException {
Preconditions.checkArgument(drel.getConvention() == DrillRel.DRILL_LOGICAL);
final RelTraitSet traits = drel.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
Prel phyRelNode;
try {
final Stopwatch watch = Stopwatch.createStarted();
final RelNode relNode = transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, drel, traits, false);
phyRelNode = (Prel) relNode.accept(new PrelFinalizer());
// log externally as we need to finalize before traversing the tree.
log(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, phyRelNode, logger, watch);
} catch (RelOptPlanner.CannotPlanException ex) {
logger.error(ex.getMessage());
if (JoinUtils.checkCartesianJoin(drel)) {
throw JoinUtils.cartesianJoinPlanningException();
} else {
throw ex;
}
}
OptionManager queryOptions = context.getOptions();
if (context.getPlannerSettings().isMemoryEstimationEnabled()
&& !MemoryEstimationVisitor.enoughMemory(phyRelNode, queryOptions, context.getActiveEndpoints().size())) {
log("Not enough memory for this plan", phyRelNode, logger, null);
logger.debug("Re-planning without hash operations.");
queryOptions.setLocalOption(PlannerSettings.HASHJOIN.getOptionName(), false);
queryOptions.setLocalOption(PlannerSettings.HASHAGG.getOptionName(), false);
try {
final RelNode relNode = transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, drel, traits);
phyRelNode = (Prel) relNode.accept(new PrelFinalizer());
} catch (RelOptPlanner.CannotPlanException ex) {
logger.error(ex.getMessage());
if (JoinUtils.checkCartesianJoin(drel)) {
throw JoinUtils.cartesianJoinPlanningException();
} else {
throw ex;
}
}
}
// Handy way to visualize the plan while debugging
//ExplainHandler.printPlan(phyRelNode, context);
/* The order of the following transformations is important */
/*
* 0.)
* Add top project before screen operator or writer to ensure that final output column names are preserved.
*/
phyRelNode = TopProjectVisitor.insertTopProject(phyRelNode, validatedRowType);
/*
     * 1.) For a "select * from ... join ..." query, we need to insert a project on top of each scan and a top
     * project just under the screen operator. The project on top of a scan renames * to T1*, while the top
     * project renames T1* back to * before outputting the final result. Only the top project allows
     * duplicate columns, since the user could explicitly ask for duplicates (select *, col, *).
     * The other projects remove duplicate columns when we generate the POP in JSON format.
*/
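    // Sketch of the renames for "SELECT * FROM emp JOIN dept ON ..." (illustrative;
    // the actual prefixes are assigned by StarColumnConverter):
    //   project over Scan(emp):      *  -> T1*
    //   project over Scan(dept):     *  -> T2*
    //   top project (below Screen):  T1*, T2* -> *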
phyRelNode = StarColumnConverter.insertRenameProject(phyRelNode);
log("Physical RelNode after Top and Rename Project inserting: ", phyRelNode, logger, null);
/*
* 2.)
     * A join might cause naming conflicts between its left and right children.
     * In such cases, we have to insert a Project to rename the conflicting fields.
     * The Unnest operator might need to adjust its correlated field after physical planning.
*/
phyRelNode = AdjustOperatorsSchemaVisitor.adjustSchema(phyRelNode);
/*
     * 2.1) Swap left / right for an INNER hash join if the left's row count is less than (1 + margin) times
     * the right's row count. We want the smaller dataset on the right side, since the hash table is built on the right.
*/
if (context.getPlannerSettings().isHashJoinSwapEnabled()) {
phyRelNode = SwapHashJoinVisitor.swapHashJoin(phyRelNode,
context.getPlannerSettings().getHashJoinSwapMarginFactor());
}
/* Parquet row group filter pushdown in planning time */
if (context.getPlannerSettings().isParquetRowGroupFilterPushdownPlanningEnabled()) {
phyRelNode = (Prel) transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PHYSICAL_PARTITION_PRUNING, phyRelNode);
}
/*
* 2.2) Break up all expressions with complex outputs into their own project operations
*/
phyRelNode = phyRelNode.accept(
new SplitUpComplexExpressions(
config.getConverter().getTypeFactory(),
context.getPlannerSettings().functionImplementationRegistry,
phyRelNode.getCluster().getRexBuilder()
),
null);
/*
* 2.3) Projections that contain reference to flatten are rewritten as Flatten operators followed by Project
*/
phyRelNode = phyRelNode.accept(
new RewriteProjectToFlatten(config.getConverter().getTypeFactory(), context.getDrillOperatorTable()), null);
/*
* 3.)
* Since our operators work via names rather than indices, we have to reorder any
* output before we return data to the user as we may have accidentally shuffled things.
* This adds a trivial project to reorder columns prior to output.
*/
phyRelNode = FinalColumnReorderer.addFinalColumnOrdering(phyRelNode);
/*
* 4.)
     * If two fragments are both estimated to have a parallelization width of one, remove the exchange
     * separating them.
*/
phyRelNode = ExcessiveExchangeIdentifier.removeExcessiveExchanges(phyRelNode, targetSliceSize);
/* Insert the IMPLICIT_COLUMN in the lateral unnest pipeline */
phyRelNode = LateralUnnestRowIDVisitor.insertRowID(phyRelNode);
/* 5.)
* Add ProducerConsumer after each scan if the option is set
* Use the configured queueSize
*/
/* DRILL-1617 Disabling ProducerConsumer as it produces incorrect results
if (context.getOptions().getOption(PlannerSettings.PRODUCER_CONSUMER.getOptionName()).bool_val) {
long queueSize = context.getOptions().getOption(PlannerSettings.PRODUCER_CONSUMER_QUEUE_SIZE.getOptionName()).num_val;
phyRelNode = ProducerConsumerPrelVisitor.addProducerConsumerToScans(phyRelNode, (int) queueSize);
}
*/
/* 6.)
     * If the client does not support complex types (Map, Repeated),
     * insert a project that converts them to JSON.
*/
if (!context.getSession().isSupportComplexTypes()) {
logger.debug("Client does not support complex types, add ComplexToJson operator.");
phyRelNode = ComplexToJsonPrelVisitor.addComplexToJsonPrel(phyRelNode);
}
/* 7.)
* Insert LocalExchange (mux and/or demux) nodes
*/
phyRelNode = InsertLocalExchangeVisitor.insertLocalExchanges(phyRelNode, queryOptions);
/*
* 8.)
* Insert RuntimeFilter over Scan nodes
*/
if (context.isRuntimeFilterEnabled()) {
phyRelNode = RuntimeFilterVisitor.addRuntimeFilter(phyRelNode, context);
}
/* 9.)
     * Next, we add any required selection vector removers given the supported encodings of each
     * operator. This will ultimately move to a new trait, but we're managing it here for now to avoid
     * introducing new issues into planning before the next release.
*/
phyRelNode = SelectionVectorPrelVisitor.addSelectionRemoversWhereNecessary(phyRelNode);
/*
* 10.)
* Insert project above the screen operator or writer to ensure that final output column names are preserved after all optimizations.
*/
phyRelNode = TopProjectVisitor.insertTopProject(phyRelNode, validatedRowType);
/* 11.)
     * Finally, make sure that no rels are repeated.
     * This can happen when querying the same table twice, as Calcite (formerly Optiq) may canonicalize these.
*/
phyRelNode = RelUniqifier.uniqifyGraph(phyRelNode);
return phyRelNode;
}
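  /**
   * Generates the physical operator tree for the given physical rel tree by
   * asking each Prel to produce its operator through a {@link PhysicalPlanCreator}.
   */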
protected PhysicalOperator convertToPop(Prel prel) throws IOException {
PhysicalPlanCreator creator = new PhysicalPlanCreator(context, PrelSequencer.getIdMap(prel));
PhysicalOperator op = prel.getPhysicalOperator(creator);
return op;
}
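  /**
   * Wraps the physical operator tree in a {@link PhysicalPlan}, attaching plan
   * properties such as the plan type and version, the session options, the
   * generator name, and the names of the storage plugins scanned by the query.
   */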
protected PhysicalPlan convertToPlan(PhysicalOperator op, RelNode queryRelNode) {
List<String> scannedPluginNames = config.getScannedPlugins(queryRelNode)
.stream()
.map(StoragePlugin::getName)
.collect(Collectors.toList());
PlanPropertiesBuilder propsBuilder = PlanProperties.builder();
propsBuilder.type(PlanType.APACHE_DRILL_PHYSICAL);
propsBuilder.version(1);
propsBuilder.options(new JSONOptions(context.getOptions().getOptionList()));
propsBuilder.resultMode(ResultMode.EXEC);
propsBuilder.generator(this.getClass().getSimpleName(), "");
propsBuilder.scannedPluginNames(scannedPluginNames);
PhysicalPlan plan = new PhysicalPlan(propsBuilder.build(), getPops(op));
return plan;
}
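  /**
   * Flattens the operator tree rooted at {@code root} into a list, visiting
   * each operator before its children (pre-order).
   */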
public static List<PhysicalOperator> getPops(PhysicalOperator root) {
List<PhysicalOperator> ops = Lists.newArrayList();
PopCollector c = new PopCollector();
root.accept(c, ops);
return ops;
}
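  /**
   * Visitor that adds every physical operator it encounters to the supplied
   * collection, recursing into each operator's children.
   */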
private static class PopCollector extends
AbstractPhysicalVisitor<Void, Collection<PhysicalOperator>, RuntimeException> {
@Override
public Void visitOp(PhysicalOperator op, Collection<PhysicalOperator> collection) throws RuntimeException {
collection.add(op);
for (PhysicalOperator o : op) {
o.accept(this, collection);
}
return null;
}
}
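  /**
   * Validates the parse tree against the schema and returns the validated node
   * paired with its derived output row type. Also toggles the LIMIT 0
   * file-listing optimization and rejects queries that use functionality Drill
   * does not support.
   */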
protected Pair<SqlNode, RelDataType> validateNode(SqlNode sqlNode) throws ValidationException, RelConversionException, ForemanSetupException {
    // Check for a LIMIT 0 in the root portion of the query before validation,
    // because validating the query's FROM clauses would already trigger the
    // recursive file listing to which FILE_LISTING_LIMIT0_OPT is meant to apply.
boolean rootSelectLimit0 = FindLimit0SqlVisitor.containsLimit0(sqlNode);
context.getOptions().setLocalOption(
ExecConstants.FILE_LISTING_LIMIT0_OPT_KEY,
rootSelectLimit0
);
final SqlNode sqlNodeValidated = config.getConverter().validate(sqlNode);
final Pair<SqlNode, RelDataType> typedSqlNode = new Pair<>(sqlNodeValidated, config.getConverter().getOutputType(
sqlNodeValidated));
    // Check whether any unsupported functionality is used
UnsupportedOperatorsVisitor visitor = UnsupportedOperatorsVisitor.createVisitor(context);
try {
sqlNodeValidated.accept(visitor);
} catch (UnsupportedOperationException ex) {
      // If the exception is due to unsupported functionality, convert it into a user-facing exception
      visitor.convertException();
      // Otherwise, let the exception propagate to higher-level logic
throw ex;
}
return typedSqlNode;
}
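  /**
   * Converts the validated parse tree to a Calcite RelNode tree, then rewrites
   * subqueries, decorrelates the result, and rewrites window functions.
   */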
private RelNode convertToRel(SqlNode node) {
final RelNode convertedNode = config.getConverter().toRel(node).rel;
log("INITIAL", convertedNode, logger, null);
RelNode transformedNode = transform(PlannerType.HEP,
PlannerPhase.SUBQUERY_REWRITE, convertedNode);
RelNode decorrelatedNode = RelDecorrelator.decorrelateQuery(transformedNode,
DrillRelFactories.LOGICAL_BUILDER.create(transformedNode.getCluster(), null));
return transform(PlannerType.HEP, PlannerPhase.WINDOW_REWRITE, decorrelatedNode);
}
private RelNode preprocessNode(RelNode rel) throws SqlUnsupportedException {
    /*
     * Traverse the tree to do the following pre-processing tasks:
     *
     * 1. Replace convert_from / convert_to functions with their actual implementations,
     *    e.g. convert_from(EXPR, 'JSON') becomes convert_fromjson(EXPR).
     *    TODO: Ideally all function rewrites would move here instead of DrillOptiq.
     *
     * 2. Check whether the tree contains unsupported functions; throw SqlUnsupportedException if it does.
     */
PreProcessLogicalRel visitor = PreProcessLogicalRel.createVisitor(config.getConverter().getTypeFactory(),
context.getDrillOperatorTable(),
rel.getCluster().getRexBuilder());
try {
rel = rel.accept(visitor);
} catch (UnsupportedOperationException ex) {
visitor.convertException();
throw ex;
}
// moves complex expressions below Uncollect to the right side of Correlate
return ComplexUnnestVisitor.rewriteUnnestWithComplexExprs(rel);
}
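  /**
   * Adds a Project on top of the given rel that keeps all input fields but
   * renames them to the (uniquified) field names of the validated row type.
   * If the child is already a Project and the renaming Project would be
   * trivial, the rel is returned unchanged.
   */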
protected DrillRel addRenamedProject(DrillRel rel, RelDataType validatedRowType) {
RelDataType t = rel.getRowType();
RexBuilder b = rel.getCluster().getRexBuilder();
List<RexNode> projections = Lists.newArrayList();
int projectCount = t.getFieldList().size();
    for (int i = 0; i < projectCount; i++) {
projections.add(b.makeInputRef(rel, i));
}
final List<String> fieldNames2 = SqlValidatorUtil.uniquify(
validatedRowType.getFieldNames(),
SqlValidatorUtil.EXPR_SUGGESTER,
rel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive());
RelDataType newRowType = RexUtil.createStructType(rel.getCluster().getTypeFactory(),
projections, fieldNames2, null);
DrillProjectRel topProj = DrillProjectRel.create(rel.getCluster(), rel.getTraitSet(), rel, projections, newRowType);
    // Add a final Project to restore the validated row type; skip it when the child is already a Project and the rename would be trivial.
if (rel instanceof Project && DrillRelOptUtil.isTrivialProject(topProj, true)) {
return rel;
    } else {
return topProj;
}
}
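  /**
   * Shuttle that installs the given {@link RelMetadataProvider} on the cluster
   * of every RelNode it visits.
   */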
public static class MetaDataProviderModifier extends RelShuttleImpl {
private final RelMetadataProvider metadataProvider;
public MetaDataProviderModifier(RelMetadataProvider metadataProvider) {
this.metadataProvider = metadataProvider;
}
@Override
public RelNode visit(TableScan scan) {
scan.getCluster().setMetadataProvider(metadataProvider);
return super.visit(scan);
}
@Override
public RelNode visit(TableFunctionScan scan) {
scan.getCluster().setMetadataProvider(metadataProvider);
return super.visit(scan);
}
@Override
public RelNode visit(LogicalValues values) {
values.getCluster().setMetadataProvider(metadataProvider);
return super.visit(values);
}
@Override
protected RelNode visitChild(RelNode parent, int i, RelNode child) {
child.accept(this);
parent.getCluster().setMetadataProvider(metadataProvider);
return parent;
}
}
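  /**
   * Holder for a converted RelNode together with the row type produced by
   * validation.
   */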
protected class ConvertedRelNode {
private final RelNode relNode;
private final RelDataType validatedRowType;
public ConvertedRelNode(RelNode relNode, RelDataType validatedRowType) {
this.relNode = relNode;
this.validatedRowType = validatedRowType;
}
public RelNode getConvertedNode() {
return this.relNode;
}
public RelDataType getValidatedRowType() {
return this.validatedRowType;
}
}
}