blob: d2ae72357474d541cf2ee92f6b4aad9237dcafdd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.optimizer.topnkey;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import static org.apache.hadoop.hive.ql.optimizer.topnkey.TopNKeyProcessor.copyDown;
/**
* Implementation of TopNKey operator pushdown.
*/
public class TopNKeyPushdownProcessor implements NodeProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TopNKeyPushdownProcessor.class);
/**
 * Handles one visited node by treating it as a {@link TopNKeyOperator} and attempting to push it
 * further down the operator tree.
 *
 * @param nd the visited node; it is cast to {@link TopNKeyOperator}
 * @param stack not used by this processor
 * @param procCtx not used by this processor
 * @param nodeOutputs not used by this processor
 * @return always {@code null}
 * @throws SemanticException propagated from the pushdown logic
 */
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
    throws SemanticException {
  final TopNKeyOperator visited = (TopNKeyOperator) nd;
  pushdown(visited);
  return null;
}
/**
 * Dispatches to the pushdown routine that matches the type of the operator's first parent.
 * Parent types without a dedicated routine leave the operator where it is.
 *
 * @param topNKey TopNKey operator to push
 * @throws SemanticException propagated from the selected pushdown routine
 */
private void pushdown(TopNKeyOperator topNKey) throws SemanticException {
  final Operator<? extends OperatorDesc> upstream = topNKey.getParentOperators().get(0);
  final OperatorType upstreamType = upstream.getType();

  switch (upstreamType) {
    case TOPNKEY:
      pushdownThroughTopNKey(topNKey);
      return;
    case JOIN:
    case MERGEJOIN:
      pushDownThroughJoin(topNKey);
      return;
    case REDUCESINK:
      pushdownThroughReduceSink(topNKey);
      return;
    case GROUPBY:
      pushdownThroughGroupBy(topNKey);
      return;
    case FORWARD:
      pushdownThroughParent(topNKey);
      return;
    case SELECT:
      pushdownThroughSelect(topNKey);
      return;
    default:
      // No rule for this parent type: the operator stays in place.
      return;
  }
}
/**
 * Pushes the TopNKey operator through its Select (Project) parent, provided every key expression
 * of the TopNKey can be rewritten in terms of the Select's input via the Select's column
 * expression map. When at least one key cannot be rewritten, nothing is changed.
 *
 * @param topNKey TopNKey operator to push
 * @throws SemanticException when removeChildAndAdoptItsChildren was not successful in the method pushdown
 */
private void pushdownThroughSelect(TopNKeyOperator topNKey) throws SemanticException {
  final SelectOperator selectOp = (SelectOperator) topNKey.getParentOperators().get(0);
  final TopNKeyDesc desc = topNKey.getConf();

  final List<ExprNodeDesc> currentKeys = desc.getKeyColumns();
  final List<ExprNodeDesc> rewrittenKeys = mapColumns(currentKeys, selectOp.getColumnExprMap());

  // mapColumns silently skips keys it cannot map, so equal sizes mean every key was mapped.
  final boolean everyKeyMapped = rewrittenKeys.size() == currentKeys.size();
  if (everyKeyMapped) {
    LOG.debug("Pushing {} through {}", topNKey.getName(), selectOp.getName());
    desc.setKeyColumns(rewrittenKeys);
    moveDown(topNKey);
    pushdown(topNKey);
  }
}
/**
 * Translates each column through the given expression map, keyed by the column's expression
 * string. Columns without an entry in the map are skipped, so the result may be shorter than the
 * input.
 *
 * @param columns columns to translate
 * @param colExprMap mapping from expression string to expression; may be {@code null}
 * @return a new list holding the mapped expressions; empty when {@code colExprMap} is {@code null}
 */
private static List<ExprNodeDesc> mapColumns(List<ExprNodeDesc> columns,
    Map<String, ExprNodeDesc> colExprMap) {
  final List<ExprNodeDesc> translated = new ArrayList<>();
  if (colExprMap != null) {
    for (ExprNodeDesc candidate : columns) {
      final String lookupKey = candidate.getExprString();
      if (!colExprMap.containsKey(lookupKey)) {
        continue;
      }
      translated.add(colExprMap.get(lookupKey));
    }
  }
  return translated;
}
/**
 * Unconditionally swaps the TopNKey operator with its first parent and then keeps pushing it
 * from its new position.
 *
 * @param topNKey TopNKey operator to push
 * @throws SemanticException propagated from moveDown or from the continued pushdown
 */
private void pushdownThroughParent(TopNKeyOperator topNKey) throws SemanticException {
  final String parentName = topNKey.getParentOperators().get(0).getName();
  LOG.debug("Pushing {} through {}", topNKey.getName(), parentName);

  moveDown(topNKey);
  pushdown(topNKey);
}
/**
 * Pushes a TopNKey operator through its GroupBy parent (no grouping sets).
 *
 * <p>A copy of the operator, restricted to the key prefix it has in common with the GroupBy, is
 * placed beneath the GroupBy and pushed further. When that common prefix spans all of the
 * TopNKey's key columns the original operator above the GroupBy is removed; otherwise it is kept.
 * Nothing happens when the common prefix is empty or is exactly as long as the TopNKey's
 * partition key column list.
 *
 * @param topNKey TopNKey operator to push
 * @throws SemanticException when removeChildAndAdoptItsChildren was not successful
 */
private void pushdownThroughGroupBy(TopNKeyOperator topNKey) throws SemanticException {
  final GroupByOperator groupByOp = (GroupByOperator) topNKey.getParentOperators().get(0);
  final GroupByDesc groupByConf = groupByOp.getConf();
  final TopNKeyDesc currentDesc = topNKey.getConf();

  final CommonKeyPrefix sharedPrefix = CommonKeyPrefix.map(currentDesc, groupByConf);
  if (sharedPrefix.isEmpty()) {
    return;
  }
  if (sharedPrefix.size() == currentDesc.getPartitionKeyColumns().size()) {
    return;
  }

  LOG.debug("Pushing a copy of {} through {}", topNKey.getName(), groupByOp.getName());
  final TopNKeyDesc prefixDesc = currentDesc.combine(sharedPrefix);
  pushdown(copyDown(groupByOp, prefixDesc));

  // Checked only after the copy has been pushed, exactly as the key sizes stand at that point.
  final boolean prefixCoversAllKeys = currentDesc.getKeyColumns().size() == sharedPrefix.size();
  if (prefixCoversAllKeys) {
    LOG.debug("Removing {} above {}", topNKey.getName(), groupByOp.getName());
    groupByOp.removeChildAndAdoptItsChildren(topNKey);
  }
}
/**
 * Pushes a TopNKey operator through its ReduceSink parent.
 *
 * <p>A copy of the operator, restricted to the key prefix it has in common with the ReduceSink
 * (including the same order), is placed beneath the ReduceSink and pushed further. When that
 * common prefix spans all of the TopNKey's key columns the original operator above the
 * ReduceSink is removed; otherwise it is kept. Nothing happens when the common prefix is empty
 * or is exactly as long as the TopNKey's partition key column list.
 *
 * @param topNKey TopNKey operator to push
 * @throws SemanticException when removeChildAndAdoptItsChildren was not successful
 */
private void pushdownThroughReduceSink(TopNKeyOperator topNKey) throws SemanticException {
  final ReduceSinkOperator reduceSinkOp = (ReduceSinkOperator) topNKey.getParentOperators().get(0);
  final ReduceSinkDesc reduceSinkConf = reduceSinkOp.getConf();
  final TopNKeyDesc currentDesc = topNKey.getConf();

  final CommonKeyPrefix sharedPrefix = CommonKeyPrefix.map(currentDesc, reduceSinkConf);
  if (sharedPrefix.isEmpty()) {
    return;
  }
  if (sharedPrefix.size() == currentDesc.getPartitionKeyColumns().size()) {
    return;
  }

  LOG.debug("Pushing a copy of {} through {}", topNKey.getName(), reduceSinkOp.getName());
  final TopNKeyDesc prefixDesc = currentDesc.combine(sharedPrefix);
  pushdown(copyDown(reduceSinkOp, prefixDesc));

  // Checked only after the copy has been pushed, exactly as the key sizes stand at that point.
  final boolean prefixCoversAllKeys = currentDesc.getKeyColumns().size() == sharedPrefix.size();
  if (prefixCoversAllKeys) {
    LOG.debug("Removing {} above {}", topNKey.getName(), reduceSinkOp.getName());
    reduceSinkOp.removeChildAndAdoptItsChildren(topNKey);
  }
}
/**
 * Pushes a TopNKey operator through its Join parent when the join is supported.
 *
 * <p>Only pushing down through a Left Outer Join is supported; Right and Full Outer Join support
 * will be added in a follow-up patch. The pushdown is also skipped unless every join condition
 * of the parent equals the first one.
 *
 * @param topNKey TopNKey operator to push
 * @throws SemanticException propagated from the Left Outer Join pushdown
 */
private void pushDownThroughJoin(TopNKeyOperator topNKey)
    throws SemanticException {
  final CommonJoinOperator<? extends JoinDesc> joinOp =
      (CommonJoinOperator<? extends JoinDesc>) topNKey.getParentOperators().get(0);
  final JoinCondDesc[] conditions = joinOp.getConf().getConds();
  final JoinCondDesc reference = conditions[0];

  for (int i = 0; i < conditions.length; i++) {
    if (!reference.equals(conditions[i])) {
      return;
    }
  }

  if (reference.getType() != JoinDesc.LEFT_OUTER_JOIN) {
    return;
  }
  pushdownThroughLeftOuterJoin(topNKey);
}
/**
 * Pushes a TopNKey operator through a Left Outer Join into the join's first input.
 *
 * <p>The TopNKey keys are first rewritten through the join's column expression map, stopping at
 * the first key that has no mapping, and then matched against the keys of the ReduceSink that
 * feeds the join's first input. A copy of the operator restricted to the resulting common prefix
 * is placed beneath that ReduceSink and pushed further. When the prefix spans all of the
 * TopNKey's key columns, the original operator on top of the join is removed; otherwise it is
 * kept. Nothing happens when the prefix is empty or is exactly as long as the TopNKey's
 * partition key column list.
 *
 * @param topNKey TopNKey operator to push
 * @throws SemanticException when removeChildAndAdoptItsChildren was not successful
 */
private void pushdownThroughLeftOuterJoin(TopNKeyOperator topNKey) throws SemanticException {
  final TopNKeyDesc currentDesc = topNKey.getConf();
  final CommonJoinOperator<? extends JoinDesc> joinOp =
      (CommonJoinOperator<? extends JoinDesc>) topNKey.getParentOperators().get(0);
  final List<Operator<? extends OperatorDesc>> inputsOfJoin = joinOp.getParentOperators();
  final ReduceSinkOperator firstInput = (ReduceSinkOperator) inputsOfJoin.get(0);
  final ReduceSinkDesc firstInputConf = firstInput.getConf();

  final List<ExprNodeDesc> keysBelowJoin =
      mapUntilColumnEquals(currentDesc.getKeyColumns(), joinOp.getColumnExprMap());
  final CommonKeyPrefix sharedPrefix = CommonKeyPrefix.map(
      keysBelowJoin,
      currentDesc.getColumnSortOrder(),
      currentDesc.getNullOrder(),
      firstInputConf.getKeyCols(),
      firstInputConf.getColumnExprMap(),
      firstInputConf.getOrder(),
      firstInputConf.getNullOrder());
  if (sharedPrefix.isEmpty()) {
    return;
  }
  if (sharedPrefix.size() == currentDesc.getPartitionKeyColumns().size()) {
    return;
  }

  LOG.debug("Pushing a copy of {} through {} and {}",
      topNKey.getName(), joinOp.getName(), firstInput.getName());
  final TopNKeyDesc prefixDesc = currentDesc.combine(sharedPrefix);
  pushdown(copyDown(firstInput, prefixDesc));

  // Checked only after the copy has been pushed, exactly as the key sizes stand at that point.
  final boolean prefixCoversAllKeys = currentDesc.getKeyColumns().size() == sharedPrefix.size();
  if (prefixCoversAllKeys) {
    LOG.debug("Removing {} above {}", topNKey.getName(), joinOp.getName());
    joinOp.removeChildAndAdoptItsChildren(topNKey);
  }
}
/**
 * Translates the leading columns through the given expression map, keyed by each column's
 * expression string, and stops at the first column that has no entry in the map.
 *
 * @param columns columns to translate, in order
 * @param colExprMap mapping from expression string to expression; may be {@code null}
 * @return a new list with the mapped expressions of the leading mappable columns; empty when
 *         {@code colExprMap} is {@code null}
 */
private List<ExprNodeDesc> mapUntilColumnEquals(List<ExprNodeDesc> columns,
    Map<String, ExprNodeDesc> colExprMap) {
  final List<ExprNodeDesc> mappedPrefix = new ArrayList<>();
  if (colExprMap == null) {
    return mappedPrefix;
  }
  for (ExprNodeDesc candidate : columns) {
    final String lookupKey = candidate.getExprString();
    if (!colExprMap.containsKey(lookupKey)) {
      // First unmappable column ends the prefix.
      break;
    }
    mappedPrefix.add(colExprMap.get(lookupKey));
  }
  return mappedPrefix;
}
/**
* Push through another Top N Key operator.
* If the TNK operators are the same, one of them will be removed. See {@link TopNKeyDesc#isSame}.
* Otherwise, if the expression in <code>topNKey</code> is a common prefix of its parent TNK operator's keys and the
* topN property is the same, then <code>topNKey</code> can be pushed through the parent.
* If the Top N Key operator cannot be pushed through, this method tries to remove one of them:
* - if the topN property is the same and the keys of one of the operators are a subset of the other's, then one of
* the operators can be removed
* - if the keys are the same, the operator with the higher topN value can be removed
* @param topNKey TopNKey operator to push
* @throws SemanticException when removeChildAndAdoptItsChildren was not successful
*/
private void pushdownThroughTopNKey(TopNKeyOperator topNKey) throws SemanticException {
TopNKeyOperator parent = (TopNKeyOperator) topNKey.getParentOperators().get(0);
// Identical descriptors (per TopNKeyDesc#isSame): drop this operator and stop.
if (hasSameTopNKeyDesc(parent, topNKey.getConf())) {
LOG.debug("Removing {} above same operator: {}", topNKey.getName(), parent.getName());
parent.removeChildAndAdoptItsChildren(topNKey);
return;
}
TopNKeyDesc topNKeyDesc = topNKey.getConf();
TopNKeyDesc parentTopNKeyDesc = parent.getConf();
// Common key prefix computed from both operators' key columns, sort orders and null orders.
CommonKeyPrefix commonKeyPrefix = CommonKeyPrefix.map(
topNKeyDesc.getKeyColumns(), topNKeyDesc.getColumnSortOrder(), topNKeyDesc.getNullOrder(),
parentTopNKeyDesc.getKeyColumns(), parentTopNKeyDesc.getColumnSortOrder(),
parentTopNKeyDesc.getNullOrder());
if (topNKeyDesc.getTopN() == parentTopNKeyDesc.getTopN()) {
if (topNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
// TNK keys are subset of the parent TNK keys
// Swap topNKey with its parent and keep pushing it from the new position.
pushdownThroughParent(topNKey);
// After the push, if the first child of topNKey is a TopNKey operator, topNKey is removed
// from its current parent.
// NOTE(review): the debug message names the former parent as the removed operator, but the call below
// removes topNKey - confirm which one is intended.
if (topNKey.getChildOperators().get(0).getType() == OperatorType.TOPNKEY) {
LOG.debug("Removing {} since child {} supersedes it", parent.getName(), topNKey.getName());
topNKey.getParentOperators().get(0).removeChildAndAdoptItsChildren(topNKey);
}
} else if (parentTopNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
// parent TNK keys are subset of TNK keys
// The parent is removed from its own first parent; topNKey itself is not moved.
LOG.debug("Removing parent of {} since it supersedes", topNKey.getName());
parent.getParentOperators().get(0).removeChildAndAdoptItsChildren(parent);
}
} else if (topNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size() &&
parentTopNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
// topN differs and the common prefix is as long as both key lists: remove the operator with the higher topN.
if (topNKeyDesc.getTopN() > parentTopNKeyDesc.getTopN()) {
LOG.debug("Removing {}. Parent {} has same keys but lower topN {} > {}",
topNKey.getName(), parent.getName(), topNKeyDesc.getTopN(), parentTopNKeyDesc.getTopN());
topNKey.getParentOperators().get(0).removeChildAndAdoptItsChildren(topNKey);
} else {
LOG.debug("Removing parent {}. {} has same keys but lower topN {} < {}",
parent.getName(), topNKey.getName(), topNKeyDesc.getTopN(), parentTopNKeyDesc.getTopN());
parent.getParentOperators().get(0).removeChildAndAdoptItsChildren(parent);
}
}
}
/**
 * Tells whether the given operator is a TopNKey operator whose descriptor is the same as
 * {@code desc} according to {@link TopNKeyDesc#isSame}.
 *
 * @param operator operator to inspect
 * @param desc descriptor to compare against
 * @return {@code false} when {@code operator} is not a {@link TopNKeyOperator}; otherwise the
 *         result of {@code isSame}
 */
private static boolean hasSameTopNKeyDesc(Operator<? extends OperatorDesc> operator, TopNKeyDesc desc) {
  return operator instanceof TopNKeyOperator
      && ((TopNKeyOperator) operator).getConf().isSame(desc);
}
/**
* Swaps the given TopNKey operator with its single parent, so that afterwards the operator's parents are the
* former grandparents and its only child is the former parent.
*
* @param topNKey TopNKey operator to move; asserted to have exactly one parent
* @throws SemanticException declared for the removeChildAndAdoptItsChildren / replaceChild calls
*/
private static void moveDown(TopNKeyOperator topNKey) throws SemanticException {
assert topNKey.getNumParent() == 1;
final Operator<? extends OperatorDesc> parent = topNKey.getParentOperators().get(0);
// Captured before any rewiring. NOTE(review): presumably this is the parent's own list rather than a copy;
// its contents are copied into topNKey's parent list below before the parent's list is cleared.
final List<Operator<? extends OperatorDesc>> grandParents = parent.getParentOperators();
// Detach topNKey from the parent; judging by the method name the parent takes over topNKey's children - confirm.
parent.removeChildAndAdoptItsChildren(topNKey);
// Every grandparent now points at topNKey where it used to point at the parent.
for (Operator<? extends OperatorDesc> grandParent : grandParents) {
grandParent.replaceChild(parent, topNKey);
}
// topNKey: parents become the former grandparents.
topNKey.getParentOperators().clear();
topNKey.getParentOperators().addAll(grandParents);
// topNKey: the former parent becomes its only child.
topNKey.getChildOperators().clear();
topNKey.getChildOperators().add(parent);
// Former parent: topNKey becomes its only parent.
parent.getParentOperators().clear();
parent.getParentOperators().add(topNKey);
}
}