Took care of Vinayak's review: removed the obsolete main() test driver from IndexOperations, and reformatted TreeIndexInsertUpdateDeleteOperatorDescriptor and AqlMetadataProvider from tabs to the project's standard 4-space indentation (no behavioral changes beyond the driver removal).

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@326 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
index 103e429..7e8ede1 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/IndexOperations.java
@@ -57,52 +57,4 @@
 
         return spec;
     }
-
-    public static void main(String[] args) throws Exception {
-        String host;
-        String appName;
-        String ddlFile;
-
-        switch (args.length) {
-            case 0: {
-                host = "127.0.0.1";
-                appName = "asterix";
-                ddlFile = "/home/abehm/workspace/asterix/asterix-app/src/test/resources/demo0927/local/create-index.aql";
-                System.out.println("No arguments specified, using defauls:");
-                System.out.println("HYRACKS HOST: " + host);
-                System.out.println("APPNAME:      " + appName);
-                System.out.println("DDLFILE:      " + ddlFile);
-            }
-                break;
-
-            case 3: {
-                host = args[0];
-                appName = args[1];
-                ddlFile = args[2];
-            }
-                break;
-
-            default: {
-                System.out.println("USAGE:");
-                System.out.println("ARG 1: Hyracks Host (IP or Hostname)");
-                System.out.println("ARG 2: Application Name (e.g., asterix)");
-                System.out.println("ARG 3: DDL File");
-                host = null;
-                appName = null;
-                ddlFile = null;
-                System.exit(0);
-            }
-                break;
-
-        }
-
-        // int port = HyracksIntegrationUtil.DEFAULT_HYRACKS_CC_PORT;
-
-        // AsterixJavaClient q = compileQuery(ddlFile, true, false, true);
-
-        // long start = System.currentTimeMillis();
-        // q.execute(port);
-        // long end = System.currentTimeMillis();
-        // System.err.println(start + " " + end + " " + (end - start));
-    }
-}
+}
\ No newline at end of file
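
Note: the deleted main() above was a local smoke-test driver, not production code. Its commented-out body suggests the intended usage was roughly the sketch below; compileQuery and the default Hyracks CC port constant are taken from the removed comments and should be treated as assumptions, not a supported entry point.

    // Hypothetical reconstruction of the removed driver's core, per its own comments.
    int port = HyracksIntegrationUtil.DEFAULT_HYRACKS_CC_PORT;      // assumed constant
    AsterixJavaClient q = compileQuery(ddlFile, true, false, true); // assumed helper
    long start = System.currentTimeMillis();
    q.execute(port);                                                // submit the compiled DDL job
    long end = System.currentTimeMillis();
    System.err.println(start + " " + end + " " + (end - start));    // crude timing output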
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index 02b688b..7d1f64f 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -46,36 +46,32 @@
 
     private final long transactionId;
 
-	/*
-	 * TODO: Index operators should live in Hyracks. Right now, they are needed
-	 * here in Asterix as a hack to provide transactionIDs. The Asterix verions
-	 * of this operator will disappear and the operator will come from Hyracks
-	 * once the LSM/Recovery/Transactions world has been introduced.
-	 */
-	public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec,
-			RecordDescriptor recDesc, IStorageManagerInterface storageManager,
-			IIndexRegistryProvider<IIndex> indexRegistryProvider,
-			IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
-			IBinaryComparatorFactory[] comparatorFactories,
-			int[] fieldPermutation, IndexOp op,
-			IIndexDataflowHelperFactory dataflowHelperFactory,
-			ITupleFilterFactory tupleFilterFactory,
-			IOperationCallbackProvider opCallbackProvider, long transactionId) {
-		super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider,
-				fileSplitProvider, typeTraits, comparatorFactories,
-				dataflowHelperFactory, tupleFilterFactory, opCallbackProvider);
-		this.fieldPermutation = fieldPermutation;
-		this.op = op;
-		this.transactionId = transactionId;
-	}
+    /**
+     * TODO: Index operators should live in Hyracks. Right now, they are needed
+     * here in Asterix as a hack to provide transactionIDs. The Asterix versions
+     * of this operator will disappear and the operator will come from Hyracks
+     * once the LSM/Recovery/Transactions world has been introduced.
+     */
+    public TreeIndexInsertUpdateDeleteOperatorDescriptor(JobSpecification spec, RecordDescriptor recDesc,
+            IStorageManagerInterface storageManager, IIndexRegistryProvider<IIndex> indexRegistryProvider,
+            IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
+            IBinaryComparatorFactory[] comparatorFactories, int[] fieldPermutation, IndexOp op,
+            IIndexDataflowHelperFactory dataflowHelperFactory, ITupleFilterFactory tupleFilterFactory,
+            IOperationCallbackProvider opCallbackProvider, long transactionId) {
+        super(spec, 1, 1, recDesc, storageManager, indexRegistryProvider, fileSplitProvider, typeTraits,
+                comparatorFactories, dataflowHelperFactory, tupleFilterFactory, opCallbackProvider);
+        this.fieldPermutation = fieldPermutation;
+        this.op = op;
+        this.transactionId = transactionId;
+    }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
         TransactionContext txnContext;
         try {
-            ITransactionManager transactionManager = ((AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
-                    .getApplicationObject()).getTransactionProvider().getTransactionManager();
+            ITransactionManager transactionManager = ((AsterixAppRuntimeContext) ctx.getJobletContext()
+                    .getApplicationContext().getApplicationObject()).getTransactionProvider().getTransactionManager();
             txnContext = transactionManager.getTransactionContext(transactionId);
         } catch (ACIDException ae) {
             throw new RuntimeException(" could not obtain context for invalid transaction id " + transactionId);
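
For context, the transactionId threaded through this descriptor is what lets createPushRuntime resolve a live TransactionContext at job execution time. A minimal sketch of wiring the operator into a job, assuming a JobSpecification and the usual storage/registry providers are already in hand (all variable names here are illustrative, not from this patch):

    // Hedged sketch: constructing the transactional insert operator, mirroring
    // the constructor signature above. The *Provider/*Factory values are assumed
    // to come from the application context, as AqlMetadataProvider does below.
    TreeIndexInsertUpdateDeleteOperatorDescriptor insertOp =
            new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, recDesc,
                    storageManager, indexRegistryProvider, fileSplitProvider,
                    typeTraits, comparatorFactories, fieldPermutation,
                    IndexOp.INSERT,                    // assumed enum constant; DELETE/UPDATE analogous
                    new BTreeDataflowHelperFactory(),  // same helper used elsewhere in this patch
                    null,                              // no tuple filter (assumption)
                    NoOpOperationCallbackProvider.INSTANCE,
                    txnId);                            // resolved to a TransactionContext in createPushRuntime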
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 856028c..d8e340c 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -89,1011 +89,839 @@
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
 import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 
-public class AqlMetadataProvider implements
-		IMetadataProvider<AqlSourceId, String> {
-	private final long txnId;
-	private boolean isWriteTransaction;
-	private final AqlCompiledMetadataDeclarations metadata;
+public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
+    private final long txnId;
+    private boolean isWriteTransaction;
+    private final AqlCompiledMetadataDeclarations metadata;
 
-	public AqlMetadataProvider(long txnId, boolean isWriteTransaction,
-			AqlCompiledMetadataDeclarations metadata) {
-		this.txnId = txnId;
-		this.isWriteTransaction = isWriteTransaction;
-		this.metadata = metadata;
-	}
+    public AqlMetadataProvider(long txnId, boolean isWriteTransaction, AqlCompiledMetadataDeclarations metadata) {
+        this.txnId = txnId;
+        this.isWriteTransaction = isWriteTransaction;
+        this.metadata = metadata;
+    }
 
-	@Override
-	public AqlDataSource findDataSource(AqlSourceId id)
-			throws AlgebricksException {
-		AqlSourceId aqlId = (AqlSourceId) id;
-		return lookupSourceInMetadata(metadata, aqlId);
-	}
+    @Override
+    public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
+        AqlSourceId aqlId = (AqlSourceId) id;
+        return lookupSourceInMetadata(metadata, aqlId);
+    }
 
-	public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
-		return metadata;
-	}
+    public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
+        return metadata;
+    }
 
-	public boolean isWriteTransaction() {
-		return isWriteTransaction;
-	}
+    public boolean isWriteTransaction() {
+        return isWriteTransaction;
+    }
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
-			IDataSource<AqlSourceId> dataSource,
-			List<LogicalVariable> scanVariables,
-			List<LogicalVariable> projectVariables, boolean projectPushed,
-			JobGenContext context, JobSpecification jobSpec)
-			throws AlgebricksException {
-		AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId()
-				.getDatasetName());
-		if (adecl == null) {
-			throw new AlgebricksException("Unknown dataset "
-					+ dataSource.getId().getDatasetName());
-		}
-		switch (adecl.getDatasetType()) {
-		case FEED:
-			if (dataSource instanceof ExternalFeedDataSource) {
-				return buildExternalDatasetScan(jobSpec, adecl, dataSource);
-			} else {
-				return buildInternalDatasetScan(jobSpec, adecl, dataSource,
-						context);
-			}
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
+            IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
+            List<LogicalVariable> projectVariables, boolean projectPushed, JobGenContext context,
+            JobSpecification jobSpec) throws AlgebricksException {
+        AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId().getDatasetName());
+        if (adecl == null) {
+            throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName());
+        }
+        switch (adecl.getDatasetType()) {
+            case FEED:
+                if (dataSource instanceof ExternalFeedDataSource) {
+                    return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+                } else {
+                    return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
+                }
 
-		case INTERNAL: {
-			return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
-		}
+            case INTERNAL: {
+                return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
+            }
 
-		case EXTERNAL: {
-			return buildExternalDatasetScan(jobSpec, adecl, dataSource);
-		}
+            case EXTERNAL: {
+                return buildExternalDatasetScan(jobSpec, adecl, dataSource);
+            }
 
-		default: {
-			throw new IllegalArgumentException();
-		}
-		}
-	}
+            default: {
+                throw new IllegalArgumentException();
+            }
+        }
+    }
 
-	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(
-			JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
-			IDataSource<AqlSourceId> dataSource, JobGenContext context)
-			throws AlgebricksException {
-		AqlSourceId asid = dataSource.getId();
-		String datasetName = asid.getDatasetName();
-		String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
+            AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource, JobGenContext context)
+            throws AlgebricksException {
+        AqlSourceId asid = dataSource.getId();
+        String datasetName = asid.getDatasetName();
+        String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
 
-		try {
-			return buildBtreeRuntime(metadata, context, jobSpec, datasetName,
-					acedl, indexName, null, null, true, true);
-		} catch (AlgebricksException e) {
-			throw new AlgebricksException(e);
-		}
-	}
+        try {
+            return buildBtreeRuntime(metadata, context, jobSpec, datasetName, acedl, indexName, null, null, true, true);
+        } catch (AlgebricksException e) {
+            throw new AlgebricksException(e);
+        }
+    }
 
-	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(
-			JobSpecification jobSpec, AqlCompiledDatasetDecl acedl,
-			IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
-		String itemTypeName = acedl.getItemTypeName();
-		IAType itemType;
-		try {
-			itemType = metadata.findType(itemTypeName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
+            AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
+        String itemTypeName = acedl.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = metadata.findType(itemTypeName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		if (dataSource instanceof ExternalFeedDataSource) {
-			AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
-					.getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
+        if (dataSource instanceof ExternalFeedDataSource) {
+            AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
+                    .getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
 
-			return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(),
-					acedl.getName(), itemType, acfdd, metadata.getFormat());
-		} else {
-			return buildExternalDataScannerRuntime(jobSpec, itemType,
-					(AqlCompiledExternalDatasetDetails) acedl
-							.getAqlCompiledDatasetDetails(), metadata
-							.getFormat());
-		}
-	}
+            return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(), acedl.getName(), itemType, acfdd,
+                    metadata.getFormat());
+        } else {
+            return buildExternalDataScannerRuntime(jobSpec, itemType,
+                    (AqlCompiledExternalDatasetDetails) acedl.getAqlCompiledDatasetDetails(), metadata.getFormat());
+        }
+    }
 
-	@SuppressWarnings("rawtypes")
-	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
-			JobSpecification jobSpec, IAType itemType,
-			AqlCompiledExternalDatasetDetails decl, IDataFormat format)
-			throws AlgebricksException {
-		if (itemType.getTypeTag() != ATypeTag.RECORD) {
-			throw new AlgebricksException("Can only scan datasets of records.");
-		}
+    @SuppressWarnings("rawtypes")
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
+            JobSpecification jobSpec, IAType itemType, AqlCompiledExternalDatasetDetails decl, IDataFormat format)
+            throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Can only scan datasets of records.");
+        }
 
-		IDatasourceReadAdapter adapter;
-		try {
-			adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter())
-					.newInstance();
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new AlgebricksException("unable to load the adapter class "
-					+ e);
-		}
+        IDatasourceReadAdapter adapter;
+        try {
+            adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter()).newInstance();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("unable to load the adapter class " + e);
+        }
 
-		if (!(adapter.getAdapterType().equals(
-				IDatasourceAdapter.AdapterType.READ) || adapter
-				.getAdapterType().equals(
-						IDatasourceAdapter.AdapterType.READ_WRITE))) {
-			throw new AlgebricksException(
-					"external dataset adapter does not support read operation");
-		}
-		ARecordType rt = (ARecordType) itemType;
+        if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
+                IDatasourceAdapter.AdapterType.READ_WRITE))) {
+            throw new AlgebricksException("external dataset adapter does not support read operation");
+        }
+        ARecordType rt = (ARecordType) itemType;
 
-		try {
-			adapter.configure(decl.getProperties(), itemType);
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new AlgebricksException(
-					"unable to configure the datasource adapter " + e);
-		}
+        try {
+            adapter.configure(decl.getProperties(), itemType);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("unable to configure the datasource adapter " + e);
+        }
 
-		ISerializerDeserializer payloadSerde = format.getSerdeProvider()
-				.getSerializerDeserializer(itemType);
-		RecordDescriptor scannerDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] { payloadSerde });
+        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
-		ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(
-				jobSpec, decl.getAdapter(), decl.getProperties(), rt,
-				scannerDesc);
-		dataScanner.setDatasourceAdapter(adapter);
-		AlgebricksPartitionConstraint constraint = adapter
-				.getPartitionConstraint();
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				dataScanner, constraint);
-	}
+        ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
+                decl.getAdapter(), decl.getProperties(), rt, scannerDesc);
+        dataScanner.setDatasourceAdapter(adapter);
+        AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
+    }
 
-	@SuppressWarnings("rawtypes")
-	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
-			JobSpecification jobSpec, IAType itemType,
-			IParseFileSplitsDecl decl, IDataFormat format)
-			throws AlgebricksException {
-		if (itemType.getTypeTag() != ATypeTag.RECORD) {
-			throw new AlgebricksException("Can only scan datasets of records.");
-		}
-		ARecordType rt = (ARecordType) itemType;
-		ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
-		FileSplit[] splits = decl.getSplits();
-		IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(
-				splits);
-		ISerializerDeserializer payloadSerde = format.getSerdeProvider()
-				.getSerializerDeserializer(itemType);
-		RecordDescriptor scannerDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] { payloadSerde });
-		IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec,
-				scannerSplitProvider, tupleParser, scannerDesc);
-		String[] locs = new String[splits.length];
-		for (int i = 0; i < splits.length; i++) {
-			locs[i] = splits[i].getNodeName();
-		}
-		AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
-				locs);
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				scanner, apc);
-	}
+    @SuppressWarnings("rawtypes")
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
+            JobSpecification jobSpec, IAType itemType, IParseFileSplitsDecl decl, IDataFormat format)
+            throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Can only scan datasets of records.");
+        }
+        ARecordType rt = (ARecordType) itemType;
+        ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
+        FileSplit[] splits = decl.getSplits();
+        IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
+        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
+        IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
+                scannerDesc);
+        String[] locs = new String[splits.length];
+        for (int i = 0; i < splits.length; i++) {
+            locs[i] = splits[i].getNodeName();
+        }
+        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
+    }
 
-	@SuppressWarnings("rawtypes")
-	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
-			JobSpecification jobSpec, String dataverse, String dataset,
-			IAType itemType, AqlCompiledFeedDatasetDetails decl,
-			IDataFormat format) throws AlgebricksException {
-		if (itemType.getTypeTag() != ATypeTag.RECORD) {
-			throw new AlgebricksException("Can only consume records.");
-		}
-		IDatasourceAdapter adapter;
-		try {
-			adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter())
-					.newInstance();
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new AlgebricksException("unable to load the adapter class "
-					+ e);
-		}
+    @SuppressWarnings("rawtypes")
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
+            JobSpecification jobSpec, String dataverse, String dataset, IAType itemType,
+            AqlCompiledFeedDatasetDetails decl, IDataFormat format) throws AlgebricksException {
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Can only consume records.");
+        }
+        IDatasourceAdapter adapter;
+        try {
+            adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("unable to load the adapter class " + e);
+        }
 
-		ARecordType rt = (ARecordType) itemType;
-		try {
-			adapter.configure(decl.getProperties(), itemType);
-		} catch (Exception e) {
-			e.printStackTrace();
-			throw new AlgebricksException(
-					"unable to configure the datasource adapter " + e);
-		}
+        ARecordType rt = (ARecordType) itemType;
+        try {
+            adapter.configure(decl.getProperties(), itemType);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new AlgebricksException("unable to configure the datasource adapter " + e);
+        }
 
-		ISerializerDeserializer payloadSerde = format.getSerdeProvider()
-				.getSerializerDeserializer(itemType);
-		RecordDescriptor feedDesc = new RecordDescriptor(
-				new ISerializerDeserializer[] { payloadSerde });
+        ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
+        RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
 
-		FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(
-				jobSpec, new FeedId(dataverse, dataset), decl.getAdapter(),
-				decl.getProperties(), rt, feedDesc);
+        FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(dataverse,
+                dataset), decl.getAdapter(), decl.getProperties(), rt, feedDesc);
 
-		AlgebricksPartitionConstraint constraint = adapter
-				.getPartitionConstraint();
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				feedIngestor, constraint);
-	}
+        AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
+    }
 
-	@SuppressWarnings("rawtypes")
-	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
-			JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata,
-			AqlCompiledFeedDatasetDetails decl, String dataverse,
-			String dataset, List<IFeedMessage> feedMessages)
-			throws AlgebricksException {
+    @SuppressWarnings("rawtypes")
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
+            JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata, AqlCompiledFeedDatasetDetails decl,
+            String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
 
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-		try {
-			spPc = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							dataset, dataset);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+        try {
+            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, dataset);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(
-				jobSpec, dataverse, dataset, feedMessages);
+        FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
+                feedMessages);
 
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				feedMessenger, spPc.second);
-	}
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
+    }
 
-	@SuppressWarnings("rawtypes")
-	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
-			AqlCompiledMetadataDeclarations metadata, JobGenContext context,
-			JobSpecification jobSpec, String datasetName,
-			AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
-			int[] highKeyFields, boolean lowKeyInclusive,
-			boolean highKeyInclusive) throws AlgebricksException {
-		String itemTypeName = ddecl.getItemTypeName();
-		IAType itemType;
-		try {
-			itemType = metadata.findType(itemTypeName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
+    @SuppressWarnings("rawtypes")
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
+            AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
+            String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
+            int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
+        String itemTypeName = ddecl.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = metadata.findType(itemTypeName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		boolean isSecondary = true;
-		AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+        boolean isSecondary = true;
+        AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
 
-		if (primIdxDecl != null) {
-			isSecondary = !indexName.equals(primIdxDecl.getIndexName());
-		}
+        if (primIdxDecl != null) {
+            isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+        }
 
-		int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
-				.size();
-		ISerializerDeserializer[] recordFields;
-		IBinaryComparatorFactory[] comparatorFactories;
-		ITypeTraits[] typeTraits;
-		int numSecondaryKeys = 0;
-		int i = 0;
-		if (isSecondary) {
-			AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
-					ddecl, indexName);
-			if (cid == null) {
-				throw new AlgebricksException(
-						"Code generation error: no index " + indexName
-								+ " for dataset " + datasetName);
-			}
-			List<String> secondaryKeyFields = cid.getFieldExprs();
-			numSecondaryKeys = secondaryKeyFields.size();
-			int numKeys = numSecondaryKeys + numPrimaryKeys;
-			recordFields = new ISerializerDeserializer[numKeys];
-			typeTraits = new ITypeTraits[numKeys];
-			// comparatorFactories = new
-			// IBinaryComparatorFactory[numSecondaryKeys];
-			comparatorFactories = new IBinaryComparatorFactory[numKeys];
-			if (itemType.getTypeTag() != ATypeTag.RECORD) {
-				throw new AlgebricksException(
-						"Only record types can be indexed.");
-			}
-			ARecordType recType = (ARecordType) itemType;
-			for (i = 0; i < numSecondaryKeys; i++) {
-				Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(i), recType);
-				IAType keyType = keyTypePair.first;
-				ISerializerDeserializer keySerde = metadata.getFormat()
-						.getSerdeProvider().getSerializerDeserializer(keyType);
-				recordFields[i] = keySerde;
-				comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-						.getBinaryComparatorFactory(keyType, true);
-				typeTraits[i] = AqlTypeTraitProvider.INSTANCE
-						.getTypeTrait(keyType);
-			}
-		} else {
-			recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
-			comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
-			typeTraits = new ITypeTraits[numPrimaryKeys + 1];
-			ISerializerDeserializer payloadSerde = metadata.getFormat()
-					.getSerdeProvider().getSerializerDeserializer(itemType);
-			recordFields[numPrimaryKeys] = payloadSerde;
-			typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE
-					.getTypeTrait(itemType);
-		}
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
+        ISerializerDeserializer[] recordFields;
+        IBinaryComparatorFactory[] comparatorFactories;
+        ITypeTraits[] typeTraits;
+        int numSecondaryKeys = 0;
+        int i = 0;
+        if (isSecondary) {
+            AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
+            if (cid == null) {
+                throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+                        + datasetName);
+            }
+            List<String> secondaryKeyFields = cid.getFieldExprs();
+            numSecondaryKeys = secondaryKeyFields.size();
+            int numKeys = numSecondaryKeys + numPrimaryKeys;
+            recordFields = new ISerializerDeserializer[numKeys];
+            typeTraits = new ITypeTraits[numKeys];
+            // comparatorFactories = new
+            // IBinaryComparatorFactory[numSecondaryKeys];
+            comparatorFactories = new IBinaryComparatorFactory[numKeys];
+            if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                throw new AlgebricksException("Only record types can be indexed.");
+            }
+            ARecordType recType = (ARecordType) itemType;
+            for (i = 0; i < numSecondaryKeys; i++) {
+                Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
+                        secondaryKeyFields.get(i), recType);
+                IAType keyType = keyTypePair.first;
+                ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
+                        .getSerializerDeserializer(keyType);
+                recordFields[i] = keySerde;
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                        keyType, true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            }
+        } else {
+            recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
+            comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
+            typeTraits = new ITypeTraits[numPrimaryKeys + 1];
+            ISerializerDeserializer payloadSerde = metadata.getFormat().getSerdeProvider()
+                    .getSerializerDeserializer(itemType);
+            recordFields[numPrimaryKeys] = payloadSerde;
+            typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        }
 
-		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-				.getPartitioningFunctions(ddecl)) {
-			IAType keyType = evalFactoryAndType.third;
-			ISerializerDeserializer keySerde = metadata.getFormat()
-					.getSerdeProvider().getSerializerDeserializer(keyType);
-			recordFields[i] = keySerde;
-			// if (!isSecondary) {
-			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-					.getBinaryComparatorFactory(keyType, true);
-			// }
-			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-			++i;
-		}
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+                .getPartitioningFunctions(ddecl)) {
+            IAType keyType = evalFactoryAndType.third;
+            ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
+                    .getSerializerDeserializer(keyType);
+            recordFields[i] = keySerde;
+            // if (!isSecondary) {
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            // }
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            ++i;
+        }
 
-		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
-				.getAppContext();
-		RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+        RecordDescriptor recDesc = new RecordDescriptor(recordFields);
 
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-		try {
-			spPc = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							datasetName, indexName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+        try {
+            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(
-				jobSpec, recDesc, appContext.getStorageManagerInterface(),
-				appContext.getIndexRegistryProvider(), spPc.first,
-				typeTraits,
-				comparatorFactories, lowKeyFields, highKeyFields,
-				lowKeyInclusive, highKeyInclusive,
-				new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				btreeSearchOp, spPc.second);
-	}
+        BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, recDesc,
+                appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
+                comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
+                new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
+    }
 
-	@SuppressWarnings("rawtypes")
-	public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
-			AqlCompiledMetadataDeclarations metadata, JobGenContext context,
-			JobSpecification jobSpec, String datasetName,
-			AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
-			throws AlgebricksException {
-		String itemTypeName = ddecl.getItemTypeName();
-		IAType itemType;
-		try {
-			itemType = metadata.findType(itemTypeName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
+    @SuppressWarnings("rawtypes")
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
+            AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
+            String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
+            throws AlgebricksException {
+        String itemTypeName = ddecl.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = metadata.findType(itemTypeName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		boolean isSecondary = true;
-		AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
-		if (primIdxDecl != null) {
-			isSecondary = !indexName.equals(primIdxDecl.getIndexName());
-		}
+        boolean isSecondary = true;
+        AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
+        if (primIdxDecl != null) {
+            isSecondary = !indexName.equals(primIdxDecl.getIndexName());
+        }
 
-		int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl)
-				.size();
-		ISerializerDeserializer[] recordFields;
-		IBinaryComparatorFactory[] comparatorFactories;
-		ITypeTraits[] typeTraits;
-		IPrimitiveValueProviderFactory[] valueProviderFactories;
-		int numSecondaryKeys = 0;
-		int numNestedSecondaryKeyFields = 0;
-		int i = 0;
-		if (isSecondary) {
-			AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
-					ddecl, indexName);
-			if (cid == null) {
-				throw new AlgebricksException(
-						"Code generation error: no index " + indexName
-								+ " for dataset " + datasetName);
-			}
-			List<String> secondaryKeyFields = cid.getFieldExprs();
-			numSecondaryKeys = secondaryKeyFields.size();
+        int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
+        ISerializerDeserializer[] recordFields;
+        IBinaryComparatorFactory[] comparatorFactories;
+        ITypeTraits[] typeTraits;
+        IPrimitiveValueProviderFactory[] valueProviderFactories;
+        int numSecondaryKeys = 0;
+        int numNestedSecondaryKeyFields = 0;
+        int i = 0;
+        if (isSecondary) {
+            AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
+            if (cid == null) {
+                throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+                        + datasetName);
+            }
+            List<String> secondaryKeyFields = cid.getFieldExprs();
+            numSecondaryKeys = secondaryKeyFields.size();
 
-			if (numSecondaryKeys != 1) {
-				throw new AlgebricksException(
-						"Cannot use "
-								+ numSecondaryKeys
-								+ " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
-			}
+            if (numSecondaryKeys != 1) {
+                throw new AlgebricksException(
+                        "Cannot use "
+                                + numSecondaryKeys
+                                + " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
+            }
 
-			if (itemType.getTypeTag() != ATypeTag.RECORD) {
-				throw new AlgebricksException(
-						"Only record types can be indexed.");
-			}
-			ARecordType recType = (ARecordType) itemType;
+            if (itemType.getTypeTag() != ATypeTag.RECORD) {
+                throw new AlgebricksException("Only record types can be indexed.");
+            }
+            ARecordType recType = (ARecordType) itemType;
 
-			Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recType);
-			IAType keyType = keyTypePair.first;
-			if (keyType == null) {
-				throw new AlgebricksException("Could not find field "
-						+ secondaryKeyFields.get(0) + " in the schema.");
-			}
+            Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
+                    secondaryKeyFields.get(0), recType);
+            IAType keyType = keyTypePair.first;
+            if (keyType == null) {
+                throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
+            }
 
-			int dimension = NonTaggedFormatUtil.getNumDimensions(keyType
-					.getTypeTag());
-			numNestedSecondaryKeyFields = dimension * 2;
+            int dimension = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
+            numNestedSecondaryKeyFields = dimension * 2;
 
-			int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
-			recordFields = new ISerializerDeserializer[numFields];
-			typeTraits = new ITypeTraits[numFields];
-			comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
-			valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
+            int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
+            recordFields = new ISerializerDeserializer[numFields];
+            typeTraits = new ITypeTraits[numFields];
+            comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
+            valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
 
-			IAType nestedKeyType = NonTaggedFormatUtil
-					.getNestedSpatialType(keyType.getTypeTag());
-			for (i = 0; i < numNestedSecondaryKeyFields; i++) {
-				ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-						.getSerializerDeserializer(nestedKeyType);
-				recordFields[i] = keySerde;
-				comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-						.getBinaryComparatorFactory(nestedKeyType,
-								true);
-				typeTraits[i] = AqlTypeTraitProvider.INSTANCE
-						.getTypeTrait(nestedKeyType);
-				valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-			}
-		} else {
-			throw new AlgebricksException(
-					"R-tree can only be used as a secondary index");
-		}
+            IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
+            for (i = 0; i < numNestedSecondaryKeyFields; i++) {
+                ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                        .getSerializerDeserializer(nestedKeyType);
+                recordFields[i] = keySerde;
+                comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                        nestedKeyType, true);
+                typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+                valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+            }
+        } else {
+            throw new AlgebricksException("R-tree can only be used as a secondary index");
+        }
 
-		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-				.getPartitioningFunctions(ddecl)) {
-			IAType keyType = evalFactoryAndType.third;
-			ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-					.getSerializerDeserializer(keyType);
-			recordFields[i] = keySerde;
-			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-			++i;
-		}
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+                .getPartitioningFunctions(ddecl)) {
+            IAType keyType = evalFactoryAndType.third;
+            ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(keyType);
+            recordFields[i] = keySerde;
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            ++i;
+        }
 
-		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
-				.getAppContext();
-		RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+        RecordDescriptor recDesc = new RecordDescriptor(recordFields);
+
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
+        try {
+            spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
-		try {
-			spPc = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							datasetName, indexName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
+        RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
+                appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
+                comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories),
+                NoOpOperationCallbackProvider.INSTANCE);
 
-		RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(
-				jobSpec, recDesc, appContext.getStorageManagerInterface(),
-				appContext.getIndexRegistryProvider(), spPc.first,
-				typeTraits,
-				comparatorFactories, keyFields,
-				new RTreeDataflowHelperFactory(valueProviderFactories), NoOpOperationCallbackProvider.INSTANCE);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
+    }
 
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				rtreeSearchOp, spPc.second);
-	}
+    @Override
+    public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
+            int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
+        FileSplitDataSink fsds = (FileSplitDataSink) sink;
+        FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
+        FileSplit fs = fssi.getFileSplit();
+        File outFile = fs.getLocalFile().getFile();
+        String nodeId = fs.getNodeName();
 
-	@Override
-	public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(
-			IDataSink sink, int[] printColumns,
-			IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
-		FileSplitDataSink fsds = (FileSplitDataSink) sink;
-		FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
-		FileSplit fs = fssi.getFileSplit();
-		File outFile = fs.getLocalFile().getFile();
-		String nodeId = fs.getNodeName();
+        SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
+                metadata.getWriterFactory(), inputDesc);
+        AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
+        return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
+    }
 
-		SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(
-				printColumns, printerFactories, outFile, metadata
-						.getWriterFactory(), inputDesc);
-		AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(
-				new String[] { nodeId });
-		return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(
-				runtime, apc);
-	}
+    @Override
+    public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
+            throws AlgebricksException {
+        AqlDataSource ads = findDataSource(dataSourceId);
+        AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
+        if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
+            throw new AlgebricksException("No index for external dataset " + dataSourceId);
+        }
 
-	@Override
-	public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(
-			String indexId, AqlSourceId dataSourceId)
-			throws AlgebricksException {
-		AqlDataSource ads = findDataSource(dataSourceId);
-		AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
-		if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
-			throw new AlgebricksException("No index for external dataset "
-					+ dataSourceId);
-		}
+        String idxName = (String) indexId;
+        AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(adecl, idxName);
+        AqlSourceId asid = (AqlSourceId) dataSourceId;
+        if (acid != null) {
+            return new AqlIndex(acid, metadata, asid.getDatasetName());
+        } else {
+            AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
+            if (primIdx.getIndexName().equals(indexId)) {
+                return new AqlIndex(primIdx, metadata, asid.getDatasetName());
+            } else {
+                return null;
+            }
+        }
+    }
 
-		String idxName = (String) indexId;
-		AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(
-				adecl, idxName);
-		AqlSourceId asid = (AqlSourceId) dataSourceId;
-		if (acid != null) {
-			return new AqlIndex(acid, metadata, asid.getDatasetName());
-		} else {
-			AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
-			if (primIdx.getIndexName().equals(indexId)) {
-				return new AqlIndex(primIdx, metadata, asid.getDatasetName());
-			} else {
-				return null;
-			}
-		}
-	}
+    public static AqlDataSource lookupSourceInMetadata(AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
+            throws AlgebricksException {
+        if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
+            return null;
+        }
+        AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId.getDatasetName());
+        if (acdd == null) {
+            throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
+        }
+        String tName = acdd.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = metadata.findType(tName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        return new AqlDataSource(aqlId, acdd, itemType);
+    }
 
-	public static AqlDataSource lookupSourceInMetadata(
-			AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
-			throws AlgebricksException {
-		if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
-			return null;
-		}
-		AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId
-				.getDatasetName());
-		if (acdd == null) {
-			throw new AlgebricksException("Datasource with id " + aqlId
-					+ " was not found.");
-		}
-		String tName = acdd.getItemTypeName();
-		IAType itemType;
-		try {
-			itemType = metadata.findType(tName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
-		return new AqlDataSource(aqlId, acdd, itemType);
-	}
+    @Override
+    public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
+        AqlSourceId asid = dataSource.getId();
+        String datasetName = asid.getDatasetName();
+        AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
+        if (adecl == null) {
+            throw new IllegalArgumentException("Unknown dataset " + datasetName);
+        }
+        return adecl.getDatasetType() == DatasetType.EXTERNAL;
+    }
 
-	@Override
-	public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
-		AqlSourceId asid = dataSource.getId();
-		String datasetName = asid.getDatasetName();
-		AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
-		if (adecl == null) {
-			throw new IllegalArgumentException("Unknown dataset " + datasetName);
-		}
-		return adecl.getDatasetType() == DatasetType.EXTERNAL;
-	}
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, JobGenContext context, JobSpecification spec) throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasetName();
+        int numKeys = keys.size();
+        // move key fields to front
+        int[] fieldPermutation = new int[numKeys + 1];
+        // System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
-			IDataSource<AqlSourceId> dataSource,
-			IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-			LogicalVariable payload, JobGenContext context,
-			JobSpecification spec) throws AlgebricksException {
-		String datasetName = dataSource.getId().getDatasetName();
-		int numKeys = keys.size();
-		// move key fields to front
-		int[] fieldPermutation = new int[numKeys + 1];
-		// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-		int i = 0;
-		for (LogicalVariable varKey : keys) {
-			int idx = propagatedSchema.findVariable(varKey);
-			fieldPermutation[i] = idx;
-			i++;
-		}
-		fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
 
-		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
-				.findDataset(datasetName);
-		if (compiledDatasetDecl == null) {
-			throw new AlgebricksException("Unknown dataset " + datasetName);
-		}
-		String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
-				.getIndexName();
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
 
-		ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
-				compiledDatasetDecl, metadata);
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
-		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
-				.getAppContext();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
 
-		IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
-				.computeKeysBinaryComparatorFactories(compiledDatasetDecl,
-						context.getBinaryComparatorFactoryProvider());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+        try {
+            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                    indexName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-		try {
-			splitsAndConstraint = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							datasetName, indexName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
+        TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
+                GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
+    }
 
-		TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(
-				spec, appContext.getStorageManagerInterface(),
-				appContext.getIndexRegistryProvider(),
-				splitsAndConstraint.first, typeTraits, comparatorFactories,
-				fieldPermutation, GlobalConfig.DEFAULT_BTREE_FILL_FACTOR,
-				new BTreeDataflowHelperFactory(),
-				NoOpOperationCallbackProvider.INSTANCE);
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				btreeBulkLoad, splitsAndConstraint.second);
-	}
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasetName();
+        int numKeys = keys.size();
+        // Move the key fields to the front of the tuple; the payload goes last.
+        int[] fieldPermutation = new int[numKeys + 1];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
-			IDataSource<AqlSourceId> dataSource,
-			IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-			LogicalVariable payload, RecordDescriptor recordDesc,
-			JobGenContext context, JobSpecification spec)
-			throws AlgebricksException {
-		String datasetName = dataSource.getId().getDatasetName();
-		int numKeys = keys.size();
-		// move key fields to front
-		int[] fieldPermutation = new int[numKeys + 1];
-		// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-		int i = 0;
-		for (LogicalVariable varKey : keys) {
-			int idx = propagatedSchema.findVariable(varKey);
-			fieldPermutation[i] = idx;
-			i++;
-		}
-		fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
 
-		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
-				.findDataset(datasetName);
-		if (compiledDatasetDecl == null) {
-			throw new AlgebricksException("Unknown dataset " + datasetName);
-		}
-		String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
-				.getIndexName();
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
 
-		ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
-				compiledDatasetDecl, metadata);
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
-		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
-				.getAppContext();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
 
-		IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
-				.computeKeysBinaryComparatorFactories(compiledDatasetDecl,
-						context.getBinaryComparatorFactoryProvider());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+        try {
+            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                    indexName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-		try {
-			splitsAndConstraint = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							datasetName, indexName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
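+        // Route the insert through the transactional tree-index DML operator so
+        // the write is tagged with this provider's txnId.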
+        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.INSERT,
+                new BTreeDataflowHelperFactory(), /* no tuple filter */ null, NoOpOperationCallbackProvider.INSTANCE, txnId);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeInsert, splitsAndConstraint.second);
+    }
 
-		TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-				spec, recordDesc, appContext.getStorageManagerInterface(),
-				appContext.getIndexRegistryProvider(),
-				splitsAndConstraint.first, typeTraits, comparatorFactories,
-				fieldPermutation, IndexOp.INSERT,
-				new BTreeDataflowHelperFactory(),
-				null, NoOpOperationCallbackProvider.INSTANCE, txnId);
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				btreeBulkLoad, splitsAndConstraint.second);
-	}
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
+            IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
+            LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
+            throws AlgebricksException {
+        String datasetName = dataSource.getId().getDatasetName();
+        int numKeys = keys.size();
+        // Move the key fields to the front of the tuple; the payload goes last.
+        int[] fieldPermutation = new int[numKeys + 1];
+        int i = 0;
+        for (LogicalVariable varKey : keys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
-			IDataSource<AqlSourceId> dataSource,
-			IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
-			LogicalVariable payload, RecordDescriptor recordDesc,
-			JobGenContext context, JobSpecification spec)
-			throws AlgebricksException {
-		String datasetName = dataSource.getId().getDatasetName();
-		int numKeys = keys.size();
-		// move key fields to front
-		int[] fieldPermutation = new int[numKeys + 1];
-		// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
-		int i = 0;
-		for (LogicalVariable varKey : keys) {
-			int idx = propagatedSchema.findVariable(varKey);
-			fieldPermutation[i] = idx;
-			i++;
-		}
-		fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
 
-		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
-				.findDataset(datasetName);
-		if (compiledDatasetDecl == null) {
-			throw new AlgebricksException("Unknown dataset " + datasetName);
-		}
-		String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl)
-				.getIndexName();
+        ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
 
-		ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(
-				compiledDatasetDecl, metadata);
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
 
-		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
-				.getAppContext();
+        IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
+                compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
 
-		IBinaryComparatorFactory[] comparatorFactories = DatasetUtils
-				.computeKeysBinaryComparatorFactories(compiledDatasetDecl,
-						context.getBinaryComparatorFactoryProvider());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+        try {
+            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                    indexName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
 
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-		try {
-			splitsAndConstraint = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							datasetName, indexName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
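+        // Identical pipeline to the insert path, but issuing IndexOp.DELETE.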
+        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeDelete = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.DELETE,
+                new BTreeDataflowHelperFactory(), /* no tuple filter */ null, NoOpOperationCallbackProvider.INSTANCE, txnId);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeDelete, splitsAndConstraint.second);
+    }
 
-		TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-				spec, recordDesc, appContext.getStorageManagerInterface(),
-				appContext.getIndexRegistryProvider(),
-				splitsAndConstraint.first, typeTraits, comparatorFactories,
-				fieldPermutation, IndexOp.DELETE,
-				new BTreeDataflowHelperFactory(),
-				null, NoOpOperationCallbackProvider.INSTANCE, txnId);
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				btreeBulkLoad, splitsAndConstraint.second);
-	}
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec) throws AlgebricksException {
+        String indexName = dataSourceIndex.getId();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
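+        // Dispatch on the index kind; anything that is not a B-tree is assumed
+        // here to be an R-tree (spatial) index.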
+        if (cid.getKind() == IndexKind.BTREE) {
+            return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+                    filterFactory, recordDesc, context, spec, IndexOp.INSERT);
+        } else {
+            return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+                    filterFactory, recordDesc, context, spec, IndexOp.INSERT);
+        }
+    }
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
-			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
-			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-			IVariableTypeEnvironment typeEnv,
-			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
-			RecordDescriptor recordDesc, JobGenContext context,
-			JobSpecification spec) throws AlgebricksException {
-		String indexName = dataSourceIndex.getId();
-		String datasetName = dataSourceIndex.getDataSource().getId()
-				.getDatasetName();
-		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
-				.findDataset(datasetName);
-		if (compiledDatasetDecl == null) {
-			throw new AlgebricksException("Unknown dataset " + datasetName);
-		}
-		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
-				compiledDatasetDecl, indexName);
-		AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
-				inputSchemas, typeEnv, filterExpr, context);
-		if (cid.getKind() == IndexKind.BTREE) {
-			return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
-					IndexOp.INSERT);
-		} else {
-			return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
-					IndexOp.INSERT);
-		}
-	}
+    @Override
+    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
+            IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
+            IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec) throws AlgebricksException {
+        String indexName = dataSourceIndex.getId();
+        String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+        AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
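+        // Same kind-based dispatch as index insert, but with IndexOp.DELETE.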
+        if (cid.getKind() == IndexKind.BTREE) {
+            return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+                    filterFactory, recordDesc, context, spec, IndexOp.DELETE);
+        } else {
+            return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
+                    filterFactory, recordDesc, context, spec, IndexOp.DELETE);
+        }
+    }
 
-	@Override
-	public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
-			IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
-			IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
-			IVariableTypeEnvironment typeEnv,
-			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
-			RecordDescriptor recordDesc, JobGenContext context,
-			JobSpecification spec) throws AlgebricksException {
-		String indexName = dataSourceIndex.getId();
-		String datasetName = dataSourceIndex.getDataSource().getId()
-				.getDatasetName();
-		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
-				.findDataset(datasetName);
-		if (compiledDatasetDecl == null) {
-			throw new AlgebricksException("Unknown dataset " + datasetName);
-		}
-		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
-				compiledDatasetDecl, indexName);
-		AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
-				inputSchemas, typeEnv, filterExpr, context);
-		if (cid.getKind() == IndexKind.BTREE) {
-			return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
-					IndexOp.DELETE);
-		} else {
-			return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
-					primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
-					IndexOp.DELETE);
-		}
-	}
+    private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
+            IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
+            throws AlgebricksException {
+        // No filtering condition.
+        if (filterExpr == null) {
+            return null;
+        }
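+        // Compile the filter expression into an evaluator factory and wrap it so
+        // the DML operator can skip tuples that fail the filter.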
+        ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+        IEvaluatorFactory filterEvalFactory = exprJobGen.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
+                context);
+        return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspector());
+    }
 
-	private AsterixTupleFilterFactory createTupleFilterFactory(
-			IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
-			ILogicalExpression filterExpr, JobGenContext context)
-			throws AlgebricksException {
-		// No filtering condition.
-		if (filterExpr == null) {
-			return null;
-		}
-		ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
-		IEvaluatorFactory filterEvalFactory = exprJobGen
-				.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
-						context);
-		return new AsterixTupleFilterFactory(filterEvalFactory,
-				context.getBinaryBooleanInspector());
-	}
-	
-	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(
-			String datasetName, String indexName,
-			IOperatorSchema propagatedSchema,
-			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys,
-			AsterixTupleFilterFactory filterFactory,
-			RecordDescriptor recordDesc, JobGenContext context,
-			JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
-		int numKeys = primaryKeys.size() + secondaryKeys.size();
-		// generate field permutations
-		int[] fieldPermutation = new int[numKeys];
-		int i = 0;
-		for (LogicalVariable varKey : secondaryKeys) {
-			int idx = propagatedSchema.findVariable(varKey);
-			fieldPermutation[i] = idx;
-			i++;
-		}
-		for (LogicalVariable varKey : primaryKeys) {
-			int idx = propagatedSchema.findVariable(varKey);
-			fieldPermutation[i] = idx;
-			i++;
-		}
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String datasetName,
+            String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
+        int numKeys = primaryKeys.size() + secondaryKeys.size();
+        // Generate the field permutation: secondary keys first, then primary keys.
+        int[] fieldPermutation = new int[numKeys];
+        int i = 0;
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
 
-		// dataset
-		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
-				.findDataset(datasetName);
-		if (compiledDatasetDecl == null) {
-			throw new AlgebricksException("Unknown dataset " + datasetName);
-		}
-		String itemTypeName = compiledDatasetDecl.getItemTypeName();
-		IAType itemType;
-		try {
-			itemType = metadata.findType(itemTypeName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
-		if (itemType.getTypeTag() != ATypeTag.RECORD) {
-			throw new AlgebricksException("Only record types can be indexed.");
-		}
-		ARecordType recType = (ARecordType) itemType;
+        // dataset
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
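+        // Resolve the dataset's record type; the RECORD check below rejects
+        // non-record item types, which cannot be indexed.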
+        String itemTypeName = compiledDatasetDecl.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = metadata.findType(itemTypeName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Only record types can be indexed.");
+        }
+        ARecordType recType = (ARecordType) itemType;
 
-		// index parameters
-		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
-				compiledDatasetDecl, indexName);
-		List<String> secondaryKeyExprs = cid.getFieldExprs();
-		ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-		for (i = 0; i < secondaryKeys.size(); ++i) {
-			IAType keyType = AqlCompiledIndexDecl.keyFieldType(
-					secondaryKeyExprs.get(i).toString(), recType);
-			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-					.getBinaryComparatorFactory(keyType, true);
-			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-		}
-		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-				.getPartitioningFunctions(compiledDatasetDecl)) {
-			IAType keyType = evalFactoryAndType.third;
-			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-					.getBinaryComparatorFactory(keyType, true);
-			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-			++i;
-		}
+        // index parameters
+        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+        List<String> secondaryKeyExprs = cid.getFieldExprs();
+        ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
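+        // Secondary-key slots take their traits and comparators from the declared
+        // index fields; the remaining slots come from the dataset's partitioning
+        // (primary key) functions below.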
+        for (i = 0; i < secondaryKeys.size(); ++i) {
+            IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyExprs.get(i).toString(), recType);
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+                .getPartitioningFunctions(compiledDatasetDecl)) {
+            IAType keyType = evalFactoryAndType.third;
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            ++i;
+        }
 
-		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
-				.getAppContext();
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-		try {
-			splitsAndConstraint = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							datasetName, indexName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
-		TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-				spec, recordDesc, appContext.getStorageManagerInterface(),
-				appContext.getIndexRegistryProvider(),
-				splitsAndConstraint.first, typeTraits, comparatorFactories,
-				fieldPermutation, indexOp, new BTreeDataflowHelperFactory(),
-				filterFactory, NoOpOperationCallbackProvider.INSTANCE, txnId);
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				btreeBulkLoad, splitsAndConstraint.second);
-	}
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+        try {
+            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                    indexName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        TreeIndexInsertUpdateDeleteOperatorDescriptor btreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
+                new BTreeDataflowHelperFactory(), filterFactory, NoOpOperationCallbackProvider.INSTANCE, txnId);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeUpdate, splitsAndConstraint.second);
+    }
 
-	private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(
-			String datasetName, String indexName,
-			IOperatorSchema propagatedSchema,
-			List<LogicalVariable> primaryKeys,
-			List<LogicalVariable> secondaryKeys,
-			AsterixTupleFilterFactory filterFactory,
-			RecordDescriptor recordDesc, JobGenContext context,
-			JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
-		AqlCompiledDatasetDecl compiledDatasetDecl = metadata
-				.findDataset(datasetName);
-		String itemTypeName = compiledDatasetDecl.getItemTypeName();
-		IAType itemType;
-		try {
-			itemType = metadata.findType(itemTypeName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
-		if (itemType.getTypeTag() != ATypeTag.RECORD) {
-			throw new AlgebricksException("Only record types can be indexed.");
-		}
-		ARecordType recType = (ARecordType) itemType;
-		AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
-				compiledDatasetDecl, indexName);
-		List<String> secondaryKeyExprs = cid.getFieldExprs();
-		Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
-				secondaryKeyExprs.get(0), recType);
+    private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String datasetName,
+            String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
+            List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
+            JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
+        AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
+        if (compiledDatasetDecl == null) {
+            throw new AlgebricksException("Unknown dataset " + datasetName);
+        }
+        String itemTypeName = compiledDatasetDecl.getItemTypeName();
+        IAType itemType;
+        try {
+            itemType = metadata.findType(itemTypeName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        if (itemType.getTypeTag() != ATypeTag.RECORD) {
+            throw new AlgebricksException("Only record types can be indexed.");
+        }
+        ARecordType recType = (ARecordType) itemType;
+        AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
+        List<String> secondaryKeyExprs = cid.getFieldExprs();
+        Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyExprs.get(0),
+                recType);
         IAType spatialType = keyPairType.first;
-		int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType
-				.getTypeTag());
-		int numSecondaryKeys = dimension * 2;
-		int numPrimaryKeys = primaryKeys.size();
-		int numKeys = numSecondaryKeys + numPrimaryKeys;
-		ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-		IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
-		int[] fieldPermutation = new int[numKeys];
-		int i = 0;
+        int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
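+        // An n-dimensional spatial key is stored as its MBR, i.e. 2n coordinate
+        // fields, hence dimension * 2 secondary key fields in the R-tree.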
+        int numSecondaryKeys = dimension * 2;
+        int numPrimaryKeys = primaryKeys.size();
+        int numKeys = numSecondaryKeys + numPrimaryKeys;
+        ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
+        int[] fieldPermutation = new int[numKeys];
+        int i = 0;
 
-		for (LogicalVariable varKey : secondaryKeys) {
-			int idx = propagatedSchema.findVariable(varKey);
-			fieldPermutation[i] = idx;
-			i++;
-		}
-		for (LogicalVariable varKey : primaryKeys) {
-			int idx = propagatedSchema.findVariable(varKey);
-			fieldPermutation[i] = idx;
-			i++;
-		}
-		IAType nestedKeyType = NonTaggedFormatUtil
-				.getNestedSpatialType(spatialType.getTypeTag());
-		IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
-		for (i = 0; i < numSecondaryKeys; i++) {
-			ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
-					.getSerializerDeserializer(nestedKeyType);
-			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-					.getBinaryComparatorFactory(nestedKeyType, true);
-			typeTraits[i] = AqlTypeTraitProvider.INSTANCE
-					.getTypeTrait(nestedKeyType);
-			valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
-		}
-		for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
-				.getPartitioningFunctions(compiledDatasetDecl)) {
-			IAType keyType = evalFactoryAndType.third;
-			comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
-					.getBinaryComparatorFactory(keyType, true);
-			typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-			++i;
-		}
+        for (LogicalVariable varKey : secondaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        for (LogicalVariable varKey : primaryKeys) {
+            int idx = propagatedSchema.findVariable(varKey);
+            fieldPermutation[i] = idx;
+            i++;
+        }
+        IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
+        IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
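+        // Each coordinate field uses the untagged nested spatial type for its
+        // traits, comparator, and primitive value provider.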
+        for (i = 0; i < numSecondaryKeys; i++) {
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+                    nestedKeyType, true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
+            valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
+        }
+        for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
+                .getPartitioningFunctions(compiledDatasetDecl)) {
+            IAType keyType = evalFactoryAndType.third;
+            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
+                    true);
+            typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+            ++i;
+        }
 
-		IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context
-				.getAppContext();
-		Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
-		try {
-			splitsAndConstraint = metadata
-					.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
-							datasetName, indexName);
-		} catch (Exception e) {
-			throw new AlgebricksException(e);
-		}
-		TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
-				spec, recordDesc, appContext.getStorageManagerInterface(),
-				appContext.getIndexRegistryProvider(),
-				splitsAndConstraint.first, typeTraits, comparatorFactories,
-				fieldPermutation, indexOp, new RTreeDataflowHelperFactory(
-						valueProviderFactories), filterFactory,
-				NoOpOperationCallbackProvider.INSTANCE, txnId);
-		return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
-				rtreeUpdate, splitsAndConstraint.second);
-	}
+        IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
+        try {
+            splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
+                    indexName);
+        } catch (Exception e) {
+            throw new AlgebricksException(e);
+        }
+        TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
+                splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
+                new RTreeDataflowHelperFactory(valueProviderFactories), filterFactory,
+                NoOpOperationCallbackProvider.INSTANCE, txnId);
+        return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
+    }
 
-	public long getTxnId() {
-		return txnId;
-	}
+    public long getTxnId() {
+        return txnId;
+    }
 
-	public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(
-			ITypeTraits[] typeTraits) {
-		return new BTreeNSMInteriorFrameFactory(
-				new TypeAwareTupleWriterFactory(typeTraits));
-	}
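+    // Static helpers that build B-tree NSM frame factories (interior and leaf)
+    // over the given type traits.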
+    public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
+        return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
+    }
 
-	public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(
-			ITypeTraits[] typeTraits) {
-		return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
-				typeTraits));
-	}
+    public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(ITypeTraits[] typeTraits) {
+        return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
+    }
 
-	@Override
-	public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
-		return AsterixBuiltinFunctions.lookupFunction(fid);
-	}
-
+    @Override
+    public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
+        return AsterixBuiltinFunctions.lookupFunction(fid);
+    }
 }