PHOENIX-5799 - Inline Index Verification Output API
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 71cb530..4840047 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -44,6 +44,8 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
@@ -370,10 +372,11 @@
 
     private void dropIndexToolTables(Connection conn) throws Exception {
         Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
-        TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+        TableName indexToolOutputTable =
+            TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
         admin.disableTable(indexToolOutputTable);
         admin.deleteTable(indexToolOutputTable);
-        TableName indexToolResultTable = TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES);
+        TableName indexToolResultTable = TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES);
         admin.disableTable(indexToolResultTable);
         admin.deleteTable(indexToolResultTable);
     }
@@ -477,12 +480,13 @@
 
         // This method verifies the common prefix, i.e., "timestamp | index table name | ", since the rest of the
         // fields may include the separator key
-        int offset = Bytes.indexOf(rowKey, IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE);
+        int offset = Bytes.indexOf(rowKey, IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE);
         offset++;
         byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
         assertEquals(Bytes.compareTo(rowKey, offset, indexTableFullNameBytes.length, indexTableFullNameBytes, 0,
                 indexTableFullNameBytes.length), 0);
-        assertEquals(rowKey[offset + indexTableFullNameBytes.length], IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE[0]);
+        assertEquals(rowKey[offset + indexTableFullNameBytes.length],
+            IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0]);
     }
 
     private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
@@ -490,7 +494,7 @@
         byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
         byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
         Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
-                .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+                .getTable(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
         Scan scan = new Scan();
         ResultScanner scanner = hIndexTable.getScanner(scan);
         boolean dataTableNameCheck = false;
@@ -499,28 +503,30 @@
         for (Result result = scanner.next(); result != null; result = scanner.next()) {
             for (Cell cell : result.rawCells()) {
                 assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-                        IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0,
-                        IndexTool.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
+                    IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY, 0,
+                    IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
                 if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                        IndexTool.DATA_TABLE_NAME_BYTES, 0, IndexTool.DATA_TABLE_NAME_BYTES.length) == 0) {
+                    IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES.length) == 0) {
                     dataTableNameCheck = true;
                     assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                             dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0);
                 } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                        IndexTool.INDEX_TABLE_NAME_BYTES, 0, IndexTool.INDEX_TABLE_NAME_BYTES.length) == 0) {
+                    IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES.length) == 0) {
                     indexTableNameCheck = true;
                     assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                             indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0);
                 } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
-                        IndexTool.ERROR_MESSAGE_BYTES, 0, IndexTool.ERROR_MESSAGE_BYTES.length) == 0) {
+                    IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES, 0, IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES.length) == 0) {
                     errorMessageCell = cell;
                 }
             }
         }
-        assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null);
+        assertTrue( "DataTableNameCheck was false", dataTableNameCheck);
+        assertTrue("IndexTableNameCheck was false", indexTableNameCheck);
+        assertTrue("Error message cell was null", errorMessageCell != null);
         verifyIndexTableRowKey(CellUtil.cloneRow(errorMessageCell), indexTableFullName);
         hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
-                .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
+                .getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES);
         scan = new Scan();
         scanner = hIndexTable.getScanner(scan);
         Result result = scanner.next();
@@ -716,7 +722,7 @@
             }
             assertTrue(status.getFirst() == 0);
 
-            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
             admin.disableTable(indexToolOutputTable);
             admin.deleteTable(indexToolOutputTable);
             // Run the index tool using the only-verify option, verify it gives no mismatch
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
new file mode 100644
index 0000000..0b67044
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT {
+
+    @BeforeClass
+    public static void setupClass() throws Exception {
+        Map<String, String> props = Collections.emptyMap();
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testReadIndexVerificationOutputRow() throws Exception {
+        String expectedErrorMessage = "I am an error message";
+        byte[] expectedValue = Bytes.toBytes("ab");
+        byte[] actualValue = Bytes.toBytes("ac");
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = "T" + generateUniqueName();
+            byte[] tableNameBytes = Bytes.toBytes(tableName);
+            String indexName = "I" + generateUniqueName();
+            createTableAndIndexes(conn, tableName, indexName);
+            byte[] indexNameBytes = Bytes.toBytes(indexName);
+            IndexVerificationOutputRepository outputRepository =
+                new IndexVerificationOutputRepository(indexNameBytes, conn);
+            outputRepository.createOutputTable(conn);
+            populateTable(conn, tableName);
+            byte[] dataRowKey = getRowKey(conn, tableNameBytes);
+            byte[] indexRowKey = getRowKey(conn, indexNameBytes);
+            long dataRowTs = getTimestamp(conn, tableNameBytes);
+            long indexRowTs = getTimestamp(conn, indexNameBytes);
+            long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
+            outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
+                indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+                scanMaxTs, tableNameBytes, true);
+            //now increment the scan time by 1 and do it again
+            outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
+                indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+                scanMaxTs +1, tableNameBytes, false);
+            //make sure we only get the first row back
+            IndexVerificationOutputRow expectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs,
+                indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+                scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE);
+            verifyOutputRow(outputRepository, scanMaxTs, indexNameBytes, expectedRow);
+            //make sure we get the second row back
+            IndexVerificationOutputRow secondExpectedRow = buildVerificationRow(dataRowKey,
+                indexRowKey, dataRowTs,
+                indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+                scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE);
+            verifyOutputRow(outputRepository, scanMaxTs+1, indexNameBytes, secondExpectedRow);
+        }
+
+    }
+
+    public void verifyOutputRow(IndexVerificationOutputRepository outputRepository, long scanMaxTs,
+                                byte[] indexNameBytes, IndexVerificationOutputRow expectedRow)
+        throws IOException {
+        List<IndexVerificationOutputRow> actualRows =
+            outputRepository.getOutputRows(scanMaxTs, indexNameBytes);
+        assertNotNull(actualRows);
+        assertEquals(1, actualRows.size());
+        assertEquals(expectedRow, actualRows.get(0));
+    }
+
+    private IndexVerificationOutputRow buildVerificationRow(byte[] dataRowKey, byte[] indexRowKey,
+                                                            long dataRowTs, long indexRowTs,
+                                                            String expectedErrorMessage,
+                                                            byte[] expectedValue, byte[] actualValue,
+                                                            long scanMaxTs,
+                                                            byte[] tableNameBytes,
+                                                            byte[] indexNameBytes,
+                                                            byte[] phaseBeforeValue) {
+        IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder =
+            new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder();
+        return builder.setDataTableRowKey(dataRowKey).
+            setIndexTableRowKey(indexRowKey).
+            setScanMaxTimestamp(dataRowTs).
+            setDataTableRowTimestamp(dataRowTs).
+            setIndexTableRowTimestamp(indexRowTs).
+            setErrorMessage(Bytes.toString(
+                IndexVerificationOutputRepository.
+                    getErrorMessageBytes(expectedErrorMessage, expectedValue, actualValue))).
+            setExpectedValue(expectedValue).
+            setActualValue(actualValue).
+            setScanMaxTimestamp(scanMaxTs).
+            setDataTableName(Bytes.toString(tableNameBytes)).
+            setIndexTableName(Bytes.toString(indexNameBytes)).
+            setPhaseValue(phaseBeforeValue).
+            build();
+    }
+
+    private byte[] getRowKey(Connection conn, byte[] tableNameBytes)
+        throws SQLException, IOException {
+        Scan scan = new Scan();
+        Table table =
+            conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        Result r = scanner.next();
+        return r.getRow();
+    }
+
+    private long getTimestamp(Connection conn, byte[] tableNameBytes) throws SQLException,
+        IOException {
+        Scan scan = new Scan();
+        Table table =
+            conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        Result r = scanner.next();
+        return r.listCells().get(0).getTimestamp();
+    }
+
+    private void createTable(Connection conn, String tableName) throws Exception {
+        conn.createStatement().execute("create table " + tableName +
+            " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " +
+            "val3 varchar(10))");
+    }
+
+    private void populateTable(Connection conn, String tableName) throws Exception {
+        conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+        conn.commit();
+    }
+
+    private void createTableAndIndexes(Connection conn, String dataTableName,
+                                       String indexTableName) throws Exception {
+        createTable(conn, dataTableName);
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+            dataTableName + " (val1) include (val2, val3)");
+        conn.commit();
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
new file mode 100644
index 0000000..0ffd13a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class IndexVerificationResultRepositoryIT extends ParallelStatsDisabledIT {
+
+    @BeforeClass
+    public static void setupClass() throws Exception {
+        Map<String, String> props = Collections.emptyMap();
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testReadResultRow() throws Exception {
+        String tableName = "T" + generateUniqueName();
+        String indexName = "I" + generateUniqueName();
+        byte[] indexNameBytes = Bytes.toBytes(indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createTableAndIndex(conn, tableName, indexName);
+            IndexVerificationResultRepository resultRepository =
+                new IndexVerificationResultRepository(conn, indexNameBytes);
+            resultRepository.createResultTable(conn);
+            byte[] regionOne = Bytes.toBytes("a.1.00000000000000000000");
+            byte[] regionTwo = Bytes.toBytes("a.2.00000000000000000000");
+            long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
+            IndexToolVerificationResult expectedResult = getExpectedResult(scanMaxTs);
+            resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
+                regionOne);
+            resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
+                regionTwo);
+            IndexToolVerificationResult actualResult =
+                resultRepository.getVerificationResult(conn, scanMaxTs);
+            assertVerificationResult(expectedResult, actualResult);
+
+        }
+    }
+
+    private void assertVerificationResult(IndexToolVerificationResult expectedResult, IndexToolVerificationResult actualResult) {
+        assertEquals(expectedResult.getScanMaxTs(), actualResult.getScanMaxTs());
+        assertArrayEquals(expectedResult.getStartRow(), actualResult.getStartRow());
+        assertArrayEquals(actualResult.getStopRow(), actualResult.getStopRow());
+
+        //because we're combining two near-identical rows (same values, different region)
+        //we assert on 2x the expected value
+        assertEquals(2 * expectedResult.getBeforeRebuildExpiredIndexRowCount(), 
+            actualResult.getBeforeRebuildExpiredIndexRowCount());
+        assertEquals(2 * expectedResult.getBeforeRebuildInvalidIndexRowCount(),
+            actualResult.getBeforeRebuildInvalidIndexRowCount());
+        assertEquals(2 * expectedResult.getBeforeRebuildMissingIndexRowCount(),
+            actualResult.getBeforeRebuildMissingIndexRowCount());
+        assertEquals(2 * expectedResult.getBeforeRebuildValidIndexRowCount(),
+            actualResult.getBeforeRebuildValidIndexRowCount());
+
+        assertEquals(2 * expectedResult.getAfterRebuildExpiredIndexRowCount(),
+            actualResult.getAfterRebuildExpiredIndexRowCount());
+        assertEquals(2 * expectedResult.getAfterRebuildInvalidIndexRowCount(),
+            actualResult.getAfterRebuildInvalidIndexRowCount());
+        assertEquals(2 * expectedResult.getAfterRebuildMissingIndexRowCount(),
+            actualResult.getAfterRebuildMissingIndexRowCount());
+        assertEquals(2 * expectedResult.getAfterRebuildValidIndexRowCount(),
+            actualResult.getAfterRebuildValidIndexRowCount());
+
+        assertEquals(2 * expectedResult.getScannedDataRowCount(),
+            actualResult.getScannedDataRowCount());
+        assertEquals(2 * expectedResult.getRebuiltIndexRowCount(),
+            actualResult.getRebuiltIndexRowCount());
+        
+    }
+
+    private IndexToolVerificationResult getExpectedResult(long scanMaxTs) {
+        byte[] startRow = Bytes.toBytes("a");
+        byte[] stopRow = Bytes.toBytes("b");
+        IndexToolVerificationResult result = new IndexToolVerificationResult(startRow, stopRow,
+            scanMaxTs);
+        result.setScannedDataRowCount(1);
+        result.setRebuiltIndexRowCount(1);
+        IndexToolVerificationResult.PhaseResult before =
+            new IndexToolVerificationResult.PhaseResult();
+        populatePhaseResult(before);
+        IndexToolVerificationResult.PhaseResult after =
+            new IndexToolVerificationResult.PhaseResult();
+        populatePhaseResult(after);
+        result.setBefore(before);
+        result.setAfter(after);
+        return result;
+    }
+
+    private void populatePhaseResult(IndexToolVerificationResult.PhaseResult result){
+        result.setValidIndexRowCount(1);
+        result.setBeyondMaxLookBackInvalidIndexRowCount(1);
+        result.setBeyondMaxLookBackMissingIndexRowCount(1);
+        result.setExpiredIndexRowCount(1);
+        result.setInvalidIndexRowCount(1);
+        result.setMissingIndexRowCount(1);
+    }
+    private void createTable(Connection conn, String tableName) throws Exception {
+        conn.createStatement().execute("create table " + tableName +
+            " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " +
+            "val3 varchar(10))");
+    }
+
+    private void createTableAndIndex(Connection conn, String dataTableName,
+                                       String indexTableName) throws Exception {
+        createTable(conn, dataTableName);
+        conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+            dataTableName + " (val1) include (val2, val3)");
+        conn.commit();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 2bdaf1d..8b7e3f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -20,7 +20,6 @@
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
 import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.*;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -82,6 +81,8 @@
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.KeyRange;
@@ -108,7 +109,6 @@
     public static final String NO_EXPECTED_MUTATION = "No expected mutation";
     public static final String
             ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
-    public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
     private long pageSizeInRows = Long.MAX_VALUE;
     private int rowCountPerTask;
     private boolean hasMore;
@@ -125,8 +125,6 @@
     private IndexMaintainer indexMaintainer;
     private byte[] indexRowKey = null;
     private Table indexHTable = null;
-    private Table outputHTable = null;
-    private Table resultHTable = null;
     private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private boolean verify = false;
     private Map<byte[], List<Mutation>> indexKeyToMutationMap;
@@ -135,7 +133,6 @@
     private TaskBatch<Boolean> tasks;
     private String exceptionMessage;
     private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
-    private RegionCoprocessorEnvironment env;
     private HTableFactory hTableFactory;
     private int indexTableTTL = 0;
     private IndexToolVerificationResult verificationResult;
@@ -144,6 +141,8 @@
     private int  singleRowRebuildReturnCode;
     private Map<byte[], NavigableSet<byte[]>> familyMap;
     private byte[][] viewConstants;
+    private IndexVerificationResultRepository verificationResultRepository;
+    private IndexVerificationOutputRepository verificationOutputRepository;
     private long maxLookBackInMills;
 
     @VisibleForTesting
@@ -179,7 +178,6 @@
 
         this.innerScanner = innerScanner;
         this.region = region;
-        this.env = env;
         this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
         indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
         if (indexRowKey != null) {
@@ -188,7 +186,7 @@
         }
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         if (valueBytes != null) {
-            verificationResult = new IndexToolVerificationResult();
+            verificationResult = new IndexToolVerificationResult(scan);
             verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
             if (verifyType != IndexTool.IndexVerifyType.NONE) {
                 verify = true;
@@ -197,8 +195,10 @@
                 hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
                 indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
                 indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
-                outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
-                resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
+                verificationResultRepository =
+                    new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
+                verificationOutputRepository =
+                    new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory);
                 indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
                 pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
@@ -247,88 +247,21 @@
         return false;
     }
 
-    private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName,  byte [] regionName,
-                                                    byte[] startRow, byte[] stopRow) {
-        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
-        int targetOffset = 0;
-        // The row key for the result table : timestamp | index table name | datable table region name |
-        //                                    scan start row | scan stop row
-        byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
-                ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length +
-                startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length];
-        Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
-        targetOffset += keyPrefix.length;
-        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
-        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
-        Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
-        targetOffset += indexTableName.length;
-        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
-        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
-        Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length);
-        targetOffset += regionName.length;
-        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
-        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
-        Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length);
-        targetOffset += startRow.length;
-        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
-        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
-        Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length);
-        return rowKey;
-    }
 
-    private void logToIndexToolResultTable() throws IOException {
-        long scanMaxTs = scan.getTimeRange().getMax();
-        byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(),
-                Bytes.toBytes(region.getRegionInfo().getRegionNameAsString()), scan.getStartRow(), scan.getStopRow());
-        Put put = new Put(rowKey);
-        put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
-                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
-        put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
-                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount)));
-        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
-                verifyType == IndexTool.IndexVerifyType.ONLY) {
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackMissingIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackInvalidIndexRowCount)));
-        }
-        if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackMissingIndexRowCount)));
-            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackInvalidIndexRowCount)));
-        }
-        resultHTable.put(put);
-    }
 
     @Override
     public void close() throws IOException {
         innerScanner.close();
         if (verify) {
             try {
-                logToIndexToolResultTable();
+                verificationResultRepository.logToIndexToolResultTable(verificationResult,
+                    verifyType, region.getRegionInfo().getRegionName());
             } finally {
                 this.pool.stop("IndexRebuildRegionScanner is closing");
                 hTableFactory.shutdown();
                 indexHTable.close();
-                outputHTable.close();
-                resultHTable.close();
+                verificationResultRepository.close();
+                verificationOutputRepository.close();
             }
         }
     }
@@ -419,80 +352,18 @@
         return true;
     }
 
-    @VisibleForTesting
     public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-                                           String errorMsg) throws IOException {
-        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
-                errorMsg, null, null);
-
-    }
-
-    private static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) {
-        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
-        byte[] rowKey;
-        int targetOffset = 0;
-        // The row key for the output table : timestamp | index table name | data row key
-        rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
-                ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length];
-        Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
-        targetOffset += keyPrefix.length;
-        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
-        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
-        Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
-        targetOffset += indexTableName.length;
-        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
-        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
-        Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length);
-        return rowKey;
+                                          String errorMsg) throws IOException {
+        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null);
     }
 
     @VisibleForTesting
     public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-                                           String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
-        final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
-        final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
-        final int PREFIX_LENGTH = 3;
-        final int TOTAL_PREFIX_LENGTH = 6;
-        final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
-        final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
-        long scanMaxTs = scan.getTimeRange().getMax();
-        byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey);
-        Put put = new Put(rowKey);
-        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
-                scanMaxTs, region.getRegionInfo().getTable().getName());
-        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
-                scanMaxTs, indexMaintainer.getIndexTableName());
-        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
-                    scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
-
-        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
-                scanMaxTs, indexRowKey);
-        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
-                scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
-        byte[] errorMessageBytes;
-        if (expectedValue != null) {
-            errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
-                    TOTAL_PREFIX_LENGTH];
-            Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
-            int length = errorMsg.length();
-            Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
-            length += PREFIX_LENGTH;
-            Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
-            length += expectedValue.length;
-            Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
-            length += PREFIX_LENGTH;
-            Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
-
-        } else {
-            errorMessageBytes = Bytes.toBytes(errorMsg);
-        }
-        put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
-        if (isBeforeRebuilt) {
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
-        } else {
-            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
-        }
-        outputHTable.put(put);
+                                           String errorMsg, byte[] expectedVaue, byte[] actualValue)
+        throws IOException {
+        verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+                errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(),
+            region.getRegionInfo().getTable().getName(), isBeforeRebuilt);
     }
 
     private static long getMaxTimestamp(Mutation m) {
@@ -854,7 +725,7 @@
             // verify
             // TODO: have a metric to update for these cases
             if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
-                verificationPhaseResult.expiredIndexRowCount++;
+                verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
                 return true;
             }
             actual = actualMutationList.get(actualIndex);
@@ -907,7 +778,7 @@
 
         if (expectedIndex == expectedSize ){
             // every expected mutation has its matching one in the actual list.
-            verificationPhaseResult.validIndexRowCount++;
+            verificationPhaseResult.setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1);
             return true;
         }
 
@@ -915,7 +786,8 @@
             if (expectedIndex > 0) {
                 // if current expected index mutation is beyond max look back window, we only need to make sure its latest
                 // mutation is a matching one, as an SCN query is required.
-                verificationPhaseResult.validIndexRowCount++;
+                verificationPhaseResult.
+                    setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1);
                 return true;
             }
 
@@ -924,7 +796,8 @@
             // We report it as a failure, so "before" option can trigger the index rebuild for this row.
             // This repair is required, when there is only one index row for a given data table row and the timestamp of that row
             // can be beyond maxLookBack.
-            verificationPhaseResult.beyondMaxLookBackInvalidIndexRowCount++;
+            verificationPhaseResult.
+                setBeyondMaxLookBackInvalidIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackInvalidIndexRowCount() + 1);
             byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
             String errorMsg = String.format("Expect %1$s mutations but got %2$s (beyond maxLookBack)",
                     expectedSize,
@@ -944,7 +817,7 @@
                 logToIndexToolOutputTable(dataKey, indexRow.getRow(),
                         getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
             }
-            verificationPhaseResult.invalidIndexRowCount++;
+            verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1);
             return false;
         }
     }
@@ -999,7 +872,7 @@
                 List<Mutation> mutationList = indexKeyToMutationMap.get(key);
                 if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
                     itr.remove();
-                    verificationPhaseResult.expiredIndexRowCount++;
+                    verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
                 }
             }
         }
@@ -1015,11 +888,12 @@
                 String errorMsg;
                 if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){
                     errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
-                    verificationPhaseResult.beyondMaxLookBackMissingIndexRowCount++;
+                    verificationPhaseResult.
+                        setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
                 }
                 else {
                     errorMsg = "Missing index row";
-                    verificationPhaseResult.missingIndexRowCount++;
+                    verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1);
                 }
                 byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
                 logToIndexToolOutputTable(dataKey,
@@ -1145,12 +1019,12 @@
     }
 
     private void verifyAndOrRebuildIndex() throws IOException {
-        IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult();
-        nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
+        IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(scan);
+        nextVerificationResult.setScannedDataRowCount(dataKeyToMutationMap.size());
         if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
             // For these options we start with rebuilding index rows
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size();
+            nextVerificationResult.setRebuiltIndexRowCount(dataKeyToMutationMap.size());
             isBeforeRebuilt = false;
         }
         if (verifyType == IndexTool.IndexVerifyType.NONE) {
@@ -1161,7 +1035,7 @@
             IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
             // For these options we start with verifying index rows
             parallelizeIndexVerify(verificationPhaseResult);
-            nextVerificationResult.before.add(verificationPhaseResult);
+            nextVerificationResult.getBefore().add(verificationPhaseResult);
         }
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
             // For these options, we have identified the rows to be rebuilt and now need to rebuild them
@@ -1177,7 +1051,7 @@
                 }
             }
             rebuildIndexRows(mutations);
-            nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size();
+            nextVerificationResult.setRebuiltIndexRowCount(nextVerificationResult.getRebuiltIndexRowCount() + dataKeyToMutationMap.size());
             isBeforeRebuilt = false;
         }
 
@@ -1189,7 +1063,7 @@
                 prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
             }
             parallelizeIndexVerify(verificationPhaseResult);
-            nextVerificationResult.after.add(verificationPhaseResult);
+            nextVerificationResult.getAfter().add(verificationPhaseResult);
         }
         verificationResult.add(nextVerificationResult);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
index 1fbb866..989e03f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -19,63 +19,141 @@
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 
-import java.io.IOException;
-import java.util.Arrays;
-
-import static org.apache.phoenix.mapreduce.index.IndexTool.*;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.SCANNED_DATA_ROW_COUNT_BYTES;
 
 public class IndexToolVerificationResult {
+
+    public void setScannedDataRowCount(long scannedDataRowCount) {
+        this.scannedDataRowCount = scannedDataRowCount;
+    }
+
+    public void setRebuiltIndexRowCount(long rebuiltIndexRowCount) {
+        this.rebuiltIndexRowCount = rebuiltIndexRowCount;
+    }
+
+    public PhaseResult getBefore() {
+        return before;
+    }
+
+    public void setBefore(PhaseResult before) {
+        this.before = before;
+    }
+
+    public PhaseResult getAfter() {
+        return after;
+    }
+
+    public void setAfter(PhaseResult after) {
+        this.after = after;
+    }
+
+    public byte[] getStartRow() {
+        return startRow;
+    }
+
+    public byte[] getStopRow() {
+        return stopRow;
+    }
+
+    public long getScanMaxTs() {
+        return scanMaxTs;
+    }
+
+    public IndexToolVerificationResult(long scanMaxTs) {
+        this.scanMaxTs = scanMaxTs;
+    }
+
+    public IndexToolVerificationResult(byte[] startRow, byte[] stopRow, long scanMaxTs) {
+        this.setStartRow(startRow);
+        this.setStopRow(stopRow);
+        this.scanMaxTs = scanMaxTs;
+    }
+
+    public IndexToolVerificationResult(Scan scan) {
+        this.setStartRow(scan.getStartRow());
+        this.setStopRow(scan.getStopRow());
+        this.scanMaxTs = scan.getTimeRange().getMax();
+    }
+
+    public byte[] getRegion() {
+        return region;
+    }
+
+    public void setStartRow(byte[] startRow) {
+        this.startRow = startRow;
+    }
+
+    public void setStopRow(byte[] stopRow) {
+        this.stopRow = stopRow;
+    }
+
     public static class PhaseResult {
-        long validIndexRowCount = 0;
-        long expiredIndexRowCount = 0;
-        long missingIndexRowCount = 0;
-        long invalidIndexRowCount = 0;
-        long beyondMaxLookBackMissingIndexRowCount = 0;
-        long beyondMaxLookBackInvalidIndexRowCount = 0;
+        private long validIndexRowCount = 0;
+        private long expiredIndexRowCount = 0;
+        private long missingIndexRowCount = 0;
+        private long invalidIndexRowCount = 0;
+        private long beyondMaxLookBackMissingIndexRowCount = 0;
+        private long beyondMaxLookBackInvalidIndexRowCount = 0;
 
         public void add(PhaseResult phaseResult) {
-            validIndexRowCount += phaseResult.validIndexRowCount;
-            expiredIndexRowCount += phaseResult.expiredIndexRowCount;
-            missingIndexRowCount += phaseResult.missingIndexRowCount;
-            invalidIndexRowCount += phaseResult.invalidIndexRowCount;
-            beyondMaxLookBackMissingIndexRowCount += phaseResult.beyondMaxLookBackMissingIndexRowCount;
-            beyondMaxLookBackInvalidIndexRowCount += phaseResult.beyondMaxLookBackInvalidIndexRowCount;
+            setBeyondMaxLookBackMissingIndexRowCount(getBeyondMaxLookBackMissingIndexRowCount() +
+                phaseResult.getBeyondMaxLookBackMissingIndexRowCount());
+            setBeyondMaxLookBackInvalidIndexRowCount(getBeyondMaxLookBackInvalidIndexRowCount() +
+                phaseResult.getBeyondMaxLookBackInvalidIndexRowCount());
+            setValidIndexRowCount(getValidIndexRowCount() + phaseResult.getValidIndexRowCount());
+            setExpiredIndexRowCount(getExpiredIndexRowCount() + phaseResult.getExpiredIndexRowCount());
+            setMissingIndexRowCount(getMissingIndexRowCount() + phaseResult.getMissingIndexRowCount());
+            setInvalidIndexRowCount(getInvalidIndexRowCount() + phaseResult.getInvalidIndexRowCount());
         }
 
-        public PhaseResult(){}
+        public PhaseResult() {
+        }
 
         public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
-                long missingIndexRowCount, long invalidIndexRowCount,
-                long beyondMaxLookBackMissingIndexRowCount, long beyondMaxLookBackInvalidIndexRowCount) {
-            this.validIndexRowCount = validIndexRowCount;
-            this.expiredIndexRowCount = expiredIndexRowCount;
-            this.missingIndexRowCount = missingIndexRowCount;
-            this.invalidIndexRowCount = invalidIndexRowCount;
-            this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount;
-            this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount;
+                           long missingIndexRowCount, long invalidIndexRowCount,
+                           long beyondMaxLookBackMissingIndexRowCount,
+                           long beyondMaxLookBackInvalidIndexRowCount) {
+            this.setValidIndexRowCount(validIndexRowCount);
+            this.setExpiredIndexRowCount(expiredIndexRowCount);
+            this.setMissingIndexRowCount(missingIndexRowCount);
+            this.setInvalidIndexRowCount(invalidIndexRowCount);
+            this.setBeyondMaxLookBackInvalidIndexRowCount(beyondMaxLookBackInvalidIndexRowCount);
+            this.setBeyondMaxLookBackMissingIndexRowCount(beyondMaxLookBackMissingIndexRowCount);
         }
 
+
         public long getTotalCount() {
-            return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount + beyondMaxLookBackMissingIndexRowCount + beyondMaxLookBackInvalidIndexRowCount;
+            return getValidIndexRowCount() + getExpiredIndexRowCount() + getMissingIndexRowCount() + getInvalidIndexRowCount()
+                + getBeyondMaxLookBackMissingIndexRowCount() + getBeyondMaxLookBackInvalidIndexRowCount();
         }
 
         @Override
         public String toString() {
             return "PhaseResult{" +
-                    "validIndexRowCount=" + validIndexRowCount +
-                    ", expiredIndexRowCount=" + expiredIndexRowCount +
-                    ", missingIndexRowCount=" + missingIndexRowCount +
-                    ", invalidIndexRowCount=" + invalidIndexRowCount +
-                    ", beyondMaxLookBackMissingIndexRowCount=" + beyondMaxLookBackMissingIndexRowCount +
-                    ", beyondMaxLookBackInvalidIndexRowCount=" + beyondMaxLookBackInvalidIndexRowCount;
+                "validIndexRowCount=" + getValidIndexRowCount() +
+                ", expiredIndexRowCount=" + getExpiredIndexRowCount() +
+                ", missingIndexRowCount=" + getMissingIndexRowCount() +
+                ", invalidIndexRowCount=" + getInvalidIndexRowCount() +
+                ", beyondMaxLookBackMissingIndexRowCount=" + getBeyondMaxLookBackMissingIndexRowCount() +
+                ", beyondMaxLookBackInvalidIndexRowCount=" + getBeyondMaxLookBackInvalidIndexRowCount();
         }
 
         @Override
@@ -87,39 +165,91 @@
                 return false;
             }
             PhaseResult pr = (PhaseResult) o;
-            return this.expiredIndexRowCount == pr.expiredIndexRowCount
-                    && this.validIndexRowCount == pr.validIndexRowCount
-                    && this.invalidIndexRowCount == pr.invalidIndexRowCount
-                    && this.missingIndexRowCount == pr.missingIndexRowCount
-                    && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount
-                    && this.beyondMaxLookBackMissingIndexRowCount == pr.beyondMaxLookBackMissingIndexRowCount;
+            return this.getExpiredIndexRowCount() == pr.getExpiredIndexRowCount()
+                && this.getValidIndexRowCount() == pr.getValidIndexRowCount()
+                && this.getInvalidIndexRowCount() == pr.getInvalidIndexRowCount()
+                && this.getMissingIndexRowCount() == pr.getMissingIndexRowCount()
+                && this.getBeyondMaxLookBackInvalidIndexRowCount() == pr.getBeyondMaxLookBackInvalidIndexRowCount()
+                && this.getBeyondMaxLookBackMissingIndexRowCount() == pr.getBeyondMaxLookBackMissingIndexRowCount();
         }
 
         @Override
         public int hashCode() {
             long result = 17;
-            result = 31 * result + expiredIndexRowCount;
-            result = 31 * result + validIndexRowCount;
-            result = 31 * result + missingIndexRowCount;
-            result = 31 * result + invalidIndexRowCount;
-            result = 31 * result + beyondMaxLookBackMissingIndexRowCount;
-            result = 31 * result + beyondMaxLookBackInvalidIndexRowCount;
-            return (int)result;
+            result = 31 * result + getExpiredIndexRowCount();
+            result = 31 * result + getValidIndexRowCount();
+            result = 31 * result + getMissingIndexRowCount();
+            result = 31 * result + getInvalidIndexRowCount();
+            result = 31 * result + getBeyondMaxLookBackMissingIndexRowCount();
+            result = 31 * result + getBeyondMaxLookBackInvalidIndexRowCount();
+            return (int) result;
+        }
+
+        public long getValidIndexRowCount() {
+            return validIndexRowCount;
+        }
+
+        public void setValidIndexRowCount(long validIndexRowCount) {
+            this.validIndexRowCount = validIndexRowCount;
+        }
+
+        public long getExpiredIndexRowCount() {
+            return expiredIndexRowCount;
+        }
+
+        public void setExpiredIndexRowCount(long expiredIndexRowCount) {
+            this.expiredIndexRowCount = expiredIndexRowCount;
+        }
+
+        public long getMissingIndexRowCount() {
+            return missingIndexRowCount;
+        }
+
+        public void setMissingIndexRowCount(long missingIndexRowCount) {
+            this.missingIndexRowCount = missingIndexRowCount;
+        }
+
+        public long getInvalidIndexRowCount() {
+            return invalidIndexRowCount;
+        }
+
+        public void setInvalidIndexRowCount(long invalidIndexRowCount) {
+            this.invalidIndexRowCount = invalidIndexRowCount;
+        }
+
+        public long getBeyondMaxLookBackMissingIndexRowCount() {
+            return beyondMaxLookBackMissingIndexRowCount;
+        }
+
+        public void setBeyondMaxLookBackMissingIndexRowCount(long beyondMaxLookBackMissingIndexRowCount) {
+            this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount;
+        }
+
+        public long getBeyondMaxLookBackInvalidIndexRowCount() {
+            return beyondMaxLookBackInvalidIndexRowCount;
+        }
+
+        public void setBeyondMaxLookBackInvalidIndexRowCount(long beyondMaxLookBackInvalidIndexRowCount) {
+            this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount;
         }
     }
 
-    long scannedDataRowCount = 0;
-    long rebuiltIndexRowCount = 0;
-    PhaseResult before = new PhaseResult();
-    PhaseResult after = new PhaseResult();
+    private long scannedDataRowCount = 0;
+    private long rebuiltIndexRowCount = 0;
+    private byte[] startRow;
+    private byte[] stopRow;
+    private long scanMaxTs;
+    private byte[] region;
+    private PhaseResult before = new PhaseResult();
+    private PhaseResult after = new PhaseResult();
 
     @Override
     public String toString() {
         return "VerificationResult{" +
-                "scannedDataRowCount=" + scannedDataRowCount +
-                ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
-                ", before=" + before +
-                ", after=" + after +
+                "scannedDataRowCount=" + getScannedDataRowCount() +
+                ", rebuiltIndexRowCount=" + getRebuiltIndexRowCount() +
+                ", before=" + getBefore() +
+                ", after=" + getAfter() +
                 '}';
     }
 
@@ -132,107 +262,107 @@
     }
 
     public long getBeforeRebuildValidIndexRowCount() {
-        return before.validIndexRowCount;
+        return getBefore().getValidIndexRowCount();
     }
 
     public long getBeforeRebuildExpiredIndexRowCount() {
-        return before.expiredIndexRowCount;
+        return getBefore().getExpiredIndexRowCount();
     }
 
     public long getBeforeRebuildInvalidIndexRowCount() {
-        return before.invalidIndexRowCount;
+        return getBefore().getInvalidIndexRowCount();
     }
 
     public long getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount() {
-        return before.beyondMaxLookBackMissingIndexRowCount;
-    };
+        return before.getBeyondMaxLookBackMissingIndexRowCount();
+    }
 
     public long getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount() {
-        return before.beyondMaxLookBackInvalidIndexRowCount;
-    };
+        return before.getBeyondMaxLookBackInvalidIndexRowCount();
+    }
 
     public long getBeforeRebuildMissingIndexRowCount() {
-        return before.missingIndexRowCount;
+        return getBefore().getMissingIndexRowCount();
     }
 
     public long getAfterRebuildValidIndexRowCount() {
-        return after.validIndexRowCount;
+        return getAfter().getValidIndexRowCount();
     }
 
     public long getAfterRebuildExpiredIndexRowCount() {
-        return after.expiredIndexRowCount;
+        return getAfter().getExpiredIndexRowCount();
     }
 
     public long getAfterRebuildInvalidIndexRowCount() {
-        return after.invalidIndexRowCount;
+        return getAfter().getInvalidIndexRowCount();
     }
 
     public long getAfterRebuildMissingIndexRowCount() {
-        return after.missingIndexRowCount;
+        return getAfter().getMissingIndexRowCount();
     }
 
     public long getAfterRebuildBeyondMaxLookBackMissingIndexRowCount() {
-        return after.beyondMaxLookBackMissingIndexRowCount;
-    };
+        return after.getBeyondMaxLookBackMissingIndexRowCount();
+    }
 
     public long getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount() {
-        return after.beyondMaxLookBackInvalidIndexRowCount;
-    };
+        return after.getBeyondMaxLookBackInvalidIndexRowCount();
+    }
 
     private void addScannedDataRowCount(long count) {
-        this.scannedDataRowCount += count;
+        this.setScannedDataRowCount(this.getScannedDataRowCount() + count);
     }
 
     private void addRebuiltIndexRowCount(long count) {
-        this.rebuiltIndexRowCount += count;
+        this.setRebuiltIndexRowCount(this.getRebuiltIndexRowCount() + count);
     }
 
     private void addBeforeRebuildValidIndexRowCount(long count) {
-        before.validIndexRowCount += count;
+        getBefore().setValidIndexRowCount(getBefore().getValidIndexRowCount() + count);
     }
 
     private void addBeforeRebuildExpiredIndexRowCount(long count) {
-        before.expiredIndexRowCount += count;
+        getBefore().setExpiredIndexRowCount(getBefore().getExpiredIndexRowCount() + count);
     }
 
     private void addBeforeRebuildMissingIndexRowCount(long count) {
-        before.missingIndexRowCount += count;
+        getBefore().setMissingIndexRowCount(getBefore().getMissingIndexRowCount() + count);
     }
 
     private void addBeforeRebuildInvalidIndexRowCount(long count) {
-        before.invalidIndexRowCount += count;
+        getBefore().setInvalidIndexRowCount(getBefore().getInvalidIndexRowCount() + count);
     }
 
     private void addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
-        before.beyondMaxLookBackMissingIndexRowCount += count;
+        before.setBeyondMaxLookBackMissingIndexRowCount(before.getBeyondMaxLookBackMissingIndexRowCount() + count);
     }
 
     private void addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
-        before.beyondMaxLookBackInvalidIndexRowCount += count;
+        before.setBeyondMaxLookBackInvalidIndexRowCount(before.getBeyondMaxLookBackInvalidIndexRowCount() + count);
     }
 
     private void addAfterRebuildValidIndexRowCount(long count) {
-        after.validIndexRowCount += count;
+        getAfter().setValidIndexRowCount(getAfter().getValidIndexRowCount() + count);
     }
 
     private void addAfterRebuildExpiredIndexRowCount(long count) {
-        after.expiredIndexRowCount += count;
+        getAfter().setExpiredIndexRowCount(getAfter().getExpiredIndexRowCount() + count);
     }
 
     private void addAfterRebuildMissingIndexRowCount(long count) {
-        after.missingIndexRowCount += count;
+        getAfter().setMissingIndexRowCount(getAfter().getMissingIndexRowCount() + count);
     }
 
     private void addAfterRebuildInvalidIndexRowCount(long count) {
-        after.invalidIndexRowCount += count;
+        getAfter().setInvalidIndexRowCount(getAfter().getInvalidIndexRowCount() + count);
     }
 
     private void addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
-        after.beyondMaxLookBackMissingIndexRowCount += count;
+        after.setBeyondMaxLookBackMissingIndexRowCount(after.getBeyondMaxLookBackMissingIndexRowCount() + count);
     }
 
     private void addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
-        after.beyondMaxLookBackInvalidIndexRowCount += count;
+        after.setBeyondMaxLookBackInvalidIndexRowCount(after.getBeyondMaxLookBackInvalidIndexRowCount() + count);
     }
 
     private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
@@ -249,7 +379,7 @@
                 cell.getValueOffset(), cell.getValueLength()));
     }
 
-    private void update(Cell cell) {
+    public void update(Cell cell) {
         if (CellUtil
                 .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
             addScannedDataRowCount(getValue(cell));
@@ -282,57 +412,17 @@
         }
     }
 
-    public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
-        // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
-        // Search for the place where the trailing 0xFFs start
-        int offset = rowKeyPrefix.length;
-        while (offset > 0) {
-            if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
-                break;
-            }
-            offset--;
-        }
-        if (offset == 0) {
-            // We got an 0xFFFF... (only FFs) stopRow value which is
-            // the last possible prefix before the end of the table.
-            // So set it to stop at the 'end of the table'
-            return HConstants.EMPTY_END_ROW;
-        }
-        // Copy the right length of the original
-        byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
-        // And increment the last one
-        newStopRow[newStopRow.length - 1]++;
-        return newStopRow;
-    }
-
-    public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts)
-            throws IOException {
-        IndexToolVerificationResult verificationResult = new IndexToolVerificationResult();
-        byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
-        byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
-        Scan scan = new Scan();
-        scan.setStartRow(startRowKey);
-        scan.setStopRow(stopRowKey);
-        ResultScanner scanner = hTable.getScanner(scan);
-        for (Result result = scanner.next(); result != null; result = scanner.next()) {
-            for (Cell cell : result.rawCells()) {
-                verificationResult.update(cell);
-            }
-        }
-        return verificationResult;
-    }
-
     public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
         if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
             return false;
         } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
-            if (before.invalidIndexRowCount + before.missingIndexRowCount
-                    + before.beyondMaxLookBackInvalidIndexRowCount + before.beyondMaxLookBackMissingIndexRowCount > 0) {
+            if (getBefore().getInvalidIndexRowCount() + getBefore().getMissingIndexRowCount() +
+                before.getBeyondMaxLookBackInvalidIndexRowCount() + before.getBeyondMaxLookBackMissingIndexRowCount() > 0) {
                 return true;
             }
         } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
-            if (after.invalidIndexRowCount + after.missingIndexRowCount
-                    + after.beyondMaxLookBackInvalidIndexRowCount + after.beyondMaxLookBackMissingIndexRowCount > 0) {
+            if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount() +
+                after.getBeyondMaxLookBackInvalidIndexRowCount() + after.getBeyondMaxLookBackMissingIndexRowCount() > 0) {
                 return true;
             }
         }
@@ -340,9 +430,9 @@
     }
 
     public void add(IndexToolVerificationResult verificationResult) {
-        scannedDataRowCount += verificationResult.scannedDataRowCount;
-        rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
-        before.add(verificationResult.before);
-        after.add(verificationResult.after);
+        setScannedDataRowCount(getScannedDataRowCount() + verificationResult.getScannedDataRowCount());
+        setRebuiltIndexRowCount(getRebuiltIndexRowCount() + verificationResult.getRebuiltIndexRowCount());
+        getBefore().add(verificationResult.getBefore());
+        getAfter().add(verificationResult.getAfter());
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 95703a4..e30da37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -50,7 +50,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -73,7 +72,6 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -89,7 +87,6 @@
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
@@ -155,57 +152,6 @@
         }
     }
 
-    public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
-    public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
-    public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
-    public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
-    public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
-    public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
-    public final static String DATA_TABLE_NAME = "DTName";
-    public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
-    public static String INDEX_TABLE_NAME = "ITName";
-    public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
-    public static String DATA_TABLE_ROW_KEY = "DTRowKey";
-    public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
-    public static String INDEX_TABLE_ROW_KEY = "ITRowKey";
-    public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
-    public static String DATA_TABLE_TS = "DTTS";
-    public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
-    public static String INDEX_TABLE_TS = "ITTS";
-    public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
-    public static String ERROR_MESSAGE = "Error";
-    public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
-    public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
-    public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
-    public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount";
-    public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT);
-    public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount";
-    public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT);
-    public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount";
-    public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT);
-    public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount";
-    public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
-    public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount";
-    public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
-    public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount";
-    public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
-    public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount";
-    public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
-    public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount";
-    public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
-    public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount";
-    public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT);
-    public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount";
-    public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
-    public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount";
-    public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
-    public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackMissingIndexRowCount";
-    public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
-    public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount";
-    public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
-    public static String  VERIFICATION_PHASE = "Phase";
-    public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
-
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);
 
     private String schemaName;
@@ -703,23 +649,10 @@
     }
 
     private void createIndexToolTables(Connection connection) throws Exception {
-        ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
-        Admin admin = queryServices.getAdmin();
-        if (!admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
-            HTableDescriptor tableDescriptor = new
-                    HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
-            tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
-            HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
-            tableDescriptor.addFamily(columnDescriptor);
-            admin.createTable(tableDescriptor);
-        }
-        if (!admin.tableExists(TableName.valueOf(RESULT_TABLE_NAME))) {
-            HTableDescriptor tableDescriptor = new
-                    HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME));
-            tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
-            HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY);
-            tableDescriptor.addFamily(columnDescriptor);
-            admin.createTable(tableDescriptor);
+        try (IndexVerificationResultRepository resultRepo = new IndexVerificationResultRepository();
+            IndexVerificationOutputRepository outputRepo = new IndexVerificationOutputRepository()){
+            resultRepo.createResultTable(connection);
+            outputRepo.createOutputTable(connection);
         }
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
new file mode 100644
index 0000000..bcc2e73
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class IndexVerificationOutputRepository implements AutoCloseable {
+    public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+
+    private Table indexTable;
+    private byte[] indexName;
+    private Table outputTable;
+    public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
+    public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
+    public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+
+    public final static String DATA_TABLE_NAME = "DTName";
+    public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
+    public final static String INDEX_TABLE_NAME = "ITName";
+    public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
+    public final static String DATA_TABLE_ROW_KEY = "DTRowKey";
+    public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
+    public final static String INDEX_TABLE_ROW_KEY = "ITRowKey";
+    public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
+    public final static String DATA_TABLE_TS = "DTTS";
+    public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
+    public final static String INDEX_TABLE_TS = "ITTS";
+    public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
+    public final static String ERROR_MESSAGE = "Error";
+    public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
+
+    public static String  VERIFICATION_PHASE = "Phase";
+    public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
+    public final static String EXPECTED_VALUE = "ExpectedValue";
+    public final static byte[] EXPECTED_VALUE_BYTES = Bytes.toBytes(EXPECTED_VALUE);
+    public final static String ACTUAL_VALUE = "ActualValue";
+    public final static byte[] ACTUAL_VALUE_BYTES = Bytes.toBytes(ACTUAL_VALUE);
+    public static final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
+    public static final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
+    public static final int PREFIX_LENGTH = 3;
+    public static final int TOTAL_PREFIX_LENGTH = 6;
+    public static final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
+    public static final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
+
+    /**
+     * Only usable for the create table / read path or for testing. Use setOutputTable and
+     * setIndexTable first to write.
+     */
+    public IndexVerificationOutputRepository() {
+
+    }
+
+    @VisibleForTesting
+    public IndexVerificationOutputRepository(byte[] indexName, Connection conn) throws SQLException {
+        ConnectionQueryServices queryServices =
+            conn.unwrap(PhoenixConnection.class).getQueryServices();
+        outputTable = queryServices.getTable(OUTPUT_TABLE_NAME_BYTES);
+        indexTable = queryServices.getTable(indexName);
+    }
+
+    public IndexVerificationOutputRepository(byte[] indexName, HTableFactory hTableFactory) throws IOException {
+        this.indexName = indexName;
+        outputTable = hTableFactory.getTable(new ImmutableBytesPtr(OUTPUT_TABLE_NAME_BYTES));
+        indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName));
+    }
+
+    public static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) {
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+        byte[] rowKey;
+        int targetOffset = 0;
+        // The row key for the output table : timestamp | index table name | data row key
+        rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
+            ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length];
+        Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+        targetOffset += keyPrefix.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
+        targetOffset += indexTableName.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length);
+        return rowKey;
+    }
+
+    /**
+     * Generates partial row key for use in a Scan to get all rows for an index verification
+     */
+    private static byte[] generatePartialOutputTableRowKey(long ts, byte[] indexTableName){
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+        byte[] partialRowKey;
+        int targetOffset = 0;
+        // The row key for the output table : timestamp | index table name | data row key
+        partialRowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length];
+        Bytes.putBytes(partialRowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+        targetOffset += keyPrefix.length;
+        Bytes.putBytes(partialRowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(partialRowKey, targetOffset, indexTableName, 0, indexTableName.length);
+        return partialRowKey;
+    }
+
+    public void createOutputTable(Connection connection) throws IOException, SQLException {
+        ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+        Admin admin = queryServices.getAdmin();
+        TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME);
+        if (!admin.tableExists(outputTableName)) {
+            HTableDescriptor tableDescriptor = new
+                HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
+            tableDescriptor.setValue(HColumnDescriptor.TTL,
+                String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+            HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
+            tableDescriptor.addFamily(columnDescriptor);
+            admin.createTable(tableDescriptor);
+            outputTable = admin.getConnection().getTable(outputTableName);
+        }
+    }
+        
+    @VisibleForTesting
+    public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs,
+                                          long indexRowTs,
+                                          String errorMsg, byte[] expectedValue, byte[] actualValue,
+                                          long scanMaxTs, byte[] tableName, boolean isBeforeRebuild)
+        throws IOException {
+        byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey);
+        Put put = new Put(rowKey);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES,
+            scanMaxTs, tableName);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES,
+            scanMaxTs, indexName);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES,
+            scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
+
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES,
+            scanMaxTs, indexRowKey);
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES,
+            scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
+        byte[] errorMessageBytes;
+        if (expectedValue != null) {
+            errorMessageBytes = getErrorMessageBytes(errorMsg, expectedValue, actualValue);
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES, expectedValue);
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES, actualValue);
+        } else {
+            errorMessageBytes = Bytes.toBytes(errorMsg);
+        }
+        put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+        if (isBeforeRebuild) {
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
+        } else {
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
+        }
+        outputTable.put(put);
+    }
+
+    public static byte[] getErrorMessageBytes(String errorMsg, byte[] expectedValue, byte[] actualValue) {
+        byte[] errorMessageBytes;
+        errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
+            TOTAL_PREFIX_LENGTH];
+        Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
+        int length = errorMsg.length();
+        Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+        length += PREFIX_LENGTH;
+        Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+        length += expectedValue.length;
+        Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+        length += PREFIX_LENGTH;
+        Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
+        return errorMessageBytes;
+    }
+
+    public List<IndexVerificationOutputRow> getOutputRows(long ts, byte[] indexName)
+        throws IOException {
+        Iterator<IndexVerificationOutputRow> iter = getOutputRowIterator(ts, indexName);
+        List<IndexVerificationOutputRow> outputRowList = new ArrayList<IndexVerificationOutputRow>();
+        while (iter.hasNext()){
+            outputRowList.add(iter.next());
+        }
+        return outputRowList;
+    }
+
+    public Iterator<IndexVerificationOutputRow> getOutputRowIterator(long ts, byte[] indexName)
+        throws IOException {
+        Scan scan = new Scan();
+        byte[] partialKey = generatePartialOutputTableRowKey(ts, indexName);
+        scan.withStartRow(partialKey);
+        scan.withStopRow(ByteUtil.calculateTheClosestNextRowKeyForPrefix(partialKey));
+        ResultScanner scanner = outputTable.getScanner(scan);
+        return new IndexVerificationOutputRowIterator(scanner.iterator());
+    }
+
+    public static IndexVerificationOutputRow getOutputRowFromResult(Result result) {
+        IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder =
+            new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder();
+        byte[] rowKey = result.getRow();
+        //rowkey is scanTs + SEPARATOR_BYTE + indexTableName + SEPARATOR_BYTE + dataTableRowKey
+        byte[][] rowKeySplit = ByteUtil.splitArrayBySeparator(rowKey, ROW_KEY_SEPARATOR_BYTE[0]);
+        builder.setScanMaxTimestamp(Long.parseLong(Bytes.toString(rowKeySplit[0])));
+        builder.setIndexTableName(Bytes.toString(rowKeySplit[1]));
+        builder.setDataTableRowKey(rowKeySplit[2]);
+
+        builder.setDataTableName(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            DATA_TABLE_NAME_BYTES)));
+        builder.setIndexTableRowKey(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            INDEX_TABLE_ROW_KEY_BYTES));
+        builder.setDataTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            DATA_TABLE_TS_BYTES))));
+        builder.setIndexTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            INDEX_TABLE_TS_BYTES))));
+        builder.setErrorMessage(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+            ERROR_MESSAGE_BYTES)));
+        //actual and expected value might not be present, but will just set to null if not
+        builder.setExpectedValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES));
+        builder.setActualValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES));
+        builder.setPhaseValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES));
+        return builder.build();
+    }
+
+    public void close() throws IOException {
+        if (outputTable != null) {
+            outputTable.close();
+        }
+        if (indexTable != null) {
+            indexTable.close();
+        }
+    }
+
+    public class IndexVerificationOutputRowIterator implements Iterator<IndexVerificationOutputRow> {
+        Iterator<Result> delegate;
+        public IndexVerificationOutputRowIterator(Iterator<Result> delegate){
+            this.delegate = delegate;
+        }
+        @Override
+        public boolean hasNext() {
+            return delegate.hasNext();
+        }
+
+        @Override
+        public IndexVerificationOutputRow next() {
+            Result result = delegate.next();
+            if (result == null) {
+                return null;
+            } else {
+                return getOutputRowFromResult(result);
+            }
+        }
+
+        @Override
+        public void remove() {
+            delegate.remove();
+        }
+
+    }
+
+    public void setIndexTable(Table indexTable) {
+        this.indexTable = indexTable;
+    }
+
+    public void setOutputTable(Table outputTable) {
+        this.outputTable = outputTable;
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
new file mode 100644
index 0000000..8c54796
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class IndexVerificationOutputRow {
+    public static final String SCAN_MAX_TIMESTAMP = "ScanMaxTimestamp: ";
+    private String dataTableName;
+    private String indexTableName;
+    private Long scanMaxTimestamp;
+    private byte[] dataTableRowKey;
+    private byte[] indexTableRowKey;
+    private Long dataTableRowTimestamp;
+    private Long indexTableRowTimestamp;
+    private String errorMessage;
+    private byte[] expectedValue;
+    private byte[] actualValue;
+    private byte[] phaseValue;
+
+    private IndexVerificationOutputRow(String dataTableName, String indexTableName,
+                                       byte[] dataTableRowKey, Long scanMaxTimestamp,
+                                      byte[] indexTableRowKey,
+                                       long dataTableRowTimestamp, long indexTableRowTimestamp,
+                                      String errorMessage, byte[] expectedValue, byte[] actualValue,
+                                      byte[] phaseValue) {
+        this.dataTableName = dataTableName;
+        this.indexTableName = indexTableName;
+        this.scanMaxTimestamp = scanMaxTimestamp;
+        this.dataTableRowKey = dataTableRowKey;
+        this.indexTableRowKey = indexTableRowKey;
+        this.dataTableRowTimestamp = dataTableRowTimestamp;
+        this.indexTableRowTimestamp = indexTableRowTimestamp;
+        this.errorMessage = errorMessage;
+        this.expectedValue = expectedValue;
+        this.actualValue = actualValue;
+        this.phaseValue = phaseValue;
+    }
+
+    public String getDataTableName() {
+        return dataTableName;
+    }
+
+    public String getIndexTableName() {
+        return indexTableName;
+    }
+
+    public Long getScanMaxTimestamp() {
+        return scanMaxTimestamp;
+    }
+
+    public byte[] getIndexTableRowKey() {
+        return indexTableRowKey;
+    }
+
+    public long getIndexTableRowTimestamp() {
+        return indexTableRowTimestamp;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public byte[] getExpectedValue() {
+        return expectedValue;
+    }
+
+    public byte[] getActualValue() {
+        return actualValue;
+    }
+
+    public byte[] getPhaseValue() {
+        return phaseValue;
+    }
+
+    public byte[] getDataTableRowKey() {
+        return dataTableRowKey;
+    }
+
+    public Long getDataTableRowTimestamp() {
+        return dataTableRowTimestamp;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null ) {
+            return false;
+        }
+        if (!(o instanceof  IndexVerificationOutputRow)) {
+            return false;
+        }
+        IndexVerificationOutputRow otherRow = (IndexVerificationOutputRow) o;
+
+        return Objects.equals(dataTableName, otherRow.getDataTableName()) &&
+            Objects.equals(indexTableName, otherRow.getIndexTableName()) &&
+            Objects.equals(scanMaxTimestamp, otherRow.getScanMaxTimestamp()) &&
+            Arrays.equals(dataTableRowKey, otherRow.getDataTableRowKey()) &&
+            Arrays.equals(indexTableRowKey, otherRow.getIndexTableRowKey()) &&
+            Objects.equals(dataTableRowTimestamp, otherRow.getDataTableRowTimestamp()) &&
+            Objects.equals(indexTableRowTimestamp, otherRow.getIndexTableRowTimestamp()) &&
+            Objects.equals(errorMessage, otherRow.getErrorMessage()) &&
+            Arrays.equals(expectedValue, otherRow.getExpectedValue()) &&
+            Arrays.equals(actualValue, otherRow.getActualValue()) &&
+            Arrays.equals(phaseValue, otherRow.getPhaseValue());
+    }
+
+    @Override
+    public int hashCode(){
+        return Objects.hashCode(scanMaxTimestamp) ^ Objects.hashCode(indexTableName) ^
+            Arrays.hashCode(dataTableRowKey);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(IndexVerificationOutputRepository.DATA_TABLE_NAME + ": ").append(dataTableName).append(",");
+        sb.append(IndexVerificationOutputRepository.INDEX_TABLE_NAME + ": ").append(indexTableName).append(",");
+        sb.append(SCAN_MAX_TIMESTAMP).append(": ").append(scanMaxTimestamp).append(",");
+        sb.append(IndexVerificationOutputRepository.DATA_TABLE_ROW_KEY + ": ").append(Bytes.toString(dataTableRowKey)).append(",");
+        sb.append(IndexVerificationOutputRepository.INDEX_TABLE_ROW_KEY + ": ").append(Bytes.toString(indexTableRowKey)).append(",");
+        sb.append(IndexVerificationOutputRepository.DATA_TABLE_TS + ": ").append(dataTableRowTimestamp).append(",");
+        sb.append(IndexVerificationOutputRepository.INDEX_TABLE_TS + ": ").append(indexTableRowTimestamp).append(",");
+        sb.append(IndexVerificationOutputRepository.ERROR_MESSAGE + ": ").append(errorMessage).append(",");
+        sb.append(IndexVerificationOutputRepository.EXPECTED_VALUE + ": ").append(Bytes.toString(expectedValue)).append(",");
+        sb.append(IndexVerificationOutputRepository.ACTUAL_VALUE + ": ").append(Bytes.toString(actualValue)).append(
+            ",");
+        sb.append(IndexVerificationOutputRepository.VERIFICATION_PHASE + ": ").append(Bytes.toString(phaseValue));
+        return sb.toString();
+    }
+
+    public static class IndexVerificationOutputRowBuilder {
+        private String dataTableName;
+        private String indexTableName;
+        private Long scanMaxTimestamp;
+        private byte[] dataTableRowKey;
+        private byte[] indexTableRowKey;
+        private long dataTableRowTimestamp;
+        private long indexTableRowTimestamp;
+        private String errorMessage;
+        private byte[] expectedValue;
+        private byte[] actualValue;
+        private byte[] phaseValue;
+
+        public IndexVerificationOutputRowBuilder setDataTableName(String dataTableName) {
+            this.dataTableName = dataTableName;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setIndexTableName(String indexTableName) {
+            this.indexTableName = indexTableName;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setScanMaxTimestamp(Long scanMaxTimestamp) {
+            this.scanMaxTimestamp = scanMaxTimestamp;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setIndexTableRowKey(byte[] indexTableRowKey) {
+            this.indexTableRowKey = indexTableRowKey;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setDataTableRowKey(byte[] dataTableRowKey){
+            this.dataTableRowKey = dataTableRowKey;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setDataTableRowTimestamp(long dataTableRowTimestamp) {
+            this.dataTableRowTimestamp = dataTableRowTimestamp;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setIndexTableRowTimestamp(long indexTableRowTimestamp) {
+            this.indexTableRowTimestamp = indexTableRowTimestamp;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setErrorMessage(String errorMessage) {
+            this.errorMessage = errorMessage;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setExpectedValue(byte[] expectedValue) {
+            this.expectedValue = expectedValue;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setActualValue(byte[] actualValue) {
+            this.actualValue = actualValue;
+            return this;
+        }
+
+        public IndexVerificationOutputRowBuilder setPhaseValue(byte[] phaseValue) {
+            this.phaseValue = phaseValue;
+            return this;
+        }
+
+        public IndexVerificationOutputRow build() {
+            return new IndexVerificationOutputRow(dataTableName, indexTableName, dataTableRowKey,
+                scanMaxTimestamp, indexTableRowKey, dataTableRowTimestamp, indexTableRowTimestamp,
+                errorMessage, expectedValue, actualValue, phaseValue);
+        }
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
new file mode 100644
index 0000000..ca8b129
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class IndexVerificationResultRepository implements AutoCloseable {
+
+    private Table resultTable;
+    private Table indexTable;
+    public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+    public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
+    public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
+    public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+    public final static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
+    public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
+    public final static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount";
+    public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT);
+    public final static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT =
+        "BeforeRebuildValidIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT);
+    public final static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT =
+        "BeforeRebuildExpiredIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+    public final static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT =
+        "BeforeRebuildMissingIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
+    public final static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT =
+        "BeforeRebuildInvalidIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
+    public final static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT =
+        "AfterValidExpiredIndexRowCount";
+    public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
+    public final static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT =
+        "AfterRebuildExpiredIndexRowCount";
+    public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+    public final static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT =
+        "AfterRebuildMissingIndexRowCount";
+    public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
+    public final static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT =
+        "AfterRebuildInvalidIndexRowCount";
+    public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
+    public final static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT =
+        "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES =
+        Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+    public final static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT =
+        "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount";
+    public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES =
+        Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
+
+    public final static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT =
+        "AfterRebuildBeyondMaxLookBackMissingIndexRowCount";
+    public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES =
+        Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+    public final static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT =
+        "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount";
+    public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES =
+        Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
+
+    /***
+     * Only usable for read / create methods. To write use setResultTable and setIndexTable first
+     */
+    public IndexVerificationResultRepository(){
+
+    }
+
+    public IndexVerificationResultRepository(Connection conn, byte[] indexNameBytes) throws SQLException {
+        resultTable = getTable(conn, RESULT_TABLE_NAME_BYTES);
+        indexTable = getTable(conn, indexNameBytes);
+    }
+
+    public IndexVerificationResultRepository(byte[] indexName,
+                                             HTableFactory hTableFactory) throws IOException {
+        resultTable = hTableFactory.getTable(new ImmutableBytesPtr(RESULT_TABLE_NAME_BYTES));
+        indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName));
+    }
+
+    public void createResultTable(Connection connection) throws IOException, SQLException {
+        ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+        Admin admin = queryServices.getAdmin();
+        TableName resultTableName = TableName.valueOf(RESULT_TABLE_NAME);
+        if (!admin.tableExists(resultTableName)) {
+            HTableDescriptor tableDescriptor = new
+                HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME));
+            tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+            HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY);
+            tableDescriptor.addFamily(columnDescriptor);
+            admin.createTable(tableDescriptor);
+            setResultTable(admin.getConnection().getTable(resultTableName));
+        }
+    }
+    public static byte[] generateResultTableRowKey(long ts, byte[] indexTableName,  byte [] regionName,
+                                                    byte[] startRow, byte[] stopRow) {
+        byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+        int targetOffset = 0;
+        // The row key for the result table : timestamp | index table name | datable table region name |
+        //                                    scan start row | scan stop row
+        byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
+            ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length +
+            startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length];
+        Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+        targetOffset += keyPrefix.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
+        targetOffset += indexTableName.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length);
+        targetOffset += regionName.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length);
+        targetOffset += startRow.length;
+        Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+        targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+        Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length);
+        return rowKey;
+    }
+
+    public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult,
+                                          IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException {
+        long scanMaxTs = verificationResult.getScanMaxTs();
+        byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexTable.getName().toBytes(),
+            region, verificationResult.getStartRow(),
+            verificationResult.getStopRow());
+        Put put = new Put(rowKey);
+        put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
+            scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount())));
+        put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
+            scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount())));
+        if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
+            verifyType == IndexTool.IndexVerifyType.ONLY) {
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs,
+                Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackMissingIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs,
+                Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackInvalidIndexRowCount())));
+        }
+        if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs,
+                Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackMissingIndexRowCount())));
+            put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+                scanMaxTs,
+                Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackInvalidIndexRowCount())));
+        }
+        resultTable.put(put);
+    }
+
+    public IndexToolVerificationResult getVerificationResult(Connection conn, long ts) throws IOException, SQLException {
+        Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES);
+        return getVerificationResult(hTable, ts);
+    }
+
+    public Table getTable(Connection conn, byte[] tableName) throws SQLException {
+        return conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(tableName);
+    }
+
+    public IndexToolVerificationResult getVerificationResult(Table htable, long ts)
+        throws IOException {
+        byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+        byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(startRowKey);
+        IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(ts);
+        Scan scan = new Scan();
+        scan.withStartRow(startRowKey);
+        scan.withStopRow(stopRowKey);
+        ResultScanner scanner = htable.getScanner(scan);
+        for (Result result = scanner.next(); result != null; result = scanner.next()) {
+            boolean isFirst = true;
+            for (Cell cell : result.rawCells()) {
+                if (isFirst){
+                    byte[][] rowKeyParts = ByteUtil.splitArrayBySeparator(result.getRow(),
+                        ROW_KEY_SEPARATOR_BYTE[0]);
+                    verificationResult.setStartRow(rowKeyParts[3]);
+                    verificationResult.setStopRow(rowKeyParts[4]);
+                    isFirst = false;
+                }
+                verificationResult.update(cell);
+            }
+        }
+        return verificationResult;
+    }
+
+    public void close() throws IOException {
+        if (resultTable != null) {
+            resultTable.close();
+        }
+        if (indexTable != null) {
+            indexTable.close();
+        }
+    }
+
+    public void setResultTable(Table resultTable) {
+        this.resultTable = resultTable;
+    }
+
+    public void setIndexTable(Table indexTable) {
+        this.indexTable = indexTable;
+    }
+}
+
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index a24e3ab..4cd2603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -25,12 +25,10 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
 import org.apache.phoenix.coprocessor.TaskRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -53,6 +51,7 @@
 public class PhoenixIndexImportDirectReducer extends
         Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
     private AtomicBoolean calledOnce = new AtomicBoolean(false);
+    private IndexVerificationResultRepository resultRepository;
     private static final Logger LOGGER =
             LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class);
 
@@ -62,10 +61,8 @@
         Configuration configuration = context.getConfiguration();
         try (final Connection connection = ConnectionUtil.getInputConnection(configuration)) {
             long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
-            Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
-                    .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
             IndexToolVerificationResult verificationResult =
-                    IndexToolVerificationResult.getVerificationResult(hTable, ts);
+                    resultRepository.getVerificationResult(connection, ts);
             context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
                     setValue(verificationResult.getScannedDataRowCount());
             context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
@@ -108,6 +105,11 @@
     }
 
     @Override
+    protected void setup(Context context) throws IOException {
+        resultRepository = new IndexVerificationResultRepository();
+    }
+
+    @Override
     protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
                           Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
             throws IOException, InterruptedException
@@ -133,6 +135,7 @@
     protected void cleanup(Context context) throws IOException, InterruptedException{
         try {
             updateTasksTable(context);
+            resultRepository.close();
         } catch (SQLException e) {
             LOGGER.error(" Failed to update the tasks table");
             throw new RuntimeException(e.getMessage());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index 5a2b624..cc582f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -23,12 +23,14 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -606,4 +608,48 @@
         }
         return true;
     }
+
+    public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+        // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+        // Search for the place where the trailing 0xFFs start
+        int offset = rowKeyPrefix.length;
+        while (offset > 0) {
+            if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+                break;
+            }
+            offset--;
+        }
+        if (offset == 0) {
+            // We got an 0xFFFF... (only FFs) stopRow value which is
+            // the last possible prefix before the end of the table.
+            // So set it to stop at the 'end of the table'
+            return HConstants.EMPTY_END_ROW;
+        }
+        // Copy the right length of the original
+        byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+        // And increment the last one
+        newStopRow[newStopRow.length - 1]++;
+        return newStopRow;
+    }
+
+    public static byte[][] splitArrayBySeparator(byte[] src, byte separator){
+        List<Integer> separatorLocations = new ArrayList<Integer>();
+        for (int k = 0; k < src.length; k++){
+            if (src[k] == separator){
+                separatorLocations.add(k);
+            }
+        }
+        byte[][] dst = new byte[separatorLocations.size() +1][];
+        int previousSepartor = -1;
+        for (int j = 0; j < separatorLocations.size(); j++){
+            int separatorLocation = separatorLocations.get(j);
+            dst[j] = Bytes.copy(src, previousSepartor +1, separatorLocation- previousSepartor -1);
+            previousSepartor = separatorLocation;
+        }
+        if (previousSepartor < src.length){
+            dst[separatorLocations.size()] = Bytes.copy(src,
+                previousSepartor +1, src.length - previousSepartor -1);
+        }
+        return dst;
+    }
 }