Merged asterix_stabilization r811:r842.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_inline_vars@843 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index dad8d0a..e653b82 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -23,8 +24,18 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 
@@ -33,8 +44,16 @@
  */
 public class BTreeSearchPOperator extends IndexSearchPOperator {
 
-    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
+    private final List<LogicalVariable> lowKeyVarList;
+    private final List<LogicalVariable> highKeyVarList;
+    private boolean isPrimaryIndex;
+
+    public BTreeSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
+            boolean isPrimaryIndex, List<LogicalVariable> lowKeyVarList, List<LogicalVariable> highKeyVarList) {
         super(idx, requiresBroadcast);
+        this.isPrimaryIndex = isPrimaryIndex;
+        this.lowKeyVarList = lowKeyVarList;
+        this.highKeyVarList = highKeyVarList;
     }
 
     @Override
@@ -71,12 +90,39 @@
         }
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch = AqlMetadataProvider.buildBtreeRuntime(
                 builder.getJobSpec(), outputVars, opSchema, typeEnv, metadata, context, jobGenParams.getRetainInput(),
-                jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes,
-                jobGenParams.isLowKeyInclusive(), jobGenParams.isHighKeyInclusive());
+                dataset, jobGenParams.getIndexName(), lowKeyIndexes, highKeyIndexes, jobGenParams.isLowKeyInclusive(),
+                jobGenParams.isHighKeyInclusive());
         builder.contributeHyracksOperator(unnestMap, btreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(btreeSearch.first, btreeSearch.second);
 
         ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
         builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
     }
+    
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        if (requiresBroadcast) {
+            if (isPrimaryIndex) {
+                // For primary indexes, we require re-partitioning on the primary key, and not a broadcast.
+                // Also, add a local sorting property to enforce a sort before the primary-index operator.
+                StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+                ListSet<LogicalVariable> searchKeyVars = new ListSet<LogicalVariable>();
+                searchKeyVars.addAll(lowKeyVarList);
+                searchKeyVars.addAll(highKeyVarList);
+                List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+                for (LogicalVariable orderVar : searchKeyVars) {
+                    propsLocal.add(new LocalOrderProperty(new OrderColumn(orderVar, OrderKind.ASC)));
+                }
+                pv[0] = new StructuralPropertiesVector(new UnorderedPartitionedProperty(searchKeyVars, null),
+                        propsLocal);
+                return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+            } else {
+                StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+                pv[0] = new StructuralPropertiesVector(new BroadcastPartitioningProperty(null), null);
+                return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+            }
+        } else {
+            return super.getRequiredPropertiesForChildren(op, reqdByParent);
+        }
+    }
 }
\ No newline at end of file
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
index 65225ca..f6c926b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/IndexSearchPOperator.java
@@ -23,8 +23,8 @@
  */
 public abstract class IndexSearchPOperator extends AbstractScanPOperator {
 
-    private final IDataSourceIndex<String, AqlSourceId> idx;
-    private final boolean requiresBroadcast;
+    protected final IDataSourceIndex<String, AqlSourceId> idx;
+    protected final boolean requiresBroadcast;
 
     public IndexSearchPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
         this.idx = idx;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index ae2559b..60c4155 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -102,8 +102,7 @@
             SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold)
             throws AlgebricksException {
         IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
-        String itemTypeName = dataset.getItemTypeName();
-        IAType itemType = metadata.findType(itemTypeName);
+        IAType itemType = metadata.findType(dataset.getItemTypeName());
         int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
         Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         if (secondaryIndex == null) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index 27a477c..ca973f9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -1,5 +1,8 @@
 package edu.uci.ics.asterix.algebra.operators.physical;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
 import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
@@ -13,12 +16,15 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 
@@ -50,20 +56,21 @@
         if (!funcIdent.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
             return;
         }
-
         RTreeJobGenParams jobGenParams = new RTreeJobGenParams();
         jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
-
         int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
         AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
         AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();
         Dataset dataset = metadata.findDataset(jobGenParams.getDatasetName());
-        if (dataset == null) {
-            throw new AlgebricksException("Unknown dataset " + jobGenParams.getDatasetName());
+        IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
+        List<LogicalVariable> outputVars = unnestMap.getVariables();
+        if (jobGenParams.getRetainInput()) {
+            outputVars = new ArrayList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(unnestMap, outputVars);
         }
         Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch = AqlMetadataProvider.buildRtreeRuntime(
-                metadata, context, builder.getJobSpec(), jobGenParams.getDatasetName(), dataset,
-                jobGenParams.getIndexName(), keyIndexes);
+                builder.getJobSpec(), outputVars, opSchema, typeEnv, metadata, context, jobGenParams.getRetainInput(),
+                dataset, jobGenParams.getIndexName(), keyIndexes);
         builder.contributeHyracksOperator(unnestMap, rtreeSearch.first);
         builder.contributeAlgebricksPartitionConstraint(rtreeSearch.first, rtreeSearch.second);
         ILogicalOperator srcExchange = unnestMap.getInputs().get(0).getValue();
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
index 097fea6..2fa2dc9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/base/RuleCollections.java
@@ -218,10 +218,11 @@
         physicalRewritesAllLevels.add(new PullPositionalVariableFromUnnestRule());
         physicalRewritesAllLevels.add(new PushProjectDownRule());
         physicalRewritesAllLevels.add(new InsertProjectBeforeUnionRule());
-                
         physicalRewritesAllLevels.add(new InlineSingleReferenceVariablesRule());
         physicalRewritesAllLevels.add(new RemoveUnusedAssignAndAggregateRule());
         physicalRewritesAllLevels.add(new ConsolidateAssignsRule());
+        // After adding projects, we may need need to set physical operators again.
+        physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
         return physicalRewritesAllLevels;
     }
 
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 74f790f..f57cfb4 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -14,6 +14,7 @@
 import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
 import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
 import edu.uci.ics.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import edu.uci.ics.asterix.optimizer.rules.am.BTreeJobGenParams;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -161,7 +162,11 @@
                         boolean requiresBroadcast = jobGenParams.getRequiresBroadcast();
                         switch (indexType) {
                             case BTREE: {
-                                op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast));
+                                BTreeJobGenParams btreeJobGenParams = new BTreeJobGenParams();
+                                btreeJobGenParams.readFromFuncArgs(f.getArguments());
+                                op.setPhysicalOperator(new BTreeSearchPOperator(dsi, requiresBroadcast,
+                                        btreeJobGenParams.isPrimaryIndex(), btreeJobGenParams.getLowKeyVarList(),
+                                        btreeJobGenParams.getHighKeyVarList()));
                                 break;
                             }
                             case RTREE: {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
index af30163..93099e9 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodJobGenParams.java
@@ -23,7 +23,8 @@
     protected String datasetName;
     protected boolean retainInput;
     protected boolean requiresBroadcast;
-
+    protected boolean isPrimaryIndex;
+    
     private final int NUM_PARAMS = 5;
 
     public AccessMethodJobGenParams() {
@@ -36,6 +37,7 @@
         this.datasetName = datasetName;
         this.retainInput = retainInput;
         this.requiresBroadcast = requiresBroadcast;
+        this.isPrimaryIndex = datasetName.equals(indexName);
     }
 
     public void writeToFuncArgs(List<Mutable<ILogicalExpression>> funcArgs) {
@@ -52,6 +54,7 @@
         datasetName = AccessMethodUtils.getStringConstant(funcArgs.get(2));
         retainInput = AccessMethodUtils.getBooleanConstant(funcArgs.get(3));
         requiresBroadcast = AccessMethodUtils.getBooleanConstant(funcArgs.get(4));
+        isPrimaryIndex = datasetName.equals(indexName);
     }
 
     public String getIndexName() {
@@ -100,4 +103,8 @@
     protected int getNumParams() {
         return NUM_PARAMS;
     }
+    
+    public boolean isPrimaryIndex() {
+        return isPrimaryIndex;
+    }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
index cd3712d..24bd8ed 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/AccessMethodUtils.java
@@ -109,6 +109,24 @@
         analysisCtx.matchedFuncExprs.add(new OptimizableFuncExpr(funcExpr, fieldVar, constFilterVal));
         return true;
     }
+    
+    public static boolean analyzeFuncExprArgsForTwoVars(AbstractFunctionCallExpression funcExpr,
+            AccessMethodAnalysisContext analysisCtx) {
+        LogicalVariable fieldVar1 = null;
+        LogicalVariable fieldVar2 = null;
+        ILogicalExpression arg1 = funcExpr.getArguments().get(0).getValue();
+        ILogicalExpression arg2 = funcExpr.getArguments().get(1).getValue();
+        if (arg1.getExpressionTag() == LogicalExpressionTag.VARIABLE
+                && arg2.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            fieldVar1 = ((VariableReferenceExpression) arg1).getVariableReference();
+            fieldVar2 = ((VariableReferenceExpression) arg2).getVariableReference();
+        } else {
+            return false;
+        }
+        analysisCtx.matchedFuncExprs.add(new OptimizableFuncExpr(funcExpr,
+                new LogicalVariable[] { fieldVar1, fieldVar2 }, null));
+        return true;
+    }
 
     public static int getNumSecondaryKeys(Index index, ARecordType recordType) throws AlgebricksException {
         switch (index.getIndexType()) {
@@ -190,6 +208,35 @@
         return primaryKeyVars;
     }
 
+    /**
+     * Returns the search key expression which feeds a secondary-index search. If we are optimizing a selection query then this method returns
+     * the a ConstantExpression from the first constant value in the optimizable function expression.
+     * If we are optimizing a join, then this method returns the VariableReferenceExpression that should feed the secondary index probe.
+     */
+    public static ILogicalExpression createSearchKeyExpr(IOptimizableFuncExpr optFuncExpr,
+            OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree) {
+        if (probeSubTree == null) {
+            // We are optimizing a selection query. Search key is a constant.
+            return new ConstantExpression(optFuncExpr.getConstantVal(0));
+        } else {
+            // We are optimizing a join query. Determine which variable feeds the secondary index. 
+            if (optFuncExpr.getOperatorSubTree(0) == probeSubTree) {
+                return new VariableReferenceExpression(optFuncExpr.getLogicalVar(0));
+            } else {
+                return new VariableReferenceExpression(optFuncExpr.getLogicalVar(1));
+            }
+        }
+    }
+
+    /**
+     *  Returns the first expr optimizable by this index.
+     */
+    public static IOptimizableFuncExpr chooseFirstOptFuncExpr(Index chosenIndex, AccessMethodAnalysisContext analysisCtx) {
+        List<Integer> indexExprs = analysisCtx.getIndexExprs(chosenIndex);
+        int firstExprIndex = indexExprs.get(0);
+        return analysisCtx.matchedFuncExprs.get(firstExprIndex);
+    }
+    
     public static UnnestMapOperator createSecondaryIndexUnnestMap(Dataset dataset, ARecordType recordType, Index index,
             ILogicalOperator inputOp, AccessMethodJobGenParams jobGenParams, IOptimizationContext context,
             boolean outputPrimaryKeysOnly, boolean retainInput) throws AlgebricksException {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
index b6103b3..15c216d 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/BTreeAccessMethod.java
@@ -22,13 +22,14 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IndexedNLJoinExpressionAnnotation;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions.ComparisonKind;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
@@ -70,7 +71,11 @@
     @Override
     public boolean analyzeFuncExprArgs(AbstractFunctionCallExpression funcExpr, List<AssignOperator> assigns,
             AccessMethodAnalysisContext analysisCtx) {
-        return AccessMethodUtils.analyzeFuncExprArgsForOneConstAndVar(funcExpr, analysisCtx);
+        boolean matches = AccessMethodUtils.analyzeFuncExprArgsForOneConstAndVar(funcExpr, analysisCtx);
+        if (!matches) {
+            matches = AccessMethodUtils.analyzeFuncExprArgsForTwoVars(funcExpr, analysisCtx);
+        }
+        return matches;
     }
 
     @Override
@@ -88,20 +93,91 @@
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef,
             OptimizableOperatorSubTree subTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
             IOptimizationContext context) throws AlgebricksException {
-        Dataset dataset = subTree.dataset;
-        ARecordType recordType = subTree.recordType;
         SelectOperator select = (SelectOperator) selectRef.getValue();
-        DataSourceScanOperator dataSourceScan = subTree.dataSourceScan;
+        Mutable<ILogicalExpression> conditionRef = select.getCondition();
+        ILogicalOperator primaryIndexUnnestOp = createSecondaryToPrimaryPlan(selectRef, conditionRef, subTree, null,
+                chosenIndex, analysisCtx, false, false, context);
+        if (primaryIndexUnnestOp == null) {
+            return false;
+        }
+
         Mutable<ILogicalOperator> assignRef = (subTree.assignRefs.isEmpty()) ? null : subTree.assignRefs.get(0);
         AssignOperator assign = null;
         if (assignRef != null) {
             assign = (AssignOperator) assignRef.getValue();
         }
+        // Generate new select using the new condition.
+        if (conditionRef.getValue() != null) {
+            select.getInputs().clear();
+            if (assign != null) {
+                subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+                select.getInputs().add(new MutableObject<ILogicalOperator>(assign));
+            } else {
+                select.getInputs().add(new MutableObject<ILogicalOperator>(primaryIndexUnnestOp));
+            }
+        } else {
+            ((AbstractLogicalOperator) primaryIndexUnnestOp).setExecutionMode(ExecutionMode.PARTITIONED);
+            if (assign != null) {
+                subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+                selectRef.setValue(assign);
+            } else {
+                selectRef.setValue(primaryIndexUnnestOp);
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
+            OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
+            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) throws AlgebricksException {
+        
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinRef.getValue();
+        Mutable<ILogicalExpression> conditionRef = joinOp.getCondition();
+        
+        // Skip this rule if there is no index NL hint.
+        AbstractFunctionCallExpression conditionFunc = (AbstractFunctionCallExpression) conditionRef.getValue();
+        if (!conditionFunc.getAnnotations().containsKey(IndexedNLJoinExpressionAnnotation.INSTANCE)) {
+            return false;
+        }
+        
+        // Determine if the index is applicable on the left or right side (if both, we arbitrarily prefer the left side).
+        Dataset dataset = analysisCtx.indexDatasetMap.get(chosenIndex);
+        // Determine probe and index subtrees based on chosen index.
+        OptimizableOperatorSubTree indexSubTree = null;
+        OptimizableOperatorSubTree probeSubTree = null;
+        if (leftSubTree.dataset != null && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
+            indexSubTree = leftSubTree;
+            probeSubTree = rightSubTree;
+        } else if (rightSubTree.dataset != null
+                && dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
+            indexSubTree = rightSubTree;
+            probeSubTree = leftSubTree;
+        }
+        
+        ILogicalOperator primaryIndexUnnestOp = createSecondaryToPrimaryPlan(joinRef, conditionRef, indexSubTree, probeSubTree,
+                chosenIndex, analysisCtx, true, true, context);
+        if (primaryIndexUnnestOp == null) {
+            return false;
+        }
+        
+        // TODO: If there are conditions left, add a new select operator on top.
+        joinRef.setValue(primaryIndexUnnestOp);
+        return true;
+    }
+    
+    private ILogicalOperator createSecondaryToPrimaryPlan(Mutable<ILogicalOperator> topOpRef, Mutable<ILogicalExpression> conditionRef,
+            OptimizableOperatorSubTree indexSubTree,
+            OptimizableOperatorSubTree probeSubTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
+            boolean retainInput, boolean requiresBroadcast, IOptimizationContext context) throws AlgebricksException {
+        Dataset dataset = indexSubTree.dataset;
+        ARecordType recordType = indexSubTree.recordType;
+        DataSourceScanOperator dataSourceScan = indexSubTree.dataSourceScan;
         int numSecondaryKeys = chosenIndex.getKeyFieldNames().size();
 
         // Info on high and low keys for the BTree search predicate.
-        IAlgebricksConstantValue[] lowKeyConstants = new IAlgebricksConstantValue[numSecondaryKeys];
-        IAlgebricksConstantValue[] highKeyConstants = new IAlgebricksConstantValue[numSecondaryKeys];
+        ILogicalExpression[] lowKeyExprs = new ILogicalExpression[numSecondaryKeys];
+        ILogicalExpression[] highKeyExprs = new ILogicalExpression[numSecondaryKeys];
         LimitType[] lowKeyLimits = new LimitType[numSecondaryKeys];
         LimitType[] highKeyLimits = new LimitType[numSecondaryKeys];
         boolean[] lowKeyInclusive = new boolean[numSecondaryKeys];
@@ -128,13 +204,15 @@
             if (keyPos < 0) {
                 throw new InternalError();
             }
+            ILogicalExpression searchKeyExpr = AccessMethodUtils.createSearchKeyExpr(optFuncExpr, indexSubTree,
+                    probeSubTree);
             LimitType limit = getLimitType(optFuncExpr);
             switch (limit) {
                 case EQUAL: {
                     if (lowKeyLimits[keyPos] == null && highKeyLimits[keyPos] == null) {
                         lowKeyLimits[keyPos] = highKeyLimits[keyPos] = limit;
                         lowKeyInclusive[keyPos] = highKeyInclusive[keyPos] = true;
-                        lowKeyConstants[keyPos] = highKeyConstants[keyPos] = optFuncExpr.getConstantVal(0);
+                        lowKeyExprs[keyPos] = highKeyExprs[keyPos] = searchKeyExpr;
                         setLowKeys.set(keyPos);
                         setHighKeys.set(keyPos);
                     } else {
@@ -144,14 +222,14 @@
                     // If high and low keys are set, we exit for now.
                     if (setLowKeys.cardinality() == numSecondaryKeys
                             && setHighKeys.cardinality() == numSecondaryKeys) {
-                    	doneWithExprs = true;
+                        doneWithExprs = true;
                     }             
                     break;
                 }
                 case HIGH_EXCLUSIVE: {
                     if (highKeyLimits[keyPos] == null || (highKeyLimits[keyPos] != null && highKeyInclusive[keyPos])) {
                         highKeyLimits[keyPos] = limit;
-                        highKeyConstants[keyPos] = optFuncExpr.getConstantVal(0);
+                        highKeyExprs[keyPos] = searchKeyExpr;
                         highKeyInclusive[keyPos] = false;
                     } else {
                         couldntFigureOut = true;
@@ -162,7 +240,7 @@
                 case HIGH_INCLUSIVE: {
                     if (highKeyLimits[keyPos] == null) {
                         highKeyLimits[keyPos] = limit;
-                        highKeyConstants[keyPos] = optFuncExpr.getConstantVal(0);
+                        highKeyExprs[keyPos] = searchKeyExpr;
                         highKeyInclusive[keyPos] = true;
                     } else {
                         couldntFigureOut = true;
@@ -173,7 +251,7 @@
                 case LOW_EXCLUSIVE: {
                     if (lowKeyLimits[keyPos] == null || (lowKeyLimits[keyPos] != null && lowKeyInclusive[keyPos])) {
                         lowKeyLimits[keyPos] = limit;
-                        lowKeyConstants[keyPos] = optFuncExpr.getConstantVal(0);
+                        lowKeyExprs[keyPos] = searchKeyExpr;
                         lowKeyInclusive[keyPos] = false;
                     } else {
                         couldntFigureOut = true;
@@ -184,7 +262,7 @@
                 case LOW_INCLUSIVE: {
                     if (lowKeyLimits[keyPos] == null) {
                         lowKeyLimits[keyPos] = limit;
-                        lowKeyConstants[keyPos] = optFuncExpr.getConstantVal(0);
+                        lowKeyExprs[keyPos] = searchKeyExpr;
                         lowKeyInclusive[keyPos] = true;
                     } else {
                         couldntFigureOut = true;
@@ -205,22 +283,22 @@
             }
         }
         if (couldntFigureOut) {
-            return false;
+            return null;
         }
 
         // Rule out the cases unsupported by the current btree search
         // implementation.
         for (int i = 1; i < numSecondaryKeys; i++) {
             if (lowKeyInclusive[i] != lowKeyInclusive[0] || highKeyInclusive[i] != highKeyInclusive[0]) {
-                return false;
+                return null;
             }
             if (lowKeyLimits[0] == null && lowKeyLimits[i] != null || lowKeyLimits[0] != null
                     && lowKeyLimits[i] == null) {
-                return false;
+                return null;
             }
             if (highKeyLimits[0] == null && highKeyLimits[i] != null || highKeyLimits[0] != null
                     && highKeyLimits[i] == null) {
-                return false;
+                return null;
             }
         }
         if (lowKeyLimits[0] == null) {
@@ -233,97 +311,90 @@
         // Here we generate vars and funcs for assigning the secondary-index keys to be fed into the secondary-index search.
         // List of variables for the assign.
         ArrayList<LogicalVariable> keyVarList = new ArrayList<LogicalVariable>();
-        // List of expressions for the assign.
-        ArrayList<Mutable<ILogicalExpression>> keyExprList = new ArrayList<Mutable<ILogicalExpression>>();
-        int numLowKeys = createKeyVarsAndExprs(lowKeyLimits, lowKeyConstants, keyExprList, keyVarList, context);
-        int numHighKeys = createKeyVarsAndExprs(highKeyLimits, highKeyConstants, keyExprList, keyVarList, context);
+        // List of variables and expressions for the assign.
+        ArrayList<LogicalVariable> assignKeyVarList = new ArrayList<LogicalVariable>();
+        ArrayList<Mutable<ILogicalExpression>> assignKeyExprList = new ArrayList<Mutable<ILogicalExpression>>();        
+        int numLowKeys = createKeyVarsAndExprs(lowKeyLimits, lowKeyExprs, assignKeyVarList, assignKeyExprList, keyVarList, context);
+        int numHighKeys = createKeyVarsAndExprs(highKeyLimits, highKeyExprs, assignKeyVarList, assignKeyExprList, keyVarList, context);
 
         BTreeJobGenParams jobGenParams = new BTreeJobGenParams(chosenIndex.getIndexName(), IndexType.BTREE,
-                dataset.getDatasetName(), false, false);
+                dataset.getDatasetName(), retainInput, requiresBroadcast);
         jobGenParams.setLowKeyInclusive(lowKeyInclusive[0]);
         jobGenParams.setHighKeyInclusive(highKeyInclusive[0]);
         jobGenParams.setLowKeyVarList(keyVarList, 0, numLowKeys);
         jobGenParams.setHighKeyVarList(keyVarList, numLowKeys, numHighKeys);
 
-        // Assign operator that sets the secondary-index search-key fields.
-        AssignOperator assignSearchKeys = new AssignOperator(keyVarList, keyExprList);
-        // Input to this assign is the EmptyTupleSource (which the dataSourceScan also must have had as input).
-        assignSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
-        assignSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
+        ILogicalOperator inputOp = null;
+        if (!assignKeyVarList.isEmpty()) {
+            // Assign operator that sets the constant secondary-index search-key fields if necessary.
+            AssignOperator assignConstantSearchKeys = new AssignOperator(assignKeyVarList, assignKeyExprList);
+            // Input to this assign is the EmptyTupleSource (which the dataSourceScan also must have had as input).
+            assignConstantSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
+            assignConstantSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
+            inputOp = assignConstantSearchKeys;
+        } else {
+            // All index search keys are variables.
+            inputOp = probeSubTree.root;
+        }
 
         UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset, recordType,
-                chosenIndex, assignSearchKeys, jobGenParams, context, false, false);
+                chosenIndex, inputOp, jobGenParams, context, false, retainInput);
 
         // Generate the rest of the upstream plan which feeds the search results into the primary index.        
         UnnestMapOperator primaryIndexUnnestOp;
         boolean isPrimaryIndex = chosenIndex.getIndexName().equals(dataset.getDatasetName());
         if (!isPrimaryIndex) {
             primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset, recordType,
-                    secondaryIndexUnnestOp, context, true, false, false);
+                    secondaryIndexUnnestOp, context, true, retainInput, false);
         } else {
             List<Object> primaryIndexOutputTypes = new ArrayList<Object>();
             AccessMethodUtils.appendPrimaryIndexTypes(dataset, recordType, primaryIndexOutputTypes);
             primaryIndexUnnestOp = new UnnestMapOperator(dataSourceScan.getVariables(),
-                    secondaryIndexUnnestOp.getExpressionRef(), primaryIndexOutputTypes, false);
-            primaryIndexUnnestOp.getInputs().add(new MutableObject<ILogicalOperator>(assignSearchKeys));
+                    secondaryIndexUnnestOp.getExpressionRef(), primaryIndexOutputTypes, retainInput);
+            primaryIndexUnnestOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
         }
 
         List<Mutable<ILogicalExpression>> remainingFuncExprs = new ArrayList<Mutable<ILogicalExpression>>();
-        getNewSelectExprs(select, replacedFuncExprs, remainingFuncExprs);
-        // Generate new select using the new condition.
+        getNewConditionExprs(conditionRef, replacedFuncExprs, remainingFuncExprs);
+        // Generate new condition.
         if (!remainingFuncExprs.isEmpty()) {
             ILogicalExpression pulledCond = createSelectCondition(remainingFuncExprs);
-            SelectOperator selectRest = new SelectOperator(new MutableObject<ILogicalExpression>(pulledCond));
-            if (assign != null) {
-                subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
-                selectRest.getInputs().add(new MutableObject<ILogicalOperator>(assign));
-            } else {
-                selectRest.getInputs().add(new MutableObject<ILogicalOperator>(primaryIndexUnnestOp));
-            }
-            selectRest.setExecutionMode(((AbstractLogicalOperator) selectRef.getValue()).getExecutionMode());
-            selectRef.setValue(selectRest);
+            conditionRef.setValue(pulledCond);
         } else {
-            primaryIndexUnnestOp.setExecutionMode(ExecutionMode.PARTITIONED);
-            if (assign != null) {
-                subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
-                selectRef.setValue(assign);
-            } else {
-                selectRef.setValue(primaryIndexUnnestOp);
-            }
+            conditionRef.setValue(null);
         }
-        return true;
+        return primaryIndexUnnestOp;
     }
-
-    @Override
-    public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
-            OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
-            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) throws AlgebricksException {
-        // TODO: Implement this.
-        return false;
-    }
-
-    private int createKeyVarsAndExprs(LimitType[] keyLimits, IAlgebricksConstantValue[] keyConstants,
-            ArrayList<Mutable<ILogicalExpression>> keyExprList, ArrayList<LogicalVariable> keyVarList,
-            IOptimizationContext context) {
+    
+    private int createKeyVarsAndExprs(LimitType[] keyLimits, ILogicalExpression[] searchKeyExprs,
+            ArrayList<LogicalVariable> assignKeyVarList, ArrayList<Mutable<ILogicalExpression>> assignKeyExprList,
+            ArrayList<LogicalVariable> keyVarList, IOptimizationContext context) {
         if (keyLimits[0] == null) {
             return 0;
         }
         int numKeys = keyLimits.length;
         for (int i = 0; i < numKeys; i++) {
-            LogicalVariable keyVar = context.newVar();
+            ILogicalExpression searchKeyExpr = searchKeyExprs[i];
+            LogicalVariable keyVar = null;
+            if (searchKeyExpr.getExpressionTag() == LogicalExpressionTag.CONSTANT) {
+                keyVar = context.newVar();
+                assignKeyExprList.add(new MutableObject<ILogicalExpression>(searchKeyExpr));
+                assignKeyVarList.add(keyVar);
+            } else {
+                keyVar = ((VariableReferenceExpression) searchKeyExpr).getVariableReference();
+            }
             keyVarList.add(keyVar);
-            keyExprList.add(new MutableObject<ILogicalExpression>(new ConstantExpression(keyConstants[i])));
         }
         return numKeys;
     }
 
-    private void getNewSelectExprs(SelectOperator select, Set<ILogicalExpression> replacedFuncExprs,
-            List<Mutable<ILogicalExpression>> remainingFuncExprs) {
+    private void getNewConditionExprs(Mutable<ILogicalExpression> conditionRef,
+            Set<ILogicalExpression> replacedFuncExprs, List<Mutable<ILogicalExpression>> remainingFuncExprs) {
         remainingFuncExprs.clear();
         if (replacedFuncExprs.isEmpty()) {
             return;
         }
-        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) select.getCondition().getValue();
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) conditionRef.getValue();
         if (replacedFuncExprs.size() == 1) {
             Iterator<ILogicalExpression> it = replacedFuncExprs.iterator();
             if (!it.hasNext()) {
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index e4555aa..d73bdc1 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -57,7 +57,9 @@
     // Register access methods.
     protected static Map<FunctionIdentifier, List<IAccessMethod>> accessMethods = new HashMap<FunctionIdentifier, List<IAccessMethod>>();
     static {
-        registerAccessMethod(InvertedIndexAccessMethod.INSTANCE, accessMethods);
+        registerAccessMethod(BTreeAccessMethod.INSTANCE, accessMethods);
+        registerAccessMethod(RTreeAccessMethod.INSTANCE, accessMethods);
+        registerAccessMethod(InvertedIndexAccessMethod.INSTANCE, accessMethods);        
     }
 
     @Override
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
index 866720b..1715c97 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/InvertedIndexAccessMethod.java
@@ -376,7 +376,7 @@
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef,
             OptimizableOperatorSubTree subTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
             IOptimizationContext context) throws AlgebricksException {
-        IOptimizableFuncExpr optFuncExpr = chooseOptFuncExpr(chosenIndex, analysisCtx);
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
         ILogicalOperator indexPlanRootOp = createSecondaryToPrimaryPlan(subTree, null, chosenIndex, optFuncExpr, false,
                 false, context);
         // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
@@ -400,7 +400,7 @@
             indexSubTree = rightSubTree;
             probeSubTree = leftSubTree;
         }
-        IOptimizableFuncExpr optFuncExpr = chooseOptFuncExpr(chosenIndex, analysisCtx);
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
 
         // Clone the original join condition because we may have to modify it (and we also need the original).
         InnerJoinOperator join = (InnerJoinOperator) joinRef.getValue();
@@ -456,14 +456,6 @@
         return true;
     }
 
-    private IOptimizableFuncExpr chooseOptFuncExpr(Index chosenIndex, AccessMethodAnalysisContext analysisCtx) {
-        // TODO: We can probably do something smarter here.
-        // Pick the first expr optimizable by this index.
-        List<Integer> indexExprs = analysisCtx.getIndexExprs(chosenIndex);
-        int firstExprIndex = indexExprs.get(0);
-        return analysisCtx.matchedFuncExprs.get(firstExprIndex);
-    }
-
     private Mutable<ILogicalOperator> createPanicNestedLoopJoinPlan(Mutable<ILogicalOperator> joinRef,
             OptimizableOperatorSubTree indexSubTree, OptimizableOperatorSubTree probeSubTree,
             IOptimizableFuncExpr optFuncExpr, Index chosenIndex, IOptimizationContext context)
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
index dfd3ff7..231a6d8 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/am/RTreeAccessMethod.java
@@ -26,8 +26,11 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 
 /**
@@ -50,7 +53,11 @@
     @Override
     public boolean analyzeFuncExprArgs(AbstractFunctionCallExpression funcExpr, List<AssignOperator> assigns,
             AccessMethodAnalysisContext analysisCtx) {
-        return AccessMethodUtils.analyzeFuncExprArgsForOneConstAndVar(funcExpr, analysisCtx);
+        boolean matches = AccessMethodUtils.analyzeFuncExprArgsForOneConstAndVar(funcExpr, analysisCtx);
+        if (!matches) {
+            matches = AccessMethodUtils.analyzeFuncExprArgsForTwoVars(funcExpr, analysisCtx);
+        }
+        return matches;
     }
 
     @Override
@@ -65,27 +72,71 @@
 
     @Override
     public boolean applySelectPlanTransformation(Mutable<ILogicalOperator> selectRef,
-            OptimizableOperatorSubTree subTree, Index index, AccessMethodAnalysisContext analysisCtx,
+            OptimizableOperatorSubTree subTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
             IOptimizationContext context) throws AlgebricksException {
-        Dataset dataset = subTree.dataset;
-        ARecordType recordType = subTree.recordType;
         // TODO: We can probably do something smarter here based on selectivity or MBR area.
-        // Pick the first expr optimizable by this index.
-        List<Integer> indexExprs = analysisCtx.getIndexExprs(index);
-        int firstExprIndex = indexExprs.get(0);
-        IOptimizableFuncExpr optFuncExpr = analysisCtx.matchedFuncExprs.get(firstExprIndex);
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
+        ILogicalOperator primaryIndexUnnestOp = createSecondaryToPrimaryPlan(subTree, null, chosenIndex, optFuncExpr,
+                false, false, context);
+        if (primaryIndexUnnestOp == null) {
+            return false;
+        }
+        // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
+        subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+        return true;
+    }
 
-        // Get the number of dimensions corresponding to the field indexed by
-        // chosenIndex.
+    @Override
+    public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
+            OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
+            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) throws AlgebricksException {
+        // Determine if the index is applicable on the left or right side (if both, we arbitrarily prefer the left side).
+        Dataset dataset = analysisCtx.indexDatasetMap.get(chosenIndex);
+        // Determine probe and index subtrees based on chosen index.
+        OptimizableOperatorSubTree indexSubTree = null;
+        OptimizableOperatorSubTree probeSubTree = null;
+        if (leftSubTree.dataset != null && dataset.getDatasetName().equals(leftSubTree.dataset.getDatasetName())) {
+            indexSubTree = leftSubTree;
+            probeSubTree = rightSubTree;
+        } else if (rightSubTree.dataset != null
+                && dataset.getDatasetName().equals(rightSubTree.dataset.getDatasetName())) {
+            indexSubTree = rightSubTree;
+            probeSubTree = leftSubTree;
+        }
+        // TODO: We can probably do something smarter here based on selectivity or MBR area.
+        IOptimizableFuncExpr optFuncExpr = AccessMethodUtils.chooseFirstOptFuncExpr(chosenIndex, analysisCtx);
+        ILogicalOperator primaryIndexUnnestOp = createSecondaryToPrimaryPlan(indexSubTree, probeSubTree, chosenIndex,
+                optFuncExpr, true, true, context);
+        if (primaryIndexUnnestOp == null) {
+            return false;
+        }
+        indexSubTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
+        // Change join into a select with the same condition.
+        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) joinRef.getValue();
+        SelectOperator topSelect = new SelectOperator(joinOp.getCondition());
+        topSelect.getInputs().add(indexSubTree.rootRef);
+        topSelect.setExecutionMode(ExecutionMode.LOCAL);
+        context.computeAndSetTypeEnvironmentForOperator(topSelect);
+        // Replace the original join with the new subtree rooted at the select op.
+        joinRef.setValue(topSelect);
+        return true;
+    }
+
+    private ILogicalOperator createSecondaryToPrimaryPlan(OptimizableOperatorSubTree indexSubTree,
+            OptimizableOperatorSubTree probeSubTree, Index chosenIndex, IOptimizableFuncExpr optFuncExpr,
+            boolean retainInput, boolean requiresBroadcast, IOptimizationContext context) throws AlgebricksException {
+        Dataset dataset = indexSubTree.dataset;
+        ARecordType recordType = indexSubTree.recordType;
+
+        // Get the number of dimensions corresponding to the field indexed by chosenIndex.
         Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(optFuncExpr.getFieldName(0), recordType);
         IAType spatialType = keyPairType.first;
         int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
         int numSecondaryKeys = numDimensions * 2;
 
-        DataSourceScanOperator dataSourceScan = subTree.dataSourceScan;
-        // TODO: For now retainInput and requiresBroadcast are always false.
-        RTreeJobGenParams jobGenParams = new RTreeJobGenParams(index.getIndexName(), IndexType.RTREE,
-                dataset.getDatasetName(), false, false);
+        DataSourceScanOperator dataSourceScan = indexSubTree.dataSourceScan;
+        RTreeJobGenParams jobGenParams = new RTreeJobGenParams(chosenIndex.getIndexName(), IndexType.RTREE,
+                dataset.getDatasetName(), retainInput, requiresBroadcast);
         // A spatial object is serialized in the constant of the func expr we are optimizing.
         // The R-Tree expects as input an MBR represented with 1 field per dimension. 
         // Here we generate vars and funcs for extracting MBR fields from the constant into fields of a tuple (as the R-Tree expects them).
@@ -93,13 +144,14 @@
         ArrayList<LogicalVariable> keyVarList = new ArrayList<LogicalVariable>();
         // List of expressions for the assign.
         ArrayList<Mutable<ILogicalExpression>> keyExprList = new ArrayList<Mutable<ILogicalExpression>>();
+        ILogicalExpression searchKeyExpr = AccessMethodUtils.createSearchKeyExpr(optFuncExpr, indexSubTree,
+                probeSubTree);
         for (int i = 0; i < numSecondaryKeys; i++) {
             // The create MBR function "extracts" one field of an MBR around the given spatial object.
             AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression(
                     FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.CREATE_MBR));
             // Spatial object is the constant from the func expr we are optimizing.
-            createMBR.getArguments().add(
-                    new MutableObject<ILogicalExpression>(new ConstantExpression(optFuncExpr.getConstantVal(0))));
+            createMBR.getArguments().add(new MutableObject<ILogicalExpression>(searchKeyExpr));
             // The number of dimensions.
             createMBR.getArguments().add(
                     new MutableObject<ILogicalExpression>(new ConstantExpression(new AsterixConstantValue(new AInt32(
@@ -117,26 +169,23 @@
 
         // Assign operator that "extracts" the MBR fields from the func-expr constant into a tuple.
         AssignOperator assignSearchKeys = new AssignOperator(keyVarList, keyExprList);
-        // Input to this assign is the EmptyTupleSource (which the dataSourceScan also must have had as input).
-        assignSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
-        assignSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
+        if (probeSubTree == null) {
+            // We are optimizing a selection query.
+            // Input to this assign is the EmptyTupleSource (which the dataSourceScan also must have had as input).
+            assignSearchKeys.getInputs().add(dataSourceScan.getInputs().get(0));
+            assignSearchKeys.setExecutionMode(dataSourceScan.getExecutionMode());
+        } else {
+            // We are optimizing a join, place the assign op top of the probe subtree.
+            assignSearchKeys.getInputs().add(probeSubTree.rootRef);
+        }
 
         UnnestMapOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset, recordType,
-                index, assignSearchKeys, jobGenParams, context, false, false);
+                chosenIndex, assignSearchKeys, jobGenParams, context, false, retainInput);
         // Generate the rest of the upstream plan which feeds the search results into the primary index.
         UnnestMapOperator primaryIndexUnnestOp = AccessMethodUtils.createPrimaryIndexUnnestMap(dataSourceScan, dataset,
-                recordType, secondaryIndexUnnestOp, context, true, false, false);
-        // Replace the datasource scan with the new plan rooted at primaryIndexUnnestMap.
-        subTree.dataSourceScanRef.setValue(primaryIndexUnnestOp);
-        return true;
-    }
+                recordType, secondaryIndexUnnestOp, context, true, retainInput, false);
 
-    @Override
-    public boolean applyJoinPlanTransformation(Mutable<ILogicalOperator> joinRef,
-            OptimizableOperatorSubTree leftSubTree, OptimizableOperatorSubTree rightSubTree, Index chosenIndex,
-            AccessMethodAnalysisContext analysisCtx, IOptimizationContext context) throws AlgebricksException {
-        // TODO Implement this.
-        return false;
+        return primaryIndexUnnestOp;
     }
 
     @Override
diff --git a/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_01.aql b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_01.aql
new file mode 100644
index 0000000..f7f8d6c
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_01.aql
@@ -0,0 +1,46 @@
+/*
+ * Description    : Equi joins two datasets, Customers and Orders, based on the customer id.
+ *                  Given the 'indexnl' hint we expect the join to be transformed
+ *                  into an indexed nested-loop join using Customers' primary index.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AddressType as closed {
+  number: int32, 
+  street: string,
+  city: string
+}
+
+create type CustomerType as closed {
+  cid: int32, 
+  name: string,
+  age: int32?,
+  address: AddressType?,
+  lastorder: {
+    oid: int32,
+    total: float
+  }
+}
+
+create type OrderType as closed {
+  oid: int32,
+  cid: int32,
+  orderstatus: string,
+  orderpriority: string,
+  clerk: string,
+  total: float
+}
+
+create dataset Customers(CustomerType) partitioned by key cid;
+create dataset Orders(OrderType) partitioned by key oid;
+
+write output to nc1:"rttest/btree-index-join_primary-equi-join_01.adm";
+
+for $c in dataset('Customers')
+for $o in dataset('Orders')
+where $c.cid /*+ indexnl */ = $o.cid
+return {"customer":$c, "order": $o} 
diff --git a/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_02.aql b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_02.aql
new file mode 100644
index 0000000..9ac6140
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_02.aql
@@ -0,0 +1,46 @@
+/*
+ * Description    : Equi joins two datasets, Customers and Orders, based on the customer id.
+ *                  Given the 'indexnl' hint we expect the join to be transformed
+ *                  into an indexed nested-loop join using Customers' primary index.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AddressType as closed {
+  number: int32, 
+  street: string,
+  city: string
+}
+
+create type CustomerType as closed {
+  cid: int32, 
+  name: string,
+  age: int32?,
+  address: AddressType?,
+  lastorder: {
+    oid: int32,
+    total: float
+  }
+}
+
+create type OrderType as closed {
+  oid: int32,
+  cid: int32,
+  orderstatus: string,
+  orderpriority: string,
+  clerk: string,
+  total: float
+}
+
+create dataset Customers(CustomerType) partitioned by key cid;
+create dataset Orders(OrderType) partitioned by key oid;
+
+write output to nc1:"rttest/btree-index-join_primary-equi-join_02.adm";
+
+for $o in dataset('Orders')
+for $c in dataset('Customers')
+where $o.cid /*+ indexnl */ = $c.cid 
+return {"customer":$c, "order": $o} 
diff --git a/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_03.aql b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_03.aql
new file mode 100644
index 0000000..e33e2a9
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/primary-equi-join_03.aql
@@ -0,0 +1,36 @@
+/*
+ * Description    : Self-equi joins a dataset, Customers, based on the customer id.
+ *                  Given the 'indexnl' hint we expect the join to be transformed
+ *                  into an indexed nested-loop join using Customers' primary index.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type AddressType as closed {
+  number: int32, 
+  street: string,
+  city: string
+}
+
+create type CustomerType as closed {
+  cid: int32, 
+  name: string,
+  age: int32?,
+  address: AddressType?,
+  lastorder: {
+    oid: int32,
+    total: float
+  }
+}
+
+create dataset Customers(CustomerType) partitioned by key cid;
+
+write output to nc1:"rttest/btree-index-join_primary-equi-join_03.adm";
+
+for $c1 in dataset('Customers')
+for $c2 in dataset('Customers')
+where $c1.cid /*+ indexnl */ = $c2.cid 
+return {"customer1":$c1, "customer2":$c2} 
diff --git a/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_01.aql b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_01.aql
new file mode 100644
index 0000000..c861e3f
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_01.aql
@@ -0,0 +1,47 @@
+/*
+ * Description    : Equi joins two datasets, DBLP and CSX, based on their title.
+ *                  DBLP has a secondary btree index on title, and given the 'indexnl' hint 
+ *                  we expect the join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type DBLPType as closed {
+  id: int32, 
+  dblpid: string,
+  title: string,
+  authors: string,
+  misc: string
+}
+
+create type CSXType as closed {
+  id: int32, 
+  csxid: string,
+  title: string,
+  authors: string,
+  misc: string
+}
+
+create dataset DBLP(DBLPType) partitioned by key id;
+
+create dataset CSX(CSXType) partitioned by key id;
+
+load dataset DBLP 
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/dblp-small/dblp-small-id.txt"),("format"="delimited-text"),("delimiter"=":")) pre-sorted;
+
+load dataset CSX
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/pub-small/csx-small-id.txt"),("format"="delimited-text"),("delimiter"=":"));
+
+create index title_index on DBLP(title);
+
+write output to nc1:"rttest/btree-index-join_title-secondary-equi-join_01.adm";
+
+for $a in dataset('DBLP')
+for $b in dataset('CSX')
+where $a.title /*+ indexnl */ = $b.title
+return {"arec": $a, "brec": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_02.aql b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_02.aql
new file mode 100644
index 0000000..1f07316
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_02.aql
@@ -0,0 +1,47 @@
+/*
+ * Description    : Equi joins two datasets, DBLP and CSX, based on their title.
+ *                  CSX has a secondary btree index on title, and given the 'indexnl' hint 
+ *                  we expect the join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type DBLPType as closed {
+  id: int32, 
+  dblpid: string,
+  title: string,
+  authors: string,
+  misc: string
+}
+
+create type CSXType as closed {
+  id: int32, 
+  csxid: string,
+  title: string,
+  authors: string,
+  misc: string
+}
+
+create dataset DBLP(DBLPType) partitioned by key id;
+
+create dataset CSX(CSXType) partitioned by key id;
+
+load dataset DBLP 
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/dblp-small/dblp-small-id.txt"),("format"="delimited-text"),("delimiter"=":")) pre-sorted;
+
+load dataset CSX
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/pub-small/csx-small-id.txt"),("format"="delimited-text"),("delimiter"=":"));
+
+create index title_index on CSX(title);
+
+write output to nc1:"rttest/btree-index-join_title-secondary-equi-join_02.adm";
+
+for $a in dataset('DBLP')
+for $b in dataset('CSX')
+where $a.title /*+ indexnl */ = $b.title
+return {"arec": $a, "brec": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_03.aql b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_03.aql
new file mode 100644
index 0000000..cd3d964
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/btree-index-join/secondary-equi-join_03.aql
@@ -0,0 +1,33 @@
+/*
+ * Description    : Equi self-joins a dataset, DBLP, based on its title.
+ *                  DBLP has a secondary btree index on title, and given the 'indexnl' hint 
+ *                  we expect the join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type DBLPType as closed {
+  id: int32, 
+  dblpid: string,
+  title: string,
+  authors: string,
+  misc: string
+}
+
+create dataset DBLP(DBLPType) partitioned by key id;
+
+load dataset DBLP 
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/dblp-small/dblp-small-id.txt"),("format"="delimited-text"),("delimiter"=":")) pre-sorted;
+
+create index title_index on DBLP(title);
+
+write output to nc1:"rttest/btree-index-join_title-secondary-equi-join_03.adm";
+
+for $a in dataset('DBLP')
+for $b in dataset('DBLP')
+where $a.title /*+ indexnl */ = $b.title
+return {"arec": $a, "brec": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_01.aql b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_01.aql
new file mode 100644
index 0000000..207b5c9
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_01.aql
@@ -0,0 +1,33 @@
+/*
+ * Description    : Joins two datasets on the intersection of their point attributes.
+ *                  The dataset 'MyData1' has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData1(MyRecord) partitioned by key id;
+create dataset MyData2(MyRecord) partitioned by key id;
+
+create index rtree_index on MyData1(point) type rtree;
+
+write output to nc1:"rttest/index-join_rtree-spatial-intersect-point.adm";
+
+for $a in dataset('MyData1')
+for $b in dataset('MyData2')
+where spatial-intersect($a.point, $b.point)
+return {"a": $a, "b": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_02.aql b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_02.aql
new file mode 100644
index 0000000..e70bedf
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_02.aql
@@ -0,0 +1,33 @@
+/*
+ * Description    : Joins two datasets on the intersection of their point attributes.
+ *                  The dataset 'MyData2' has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData1(MyRecord) partitioned by key id;
+create dataset MyData2(MyRecord) partitioned by key id;
+
+create index rtree_index on MyData2(point) type rtree;
+
+write output to nc1:"rttest/rtree-index-join_spatial-intersect-point_02.adm";
+
+for $a in dataset('MyData1')
+for $b in dataset('MyData2')
+where spatial-intersect($a.point, $b.point)
+return {"a": $a, "b": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_03.aql b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_03.aql
new file mode 100644
index 0000000..85fc22b
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/rtree-index-join/spatial-intersect-point_03.aql
@@ -0,0 +1,32 @@
+/*
+ * Description    : Self-joins a dataset on the intersection of its point attribute.
+ *                  The dataset has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData(MyRecord) partitioned by key id;
+
+create index rtree_index on MyData(point) type rtree;
+
+write output to nc1:"rttest/rtree-index-join_spatial-intersect-point_03.adm";
+
+for $a in dataset('MyData')
+for $b in dataset('MyData')
+where spatial-intersect($a.point, $b.point)
+return {"a": $a, "b": $b}
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
new file mode 100644
index 0000000..b9b830e
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_01.plan
@@ -0,0 +1,15 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+          -- BTREE_SEARCH  |UNPARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+              -- STABLE_SORT [$$11(ASC)]  |LOCAL|
+                -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- DATASOURCE_SCAN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
new file mode 100644
index 0000000..d11dc6a
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_02.plan
@@ -0,0 +1,15 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+          -- BTREE_SEARCH  |UNPARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+              -- STABLE_SORT [$$10(ASC)]  |LOCAL|
+                -- HASH_PARTITION_EXCHANGE [$$10]  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ASSIGN  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- DATASOURCE_SCAN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
new file mode 100644
index 0000000..a2e27c9
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/primary-equi-join_03.plan
@@ -0,0 +1,10 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+          -- BTREE_SEARCH  |UNPARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- DATASOURCE_SCAN  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_01.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_01.plan
new file mode 100644
index 0000000..14cd128
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_01.plan
@@ -0,0 +1,18 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- BTREE_SEARCH  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+              -- STABLE_SORT [$$13(ASC)]  |LOCAL|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- BTREE_SEARCH  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_02.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_02.plan
new file mode 100644
index 0000000..14cd128
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_02.plan
@@ -0,0 +1,18 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- BTREE_SEARCH  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+              -- STABLE_SORT [$$13(ASC)]  |LOCAL|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- BTREE_SEARCH  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_03.plan b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_03.plan
new file mode 100644
index 0000000..14cd128
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/btree-index-join/secondary-equi-join_03.plan
@@ -0,0 +1,18 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+          -- BTREE_SEARCH  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+              -- STABLE_SORT [$$13(ASC)]  |LOCAL|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- STREAM_PROJECT  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- BTREE_SEARCH  |PARTITIONED|
+                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                          -- ASSIGN  |PARTITIONED|
+                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                              -- DATASOURCE_SCAN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_01.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_01.plan
new file mode 100644
index 0000000..73968f0
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_01.plan
@@ -0,0 +1,20 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |LOCAL|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- BTREE_SEARCH  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                -- STABLE_SORT [$$20(ASC)]  |LOCAL|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- RTREE_SEARCH  |PARTITIONED|
+                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                            -- ASSIGN  |UNPARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_02.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_02.plan
new file mode 100644
index 0000000..73968f0
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_02.plan
@@ -0,0 +1,20 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |LOCAL|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- BTREE_SEARCH  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                -- STABLE_SORT [$$20(ASC)]  |LOCAL|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- RTREE_SEARCH  |PARTITIONED|
+                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                            -- ASSIGN  |UNPARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_03.plan b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_03.plan
new file mode 100644
index 0000000..73968f0
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/rtree-index-join/spatial-intersect-point_03.plan
@@ -0,0 +1,20 @@
+-- SINK_WRITE  |PARTITIONED|
+  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_SELECT  |LOCAL|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- BTREE_SEARCH  |PARTITIONED|
+              -- ONE_TO_ONE_EXCHANGE  |LOCAL|
+                -- STABLE_SORT [$$20(ASC)]  |LOCAL|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- RTREE_SEARCH  |PARTITIONED|
+                          -- BROADCAST_EXCHANGE  |PARTITIONED|
+                            -- ASSIGN  |UNPARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterix-app/src/test/resources/runtimets/queries/index-join/rtree-spatial-intersect-point.aql b/asterix-app/src/test/resources/runtimets/queries/index-join/rtree-spatial-intersect-point.aql
new file mode 100644
index 0000000..efee6cf
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries/index-join/rtree-spatial-intersect-point.aql
@@ -0,0 +1,42 @@
+/*
+ * Description    : Joins two datasets on the intersection of their point attributes.
+ *                  The dataset 'MyData1' has an RTree index, and we expect the 
+ *                  join to be transformed into an indexed nested-loop join.
+ * Success        : Yes
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type MyRecord as closed {
+  id: int32,
+  point: point,
+  kwds: string,
+  line1: line,
+  line2: line,
+  poly1: polygon,
+  poly2: polygon,
+  rec: rectangle
+}
+
+create dataset MyData1(MyRecord) partitioned by key id;
+create dataset MyData2(MyRecord) partitioned by key id;
+
+load dataset MyData1
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/spatial/spatialData.json"),("format"="adm")) pre-sorted;
+
+load dataset MyData2
+using "edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter"
+(("path"="nc1://data/spatial/spatialData.json"),("format"="adm")) pre-sorted;
+
+create index rtree_index on MyData1(point) type rtree;
+
+write output to nc1:"rttest/index-join_rtree-spatial-intersect-point.adm";
+
+for $a in dataset('MyData1')
+for $b in dataset('MyData2')
+where spatial-intersect($a.point, $b.point) and $a.id != $b.id
+order by $a.id, $b.id
+return {"aid": $a.id, "bid": $b.id, "apt": $a.point, "bp": $b.point}
diff --git a/asterix-app/src/test/resources/runtimets/results/index-join/rtree-spatial-intersect-point.adm b/asterix-app/src/test/resources/runtimets/results/index-join/rtree-spatial-intersect-point.adm
new file mode 100644
index 0000000..6e8c011
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/results/index-join/rtree-spatial-intersect-point.adm
@@ -0,0 +1,44 @@
+{ "aid": 1, "bid": 17, "apt": point("4.1,7.0"), "bp": point("4.1,7.0") }
+{ "aid": 3, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 3, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 4, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 5, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 6, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 7, "bid": 8, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 3, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 4, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 5, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 6, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 8, "bid": 7, "apt": point("43.5083,-79.3007"), "bp": point("43.5083,-79.3007") }
+{ "aid": 15, "bid": 16, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 15, "bid": 18, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 15, "bid": 19, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 16, "bid": 15, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 16, "bid": 18, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 16, "bid": 19, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 17, "bid": 1, "apt": point("4.1,7.0"), "bp": point("4.1,7.0") }
+{ "aid": 18, "bid": 15, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 18, "bid": 16, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 18, "bid": 19, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 19, "bid": 15, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 19, "bid": 16, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
+{ "aid": 19, "bid": 18, "apt": point("-2.0,3.0"), "bp": point("-2.0,3.0") }
\ No newline at end of file
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index b8f0258..e903a9c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -32,7 +32,6 @@
 import edu.uci.ics.asterix.feed.operator.FeedMessageOperatorDescriptor;
 import edu.uci.ics.asterix.formats.base.IDataFormat;
 import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
-import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
 import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
 import edu.uci.ics.asterix.metadata.entities.Dataset;
 import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
@@ -152,8 +151,8 @@
         String dataverseName = asid.getDataverseName();
         String datasetName = asid.getDatasetName();
         Index primaryIndex = metadata.getDatasetPrimaryIndex(dataverseName, datasetName);
-        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, metadata, context, false, datasetName,
-                dataset, primaryIndex.getIndexName(), null, null, true, true);
+        return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, metadata, context, false, dataset,
+                primaryIndex.getIndexName(), null, null, true, true);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
@@ -277,9 +276,9 @@
 
     public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
-            AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, String datasetName,
-            Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
-            boolean highKeyInclusive) throws AlgebricksException {
+            AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, Dataset dataset,
+            String indexName, int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive)
+            throws AlgebricksException {
         boolean isSecondary = true;
         Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
         if (primaryIndex != null) {
@@ -299,21 +298,15 @@
                 outputVars, keysStartIndex, numKeys, typeEnv, context);
         ITypeTraits[] typeTraits = null;
 
-	if (isSecondary) {
-        	typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv,
-                    context);
+        if (isSecondary) {
+            typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
         } else {
-        	typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv,
-                    context);
+            typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv, context);
         }
 
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-        try {
-            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDatasetName(), indexName);
         BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
                 appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
                 comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
@@ -321,35 +314,19 @@
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
     }
 
-    @SuppressWarnings("rawtypes")
-    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
-            AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
-            String datasetName, Dataset dataset, String indexName, int[] keyFields) throws AlgebricksException {
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
+            List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
+            AqlCompiledMetadataDeclarations metadata, JobGenContext context, boolean retainInput, Dataset dataset,
+            String indexName, int[] keyFields) throws AlgebricksException {
         ARecordType recType = (ARecordType) metadata.findType(dataset.getItemTypeName());
-        boolean isSecondary = true;
-        Index primaryIndex = metadata.getDatasetPrimaryIndex(dataset.getDataverseName(), dataset.getDatasetName());
-        if (primaryIndex != null) {
-            isSecondary = !indexName.equals(primaryIndex.getIndexName());
-        }
-
         int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
-        ISerializerDeserializer[] recordFields;
-        IBinaryComparatorFactory[] comparatorFactories;
-        ITypeTraits[] typeTraits;
-        IPrimitiveValueProviderFactory[] valueProviderFactories;
-        int numSecondaryKeys = 0;
-        int numNestedSecondaryKeyFields = 0;
-        int i = 0;
-        if (!isSecondary) {
-            throw new AlgebricksException("R-tree can only be used as a secondary index");
-        }
         Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
         if (secondaryIndex == null) {
             throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
-                    + datasetName);
+                    + dataset.getDatasetName());
         }
         List<String> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
-        numSecondaryKeys = secondaryKeyFields.size();
+        int numSecondaryKeys = secondaryKeyFields.size();
         if (numSecondaryKeys != 1) {
             throw new AlgebricksException(
                     "Cannot use "
@@ -361,42 +338,27 @@
         if (keyType == null) {
             throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
         }
-        int dimension = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
-        numNestedSecondaryKeyFields = dimension * 2;
-
-        int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
-        recordFields = new ISerializerDeserializer[numFields];
-        typeTraits = new ITypeTraits[numFields];
-        comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-        valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
-
-        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
-        for (i = 0; i < numNestedSecondaryKeyFields; i++) {
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(nestedKeyType);
-            recordFields[i] = keySerde;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
-                    nestedKeyType, true);
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+        int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
+        int numNestedSecondaryKeyFields = numDimensions * 2;
+        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+        for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
             valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
         }
-
-        List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
-        for (String partitioningKey : partitioningKeys) {
-            IAType type = recType.getFieldType(partitioningKey);
-            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(type);
-            recordFields[i] = keySerde;
-            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(type);
-            ++i;
+        RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
+        int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
+        if (retainInput) {
+            keysStartIndex -= numNestedSecondaryKeyFields;
         }
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
+        ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
+                numNestedSecondaryKeyFields, typeEnv, context);
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
-        RecordDescriptor recDesc = new RecordDescriptor(recordFields);
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadata
-                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
-        RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
+                .splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDatasetName(), indexName);
+        RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
                 appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
-                comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories), false,
+                comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories), retainInput,
                 NoOpOperationCallbackProvider.INSTANCE);
         return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
     }