| package edu.uci.ics.asterix.algebra.operators.physical; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations; |
| import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; |
| import edu.uci.ics.asterix.metadata.declared.AqlSourceId; |
| import edu.uci.ics.asterix.metadata.entities.Dataset; |
| import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions; |
| import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams; |
| import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; |
| import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import edu.uci.ics.hyracks.algebricks.common.utils.ListSet; |
| import edu.uci.ics.hyracks.algebricks.common.utils.Pair; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; |
| import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext; |
| import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor; |
| |
| /** |
| * Contributes the runtime operator for an unnest-map representing a BTree search. |
| */ |
| public class BTreeSearchPOperator extends IndexSearchPOperator { |
| |
| private final List<LogicalVariable> lowKeyVarList; |
| private final List<LogicalVariable> highKeyVarList; |
| private boolean isPrimaryIndex; |
| |
| public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast, |
| boolean isPrimaryIndex, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) { |
| super(idx, requiresBroadcast); |
| this.isPrimaryIndex = isPrimaryIndex; |
| this.lowKeyVarList = lowKeyVarList; |
| this.highKeyVarList = highKeyVarList; |
| } |
| |
| @Override |
| public PhysicalOperatorTag getOperatorTag() { |
| return PhysicalOperatorTag.BTREE_SEARCH; |
| } |
| |
| @Override |
| public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, |
| IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) |
| throws AlgebricksException { |
| UnnestMapOperator unnestMap = (UnnestMapOperator) op; |
| ILogicalExpression unnestExpr = unnestMap.getExpressionRef().getValue(); |
| if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) { |
| throw new IllegalStateException(); |
| } |
| AbstractFunctionCallExpression unnestFuncExpr = (AbstractFunctionCallExpression) unnestExpr; |
| FunctionIdentifier funcIdent = unnestFuncExpr.getFunctionIdentifier(); |
| if (!funcIdent.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) { |
| return; |
| } |
| BTreeJobGenParams jobGenParams = new BTreeJobGenParams(); |
| jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments()); |
| int[] lowKeyIndexes = getKeyIndexes(jobGenParams.getLowKeyVarList(), inputSchemas); |
| int[] highKeyIndexes = getKeyIndexes(jobGenParams.getHighKeyVarList(), inputSchemas); |
| AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider(); |
| AqlCompiledMetadataDeclarations metadata = metadataProvider.getMetadataDeclarations(); |
| Dataset dataset = metadata.findDataset(jobGenParams.getDatasetName()); |
| IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op); |
| List<LogicalVariable> outputVars = unnestMap.getVariables(); |
| if (jobGenParams.getRetainInput()) { |
| outputVars = new ArrayList<LogicalVariable>(); |
| VariableUtilities.getLiveVariables(unnestMap, outputVars); |
| } |
| Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = AqlMetadataProvider.buildBtreeRuntime( |
| builder.getJobSpec(), outputVars, opSchema, typeEnv, metadata, context, jobGenParams.getRetainInput(), |
| jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, |
| jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive()); |
| builder.contributeHyracksOperator(unnestMap, btreeSearch.first); |
| builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second); |
| |
| ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue(); |
| builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0); |
| } |
| |
| public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op, |
| IPhysicalPropertiesVector reqdByParent) { |
| if (requiresBroadcast) { |
| if (isPrimaryIndex) { |
| // For primary indexes, we require re-partitioning on the primary key, and not a broadcast. |
| // Also, add a local sorting property to enforce a sort before the primary-index operator. |
| StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; |
| ListSet<LogicalVariable> searchKeyVars = new ListSet<LogicalVariable>(); |
| searchKeyVars.addAll(lowKeyVarList); |
| searchKeyVars.addAll(highKeyVarList); |
| List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>(); |
| for (LogicalVariable orderVar : searchKeyVars) { |
| propsLocal.add(new LocalOrderProperty(new OrderColumn(orderVar, OrderKind.ASC))); |
| } |
| pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, null), |
| propsLocal); |
| return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); |
| } else { |
| StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1]; |
| pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null); |
| return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION); |
| } |
| } else { |
| return super.getRequiredPropertiesForChildren(op, reqdByParent); |
| } |
| } |
| } |