PHOENIX-5791 Eliminate false invalid row detection due to concurrent updates
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 912945e..ac557ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -524,11 +524,13 @@
                     errorMsg, null, null);
             return false;
         }
+        int expectedCellCount = 0;
         for (List<Cell> cells : expected.getFamilyCellMap().values()) {
             if (cells == null) {
                 continue;
             }
             for (Cell expectedCell : cells) {
+                expectedCellCount++;
                 byte[] family = CellUtil.cloneFamily(expectedCell);
                 byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
                 Cell actualCell = getCell(actual, family, qualifier);
@@ -540,14 +542,6 @@
                     return false;
                 }
                 if (!CellUtil.matchingValue(actualCell, expectedCell)) {
-                    if (verifyType == IndexTool.IndexVerifyType.ONLY &&
-                            (Bytes.compareTo(family, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary()) == 0 &&
-                            Bytes.compareTo(qualifier, indexMaintainer.getEmptyKeyValueQualifier()) == 0) &&
-                            (Bytes.compareTo(actualCell.getValueArray(), actualCell.getValueOffset(), actualCell.getValueLength(),
-                                UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0)) {
-                        // We do not flag this as mismatch as we can have unverified but still valid rows
-                        continue;
-                    }
                     String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
                     byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
                     logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
@@ -556,6 +550,20 @@
                 }
             }
         }
+        int actualCellCount = 0;
+        for (List<Cell> cells : actual.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            actualCellCount += cells.size();
+        }
+        if (expectedCellCount != actualCellCount) {
+            String errorMsg = "Index has extra cells (in iteration " + iteration + ")";
+            byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+            logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+                    errorMsg);
+            return false;
+        }
         return true;
     }
 
@@ -639,39 +647,138 @@
         }
         return getMutationsWithSameTS(put, del);
     }
+    /**
+     * In this method, the actual list is repaired in memory using the expected list which is actually the output of
+     * rebuilding the index table row. The result of this repair is used only for verification.
+     */
+    private void repairActualMutationList(List<Mutation> actualMutationList, List<Mutation> expectedMutationList)
+            throws IOException {
+        // Find the first (latest) actual unverified put mutation
+        List<Mutation> repairedMutationList = new ArrayList<>(expectedMutationList.size());
+        for (Mutation actual : actualMutationList) {
+            if (actual instanceof Put && !isVerified((Put) actual)) {
+                long ts = getTimestamp(actual);
+                int expectedIndex;
+                int expectedListSize = expectedMutationList.size();
+                for (expectedIndex = 0; expectedIndex < expectedListSize; expectedIndex++) {
+                    if (getTimestamp(expectedMutationList.get(expectedIndex)) <= ts) {
+                        if (expectedIndex > 0) {
+                            expectedIndex--;
+                        }
+                        break;
+                    }
+                }
+                if (expectedIndex == expectedListSize) {
+                    continue;
+                }
+                for (; expectedIndex < expectedListSize; expectedIndex++) {
+                    Mutation mutation = expectedMutationList.get(expectedIndex);
+                    if (mutation instanceof Put) {
+                        mutation = new Put((Put) mutation);
+                    } else {
+                        mutation = new Delete((Delete) mutation);
+                    }
+                    repairedMutationList.add(mutation);
+                }
+                // Since we repair the entire history, there is no need to do this more than once
+                break;
+            }
+        }
+        if (repairedMutationList.isEmpty()) {
+            return;
+        }
+        actualMutationList.addAll(repairedMutationList);
+        Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+    }
+
+    private void cleanUpActualMutationList(List<Mutation> actualMutationList)
+            throws IOException {
+        Iterator<Mutation> iterator = actualMutationList.iterator();
+        Mutation previous = null;
+        while (iterator.hasNext()) {
+            Mutation mutation = iterator.next();
+            if ((mutation instanceof Put && !isVerified((Put) mutation)) ||
+                    (mutation instanceof Delete && isDeleteFamilyVersion(mutation))) {
+                iterator.remove();
+            } else {
+                if (previous != null && getTimestamp(previous) == getTimestamp(mutation) &&
+                        ((previous instanceof Put && mutation instanceof Put) ||
+                                previous instanceof Delete && mutation instanceof Delete)) {
+                    iterator.remove();
+                } else {
+                    previous = mutation;
+                }
+            }
+        }
+    }
 
     /**
-     * indexRow is the set of all cells of all the row version of an index row from the index table. These are actual
-     * cells. We group these cells based on timestamp and type (put vs delete), and form the actual set of
-     * index mutations. indexKeyToMutationMap is a map from an index row key to a set of mutations that are generated
-     * using the rebuild process (i.e., by replaying raw data table mutations). These sets are sets of expected
-     * index mutations, one set for each index row key. Since not all mutations in the index table have both phase
-     * (i.e., pre and post data phase) mutations, we cannot compare actual index mutations with expected one by one
-     * and expect to find them identical. We need to consider concurrent data mutation effects, data table row write
-     * failures, post index write failures. Thus, we need to allow some expected and actual mutations to be skipped
-     * during comparing actual mutations to index mutations.
+     * There are two types of verification: without repair and with repair. Without-repair verification is done before
+     * or after index rebuild. It is done before index rebuild to identify the rows to be rebuilt. It is done after
+     * index rebuild to verify the rows that have been rebuilt. With-repair verification can be done anytime using
+     * the “-v ONLY” option to check the consistency of the index table. Note that with-repair verification simulates
+     * read repair in-memory for the purpose of verification, but does not actually repair the data in the index.
      *
-     * The main idea for the verification algorithm used here is to match every expected verified put with an actual
-     * put such that these two mutations are the same except that actual mutation can be unverified.
+     * Unverified Rows
      *
-     * Some background on why we can skip some of the actual unverified puts and delete markers due to concurrent data
-     * table updates is as follows:
+     * For each mutable data table mutation during regular data table updates, two operations are done on the data table.
+     * One is to read the existing row state, and the second is to update the data table for this row. The processing of
+     * concurrent data mutations is serialized once for reading the existing row states, and then serialized again
+     * for updating the data table. In other words, they go through locking twice, i.e., [lock, read, unlock] and
+     * [lock, write, unlock]. Because of this two phase locking, for a pair of concurrent mutations (for the same row),
+     * the same row state can be read from the data table. This means the same existing index row can be made unverified
+     * twice with different timestamps, one for each concurrent mutation. These unverified mutations can be repaired
+     * from the data table later during HBase scans using the index read repair process. This is one of the reasons
+     * for having extra unverified rows in the index table. The other reason is the data table write failures.
+     * When a data table write fails, it leaves an unverified index row behind. These rows are never returned to clients,
+     * instead they are repaired, which means either they are rebuilt from their data table rows or they are deleted if
+     * their data table rows do not exist.
      *
-     * For each data table mutation, two operations are done on the data table. One is to read the existing row state,
-     * and the second is to write to the data table. The processing of concurrent data mutations are serialized once
-     * for reading the existing row states, and then serialized again for updating data table. In other words,
-     * they go through locking twice, i.e., [lock, read, unlock] and [lock, write, unlock]. Because of this two phase
-     * locking, for a pair of concurrent mutations (for the same row), the same row state can be read from the data
-     * table. This means the same existing index row can be made unverified twice with different timestamps, one for
-     * each concurrent mutation. These unverified mutation are then repaired from the data table. Since expected
-     * mutations are used for rebuild (which is also used by the read repair), skipping these unverified put mutations
-     * that are not matching with expected mutation are safe as they will go through the same process during
-     * read repair and will be skipped and eventually cleaned up by the read repair. We can skip the delete markers
-     * safely too as they are placed to clean up these unverified mutations. When the data table rows are rebuilt,
-     * the rebuild process generates the delete family markers. The timestamp of delete markers are the timestamp of
-     * the data table mutation for which the delete marker is added. Thus, the timestamp of these delete markers will be
-     * higher than the timestamp of index row to be deleted.
+     * Delete Family Version Markers
+     *
+     * The family version delete markers are generated by the read repair to remove extra unverified rows. They only
+     * show up in the actual mutation list since they are not generated for regular table updates or index rebuilds.
+     * For the verification purpose, these delete markers can be treated as extra unverified rows and can be safely
+     * skipped.
+     *
+     * Delete Family Markers
+     * Delete family markers are generated during read repair, regular table updates and index rebuilds to delete index
+     * table rows. The read repair generates them to delete extra unverified rows. During regular table updates or
+     * index rebuilds, the delete family markers are used to delete index rows due to data table row deletes or
+     * data table row overwrites.
+     *
+     * Verification Algorithm
+     *
+     * IndexTool verification generates an expected list of index mutations from the data table rows and uses this list
+     * to check if index table rows are consistent with the data table.
+     *
+     * The expected list is generated using the index rebuild algorithm. This means for a given row, the list can include
+     * a number of put and delete mutations such that the followings hold:
+     *
+     * Every mutation will include a set of cells with the same timestamp
+     * Every mutation has a different timestamp
+     * A delete mutation will include only delete family cells and it is for deleting the entire row and its versions
+     * Every put mutation is verified
+     *
+     * For both verification types, after the expected list of index mutations is constructed for a given data table,
+     * another list called the actual list of index mutations is constructed by reading the index table row using HBase
+     * raw scan and all versions of the cells of the row are retrieved.
+     *
+     * As in the construction for the expected list, the cells are grouped into a put and a delete set. The put and
+     * delete sets for a given row are further grouped based on their timestamps into put and delete mutations such that
+     * all the cells in a mutation have the same timestamps. The put and delete mutations are then sorted within a single
+     * list. Mutations in this list are sorted in descending order of their timestamp (per MUTATION_TS_DESC_COMPARATOR). This list is the actual list.
+     *
+     * For the without-repair verification, unverified mutations and family version delete markers are removed from
+     * the actual list and then the list is compared with the expected list.
+     *
+     * In case of the with-repair verification, the actual list is first repaired, then unverified mutations and family
+     * version delete markers are removed from the actual list and finally the list is compared with the expected list.
+     *
+     * The actual list is repaired as follows: Every unverified mutation is repaired using the method read repair uses.
+     * However, instead of going through actual repair implementation, the expected mutations are used for repair.
      */
+
     @VisibleForTesting
     public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult)
             throws IOException {
@@ -685,6 +792,10 @@
         }
         Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
         Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+        if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+            repairActualMutationList(actualMutationList, expectedMutationList);
+        }
+        cleanUpActualMutationList(actualMutationList);
         long currentTime = EnvironmentEdgeManager.currentTime();
         int actualIndex = 0;
         int expectedIndex = 0;
@@ -708,49 +819,30 @@
             }
             actual = actualMutationList.get(actualIndex);
             if (expected instanceof Put) {
-                if (previousExpected == null || previousExpected instanceof Put) {
-                    // This expected put is either the first mutation or a put just comes after another expected mutation
-                    // on the expected mutation list which is sorted by the mutation timestamps. The cell timestamps
-                    // within each mutation here are the same.
-                    // Go down the list of actual mutations and find the corresponding actual put mutation with the same
-                    // timestamp. Stop if a verified put or delete family mutation is encountered on the way. Skip
-                    // unverified puts or delete family version delete markers.
-                    while (getTimestamp(actual) > getTimestamp(expected) &&
-                            ((actual instanceof Put && !isVerified((Put) actual)) ||
-                                    (actual instanceof Delete && isDeleteFamilyVersion(actual)))) {
+                if (previousExpected instanceof Delete) {
+                    // Between an expected delete and put, there can be one or more deletes due to
+                    // concurrent mutations or data table write failures. Skip all of them if any
+                    while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) {
                         actualIndex++;
                         if (actualIndex == actualSize) {
                             break;
                         }
                         actual = actualMutationList.get(actualIndex);
                     }
-                } else { // previousExpected instanceof Delete
-                    // Between an expected delete and put, there cannot be any types of mutation even verified put
-                    while (getTimestamp(actual) > getTimestamp(expected)) {
-                        actualIndex++;
-                        if (actualIndex == actualSize) {
-                            break;
-                        }
-                        actual = actualMutationList.get(actualIndex);
+                    if (actualIndex == actualSize) {
+                        break;
                     }
                 }
-                if (actualIndex == actualSize) {
-                    break;
-                }
-                // Now the expected and actual mutations should match
                 if (isMatchingMutation(expected, actual, expectedIndex)) {
                     expectedIndex++;
                     actualIndex++;
                     matchingCount++;
                     continue;
                 }
-                verificationPhaseResult.invalidIndexRowCount++;
-                return false;
             } else { // expected instanceof Delete
-                // Between put and delete, delete and delete, or before first delete, there can be other deletes and
-                // unverified puts. Skip all of them if any
-                while (getTimestamp(actual) > getTimestamp(expected) &&
-                        ((actual instanceof Put && !isVerified((Put) actual)) || actual instanceof Delete)) {
+                // Between put and delete, delete and delete, or before first delete, there can be other deletes.
+                // Skip all of them if any
+                while (getTimestamp(actual) > getTimestamp(expected) && actual instanceof Delete) {
                     actualIndex++;
                     if (actualIndex == actualSize) {
                         break;
@@ -760,34 +852,34 @@
                 if (actualIndex == actualSize) {
                     break;
                 }
-                // If this is first expected mutation is delete, there should be an actual delete mutation with the
-                // same timestamp or an unverified put with the same or older timestamp
                 if (getTimestamp(actual) == getTimestamp(expected) &&
                         (actual instanceof Delete && isDeleteFamily(actual))) {
                     expectedIndex++;
                     actualIndex++;
                     matchingCount++;
                     continue;
-                } else if (getTimestamp(actual) <= getTimestamp(expected) &&
-                        (actual instanceof Put && !isVerified((Put) actual))) {
-                    expectedIndex++;
-                    if (previousExpected == null) {
-                        matchingCount++;
-                    }
-                    continue;
                 }
-                if (previousExpected == null) {
-                    String errorMsg = "First delete check failure";
-                    byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
-                    logToIndexToolOutputTable(dataKey, indexRow.getRow(),
-                            getTimestamp(expected),
-                            getTimestamp(actual), errorMsg);
-                    verificationPhaseResult.invalidIndexRowCount++;
-                    return false;
-                }
+                String errorMsg = "Delete check failure";
+                byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+                logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+                        getTimestamp(expected),
+                        getTimestamp(actual), errorMsg);
             }
+            verificationPhaseResult.invalidIndexRowCount++;
+            return false;
         }
         if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
+            for (; expectedIndex < expectedSize; expectedIndex++) {
+                expected = expectedMutationList.get(expectedIndex);
+                // Check if cell expired as per the current server's time and data table ttl
+                // Index table should have the same ttl as the data table, hence we might not
+                // get a value back from index if it has already expired between our rebuild and
+                // verify
+                // TODO: have a metric to update for these cases
+                if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+                    verificationPhaseResult.expiredIndexRowCount++;
+                }
+            }
             if (matchingCount > 0) {
                 if (verifyType != IndexTool.IndexVerifyType.ONLY) {
                     // We do not consider this as a verification issue but log it for further information.
@@ -803,8 +895,7 @@
                 byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
                 String errorMsg = "Not matching index row";
                 logToIndexToolOutputTable(dataKey, indexRow.getRow(),
-                        getTimestamp(expectedMutationList.get(0)),
-                        getTimestamp(actualMutationList.get(0)), errorMsg);
+                        getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
                 verificationPhaseResult.invalidIndexRowCount++;
                 return false;
             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index fa42ce9..76ef06d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1060,7 +1060,7 @@
 
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment env) throws IOException {
-        if (!scan.isRaw() && scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY) == null) {
+        if (!scan.isRaw()) {
             Scan rawScan = new Scan(scan);
             rawScan.setRaw(true);
             rawScan.setMaxVersions();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 2506609..013578b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -41,6 +41,7 @@
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -542,7 +543,7 @@
         case INVALID_CELL_VALUE:
             if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
                 newCell = getCellWithPut(c);
-                emptyCell = getUnverifiedEmptyCell(c);
+                emptyCell = getVerifiedEmptyCell(c);
                 newCellList.add(newCell);
                 newCellList.add(emptyCell);
             } else {
@@ -562,24 +563,24 @@
             break;
         case INVALID_EXTRA_CELL:
             newCell = getCellWithPut(c);
-            emptyCell = getUnverifiedEmptyCell(c);
+            emptyCell = getVerifiedEmptyCell(c);
             newCellList.add(newCell);
             newCellList.add(emptyCell);
             newCellList.add(c);
         }
     }
 
-    private Cell getUnverifiedEmptyCell(Cell c) {
+    private Cell getVerifiedEmptyCell(Cell c) {
         return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
                 indexMaintainer.getEmptyKeyValueQualifier(),
                 EnvironmentEdgeManager.currentTimeMillis(),
-                KeyValue.Type.Put.getCode(), UNVERIFIED_BYTES);
+                KeyValue.Type.Put.getCode(), VERIFIED_BYTES);
     }
 
     private Cell getCellWithPut(Cell c) {
         return CellUtil.createCell(CellUtil.cloneRow(c),
                 CellUtil.cloneFamily(c), Bytes.toBytes(INCLUDED_COLUMN),
-                EnvironmentEdgeManager.currentTimeMillis(), KeyValue.Type.Put.getCode(),
+                c.getTimestamp(), KeyValue.Type.Put.getCode(),
                 Bytes.toBytes("zxcv"));
     }