blob: 3b31f6d119968886aa16cea15088d9e8303717d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.algebricks.rewriter.rules;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.IsomorphismUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProjectPOperator;
import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class ExtractCommonOperatorsRule implements IAlgebraicRewriteRule {
private final HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> childrenToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
private final List<Mutable<ILogicalOperator>> roots = new ArrayList<Mutable<ILogicalOperator>>();
private final List<List<Mutable<ILogicalOperator>>> equivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
private final HashMap<Mutable<ILogicalOperator>, BitSet> opToCandidateInputs = new HashMap<Mutable<ILogicalOperator>, BitSet>();
private final HashMap<Mutable<ILogicalOperator>, MutableInt> clusterMap = new HashMap<Mutable<ILogicalOperator>, MutableInt>();
private final HashMap<Integer, BitSet> clusterWaitForMap = new HashMap<Integer, BitSet>();
private int lastUsedClusterId = 0;
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
&& op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
if (!roots.contains(op)) {
roots.add(new MutableObject<ILogicalOperator>(op));
}
return false;
}
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.WRITE && op.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT
&& op.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT) {
return false;
}
boolean rewritten = false;
boolean changed = false;
if (roots.size() > 0) {
do {
changed = false;
// applying the rewriting until fixpoint
topDownMaterialization(roots);
genCandidates(context);
removeTrivialShare();
if (equivalenceClasses.size() > 0) {
changed = rewrite(context);
}
if (!rewritten) {
rewritten = changed;
}
equivalenceClasses.clear();
childrenToParents.clear();
opToCandidateInputs.clear();
clusterMap.clear();
clusterWaitForMap.clear();
lastUsedClusterId = 0;
} while (changed);
roots.clear();
}
return rewritten;
}
private void removeTrivialShare() {
for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
for (int i = candidates.size() - 1; i >= 0; i--) {
Mutable<ILogicalOperator> opRef = candidates.get(i);
AbstractLogicalOperator aop = (AbstractLogicalOperator) opRef.getValue();
if (aop.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
aop = (AbstractLogicalOperator) aop.getInputs().get(0).getValue();
}
if (aop.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
candidates.remove(i);
}
}
}
for (int i = equivalenceClasses.size() - 1; i >= 0; i--) {
if (equivalenceClasses.get(i).size() < 2) {
equivalenceClasses.remove(i);
}
}
}
private boolean rewrite(IOptimizationContext context) throws AlgebricksException {
boolean changed = false;
for (List<Mutable<ILogicalOperator>> members : equivalenceClasses) {
if (rewriteForOneEquivalentClass(members, context)) {
changed = true;
}
}
return changed;
}
private boolean rewriteForOneEquivalentClass(List<Mutable<ILogicalOperator>> members, IOptimizationContext context)
throws AlgebricksException {
List<Mutable<ILogicalOperator>> group = new ArrayList<Mutable<ILogicalOperator>>();
boolean rewritten = false;
while (members.size() > 0) {
group.clear();
Mutable<ILogicalOperator> candidate = members.remove(members.size() - 1);
group.add(candidate);
for (int i = members.size() - 1; i >= 0; i--) {
Mutable<ILogicalOperator> peer = members.get(i);
if (IsomorphismUtilities.isOperatorIsomorphic(candidate.getValue(), peer.getValue())) {
group.add(peer);
members.remove(i);
}
}
boolean[] materializationFlags = computeMaterilizationFlags(group);
if (group.isEmpty()) {
continue;
}
candidate = group.get(0);
ReplicateOperator rop = new ReplicateOperator(group.size(), materializationFlags);
rop.setPhysicalOperator(new ReplicatePOperator());
Mutable<ILogicalOperator> ropRef = new MutableObject<ILogicalOperator>(rop);
AbstractLogicalOperator aopCandidate = (AbstractLogicalOperator) candidate.getValue();
List<Mutable<ILogicalOperator>> originalCandidateParents = childrenToParents.get(candidate);
rop.setExecutionMode(((AbstractLogicalOperator) candidate.getValue()).getExecutionMode());
if (aopCandidate.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
rop.getInputs().add(candidate);
} else {
AbstractLogicalOperator beforeExchange = new ExchangeOperator();
beforeExchange.setPhysicalOperator(new OneToOneExchangePOperator());
beforeExchange.setExecutionMode(rop.getExecutionMode());
Mutable<ILogicalOperator> beforeExchangeRef = new MutableObject<ILogicalOperator>(beforeExchange);
beforeExchange.getInputs().add(candidate);
context.computeAndSetTypeEnvironmentForOperator(beforeExchange);
rop.getInputs().add(beforeExchangeRef);
}
context.computeAndSetTypeEnvironmentForOperator(rop);
for (Mutable<ILogicalOperator> parentRef : originalCandidateParents) {
AbstractLogicalOperator parent = (AbstractLogicalOperator) parentRef.getValue();
int index = parent.getInputs().indexOf(candidate);
if (parent.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
parent.getInputs().set(index, ropRef);
rop.getOutputs().add(parentRef);
} else {
AbstractLogicalOperator exchange = new ExchangeOperator();
exchange.setPhysicalOperator(new OneToOneExchangePOperator());
exchange.setExecutionMode(rop.getExecutionMode());
MutableObject<ILogicalOperator> exchangeRef = new MutableObject<ILogicalOperator>(exchange);
exchange.getInputs().add(ropRef);
rop.getOutputs().add(exchangeRef);
context.computeAndSetTypeEnvironmentForOperator(exchange);
parent.getInputs().set(index, exchangeRef);
context.computeAndSetTypeEnvironmentForOperator(parent);
}
}
List<LogicalVariable> liveVarsNew = new ArrayList<LogicalVariable>();
VariableUtilities.getLiveVariables(candidate.getValue(), liveVarsNew);
ArrayList<Mutable<ILogicalExpression>> assignExprs = new ArrayList<Mutable<ILogicalExpression>>();
for (LogicalVariable liveVar : liveVarsNew) {
assignExprs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar)));
}
for (Mutable<ILogicalOperator> ref : group) {
if (ref.equals(candidate)) {
continue;
}
ArrayList<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
Map<LogicalVariable, LogicalVariable> variableMappingBack = new HashMap<LogicalVariable, LogicalVariable>();
IsomorphismUtilities.mapVariablesTopDown(ref.getValue(), candidate.getValue(), variableMappingBack);
for (int i = 0; i < liveVarsNew.size(); i++) {
liveVars.add(variableMappingBack.get(liveVarsNew.get(i)));
}
AbstractLogicalOperator assignOperator = new AssignOperator(liveVars, assignExprs);
assignOperator.setExecutionMode(rop.getExecutionMode());
assignOperator.setPhysicalOperator(new AssignPOperator());
AbstractLogicalOperator projectOperator = new ProjectOperator(liveVars);
projectOperator.setPhysicalOperator(new StreamProjectPOperator());
projectOperator.setExecutionMode(rop.getExecutionMode());
AbstractLogicalOperator exchOp = new ExchangeOperator();
exchOp.setPhysicalOperator(new OneToOneExchangePOperator());
exchOp.setExecutionMode(rop.getExecutionMode());
exchOp.getInputs().add(ropRef);
MutableObject<ILogicalOperator> exchOpRef = new MutableObject<ILogicalOperator>(exchOp);
rop.getOutputs().add(exchOpRef);
assignOperator.getInputs().add(exchOpRef);
projectOperator.getInputs().add(new MutableObject<ILogicalOperator>(assignOperator));
// set the types
context.computeAndSetTypeEnvironmentForOperator(exchOp);
context.computeAndSetTypeEnvironmentForOperator(assignOperator);
context.computeAndSetTypeEnvironmentForOperator(projectOperator);
List<Mutable<ILogicalOperator>> parentOpList = childrenToParents.get(ref);
for (Mutable<ILogicalOperator> parentOpRef : parentOpList) {
AbstractLogicalOperator parentOp = (AbstractLogicalOperator) parentOpRef.getValue();
int index = parentOp.getInputs().indexOf(ref);
ILogicalOperator childOp = parentOp.getOperatorTag() == LogicalOperatorTag.PROJECT ? assignOperator
: projectOperator;
if (!HeuristicOptimizer.isHyracksOp(parentOp.getPhysicalOperator().getOperatorTag())) {
parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(childOp));
} else {
// If the parent operator is a hyracks operator,
// an extra one-to-one exchange is needed.
AbstractLogicalOperator exchg = new ExchangeOperator();
exchg.setPhysicalOperator(new OneToOneExchangePOperator());
exchg.setExecutionMode(childOp.getExecutionMode());
exchg.getInputs().add(new MutableObject<ILogicalOperator>(childOp));
parentOp.getInputs().set(index, new MutableObject<ILogicalOperator>(exchg));
context.computeAndSetTypeEnvironmentForOperator(exchg);
}
context.computeAndSetTypeEnvironmentForOperator(parentOp);
}
}
rewritten = true;
}
return rewritten;
}
private void genCandidates(IOptimizationContext context) throws AlgebricksException {
List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
while (equivalenceClasses.size() > 0) {
previousEquivalenceClasses.clear();
for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
List<Mutable<ILogicalOperator>> candidatesCopy = new ArrayList<Mutable<ILogicalOperator>>();
candidatesCopy.addAll(candidates);
previousEquivalenceClasses.add(candidatesCopy);
}
List<Mutable<ILogicalOperator>> currentLevelOpRefs = new ArrayList<Mutable<ILogicalOperator>>();
for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
if (candidates.size() > 0) {
for (Mutable<ILogicalOperator> opRef : candidates) {
List<Mutable<ILogicalOperator>> refs = childrenToParents.get(opRef);
if (refs != null) {
currentLevelOpRefs.addAll(refs);
}
}
}
if (currentLevelOpRefs.size() == 0) {
continue;
}
candidatesGrow(currentLevelOpRefs, candidates);
}
if (currentLevelOpRefs.size() == 0) {
break;
}
prune(context);
}
if (equivalenceClasses.size() < 1 && previousEquivalenceClasses.size() > 0) {
equivalenceClasses.addAll(previousEquivalenceClasses);
prune(context);
}
}
private void topDownMaterialization(List<Mutable<ILogicalOperator>> tops) {
List<Mutable<ILogicalOperator>> candidates = new ArrayList<Mutable<ILogicalOperator>>();
List<Mutable<ILogicalOperator>> nextLevel = new ArrayList<Mutable<ILogicalOperator>>();
for (Mutable<ILogicalOperator> op : tops) {
for (Mutable<ILogicalOperator> opRef : op.getValue().getInputs()) {
List<Mutable<ILogicalOperator>> opRefList = childrenToParents.get(opRef);
if (opRefList == null) {
opRefList = new ArrayList<Mutable<ILogicalOperator>>();
childrenToParents.put(opRef, opRefList);
nextLevel.add(opRef);
}
opRefList.add(op);
}
if (op.getValue().getInputs().size() == 0) {
candidates.add(op);
}
}
if (equivalenceClasses.size() > 0) {
equivalenceClasses.get(0).addAll(candidates);
} else {
equivalenceClasses.add(candidates);
}
if (nextLevel.size() > 0) {
topDownMaterialization(nextLevel);
}
}
private void candidatesGrow(List<Mutable<ILogicalOperator>> opList, List<Mutable<ILogicalOperator>> candidates) {
List<Mutable<ILogicalOperator>> previousCandidates = new ArrayList<Mutable<ILogicalOperator>>();
previousCandidates.addAll(candidates);
candidates.clear();
boolean validCandidate = false;
for (Mutable<ILogicalOperator> op : opList) {
List<Mutable<ILogicalOperator>> inputs = op.getValue().getInputs();
for (int i = 0; i < inputs.size(); i++) {
Mutable<ILogicalOperator> inputRef = inputs.get(i);
validCandidate = false;
for (Mutable<ILogicalOperator> candidate : previousCandidates) {
// if current input is in candidates
if (inputRef.getValue().equals(candidate.getValue())) {
if (inputs.size() == 1) {
validCandidate = true;
} else {
BitSet candidateInputBitMap = opToCandidateInputs.get(op);
if (candidateInputBitMap == null) {
candidateInputBitMap = new BitSet(inputs.size());
opToCandidateInputs.put(op, candidateInputBitMap);
}
candidateInputBitMap.set(i);
if (candidateInputBitMap.cardinality() == inputs.size()) {
validCandidate = true;
}
}
break;
}
}
}
if (!validCandidate) {
continue;
}
if (!candidates.contains(op)) {
candidates.add(op);
}
}
}
private void prune(IOptimizationContext context) throws AlgebricksException {
List<List<Mutable<ILogicalOperator>>> previousEquivalenceClasses = new ArrayList<List<Mutable<ILogicalOperator>>>();
for (List<Mutable<ILogicalOperator>> candidates : equivalenceClasses) {
List<Mutable<ILogicalOperator>> candidatesCopy = new ArrayList<Mutable<ILogicalOperator>>();
candidatesCopy.addAll(candidates);
previousEquivalenceClasses.add(candidatesCopy);
}
equivalenceClasses.clear();
for (List<Mutable<ILogicalOperator>> candidates : previousEquivalenceClasses) {
boolean[] reserved = new boolean[candidates.size()];
for (int i = 0; i < reserved.length; i++) {
reserved[i] = false;
}
for (int i = candidates.size() - 1; i >= 0; i--) {
if (reserved[i] == false) {
List<Mutable<ILogicalOperator>> equivalentClass = new ArrayList<Mutable<ILogicalOperator>>();
ILogicalOperator candidate = candidates.get(i).getValue();
equivalentClass.add(candidates.get(i));
for (int j = i - 1; j >= 0; j--) {
ILogicalOperator peer = candidates.get(j).getValue();
if (IsomorphismUtilities.isOperatorIsomorphic(candidate, peer)) {
reserved[i] = true;
reserved[j] = true;
equivalentClass.add(candidates.get(j));
}
}
if (equivalentClass.size() > 1) {
equivalenceClasses.add(equivalentClass);
Collections.reverse(equivalentClass);
}
}
}
for (int i = candidates.size() - 1; i >= 0; i--) {
if (!reserved[i]) {
candidates.remove(i);
}
}
}
}
private boolean[] computeMaterilizationFlags(List<Mutable<ILogicalOperator>> group) {
lastUsedClusterId = 0;
for (Mutable<ILogicalOperator> root : roots) {
computeClusters(null, root, new MutableInt(++lastUsedClusterId));
}
boolean[] materializationFlags = new boolean[group.size()];
boolean worthMaterialization = worthMaterialization(group.get(0));
boolean requiresMaterialization;
// get clusterIds for each candidate in the group
List<Integer> groupClusterIds = new ArrayList<Integer>(group.size());
for (int i = 0; i < group.size(); i++) {
groupClusterIds.add(clusterMap.get(group.get(i)).getValue());
}
for (int i = group.size() - 1; i >= 0; i--) {
requiresMaterialization = requiresMaterialization(groupClusterIds, i);
if (requiresMaterialization && !worthMaterialization) {
group.remove(i);
groupClusterIds.remove(i);
}
materializationFlags[i] = requiresMaterialization;
}
if (group.size() < 2) {
group.clear();
}
// if does not worth materialization, the flags for the remaining candidates should be false
return worthMaterialization ? materializationFlags : new boolean[group.size()];
}
private boolean requiresMaterialization(List<Integer> groupClusterIds, int index) {
Integer clusterId = groupClusterIds.get(index);
BitSet blockingClusters = new BitSet();
getAllBlockingClusterIds(clusterId, blockingClusters);
if (!blockingClusters.isEmpty()) {
for (int i = 0; i < groupClusterIds.size(); i++) {
if (i == index) {
continue;
}
if (blockingClusters.get(groupClusterIds.get(i))) {
return true;
}
}
}
return false;
}
private void getAllBlockingClusterIds(int clusterId, BitSet blockingClusters) {
BitSet waitFor = clusterWaitForMap.get(clusterId);
if (waitFor != null) {
for (int i = waitFor.nextSetBit(0); i >= 0; i = waitFor.nextSetBit(i + 1)) {
getAllBlockingClusterIds(i, blockingClusters);
}
blockingClusters.or(waitFor);
}
}
private void computeClusters(Mutable<ILogicalOperator> parentRef, Mutable<ILogicalOperator> opRef,
MutableInt currentClusterId) {
// only replicate operator has multiple outputs
int outputIndex = 0;
if (opRef.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
ReplicateOperator rop = (ReplicateOperator) opRef.getValue();
List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
for (outputIndex = 0; outputIndex < outputs.size(); outputIndex++) {
if (outputs.get(outputIndex).equals(parentRef)) {
break;
}
}
}
AbstractLogicalOperator aop = (AbstractLogicalOperator) opRef.getValue();
Pair<int[], int[]> labels = aop.getPhysicalOperator().getInputOutputDependencyLabels(opRef.getValue());
List<Mutable<ILogicalOperator>> inputs = opRef.getValue().getInputs();
for (int i = 0; i < inputs.size(); i++) {
Mutable<ILogicalOperator> inputRef = inputs.get(i);
if (labels.second[outputIndex] == 1 && labels.first[i] == 0) { // 1 -> 0
if (labels.second.length == 1) {
clusterMap.put(opRef, currentClusterId);
// start a new cluster
MutableInt newClusterId = new MutableInt(++lastUsedClusterId);
computeClusters(opRef, inputRef, newClusterId);
BitSet waitForList = clusterWaitForMap.get(currentClusterId.getValue());
if (waitForList == null) {
waitForList = new BitSet();
clusterWaitForMap.put(currentClusterId.getValue(), waitForList);
}
waitForList.set(newClusterId.getValue());
}
} else { // 0 -> 0 and 1 -> 1
MutableInt prevClusterId = clusterMap.get(opRef);
if (prevClusterId == null || prevClusterId.getValue().equals(currentClusterId.getValue())) {
clusterMap.put(opRef, currentClusterId);
computeClusters(opRef, inputRef, currentClusterId);
} else {
// merge prevClusterId and currentClusterId: update all the map entries that has currentClusterId to prevClusterId
for (BitSet bs : clusterWaitForMap.values()) {
if (bs.get(currentClusterId.getValue())) {
bs.clear(currentClusterId.getValue());
bs.set(prevClusterId.getValue());
}
}
currentClusterId.setValue(prevClusterId.getValue());
}
}
}
}
protected boolean worthMaterialization(Mutable<ILogicalOperator> candidate) {
AbstractLogicalOperator aop = (AbstractLogicalOperator) candidate.getValue();
if (aop.getPhysicalOperator().expensiveThanMaterialization()) {
return true;
}
List<Mutable<ILogicalOperator>> inputs = candidate.getValue().getInputs();
for (Mutable<ILogicalOperator> inputRef : inputs) {
if (worthMaterialization(inputRef)) {
return true;
}
}
return false;
}
}