blob: 9089c4233663261bc7600efe6769f1c4407e7bd6 [file] [log] [blame]
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.optimizer.rules;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.mutable.Mutable;
import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
* Removes join operators for which all of the following conditions are true:
* 1. The live variables of one input branch of the join are not used in the upstream plan
* 2. The join is an inner equi join
* 3. The join condition only uses variables that correspond to primary keys of the same dataset
* Notice that the last condition implies a 1:1 join, i.e., the join does not change the result cardinality.
*
* Joins that satisfy the above conditions may be introduced by other rules
* which use surrogate optimizations. Such an optimization aims to reduce data copies and communication costs by
* using the primary keys as surrogates for the desired data items. Typically,
* such a surrogate-based plan introduces a top-level join to finally resolve
* the surrogates to the desired data items.
* In case the upstream plan does not require the original data items at all, such a top-level join is unnecessary.
* The purpose of this rule is to remove such unnecessary joins.
*/
public class RemoveUnusedOneToOneEquiJoinRule implements IAlgebraicRewriteRule {
private final Set<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
private final List<LogicalVariable> usedVars = new ArrayList<LogicalVariable>();
private final List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
private final List<LogicalVariable> pkVars = new ArrayList<LogicalVariable>();
private final List<DataSourceScanOperator> dataScans = new ArrayList<DataSourceScanOperator>();
private boolean hasRun = false;
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
if (hasRun) {
return false;
}
hasRun = true;
if (removeUnusedJoin(opRef)) {
return true;
}
return false;
}
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
return false;
}
private boolean removeUnusedJoin(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
boolean modified = false;
usedVars.clear();
VariableUtilities.getUsedVariables(op, usedVars);
// Propagate used variables from parents downwards.
parentsUsedVars.addAll(usedVars);
int numInputs = op.getInputs().size();
for (int i = 0; i < numInputs; i++) {
Mutable<ILogicalOperator> childOpRef = op.getInputs().get(i);
int unusedJoinBranchIndex = removeJoinFromInputBranch(childOpRef);
if (unusedJoinBranchIndex >= 0) {
int usedBranchIndex = (unusedJoinBranchIndex == 0) ? 1 : 0;
// Remove join at input index i, by hooking up op's input i with
// the join's branch at unusedJoinBranchIndex.
AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) childOpRef.getValue();
op.getInputs().set(i, joinOp.getInputs().get(usedBranchIndex));
modified = true;
}
// Descend into children.
if (removeUnusedJoin(childOpRef)) {
modified = true;
}
}
return modified;
}
private int removeJoinFromInputBranch(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN) {
return -1;
}
AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op;
// Make sure the join is an equi-join.
if (!isEquiJoin(joinOp.getCondition())) {
return -1;
}
int unusedJoinBranchIndex = -1;
for (int i = 0; i < joinOp.getInputs().size(); i++) {
liveVars.clear();
VariableUtilities.getLiveVariables(joinOp.getInputs().get(i).getValue(), liveVars);
liveVars.retainAll(parentsUsedVars);
if (liveVars.isEmpty()) {
// None of the live variables from this branch are used by its parents.
unusedJoinBranchIndex = i;
break;
}
}
if (unusedJoinBranchIndex < 0) {
// The variables from both branches are used in the upstream plan. We cannot remove this join.
return -1;
}
// Check whether one of the join branches is unused.
usedVars.clear();
VariableUtilities.getUsedVariables(joinOp, usedVars);
// Check whether all used variables originate from primary keys of exactly the same dataset.
// Collect a list of datascans whose primary key variables are used in the join condition.
gatherProducingDataScans(opRef, usedVars, dataScans);
// Check that all datascans scan the same dataset, and that the join condition
// only used primary key variables of those datascans.
for (int i = 0; i < dataScans.size(); i++) {
if (i > 0) {
AqlDataSource prevAqlDataSource = (AqlDataSource) dataScans.get(i - 1).getDataSource();
AqlDataSource currAqlDataSource = (AqlDataSource) dataScans.get(i).getDataSource();
if (!prevAqlDataSource.getDataset().equals(currAqlDataSource.getDataset())) {
return -1;
}
}
// Remove from the used variables all the primary key vars of this dataset.
fillPKVars(dataScans.get(i), pkVars);
usedVars.removeAll(pkVars);
}
if (!usedVars.isEmpty()) {
// The join condition also uses some other variables that are not primary
// keys from datasource scans of the same dataset.
return -1;
}
return unusedJoinBranchIndex;
}
private void gatherProducingDataScans(Mutable<ILogicalOperator> opRef, List<LogicalVariable> joinUsedVars,
List<DataSourceScanOperator> dataScans) {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
for (Mutable<ILogicalOperator> inputOp : op.getInputs()) {
gatherProducingDataScans(inputOp, joinUsedVars, dataScans);
}
return;
}
DataSourceScanOperator dataScan = (DataSourceScanOperator) op;
fillPKVars(dataScan, pkVars);
// Check if join uses all PK vars.
if (joinUsedVars.containsAll(pkVars)) {
dataScans.add(dataScan);
}
}
private void fillPKVars(DataSourceScanOperator dataScan, List<LogicalVariable> pkVars) {
pkVars.clear();
AqlDataSource aqlDataSource = (AqlDataSource) dataScan.getDataSource();
int numPKs = DatasetUtils.getPartitioningKeys(aqlDataSource.getDataset()).size();
pkVars.clear();
for (int i = 0; i < numPKs; i++) {
pkVars.add(dataScan.getVariables().get(i));
}
}
private boolean isEquiJoin(Mutable<ILogicalExpression> conditionExpr) {
AbstractLogicalExpression expr = (AbstractLogicalExpression) conditionExpr.getValue();
if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
FunctionIdentifier funcIdent = funcExpr.getFunctionIdentifier();
if (funcIdent != AlgebricksBuiltinFunctions.AND && funcIdent != AlgebricksBuiltinFunctions.EQ) {
return false;
}
}
return true;
}
}