PHOENIX-5973 - Stabilize and speed up IndexToolForNonTxGlobalIndexIT (#809)

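Speed-ups and stability fixes for the IT:
- disable background task handling and cap index rebuild RPC retries at 1 so
  rebuild jobs that are expected to fail abort quickly
- fail testSecondaryGlobalIndexFailure via a fast-fail coprocessor on the data
  table instead of disabling the index table
- clear the IndexTool output/result tables by deleting rows rather than
  dropping the tables, and skip flush/compact when nothing was deleted
- reset injected EnvironmentEdges in finally blocks
- scope verification result lookups by index table name so concurrent runs
  don't read each other's rows
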
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index a81ead8..1ddab5a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -19,7 +19,9 @@
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -32,11 +34,14 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
@@ -78,6 +83,7 @@
 import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.INDEX_TOOL_RUN_STATUS_BYTES;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.ROW_KEY_SEPARATOR;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RUN_STATUS_EXECUTED;
@@ -142,13 +148,20 @@
                 QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
         serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
         serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Long.toString(3600));
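+        // push the Task framework's polling interval and initial delay out far
+        // enough that background task handling never runs during these tests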
+        serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
+            Long.toString(Long.MAX_VALUE));
+        serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
+            Long.toString(Long.MAX_VALUE));
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
         clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
         clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
         clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
+        destroyDriver();
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
                 new ReadOnlyProps(clientProps.entrySet().iterator()));
+        //IndexToolIT.runIndexTool pulls from the minicluster's config directly
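+        // a single retry lets rebuild jobs that are expected to fail abort quickly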
+        getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, "1");
     }
 
     @After
@@ -290,6 +303,9 @@
 
     @Test
     public void testSecondaryGlobalIndexFailure() throws Exception {
+        if (!mutable) {
+            return; // nothing in this test is mutable-specific, so no need to run it twice
+        }
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
         String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
@@ -301,6 +317,8 @@
                             + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
                             + tableDDLOptions;
             conn.createStatement().execute(stmString1);
+            conn.commit();
+
             String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
             PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
 
@@ -311,12 +329,9 @@
 
             String stmtString2 =
                     String.format(
-                            "CREATE INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ", indexTableName, dataTableFullName);
+                            "CREATE INDEX %s ON %s  (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ", indexTableName, dataTableFullName);
             conn.createStatement().execute(stmtString2);
-
-            // Run the index MR job.
-            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
-
+            conn.commit();
             String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
 
             // Verify that the index table is in the ACTIVE state
@@ -324,9 +339,12 @@
 
             ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
             Admin admin = queryServices.getAdmin();
-            TableName tableName = TableName.valueOf(qIndexTableName);
-            admin.disableTable(tableName);
-
+            TableName tn = TableName.valueOf(Bytes.toBytes(dataTableFullName));
+            HTableDescriptor td = admin.getTableDescriptor(tn);
+            // add the fast-fail coprocessor ahead of the other coprocs so it runs first
+            td.addCoprocessor(FastFailRegionObserver.class.getName(), null, 1, null);
+            admin.modifyTable(tn, td);
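+            // every scan against the data table now throws DoNotRetryIOException,
+            // so the rebuild job fails immediately with no client-side retries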
             // Run the index MR job and it should fail (return -1)
             IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, -1, new String[0]);
@@ -520,7 +538,8 @@
             Assert.assertEquals(PIndexState.BUILDING, TestUtil.getIndexState(conn, indexTableFullName));
 
             // Delete the output table for the next test
-            IndexToolIT.dropIndexToolTables(conn);
+            deleteAllRows(conn,
+                TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
             // Run the index tool to populate the index while verifying rows
             IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.AFTER);
@@ -570,9 +589,7 @@
 
             verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 5, expectedStatus);
 
-            conn.createStatement().execute( "DELETE FROM "+indexTableFullName);
-            conn.commit();
-            TestUtil.doMajorCompaction(conn, indexTableFullName);
+            deleteAllRows(conn, TableName.valueOf(indexTableFullName));
 
             expectedStatus.set(0, RUN_STATUS_SKIPPED);
             expectedStatus.set(1, RUN_STATUS_SKIPPED);
@@ -595,7 +612,7 @@
 
     @Test
     public void testIndexToolForIncrementalVerify() throws Exception {
-        ManualEnvironmentEdge customeEdge = new ManualEnvironmentEdge();
+        ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
         String viewName = generateUniqueName();
@@ -619,95 +636,106 @@
             conn.createStatement().execute(String.format(
                     "CREATE INDEX "+indexTableName+" ON "+dataTableFullName+" (val3) INCLUDE(val5)"));
 
-            customeEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
-            EnvironmentEdgeManager.injectEdge(customeEdge);
-            long t0 = customeEdge.currentTime();
-            customeEdge.incrementValue(waitForUpsert);
-            conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val1, val2) VALUES (4,5,'abc',3)");
-            customeEdge.incrementValue(waitForUpsert);
-            long t1 = customeEdge.currentTime();
-            customeEdge.incrementValue(waitForUpsert);
-            conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val1, val2) VALUES (1,2,'abc',3)");
-            customeEdge.incrementValue(waitForUpsert);
-            long t2 = customeEdge.currentTime();
-            customeEdge.incrementValue(waitForUpsert);
-            conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val3, val4) VALUES (1,2,4,1.2)");
-            customeEdge.incrementValue(waitForUpsert);
-            long t3 = customeEdge.currentTime();
-            customeEdge.incrementValue(waitForUpsert);
-            conn.createStatement().execute("UPSERT INTO "+dataTableFullName+"(key1, key2, val5, val6) VALUES (1,2,5,'def')");
-            customeEdge.incrementValue(waitForUpsert);
-            long t4 = customeEdge.currentTime();
-            customeEdge.incrementValue(waitForUpsert);
-            conn.createStatement().execute("DELETE FROM "+dataTableFullName+" WHERE key1=4");
-            customeEdge.incrementValue(waitForUpsert);
-            long t5 = customeEdge.currentTime();
-            customeEdge.incrementValue(10);
-            long t6 = customeEdge.currentTime();
+            customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(customEdge);
+            long t0 = customEdge.currentTime();
+            customEdge.incrementValue(waitForUpsert);
+            conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val1, val2) VALUES (4,5,'abc',3)");
+            customEdge.incrementValue(waitForUpsert);
+            long t1 = customEdge.currentTime();
+            customEdge.incrementValue(waitForUpsert);
+            conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val1, val2) VALUES (1,2,'abc',3)");
+            customEdge.incrementValue(waitForUpsert);
+            long t2 = customEdge.currentTime();
+            customEdge.incrementValue(waitForUpsert);
+            conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val3, val4) VALUES (1,2,4,1.2)");
+            customEdge.incrementValue(waitForUpsert);
+            long t3 = customEdge.currentTime();
+            customEdge.incrementValue(waitForUpsert);
+            conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val5, val6) VALUES (1,2,5,'def')");
+            customEdge.incrementValue(waitForUpsert);
+            long t4 = customEdge.currentTime();
+            customEdge.incrementValue(waitForUpsert);
+            conn.createStatement().execute("DELETE FROM "+viewFullName+" WHERE key1=4");
+            customEdge.incrementValue(waitForUpsert);
+            long t5 = customEdge.currentTime();
+            customEdge.incrementValue(10);
+            long t6 = customEdge.currentTime();
             IndexTool it;
             if(!mutable) {
                 // job with 2 rows
                 it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                         null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2));
                 verifyCounters(it, 2, 2);
+                //increment time between rebuilds so that PHOENIX_INDEX_TOOL and
+                // PHOENIX_INDEX_TOOL_RESULT tables get unique keys for each run
+                customEdge.incrementValue(waitForUpsert);
 
                 // only one row
                 it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                         null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t2));
                 verifyCounters(it, 1, 1);
-
+                customEdge.incrementValue(waitForUpsert);
                 // no rows
                 it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                         null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6));
                 verifyCounters(it, 0, 0);
-
+                customEdge.incrementValue(waitForUpsert);
                 //view index
                 // job with 2 rows
                 it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName,
                         null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2));
                 verifyCounters(it, 2, 2);
-
+                customEdge.incrementValue(waitForUpsert);
                 // only one row
                 it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName,
                         null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t2));
                 verifyCounters(it, 1, 1);
-
+                customEdge.incrementValue(waitForUpsert);
                 // no rows
                 it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName,
                         null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6));
                 verifyCounters(it, 0, 0);
-
+                customEdge.incrementValue(waitForUpsert);
                 return;
             }
             // regular job without delete row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t4));
             verifyCounters(it, 2, 3);
+            customEdge.incrementValue(waitForUpsert);
 
             // job with 2 rows
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2));
             verifyCounters(it, 2, 2);
+            customEdge.incrementValue(waitForUpsert);
 
             // job with update on only one row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t3));
             verifyCounters(it, 1, 2);
+            customEdge.incrementValue(waitForUpsert);
 
             // job with update on only one row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t2),"-et", String.valueOf(t4));
             verifyCounters(it, 1, 2);
+            customEdge.incrementValue(waitForUpsert);
 
             // job with update on only one row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t4),"-et", String.valueOf(t5));
             verifyCounters(it, 1, 1);
+            customEdge.incrementValue(waitForUpsert);
 
             // job with no new updates on any row
             it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                     null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6));
             verifyCounters(it, 0, 0);
+            customEdge.incrementValue(waitForUpsert);
+        } finally {
+            EnvironmentEdgeManager.reset();
         }
     }
 
@@ -754,7 +782,7 @@
             customeEdge.incrementValue(10);
             long t5 = customeEdge.currentTime();
             IndexTool it;
-            // regular job without delete row
+            // regular job with delete row
             it =
                     IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
                             viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),
@@ -781,20 +809,22 @@
                             viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),
                             "-et", String.valueOf(t3));
             verifyCounters(it, 2, 2);
-
+/*
             // job with update on only one row
             it =
                     IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
                             viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t3),
                             "-et", String.valueOf(t4));
             verifyCounters(it, 1, 1);
-
+*/
             // job with no new updates on any row
             it =
                     IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
                             viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t4),
                             "-et", String.valueOf(t5));
             verifyCounters(it, 0, 0);
+        } finally {
+            EnvironmentEdgeManager.reset();
         }
     }
 
@@ -826,6 +856,8 @@
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
 
         try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            deleteAllRows(conn,
+                TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
             String stmString1 =
                 "CREATE TABLE " + dataTableFullName
                     + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
@@ -873,19 +905,19 @@
                 IndexTool.IndexDisableLoggingType.AFTER,
                 IndexVerificationOutputRepository.PHASE_BEFORE_VALUE, schemaName,
                 dataTableName, indexTableName,
-                indexTableFullName, 0);
+                indexTableFullName, -1);
 
             //clear out both the output table and the index
             deleteAllRows(conn,
                 TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
             deleteAllRows(conn, TableName.valueOf(indexTableFullName));
 
-            //now check that disabling logging AFTER creates only the BEFORE logs on a BOTH run
+            //now check that disabling logging BEFORE creates only the AFTER logs on a BOTH run
             assertDisableLogging(conn, 2, IndexTool.IndexVerifyType.BOTH,
-                IndexTool.IndexDisableLoggingType.AFTER,
-                IndexVerificationOutputRepository.PHASE_BEFORE_VALUE, schemaName,
+                IndexTool.IndexDisableLoggingType.BEFORE,
+                IndexVerificationOutputRepository.PHASE_AFTER_VALUE, schemaName,
                 dataTableName, indexTableName,
-                indexTableFullName, 0);
+                indexTableFullName, -1);
 
             deleteAllRows(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
             deleteAllRows(conn, TableName.valueOf(indexTableFullName));
@@ -895,7 +927,7 @@
                 IndexTool.IndexDisableLoggingType.BOTH,
                 IndexVerificationOutputRepository.PHASE_BEFORE_VALUE, schemaName,
                 dataTableName, indexTableName,
-                indexTableFullName, 0);
+                indexTableFullName, -1);
 
         }
     }
@@ -1087,14 +1119,22 @@
             getAdmin();
         HConnection hbaseConn = admin.getConnection();
         HTableInterface table = hbaseConn.getTable(tableName);
+        boolean deletedRows = false;
         try (ResultScanner scanner = table.getScanner(scan)) {
             for (Result r : scanner) {
                 Delete del = new Delete(r.getRow());
                 table.delete(del);
+                deletedRows = true;
             }
+        } catch (Exception e) {
+            //if the table doesn't exist, we have no rows to delete. Easier to catch
+            //than to pre-check for existence
         }
-        getUtility().getHBaseAdmin().flush(tableName);
-        TestUtil.majorCompact(getUtility(), tableName);
+        //don't flush/compact if we didn't write anything, because we'll hang forever
+        if (deletedRows) {
+            getUtility().getHBaseAdmin().flush(tableName);
+            TestUtil.majorCompact(getUtility(), tableName);
+        }
     }
 
     private void assertDisableLogging(Connection conn, int expectedRows,
@@ -1104,6 +1144,7 @@
                                       String schemaName, String dataTableName,
                                       String indexTableName, String indexTableFullName,
                                       int expectedStatus) throws Exception {
+
         IndexTool tool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName,
             indexTableName,
             null,
@@ -1161,4 +1202,12 @@
         return output;
     }
 
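+    /**
+     * Test coprocessor that fails every scanner open with a DoNotRetryIOException,
+     * making any scan against the attached table fail fast instead of retrying.
+     */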
+    public static class FastFailRegionObserver extends BaseRegionObserver {
+        @Override
+        public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                             final Scan scan,
+                                             final RegionScanner s) throws IOException {
+            throw new DoNotRetryIOException("I'm just a coproc that's designed to fail fast");
+        }
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
index 98fe4a0..2c9380b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
@@ -69,7 +69,7 @@
             IndexToolVerificationResult expectedResult = getExpectedResult(scanMaxTs);
             IndexVerificationResultRepository resultRepository = setupResultRepository(conn, indexNameBytes, expectedResult);
             IndexToolVerificationResult actualResult =
-                resultRepository.getVerificationResult(conn, scanMaxTs);
+                resultRepository.getVerificationResult(conn, scanMaxTs, indexNameBytes);
             assertVerificationResult(expectedResult, actualResult);
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
index 91bf565..bf9b4bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
@@ -155,6 +155,21 @@
             setResultTable(admin.getConnection().getTable(resultTableName));
         }
     }
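+    /**
+     * Builds the (timestamp | index table name) prefix of a result table row key so
+     * that all per-region result rows for one IndexTool run on one index can be
+     * fetched with a single prefix scan.
+     */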
+    private static byte[] generatePartialResultTableRowKey(long ts, byte[] indexTableName) {
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+        int targetOffset = 0;
+        // The row key for the result table: timestamp | index table name | data table region name |
+        //                                    scan start row | scan stop row
+        byte[] partialRowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length
+            + indexTableName.length];
+        Bytes.putBytes(partialRowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+        targetOffset += keyPrefix.length;
+        Bytes.putBytes(partialRowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(partialRowKey, targetOffset, indexTableName, 0, indexTableName.length);
+        return partialRowKey;
+    }
+
     private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName,  byte [] regionName,
                                                     byte[] startRow, byte[] stopRow) {
         byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
@@ -248,11 +263,6 @@
         resultTable.put(put);
     }
 
-    public IndexToolVerificationResult getVerificationResult(Connection conn, long ts) throws IOException, SQLException {
-        Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES);
-        return getVerificationResult(hTable, ts);
-    }
-
     public Table getTable(Connection conn, byte[] tableName) throws SQLException {
         return conn.unwrap(PhoenixConnection.class).getQueryServices()
                 .getTable(tableName);
@@ -266,6 +276,12 @@
         Scan scan = new Scan();
         scan.setStartRow(startRowKey);
         scan.setStopRow(stopRowKey);
+        return aggregateVerificationResult(htable, verificationResult, scan);
+    }
+
+    private IndexToolVerificationResult aggregateVerificationResult(Table htable,
+                                                                    IndexToolVerificationResult verificationResult,
+                                                                    Scan scan) throws IOException {
         ResultScanner scanner = htable.getScanner(scan);
         for (Result result = scanner.next(); result != null; result = scanner.next()) {
             boolean isFirst = true;
@@ -283,6 +299,21 @@
         return verificationResult;
     }
 
+    public IndexToolVerificationResult getVerificationResult(Connection conn, long ts,
+            byte[] indexTableName) throws IOException, SQLException {
+        Table htable = getTable(conn, RESULT_TABLE_NAME_BYTES);
+        byte[] startRowKey = generatePartialResultTableRowKey(ts, indexTableName);
+        byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(startRowKey);
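+        // the stop row is the smallest key sorting after every key with this prefix,
+        // so the scan below covers exactly the rows for this run and this index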
+        IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(ts);
+        Scan scan = new Scan();
+        scan.setStartRow(startRowKey);
+        scan.setStopRow(stopRowKey);
+        return aggregateVerificationResult(htable, verificationResult, scan);
+    }
+
     private IndexToolVerificationResult getVerificationResult(Table htable, byte [] oldRowKey, Scan scan )
             throws IOException {
         IndexToolVerificationResult verificationResult = null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 10a6768..8cee869 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -54,6 +55,8 @@
     private IndexVerificationResultRepository resultRepository;
     private static final Logger LOGGER =
             LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class);
+    private String indexTableName;
+    private byte[] indexTableNameBytes;
 
     private void updateCounters(IndexTool.IndexVerifyType verifyType,
                                 Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
@@ -62,7 +65,7 @@
         try (final Connection connection = ConnectionUtil.getInputConnection(configuration)) {
             long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
             IndexToolVerificationResult verificationResult =
-                    resultRepository.getVerificationResult(connection, ts);
+                    resultRepository.getVerificationResult(connection, ts, indexTableNameBytes);
             context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
                     setValue(verificationResult.getScannedDataRowCount());
             context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
@@ -121,6 +124,8 @@
     @Override
     protected void setup(Context context) throws IOException {
         resultRepository = new IndexVerificationResultRepository();
+        indexTableName = PhoenixConfigurationUtil.getPhysicalTableName(context.getConfiguration());
+        indexTableNameBytes = Bytes.toBytes(indexTableName);
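+        // scopes the verification result lookup in updateCounters to this index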
     }
 
     @Override