[ASTERIXDB-3144][RT] Make index modification runtime support multiple partitions

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
This patch changes the index modification runtime to support
operating on multiple partitions. With this change, a single index
modification node pushable writes to multiple indexes, one per
storage partition it owns, routing each incoming tuple to the
correct index by hashing its primary key fields. This is a step
towards compute/storage separation.
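
For illustration only, the sketch below shows the routing shape this
plumbing enables. The class name, hash function, and the
two-pushable/four-partition layout are made up for the example; the
real code uses ITuplePartitionerFactory / FieldHashPartitionerFactory
and the int[][] partitions map threaded through the operators in the
diff (MetadataProvider.getPartitionsMap currently builds the identity
map [[0],[1],...], so today each pushable owns exactly one partition).

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, self-contained sketch: one node pushable writing
    // to several partition-local indexes.
    public class MultiPartitionRoutingSketch {
        public static void main(String[] args) {
            // Made-up layout: 2 node pushables, 4 storage partitions.
            int[][] partitionsMap = { { 0, 1 }, { 2, 3 } };
            int numStoragePartitions = 4;

            int myPushable = 1; // pretend we are task partition 1
            int[] myPartitions = partitionsMap[myPushable];

            // storage partition id -> slot in this pushable's accessor
            // arrays, mirroring storagePartitionId2Index in the pushables.
            Map<Integer, Integer> storagePartitionId2Index = new HashMap<>();
            String[] indexAccessors = new String[myPartitions.length];
            for (int i = 0; i < myPartitions.length; i++) {
                storagePartitionId2Index.put(myPartitions[i], i);
                indexAccessors[i] = "lsm-index-p" + myPartitions[i];
            }

            // Route tuples by hashing the primary key, as a
            // FieldHashPartitionerFactory does over the pk fields.
            int[] incomingPks = { 6, 3, 11, 7 };
            for (int pk : incomingPks) {
                int storagePartition =
                        Math.floorMod(Integer.hashCode(pk), numStoragePartitions);
                Integer idx = storagePartitionId2Index.get(storagePartition);
                if (idx == null) {
                    continue; // upstream exchange should not send us this tuple
                }
                System.out.printf("pk=%d -> storage partition %d -> %s%n",
                        pk, storagePartition, indexAccessors[idx]);
            }
        }
    }

With the identity partitions map the behavior matches the old
one-index-per-pushable runtime; the benefit comes once compute tasks
can own several storage partitions each.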

Change-Id: I08da28f2a26fcaf581c2256312455fe541fae5ea
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17452
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index f6caf5d..a65c898 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -40,6 +40,7 @@
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.MetadataManager;
@@ -83,8 +84,10 @@
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -92,6 +95,7 @@
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -253,10 +257,17 @@
                         pkIndexInfo.fileSplitProvider);
             }
 
+            int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
+            int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+            IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
+            ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
+                    primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
+
             LSMPrimaryInsertOperatorNodePushable insertOp = new LSMPrimaryInsertOperatorNodePushable(ctx,
                     ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, pkIndexHelperFactory,
                     primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, modOpCallbackFactory,
-                    searchOpCallbackFactory, primaryKeyIndexes.length, filterFields, null);
+                    searchOpCallbackFactory, primaryKeyIndexes.length, filterFields, null, tuplePartitionerFactory,
+                    partitionsMap);
             // For now, this assumes a single secondary index. recordDesc is always <pk-record-meta>
             // for the index, we will have to create an assign operator that extract the sk
             // then the secondary LSMInsertDeleteOperatorNodePushable
@@ -304,10 +315,12 @@
                         dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex,
                                 IndexOperation.INSERT, primaryKeyIndexes);
 
+                ITuplePartitionerFactory tuplePartitionerFactory2 = new FieldHashPartitionerFactory(
+                        secondaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
                 LSMInsertDeleteOperatorNodePushable secondaryInsertOp = new LSMInsertDeleteOperatorNodePushable(ctx,
                         ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations,
                         secondaryIndexInfo.rDesc, IndexOperation.INSERT, false, secondaryIndexHelperFactory,
-                        secondaryModCallbackFactory, null, null);
+                        secondaryModCallbackFactory, null, null, tuplePartitionerFactory2, partitionsMap);
                 assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
 
                 IPushRuntime commitOp =
@@ -353,10 +366,15 @@
                     recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0);
             IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
-            LSMInsertDeleteOperatorNodePushable deleteOp =
-                    new LSMInsertDeleteOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
-                            primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, IndexOperation.DELETE,
-                            true, indexHelperFactory, modOpCallbackFactory, null, null);
+            int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
+            int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+            IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
+            ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(
+                    primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
+            LSMInsertDeleteOperatorNodePushable deleteOp = new LSMInsertDeleteOperatorNodePushable(ctx,
+                    ctx.getTaskAttemptId().getTaskId().getPartition(),
+                    primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, IndexOperation.DELETE, true,
+                    indexHelperFactory, modOpCallbackFactory, null, null, tuplePartitionerFactory, partitionsMap);
             // For now, this assumes a single secondary index. recordDesc is always <pk-record-meta>
             // for the index, we will have to create an assign operator that extract the sk
             // then the secondary LSMInsertDeleteOperatorNodePushable
@@ -404,10 +422,12 @@
                         dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex,
                                 IndexOperation.INSERT, primaryKeyIndexes);
 
+                ITuplePartitionerFactory tuplePartitionerFactory2 = new FieldHashPartitionerFactory(
+                        secondaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
                 LSMInsertDeleteOperatorNodePushable secondaryInsertOp = new LSMInsertDeleteOperatorNodePushable(ctx,
                         ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations,
                         secondaryIndexInfo.rDesc, IndexOperation.DELETE, false, secondaryIndexHelperFactory,
-                        secondaryModCallbackFactory, null, null);
+                        secondaryModCallbackFactory, null, null, tuplePartitionerFactory2, partitionsMap);
                 assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc);
 
                 IPushRuntime commitOp =
@@ -684,6 +704,7 @@
         private final Map<String, String> mergePolicyProperties;
         private final int primaryIndexNumOfTupleFields;
         private final ITypeTraits[] primaryIndexTypeTraits;
+        private final IBinaryHashFunctionFactory[] hashFuncFactories;
         private final ISerializerDeserializer<?>[] primaryIndexSerdes;
         private final ConstantFileSplitProvider fileSplitProvider;
         private final RecordDescriptor rDesc;
@@ -706,6 +727,11 @@
                     + (filterFields != null ? filterFields.length : 0);
             primaryIndexTypeTraits =
                     createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
+            hashFuncFactories = new IBinaryHashFunctionFactory[primaryKeyTypes.length];
+            for (int i = 0; i < primaryKeyTypes.length; i++) {
+                hashFuncFactories[i] =
+                        BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(primaryKeyTypes[i]);
+            }
             primaryIndexSerdes =
                     createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType);
             rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits);
@@ -806,6 +832,11 @@
         IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider();
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider());
+        int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length;
+        int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+        IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories;
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions);
         LSMPrimaryUpsertOperatorNodePushable insertOp =
                 new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(),
                         indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations,
@@ -813,7 +844,8 @@
                         modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, 0, recordType, -1,
                         frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory(mdProvider)
                                 : frameOpCallbackFactory,
-                        MissingWriterFactory.INSTANCE, hasSecondaries, NoOpTupleProjectorFactory.INSTANCE);
+                        MissingWriterFactory.INSTANCE, hasSecondaries, NoOpTupleProjectorFactory.INSTANCE,
+                        tuplePartitionerFactory, partitionsMap);
         RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
                 filterFields == null ? 0 : filterFields.length, recordType, metaType);
         // fix pk fields
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index ab0732b..f9934f2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -25,33 +25,36 @@
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.LocalResource;
 
 public class LSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
-    public static final String KEY_INDEX = "Index";
     protected final boolean isPrimary;
     protected final SourceLocation sourceLoc;
-    // This class has both lsmIndex and index (in super class) pointing to the same object
-    private AbstractLSMIndex lsmIndex;
     protected int i = 0;
 
     /**
@@ -71,9 +74,10 @@
     public LSMInsertDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
             RecordDescriptor inputRecDesc, IndexOperation op, boolean isPrimary,
             IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory, SourceLocation sourceLoc) throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, SourceLocation sourceLoc,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory,
-                tupleFilterFactory);
+                tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
         this.isPrimary = isPrimary;
         this.sourceLoc = sourceLoc;
     }
@@ -87,25 +91,32 @@
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
         appender = new FrameTupleAppender(writeBuffer);
-        indexHelper.open();
-        lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
         try {
+            INcApplicationContext runtimeCtx =
+                    (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
+
+            for (int i = 0; i < indexHelpers.length; i++) {
+                IIndexDataflowHelper indexHelper = indexHelpers[i];
+                indexHelper.open();
+                indexes[i] = indexHelper.getIndexInstance();
+                LocalResource resource = indexHelper.getResource();
+                modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
+                IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
+                indexAccessors[i] = indexes[i].createAccessor(iap);
+                LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) indexes[i],
+                        runtimeCtx.getTransactionSubsystem().getLogManager());
+            }
+
             if (isPrimary && ctx.getSharedObject() != null) {
-                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback(lsmIndex);
+                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((ILSMIndex) indexes[0]);
                 TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
             }
             writer.open();
-            modCallback =
-                    modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
-            IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            indexAccessor = lsmIndex.createAccessor(iap);
+            writerOpen = true;
             if (tupleFilterFactory != null) {
                 tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
                 frameTuple = new FrameTupleReference();
             }
-            INcApplicationContext runtimeCtx =
-                    (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
-            LSMIndexUtil.checkAndSetFirstLSN(lsmIndex, runtimeCtx.getTransactionSubsystem().getLogManager());
         } catch (Throwable th) {
             throw HyracksDataException.create(th);
         }
@@ -114,7 +125,6 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
         try {
             for (; i < tupleCount; i++, currentTupleIdx++) {
@@ -125,6 +135,9 @@
                     }
                 }
                 tuple.reset(accessor, i);
+                int storagePartition = tuplePartitioner.partition(accessor, i);
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
                 switch (op) {
                     case INSERT:
                         if (i == 0 && isPrimary) {
@@ -188,18 +201,25 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (lsmIndex != null) {
+        Throwable failure = null;
+        for (IIndexDataflowHelper indexHelper : indexHelpers) {
+            failure = ResourceReleaseUtils.close(indexHelper, failure);
+        }
+        if (writerOpen) {
             try {
-                indexHelper.close();
-            } finally {
                 writer.close();
+            } catch (Throwable th) {
+                failure = ExceptionUtils.suppress(failure, th);
             }
         }
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
+        }
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        if (lsmIndex != null) {
+        if (writerOpen) {
             writer.fail();
         }
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
index 2226ca0..4b4a164 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMTreeInsertDeleteOperatorDescriptor.java
@@ -21,6 +21,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -38,8 +39,10 @@
     public LSMTreeInsertDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory indexHelperFactory,
             ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
-            IModificationOperationCallbackFactory modCallbackFactory) {
-        super(spec, outRecDesc, indexHelperFactory, fieldPermutation, op, modCallbackFactory, tupleFilterFactory);
+            IModificationOperationCallbackFactory modCallbackFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) {
+        super(spec, outRecDesc, indexHelperFactory, fieldPermutation, op, modCallbackFactory, tupleFilterFactory,
+                tuplePartitionerFactory, partitionsMap);
         this.isPrimary = isPrimary;
     }
 
@@ -48,7 +51,8 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMInsertDeleteOperatorNodePushable(ctx, partition, fieldPermutation, inputRecDesc, op, isPrimary,
-                indexHelperFactory, modCallbackFactory, tupleFilterFactory, sourceLoc);
+                indexHelperFactory, modCallbackFactory, tupleFilterFactory, sourceLoc, tuplePartitionerFactory,
+                partitionsMap);
     }
 
     public boolean isPrimary() {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 3692a8d..debf60e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -606,12 +606,7 @@
         IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first);
         BTreeSearchOperatorDescriptor btreeSearchOp;
 
-        int numPartitions;
-        if (spPc.second.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
-            numPartitions = ((AlgebricksCountPartitionConstraint) spPc.second).getCount();
-        } else {
-            numPartitions = ((AlgebricksAbsolutePartitionConstraint) spPc.second).getLocations().length;
-        }
+        int numPartitions = getNumPartitions(spPc.second);
         int[][] partitionsMap = getPartitionsMap(numPartitions);
         ITuplePartitionerFactory tuplePartitionerFactory = null;
         if (partitionInputTuples) {
@@ -638,7 +633,15 @@
         return new Pair<>(btreeSearchOp, spPc.second);
     }
 
-    private static int[][] getPartitionsMap(int numPartitions) {
+    public static int getNumPartitions(AlgebricksPartitionConstraint constraint) {
+        if (constraint.getPartitionConstraintType() == AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+            return ((AlgebricksCountPartitionConstraint) constraint).getCount();
+        } else {
+            return ((AlgebricksAbsolutePartitionConstraint) constraint).getLocations().length;
+        }
+    }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
         int[][] map = new int[numPartitions][1];
         for (int i = 0; i < numPartitions; i++) {
             map[i] = new int[] { i };
@@ -1060,9 +1063,11 @@
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
                 + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
         int[] bloomFilterKeyFields = new int[numKeys];
+        int[] pkFields = new int[numKeys];
         int i = 0;
         for (LogicalVariable varKey : keys) {
             int idx = propagatedSchema.findVariable(varKey);
+            pkFields[i] = idx;
             fieldPermutation[i] = idx;
             bloomFilterKeyFields[i] = i;
             i++;
@@ -1097,6 +1102,13 @@
                 .getModificationCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
         IIndexDataflowHelperFactory idfh =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+
+        int numPartitions = getNumPartitions(splitsAndConstraint.second);
+        int[][] partitionsMap = getPartitionsMap(numPartitions);
+        IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+
         IOperatorDescriptor op;
         if (bulkload) {
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1119,11 +1131,12 @@
                             primaryKeySplitsAndConstraint.first);
                 }
                 op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
-                        modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
+                        modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields,
+                        tuplePartitionerFactory, partitionsMap);
 
             } else {
                 op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        null, true, modificationCallbackFactory);
+                        null, true, modificationCallbackFactory, tuplePartitionerFactory, partitionsMap);
             }
         }
         return new Pair<>(op, splitsAndConstraint.second);
@@ -1132,18 +1145,21 @@
     protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
             RecordDescriptor inputRecordDesc, int[] fieldPermutation, IIndexDataflowHelperFactory idfh,
             IIndexDataflowHelperFactory pkidfh, IModificationOperationCallbackFactory modificationCallbackFactory,
-            ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields) {
+            ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
         // this can be used by extensions to pick up their own operators
         return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
-                modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields);
+                modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, tuplePartitionerFactory,
+                partitionsMap);
     }
 
     protected LSMTreeInsertDeleteOperatorDescriptor createLSMTreeInsertDeleteOperatorDescriptor(
             IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, IndexOperation op,
             IIndexDataflowHelperFactory indexHelperFactory, ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
-            IModificationOperationCallbackFactory modCallbackFactory) {
+            IModificationOperationCallbackFactory modCallbackFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) {
         return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, fieldPermutation, op, indexHelperFactory,
-                tupleFilterFactory, isPrimary, modCallbackFactory);
+                tupleFilterFactory, isPrimary, modCallbackFactory, tuplePartitionerFactory, partitionsMap);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime(
@@ -1234,6 +1250,7 @@
         // generate field permutations
         int[] fieldPermutation = new int[numKeys + numFilterFields];
         int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int[] pkFields = new int[primaryKeys.size()];
         int i = 0;
         int j = 0;
         for (LogicalVariable varKey : secondaryKeys) {
@@ -1244,6 +1261,7 @@
         for (LogicalVariable varKey : primaryKeys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
+            pkFields[j] = idx;
             modificationCallbackPrimaryKeyFields[j] = i;
             i++;
             j++;
@@ -1286,6 +1304,13 @@
                     storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+
+            int numPartitions = getNumPartitions(splitsAndConstraint.second);
+            int[][] partitionsMap = getPartitionsMap(numPartitions);
+            IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+            ITuplePartitionerFactory tuplePartitionerFactory =
+                    new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1296,10 +1321,10 @@
                 int operationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
                         filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex,
-                        BinaryIntegerInspector.FACTORY, prevFieldPermutation);
+                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory, partitionsMap);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        filterFactory, false, modificationCallbackFactory);
+                        filterFactory, false, modificationCallbackFactory, tuplePartitionerFactory, partitionsMap);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
@@ -1320,11 +1345,13 @@
         // Generate field permutations (this only includes primary keys and filter fields).
         int[] fieldPermutation = new int[numPrimaryKeys + numFilterFields];
         int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int[] pkFields = new int[primaryKeys.size()];
         int i = 0;
         int j = 0;
         for (LogicalVariable varKey : primaryKeys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
+            pkFields[j] = idx;
             modificationCallbackPrimaryKeyFields[j] = i;
             i++;
             j++;
@@ -1345,15 +1372,24 @@
                     storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+
+            int numPartitions = getNumPartitions(splitsAndConstraint.second);
+            int[][] partitionsMap = getPartitionsMap(numPartitions);
+            IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+            ITuplePartitionerFactory tuplePartitionerFactory =
+                    new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+
             IOperatorDescriptor op;
             if (indexOp == IndexOperation.UPSERT) {
                 int operationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
                         idfh, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY,
-                        secondaryKeysPipelines.get(0), secondaryKeysPipelines.get(1));
+                        secondaryKeysPipelines.get(0), secondaryKeysPipelines.get(1), tuplePartitionerFactory,
+                        partitionsMap);
             } else {
                 op = new LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
-                        fieldPermutation, indexOp, idfh, modificationCallbackFactory, secondaryKeysPipelines.get(0));
+                        fieldPermutation, indexOp, idfh, modificationCallbackFactory, secondaryKeysPipelines.get(0),
+                        tuplePartitionerFactory, partitionsMap);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
@@ -1390,6 +1426,7 @@
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
         int[] fieldPermutation = new int[numKeys + numFilterFields];
         int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int[] pkFields = new int[primaryKeys.size()];
         int i = 0;
         int j = 0;
 
@@ -1401,6 +1438,7 @@
         for (LogicalVariable varKey : primaryKeys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
+            pkFields[j] = idx;
             modificationCallbackPrimaryKeyFields[j] = i;
             i++;
             j++;
@@ -1441,6 +1479,13 @@
                 storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
         IIndexDataflowHelperFactory indexDataflowHelperFactory =
                 new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+
+        int numPartitions = getNumPartitions(splitsAndConstraint.second);
+        int[][] partitionsMap = getPartitionsMap(numPartitions);
+        IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+
         IOperatorDescriptor op;
         if (bulkload) {
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1451,10 +1496,12 @@
             int operationFieldIndex = propagatedSchema.findVariable(operationVar);
             op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
                     indexDataflowHelperFactory, filterFactory, prevFilterFactory, modificationCallbackFactory,
-                    operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation);
+                    operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory,
+                    partitionsMap);
         } else {
             op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
-                    indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory);
+                    indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory,
+                    tuplePartitionerFactory, partitionsMap);
         }
         return new Pair<>(op, splitsAndConstraint.second);
     }
@@ -1492,6 +1539,7 @@
         // generate field permutations
         int[] fieldPermutation = new int[numKeys + numFilterFields];
         int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
+        int[] pkFields = new int[primaryKeys.size()];
         int i = 0;
         int j = 0;
 
@@ -1505,6 +1553,7 @@
         for (LogicalVariable varKey : primaryKeys) {
             int idx = propagatedSchema.findVariable(varKey);
             fieldPermutation[i] = idx;
+            pkFields[j] = idx;
             modificationCallbackPrimaryKeyFields[j] = i;
             i++;
             j++;
@@ -1550,6 +1599,13 @@
                     storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
+
+            int numPartitions = getNumPartitions(splitsAndConstraint.second);
+            int[][] partitionsMap = getPartitionsMap(numPartitions);
+            IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
+            ITuplePartitionerFactory tuplePartitionerFactory =
+                    new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+
             IOperatorDescriptor op;
             if (bulkload) {
                 long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1560,10 +1616,11 @@
                 int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar);
                 op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
                         filterFactory, prevFilterFactory, modificationCallbackFactory, upsertOperationFieldIndex,
-                        BinaryIntegerInspector.FACTORY, prevFieldPermutation);
+                        BinaryIntegerInspector.FACTORY, prevFieldPermutation, tuplePartitionerFactory, partitionsMap);
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, false, modificationCallbackFactory);
+                        indexDataFlowFactory, filterFactory, false, modificationCallbackFactory,
+                        tuplePartitionerFactory, partitionsMap);
             }
             return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index b5dd8bf..2c1bdea 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -73,8 +73,10 @@
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -83,6 +85,7 @@
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -439,8 +442,10 @@
 
         // prepare callback
         int[] primaryKeyFields = new int[numKeys];
+        int[] pkFields = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;
+            pkFields[i] = fieldPermutation[i];
         }
         boolean hasSecondaries =
                 metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
@@ -506,10 +511,16 @@
         ITupleProjectorFactory projectorFactory = IndexUtil.createUpsertTupleProjectorFactory(
                 dataset.getDatasetFormatInfo(), requestedType, itemType, metaItemType, numKeys);
 
+        int numPartitions = MetadataProvider.getNumPartitions(splitsAndConstraint.second);
+        int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
+        IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
+
         op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
                 missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
                 dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
-                fieldIdx, hasSecondaries, projectorFactory);
+                fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory, partitionsMap);
         return new Pair<>(op, splitsAndConstraint.second);
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index d639f3d..f46e172 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -43,9 +44,10 @@
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
             IIndexDataflowHelperFactory keyIndexHelperFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory,
-            ISearchOperationCallbackFactory searchOpCallbackFactory, int numOfPrimaryKeys, int[] filterFields) {
+            ISearchOperationCallbackFactory searchOpCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true,
-                modificationOpCallbackFactory);
+                modificationOpCallbackFactory, tuplePartitionerFactory, partitionsMap);
         this.keyIndexHelperFactory = keyIndexHelperFactory;
         this.searchOpCallbackFactory = searchOpCallbackFactory;
         this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -58,6 +60,6 @@
         RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMPrimaryInsertOperatorNodePushable(ctx, partition, indexHelperFactory, keyIndexHelperFactory,
                 fieldPermutation, intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numOfPrimaryKeys,
-                filterFields, sourceLoc);
+                filterFields, sourceLoc, tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index eadb614..7e51ec1 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -28,6 +28,7 @@
 import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -52,46 +53,64 @@
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.MultiComparator;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
 public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
-    private final IIndexDataflowHelper keyIndexHelper;
+    private final IIndexDataflowHelper[] keyIndexHelpers;
     private MultiComparator keySearchCmp;
     private RangePredicate searchPred;
-    private IIndexCursor cursor;
-    private LockThenSearchOperationCallback searchCallback;
+    private final IIndexCursor[] cursors;
+    private final LockThenSearchOperationCallback[] searchCallbacks;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
-    private final IFrameTupleProcessor processor;
-    private LSMTreeIndexAccessor lsmAccessor;
-    private LSMTreeIndexAccessor lsmAccessorForKeyIndex;
-    private LSMTreeIndexAccessor lsmAccessorForUniqunessCheck;
+    private final IFrameTupleProcessor[] processors;
+    private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes;
+    private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;
 
-    private final IFrameOperationCallback frameOpCallback;
+    private final IFrameOperationCallback[] frameOpCallbacks;
     private boolean flushedPartialTuples;
     private int currentTupleIdx;
     private int lastFlushedTupleIdx;
 
     private final PermutingFrameTupleReference keyTuple;
+    private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
+    private final SourceLocation sourceLoc;
 
     public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
             int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
-            SourceLocation sourceLoc) throws HyracksDataException {
+            SourceLocation sourceLoc, ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap)
+            throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
-                modCallbackFactory, null);
+                modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
+        this.sourceLoc = sourceLoc;
+        this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
+        this.searchCallbacks = new LockThenSearchOperationCallback[partitions.length];
+        this.cursors = new IIndexCursor[partitions.length];
+        this.lsmAccessorForUniqunessChecks = new LSMTreeIndexAccessor[partitions.length];
+        this.lsmAccessorForKeyIndexes = new LSMTreeIndexAccessor[partitions.length];
+        this.keyIndexHelpers = new IIndexDataflowHelper[partitions.length];
+        this.processors = new IFrameTupleProcessor[partitions.length];
         if (keyIndexHelperFactory != null) {
-            this.keyIndexHelper = keyIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
-        } else {
-            this.keyIndexHelper = null;
+            for (int i = 0; i < partitions.length; i++) {
+                this.keyIndexHelpers[i] =
+                        keyIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+            }
         }
         this.searchCallbackFactory = searchCallbackFactory;
         int numFilterFieds = filterFields != null ? filterFields.length : 0;
@@ -105,8 +124,6 @@
             }
         }
         keyTuple = new PermutingFrameTupleReference(searchKeyPermutations);
-        processor = createTupleProcessor(sourceLoc);
-        frameOpCallback = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, lsmAccessor);
     }
 
     protected void beforeModification(ITupleReference tuple) {
@@ -114,60 +131,68 @@
         // do nothing in the master branch
     }
 
-    protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) {
-        return new IFrameTupleProcessor() {
-            @Override
-            public void process(ITupleReference tuple, int index) throws HyracksDataException {
-                if (index < currentTupleIdx) {
-                    // already processed; skip
-                    return;
-                }
-                keyTuple.reset(accessor, index);
-                searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
-                boolean duplicate = false;
-
-                lsmAccessorForUniqunessCheck.search(cursor, searchPred);
-                try {
-                    if (cursor.hasNext()) {
-                        // duplicate, skip
-                        searchCallback.release();
-                        duplicate = true;
+    protected void createTupleProcessors(SourceLocation sourceLoc) {
+        for (int i = 0; i < partitions.length; i++) {
+            LSMTreeIndexAccessor lsmAccessorForUniqunessCheck = lsmAccessorForUniqunessChecks[i];
+            IIndexCursor cursor = cursors[i];
+            IIndexAccessor indexAccessor = indexAccessors[i];
+            LSMTreeIndexAccessor lsmAccessorForKeyIndex = lsmAccessorForKeyIndexes[i];
+            LockThenSearchOperationCallback searchCallback = searchCallbacks[i];
+            processors[i] = new IFrameTupleProcessor() {
+                @Override
+                public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
+                        throws HyracksDataException {
+                    if (index < currentTupleIdx) {
+                        // already processed; skip
+                        return;
                     }
-                } finally {
-                    cursor.close();
-                }
-                if (!duplicate) {
-                    beforeModification(tuple);
-                    lsmAccessor.forceUpsert(tuple);
-                    if (lsmAccessorForKeyIndex != null) {
-                        lsmAccessorForKeyIndex.forceUpsert(keyTuple);
+                    keyTuple.reset(accessor, index);
+                    searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
+                    boolean duplicate = false;
+
+                    lsmAccessorForUniqunessCheck.search(cursor, searchPred);
+                    try {
+                        if (cursor.hasNext()) {
+                            // duplicate, skip
+                            searchCallback.release();
+                            duplicate = true;
+                        }
+                    } finally {
+                        cursor.close();
                     }
-                } else {
-                    // we should flush previous inserted records so that these transactions can commit
-                    flushPartialFrame();
-                    // feed requires this nested exception to remove duplicated tuples
-                    // TODO: a better way to treat duplicates?
-                    throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
-                            HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+                    if (!duplicate) {
+                        beforeModification(tuple);
+                        ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
+                        if (lsmAccessorForKeyIndex != null) {
+                            lsmAccessorForKeyIndex.forceUpsert(keyTuple);
+                        }
+                    } else {
+                        // we should flush previous inserted records so that these transactions can commit
+                        flushPartialFrame();
+                        // feed requires this nested exception to remove duplicated tuples
+                        // TODO: a better way to treat duplicates?
+                        throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
+                                HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+                    }
+                    currentTupleIdx = index + 1;
                 }
-                currentTupleIdx = index + 1;
-            }
 
-            @Override
-            public void start() throws HyracksDataException {
-                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
-            }
+                @Override
+                public void start() throws HyracksDataException {
+                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                }
 
-            @Override
-            public void finish() throws HyracksDataException {
-                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
-            }
+                @Override
+                public void finish() throws HyracksDataException {
+                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                }
 
-            @Override
-            public void fail(Throwable th) {
-                // no op
-            }
-        };
+                @Override
+                public void fail(Throwable th) {
+                    // no op
+                }
+            };
+        }
     }
 
     @Override
@@ -177,48 +202,56 @@
         flushedPartialTuples = false;
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
-        indexHelper.open();
-        index = indexHelper.getIndexInstance();
-        IIndex indexForUniquessCheck;
-        if (keyIndexHelper != null) {
-            keyIndexHelper.open();
-            indexForUniquessCheck = keyIndexHelper.getIndexInstance();
-        } else {
-            indexForUniquessCheck = index;
-        }
         try {
-            if (ctx.getSharedObject() != null) {
-                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
-                TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
-            }
-            frameOpCallback.open();
-            writer.open();
-            keySearchCmp =
-                    BTreeUtils.getSearchMultiComparator(((ITreeIndex) index).getComparatorFactories(), frameTuple);
-            searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
-            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-            modCallback =
-                    modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
-            searchCallback = (LockThenSearchOperationCallback) searchCallbackFactory
-                    .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
-            IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            indexAccessor = index.createAccessor(iap);
-            lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
-            if (keyIndexHelper != null) {
-                lsmAccessorForKeyIndex = (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iap);
-            }
-
-            IIndexAccessParameters iapForUniquenessCheck =
-                    new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallback);
-            lsmAccessorForUniqunessCheck =
-                    (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
-
-            cursor = lsmAccessorForUniqunessCheck.createSearchCursor(false);
-            frameTuple = new FrameTupleReference();
             INcApplicationContext appCtx =
                     (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
-            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
-                    appCtx.getTransactionSubsystem().getLogManager());
+
+            for (int i = 0; i < partitions.length; i++) {
+                IIndexDataflowHelper indexHelper = indexHelpers[i];
+                indexHelper.open();
+                indexes[i] = indexHelper.getIndexInstance();
+                IIndex index = indexes[i];
+                IIndexDataflowHelper keyIndexHelper = keyIndexHelpers[i];
+                IIndex indexForUniquenessCheck;
+                if (keyIndexHelper != null) {
+                    keyIndexHelper.open();
+                    indexForUniquenessCheck = keyIndexHelper.getIndexInstance();
+                } else {
+                    indexForUniquenessCheck = index;
+                }
+                if (ctx.getSharedObject() != null && i == 0) {
+                    PrimaryIndexLogMarkerCallback callback =
+                            new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
+                    TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+                }
+                writer.open();
+                writerOpen = true;
+                modCallbacks[i] =
+                        modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
+                searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
+                        .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
+                IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
+                indexAccessors[i] = index.createAccessor(iap);
+                if (keyIndexHelper != null) {
+                    lsmAccessorForKeyIndexes[i] = (LSMTreeIndexAccessor) indexForUniquenessCheck.createAccessor(iap);
+                }
+                frameOpCallbacks[i] = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx,
+                        (ILSMIndexAccessor) indexAccessors[i]);
+                frameOpCallbacks[i].open();
+                IIndexAccessParameters iapForUniquenessCheck =
+                        new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
+                lsmAccessorForUniqunessChecks[i] =
+                        (LSMTreeIndexAccessor) indexForUniquenessCheck.createAccessor(iapForUniquenessCheck);
+                cursors[i] = lsmAccessorForUniqunessChecks[i].createSearchCursor(false);
+                LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
+                        appCtx.getTransactionSubsystem().getLogManager());
+            }
+            createTupleProcessors(sourceLoc);
+            keySearchCmp =
+                    BTreeUtils.getSearchMultiComparator(((ITreeIndex) indexes[0]).getComparatorFactories(), frameTuple);
+            searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
+            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+            frameTuple = new FrameTupleReference();
         } catch (Throwable e) { // NOSONAR: Re-thrown
             throw HyracksDataException.create(e);
         }
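
Every per-partition structure above is sized by partitions.length and addressed through storagePartitionId2Index, which translates a storage partition id into a slot in those arrays. The map itself is built in the base pushable (not shown in this hunk); a plausible construction, assuming the task's local partitions come from partitionsMap[partition] and using fastutil's Int2IntOpenHashMap, is:

    // Sketch under the stated assumption, not the actual base-class code.
    static Int2IntMap buildStoragePartitionId2Index(int[][] partitionsMap, int taskPartition) {
        int[] partitions = partitionsMap[taskPartition]; // storage partitions owned by this task
        Int2IntMap slots = new Int2IntOpenHashMap();
        for (int i = 0; i < partitions.length; i++) {
            slots.put(partitions[i], i); // storage partition id -> array slot
        }
        return slots;
    }
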
@@ -227,7 +260,21 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+        partition2TuplesMap.clear();
+        int itemCount = accessor.getTupleCount();
+        for (int i = 0; i < itemCount; i++) {
+            int storagePartition = tuplePartitioner.partition(accessor, i);
+            int pIdx = storagePartitionId2Index.get(storagePartition);
+            IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k -> new IntOpenHashSet());
+            tupleIndexes.add(i);
+        }
+        for (Int2ObjectMap.Entry<IntSet> p2tuplesMapEntry : partition2TuplesMap.int2ObjectEntrySet()) {
+            int pIdx = p2tuplesMapEntry.getIntKey();
+            LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
+            IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
+            IFrameTupleProcessor processor = processors[pIdx];
+            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+        }
 
         writeBuffer.ensureFrameSize(buffer.capacity());
         if (flushedPartialTuples) {
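
nextFrame now fans a mixed frame out to storage partitions: each tuple's primary key is hashed to a storage partition, the id is translated to a local slot, and the tuple indexes collected per slot are handed to that slot's accessor in a single batchOperate call. The grouping idiom as a self-contained fastutil example, with a toy partition function standing in for tuplePartitioner:

    import it.unimi.dsi.fastutil.ints.*;

    final class GroupByPartitionDemo {
        public static void main(String[] args) {
            int numTuples = 8, numPartitions = 3;
            Int2ObjectMap<IntSet> partition2Tuples = new Int2ObjectOpenHashMap<>();
            for (int i = 0; i < numTuples; i++) {
                int p = i % numPartitions; // toy stand-in for the pk hash partitioner
                partition2Tuples.computeIfAbsent(p, k -> new IntOpenHashSet()).add(i);
            }
            // each entry would then be processed by its slot's index accessor in one batch
            partition2Tuples.int2ObjectEntrySet().forEach(
                    e -> System.out.println("partition " + e.getIntKey() + " -> " + e.getValue()));
        }
    }
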
@@ -260,10 +307,10 @@
 
     @Override
     public void close() throws HyracksDataException {
-        Throwable failure = CleanupUtils.destroy(null, cursor);
+        Throwable failure = CleanupUtils.destroy(null, cursors);
         failure = CleanupUtils.close(writer, failure);
-        failure = CleanupUtils.close(indexHelper, failure);
-        failure = CleanupUtils.close(keyIndexHelper, failure);
+        failure = CleanupUtils.close(indexHelpers, failure);
+        failure = CleanupUtils.close(keyIndexHelpers, failure);
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
index 3f1cd15..ee1d388 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorDescriptor.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -54,9 +55,10 @@
             ISearchOperationCallbackFactory searchOpCallbackFactory,
             IFrameOperationCallbackFactory frameOpCallbackFactory, int numPrimaryKeys, Integer filterSourceIndicator,
             ARecordType filterItemType, int filterIndex, boolean hasSecondaries,
-            ITupleProjectorFactory projectorFactory) {
+            ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory partitionerFactory,
+            int[][] partitionsMap) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, null, true,
-                modificationOpCallbackFactory);
+                modificationOpCallbackFactory, partitionerFactory, partitionsMap);
         this.frameOpCallbackFactory = frameOpCallbackFactory;
         this.searchOpCallbackFactory = searchOpCallbackFactory;
         this.numPrimaryKeys = numPrimaryKeys;
@@ -75,6 +77,6 @@
         return new LSMPrimaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
                 intputRecDesc, modCallbackFactory, searchOpCallbackFactory, numPrimaryKeys, filterSourceIndicator,
                 filterItemType, filterIndex, frameOpCallbackFactory, missingWriterFactory, hasSecondaries,
-                projectorFactory);
+                projectorFactory, tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 5172cde..3a9a020 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.CleanupUtils;
@@ -57,6 +58,7 @@
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
@@ -70,8 +72,10 @@
 import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
+import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.projection.ITupleProjector;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -81,6 +85,11 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
 public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
 
     public static final AInt8 UPSERT_NEW = new AInt8((byte) 0);
@@ -96,7 +105,7 @@
     protected ArrayTupleBuilder tb;
     private DataOutput dos;
     protected RangePredicate searchPred;
-    protected IIndexCursor cursor;
+    protected final IIndexCursor[] cursors;
     protected ITupleReference prevTuple;
     protected final int numOfPrimaryKeys;
     protected boolean isFiltered = false;
@@ -110,27 +119,32 @@
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
-    protected LockThenSearchOperationCallback searchCallback;
-    protected IFrameOperationCallback frameOpCallback;
+    protected final LockThenSearchOperationCallback[] searchCallbacks;
+    protected final IFrameOperationCallback[] frameOpCallbacks;
     private final IFrameOperationCallbackFactory frameOpCallbackFactory;
-    protected AbstractIndexModificationOperationCallback abstractModCallback;
     private final ISearchOperationCallbackFactory searchCallbackFactory;
-    private final IFrameTupleProcessor processor;
-    protected LSMTreeIndexAccessor lsmAccessor;
+    private final IFrameTupleProcessor[] processors;
     private final ITracer tracer;
     private final long traceCategory;
     private final ITupleProjector tupleProjector;
     private long lastRecordInTimeStamp = 0L;
+    private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
+    private final boolean hasSecondaries;
 
     public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, Integer filterSourceIndicator,
             ARecordType filterItemType, int filterFieldIndex, IFrameOperationCallbackFactory frameOpCallbackFactory,
-            IMissingWriterFactory missingWriterFactory, boolean hasSecondaries, ITupleProjectorFactory projectorFactory)
-            throws HyracksDataException {
+            IMissingWriterFactory missingWriterFactory, boolean hasSecondaries, ITupleProjectorFactory projectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
-                modCallbackFactory, null);
+                modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
+        this.hasSecondaries = hasSecondaries;
+        this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
+        this.searchCallbacks = new LockThenSearchOperationCallback[partitions.length];
+        this.cursors = new IIndexCursor[partitions.length];
+        this.processors = new IFrameTupleProcessor[partitions.length];
         this.key = new PermutingFrameTupleReference();
         this.searchCallbackFactory = searchCallbackFactory;
         this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -152,7 +166,6 @@
             this.prevRecWithPKWithFilterValue = new ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
             this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
         }
-        processor = createTupleProcessor(hasSecondaries);
         tracer = ctx.getJobletContext().getServiceContext().getTracer();
         traceCategory = tracer.getRegistry().get(TraceUtils.LATENCY);
         tupleProjector = projectorFactory.createTupleProjector(ctx);
@@ -163,78 +176,88 @@
         // do nothing in the master branch
     }
 
-    protected IFrameTupleProcessor createTupleProcessor(final boolean hasSecondaries) {
-        return new IFrameTupleProcessor() {
-            @Override
-            public void process(ITupleReference tuple, int index) throws HyracksDataException {
-                try {
-                    tb.reset();
-                    boolean recordWasInserted = false;
-                    boolean recordWasDeleted = false;
-                    boolean isDelete = isDeleteOperation(tuple, numOfPrimaryKeys);
-                    resetSearchPredicate(index);
-                    if (isFiltered || isDelete || hasSecondaries) {
-                        lsmAccessor.search(cursor, searchPred);
-                        try {
-                            if (cursor.hasNext()) {
-                                cursor.next();
-                                prevTuple = tupleProjector.project(cursor.getTuple(), dos, tb);
-                                appendOperationIndicator(!isDelete, true);
-                                appendFilterToPrevTuple();
-                                appendPrevRecord();
-                                appendPreviousMeta();
-                                appendFilterToOutput();
-                            } else {
-                                appendOperationIndicator(!isDelete, false);
-                                appendPreviousTupleAsMissing();
+    protected void createTupleProcessors(final boolean hasSecondaries) {
+        for (int i = 0; i < partitions.length; i++) {
+            ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[i];
+            IIndexCursor cursor = cursors[i];
+            LockThenSearchOperationCallback searchCallback = searchCallbacks[i];
+            IModificationOperationCallback modCallback = modCallbacks[i];
+            IFrameOperationCallback frameOpCallback = frameOpCallbacks[i];
+            processors[i] = new IFrameTupleProcessor() {
+                @Override
+                public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
+                        throws HyracksDataException {
+                    try {
+                        tb.reset();
+                        AbstractIndexModificationOperationCallback abstractModCallback =
+                                (AbstractIndexModificationOperationCallback) modCallback;
+                        boolean recordWasInserted = false;
+                        boolean recordWasDeleted = false;
+                        boolean isDelete = isDeleteOperation(tuple, numOfPrimaryKeys);
+                        resetSearchPredicate(index);
+                        if (isFiltered || isDelete || hasSecondaries) {
+                            lsmAccessor.search(cursor, searchPred);
+                            try {
+                                if (cursor.hasNext()) {
+                                    cursor.next();
+                                    prevTuple = tupleProjector.project(cursor.getTuple(), dos, tb);
+                                    appendOperationIndicator(!isDelete, true);
+                                    appendFilterToPrevTuple();
+                                    appendPrevRecord();
+                                    appendPreviousMeta();
+                                    appendFilterToOutput();
+                                } else {
+                                    appendOperationIndicator(!isDelete, false);
+                                    appendPreviousTupleAsMissing();
+                                }
+                            } finally {
+                                cursor.close(); // end the search
                             }
-                        } finally {
-                            cursor.close(); // end the search
+                        } else {
+                            // simple upsert into a non-filtered dataset having no secondary indexes
+                            searchCallback.before(key); // lock
+                            appendOperationIndicator(true, false);
+                            appendPreviousTupleAsMissing();
                         }
-                    } else {
-                        // simple upsert into a non-filtered dataset having no secondary indexes
-                        searchCallback.before(key); // lock
-                        appendOperationIndicator(true, false);
-                        appendPreviousTupleAsMissing();
+                        beforeModification(tuple);
+                        if (isDelete && prevTuple != null) {
+                            // only delete if the operation is a delete (not an upsert)
+                            // and a previous tuple with the same key was found
+                            abstractModCallback.setOp(Operation.DELETE);
+                            lsmAccessor.forceDelete(tuple);
+                            recordWasDeleted = true;
+                        } else if (!isDelete) {
+                            abstractModCallback.setOp(Operation.UPSERT);
+                            lsmAccessor.forceUpsert(tuple);
+                            recordWasInserted = true;
+                        }
+                        if (isFiltered && prevTuple != null) {
+                            // need to update the filter of the new component with the previous value
+                            lsmAccessor.updateFilter(prevTuple);
+                        }
+                        writeOutput(index, recordWasInserted, recordWasDeleted, searchCallback);
+                    } catch (Exception e) {
+                        throw HyracksDataException.create(e);
                     }
-                    beforeModification(tuple);
-                    if (isDelete && prevTuple != null) {
-                        // Only delete if it is a delete and not upsert
-                        // And previous tuple with the same key was found
-                        abstractModCallback.setOp(Operation.DELETE);
-                        lsmAccessor.forceDelete(tuple);
-                        recordWasDeleted = true;
-                    } else if (!isDelete) {
-                        abstractModCallback.setOp(Operation.UPSERT);
-                        lsmAccessor.forceUpsert(tuple);
-                        recordWasInserted = true;
-                    }
-                    if (isFiltered && prevTuple != null) {
-                        // need to update the filter of the new component with the previous value
-                        lsmAccessor.updateFilter(prevTuple);
-                    }
-                    writeOutput(index, recordWasInserted, recordWasDeleted);
-                } catch (Exception e) {
-                    throw HyracksDataException.create(e);
                 }
-            }
 
-            @Override
-            public void start() throws HyracksDataException {
-                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
-            }
+                @Override
+                public void start() throws HyracksDataException {
+                    ((LSMTreeIndexAccessor) lsmAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                }
 
-            @Override
-            public void finish() throws HyracksDataException {
-                lsmAccessor.getCtx().setOperation(IndexOperation.UPSERT);
-            }
+                @Override
+                public void finish() throws HyracksDataException {
+                    ((LSMTreeIndexAccessor) lsmAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                }
 
-            @Override
-            public void fail(Throwable th) {
-                // We must fail before we exit the components
-                frameOpCallback.fail(th);
-            }
-        };
+                @Override
+                public void fail(Throwable th) {
+                    // We must fail before we exit the components
+                    frameOpCallback.fail(th);
+                }
+            };
+        }
     }
 
     // we have the permutation which has [pk locations, record location, optional:filter-location]
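
Replacing the single processor field with processors[i] hinges on a small Java idiom: each anonymous IFrameTupleProcessor binds to its slot's accessor, cursor, and callbacks through effectively final locals copied out of the arrays before the closure is created. A minimal, runnable illustration (the String accessors are stand-ins for the real index accessors):

    import java.util.function.IntConsumer;

    final class PerSlotProcessorsDemo {
        public static void main(String[] args) {
            String[] accessors = { "accessor-0", "accessor-1" }; // stand-ins
            IntConsumer[] processors = new IntConsumer[accessors.length];
            for (int i = 0; i < accessors.length; i++) {
                String accessor = accessors[i]; // local copy: closures cannot capture the mutating 'i'
                processors[i] = t -> System.out.println(accessor + " handles tuple " + t);
            }
            processors[0].accept(7);
            processors[1].accept(8);
        }
    }
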
@@ -247,13 +270,8 @@
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
         writer.open();
-        indexHelper.open();
-        index = indexHelper.getIndexInstance();
+        writerOpen = true;
         try {
-            if (ctx.getSharedObject() != null) {
-                PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
-                TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
-            }
             missingTupleBuilder = new ArrayTupleBuilder(1);
             DataOutput out = missingTupleBuilder.getDataOutput();
             try {
@@ -262,31 +280,50 @@
                 throw HyracksDataException.create(e);
             }
             missingTupleBuilder.addFieldEndOffset();
-            searchPred = createSearchPredicate();
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
             dos = tb.getDataOutput();
             appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-            modCallback =
-                    modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
-            abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
-            searchCallback = (LockThenSearchOperationCallback) searchCallbackFactory
-                    .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
-            IIndexAccessParameters iap = new IndexAccessParameters(abstractModCallback, searchCallback);
-            iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR, tupleProjector);
-            indexAccessor = index.createAccessor(iap);
-            lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
-            cursor = indexAccessor.createSearchCursor(false);
-            frameTuple = new FrameTupleReference();
             INcApplicationContext appCtx =
                     (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
-            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
-                    appCtx.getTransactionSubsystem().getLogManager());
-            frameOpCallback = new IFrameOperationCallback() {
+            for (int i = 0; i < indexHelpers.length; i++) {
+                IIndexDataflowHelper indexHelper = indexHelpers[i];
+                indexHelper.open();
+                indexes[i] = indexHelper.getIndexInstance();
+                if (ctx.getSharedObject() != null && i == 0) {
+                    PrimaryIndexLogMarkerCallback callback =
+                            new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
+                    TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+                }
+                modCallbacks[i] =
+                        modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
+                searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
+                        .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
+                IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], searchCallbacks[i]);
+                iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR, tupleProjector);
+                indexAccessors[i] = indexes[i].createAccessor(iap);
+                cursors[i] = ((LSMTreeIndexAccessor) indexAccessors[i]).createSearchCursor(false);
+                LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) indexes[i],
+                        appCtx.getTransactionSubsystem().getLogManager());
+            }
+            searchPred = createSearchPredicate(indexes[0]);
+            frameTuple = new FrameTupleReference();
+            createFrameOpCallbacks();
+            createTupleProcessors(hasSecondaries);
+        } catch (Throwable e) { // NOSONAR: Re-thrown
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void createFrameOpCallbacks() throws HyracksDataException {
+        for (int i = 0; i < partitions.length; i++) {
+            LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[i];
+            frameOpCallbacks[i] = new IFrameOperationCallback() {
                 final IFrameOperationCallback callback =
-                        frameOpCallbackFactory.createFrameOperationCallback(ctx, (ILSMIndexAccessor) indexAccessor);
+                        frameOpCallbackFactory.createFrameOperationCallback(ctx, lsmAccessor);
 
                 @Override
                 public void frameCompleted() throws HyracksDataException {
+                    // TODO: mixed-frame vs. frame-per-storage-partition
                     appender.write(writer, true);
                     callback.frameCompleted();
                 }
@@ -306,9 +343,7 @@
                     callback.open();
                 }
             };
-            frameOpCallback.open();
-        } catch (Throwable e) { // NOSONAR: Re-thrown
-            throw HyracksDataException.create(e);
+            frameOpCallbacks[i].open();
         }
     }
 
@@ -317,7 +352,8 @@
         searchPred.reset(key, key, true, true, keySearchCmp, keySearchCmp);
     }
 
-    protected void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted) throws IOException {
+    protected void writeOutput(int tupleIndex, boolean recordWasInserted, boolean recordWasDeleted,
+            LockThenSearchOperationCallback searchCallback) throws IOException {
         if (recordWasInserted || recordWasDeleted) {
             frameTuple.reset(accessor, tupleIndex);
             for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -346,8 +382,25 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
+        partition2TuplesMap.clear();
         int itemCount = accessor.getTupleCount();
-        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback);
+        for (int i = 0; i < itemCount; i++) {
+            int storagePartition = tuplePartitioner.partition(accessor, i);
+            int pIdx = storagePartitionId2Index.get(storagePartition);
+            IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k -> new IntOpenHashSet());
+            tupleIndexes.add(i);
+        }
+        // ensure every partition is processed at least once: add an empty tuple set for partitions that received no tuples in this frame
+        for (int partition : storagePartitionId2Index.values()) {
+            partition2TuplesMap.computeIfAbsent(partition, k -> new IntOpenHashSet());
+        }
+        for (Int2ObjectMap.Entry<IntSet> p2tuplesMapEntry : partition2TuplesMap.int2ObjectEntrySet()) {
+            int pIdx = p2tuplesMapEntry.getIntKey();
+            LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
+            IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
+            IFrameTupleProcessor processor = processors[pIdx];
+            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
+        }
         if (itemCount > 0) {
             lastRecordInTimeStamp = System.currentTimeMillis();
         }
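
Unlike the insert path, the upsert path tops the map up so every local partition sees a batchOperate call even for frames that routed it no tuples, presumably so each partition's frame callback (e.g. the appender flush in frameCompleted) still runs once per frame:

    // Top-up idiom from the loop above: an empty tuple set per silent partition.
    for (int slot : storagePartitionId2Index.values()) {
        partition2TuplesMap.computeIfAbsent(slot, k -> new IntOpenHashSet());
    }
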
@@ -441,7 +494,7 @@
         }
     }
 
-    private RangePredicate createSearchPredicate() {
+    private RangePredicate createSearchPredicate(IIndex index) {
         keySearchCmp = BTreeUtils.getSearchMultiComparator(((ITreeIndex) index).getComparatorFactories(), key);
         return new RangePredicate(key, key, true, true, keySearchCmp, keySearchCmp, null, null);
     }
@@ -449,10 +502,10 @@
     @Override
     public void close() throws HyracksDataException {
         traceLastRecordIn();
-        Throwable failure = CleanupUtils.close(frameOpCallback, null);
-        failure = CleanupUtils.destroy(failure, cursor);
+        Throwable failure = CleanupUtils.close(frameOpCallbacks, null);
+        failure = CleanupUtils.destroy(failure, cursors);
         failure = CleanupUtils.close(writer, failure);
-        failure = CleanupUtils.close(indexHelper, failure);
+        failure = CleanupUtils.close(indexHelpers, failure);
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
@@ -461,11 +514,11 @@
     @SuppressWarnings({ "squid:S1181", "squid:S1166" })
     private void traceLastRecordIn() {
         try {
-            if (tracer.isEnabled(traceCategory) && lastRecordInTimeStamp > 0 && indexHelper != null
-                    && indexHelper.getIndexInstance() != null) {
+            if (tracer.isEnabled(traceCategory) && lastRecordInTimeStamp > 0 && indexHelpers[0] != null
+                    && indexHelpers[0].getIndexInstance() != null) {
                 tracer.instant("UpsertClose", traceCategory, Scope.t,
                         () -> "{\"last-record-in\":\"" + DATE_FORMAT.get().format(new Date(lastRecordInTimeStamp))
-                                + "\", \"index\":" + indexHelper.getIndexInstance().toString() + "}");
+                                + "\", \"index\":" + indexHelpers[0].getIndexInstance().toString() + "}");
             }
         } catch (Throwable traceFailure) {
             try {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor.java
index ae03be2..76c3207 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor.java
@@ -24,6 +24,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -39,8 +40,10 @@
 
     public LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(JobSpecification spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory indexHelperFactory,
-            IModificationOperationCallbackFactory modCallbackFactory, List<AlgebricksPipeline> secondaryKeysPipeline) {
-        super(spec, outRecDesc, indexHelperFactory, fieldPermutation, op, modCallbackFactory, null);
+            IModificationOperationCallbackFactory modCallbackFactory, List<AlgebricksPipeline> secondaryKeysPipeline,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
+        super(spec, outRecDesc, indexHelperFactory, fieldPermutation, op, modCallbackFactory, null,
+                tuplePartitionerFactory, partitionsMap);
         this.secondaryKeysPipeline = secondaryKeysPipeline;
     }
 
@@ -49,6 +52,7 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable(ctx, partition, fieldPermutation,
-                inputRecDesc, op, indexHelperFactory, modCallbackFactory, secondaryKeysPipeline);
+                inputRecDesc, op, indexHelperFactory, modCallbackFactory, secondaryKeysPipeline,
+                tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java
index 5712991..465faea 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -49,8 +50,10 @@
     public LSMSecondaryInsertDeleteWithNestedPlanOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             int[] fieldPermutation, RecordDescriptor inputRecDesc, IndexOperation op,
             IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
-            List<AlgebricksPipeline> secondaryKeysPipeline) throws HyracksDataException {
-        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory, null);
+            List<AlgebricksPipeline> secondaryKeysPipeline, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) throws HyracksDataException {
+        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory, null,
+                tuplePartitionerFactory, partitionsMap);
         this.numberOfPrimaryKeyAndFilterFields = fieldPermutation.length;
 
         // Build our pipeline.
@@ -138,8 +141,6 @@
 
         @Override
         public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor) indexAccessor;
-
             endOfPipelineTupleAccessor.reset(buffer);
             int nTuple = endOfPipelineTupleAccessor.getTupleCount();
             for (int t = 0; t < nTuple; t++) {
@@ -152,6 +153,9 @@
                 // Add the primary keys and filter fields.
                 endTupleReference.addTuple(tuple);
 
+                int storagePartition = tuplePartitioner.partition(tuple.getFrameTupleAccessor(), tuple.getTupleIndex());
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
                 // Pass the tuple to our accessor. There are only two operations: insert or delete.
                 if (op.equals(IndexOperation.INSERT)) {
                     try {
@@ -161,7 +165,6 @@
                             throw e;
                         }
                     }
-
                 } else {
                     try {
                         workingLSMAccessor.forceDelete(endTupleReference);
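
In the nested-plan pushables, tuples surface one at a time at the end of the secondary-keys pipeline, so routing is per tuple rather than per frame: hash the primary key to a storage partition, translate the id to a local slot, and apply the modification through that slot's accessor. In essence:

    // Per-tuple dispatch (contrast with the per-frame grouping in the primary operators).
    int storagePartition = tuplePartitioner.partition(tuple.getFrameTupleAccessor(), tuple.getTupleIndex());
    int slot = storagePartitionId2Index.get(storagePartition);
    ILSMIndexAccessor accessor = (ILSMIndexAccessor) indexAccessors[slot];
    // then accessor.forceUpsert(...) / accessor.forceDelete(...) as the operation dictates
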
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
index 9de0827..5b5d013 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorDescriptor.java
@@ -23,6 +23,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -43,9 +44,10 @@
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
             ITupleFilterFactory tupleFilterFactory, ITupleFilterFactory prevTupleFilterFactory,
             IModificationOperationCallbackFactory modificationOpCallbackFactory, int operationFieldIndex,
-            IBinaryIntegerInspectorFactory operationInspectorFactory, int[] prevValuePermutation) {
+            IBinaryIntegerInspectorFactory operationInspectorFactory, int[] prevValuePermutation,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
         super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, indexHelperFactory, tupleFilterFactory, false,
-                modificationOpCallbackFactory);
+                modificationOpCallbackFactory, tuplePartitionerFactory, partitionsMap);
         this.prevValuePermutation = prevValuePermutation;
         this.operationFieldIndex = operationFieldIndex;
         this.operationInspectorFactory = operationInspectorFactory;
@@ -58,6 +60,6 @@
         RecordDescriptor intputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMSecondaryUpsertOperatorNodePushable(ctx, partition, indexHelperFactory, modCallbackFactory,
                 tupleFilterFactory, prevTupleFilterFactory, fieldPermutation, intputRecDesc, operationFieldIndex,
-                operationInspectorFactory, prevValuePermutation);
+                operationInspectorFactory, prevValuePermutation, tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 955d5aa..6d5e88b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -27,6 +27,7 @@
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -63,16 +64,15 @@
 
     protected final int operationFieldIndex;
     protected final IBinaryIntegerInspector operationInspector;
-    protected AbstractIndexModificationOperationCallback abstractModCallback;
 
     public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
             ITupleFilterFactory tupleFilterFactory, ITupleFilterFactory prevTupleFilterFactory, int[] fieldPermutation,
             RecordDescriptor inputRecDesc, int operationFieldIndex,
-            IBinaryIntegerInspectorFactory operationInspectorFactory, int[] prevTuplePermutation)
-            throws HyracksDataException {
+            IBinaryIntegerInspectorFactory operationInspectorFactory, int[] prevTuplePermutation,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
-                modCallbackFactory, tupleFilterFactory);
+                modCallbackFactory, tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
         this.prevTuple.setFieldPermutation(prevTuplePermutation);
         this.operationFieldIndex = operationFieldIndex;
         this.operationInspector = operationInspectorFactory.createBinaryIntegerInspector(ctx);
@@ -85,7 +85,6 @@
     public void open() throws HyracksDataException {
         super.open();
         frameTuple = new FrameTupleReference();
-        abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
         if (prevTupleFilterFactory != null) {
             prevTupleFilter = prevTupleFilterFactory.createTupleFilter(ctx);
         }
@@ -94,12 +93,16 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int tupleCount = accessor.getTupleCount();
         boolean tupleFilterIsNull = tupleFilter == null;
         boolean prevTupleFilterIsNull = prevTupleFilter == null;
         for (int i = 0; i < tupleCount; i++) {
             try {
+                int storagePartition = tuplePartitioner.partition(accessor, i);
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
+                AbstractIndexModificationOperationCallback abstractModCallback =
+                        (AbstractIndexModificationOperationCallback) modCallbacks[storageIdx];
                 frameTuple.reset(accessor, i);
                 int operation = operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
                         frameTuple.getFieldStart(operationFieldIndex), frameTuple.getFieldLength(operationFieldIndex));
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
index 41bd0fb..e56ee0c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorDescriptor.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -41,9 +42,10 @@
             int[] fieldPermutation, IIndexDataflowHelperFactory indexHelperFactory,
             IModificationOperationCallbackFactory modCallbackFactory, int operationFieldIndex,
             IBinaryIntegerInspectorFactory operationInspectorFactory, List<AlgebricksPipeline> secondaryKeysPipeline,
-            List<AlgebricksPipeline> prevSecondaryKeysPipeline) {
+            List<AlgebricksPipeline> prevSecondaryKeysPipeline, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) {
         super(spec, outRecDesc, fieldPermutation, indexHelperFactory, null, null, modCallbackFactory,
-                operationFieldIndex, operationInspectorFactory, null);
+                operationFieldIndex, operationInspectorFactory, null, tuplePartitionerFactory, partitionsMap);
         this.secondaryKeysPipeline = secondaryKeysPipeline;
         this.prevSecondaryKeysPipeline = prevSecondaryKeysPipeline;
     }
@@ -54,6 +56,6 @@
         RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMSecondaryUpsertWithNestedPlanOperatorNodePushable(ctx, partition, indexHelperFactory,
                 modCallbackFactory, fieldPermutation, inputRecDesc, operationFieldIndex, operationInspectorFactory,
-                secondaryKeysPipeline, prevSecondaryKeysPipeline);
+                secondaryKeysPipeline, prevSecondaryKeysPipeline, tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 08fd566..17858a3 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -31,6 +31,7 @@
 import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRuntimeFactory.NestedTupleSourceRuntime;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -50,9 +51,10 @@
             IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
             int[] fieldPermutation, RecordDescriptor inputRecDesc, int operationFieldIndex,
             IBinaryIntegerInspectorFactory operationInspectorFactory, List<AlgebricksPipeline> secondaryKeysPipeline,
-            List<AlgebricksPipeline> prevSecondaryKeysPipeline) throws HyracksDataException {
+            List<AlgebricksPipeline> prevSecondaryKeysPipeline, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, modCallbackFactory, null, null, fieldPermutation, inputRecDesc,
-                operationFieldIndex, operationInspectorFactory, null);
+                operationFieldIndex, operationInspectorFactory, null, tuplePartitionerFactory, partitionsMap);
         this.numberOfPrimaryKeyAndFilterFields = fieldPermutation.length;
         this.startOfNewKeyPipelines = buildStartOfPipelines(secondaryKeysPipeline, inputRecDesc, false);
         this.startOfPrevKeyPipelines = buildStartOfPipelines(prevSecondaryKeysPipeline, inputRecDesc, true);
@@ -94,7 +96,6 @@
     public void open() throws HyracksDataException {
         super.open();
         frameTuple = new FrameTupleReference();
-        abstractModCallback = (AbstractIndexModificationOperationCallback) modCallback;
     }
 
     @Override
@@ -168,7 +169,6 @@
 
         @Override
         public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-            ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor) indexAccessor;
 
             endOfPipelineTupleAccessor.reset(buffer);
             int nTuple = endOfPipelineTupleAccessor.getTupleCount();
@@ -187,6 +187,11 @@
                 // Add the primary keys and filter fields.
                 endTupleReference.addTuple(tuple);
 
+                int storagePartition = tuplePartitioner.partition(tuple.getFrameTupleAccessor(), tuple.getTupleIndex());
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
+                AbstractIndexModificationOperationCallback abstractModCallback =
+                        (AbstractIndexModificationOperationCallback) modCallbacks[storageIdx];
                 // Finally, pass the tuple to our accessor. There are only two operations: insert or delete.
                 if (this.isInsert) {
                     abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
index 220311e..7490220 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java
@@ -106,6 +106,25 @@
      * Close the AutoCloseable and suppress any Throwable thrown by the close call.
      * This method must NEVER throw any Throwable
      *
+     * @param closables
+     *            the resources to close
+     * @param root
+     *            the first exception encountered during release of resources
+     * @return the root Throwable if not null, otherwise the first Throwable thrown while closing, or null if none was thrown
+     */
+    public static Throwable close(AutoCloseable[] closables, Throwable root) {
+        if (closables != null) {
+            for (AutoCloseable closable : closables) {
+                root = close(closable, root);
+            }
+        }
+        return root;
+    }
+
+    /**
+     * Close the AutoCloseable and suppress any Throwable thrown by the close call.
+     * This method must NEVER throw any Throwable
+     *
      * @param closable
      *            the resource to close
      * @param root
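
The new array overload folds the single-resource close over every element, threading the first failure through so the remaining resources are still closed and later failures are suppressed into it. A runnable sketch of the same contract with plain AutoCloseables (closeAll is illustrative, not the Hyracks implementation):

    final class CloseAllDemo {
        static Throwable closeAll(AutoCloseable[] closables, Throwable root) {
            if (closables != null) {
                for (AutoCloseable c : closables) {
                    try {
                        if (c != null) {
                            c.close();
                        }
                    } catch (Throwable th) { // never rethrow, mirroring the contract above
                        if (root == null) {
                            root = th;
                        } else {
                            root.addSuppressed(th);
                        }
                    }
                }
            }
            return root;
        }

        public static void main(String[] args) {
            AutoCloseable ok = () -> System.out.println("closed ok");
            AutoCloseable bad = () -> { throw new IllegalStateException("boom"); };
            System.out.println("first failure: " + closeAll(new AutoCloseable[] { ok, bad, ok }, null));
        }
    }
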
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
index 292592b..cc922ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/org/apache/hyracks/examples/btree/client/InsertPipelineExample.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.JobId;
@@ -37,6 +38,7 @@
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -148,10 +150,16 @@
         IIndexDataflowHelperFactory primaryHelperFactory =
                 new IndexDataflowHelperFactory(storageManager, primarySplitProvider);
 
+        int[][] partitionsMap = getPartitionsMap(splitNCs.length);
+        int[] pkFields = new int[] { primaryFieldPermutation[0] };
+        IBinaryHashFunctionFactory[] pkHashFunFactories =
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+        ITuplePartitionerFactory tuplePartitionerFactory =
+                new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, splitNCs.length);
         // create operator descriptor
-        TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert =
-                new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, recDesc, primaryFieldPermutation,
-                        IndexOperation.INSERT, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+        TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
+                spec, recDesc, primaryFieldPermutation, IndexOperation.INSERT, primaryHelperFactory, null,
+                NoOpOperationCallbackFactory.INSTANCE, tuplePartitionerFactory, partitionsMap);
         JobHelper.createPartitionConstraint(spec, primaryInsert, splitNCs);
 
         // prepare insertion into secondary index
@@ -174,9 +182,15 @@
         IIndexDataflowHelperFactory secondaryHelperFactory =
                 new IndexDataflowHelperFactory(storageManager, secondarySplitProvider);
         // create operator descriptor
+        int[] pkFields2 = new int[] { secondaryFieldPermutation[1] };
+        IBinaryHashFunctionFactory[] pkHashFunFactories2 =
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(IntegerPointable.FACTORY) };
+        ITuplePartitionerFactory tuplePartitionerFactory2 =
+                new FieldHashPartitionerFactory(pkFields2, pkHashFunFactories2, splitNCs.length);
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsert =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, recDesc, secondaryFieldPermutation,
-                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory2, partitionsMap);
         JobHelper.createPartitionConstraint(spec, secondaryInsert, splitNCs);
 
         // end the insert pipeline at this sink operator
@@ -202,4 +216,12 @@
 
         return spec;
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
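
FieldHashPartitionerFactory assigns each tuple a storage partition by hashing the configured key fields and mapping the hash onto the partition count. A minimal sketch of that placement rule, assuming non-negative normalization plus modulo placement (the class below is hypothetical, not part of this patch):

    // Hypothetical stand-in for hash-modulo placement; the real factory
    // hashes the configured key fields with the supplied
    // IBinaryHashFunctionFactory instances.
    public final class HashModuloPlacement {
        // Maps an arbitrary key hash onto [0, numPartitions).
        static int place(int keyHash, int numPartitions) {
            return (keyHash & Integer.MAX_VALUE) % numPartitions;
        }

        public static void main(String[] args) {
            System.out.println(place("CUST-42".hashCode(), 4)); // stable value in [0, 4)
        }
    }
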
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index eac332d..80ce5c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -22,10 +22,13 @@
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.inputParserFactories;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.inputRecordDesc;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryFieldPermutation;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryHashFunFactories;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryKeyFieldCount;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryKeyFieldPermutation;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryRecDesc;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryFieldPermutationA;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryFieldPermutationB;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryPKFieldPermutationB;
 import static org.apache.hyracks.tests.am.btree.DataSetConstants.secondaryRecDesc;
 
 import java.io.DataOutput;
@@ -33,6 +36,7 @@
 
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
@@ -41,6 +45,7 @@
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -69,6 +74,7 @@
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.am.common.TreeOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.apache.hyracks.tests.integration.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -226,17 +232,26 @@
                 new DelimitedDataTupleParserFactory(inputParserFactories, '|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
+        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
+        ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+                primaryHashFunFactories, ordersSplits.length);
+
         // insert into primary index
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryBtreeInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, primaryFieldPermutation,
-                        pipelineOperation, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        pipelineOperation, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeInsertOp, NC1_ID);
 
         // first secondary index
         int[] fieldPermutationB = secondaryFieldPermutationB;
+        ITuplePartitionerFactory tuplePartitionerFactory2 = new FieldHashPartitionerFactory(
+                secondaryPKFieldPermutationB, primaryHashFunFactories, ordersSplits.length);
+
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, fieldPermutationB,
-                        pipelineOperation, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        pipelineOperation, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory2, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
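
Note that both inserts hash the primary key rather than the secondary key: with the same hash factories, partition count, and partitionsMap, a record and its secondary-index entry resolve to the same storage partition and stay co-located. A self-contained illustration of why the routing is deterministic (route is a hypothetical helper):

    // Hypothetical helper: equal primary-key bytes always yield the same
    // partition, so the primary insert and the secondary insert agree.
    static int route(byte[] pkBytes, int numPartitions) {
        int h = java.util.Arrays.hashCode(pkBytes); // stand-in for the configured hash factory
        return (h & Integer.MAX_VALUE) % numPartitions;
    }
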
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java
index fbb59e2..8ff1736 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/DataSetConstants.java
@@ -20,9 +20,11 @@
 package org.apache.hyracks.tests.am.btree;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
@@ -45,6 +47,7 @@
 
     // field, type and key declarations for primary index
     public static int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7 };
+    public static int[] primaryKeyFieldPermutation = new int[] { 0 };
     public static final int[] primaryFilterFields = new int[] { 0 };
     public static final int[] primaryBtreeFields = new int[] { 0, 1, 2, 3, 4, 5 };
 
@@ -58,6 +61,8 @@
 
     public static final IBinaryComparatorFactory[] primaryComparatorFactories =
             new IBinaryComparatorFactory[] { UTF8StringBinaryComparatorFactory.INSTANCE };
+    public static final IBinaryHashFunctionFactory[] primaryHashFunFactories =
+            new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) };
     public static final int primaryKeyFieldCount = primaryComparatorFactories.length;
 
     public static final int[] primaryBloomFilterKeyFields = new int[] { 0 };
@@ -78,6 +83,7 @@
     public static final int secondaryKeyFieldCount = 2;
     public static final int[] secondaryFieldPermutationA = { 3, 0 };
     public static final int[] secondaryFieldPermutationB = { 4, 0 };
+    public static final int[] secondaryPKFieldPermutationB = { 1 };
     public static final int[] secondaryFilterFields = new int[] { 1 };
     public static final int[] secondaryBtreeFields = new int[] { 0, 1 };
     public static final int[] secondaryBloomFilterKeyFields = new int[] { 0, 1 };
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index f849bc8..71ee656 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.hyracks.tests.am.rtree;
 
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryHashFunFactories;
+import static org.apache.hyracks.tests.am.btree.DataSetConstants.primaryKeyFieldPermutation;
+
 import java.io.DataOutput;
 import java.io.File;
 
@@ -26,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -42,6 +46,7 @@
 import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
 import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
@@ -74,6 +79,7 @@
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.apache.hyracks.tests.integration.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 
@@ -367,18 +373,25 @@
                 ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID);
 
+        int[][] partitionsMap = TestUtil.getPartitionsMap(ordersSplits.length);
         // insert into primary index
         int[] primaryFieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
+        ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+                primaryHashFunFactories, ordersSplits.length);
         TreeIndexInsertUpdateDeleteOperatorDescriptor primaryInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, primaryFieldPermutation,
-                        IndexOperation.INSERT, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        IndexOperation.INSERT, primaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryInsertOp, NC1_ID);
 
         // secondary index
         int[] secondaryFieldPermutation = { 9, 10, 11, 12, 0 };
+        ITuplePartitionerFactory tuplePartitionerFactory2 =
+                new FieldHashPartitionerFactory(new int[] { 4 }, primaryHashFunFactories, ordersSplits.length);
         TreeIndexInsertUpdateDeleteOperatorDescriptor secondaryInsertOp =
                 new TreeIndexInsertUpdateDeleteOperatorDescriptor(spec, ordersDesc, secondaryFieldPermutation,
-                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE);
+                        IndexOperation.INSERT, secondaryHelperFactory, null, NoOpOperationCallbackFactory.INSTANCE,
+                        tuplePartitionerFactory2, partitionsMap);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryInsertOp, NC1_ID);
 
         NullSinkOperatorDescriptor nullSink = new NullSinkOperatorDescriptor(spec);
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
index d53a04c..8f3181d 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestUtil.java
@@ -33,7 +33,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-class TestUtil {
+public class TestUtil {
 
     private static final String HOST = "127.0.0.1";
     private static final int PORT = 16001;
@@ -68,4 +68,12 @@
     static ObjectNode httpGetAsObject(URI uri) throws URISyntaxException, IOException {
         return getResultAsJson(httpGetAsString(uri));
     }
+
+    public static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
 }
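
getPartitionsMap produces an identity mapping, so compute partition i owns exactly storage partition i:

    int[][] map = TestUtil.getPartitionsMap(3);
    // map is {{0}, {1}, {2}}: one storage partition per compute partition.

A map such as {{0, 1}, {2, 3}} would instead hand two storage partitions to each of two tasks, which is the multi-partition layout the runtime changes below are built to handle.
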
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 9ff9249..d3def46 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -23,6 +23,8 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -38,53 +40,76 @@
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IModificationOperationCallback;
 import org.apache.hyracks.storage.common.LocalResource;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class IndexInsertUpdateDeleteOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
     protected final IHyracksTaskContext ctx;
-    protected final IIndexDataflowHelper indexHelper;
     protected final RecordDescriptor inputRecDesc;
     protected final IndexOperation op;
     protected final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
     protected FrameTupleAccessor accessor;
     protected FrameTupleReference frameTuple;
     protected IFrame writeBuffer;
-    protected IIndexAccessor indexAccessor;
     protected ITupleFilter tupleFilter;
-    protected IModificationOperationCallback modCallback;
-    protected IIndex index;
+    protected final IIndex[] indexes;
+    protected final IIndexAccessor[] indexAccessors;
+    protected final IIndexDataflowHelper[] indexHelpers;
+    protected final IModificationOperationCallback[] modCallbacks;
     protected final IModificationOperationCallbackFactory modOpCallbackFactory;
     protected final ITupleFilterFactory tupleFilterFactory;
+    protected final ITuplePartitioner tuplePartitioner;
+    protected final int[] partitions;
+    protected final Int2IntMap storagePartitionId2Index;
+    protected boolean writerOpen;
 
     public IndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IndexOperation op, IModificationOperationCallbackFactory modOpCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) throws HyracksDataException {
         this.ctx = ctx;
-        this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition);
+        this.partitions = partitionsMap != null ? partitionsMap[partition] : new int[] { partition };
+        this.indexes = new IIndex[partitions.length];
+        this.indexAccessors = new IIndexAccessor[partitions.length];
+        this.modCallbacks = new IModificationOperationCallback[partitions.length];
+        this.storagePartitionId2Index = new Int2IntOpenHashMap();
+        this.indexHelpers = new IIndexDataflowHelper[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            storagePartitionId2Index.put(partitions[i], i);
+            indexHelpers[i] = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
+        }
         this.modOpCallbackFactory = modOpCallbackFactory;
         this.tupleFilterFactory = tupleFilterFactory;
         this.inputRecDesc = inputRecDesc;
         this.op = op;
         this.tuple.setFieldPermutation(fieldPermutation);
+        this.tuplePartitioner = tuplePartitionerFactory.createPartitioner(ctx);
     }
 
     @Override
     public void open() throws HyracksDataException {
         accessor = new FrameTupleAccessor(inputRecDesc);
         writeBuffer = new VSizeFrame(ctx);
-        indexHelper.open();
-        index = indexHelper.getIndexInstance();
         try {
+            for (int i = 0; i < indexHelpers.length; i++) {
+                IIndexDataflowHelper indexHelper = indexHelpers[i];
+                indexHelper.open();
+                indexes[i] = indexHelper.getIndexInstance();
+                LocalResource resource = indexHelper.getResource();
+                modCallbacks[i] = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
+                IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
+                indexAccessors[i] = indexes[i].createAccessor(iap);
+            }
             writer.open();
-            LocalResource resource = indexHelper.getResource();
-            modCallback = modOpCallbackFactory.createModificationOperationCallback(resource, ctx, this);
-            IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            indexAccessor = index.createAccessor(iap);
+            writerOpen = true;
             if (tupleFilterFactory != null) {
                 tupleFilter = tupleFilterFactory.createTupleFilter(ctx);
                 frameTuple = new FrameTupleReference();
@@ -108,6 +133,9 @@
                 }
                 tuple.reset(accessor, i);
 
+                int storagePartition = tuplePartitioner.partition(accessor, i);
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                IIndexAccessor indexAccessor = indexAccessors[storageIdx];
                 switch (op) {
                     case INSERT: {
                         try {
@@ -158,18 +186,24 @@
 
     @Override
     public void close() throws HyracksDataException {
-        if (index != null) {
-            try {
+        Throwable failure = null;
+        try {
+            if (writerOpen) {
                 writer.close();
-            } finally {
-                indexHelper.close();
             }
+        } finally {
+            for (IIndexDataflowHelper indexHelper : indexHelpers) {
+                failure = ResourceReleaseUtils.close(indexHelper, failure);
+            }
+        }
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
         }
     }
 
     @Override
     public void fail() throws HyracksDataException {
-        if (index != null) {
+        if (writerOpen) {
             writer.fail();
         }
     }
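
The pushable now keeps dense arrays of helpers, indexes, accessors, and callbacks, and translates a (possibly sparse) storage partition id into an array slot through the Int2IntOpenHashMap. A runnable sketch of just that lookup, assuming a task that owns storage partitions 2 and 5:

    import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;

    public final class PartitionSlotLookup {
        public static void main(String[] args) {
            int[] partitions = { 2, 5 }; // storage partitions owned by this task (assumed)
            Int2IntOpenHashMap storagePartitionId2Index = new Int2IntOpenHashMap();
            for (int i = 0; i < partitions.length; i++) {
                storagePartitionId2Index.put(partitions[i], i);
            }
            // A tuple routed to storage partition 5 uses the accessor in slot 1.
            System.out.println(storagePartitionId2Index.get(5)); // prints 1
        }
    }
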
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
index f5cab5c..81607e7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -32,17 +33,19 @@
 
 public class TreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IIndexDataflowHelperFactory indexHelperFactory;
     private final IndexOperation op;
     private final int[] fieldPermutation;
     private final IModificationOperationCallbackFactory modificationOpCallbackFactory;
     private final ITupleFilterFactory tupleFilterFactory;
+    protected final ITuplePartitionerFactory tuplePartitionerFactory;
+    protected final int[][] partitionsMap;
 
     public TreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
             int[] fieldPermutation, IndexOperation op, IIndexDataflowHelperFactory indexHelperFactory,
-            ITupleFilterFactory tupleFilterFactory,
-            IModificationOperationCallbackFactory modificationOpCallbackFactory) {
+            ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.fieldPermutation = fieldPermutation;
@@ -50,6 +53,8 @@
         this.modificationOpCallbackFactory = modificationOpCallbackFactory;
         this.tupleFilterFactory = tupleFilterFactory;
         this.outRecDescs[0] = outRecDesc;
+        this.tuplePartitionerFactory = tuplePartitionerFactory;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
@@ -57,6 +62,6 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new IndexInsertUpdateDeleteOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), op, modificationOpCallbackFactory,
-                tupleFilterFactory);
+                tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
     }
 }
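
The serialVersionUID bump to 2L is deliberate: descriptors serialized before this patch lack the partitioner and partitions-map fields and must not deserialize into the new shape.
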
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
index d8d155e..ba932e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/pom.xml
@@ -94,5 +94,9 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
index b6192c1..4cd8a11 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameTupleProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.api;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface IFrameTupleProcessor {
@@ -37,7 +38,7 @@
      *            the index of the tuple in the frame
      * @throws HyracksDataException
      */
-    void process(ITupleReference tuple, int index) throws HyracksDataException;
+    void process(FrameTupleAccessor accessor, ITupleReference tuple, int index) throws HyracksDataException;
 
     /**
      * Called once per batch before ending the batch process
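
Every IFrameTupleProcessor implementation has to adopt the widened signature. A skeletal conforming class, assuming the interface's remaining callbacks are start(), finish(), and fail(Throwable) as exercised by LSMHarness in this patch:

    // Skeleton only; a real processor acts on the tuple, and the accessor
    // now permits frame-level work such as computing the tuple's storage
    // partition before applying the operation.
    class NoOpFrameTupleProcessor implements IFrameTupleProcessor {
        @Override
        public void start() {
        }

        @Override
        public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
                throws HyracksDataException {
            // e.g., hash the tuple's key fields via the accessor to pick an index
        }

        @Override
        public void finish() {
        }

        @Override
        public void fail(Throwable th) {
        }
    }
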
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 9e8c568..214d9dc 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.storage.am.lsm.common.api;
 
 import java.util.List;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,7 +40,6 @@
      * @param tuple
      *            the operation tuple
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException;
 
@@ -54,7 +54,6 @@
      *            the operation tuple
      * @return
      * @throws HyracksDataException
-     * @throws IndexException
      */
     boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
             throws HyracksDataException;
@@ -69,7 +68,6 @@
      * @param pred
      *            the search predicate
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
 
@@ -104,9 +102,7 @@
      * Schedule a merge
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
-     * @throws IndexException
      */
     ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
@@ -114,9 +110,7 @@
      * Schedule full merge
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
-     * @throws IndexException
      */
     ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException;
 
@@ -125,7 +119,6 @@
      *
      * @param operation
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void merge(ILSMIOOperation operation) throws HyracksDataException;
 
@@ -133,7 +126,6 @@
      * Schedule a flush
      *
      * @param ctx
-     * @param callback
      * @throws HyracksDataException
      */
     ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException;
@@ -143,7 +135,6 @@
      *
      * @param operation
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void flush(ILSMIOOperation operation) throws HyracksDataException;
 
@@ -153,7 +144,6 @@
      * @param ioOperation
      *            the io operation that added the new component
      * @throws HyracksDataException
-     * @throws IndexException
      */
     void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException;
 
@@ -235,10 +225,13 @@
      *            the tuple processor
      * @param frameOpCallback
      *            the callback at the end of the frame
+     * @param tuples
+     *            the indexes of the tuples in the frame to process
      * @throws HyracksDataException
      */
     void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException;
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+            throws HyracksDataException;
 
     /**
      * Rollback components that match the passed predicate
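
Callers of batchOperate are now expected to pre-group the frame's tuples by storage partition and run one batch per owned index, handing each index only its own subset. A hedged sketch using the partitioner and slot map introduced elsewhere in this patch (the surrounding wiring is illustrative, not verbatim from the source):

    // Group tuple indexes by the storage partition that owns them.
    Map<Integer, Set<Integer>> tuplesPerPartition = new HashMap<>();
    for (int i = 0; i < accessor.getTupleCount(); i++) {
        int storagePartition = tuplePartitioner.partition(accessor, i);
        tuplesPerPartition.computeIfAbsent(storagePartition, k -> new HashSet<>()).add(i);
    }
    // One batch per partition, as on LSMTreeIndexAccessor below.
    for (Map.Entry<Integer, Set<Integer>> e : tuplesPerPartition.entrySet()) {
        int slot = storagePartitionId2Index.get((int) e.getKey());
        LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[slot];
        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, e.getValue());
    }
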
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index 02744ee..c64e1e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -47,15 +48,15 @@
     public LSMIndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext ctx, int partition,
             IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IndexOperation op, IModificationOperationCallbackFactory modCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory) throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) throws HyracksDataException {
         super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, op, modCallbackFactory,
-                tupleFilterFactory);
+                tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         accessor.reset(buffer);
-        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
         int nextFlushTupleIndex = 0;
         int tupleCount = accessor.getTupleCount();
         for (int i = 0; i < tupleCount; i++) {
@@ -68,6 +69,9 @@
                 }
                 tuple.reset(accessor, i);
 
+                int storagePartition = tuplePartitioner.partition(accessor, i);
+                int storageIdx = storagePartitionId2Index.get(storagePartition);
+                ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[storageIdx];
                 switch (op) {
                     case INSERT: {
                         if (!lsmAccessor.tryInsert(tuple)) {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
index fb884f7..26ce56a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,6 +22,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -33,24 +34,29 @@
 
 public class LSMTreeIndexInsertUpdateDeleteOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     protected final int[] fieldPermutation;
     protected final IndexOperation op;
     protected final IIndexDataflowHelperFactory indexHelperFactory;
     protected final IModificationOperationCallbackFactory modCallbackFactory;
     protected final ITupleFilterFactory tupleFilterFactory;
+    protected final ITuplePartitionerFactory tuplePartitionerFactory;
+    protected final int[][] partitionsMap;
 
     public LSMTreeIndexInsertUpdateDeleteOperatorDescriptor(IOperatorDescriptorRegistry spec,
             RecordDescriptor outRecDesc, IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation,
             IndexOperation op, IModificationOperationCallbackFactory modCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory) {
+            ITupleFilterFactory tupleFilterFactory, ITuplePartitionerFactory tuplePartitionerFactory,
+            int[][] partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.modCallbackFactory = modCallbackFactory;
         this.tupleFilterFactory = tupleFilterFactory;
         this.fieldPermutation = fieldPermutation;
         this.op = op;
+        this.tuplePartitionerFactory = tuplePartitionerFactory;
+        this.partitionsMap = partitionsMap;
         this.outRecDescs[0] = outRecDesc;
     }
 
@@ -59,6 +65,6 @@
             IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
         return new LSMIndexInsertUpdateDeleteOperatorNodePushable(ctx, partition, indexHelperFactory, fieldPermutation,
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), op, modCallbackFactory,
-                tupleFilterFactory);
+                tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 950a8e5..e2c9fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -21,6 +21,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 
@@ -711,12 +712,13 @@
 
     @Override
     public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException {
+            IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples)
+            throws HyracksDataException {
         processor.start();
         enter(ctx);
         try {
             try {
-                processFrame(accessor, tuple, processor);
+                processFrame(accessor, tuple, processor, tuples);
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
@@ -860,13 +862,14 @@
     }
 
     private static void processFrame(FrameTupleAccessor accessor, FrameTupleReference tuple,
-            IFrameTupleProcessor processor) throws HyracksDataException {
+            IFrameTupleProcessor processor, Set<Integer> tuples) throws HyracksDataException {
         int tupleCount = accessor.getTupleCount();
-        int i = 0;
-        while (i < tupleCount) {
+        for (int i = 0; i < tupleCount; i++) {
+            if (!tuples.contains(i)) {
+                continue;
+            }
             tuple.reset(accessor, i);
-            processor.process(tuple, i);
-            i++;
+            processor.process(accessor, tuple, i);
         }
     }
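
Since processFrame now visits every tuple in the frame and tests membership per tuple, the tuples set should offer O(1) contains; a HashSet, or fastutil's IntOpenHashSet given the dependency added above, keeps the scan linear in the tuple count.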
 
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 8412b8c..fb5984d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -20,6 +20,7 @@
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
 import java.util.List;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -210,8 +211,8 @@
     }
 
     public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor,
-            IFrameOperationCallback frameOpCallback) throws HyracksDataException {
-        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback);
+            IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException {
+        lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples);
     }
 
     @Override