Implemented dataflow components for length-partitioned inverted indexes. Added integration test.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_length_filter@2487 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
new file mode 100644
index 0000000..05b7a26
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -0,0 +1,346 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.am.invertedindex;
+
+import java.io.DataOutput;
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifierFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
+import edu.uci.ics.hyracks.test.support.TestIndexLifecycleManagerProvider;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
+import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
+import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
+
+@SuppressWarnings("rawtypes")
+public abstract class AbstractfWordInvertedIndexTest extends AbstractIntegrationTest {
+    static {
+        TestStorageManagerComponentHolder.init(8192, 20, 20); // NOTE(review): presumably page size / cache and file limits - confirm.
+    }
+
+    protected static final int MERGE_THRESHOLD = 3; // Not referenced in this class; presumably consumed by subclasses.
+
+    protected IStorageManagerInterface storageManager = new TestStorageManagerInterface();
+    protected IIndexLifecycleManagerProvider lcManagerProvider = new TestIndexLifecycleManagerProvider();
+    protected IIndexDataflowHelperFactory btreeDataflowHelperFactory = new BTreeDataflowHelperFactory();
+    protected IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory; // Never assigned here; presumably set in prepare().
+
+    protected final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS"); // Timestamp suffix for file names.
+    protected final static String sep = System.getProperty("file.separator");
+    protected final String dateString = simpleDateFormat.format(new Date());
+    protected final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
+    protected final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
+
+    protected IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
+    protected IFileSplitProvider btreeFileSplitProvider = new ConstantFileSplitProvider(
+            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(btreeFileName))) });
+
+    // Primary BTree index: two fields (int, UTF8 string) with one key field; traits and comparator are filled in setup().
+    protected int primaryFieldCount = 2;
+    protected ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
+    protected int primaryKeyFieldCount = 1;
+    protected IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
+    protected RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+    // Inverted index BTree dictionary. Neither field is assigned in this class; presumably set by the subclass.
+    protected ITypeTraits[] tokenTypeTraits;
+    protected IBinaryComparatorFactory[] tokenComparatorFactories;
+
+    // Inverted lists: each element has a single int field; traits and comparator are filled in setup().
+    protected int invListElementFieldCount = 1;
+    protected ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
+    protected IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
+    protected RecordDescriptor invListsRecDesc = new RecordDescriptor(
+            new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
+    protected RecordDescriptor tokenizerRecDesc; // Output record of the tokenizer operator; not assigned in this class.
+
+    // Tokenizer: delimited UTF8 word tokenizer, shared by the create, bulk-load, tokenize and search operators.
+    protected ITokenFactory tokenFactory = new UTF8WordTokenFactory();
+    protected IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
+            tokenFactory);
+
+    // Comparators for sorting the tokenizer output before bulk load; not assigned in this class.
+    IBinaryComparatorFactory[] sortComparatorFactories;
+
+    @Before
+    public void setup() throws Exception { // Fixture: creates and loads both indexes before each test.
+        prepare(); // Subclass hook; runs before any job is built.
+
+        // Type traits and key comparator for the primary BTree index: (int key, UTF8 string).
+        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
+        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        // Type traits and comparator for inverted-list elements: a single int.
+        invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
+        invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+
+        createPrimaryIndex();
+        loadPrimaryIndex();
+        printPrimaryIndex();
+        createInvertedIndex();
+        loadInvertedIndex(); // Fed by a scan of the primary index loaded above.
+    }
+
+    protected abstract void prepare(); // First call in setup(); presumably assigns the fields this class leaves unset - confirm.
+
+    protected abstract boolean addNumTokensKey(); // Passed as the last argument to BinaryTokenizerOperatorDescriptor.
+
+    public void createPrimaryIndex() throws Exception { // Builds a one-operator job on NC1 that creates the primary BTree.
+        JobSpecification spec = new JobSpecification();
+        TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
+        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+                lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories,
+                btreeDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
+        spec.addRoot(primaryCreateOp);
+        runTest(spec); // Inherited helper; hands the job spec to the test harness.
+    }
+
+    public void createInvertedIndex() throws Exception { // Builds a one-operator job on NC1 that creates the LSM inverted index.
+        JobSpecification spec = new JobSpecification();
+        ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
+        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
+                storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits, tokenComparatorFactories,
+                invListsTypeTraits, invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory,
+                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexCreateOp, NC1_ID);
+        spec.addRoot(invIndexCreateOp);
+        runTest(spec); // Inherited helper; hands the job spec to the test harness.
+    }
+
+    @Test
+    public void testConjunctiveSearcher() throws Exception { // Runs three queries; no result assertions are made in this class.
+        IInvertedIndexSearchModifierFactory conjunctiveSearchModifierFactory = new ConjunctiveSearchModifierFactory();
+        searchInvertedIndex("of", conjunctiveSearchModifierFactory); // Single-word query.
+        searchInvertedIndex("3d", conjunctiveSearchModifierFactory); // Single-word query.
+        searchInvertedIndex("of the human", conjunctiveSearchModifierFactory); // Multi-word query.
+    }
+
+    private IOperatorDescriptor createFileScanOp(JobSpecification spec) { // Scanner over the DBLP titles data file, pinned to NC1.
+        FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
+                "data/cleanednumbereddblptitles.txt"))) };
+        IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
+        RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc); // Lines are '|'-delimited: int, then UTF8 string.
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
+        return dblpTitleScanner;
+    }
+
+    private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) { // Bulk-load operator for the primary BTree, on NC1.
+        int[] fieldPermutation = { 0, 1 }; // Input fields 0 and 1, in order.
+        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory, // NOTE(review): 0.7f is presumably the fill factor - confirm.
+                NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
+        return primaryBtreeBulkLoad;
+    }
+
+    private IOperatorDescriptor createScanKeyProviderOp(JobSpecification spec) throws HyracksDataException {
+        // Build a dummy tuple: the builder is sized for primaryKeyFieldCount * 2 fields, but only one ("0") is written.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize()); // Emits the tuple built above.
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
+        return keyProviderOp;
+    }
+
+    private IOperatorDescriptor createPrimaryScanOp(JobSpecification spec) throws HyracksDataException {
+        int[] lowKeyFields = null; // No low key: unbounded below (-infinity).
+        int[] highKeyFields = null; // No high key: unbounded above (+infinity).
+        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
+                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory, false,
+                NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
+        return primaryBtreeSearchOp;
+    }
+
+    private void loadPrimaryIndex() throws Exception { // Job: file scan -> primary BTree bulk load.
+        JobSpecification spec = new JobSpecification();
+        // The input file is assumed to be pre-sorted on the key, so no sort
+        // operator is placed before the bulk load.
+        IOperatorDescriptor fileScanOp = createFileScanOp(spec);
+        IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
+        spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
+        spec.addRoot(primaryBulkLoad);
+        runTest(spec);
+    }
+
+    private void printPrimaryIndex() throws Exception { // Job: dummy key -> unbounded primary scan -> plain file writer.
+        JobSpecification spec = new JobSpecification();
+        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
+        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) }); // Output goes to a temp file from the inherited createTempFile().
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ","); // "," is presumably the field delimiter.
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, printer, 0);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+
+    private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
+            RecordDescriptor outputRecDesc) { // External sort on sortFields using sortComparatorFactories, pinned to NC1.
+        ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields, // NOTE(review): 1000 is presumably the frame budget - confirm.
+                sortComparatorFactories, outputRecDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
+        return externalSortOp;
+    }
+
+    private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int docField, int[] keyFields) { // Tokenizer operator on NC1.
+        BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
+                tokenizerRecDesc, tokenizerFactory, docField, keyFields, addNumTokensKey()); // Last flag comes from the subclass.
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
+        return binaryTokenizer;
+    }
+
+    private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) { // Bulk-load operator on NC1.
+        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
+                spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
+                tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
+                tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
+        return invIndexBulkLoadOp;
+    }
+
+    public void loadInvertedIndex() throws Exception { // Job: dummy key -> primary scan -> tokenizer -> sort -> inverted index bulk load.
+        JobSpecification spec = new JobSpecification();
+        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
+        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
+        int docField = 1; // Field 1 of the primary tuple (the UTF8 string) is the one tokenized.
+        int[] keyFields = { 0 }; // Field 0 of the primary tuple (the int) is handed to the tokenizer as the key.
+        IOperatorDescriptor binaryTokenizerOp = createBinaryTokenizerOp(spec, docField, keyFields);
+        int[] sortFields = new int[sortComparatorFactories.length];
+        int[] fieldPermutation = new int[sortComparatorFactories.length];
+        for (int i = 0; i < sortFields.length; i++) { // Sort on fields 0..n-1 in order and bulk load them with an identity permutation.
+            sortFields[i] = i;
+            fieldPermutation[i] = i;
+        }
+        IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
+        IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
+        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
+        spec.addRoot(invIndexBulkLoadOp);
+        runTest(spec);
+    }
+
+    private IOperatorDescriptor createQueryProviderOp(JobSpecification spec, String queryString)
+            throws HyracksDataException {
+        // Build a tuple with exactly one field: the query string, serialized as UTF8.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        UTF8StringSerializerDeserializer.INSTANCE.serialize(queryString, dos);
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] querySerde = { UTF8StringSerializerDeserializer.INSTANCE };
+        RecordDescriptor queryRecDesc = new RecordDescriptor(querySerde);
+        ConstantTupleSourceOperatorDescriptor queryProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+                queryRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize()); // Emits the query tuple built above.
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, queryProviderOp, NC1_ID);
+        return queryProviderOp;
+    }
+
+    private IOperatorDescriptor createInvertedIndexSearchOp(JobSpecification spec,
+            IInvertedIndexSearchModifierFactory searchModifierFactory) { // Search operator over the inverted index, pinned to NC1.
+        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(spec,
+                0, storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits, // NOTE(review): 0 is presumably the query field index - confirm.
+                tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
+                invertedIndexDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc, false,
+                NoOpOperationCallbackFactory.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexSearchOp, NC1_ID);
+        return invIndexSearchOp;
+    }
+
+    public void searchInvertedIndex(String queryString, IInvertedIndexSearchModifierFactory searchModifierFactory)
+            throws Exception { // Job: query tuple -> inverted index search -> plain file writer.
+        JobSpecification spec = new JobSpecification();
+        IOperatorDescriptor queryProviderOp = createQueryProviderOp(spec, queryString);
+        IOperatorDescriptor invIndexSearchOp = createInvertedIndexSearchOp(spec, searchModifierFactory);
+        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
+                createTempFile().getAbsolutePath()) }); // Results go to a temp file from the inherited createTempFile().
+        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ","); // "," is presumably the field delimiter.
+        spec.connect(new OneToOneConnectorDescriptor(spec), queryProviderOp, 0, invIndexSearchOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexSearchOp, 0, printer, 0);
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
index 76155ae..47480da 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
@@ -11,6 +11,7 @@
 import edu.uci.ics.hyracks.api.io.FileReference;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
@@ -33,6 +34,15 @@
 
     @Test
     public void tokenizerTest() throws Exception {
+        test(false); // Tokenizer output record is (UTF8 string, int).
+    }
+
+    @Test
+    public void tokenizerWithNumTokensTest() throws Exception {
+        test(true); // Tokenizer output record gains a short field: (UTF8 string, short, int).
+    }
+
+    private void test(boolean addNumTokensKey) throws Exception {
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
@@ -46,16 +56,22 @@
                         UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
 
-        RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        RecordDescriptor tokenizerRecDesc;
+        if (!addNumTokensKey) {
+            tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
+        } else {
+            tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                    UTF8StringSerializerDeserializer.INSTANCE, ShortSerializerDeserializer.INSTANCE,
+                    IntegerSerializerDeserializer.INSTANCE });
+        }
 
         ITokenFactory tokenFactory = new UTF8WordTokenFactory();
         IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
                 tokenFactory);
-        int[] tokenFields = { 1 };
         int[] keyFields = { 0 };
         BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, tokenFields, keyFields);
+                tokenizerRecDesc, tokenizerFactory, 1, keyFields, addNumTokensKey);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java
deleted file mode 100644
index d108289..0000000
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/InvertedIndexOperatorsTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package edu.uci.ics.hyracks.tests.am.invertedindex;
-
-import java.io.File;
-
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
-
-public class InvertedIndexOperatorsTest extends AbstractIntegrationTest {
-
-    @Test
-    public void tokenizerTest() throws Exception {
-        JobSpecification spec = new JobSpecification();
-
-        FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/cleanednumbereddblptitles.txt"))) };
-        IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
-        RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
-        FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
-
-        RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-
-        ITokenFactory tokenFactory = new UTF8WordTokenFactory();
-        IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
-                tokenFactory);
-        int[] tokenFields = { 1 };
-        int[] projFields = { 0 };
-        BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, tokenFields, projFields);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
-
-        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
-                createTempFile().getAbsolutePath()) });
-        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), dblpTitleScanner, 0, binaryTokenizer, 0);
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizer, 0, printer, 0);
-
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-}
\ No newline at end of file
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
new file mode 100644
index 0000000..1bd6878
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/PartitionedWordInvertedIndexTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.tests.am.invertedindex;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
+import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
+
+public class PartitionedWordInvertedIndexTest extends AbstractfWordInvertedIndexTest {
+    // Word inverted-index test variant wired to PartitionedLSMInvertedIndexDataflowHelperFactory.
+    @Override
+    protected void prepare() {
+        // Field declarations and comparators for tokens: a UTF-8 string followed by a short.
+        tokenTypeTraits = new ITypeTraits[] { UTF8StringPointable.TYPE_TRAITS, ShortPointable.TYPE_TRAITS };
+        tokenComparatorFactories = new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(ShortPointable.FACTORY) };
+        // Record layout (string, short, int); NOTE(review): presumably the tokenizer's output - confirm in base class.
+        tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, ShortSerializerDeserializer.INSTANCE,
+                IntegerSerializerDeserializer.INSTANCE });
+        // One comparator per field of the record descriptor above, in the same field order.
+        sortComparatorFactories = new IBinaryComparatorFactory[] {
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(ShortPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
+        // Same provider arguments as WordInvertedIndexTest; only the helper factory class differs.
+        invertedIndexDataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
+                new FlushControllerProvider(), new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE,
+                        MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE,
+                ImmediateSchedulerProvider.INSTANCE);
+    }
+
+    @Override
+    protected boolean addNumTokensKey() {
+        return true; // NOTE(review): presumably forwarded as the tokenizer operator's addNumTokensKey flag - confirm
+    }
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
index a63cc07..69c7c90 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/WordInvertedIndexTest.java
@@ -15,332 +15,44 @@
 
 package edu.uci.ics.hyracks.tests.am.invertedindex;
 
-import java.io.DataOutput;
-import java.io.File;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
 import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
 import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ConstantMergePolicyProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.FlushControllerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.ImmediateSchedulerProvider;
 import edu.uci.ics.hyracks.storage.am.lsm.common.impls.RefCountingOperationTrackerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
 import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search.ConjunctiveSearchModifierFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.DelimitedUTF8StringBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.ITokenFactory;
-import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.UTF8WordTokenFactory;
-import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
-import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
-import edu.uci.ics.hyracks.test.support.TestIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.test.support.TestStorageManagerComponentHolder;
-import edu.uci.ics.hyracks.test.support.TestStorageManagerInterface;
-import edu.uci.ics.hyracks.tests.integration.AbstractIntegrationTest;
 
-@SuppressWarnings("rawtypes")
-public class WordInvertedIndexTest extends AbstractIntegrationTest {
-    static {
-        TestStorageManagerComponentHolder.init(8192, 20, 20);
-    }
+public class WordInvertedIndexTest extends AbstractfWordInvertedIndexTest {
 
-    private static final int MERGE_THRESHOLD = 3;
-
-    private IStorageManagerInterface storageManager = new TestStorageManagerInterface();
-    private IIndexLifecycleManagerProvider lcManagerProvider = new TestIndexLifecycleManagerProvider();
-    private IIndexDataflowHelperFactory btreeDataflowHelperFactory = new BTreeDataflowHelperFactory();
-    private IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
-            new FlushControllerProvider(), new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE,
-                    MERGE_THRESHOLD), RefCountingOperationTrackerFactory.INSTANCE, ImmediateSchedulerProvider.INSTANCE);
-
-    private final static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("ddMMyy-hhmmssSS");
-    private final static String sep = System.getProperty("file.separator");
-    private final String dateString = simpleDateFormat.format(new Date());
-    private final String primaryFileName = System.getProperty("java.io.tmpdir") + sep + "primaryBtree" + dateString;
-    private final String btreeFileName = System.getProperty("java.io.tmpdir") + sep + "invIndexBtree" + dateString;
-
-    private IFileSplitProvider primaryFileSplitProvider = new ConstantFileSplitProvider(
-            new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(primaryFileName))) });
-    private IFileSplitProvider btreeFileSplitProvider = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(
-            NC1_ID, new FileReference(new File(btreeFileName))) });
-
-    // Primary BTree index.
-    private int primaryFieldCount = 2;
-    private ITypeTraits[] primaryTypeTraits = new ITypeTraits[primaryFieldCount];
-    private int primaryKeyFieldCount = 1;
-    private IBinaryComparatorFactory[] primaryComparatorFactories = new IBinaryComparatorFactory[primaryKeyFieldCount];
-    private RecordDescriptor primaryRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-
-    // Inverted index BTree dictionary.
-    private ITypeTraits[] tokenTypeTraits = new ITypeTraits[1];
-    private IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[1];
-
-    // Inverted index stuff.
-    private int invListElementFieldCount = 1;
-    private ITypeTraits[] invListsTypeTraits = new ITypeTraits[invListElementFieldCount];
-    private IBinaryComparatorFactory[] invListsComparatorFactories = new IBinaryComparatorFactory[invListElementFieldCount];
-    private RecordDescriptor tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE });
-    private RecordDescriptor invListsRecDesc = new RecordDescriptor(
-            new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE });
-
-    // Tokenizer stuff.
-    private ITokenFactory tokenFactory = new UTF8WordTokenFactory();
-    private IBinaryTokenizerFactory tokenizerFactory = new DelimitedUTF8StringBinaryTokenizerFactory(true, false,
-            tokenFactory);
-
-    @Before
-    public void setup() throws Exception {
-        // Field declarations and comparators for primary BTree index.
-        primaryTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
-        primaryTypeTraits[1] = UTF8StringPointable.TYPE_TRAITS;
-        primaryComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
-
+    @Override
+    protected void prepare() {
         // Field declarations and comparators for tokens.
-        tokenTypeTraits[0] = UTF8StringPointable.TYPE_TRAITS;
-        tokenComparatorFactories[0] = PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY);
+        tokenTypeTraits = new ITypeTraits[] { UTF8StringPointable.TYPE_TRAITS };
+        tokenComparatorFactories = new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                .of(UTF8StringPointable.FACTORY) }; // a token is a single UTF-8 string field here
 
-        // Field declarations and comparators for inverted lists.
-        invListsTypeTraits[0] = IntegerPointable.TYPE_TRAITS;
-        invListsComparatorFactories[0] = PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY);
+        tokenizerRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE }); // (string, int)
 
-        createPrimaryIndex();
-        loadPrimaryIndex();
-        printPrimaryIndex();
-        createInvertedIndex();
-        loadInvertedIndex();
+        sortComparatorFactories = new IBinaryComparatorFactory[] { // one comparator per tokenizerRecDesc field
+                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) };
+        // Non-partitioned helper factory; these exact arguments were previously passed in a field initializer.
+        invertedIndexDataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(new FlushControllerProvider(),
+                new ConstantMergePolicyProvider(ImmediateSchedulerProvider.INSTANCE, MERGE_THRESHOLD),
+                RefCountingOperationTrackerFactory.INSTANCE, ImmediateSchedulerProvider.INSTANCE);
     }
 
-    public void createPrimaryIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        TransientLocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
-        TreeIndexCreateOperatorDescriptor primaryCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
-                lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits, primaryComparatorFactories,
-                btreeDataflowHelperFactory, localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryCreateOp, NC1_ID);
-        spec.addRoot(primaryCreateOp);
-        runTest(spec);
-    }
-
-    public void createInvertedIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
-        LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
-                storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits, tokenComparatorFactories,
-                invListsTypeTraits, invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory,
-                localResourceFactoryProvider, NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexCreateOp, NC1_ID);
-        spec.addRoot(invIndexCreateOp);
-        runTest(spec);
-    }
-
-    @Test
-    public void testConjunctiveSearcher() throws Exception {
-        IInvertedIndexSearchModifierFactory conjunctiveSearchModifierFactory = new ConjunctiveSearchModifierFactory();
-        searchInvertedIndex("of", conjunctiveSearchModifierFactory);
-        searchInvertedIndex("3d", conjunctiveSearchModifierFactory);
-        searchInvertedIndex("of the human", conjunctiveSearchModifierFactory);
-    }
-
-    private IOperatorDescriptor createFileScanOp(JobSpecification spec) {
-        FileSplit[] dblpTitleFileSplits = new FileSplit[] { new FileSplit(NC1_ID, new FileReference(new File(
-                "data/cleanednumbereddblptitles.txt"))) };
-        IFileSplitProvider dblpTitleSplitProvider = new ConstantFileSplitProvider(dblpTitleFileSplits);
-        RecordDescriptor dblpTitleRecDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                IntegerSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
-        FileScanOperatorDescriptor dblpTitleScanner = new FileScanOperatorDescriptor(spec, dblpTitleSplitProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE }, '|'), dblpTitleRecDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, dblpTitleScanner, NC1_ID);
-        return dblpTitleScanner;
-    }
-
-    private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
-        int[] fieldPermutation = { 0, 1 };
-        TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
-                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, fieldPermutation, 0.7f, true, btreeDataflowHelperFactory,
-                NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
-        return primaryBtreeBulkLoad;
-    }
-
-    private IOperatorDescriptor createScanKeyProviderOp(JobSpecification spec) throws HyracksDataException {
-        // build dummy tuple containing nothing
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(primaryKeyFieldCount * 2);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        UTF8StringSerializerDeserializer.INSTANCE.serialize("0", dos);
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { UTF8StringSerializerDeserializer.INSTANCE,
-                UTF8StringSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, keyProviderOp, NC1_ID);
-        return keyProviderOp;
-    }
-
-    private IOperatorDescriptor createPrimaryScanOp(JobSpecification spec) throws HyracksDataException {
-        int[] lowKeyFields = null; // - infinity
-        int[] highKeyFields = null; // + infinity
-        BTreeSearchOperatorDescriptor primaryBtreeSearchOp = new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
-                storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
-                primaryComparatorFactories, lowKeyFields, highKeyFields, true, true, btreeDataflowHelperFactory, false,
-                NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeSearchOp, NC1_ID);
-        return primaryBtreeSearchOp;
-    }
-
-    private void loadPrimaryIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        // Assuming that the data is pre-sorted on the key. No need to sort
-        // before bulk load.
-        IOperatorDescriptor fileScanOp = createFileScanOp(spec);
-        IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
-        spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
-        spec.addRoot(primaryBulkLoad);
-        runTest(spec);
-    }
-
-    private void printPrimaryIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
-        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
-        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
-                createTempFile().getAbsolutePath()) });
-        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, printer, 0);
-        spec.addRoot(printer);
-        runTest(spec);
-    }
-
-    private IOperatorDescriptor createExternalSortOp(JobSpecification spec, int[] sortFields,
-            RecordDescriptor outputRecDesc) {
-        ExternalSortOperatorDescriptor externalSortOp = new ExternalSortOperatorDescriptor(spec, 1000, sortFields,
-                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY) }, outputRecDesc);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, externalSortOp, NC1_ID);
-        return externalSortOp;
-    }
-
-    private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int[] tokenFields, int[] keyFields) {
-        BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
-                tokenizerRecDesc, tokenizerFactory, tokenFields, keyFields);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
-        return binaryTokenizer;
-    }
-
-    private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
-        LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
-                spec, fieldPermutation, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
-                tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
-                tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
-        return invIndexBulkLoadOp;
-    }
-
-    public void loadInvertedIndex() throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor keyProviderOp = createScanKeyProviderOp(spec);
-        IOperatorDescriptor primaryScanOp = createPrimaryScanOp(spec);
-        int[] tokenFields = { 1 };
-        int[] keyFields = { 0 };
-        IOperatorDescriptor binaryTokenizerOp = createBinaryTokenizerOp(spec, tokenFields, keyFields);
-        int[] sortFields = { 0, 1 };
-        IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
-        int[] fieldPermutation = { 0, 1 };
-        IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
-        spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
-        spec.addRoot(invIndexBulkLoadOp);
-        runTest(spec);
-    }
-
-    private IOperatorDescriptor createQueryProviderOp(JobSpecification spec, String queryString)
-            throws HyracksDataException {
-        // Build tuple with exactly one field, which is the query,
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        UTF8StringSerializerDeserializer.INSTANCE.serialize(queryString, dos);
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] querySerde = { UTF8StringSerializerDeserializer.INSTANCE };
-        RecordDescriptor queryRecDesc = new RecordDescriptor(querySerde);
-        ConstantTupleSourceOperatorDescriptor queryProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                queryRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, queryProviderOp, NC1_ID);
-        return queryProviderOp;
-    }
-
-    private IOperatorDescriptor createInvertedIndexSearchOp(JobSpecification spec,
-            IInvertedIndexSearchModifierFactory searchModifierFactory) {
-        LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(spec,
-                0, storageManager, btreeFileSplitProvider, lcManagerProvider, tokenTypeTraits,
-                tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
-                invertedIndexDataflowHelperFactory, tokenizerFactory, searchModifierFactory, invListsRecDesc, false,
-                NoOpOperationCallbackFactory.INSTANCE);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexSearchOp, NC1_ID);
-        return invIndexSearchOp;
-    }
-
-    public void searchInvertedIndex(String queryString, IInvertedIndexSearchModifierFactory searchModifierFactory)
-            throws Exception {
-        JobSpecification spec = new JobSpecification();
-        IOperatorDescriptor queryProviderOp = createQueryProviderOp(spec, queryString);
-        IOperatorDescriptor invIndexSearchOp = createInvertedIndexSearchOp(spec, searchModifierFactory);
-        IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
-                createTempFile().getAbsolutePath()) });
-        IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
-        spec.connect(new OneToOneConnectorDescriptor(spec), queryProviderOp, 0, invIndexSearchOp, 0);
-        spec.connect(new OneToOneConnectorDescriptor(spec), invIndexSearchOp, 0, printer, 0);
-        spec.addRoot(printer);
-        runTest(spec);
+    @Override
+    protected boolean addNumTokensKey() {
+        return false; // NOTE(review): presumably forwarded as the tokenizer operator's addNumTokensKey flag - confirm
     }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index 4706eed..84152d5 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -29,19 +29,23 @@
     private static final long serialVersionUID = 1L;
 
     private final IBinaryTokenizerFactory tokenizerFactory;
-    // Fields that will be tokenized
-    private final int[] tokenFields;
+    // Field that will be tokenized.
+    private final int docField;
     // operator will append these key fields to each token, e.g., as
     // payload for an inverted list
     // WARNING: too many key fields can cause significant data blowup.
     private final int[] keyFields;
+    // Indicates whether the first key field should be the number of tokens in the tokenized set of the document.
+    // This value is used in partitioned inverted indexes, for example.
+    private final boolean addNumTokensKey;
 
     public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
-            IBinaryTokenizerFactory tokenizerFactory, int[] tokenFields, int[] keyFields) {
+            IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey) {
         super(spec, 1, 1);
         this.tokenizerFactory = tokenizerFactory;
-        this.tokenFields = tokenFields;
+        this.docField = docField; // index of the one input field to tokenize; replaces the former tokenFields array
         this.keyFields = keyFields;
+        this.addNumTokensKey = addNumTokensKey; // passed through unchanged to the node pushable in createPushRuntime
         recordDescriptors[0] = recDesc;
     }
 
@@ -49,6 +53,7 @@
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
-                getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), tokenFields, keyFields);
+                getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), docField, keyFields,
+                addNumTokensKey); // each call builds a new tokenizer; partition and nPartitions are not used here
     }
 }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 8f2b155..9495516 100644
--- a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009-2010 by The Regents of the University of California
+ * Copyright 2009-2012 by The Regents of the University of California
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * you may obtain a copy of the License from
@@ -34,8 +34,9 @@
 
     private final IHyracksTaskContext ctx;
     private final IBinaryTokenizer tokenizer;
-    private final int[] tokenFields;
-    private final int[] projFields;
+    private final int docField;
+    private final int[] keyFields;
+    private final boolean addNumTokensKey;
     private final RecordDescriptor inputRecDesc;
     private final RecordDescriptor outputRecDesc;
 
@@ -46,11 +47,13 @@
     private ByteBuffer writeBuffer;
 
     public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
-            RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int[] tokenFields, int[] projFields) {
+            RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int docField, int[] keyFields,
+            boolean addNumTokensKey) { // NOTE(review): nextFrame counts tokens in a short; >32767 tokens would wrap
         this.ctx = ctx;
         this.tokenizer = tokenizer;
-        this.tokenFields = tokenFields;
-        this.projFields = projFields;
+        this.docField = docField; // the single input field handed to tokenizer.reset() in nextFrame
+        this.keyFields = keyFields; // takes the place of the former projFields parameter
+        this.addNumTokensKey = addNumTokensKey; // true => nextFrame first counts the tokens, then writes the count
         this.inputRecDesc = inputRecDesc;
         this.outputRecDesc = outputRecDesc;
     }
@@ -69,41 +72,67 @@
+    /**
+     * Tokenizes the document field of every tuple in the frame and appends one output tuple per token,
+     * laid out as [token, (number of tokens,) key fields...]. The output frame is flushed to the writer
+     * whenever the next tuple no longer fits.
+     *
+     * @param buffer frame holding the input tuples
+     * @throws HyracksDataException if a token cannot be serialized, or if a field holds more tokens than
+     *             fit into a short
+     */
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
-
-            for (int j = 0; j < tokenFields.length; j++) {
-
+            short numTokens = 0;
+            if (addNumTokensKey) {
+                // Run through the tokens to get the total number of tokens.
                 tokenizer.reset(
                         accessor.getBuffer().array(),
                         accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
-                                + accessor.getFieldStartOffset(i, tokenFields[j]),
-                        accessor.getFieldLength(i, tokenFields[j]));
-
+                                + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
                 while (tokenizer.hasNext()) {
                     tokenizer.next();
+                    // The count is written as a short below; fail instead of silently wrapping around.
+                    if (numTokens == Short.MAX_VALUE) {
+                        throw new HyracksDataException("Number of tokens exceeds " + Short.MAX_VALUE + ".");
+                    }
+                    numTokens++;
+                }
+            }
 
-                    builder.reset();
-                    try {
-                        IToken token = tokenizer.getToken();
-                        token.serializeToken(builderDos);
+            // Restart the tokenizer on the same field and emit one output tuple per token.
+            tokenizer.reset(
+                    accessor.getBuffer().array(),
+                    accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
+                            + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+            while (tokenizer.hasNext()) {
+                tokenizer.next();
+
+                builder.reset();
+                try {
+                    IToken token = tokenizer.getToken();
+                    token.serializeToken(builderDos);
+                    builder.addFieldEndOffset();
+                    // Add number of tokens if requested.
+                    if (addNumTokensKey) {
+                        builder.getDataOutput().writeShort(numTokens);
                         builder.addFieldEndOffset();
-                    } catch (IOException e) {
-                        throw new HyracksDataException(e.getMessage());
                     }
+                } catch (IOException e) {
+                    throw new HyracksDataException(e);
+                }
 
-                    for (int k = 0; k < projFields.length; k++) {
-                        builder.addField(accessor, i, projFields[k]);
-                    }
+                // Copy the key fields of the input tuple behind the token (and token count).
+                for (int k = 0; k < keyFields.length; k++) {
+                    builder.addField(accessor, i, keyFields[k]);
+                }
 
+                // Append to the output frame; if it is full, flush it and retry on the emptied frame.
+                if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+                    FrameUtils.flushFrame(writeBuffer, writer);
+                    appender.reset(writeBuffer, true);
                     if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
-                        FrameUtils.flushFrame(writeBuffer, writer);
-                        appender.reset(writeBuffer, true);
-                        if (!appender
-                                .append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
-                            throw new IllegalStateException();
-                        }
+                        throw new IllegalStateException();
                     }
                 }
             }
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
new file mode 100644
index 0000000..ea07b24
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelper.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.frames.LIFOMetaDataFrameFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.IInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushController;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryBufferCache;
+import edu.uci.ics.hyracks.storage.am.lsm.common.freepage.DualIndexInMemoryFreePageManager;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.impls.PartitionedLSMInvertedIndex;
+import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.util.InvertedIndexUtils;
+import edu.uci.ics.hyracks.storage.common.buffercache.HeapBufferAllocator;
+import edu.uci.ics.hyracks.storage.common.buffercache.IBufferCache;
+import edu.uci.ics.hyracks.storage.common.file.IFileMapProvider;
+
+/**
+ * Dataflow helper that creates {@link PartitionedLSMInvertedIndex} instances for one partition of an
+ * inverted index operator.
+ */
+public final class PartitionedLSMInvertedIndexDataflowHelper extends AbstractLSMIndexDataflowHelper {
+
+    /**
+     * Creates a helper that uses {@code DEFAULT_MEM_PAGE_SIZE} and {@code DEFAULT_MEM_NUM_PAGES} as the
+     * in-memory page size and page count.
+     */
+    public PartitionedLSMInvertedIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, ILSMFlushController flushController, ILSMMergePolicy mergePolicy,
+            ILSMOperationTrackerFactory opTrackerFactory, ILSMIOOperationScheduler ioScheduler) {
+        this(opDesc, ctx, partition, DEFAULT_MEM_PAGE_SIZE, DEFAULT_MEM_NUM_PAGES, flushController, mergePolicy,
+                opTrackerFactory, ioScheduler);
+    }
+
+    /**
+     * Creates a helper with an explicit in-memory page size and page count. All arguments are passed
+     * through to the superclass constructor.
+     */
+    public PartitionedLSMInvertedIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition, int memPageSize, int memNumPages, ILSMFlushController flushController,
+            ILSMMergePolicy mergePolicy, ILSMOperationTrackerFactory opTrackerFactory,
+            ILSMIOOperationScheduler ioScheduler) {
+        super(opDesc, ctx, partition, memPageSize, memNumPages, flushController, mergePolicy, opTrackerFactory,
+                ioScheduler);
+    }
+
+    /**
+     * Builds the partitioned LSM inverted index described by the operator descriptor.
+     *
+     * @return the newly created index
+     * @throws HyracksDataException wrapping any {@link IndexException} raised while creating the index
+     */
+    @Override
+    public IIndex createIndexInstance() throws HyracksDataException {
+        IInvertedIndexOperatorDescriptor invertedIndexDesc = (IInvertedIndexOperatorDescriptor) opDesc;
+        try {
+            // In-memory buffer cache and free page manager, sized by memPageSize / memNumPages.
+            ITreeIndexMetaDataFrameFactory metaFrameFactory = new LIFOMetaDataFrameFactory();
+            IInMemoryBufferCache inMemoryCache = new DualIndexInMemoryBufferCache(new HeapBufferAllocator(),
+                    memPageSize, memNumPages);
+            IInMemoryFreePageManager inMemoryFreePages = new DualIndexInMemoryFreePageManager(memNumPages,
+                    metaFrameFactory);
+            // Disk-side buffer cache and file map come from the operator's storage manager.
+            IBufferCache onDiskCache = opDesc.getStorageManager().getBufferCache(ctx);
+            IFileMapProvider fileMapProvider = opDesc.getStorageManager().getFileMapProvider(ctx);
+            return InvertedIndexUtils.createPartitionedLSMInvertedIndex(inMemoryCache, inMemoryFreePages,
+                    fileMapProvider, invertedIndexDesc.getInvListsTypeTraits(),
+                    invertedIndexDesc.getInvListsComparatorFactories(), invertedIndexDesc.getTokenTypeTraits(),
+                    invertedIndexDesc.getTokenComparatorFactories(), invertedIndexDesc.getTokenizerFactory(),
+                    onDiskCache, ctx.getIOManager(), file.getFile().getPath(), flushController, mergePolicy,
+                    opTrackerFactory, ioScheduler);
+        } catch (IndexException e) {
+            throw new HyracksDataException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
new file mode 100644
index 0000000..34367db
--- /dev/null
+++ b/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/PartitionedLSMInvertedIndexDataflowHelperFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexDataflowHelper;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMFlushControllerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMMergePolicyProvider;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.AbstractLSMIndexDataflowHelperFactory;
+
+/**
+ * Factory for {@link PartitionedLSMInvertedIndexDataflowHelper} instances.
+ */
+public class PartitionedLSMInvertedIndexDataflowHelperFactory extends AbstractLSMIndexDataflowHelperFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Passes the given providers and operation tracker factory on to the superclass constructor.
+     */
+    public PartitionedLSMInvertedIndexDataflowHelperFactory(ILSMFlushControllerProvider flushControllerProvider,
+            ILSMMergePolicyProvider mergePolicyProvider, ILSMOperationTrackerFactory opTrackerProvider,
+            ILSMIOOperationSchedulerProvider ioSchedulerProvider) {
+        super(flushControllerProvider, mergePolicyProvider, opTrackerProvider, ioSchedulerProvider);
+    }
+
+    /**
+     * Creates the dataflow helper for one operator partition. The flush controller, merge policy and I/O
+     * scheduler are obtained from their providers using the task context.
+     */
+    @Override
+    public IndexDataflowHelper createIndexDataflowHelper(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
+            int partition) {
+        return new PartitionedLSMInvertedIndexDataflowHelper(
+                opDesc,
+                ctx,
+                partition,
+                flushControllerProvider.getFlushController(ctx),
+                mergePolicyProvider.getMergePolicy(ctx),
+                opTrackerFactory,
+                ioSchedulerProvider.getIOScheduler(ctx));
+    }
+
+}