Implemented indexed NL joins using secondary RTrees.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@842 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index 0486060..e653b82 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -90,8 +90,8 @@
         }
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = AqlMetadataProvider.buildBtreeRuntime(
                 builder.getJobSpec(), outputVars, opSchema, typeEnv, metadata, context, jobGenParams.getRetainInput(),
-                jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
-                jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive());
+                dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
+                jobGenParams.isHighKeyInclusive());
         builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index ae2559b..60c4155 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -102,8 +102,7 @@
             SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold)
             throws AlgebricksException {
         IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType = metadata.findType(itemTypeName);
+        IAType itemType = metadata.findType(dataset.getItemTypeName());
         int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         if (secondaryIndex == null) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index 27a477c..ca973f9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -1,5 +1,8 @@
 package edu.uci.ics.asterix.algebra.operators.physical;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
@@ -13,12 +16,15 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 
@@ -50,20 +56,21 @@
         if (!funcIdent.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
             return;
         }
-
         RTreeJobGenParams jobGenParams = new RTreeJobGenParams();
         jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
-
         int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
         Dataset dataset = metadata.findDataset(jobGenParams.getDatasetName());
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + jobGenParams.getDatasetName());
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
+        List<LogicalVariable> outputVars = unnestMap.getVariables();
+        if (jobGenParams.getRetainInput()) {
+            outputVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(unnestMap, outputVars);
         }
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = AqlMetadataProvider.buildRtreeRuntime(
-                metadata, context, builder.getJobSpec(), jobGenParams.getDatasetName(), dataset,
-                jobGenParams.getIndexName(), keyIndexes);
+                builder.getJobSpec(), outputVars, opSchema, typeEnv, metadata, context, jobGenParams.getRetainInput(),
+                dataset, jobGenParams.getIndexName(), keyIndexes);
         builder.contributeHyracksOperator(unnestMap, rtreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(rtreeSearch.first, rtreeSearch.second);
         ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
index 26a8b83..24bd8ed 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -208,6 +208,35 @@
         return primaryKeyVars;
     }
 
+    /**
+     * Returns the search key expression which feeds a secondary-index search. If we are optimizing a selection query then this method returns
+     * the a ConstantExpression from the first constant value in the optimizable function expression.
+     * If we are optimizing a join, then this method returns the VariableReferenceExpression that should feed the secondary index probe.
+     */
+    public static ILogicalExpression createSearchKeyExpr(IOptimizableFuncExpr optFuncExpr,
+            OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree) {
+        if (probeSubTree == null) {
+            // We are optimizing a selection query. Search key is a constant.
+            return new ConstantExpression(optFuncExpr.getConstantVal(0));
+        } else {
+            // We are optimizing a join query. Determine which variable feeds the secondary index. 
+            if (optFuncExpr.getOperatorSubTree(0) == probeSubTree) {
+                return new VariableReferenceExpression(optFuncExpr.getLogicalVar(0));
+            } else {
+                return new VariableReferenceExpression(optFuncExpr.getLogicalVar(1));
+            }
+        }
+    }
+
+    /**
+     *  Returns the first expr optimizable by this index.
+     */
+    public static IOptimizableFuncExpr chooseFirstOptFuncExpr(Index chosenIndex, AccessMethodAnalysisContext analysisCtx) {
+        List<Integer> indexExprs = analysisCtx.getIndexExprs(chosenIndex);
+        int firstExprIndex = indexExprs.get(0);
+        return analysisCtx.matchedFuncExprs.get(firstExprIndex);
+    }
+    
     public static UnnestMapOperator createSecondaryIndexUnnestMap(Dataset dataset, ARecordType recordType, Index index,
             ILogicalOperator inputOp, AccessMethodJobGenParams jobGenParams, IOptimizationContext context,
             boolean outputPrimaryKeysOnly, boolean retainInput) throws AlgebricksException {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
index 6d5c4fc..15c216d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -22,7 +22,6 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IndexedNLJoinExpressionAnnotation;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
@@ -90,21 +89,6 @@
         return false;
     }
 
-    private ILogicalExpression createSearchKeyExpr(IOptimizableFuncExpr optFuncExpr,
-            OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree) {
-        if (probeSubTree == null) {
-            // We are optimizing a selection query. Search key is a constant.
-            return new ConstantExpression(optFuncExpr.getConstantVal(0));
-        } else {
-            // We are optimizing a join query. Determine which variable feeds the secondary index. 
-            if (optFuncExpr.getOperatorSubTree(0) == probeSubTree) {
-                return new VariableReferenceExpression(optFuncExpr.getLogicalVar(0));
-            } else {
-                return new VariableReferenceExpression(optFuncExpr.getLogicalVar(1));
-            }
-        }
-    }
-    
     @Override
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef,
             OptimizableOperatorSubTree subTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
@@ -177,7 +161,7 @@
             return false;
         }
         
-        // TODO: If there are conditions left, add a new select operator on top!
+        // TODO: If there are conditions left, add a new select operator on top.
         joinRef.setValue(primaryIndexUnnestOp);
         return true;
     }
@@ -220,7 +204,8 @@
             if (keyPos < 0) {
                 throw new InternalError();
             }
-            ILogicalExpression searchKeyExpr = createSearchKeyExpr(optFuncExpr, indexSubTree, probeSubTree);
+            ILogicalExpression searchKeyExpr = AccessMethodUtils.createSearchKeyExpr(optFuncExpr, indexSubTree,
+                    probeSubTree);
             LimitType limit = getLimitType(optFuncExpr);
             switch (limit) {
                 case EQUAL: {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index 1c9477e..d73bdc1 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -58,6 +58,7 @@
     protected static Map<FunctionIdentifier, List<IAccessMethod>> accessMethods = new HashMap<FunctionIdentifier, List<IAccessMethod>>();
     static {
         registerAccessMethod(BTreeAccessMethod.INSTANCE, accessMethods);
+        registerAccessMethod(RTreeAccessMethod.INSTANCE, accessMethods);
         registerAccessMethod(InvertedIndexAccessMethod.INSTANCE, accessMethods);        
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index dbd92c0..f547010 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -377,7 +377,7 @@
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef,
             OptimizableOperatorSubTree subTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
             IOptimizationContext context) throws AlgebricksException {
-        IOptimizableFuncExpr optFuncExpr = chooseOptFuncExpr(chosenIndex, analysisCtx);
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
         ILogicalOperator indexPlanRootOp = createSecondaryToPrimaryPlan(subTree, null, chosenIndex, optFuncExpr, false,
                 false, context);
         // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
@@ -401,7 +401,7 @@
             indexSubTree = rightSubTree;
             probeSubTree = leftSubTree;
         }
-        IOptimizableFuncExpr optFuncExpr = chooseOptFuncExpr(chosenIndex, analysisCtx);
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
 
         // Clone the original join condition because we may have to modify it (and we also need the original).
         InnerJoinOperator join = (InnerJoinOperator) joinRef.getValue();
@@ -464,14 +464,6 @@
         return true;
     }
 
-    private IOptimizableFuncExpr chooseOptFuncExpr(Index chosenIndex, AccessMethodAnalysisContext analysisCtx) {
-        // TODO: We can probably do something smarter here.
-        // Pick the first expr optimizable by this index.
-        List<Integer> indexExprs = analysisCtx.getIndexExprs(chosenIndex);
-        int firstExprIndex = indexExprs.get(0);
-        return analysisCtx.matchedFuncExprs.get(firstExprIndex);
-    }
-
     private Mutable<ILogicalOperator> createPanicNestedLoopJoinPlan(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree,
             IOptimizableFuncExpr optFuncExpr, Index chosenIndex, IOptimizationContext context)
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
index dfd3ff7..231a6d8 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -26,8 +26,11 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 
 /**
@@ -50,7 +53,11 @@
     @Override
     public boolean analyzeFuncExprArgs(AbstractFunctionCallExpression funcExpr, List<AssignOperator> assigns,
             AccessMethodAnalysisContext analysisCtx) {
-        return AccessMethodUtils.analyzeFuncExprArgsForOneConstAndVar(funcExpr, analysisCtx);
+        boolean matches = AccessMethodUtils.analyzeFuncExprArgsForOneConstAndVar(funcExpr, analysisCtx);
+        if (!matches) {
+            matches = AccessMethodUtils.analyzeFuncExprArgsForTwoVars(funcExpr, analysisCtx);
+        }
+        return matches;
     }
 
     @Override
@@ -65,27 +72,71 @@
 
     @Override
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef,
-            OptimizableOperatorSubTree subTree, Index index, AccessMethodAnalysisContext analysisCtx,
+            OptimizableOperatorSubTree subTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
             IOptimizationContext context) throws AlgebricksException {
-        Dataset dataset = subTree.dataset;
-        ARecordType recordType = subTree.recordType;
         // TODO: We can probably do something smarter here based on selectivity or MBR area.
-        // Pick the first expr optimizable by this index.
-        List<Integer> indexExprs = analysisCtx.getIndexExprs(index);
-        int firstExprIndex = indexExprs.get(0);
-        IOptimizableFuncExpr optFuncExpr = analysisCtx.matchedFuncExprs.get(firstExprIndex);
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
+        ILogicalOperator primaryIndexUnnestOp = createSecondaryToPrimaryPlan(subTree, null, chosenIndex, optFuncExpr,
+                false, false, context);
+        if (primaryIndexUnnestOp == null) {
+            return false;
+        }
+        // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
+        subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+        return true;
+    }
 
-        // Get the number of dimensions corresponding to the field indexed by
-        // chosenIndex.
+    @Override
+    public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
+            OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
+            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) throws AlgebricksException {
+        // Determine if the index is applicable on the left or right side (if both, we arbitrarily prefer the left side).
+        Dataset dataset = analysisCtx.indexDatasetMap.get(chosenIndex);
+        // Determine probe and index subtrees based on chosen index.
+        OptimizableOperatorSubTree indexSubTree = null;
+        OptimizableOperatorSubTree probeSubTree = null;
+        if (leftSubTree.dataset != null && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
+            indexSubTree = leftSubTree;
+            probeSubTree = rightSubTree;
+        } else if (rightSubTree.dataset != null
+                && dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
+            indexSubTree = rightSubTree;
+            probeSubTree = leftSubTree;
+        }
+        // TODO: We can probably do something smarter here based on selectivity or MBR area.
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
+        ILogicalOperator primaryIndexUnnestOp = createSecondaryToPrimaryPlan(indexSubTree, probeSubTree, chosenIndex,
+                optFuncExpr, true, true, context);
+        if (primaryIndexUnnestOp == null) {
+            return false;
+        }
+        indexSubTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+        // Change join into a select with the same condition.
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinRef.getValue();
+        SelectOperator topSelect = new SelectOperator(joinOp.getCondition());
+        topSelect.getInputs().add(indexSubTree.rootRef);
+        topSelect.setExecutionMode(ExecutionMode.LOCAL);
+        context.computeAndSetTypeEnvironmentForOperator(topSelect);
+        // Replace the original join with the new subtree rooted at the select op.
+        joinRef.setValue(topSelect);
+        return true;
+    }
+
+    private ILogicalOperator createSecondaryToPrimaryPlan(OptimizableOperatorSubTree indexSubTree,
+            OptimizableOperatorSubTree probeSubTree, Index chosenIndex, IOptimizableFuncExpr optFuncExpr,
+            boolean retainInput, boolean requiresBroadcast, IOptimizationContext context) throws AlgebricksException {
+        Dataset dataset = indexSubTree.dataset;
+        ARecordType recordType = indexSubTree.recordType;
+
+        // Get the number of dimensions corresponding to the field indexed by chosenIndex.
         Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(optFuncExpr.getFieldName(0), recordType);
         IAType spatialType = keyPairType.first;
         int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
         int numSecondaryKeys = numDimensions * 2;
 
-        DataSourceScanOperator dataSourceScan = subTree.dataSourceScan;
-        // TODO: For now retainInput and requiresBroadcast are always false.
-        RTreeJobGenParams jobGenParams = new RTreeJobGenParams(index.getIndexName(), IndexType.RTREE,
-                dataset.getDatasetName(), false, false);
+        DataSourceScanOperator dataSourceScan = indexSubTree.dataSourceScan;
+        RTreeJobGenParams jobGenParams = new RTreeJobGenParams(chosenIndex.getIndexName(), IndexType.RTREE,
+                dataset.getDatasetName(), retainInput, requiresBroadcast);
         // A spatial object is serialized in the constant of the func expr we are optimizing.
         // The R-Tree expects as input an MBR represented with 1 field per dimension. 
         // Here we generate vars and funcs for extracting MBR fields from the constant into fields of a tuple (as the R-Tree expects them).
@@ -93,13 +144,14 @@
         ArrayList<LogicalVariable> keyVarList = new ArrayList<LogicalVariable>();
         // List of expressions for the assign.
         ArrayList<Mutable<ILogicalExpression>> keyExprList = new ArrayList<Mutable<ILogicalExpression>>();
+        ILogicalExpression searchKeyExpr = AccessMethodUtils.createSearchKeyExpr(optFuncExpr, indexSubTree,
+                probeSubTree);
         for (int i = 0; i < numSecondaryKeys; i++) {
             // The create MBR function "extracts" one field of an MBR around the given spatial object.
             AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression(
                     FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CREATE_MBR));
             // Spatial object is the constant from the func expr we are optimizing.
-            createMBR.getArguments().add(
-                    new MutableObject<ILogicalExpression>(new ConstantExpression(optFuncExpr.getConstantVal(0))));
+            createMBR.getArguments().add(new MutableObject<ILogicalExpression>(searchKeyExpr));
             // The number of dimensions.
             createMBR.getArguments().add(
                     new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(new AInt32(
@@ -117,26 +169,23 @@
 
         // Assign operator that "extracts" the MBR fields from the func-expr constant into a tuple.
         AssignOperator assignSearchKeys = new AssignOperator(keyVarList, keyExprList);
-        // Input to this assign is the EmptyTupleSource (which the dataSourceScan also must have had as input).
-        assignSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
-        assignSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
+        if (probeSubTree == null) {
+            // We are optimizing a selection query.
+            // Input to this assign is the EmptyTupleSource (which the dataSourceScan also must have had as input).
+            assignSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
+            assignSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
+        } else {
+            // We are optimizing a join, place the assign op top of the probe subtree.
+            assignSearchKeys.getInputs().add(probeSubTree.rootRef);
+        }
 
         UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset, recordType,
-                index, assignSearchKeys, jobGenParams, context, false, false);
+                chosenIndex, assignSearchKeys, jobGenParams, context, false, retainInput);
         // Generate the rest of the upstream plan which feeds the search results into the primary index.
         UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset,
-                recordType, secondaryIndexUnnestOp, context, true, false, false);
-        // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
-        subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
-        return true;
-    }
+                recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
 
-    @Override
-    public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
-            OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
-            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) throws AlgebricksException {
-        // TODO Implement this.
-        return false;
+        return primaryIndexUnnestOp;
     }
 
     @Override
diff --git a/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_01.aql b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_01.aql
new file mode 100644
index 0000000..207b5c9
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_01.aql
@@ -0,0 +1,33 @@
+/*
+ * Description    : Joins two datasets on the intersection of their point attributes.
+ *                  The dataset 'MyData1' has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData1(MyRecord) partitioned by key id;
+create dataset MyData2(MyRecord) partitioned by key id;
+
+create index rtree_index on MyData1(point) type rtree;
+
+write output to nc1:"rttest/index-join_rtree-spatial-intersect-point.adm";
+
+for $a in dataset('MyData1')
+for $b in dataset('MyData2')
+where spatial-intersect($a.point, $b.point)
+return {"a": $a, "b": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_02.aql b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_02.aql
new file mode 100644
index 0000000..e70bedf
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_02.aql
@@ -0,0 +1,33 @@
+/*
+ * Description    : Joins two datasets on the intersection of their point attributes.
+ *                  The dataset 'MyData2' has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData1(MyRecord) partitioned by key id;
+create dataset MyData2(MyRecord) partitioned by key id;
+
+create index rtree_index on MyData2(point) type rtree;
+
+write output to nc1:"rttest/rtree-index-join_spatial-intersect-point_02.adm";
+
+for $a in dataset('MyData1')
+for $b in dataset('MyData2')
+where spatial-intersect($a.point, $b.point)
+return {"a": $a, "b": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_03.aql b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_03.aql
new file mode 100644
index 0000000..85fc22b
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_03.aql
@@ -0,0 +1,32 @@
+/*
+ * Description    : Self-joins a dataset on the intersection of its point attribute.
+ *                  The dataset has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData(MyRecord) partitioned by key id;
+
+create index rtree_index on MyData(point) type rtree;
+
+write output to nc1:"rttest/rtree-index-join_spatial-intersect-point_03.adm";
+
+for $a in dataset('MyData')
+for $b in dataset('MyData')
+where spatial-intersect($a.point, $b.point)
+return {"a": $a, "b": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_01.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_01.plan
new file mode 100644
index 0000000..7dd79e2
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_01.plan
@@ -0,0 +1,21 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |LOCAL|
+          -- ASSIGN  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- BTREE_SEARCH  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                  -- STABLE_SORT [$$20(ASC)]  |LOCAL|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- RTREE_SEARCH  |PARTITIONED|
+                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                              -- ASSIGN  |UNPARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_02.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_02.plan
new file mode 100644
index 0000000..7dd79e2
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_02.plan
@@ -0,0 +1,21 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |LOCAL|
+          -- ASSIGN  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- BTREE_SEARCH  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                  -- STABLE_SORT [$$20(ASC)]  |LOCAL|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- RTREE_SEARCH  |PARTITIONED|
+                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                              -- ASSIGN  |UNPARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_03.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_03.plan
new file mode 100644
index 0000000..7dd79e2
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_03.plan
@@ -0,0 +1,21 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |LOCAL|
+          -- ASSIGN  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- BTREE_SEARCH  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                  -- STABLE_SORT [$$20(ASC)]  |LOCAL|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- RTREE_SEARCH  |PARTITIONED|
+                            -- BROADCAST_EXCHANGE  |PARTITIONED|
+                              -- ASSIGN  |UNPARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/queries/index-join/rtree-spatial-intersect-point.aql b/asterix-app/src/test/resources/runtimets/queries/index-join/rtree-spatial-intersect-point.aql
new file mode 100644
index 0000000..efee6cf
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/index-join/rtree-spatial-intersect-point.aql
@@ -0,0 +1,42 @@
+/*
+ * Description    : Joins two datasets on the intersection of their point attributes.
+ *                  The dataset 'MyData1' has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData1(MyRecord) partitioned by key id;
+create dataset MyData2(MyRecord) partitioned by key id;
+
+load dataset MyData1
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/spatial/spatialData.json"),("format"="adm")) pre-sorted;
+
+load dataset MyData2
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/spatial/spatialData.json"),("format"="adm")) pre-sorted;
+
+create index rtree_index on MyData1(point) type rtree;
+
+write output to nc1:"rttest/index-join_rtree-spatial-intersect-point.adm";
+
+for $a in dataset('MyData1')
+for $b in dataset('MyData2')
+where spatial-intersect($a.point, $b.point) and $a.id != $b.id
+order by $a.id, $b.id
+return {"aid": $a.id, "bid": $b.id, "apt": $a.point, "bp": $b.point}
diff --git a/asterix-app/src/test/resources/runtimets/results/index-join/rtree-spatial-intersect-point.adm b/asterix-app/src/test/resources/runtimets/results/index-join/rtree-spatial-intersect-point.adm
new file mode 100644
index 0000000..6e8c011
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/index-join/rtree-spatial-intersect-point.adm
@@ -0,0 +1,44 @@
+{ "aid": 1, "bid": 17, "apt": point("4.1,7.0"), "bp": point("4.1,7.0") }
+{ "aid": 3, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 15, "bid": 16, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 15, "bid": 18, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 15, "bid": 19, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 16, "bid": 15, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 16, "bid": 18, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 16, "bid": 19, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 17, "bid": 1, "apt": point("4.1,7.0"), "bp": point("4.1,7.0") }
+{ "aid": 18, "bid": 15, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 18, "bid": 16, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 18, "bid": 19, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 19, "bid": 15, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 19, "bid": 16, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 19, "bid": 18, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index b8f0258..e903a9c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -32,7 +32,6 @@
 import edu.uci.ics.asterix.feed.operator.FeedMessageOperatorDescriptor;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
@@ -152,8 +151,8 @@
         String dataverseName = asid.getDataverseName();
         String datasetName = asid.getDatasetName();
         Index primaryIndex = metadata.getDatasetPrimaryIndex(dataverseName, datasetName);
-        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, metadata, context, false, datasetName,
-                dataset, primaryIndex.getIndexName(), null, null, true, true);
+        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, metadata, context, false, dataset,
+                primaryIndex.getIndexName(), null, null, true, true);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
@@ -277,9 +276,9 @@
 
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-            AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, String datasetName,
-            Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
-            boolean highKeyInclusive) throws AlgebricksException {
+            AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, Dataset dataset,
+            String indexName, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive)
+            throws AlgebricksException {
         boolean isSecondary = true;
         Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
         if (primaryIndex != null) {
@@ -299,21 +298,15 @@
                 outputVars, keysStartIndex, numKeys, typeEnv, context);
         ITypeTraits[] typeTraits = null;
 
-	if (isSecondary) {
-        	typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv,
-                    context);
+        if (isSecondary) {
+            typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
         } else {
-        	typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv,
-                    context);
+            typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv, context);
         }
 
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-        try {
-            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDatasetName(), indexName);
         BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
                 appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
                 comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
@@ -321,35 +314,19 @@
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
     }
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
-            AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
-            String datasetName, Dataset dataset, String indexName, int[] keyFields) throws AlgebricksException {
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
+            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, Dataset dataset,
+            String indexName, int[] keyFields) throws AlgebricksException {
         ARecordType recType = (ARecordType) metadata.findType(dataset.getItemTypeName());
-        boolean isSecondary = true;
-        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
-        if (primaryIndex != null) {
-            isSecondary = !indexName.equals(primaryIndex.getIndexName());
-        }
-
         int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-        ISerializerDeserializer[] recordFields;
-        IBinaryComparatorFactory[] comparatorFactories;
-        ITypeTraits[] typeTraits;
-        IPrimitiveValueProviderFactory[] valueProviderFactories;
-        int numSecondaryKeys = 0;
-        int numNestedSecondaryKeyFields = 0;
-        int i = 0;
-        if (!isSecondary) {
-            throw new AlgebricksException("R-tree can only be used as a secondary index");
-        }
         Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         if (secondaryIndex == null) {
             throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
-                    + datasetName);
+                    + dataset.getDatasetName());
         }
         List<String> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
-        numSecondaryKeys = secondaryKeyFields.size();
+        int numSecondaryKeys = secondaryKeyFields.size();
         if (numSecondaryKeys != 1) {
             throw new AlgebricksException(
                     "Cannot use "
@@ -361,42 +338,27 @@
         if (keyType == null) {
             throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
         }
-        int dimension = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
-        numNestedSecondaryKeyFields = dimension * 2;
-
-        int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
-        recordFields = new ISerializerDeserializer[numFields];
-        typeTraits = new ITypeTraits[numFields];
-        comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-        for (i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            recordFields[i] = keySerde;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    nestedKeyType, true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
             valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
         }
-
-        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        for (String partitioningKey : partitioningKeys) {
-            IAType type = recType.getFieldType(partitioningKey);
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(type);
-            recordFields[i] = keySerde;
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
-            ++i;
+        RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+        int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
+        if (retainInput) {
+            keysStartIndex -= numNestedSecondaryKeyFields;
         }
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
+        ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
+                numNestedSecondaryKeyFields, typeEnv, context);
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        RecordDescriptor recDesc = new RecordDescriptor(recordFields);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDatasetName(), indexName);
+        RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
                 appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
-                comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories), false,
+                comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories), retainInput,
                 NoOpOperationCallbackProvider.INSTANCE);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
     }