PHOENIX-5743 Concurrent read repairs on the same index row should be idempotent
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 144bcf1..65262ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -300,6 +300,67 @@
     }
 
     @Test
+    public void testUnverifiedRowRepair() throws Exception {
+        if (async) {
+            // No need to run the same test twice one for async = true and the other for async = false
+            return;
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)");
+            // Configure IndexRegionObserver to fail the data write phase
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab','abc')");
+            commitWithException(conn);
+            // The above upsert will create an unverified index row
+            // Configure IndexRegionObserver to allow the data write phase
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            // Verify that this row is not visible
+            String selectSql = "SELECT * from " + dataTableName + " WHERE val1  = 'ab'";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            // Verify that we do not read from the unverified row
+            assertFalse(rs.next());
+            // Insert the same row with a value for val3
+            conn.createStatement().execute("upsert into " + dataTableName + " values ('a', 'ab','abc', 'abcd')");
+            conn.commit();
+            // At this moment val3 in the data table row should not have null value
+            selectSql = "SELECT val3 from " + dataTableName + " WHERE val1  = 'ab'";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("abcd", rs.getString(1));
+            assertFalse(rs.next());
+            // Configure IndexRegionObserver to fail the data write phase
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab','abc')");
+            commitWithException(conn);
+            // The above upsert will create an unverified index row
+            // Configure IndexRegionObserver to allow the data write phase
+            IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+            // Verify that this row is still read back correctly
+            for (int i = 0; i < 2; i++) {
+                selectSql = "SELECT val3 from " + dataTableName + " WHERE val1  = 'ab'";
+                // Verify that we will read from the index table
+                assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+                rs = conn.createStatement().executeQuery(selectSql);
+                // Verify that the repair of the unverified row did not affect the valid index row
+                assertTrue(rs.next());
+                assertEquals("abcd", rs.getString(1));
+                assertFalse(rs.next());
+            }
+            // Add rows and check everything is still okay
+            verifyTableHealth(conn, dataTableName, indexTableName);
+        }
+    }
+
+    @Test
     public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String dataTableName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 74ae7ce..a65d63c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -247,18 +247,9 @@
             if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
                 Delete del = new Delete(indexRowKey, ts);
                 if (specific) {
-                    // Get all the cells of this row
-                    deleteRowScan.withStartRow(indexRowKey, true);
-                    deleteRowScan.withStopRow(indexRowKey, true);
-                    deleteRowScan.setTimeRange(0, ts + 1);
-                    deleteRowScanner = region.getScanner(deleteRowScan);
-                    row.clear();
-                    deleteRowScanner.next(row);
-                    deleteRowScanner.close();
-                    // We are deleting a specific version of a row so the flowing loop is for that
-                    for (Cell cell : row) {
-                        del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
-                    }
+                    del.addFamilyVersion(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
+                } else {
+                    del.addFamily(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
                 }
                 Mutation[] mutations = new Mutation[]{del};
                 region.batchMutate(mutations);