| package edu.uci.ics.asterix.file; |
| |
| import edu.uci.ics.asterix.common.context.AsterixIndexRegistryProvider; |
| import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface; |
| import edu.uci.ics.asterix.common.exceptions.AsterixException; |
| import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; |
| import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy; |
| import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; |
| import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; |
| import edu.uci.ics.hyracks.api.job.JobSpecification; |
| import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; |
| import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; |
| import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory; |
| import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; |
| import edu.uci.ics.hyracks.storage.am.btree.impls.BTree; |
| import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; |
| import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor; |
| import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider; |
| |
| public class SecondaryBTreeCreator extends SecondaryIndexCreator { |
| |
| protected SecondaryBTreeCreator(PhysicalOptimizationConfig physOptConf) { |
| super(physOptConf); |
| } |
| |
| @Override |
| public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException { |
| JobSpecification spec = new JobSpecification(); |
| TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, |
| AsterixStorageManagerInterface.INSTANCE, AsterixIndexRegistryProvider.INSTANCE, |
| secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, |
| new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE); |
| AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp, |
| secondaryPartitionConstraint); |
| spec.addRoot(secondaryIndexCreateOp); |
| spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); |
| return spec; |
| } |
| |
| @Override |
| public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException { |
| JobSpecification spec = new JobSpecification(); |
| |
| // Create dummy key provider for feeding the primary index scan. |
| AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec); |
| |
| // Create primary index scan op. |
| BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec); |
| |
| // Assign op. |
| AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys); |
| |
| // If any of the secondary fields are nullable, then add a select op that filters nulls. |
| AlgebricksMetaOperatorDescriptor selectOp = null; |
| if (anySecondaryKeyIsNullable) { |
| selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys); |
| } |
| |
| // Sort by secondary keys. |
| ExternalSortOperatorDescriptor sortOp = createSortOp(spec, secondaryComparatorFactories, secondaryRecDesc); |
| AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint); |
| |
| // Create secondary BTree bulk load op. |
| TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec, numSecondaryKeys, |
| new BTreeDataflowHelperFactory(), BTree.DEFAULT_FILL_FACTOR); |
| |
| // Connect the operators. |
| spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0); |
| spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0); |
| if (anySecondaryKeyIsNullable) { |
| spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0); |
| spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, sortOp, 0); |
| } else { |
| spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, sortOp, 0); |
| } |
| spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, secondaryBulkLoadOp, 0); |
| spec.addRoot(secondaryBulkLoadOp); |
| spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy()); |
| return spec; |
| } |
| } |