PHOENIX-5597 No read repair happens when scans filter rows based on a covered column
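
Under the old design, the first write phase put only the empty column cell (with the UNVERIFIED status) on the index row, and the last phase wrote the full row with the VERIFIED status. If the last phase failed, the surviving unverified row had no covered column cells, so a scan filtering on a covered column dropped the row server side before GlobalIndexChecker could trigger read repair, and the query silently missed rows. This change inverts the two phases: the full index mutation is written with the UNVERIFIED status in the first phase, and the last phase merely flips the empty column cell to VERIFIED at the same timestamp. Because the two phases no longer use adjacent timestamps (now - 1 and now), the 1 ms sleep after the data table write is removed, and DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS is raised from 10 minutes to 7 days.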
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 2cf3f1a..9a86bb4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.end2end.index;
 
-import static org.apache.phoenix.end2end.index.ImmutableIndexIT.verifyRowsForEmptyColValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -35,23 +34,12 @@
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.end2end.IndexToolIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -258,7 +246,7 @@
             conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('c', 'cd','cde')");
             conn.commit();
             IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
-            String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1  = 'ab'";
+            String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'ab' and val2 = 'abcc'";
             // Verify that we will read from the index table
             assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
             ResultSet rs = conn.createStatement().executeQuery(selectSql);
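
The added val2 = 'abcc' predicate is the regression check for this bug: val2 is a covered (included) column, so under the old scheme an unverified index row carried no val2 cell at all, the filter eliminated the row before its verified status was ever inspected, and no read repair happened.
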
@@ -271,47 +259,6 @@
         }
     }
 
-    public static void checkUnverifiedCellCount(Connection conn, String indexTableName) throws Exception {
-        Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
-                .getTable(Bytes.toBytes(indexTableName));
-        long indexCnt = TestUtil.getRowCount(hIndexTable, false);
-        assertEquals(1, indexCnt);
-        assertEquals(true, verifyRowsForEmptyColValue(conn, indexTableName, IndexRegionObserver.UNVERIFIED_BYTES));
-        Scan s = new Scan();
-        int cntCellValues = 0;
-        try (ResultScanner scanner = hIndexTable.getScanner(s)) {
-            Result result;
-            while ((result = scanner.next()) != null) {
-                CellScanner cellScanner = result.cellScanner();
-                while (cellScanner.advance()) {
-                    cntCellValues++;
-                }
-            }
-        }
-        assertEquals(1, cntCellValues);
-    }
-    @Test
-    public void testUnverifiedRowIncludesOnlyEmptyCell() throws Exception {
-        String dataTableName = generateUniqueName();
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("create table " + dataTableName +
-                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
-            String indexTableName = generateUniqueName();
-            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
-                    dataTableName + " (val1) include (val2, val3)");
-            // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase)
-            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
-            conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
-            conn.commit();
-            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
-            // check that in the first phase we don't send the full row.
-            // We count the num of cells for this
-            checkUnverifiedCellCount(conn, indexTableName);
-            // Add rows and check everything is still okay
-            verifyTableHealth(conn, dataTableName, indexTableName);
-        }
-    }
-
     @Test
     public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 8d6b650..f1ada39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -422,39 +422,6 @@
         }
     }
 
-    @Test
-    public void testGlobalImmutableIndexUnverifiedOnlyInPhase1() throws Exception {
-        if (localIndex || transactionProvider != null) {
-            return;
-        }
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        String tableName = "TBL_" + generateUniqueName();
-        String indexName = "IND_" + generateUniqueName();
-        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-        TABLE_NAME = fullTableName;
-        try (Connection conn = DriverManager.getConnection(getUrl(), props);
-                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
-            conn.setAutoCommit(true);
-            createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, 0, null);
-
-            // Now force fail
-            TestUtil.removeCoprocessor(conn, fullTableName, IndexRegionObserver.class);
-            TestUtil.addCoprocessor(conn, fullTableName, UpsertFailingRegionObserver.class);
-            try {
-                upsertRows(conn, fullTableName, 1);
-            } catch (Exception e) {
-                // ignore this since we force the fail
-            }
-
-            ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
-            assertTrue(rs.next());
-            assertEquals(0, rs.getInt(1));
-
-            GlobalIndexCheckerIT.checkUnverifiedCellCount(conn, fullIndexName);
-        }
-    }
-
     public static class DeleteFailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws
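
Both tests removed above (testUnverifiedRowIncludesOnlyEmptyCell here and testGlobalImmutableIndexUnverifiedOnlyInPhase1, together with the checkUnverifiedCellCount helper) asserted the old invariant that a phase 1 index row consists of exactly one cell, the unverified empty cell. Phase 1 now writes the complete row, so that invariant no longer holds and the tests are retired rather than rewritten.
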
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8c38f1e..95a354b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1233,7 +1233,7 @@
                     }
                     if (m instanceof Delete) {
                         Put put = new Put(m.getRow());
-                        put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.getMaxTimestamp(m) - 1,
+                        put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.getMaxTimestamp(m),
                                 IndexRegionObserver.UNVERIFIED_BYTES);
                         // The Delete gets marked as unverified in Phase 1 and gets deleted on Phase 3.
                         addToMap(unverifiedIndexMutations, tableInfo, put);
@@ -1241,19 +1241,17 @@
                     } else if (m instanceof Put) {
                         long timestamp = IndexRegionObserver.getMaxTimestamp(m);
 
-                        // Phase 1 index mutations are set to unverified.
-                        // Just send empty with Unverified
-                        Put unverifiedPut = new Put(m.getRow());
-                        unverifiedPut.addColumn(emptyCF, emptyCQ, timestamp - 1, IndexRegionObserver.UNVERIFIED_BYTES);
-                        addToMap(unverifiedIndexMutations, tableInfo, unverifiedPut);
-
-                        // Phase 3 mutations are verified
-                        // Send entire mutation with verified
+                        // Phase 1 index mutations are set to unverified
+                        // Send entire mutation with the unverified status
                         // Remove the empty column prepared by Index codec as we need to change its value
                         IndexRegionObserver.removeEmptyColumn(m, emptyCF, emptyCQ);
-                        ((Put) m).addColumn(emptyCF, emptyCQ, timestamp,
-                                IndexRegionObserver.VERIFIED_BYTES);
-                        addToMap(verifiedOrDeletedIndexMutations, tableInfo, m);
+                        ((Put) m).addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.UNVERIFIED_BYTES);
+                        addToMap(unverifiedIndexMutations, tableInfo, m);
+
+                        // Phase 3 mutations are verified
+                        Put verifiedPut = new Put(m.getRow());
+                        verifiedPut.addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.VERIFIED_BYTES);
+                        addToMap(verifiedOrDeletedIndexMutations, tableInfo, verifiedPut);
                     } else {
                         addToMap(unverifiedIndexMutations, tableInfo, m);
                     }
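
The net effect on the client side (mirrored by the server-side IndexRegionObserver change below) is that each index row update becomes a full unverified Put followed by a one-cell verified Put at the same timestamp. A minimal sketch of the pair, using the stock HBase client API; the method name and its parameters (indexRowKey, cf, cq, value, emptyCF, emptyCQ, ts) are illustrative placeholders, not names from the patch:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.phoenix.hbase.index.IndexRegionObserver;

    static Put[] sketchTwoPhasePuts(byte[] indexRowKey, byte[] cf, byte[] cq, byte[] value,
                                    byte[] emptyCF, byte[] emptyCQ, long ts) {
        // Phase 1: the entire index row, covered columns included, with the
        // empty (status) cell set to UNVERIFIED.
        Put phase1 = new Put(indexRowKey);
        phase1.addColumn(cf, cq, ts, value); // a covered column cell
        phase1.addColumn(emptyCF, emptyCQ, ts, IndexRegionObserver.UNVERIFIED_BYTES);

        // Phase 3: a single-cell Put at the same timestamp that merely flips the
        // status to VERIFIED; the covered cells are already in place from phase 1.
        Put phase3 = new Put(indexRowKey);
        phase3.addColumn(emptyCF, emptyCQ, ts, IndexRegionObserver.VERIFIED_BYTES);
        return new Put[] { phase1, phase3 };
    }

A failure between the two phases now leaves a complete row that any filter can evaluate; the lingering unverified status is what GlobalIndexChecker keys read repair off of.
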
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a33a3ee..fa4375c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -18,13 +18,10 @@
 package org.apache.phoenix.hbase.index;
 
 import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
-import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
-import static org.apache.phoenix.util.ServerUtil.wrapInDoNotRetryIOException;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -157,10 +154,9 @@
       // the verified column) will have the value true ("verified") on the put mutations
       private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates;
       // The collection of candidate index mutations that will be applied after the data table mutations
-      private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates;
+      private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates;
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
-      private long dataWriteStartTime;
       private boolean rebuild;
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
@@ -551,10 +547,10 @@
               current = NullSpan.INSTANCE;
           }
           // get the index updates for all elements in this batch
-          context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
-          this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData);
+          context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+          this.builder.getIndexUpdates(context.indexUpdates, miniBatchOp, mutations, indexMetaData);
           current.addTimelineAnnotation("Built index updates, doing preStep");
-          handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates);
+          handleLocalIndexUpdates(c, miniBatchOp, context.indexUpdates);
           context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
           int updateCount = 0;
           for (IndexMaintainer indexMaintainer : maintainers) {
@@ -563,14 +559,11 @@
               byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
               HTableInterfaceReference hTableInterfaceReference =
                       new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-              Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
-                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
-              while (indexUpdatesItr.hasNext()) {
-                  Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+              List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+              for (Pair<Mutation, byte[]> update : updates) {
                   // add the VERIFIED cell, which is the empty cell
-                  Mutation m = next.getFirst();
+                  Mutation m = update.getFirst();
                   if (context.rebuild) {
-                      indexUpdatesItr.remove();
                       if (m instanceof Put) {
                           long ts = getMaxTimestamp(m);
                           // Remove the empty column prepared by Index codec as we need to change its value
@@ -579,15 +572,20 @@
                       }
                       context.preIndexUpdates.put(hTableInterfaceReference, m);
                   } else {
-                      // For this mutation whether it is put or delete, set the status of the index row "unverified"
-                      // This will be done before the data table row is updated (i.e., in the first write phase)
-                      Put unverifiedPut = new Put(m.getRow());
-                      unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
-                      context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
                       if (m instanceof Put) {
                           // Remove the empty column prepared by Index codec as we need to change its value
                           removeEmptyColumn(m, emptyCF, emptyCQ);
-                          ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                          // Set the status of the index row to "unverified"
+                          ((Put) m).addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+                          // This will be done before the data table row is updated (i.e., in the first write phase)
+                          context.preIndexUpdates.put(hTableInterfaceReference, m);
+                      }
+                      else {
+                          // Set the status of the index row to "unverified"
+                          Put unverifiedPut = new Put(m.getRow());
+                          unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+                          // This will be done before the data table row is updated (i.e., in the first write phase)
+                          context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
                       }
                   }
               }
@@ -609,6 +607,45 @@
       return (PhoenixIndexMetaData)indexMetaData;
   }
 
+  private void preparePostIndexMutations(
+          BatchMutateContext context,
+          long now,
+          PhoenixIndexMetaData indexMetaData) throws Throwable {
+      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+      // Check if we need to skip post index update for any of the rows
+      for (IndexMaintainer indexMaintainer : maintainers) {
+          byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+          byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+          HTableInterfaceReference hTableInterfaceReference =
+                  new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+          List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+          for (Pair<Mutation, byte[]> update : updates) {
+              // Are there concurrent updates on the data table row? If so, skip the post index
+              // updates and let read repair resolve the conflicts
+              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+              PendingRow pendingRow = pendingRows.get(rowKey);
+              if (!pendingRow.isConcurrent()) {
+                  Mutation m = update.getFirst();
+                  if (m instanceof Put) {
+                      Put verifiedPut = new Put(m.getRow());
+                      // Set the status of the index row to "verified"
+                      verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                      context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
+                  }
+                  else {
+                      context.postIndexUpdates.put(hTableInterfaceReference, m);
+                  }
+
+              }
+          }
+      }
+      // We are done with handling concurrent mutations. So we can remove the rows of this batch from
+      // the collection of pending rows
+      removePendingRows(context);
+      context.indexUpdates.clear();
+  }
+
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
       ignoreAtomicOperations(miniBatchOp);
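
This helper replaces the inline loop removed from preBatchMutateWithExceptions below. The handling of concurrent updates is unchanged: rows flagged by pendingRows as concurrently mutated get no phase 3 update at all, staying unverified until read repair resolves them. What is new is the shape of the phase 3 mutation in the non-concurrent case: a fresh one-cell verified Put (Deletes still pass through as-is), matching the client-side change in MutationState.
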
@@ -658,37 +695,15 @@
       }
       // Do the first phase index updates
       doPre(c, context, miniBatchOp);
-      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
       if (!context.rebuild) {
-          List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
           // Acquire the locks again before letting the region proceed with data table updates
           List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
           for (RowLock rowLock : context.rowLocks) {
               rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
           }
-          context.dataWriteStartTime = EnvironmentEdgeManager.currentTimeMillis();
           context.rowLocks.clear();
           context.rowLocks = rowLocks;
-          // Check if we need to skip post index update for any of the row
-          for (IndexMaintainer indexMaintainer : maintainers) {
-              HTableInterfaceReference hTableInterfaceReference =
-                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-              Iterator<Pair<Mutation, byte[]>> iterator =
-                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
-              while (iterator.hasNext()) {
-                  // Are there concurrent updates on the data table row? if so, skip post index updates
-                  // and let read repair resolve conflicts
-                  Pair<Mutation, byte[]> update = iterator.next();
-                  ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-                  PendingRow pendingRow = pendingRows.get(rowKey);
-                  if (!pendingRow.isConcurrent()) {
-                      context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst());
-                  }
-              }
-          }
-          // We are done with handling concurrent mutations. So we can remove the rows of this batch from
-          // the collection of pending rows
-          removePendingRows(context);
+          preparePostIndexMutations(context, now, indexMetaData);
       }
       if (failDataTableUpdatesForTesting) {
           throw new DoNotRetryIOException("Simulating the data table write failure");
@@ -721,23 +736,6 @@
           for (RowLock rowLock : context.rowLocks) {
               rowLock.release();
           }
-          // Sleep for one millisecond if we have done the data table updates in less than 1 ms. The sleep is necessary
-          // not to allow another data table write on the same row within the same ms. The sleep is very rare to happen.
-          // Assume that a data table write completes at timestamp t.  Now assume another write happens on the same data
-          // row at timestamp t+1. The mutation for unverifying the index table row(s) for the previous write will have
-          // timestamp t. If this happens before the last phase of the first write (with timestamp t) completes, then
-          // the index row(s) for the first write will have the verified status. We do not want this to happen as the
-          // updates for the first write are supposed to be overwritten (unverified) by the second write.
-
-          if (!context.rowLocks.isEmpty() && context.dataWriteStartTime == EnvironmentEdgeManager.currentTimeMillis()) {
-              try {
-                  Thread.sleep(1);
-              } catch (InterruptedException e) {
-                  wrapInDoNotRetryIOException("Thread sleep is interrupted after data write", e,
-                          EnvironmentEdgeManager.currentTimeMillis());
-              }
-              LOG.debug("After data write, slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-          }
           this.builder.batchCompleted(miniBatchOp);
 
           if (success) { // The pre-index and data table updates are successful, and now, do post index updates
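
The removed sleep compensated for the old timestamp scheme, where the unverifying put was written at now - 1 and the verified put at now: a second write landing in the same millisecond could have its unverify masked by the first write's verified cell, as the deleted comment describes. With both status cells now written at the same timestamp and concurrent writers excluded from phase 3 via pendingRows, that window no longer exists, so the sleep and the dataWriteStartTime bookkeeping go away.
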
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 441cd36..8a2cbec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -355,7 +355,7 @@
     public static final long DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS = 30*60*1000; // 30 min
     public static final long DEFAULT_TASK_HANDLING_INITIAL_DELAY_MS = 10*1000; // 10 sec
 
-    public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 10*60*1000; /* 10 minutes */
+    public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
     public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 16*1024;
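
For reference, 7*24*60*60*1000 ms is 604,800,000 ms. This threshold governs when an unverified index row is old enough to be deleted during read repair; a 10 minute window was arguably sized for rows that held nothing but a status cell, whereas unverified rows now carry the full covered content and can safely be kept repairable for much longer.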