blob: 4873ed9855ee190290a6ed1d87a7eb64e7c9eb76 [file] [log] [blame]
package edu.uci.ics.asterix.optimizer.rules;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import edu.uci.ics.asterix.aql.util.FunctionUtils;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
import edu.uci.ics.asterix.metadata.declared.AqlIndex;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class IntroduceSecondaryIndexInsertDeleteRule implements IAlgebraicRewriteRule {
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
return false;
}
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
throws AlgebricksException {
AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
if (op0.getOperatorTag() != LogicalOperatorTag.SINK) {
return false;
}
AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE) {
return false;
}
FunctionIdentifier fid = null;
AbstractLogicalOperator op2 = op1;
List<LogicalVariable> recordVar = new ArrayList<LogicalVariable>();
while (fid != AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) {
if (op2.getInputs().size() == 0)
return false;
op2 = (AbstractLogicalOperator) op2.getInputs().get(0).getValue();
if (op2.getOperatorTag() != LogicalOperatorTag.ASSIGN) {
continue;
} else {
AssignOperator assignOp = (AssignOperator) op2;
ILogicalExpression assignExpr = assignOp.getExpressions().get(0).getValue();
if (assignExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) assignOp.getExpressions()
.get(0).getValue();
fid = funcExpr.getFunctionIdentifier();
}
}
}
AssignOperator assignOp2 = (AssignOperator) op2;
recordVar.addAll(assignOp2.getVariables());
InsertDeleteOperator insertOp = (InsertDeleteOperator) op1;
AqlDataSource datasetSource = (AqlDataSource) insertOp.getDataSource();
AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
String datasetName = datasetSource.getId().getDatasetName();
AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
if (adecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
if (adecl.getDatasetType() == DatasetType.EXTERNAL)
return false;
List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
VariableUtilities.getUsedVariables(op1, projectVars);
// create operators for secondary index insert
String itemTypeName = adecl.getItemTypeName();
IAType itemType = metadata.findType(itemTypeName);
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
List<AqlCompiledIndexDecl> secondaryIndexes = DatasetUtils.getSecondaryIndexes(adecl);
if (secondaryIndexes.size() <= 0)
return false;
ILogicalOperator currentTop = op1;
for (AqlCompiledIndexDecl index : secondaryIndexes) {
List<String> secondaryKeyFields = index.getFieldExprs();
List<LogicalVariable> secondaryKeyVars = new ArrayList<LogicalVariable>();
List<Mutable<ILogicalExpression>> expressions = new ArrayList<Mutable<ILogicalExpression>>();
List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<Mutable<ILogicalExpression>>();
for (String secondaryKey : secondaryKeyFields) {
Mutable<ILogicalExpression> varRef = new MutableObject<ILogicalExpression>(
new VariableReferenceExpression(recordVar.get(0)));
String[] fieldNames = recType.getFieldNames();
int pos = -1;
for (int j = 0; j < fieldNames.length; j++)
if (fieldNames[j].equals(secondaryKey))
pos = j;
Mutable<ILogicalExpression> indexRef = new MutableObject<ILogicalExpression>(new ConstantExpression(
new AsterixConstantValue(new AInt32(pos))));
AbstractFunctionCallExpression func = new ScalarFunctionCallExpression(
FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_INDEX), varRef, indexRef);
expressions.add(new MutableObject<ILogicalExpression>(func));
LogicalVariable newVar = context.newVar();
secondaryKeyVars.add(newVar);
}
AssignOperator assign = new AssignOperator(secondaryKeyVars, expressions);
ProjectOperator project = new ProjectOperator(projectVars);
if (index.getKind() == IndexKind.BTREE) {
for (LogicalVariable secondaryKeyVar : secondaryKeyVars)
secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
secondaryKeyVar)));
AqlIndex dataSourceIndex = new AqlIndex(index, metadata, datasetName);
IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
insertOp.getPrimaryKeyExpressions(), secondaryExpressions, insertOp.getOperation());
indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assign));
assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
project.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
currentTop = indexUpdate;
context.computeAndSetTypeEnvironmentForOperator(project);
context.computeAndSetTypeEnvironmentForOperator(assign);
context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
} else if (index.getKind() == IndexKind.RTREE) {
IAType spatialType = null;
for (String secondaryKey : secondaryKeyFields) {
spatialType = keyFieldType(secondaryKey, recType);
}
int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
int numKeys = dimension * 2;
List<LogicalVariable> keyVarList = new ArrayList<LogicalVariable>();
List<Mutable<ILogicalExpression>> keyExprList = new ArrayList<Mutable<ILogicalExpression>>();
for (int i = 0; i < numKeys; i++) {
LogicalVariable keyVar = context.newVar();
keyVarList.add(keyVar);
AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression(
FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CREATE_MBR));
createMBR.getArguments().add(
new MutableObject<ILogicalExpression>(new VariableReferenceExpression(secondaryKeyVars
.get(0))));
createMBR.getArguments().add(
new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
new AInt32(dimension)))));
createMBR.getArguments().add(
new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(
new AInt32(i)))));
keyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
}
for (LogicalVariable secondaryKeyVar : keyVarList)
secondaryExpressions.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(
secondaryKeyVar)));
AqlIndex dataSourceIndex = new AqlIndex(index, metadata, datasetName);
IndexInsertDeleteOperator indexUpdate = new IndexInsertDeleteOperator(dataSourceIndex,
insertOp.getPrimaryKeyExpressions(), secondaryExpressions, insertOp.getOperation());
AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList);
indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
assignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(assign));
assign.getInputs().add(new MutableObject<ILogicalOperator>(project));
project.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
currentTop = indexUpdate;
context.computeAndSetTypeEnvironmentForOperator(project);
context.computeAndSetTypeEnvironmentForOperator(assign);
context.computeAndSetTypeEnvironmentForOperator(assignCoordinates);
context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
}
}
op0.getInputs().clear();
op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
return true;
}
public static IAType keyFieldType(String expr, ARecordType recType) throws AlgebricksException {
String[] names = recType.getFieldNames();
int n = names.length;
for (int i = 0; i < n; i++) {
if (names[i].equals(expr)) {
return recType.getFieldTypes()[i];
}
}
throw new AlgebricksException("Could not find field " + expr + " in the schema.");
}
}