blob: de0ee387c0b84cdfda7e5c5edeaa9f2b9fdf88bd [file] [log] [blame]
package edu.uci.ics.asterix.optimizer.rules;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import edu.uci.ics.asterix.algebra.base.AsterixOperatorAnnotations;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.exceptions.AsterixRuntimeException;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.base.IAObject;
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.optimizer.base.AnalysisUtil;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class PushFieldAccessRule implements IAlgebraicRewriteRule {
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
return false;
}
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
if (context.checkIfInDontApplySet(this, op)) {
return false;
}
if (op.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
return false;
}
AssignOperator access = (AssignOperator) op;
ILogicalExpression expr = getFirstExpr(access);
String finalAnnot = null;
if (AnalysisUtil.isAccessToFieldRecord(expr)) {
finalAnnot = AsterixOperatorAnnotations.PUSHED_FIELD_ACCESS;
} else if (AnalysisUtil.isRunnableAccessToFieldRecord(expr)) {
finalAnnot = AsterixOperatorAnnotations.PUSHED_RUNNABLE_FIELD_ACCESS;
} else {
return false;
}
AbstractLogicalOperator op2 = (AbstractLogicalOperator) access.getInputs().get(0).getValue();
// If it's not an indexed field, it is pushed so that scan can be
// rewritten into index search.
if (op2.getOperatorTag() == LogicalOperatorTag.PROJECT || context.checkAndAddToAlreadyCompared(op, op2)
&& !(op2.getOperatorTag() == LogicalOperatorTag.SELECT && isAccessToIndexedField(access, context))) {
return false;
}
boolean changed = propagateFieldAccessRec(opRef, context, finalAnnot);
// if (changed) {
// OptimizationUtil.typeOpRec(opRef, context);
// }
return changed;
}
@SuppressWarnings("unchecked")
private boolean isAccessToIndexedField(AssignOperator assign, IOptimizationContext context)
throws AlgebricksException {
AbstractFunctionCallExpression accessFun = (AbstractFunctionCallExpression) assign.getExpressions().get(0)
.getValue();
ILogicalExpression e0 = accessFun.getArguments().get(0).getValue();
if (e0.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
return false;
}
LogicalVariable var = ((VariableReferenceExpression) e0).getVariableReference();
if (context.findPrimaryKey(var) == null) {
// not referring to a dataset record
return false;
}
AbstractLogicalOperator op = assign;
while (op.getInputs().size() == 1 && op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
op = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
}
if (op.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN) {
return false;
}
DataSourceScanOperator scan = (DataSourceScanOperator) op;
LogicalVariable recVar = scan.getVariables().get(scan.getVariables().size() - 1);
if (recVar != var) {
return false;
}
AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
AqlSourceId asid = ((IDataSource<AqlSourceId>) scan.getDataSource()).getId();
Dataset dataset = metadata.findDataset(asid.getDatasetName());
if (dataset == null) {
throw new AlgebricksException("Dataset " + asid.getDatasetName() + " not found.");
}
if (dataset.getDatasetType() != DatasetType.INTERNAL && dataset.getDatasetType() != DatasetType.FEED) {
return false;
}
ILogicalExpression e1 = accessFun.getArguments().get(1).getValue();
if (e1.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
return false;
}
ConstantExpression ce = (ConstantExpression) e1;
IAObject obj = ((AsterixConstantValue) ce.getValue()).getObject();
String fldName;
if (obj.getType().getTypeTag() == ATypeTag.STRING) {
fldName = ((AString) obj).getStringValue();
} else {
int pos = ((AInt32) obj).getIntegerValue();
String tName = dataset.getItemTypeName();
IAType t = metadata.findType(tName);
if (t.getTypeTag() != ATypeTag.RECORD) {
return false;
}
ARecordType rt = (ARecordType) t;
if (pos >= rt.getFieldNames().length) {
return false;
}
fldName = rt.getFieldNames()[pos];
}
List<Index> datasetIndexes = metadata.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
boolean hasSecondaryIndex = false;
for (Index index : datasetIndexes) {
if (index.isSecondaryIndex()) {
hasSecondaryIndex = true;
break;
}
}
return hasSecondaryIndex;
}
private boolean tryingToPushThroughSelectionWithSameDataSource(AssignOperator access, AbstractLogicalOperator op2) {
if (op2.getOperatorTag() != LogicalOperatorTag.SELECT) {
return false;
}
ILogicalExpression e1 = (ILogicalExpression) access.getAnnotations().get(
AsterixOperatorAnnotations.FIELD_ACCESS);
if (e1 == null) {
return false;
}
ILogicalExpression e2 = (ILogicalExpression) op2.getAnnotations().get(AsterixOperatorAnnotations.FIELD_ACCESS);
if (e2 == null) {
return false;
}
return e1.equals(e2);
}
@SuppressWarnings("unchecked")
private boolean propagateFieldAccessRec(Mutable<ILogicalOperator> opRef, IOptimizationContext context,
String finalAnnot) throws AlgebricksException {
AssignOperator access = (AssignOperator) opRef.getValue();
Mutable<ILogicalOperator> opRef2 = access.getInputs().get(0);
AbstractLogicalOperator op2 = (AbstractLogicalOperator) opRef2.getValue();
if (tryingToPushThroughSelectionWithSameDataSource(access, op2)) {
return false;
}
if (testAndModifyRedundantOp(access, op2)) {
propagateFieldAccessRec(opRef2, context, finalAnnot);
return true;
}
List<LogicalVariable> usedInAccess = new LinkedList<LogicalVariable>();
VariableUtilities.getUsedVariables(access, usedInAccess);
List<LogicalVariable> produced2 = new LinkedList<LogicalVariable>();
if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
VariableUtilities.getLiveVariables(op2, produced2);
} else {
VariableUtilities.getProducedVariables(op2, produced2);
}
boolean pushItDown = false;
List<LogicalVariable> inter = new ArrayList<LogicalVariable>(usedInAccess);
if (inter.isEmpty()) { // ground value
return false;
}
inter.retainAll(produced2);
if (inter.isEmpty()) {
pushItDown = true;
} else if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) {
GroupByOperator g = (GroupByOperator) op2;
List<Pair<LogicalVariable, LogicalVariable>> varMappings = new ArrayList<Pair<LogicalVariable, LogicalVariable>>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getDecorList()) {
ILogicalExpression e = p.second.getValue();
if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
LogicalVariable decorVar = GroupByOperator.getDecorVariable(p);
if (inter.contains(decorVar)) {
inter.remove(decorVar);
LogicalVariable v1 = ((VariableReferenceExpression) e).getVariableReference();
varMappings.add(new Pair<LogicalVariable, LogicalVariable>(decorVar, v1));
}
}
}
if (inter.isEmpty()) {
boolean changed = false;
for (Pair<LogicalVariable, LogicalVariable> m : varMappings) {
LogicalVariable v2 = context.newVar();
LogicalVariable oldVar = access.getVariables().get(0);
g.getDecorList().add(
new Pair<LogicalVariable, Mutable<ILogicalExpression>>(oldVar,
new MutableObject<ILogicalExpression>(new VariableReferenceExpression(v2))));
changed = true;
access.getVariables().set(0, v2);
VariableUtilities.substituteVariables(access, m.first, m.second, context);
}
if (changed) {
context.computeAndSetTypeEnvironmentForOperator(g);
}
usedInAccess.clear();
VariableUtilities.getUsedVariables(access, usedInAccess);
pushItDown = true;
}
}
if (pushItDown) {
if (op2.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
Mutable<ILogicalOperator> childOfSubplan = ((NestedTupleSourceOperator) op2).getDataSourceReference()
.getValue().getInputs().get(0);
pushAccessDown(opRef, op2, childOfSubplan, context, finalAnnot);
return true;
}
if (op2.getInputs().size() == 1 && !op2.hasNestedPlans()) {
pushAccessDown(opRef, op2, op2.getInputs().get(0), context, finalAnnot);
return true;
} else {
for (Mutable<ILogicalOperator> inp : op2.getInputs()) {
HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>();
VariableUtilities.getLiveVariables(inp.getValue(), v2);
if (v2.containsAll(usedInAccess)) {
pushAccessDown(opRef, op2, inp, context, finalAnnot);
return true;
}
}
}
if (op2.hasNestedPlans()) {
AbstractOperatorWithNestedPlans nestedOp = (AbstractOperatorWithNestedPlans) op2;
for (ILogicalPlan plan : nestedOp.getNestedPlans()) {
for (Mutable<ILogicalOperator> root : plan.getRoots()) {
HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>();
VariableUtilities.getLiveVariables(root.getValue(), v2);
if (v2.containsAll(usedInAccess)) {
pushAccessDown(opRef, op2, root, context, finalAnnot);
return true;
}
}
}
}
throw new AsterixRuntimeException("Field access " + access.getExpressions().get(0).getValue()
+ " does not correspond to any input of operator " + op2);
} else {
// Check if the accessed field is not one of the partitioning key
// fields. If yes, we can equate the two variables.
if (op2.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
DataSourceScanOperator scan = (DataSourceScanOperator) op2;
int n = scan.getVariables().size();
LogicalVariable scanRecordVar = scan.getVariables().get(n - 1);
AbstractFunctionCallExpression accessFun = (AbstractFunctionCallExpression) access.getExpressions()
.get(0).getValue();
ILogicalExpression e0 = accessFun.getArguments().get(0).getValue();
LogicalExpressionTag tag = e0.getExpressionTag();
if (tag == LogicalExpressionTag.VARIABLE) {
VariableReferenceExpression varRef = (VariableReferenceExpression) e0;
if (varRef.getVariableReference() == scanRecordVar) {
ILogicalExpression e1 = accessFun.getArguments().get(1).getValue();
if (e1.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
IDataSource<AqlSourceId> dataSource = (IDataSource<AqlSourceId>) scan.getDataSource();
AqlSourceId asid = dataSource.getId();
AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
Dataset dataset = metadata.findDataset(asid.getDatasetName());
if (dataset == null) {
throw new AlgebricksException("Dataset " + asid.getDatasetName() + " not found.");
}
if (dataset.getDatasetType() != DatasetType.INTERNAL
&& dataset.getDatasetType() != DatasetType.FEED) {
setAsFinal(access, context, finalAnnot);
return false;
}
ConstantExpression ce = (ConstantExpression) e1;
IAObject obj = ((AsterixConstantValue) ce.getValue()).getObject();
String fldName;
if (obj.getType().getTypeTag() == ATypeTag.STRING) {
fldName = ((AString) obj).getStringValue();
} else {
int pos = ((AInt32) obj).getIntegerValue();
String tName = dataset.getItemTypeName();
IAType t = metadata.findType(tName);
if (t.getTypeTag() != ATypeTag.RECORD) {
return false;
}
ARecordType rt = (ARecordType) t;
if (pos >= rt.getFieldNames().length) {
setAsFinal(access, context, finalAnnot);
return false;
}
fldName = rt.getFieldNames()[pos];
}
int p = DatasetUtils.getPositionOfPartitioningKeyField(dataset, fldName);
if (p < 0) { // not one of the partitioning fields
setAsFinal(access, context, finalAnnot);
return false;
}
LogicalVariable keyVar = scan.getVariables().get(p);
access.getExpressions().get(0).setValue(new VariableReferenceExpression(keyVar));
return true;
}
}
}
}
setAsFinal(access, context, finalAnnot);
return false;
}
}
private void setAsFinal(ILogicalOperator access, IOptimizationContext context, String finalAnnot) {
access.getAnnotations().put(finalAnnot, true);
context.addToDontApplySet(this, access);
}
private boolean testAndModifyRedundantOp(AssignOperator access, AbstractLogicalOperator op2) {
if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
return false;
}
AssignOperator a2 = (AssignOperator) op2;
if (getFirstExpr(access).equals(getFirstExpr(a2))) {
access.getExpressions().get(0).setValue(new VariableReferenceExpression(a2.getVariables().get(0)));
return true;
} else {
return false;
}
}
// indirect recursivity with propagateFieldAccessRec
private void pushAccessDown(Mutable<ILogicalOperator> fldAccessOpRef, ILogicalOperator op2,
Mutable<ILogicalOperator> inputOfOp2, IOptimizationContext context, String finalAnnot)
throws AlgebricksException {
ILogicalOperator fieldAccessOp = fldAccessOpRef.getValue();
fldAccessOpRef.setValue(op2);
List<Mutable<ILogicalOperator>> faInpList = fieldAccessOp.getInputs();
faInpList.clear();
faInpList.add(new MutableObject<ILogicalOperator>(inputOfOp2.getValue()));
inputOfOp2.setValue(fieldAccessOp);
// typing
context.computeAndSetTypeEnvironmentForOperator(fieldAccessOp);
context.computeAndSetTypeEnvironmentForOperator(op2);
propagateFieldAccessRec(inputOfOp2, context, finalAnnot);
}
private ILogicalExpression getFirstExpr(AssignOperator assign) {
return assign.getExpressions().get(0).getValue();
}
}