| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to you under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.calcite.rel.rules; |
| |
| import org.apache.calcite.plan.RelOptPredicateList; |
| import org.apache.calcite.plan.RelOptRuleCall; |
| import org.apache.calcite.plan.RelRule; |
| import org.apache.calcite.rel.RelCollation; |
| import org.apache.calcite.rel.RelCollations; |
| import org.apache.calcite.rel.RelDistribution; |
| import org.apache.calcite.rel.RelDistributions; |
| import org.apache.calcite.rel.RelFieldCollation; |
| import org.apache.calcite.rel.RelNode; |
| import org.apache.calcite.rel.core.Exchange; |
| import org.apache.calcite.rel.core.SortExchange; |
| import org.apache.calcite.rel.logical.LogicalExchange; |
| import org.apache.calcite.rel.logical.LogicalSortExchange; |
| import org.apache.calcite.rel.metadata.RelMetadataQuery; |
| import org.apache.calcite.rex.RexInputRef; |
| |
| import org.immutables.value.Value; |
| |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Planner rule that removes keys from |
| * a {@link Exchange} if those keys are known to be constant. |
| * |
| * <p>For example, |
| * <code>SELECT key,value FROM (SELECT 1 AS key, value FROM src) r DISTRIBUTE |
| * BY key</code> can be reduced to |
| * <code>SELECT 1 AS key, value FROM src</code>. |
| * |
| * @see CoreRules#EXCHANGE_REMOVE_CONSTANT_KEYS |
| * @see CoreRules#SORT_EXCHANGE_REMOVE_CONSTANT_KEYS |
| */ |
| @Value.Enclosing |
| public class ExchangeRemoveConstantKeysRule |
| extends RelRule<ExchangeRemoveConstantKeysRule.Config> |
| implements SubstitutionRule { |
| |
| /** Creates an ExchangeRemoveConstantKeysRule. */ |
| protected ExchangeRemoveConstantKeysRule(Config config) { |
| super(config); |
| } |
| |
| /** Removes constant in distribution keys. */ |
| protected static List<Integer> simplifyDistributionKeys(RelDistribution distribution, |
| Set<Integer> constants) { |
| return distribution.getKeys().stream() |
| .filter(key -> !constants.contains(key)) |
| .collect(Collectors.toList()); |
| } |
| |
| @Override public void onMatch(RelOptRuleCall call) { |
| config.matchHandler().accept(this, call); |
| } |
| |
| private static void matchExchange(ExchangeRemoveConstantKeysRule rule, |
| RelOptRuleCall call) { |
| final Exchange exchange = call.rel(0); |
| final RelMetadataQuery mq = call.getMetadataQuery(); |
| final RelNode input = exchange.getInput(); |
| final RelOptPredicateList predicates = mq.getPulledUpPredicates(input); |
| if (RelOptPredicateList.isEmpty(predicates)) { |
| return; |
| } |
| |
| final Set<Integer> constants = new HashSet<>(); |
| predicates.constantMap.keySet().forEach(key -> { |
| if (key instanceof RexInputRef) { |
| constants.add(((RexInputRef) key).getIndex()); |
| } |
| }); |
| if (constants.isEmpty()) { |
| return; |
| } |
| |
| final List<Integer> distributionKeys = simplifyDistributionKeys( |
| exchange.getDistribution(), constants); |
| |
| if (distributionKeys.size() != exchange.getDistribution().getKeys() |
| .size()) { |
| call.transformTo(call.builder() |
| .push(exchange.getInput()) |
| .exchange(distributionKeys.isEmpty() |
| ? RelDistributions.SINGLETON |
| : RelDistributions.hash(distributionKeys)) |
| .build()); |
| call.getPlanner().prune(exchange); |
| } |
| } |
| |
| private static void matchSortExchange(ExchangeRemoveConstantKeysRule rule, |
| RelOptRuleCall call) { |
| final SortExchange sortExchange = call.rel(0); |
| final RelMetadataQuery mq = call.getMetadataQuery(); |
| final RelNode input = sortExchange.getInput(); |
| final RelOptPredicateList predicates = mq.getPulledUpPredicates(input); |
| if (RelOptPredicateList.isEmpty(predicates)) { |
| return; |
| } |
| |
| final Set<Integer> constants = new HashSet<>(); |
| predicates.constantMap.keySet().forEach(key -> { |
| if (key instanceof RexInputRef) { |
| constants.add(((RexInputRef) key).getIndex()); |
| } |
| }); |
| |
| if (constants.isEmpty()) { |
| return; |
| } |
| |
| List<Integer> distributionKeys = new ArrayList<>(); |
| boolean distributionSimplified = false; |
| boolean hashDistribution = sortExchange.getDistribution().getType() |
| == RelDistribution.Type.HASH_DISTRIBUTED; |
| if (hashDistribution) { |
| distributionKeys = simplifyDistributionKeys( |
| sortExchange.getDistribution(), constants); |
| distributionSimplified = |
| distributionKeys.size() != sortExchange.getDistribution().getKeys() |
| .size(); |
| } |
| |
| final List<RelFieldCollation> fieldCollations = sortExchange |
| .getCollation().getFieldCollations().stream().filter( |
| fc -> !constants.contains(fc.getFieldIndex())) |
| .collect(Collectors.toList()); |
| |
| boolean collationSimplified = |
| fieldCollations.size() != sortExchange.getCollation() |
| .getFieldCollations().size(); |
| if (distributionSimplified |
| || collationSimplified) { |
| RelDistribution distribution = distributionSimplified |
| ? (distributionKeys.isEmpty() |
| ? RelDistributions.SINGLETON |
| : RelDistributions.hash(distributionKeys)) |
| : sortExchange.getDistribution(); |
| RelCollation collation = collationSimplified |
| ? RelCollations.of(fieldCollations) |
| : sortExchange.getCollation(); |
| |
| call.transformTo(call.builder() |
| .push(sortExchange.getInput()) |
| .sortExchange(distribution, collation) |
| .build()); |
| call.getPlanner().prune(sortExchange); |
| } |
| } |
| |
| /** Rule configuration. */ |
| @Value.Immutable(singleton = false) |
| public interface Config extends RelRule.Config { |
| Config DEFAULT = ImmutableExchangeRemoveConstantKeysRule.Config |
| .of(ExchangeRemoveConstantKeysRule::matchExchange) |
| .withOperandFor(LogicalExchange.class, |
| exchange -> exchange.getDistribution().getType() |
| == RelDistribution.Type.HASH_DISTRIBUTED); |
| |
| Config SORT = ImmutableExchangeRemoveConstantKeysRule.Config |
| .of(ExchangeRemoveConstantKeysRule::matchSortExchange) |
| .withDescription("SortExchangeRemoveConstantKeysRule") |
| .as(Config.class) |
| .withOperandFor(LogicalSortExchange.class, |
| sortExchange -> sortExchange.getDistribution().getType() |
| == RelDistribution.Type.HASH_DISTRIBUTED |
| || !sortExchange.getCollation().getFieldCollations() |
| .isEmpty()); |
| |
| @Override default ExchangeRemoveConstantKeysRule toRule() { |
| return new ExchangeRemoveConstantKeysRule(this); |
| } |
| |
| /** Forwards a call to {@link #onMatch(RelOptRuleCall)}. */ |
| @Value.Parameter |
| MatchHandler<ExchangeRemoveConstantKeysRule> matchHandler(); |
| |
| /** Sets {@link #matchHandler()}. */ |
| Config withMatchHandler(MatchHandler<ExchangeRemoveConstantKeysRule> matchHandler); |
| |
| /** Defines an operand tree for the given classes. */ |
| default <R extends Exchange> Config withOperandFor(Class<R> exchangeClass, |
| Predicate<R> predicate) { |
| return withOperandSupplier(b -> |
| b.operand(exchangeClass).predicate(predicate) |
| .anyInputs()) |
| .as(Config.class); |
| } |
| } |
| } |