PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b058b33..340832f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +69,7 @@
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
@@ -145,16 +148,16 @@
       private final int clientVersion;
       // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
       // the verified column) will have the value false ("unverified") on these mutations
-      private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
+      private ListMultimap<HTableInterfaceReference, Mutation> preIndexUpdates;
       // The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
       // the verified column) will have the value true ("verified") on the put mutations
-      private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+      private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates;
       // The collection of candidate index mutations that will be applied after the data table mutations
-      private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+      private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates;
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
-      long dataWriteStartTime;
-
+      private long dataWriteStartTime;
+      private boolean rebuild;
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
@@ -506,6 +509,27 @@
       }
   }
 
+  private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                       MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                       ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
+      byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+      HTableInterfaceReference hTableInterfaceReference =
+                          new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+      List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+      if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+          return;
+      }
+      List<Mutation> localUpdates = new ArrayList<Mutation>();
+      Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+      while (indexUpdatesItr.hasNext()) {
+          Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+          localUpdates.add(next.getFirst());
+      }
+      if (!localUpdates.isEmpty()) {
+          miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+      }
+  }
+
   private void prepareIndexMutations(
           ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp,
@@ -513,79 +537,56 @@
           Collection<? extends Mutation> mutations,
           long now,
           PhoenixIndexMetaData indexMetaData) throws Throwable {
-
       List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
           Span current = scope.getSpan();
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
-
           // get the index updates for all elements in this batch
-          Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
-                  this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData);
-
+          context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+          this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData);
           current.addTimelineAnnotation("Built index updates, doing preStep");
-          TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-          byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
-          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
-          List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
-          context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
-          context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
-          while(indexUpdatesItr.hasNext()) {
-              Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
-              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
-                  localUpdates.add(next.getFirst().getFirst());
-                  indexUpdatesItr.remove();
-              }
-              else {
-                  // get index maintainer for this index table
-                  IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
-                  if (indexMaintainer == null) {
-                      throw new DoNotRetryIOException(
-                              "preBatchMutateWithExceptions: indexMaintainer is null " +
-                                      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-                  }
-                  byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+          handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates);
+          context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+          int updateCount = 0;
+          for (IndexMaintainer indexMaintainer : maintainers) {
+              updateCount++;
+              byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+              byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+              HTableInterfaceReference hTableInterfaceReference =
+                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+              Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
+                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+              while (indexUpdatesItr.hasNext()) {
+                  Pair<Mutation, byte[]> next = indexUpdatesItr.next();
                   // add the VERIFIED cell, which is the empty cell
-                  Mutation m = next.getFirst().getFirst();
-                  boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
-                  if (rebuild) {
+                  Mutation m = next.getFirst();
+                  if (context.rebuild) {
+                      indexUpdatesItr.remove();
                       if (m instanceof Put) {
                           long ts = getMaxTimestamp(m);
                           // Remove the empty column prepared by Index codec as we need to change its value
                           removeEmptyColumn(m, emptyCF, emptyCQ);
-                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+                          ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
                       }
+                      context.preIndexUpdates.put(hTableInterfaceReference, m);
                   } else {
-                      indexUpdatesItr.remove();
                       // For this mutation whether it is put or delete, set the status of the index row "unverified"
                       // This will be done before the data table row is updated (i.e., in the first write phase)
                       Put unverifiedPut = new Put(m.getRow());
                       unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
-                      context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
+                      context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
                       if (m instanceof Put) {
                           // Remove the empty column prepared by Index codec as we need to change its value
                           removeEmptyColumn(m, emptyCF, emptyCQ);
                           ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
                       }
-                      context.intermediatePostIndexUpdates.add(next);
                   }
               }
           }
-          if (!localUpdates.isEmpty()) {
-              miniBatchOp.addOperationsFromCP(0,
-                      localUpdates.toArray(new Mutation[localUpdates.size()]));
-          }
-          if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
-              context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
-          }
-          for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
-              context.preIndexUpdates.add(update.getFirst());
-          }
+          TracingUtils.addAnnotation(current, "index update count", updateCount);
       }
   }
 
@@ -610,20 +611,23 @@
       setBatchMutateContext(c, context);
       Mutation firstMutation = miniBatchOp.getOperation(0);
       ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+      context.rebuild = replayWrite != null;
       /*
        * Exclusively lock all rows so we get a consistent read
        * while determining the index updates
        */
-      if (replayWrite == null) {
+      long now;
+      if (!context.rebuild) {
           populateRowsToLock(miniBatchOp, context);
           lockRows(context);
-      }
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
-      // concurrent updates
-      if (replayWrite == null) {
+          now = EnvironmentEdgeManager.currentTimeMillis();
+          // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+          // concurrent updates
           populatePendingRows(context);
       }
+      else {
+          now = EnvironmentEdgeManager.currentTimeMillis();
+      }
       // First group all the updates for a single row into a single update to be processed
       Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
       // early exit if it turns out we don't have any edits
@@ -646,9 +650,11 @@
       for (RowLock rowLock : context.rowLocks) {
           rowLock.release();
       }
-      // Do the index updates
+      // Do the first phase index updates
       doPre(c, context, miniBatchOp);
-      if (replayWrite == null) {
+      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+      if (!context.rebuild) {
+          List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
           // Acquire the locks again before letting the region proceed with data table updates
           List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
           for (RowLock rowLock : context.rowLocks) {
@@ -658,29 +664,26 @@
           context.rowLocks.clear();
           context.rowLocks = rowLocks;
           // Check if we need to skip post index update for any of the row
-          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator();
-          while (iterator.hasNext()) {
-              // Check if this row is going through another mutation which has a newer timestamp. If so,
-              // ignore the pending updates for this row
-              Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
-              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-              PendingRow pendingRow = pendingRows.get(rowKey);
-              // Are there concurrent updates on the data table row? if so, skip post index updates
-              // and let read repair resolve conflicts
-              if (pendingRow.isConcurrent()) {
-                  iterator.remove();
+          for (IndexMaintainer indexMaintainer : maintainers) {
+              HTableInterfaceReference hTableInterfaceReference =
+                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+              Iterator<Pair<Mutation, byte[]>> iterator =
+                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+              while (iterator.hasNext()) {
+                  // Are there concurrent updates on the data table row? if so, skip post index updates
+                  // and let read repair resolve conflicts
+                  Pair<Mutation, byte[]> update = iterator.next();
+                  ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+                  PendingRow pendingRow = pendingRows.get(rowKey);
+                  if (!pendingRow.isConcurrent()) {
+                      context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst());
+                  }
               }
           }
           // We are done with handling concurrent mutations. So we can remove the rows of this batch from
           // the collection of pending rows
           removePendingRows(context);
       }
-      if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) {
-          context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size());
-      }
-      for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) {
-          context.postIndexUpdates.add(update.getFirst());
-      }
       if (failDataTableUpdatesForTesting) {
           throw new DoNotRetryIOException("Simulating the data table write failure");
       }
@@ -758,7 +761,7 @@
 
   private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post)
             throws IOException {
-      Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
+      ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
       //short circuit, if we don't need to do any work
 
       if (context == null || indexUpdates.isEmpty()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 7639a49..90d28b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -22,6 +22,7 @@
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.collect.ListMultimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
@@ -34,6 +35,8 @@
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +82,7 @@
       return this.delegate.getIndexMetaData(miniBatchOp);
   }
 
-  public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
+  public void getIndexUpdates(ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates,
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
       Collection<? extends Mutation> mutations,
       IndexMetaData indexMetaData) throws Throwable {
@@ -87,20 +90,12 @@
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
-    ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
       Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
-      if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
-          for (Pair<Mutation, byte[]> update : updates) {
-            update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-          }
-      }
       for (Pair<Mutation, byte[]> update : updates) {
-        results.add(new Pair<>(update, m.getRow()));
+        indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
       }
     }
-    return results;
   }
 
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(