PHOENIX-5951 - Index rebuild output logging for past-max-lookback rows should be configurable
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
index f24f49d..5731c6a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
@@ -31,6 +31,7 @@
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType;
 import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -53,6 +54,9 @@
 import java.util.Map;
 
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.DEFAULT_LOG_TTL;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE;
 import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE;
@@ -90,21 +94,23 @@
             long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
             outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
                 indexRowTs, expectedErrorMessage, expectedValue, actualValue,
-                scanMaxTs, tableNameBytes, true);
+                scanMaxTs, tableNameBytes, true,
+                INVALID_ROW);
             //now increment the scan time by 1 and do it again
             outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
                 indexRowTs, expectedErrorMessage, expectedValue, actualValue,
-                scanMaxTs +1, tableNameBytes, false);
+                scanMaxTs +1, tableNameBytes, false,
+                INVALID_ROW);
             //make sure we only get the first row back
             IndexVerificationOutputRow expectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs,
                 indexRowTs, expectedErrorMessage, expectedValue, actualValue,
-                scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE);
+                scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE, INVALID_ROW);
             verifyOutputRow(outputRepository, scanMaxTs, indexNameBytes, expectedRow);
             //make sure we get the second row back
             IndexVerificationOutputRow secondExpectedRow = buildVerificationRow(dataRowKey,
                 indexRowKey, dataRowTs,
                 indexRowTs, expectedErrorMessage, expectedValue, actualValue,
-                scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE);
+                scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE, INVALID_ROW);
             verifyOutputRow(outputRepository, scanMaxTs+1, indexNameBytes, secondExpectedRow);
         }
 
@@ -128,7 +134,8 @@
             TestUtil.assertTableHasTtl(conn, TableName.valueOf(OUTPUT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL);
             outputRepository.logToIndexToolOutputTable(mockStringBytes, mockStringBytes,
                     1, 2, mockString, mockStringBytes, mockStringBytes,
-                    EnvironmentEdgeManager.currentTimeMillis(), mockStringBytes, true);
+                    EnvironmentEdgeManager.currentTimeMillis(), mockStringBytes, true,
+                INVALID_ROW);
 
             Assert.assertEquals(1, TestUtil.getRowCount(hTable, false));
 
@@ -147,7 +154,7 @@
         IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.BEFORE;
         boolean expectedBefore = false;
         boolean expectedAfter = true;
-        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
     }
 
     @Test
@@ -155,7 +162,7 @@
         IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.AFTER;
         boolean expectedBefore = true;
         boolean expectedAfter = false;
-        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
     }
 
     @Test
@@ -163,7 +170,7 @@
         IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.BOTH;
         boolean expectedBefore = false;
         boolean expectedAfter = false;
-        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
     }
 
     @Test
@@ -171,16 +178,46 @@
         IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE;
         boolean expectedBefore = true;
         boolean expectedAfter = true;
-        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
     }
 
-    public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType, boolean expectedBefore, boolean expectedAfter) throws SQLException, IOException {
+    @Test
+    public void testDisableLoggingBeyondMaxLookback() throws SQLException, IOException {
+        IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE;
+        boolean expectedBefore = false;
+        boolean expectedAfter = false;
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+            BEYOND_MAX_LOOKBACK_INVALID, false);
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+            BEYOND_MAX_LOOKBACK_MISSING, false);
+
+        expectedBefore = true;
+        expectedAfter = true;
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+            BEYOND_MAX_LOOKBACK_INVALID, true);
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+            BEYOND_MAX_LOOKBACK_MISSING, true);
+    }
+
+    public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType,
+                                     boolean expectedBefore, boolean expectedAfter,
+                                     IndexVerificationErrorType errorType) throws SQLException, IOException {
+        verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, errorType,
+            true);
+    }
+
+    public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType,
+                                     boolean expectedBefore, boolean expectedAfter,
+                                     IndexVerificationErrorType errorType,
+                                     boolean shouldLogBeyondMaxLookback) throws SQLException,
+        IOException {
         Table mockOutputTable = Mockito.mock(Table.class);
         Table mockIndexTable = Mockito.mock(Table.class);
         when(mockIndexTable.getName()).thenReturn(TableName.valueOf("testDisableLoggingIndexName"));
         IndexVerificationOutputRepository outputRepository =
             new IndexVerificationOutputRepository(mockOutputTable,
                 mockIndexTable, disableLoggingVerifyType);
+        outputRepository.setShouldLogBeyondMaxLookback(shouldLogBeyondMaxLookback);
         byte[] dataRowKey = Bytes.toBytes("dataRowKey");
         byte[] indexRowKey = Bytes.toBytes("indexRowKey");
         long dataRowTs = EnvironmentEdgeManager.currentTimeMillis();
@@ -192,9 +229,9 @@
         byte[] tableName = Bytes.toBytes("testDisableLoggingTableName");
 
         outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs
-            , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, true);
+            , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, true, errorType);
         outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs
-            , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, false);
+            , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, false, errorType);
         int expectedRowsLogged = 0;
         if (expectedBefore && expectedAfter) {
             expectedRowsLogged = 2;
@@ -222,7 +259,8 @@
                                                             long scanMaxTs,
                                                             byte[] tableNameBytes,
                                                             byte[] indexNameBytes,
-                                                            byte[] phaseBeforeValue) {
+                                                            byte[] phaseBeforeValue,
+                                                            IndexVerificationErrorType errorType) {
         IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder =
             new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder();
         return builder.setDataTableRowKey(dataRowKey).
@@ -239,6 +277,7 @@
             setDataTableName(Bytes.toString(tableNameBytes)).
             setIndexTableName(Bytes.toString(indexNameBytes)).
             setPhaseValue(phaseBeforeValue).
+            setErrorType(errorType).
             build();
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 6f82db3..6f39837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -20,6 +20,12 @@
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
 import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_CELLS;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.MISSING_ROW;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -40,6 +46,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -47,6 +54,7 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -92,6 +100,12 @@
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
 
+    public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS =
+        "phoenix.index.mr.log.beyond.max.lookback.errors";
+    public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
+    private boolean useProto = true;
+    private byte[] indexRowKey;
+    private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
     private static boolean ignoreIndexRebuildForTesting  = false;
     public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; }
     private byte[] indexRowKeyforReadRepair;
@@ -123,6 +137,9 @@
             }
         }
         if (verify) {
+            boolean shouldLogBeyondMaxLookbackInvalidRows =
+                env.getConfiguration().getBoolean(PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS,
+                    DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS);
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
             byte[] disableLoggingValueBytes =
                 scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE);
@@ -133,6 +150,7 @@
             verificationOutputRepository =
                 new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName()
                     , hTableFactory, disableLoggingVerifyType);
+            verificationOutputRepository.setShouldLogBeyondMaxLookback(shouldLogBeyondMaxLookbackInvalidRows);
             verificationResult = new IndexToolVerificationResult(scan);
             verificationResultRepository =
                 new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
@@ -257,17 +275,19 @@
     }
 
     public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-            String errorMsg, boolean isBeforeRebuild) throws IOException {
-        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null, isBeforeRebuild);
+            String errorMsg, boolean isBeforeRebuild,
+            IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
+        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null,
+            null, isBeforeRebuild, errorType);
     }
 
     @VisibleForTesting
     public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
-            String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild)
-            throws IOException {
+            String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild,
+                                          IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
         verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
                 errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(),
-                region.getRegionInfo().getTable().getName(), isBeforeRebuild);
+                region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType);
     }
 
     private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
@@ -288,7 +308,7 @@
             String errorMsg = "Not matching timestamp";
             byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
             logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
-                    errorMsg, null, null, isBeforeRebuild);
+                    errorMsg, null, null, isBeforeRebuild, INVALID_ROW);
             return;
         }
         int expectedCellCount = 0;
@@ -305,7 +325,8 @@
                         !CellUtil.matchingType(expectedCell, actualCell)) {
                     byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
                     String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
-                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg,isBeforeRebuild);
+                    logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected),
+                        getTimestamp(actual), errorMsg, isBeforeRebuild, INVALID_ROW);
                     verificationPhaseResult.setIndexHasMissingCellsCount(verificationPhaseResult.getIndexHasMissingCellsCount() + 1);
                     return;
                 }
@@ -313,7 +334,8 @@
                     String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
                     byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
                     logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
-                            errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell), isBeforeRebuild);
+                            errorMsg, CellUtil.cloneValue(expectedCell),
+                        CellUtil.cloneValue(actualCell), isBeforeRebuild, INVALID_ROW);
                     return;
                 }
             }
@@ -329,7 +351,7 @@
             String errorMsg = "Index has extra cells (in iteration " + iteration + ")";
             byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
             logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
-                    errorMsg, isBeforeRebuild);
+                    errorMsg, isBeforeRebuild, EXTRA_CELLS);
             verificationPhaseResult.setIndexHasExtraCellsCount(verificationPhaseResult.getIndexHasExtraCellsCount() + 1);
         }
     }
@@ -719,7 +741,7 @@
                     actualSize);
             logToIndexToolOutputTable(dataKey, indexRow.getRow(),
                     getTimestamp(expectedMutationList.get(expectedIndex)),
-                    0, errorMsg, isBeforeRebuild);
+                    0, errorMsg, isBeforeRebuild, BEYOND_MAX_LOOKBACK_INVALID);
             return false;
         }
         else {
@@ -730,7 +752,8 @@
                 byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
                 String errorMsg = "Not matching index row";
                 logToIndexToolOutputTable(dataKey, indexRow.getRow(),
-                        getTimestamp(expectedMutationList.get(0)), 0L, errorMsg, isBeforeRebuild);
+                        getTimestamp(expectedMutationList.get(0)), 0L, errorMsg, isBeforeRebuild,
+                    INVALID_ROW);
             }
             verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1);
             return false;
@@ -791,17 +814,21 @@
             }
             currentTime = EnvironmentEdgeManager.currentTimeMillis();
             String errorMsg;
+            IndexVerificationOutputRepository.IndexVerificationErrorType errorType;
             if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(mutation))){
                 errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+                errorType = BEYOND_MAX_LOOKBACK_MISSING;
                 verificationPhaseResult.
                         setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
             }
             else {
                 errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW;
+                errorType = MISSING_ROW;
                 verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1);
             }
             byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexKey), viewConstants);
-            logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg, isBeforeRebuild);
+            logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg,
+                isBeforeRebuild, errorType);
         }
         // Leave the invalid and missing rows in indexKeyToMutationMap
         indexKeyToMutationMap.putAll(invalidIndexRows);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
index 3cf5446..549f876 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -52,6 +52,7 @@
     private Table outputTable;
     private IndexTool.IndexDisableLoggingType disableLoggingVerifyType =
         IndexTool.IndexDisableLoggingType.NONE;
+    private boolean shouldLogBeyondMaxLookback = true;
 
     public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
     public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
@@ -71,6 +72,8 @@
     public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
     public final static String ERROR_MESSAGE = "Error";
     public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
+    public static final String ERROR_TYPE = "ErrorType";
+    public static final byte[] ERROR_TYPE_BYTES = Bytes.toBytes(ERROR_TYPE);
 
     public static String  VERIFICATION_PHASE = "Phase";
     public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
@@ -85,6 +88,15 @@
     public static final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
     public static final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
 
+
+    public enum IndexVerificationErrorType {
+        INVALID_ROW,
+        MISSING_ROW,
+        EXTRA_CELLS,
+        BEYOND_MAX_LOOKBACK_INVALID,
+        BEYOND_MAX_LOOKBACK_MISSING,
+        UNKNOWN
+    }
     /**
      * Only usable for the create table / read path or for testing. Use setOutputTable and
      * setIndexTable first to write.
@@ -117,6 +129,10 @@
         this.disableLoggingVerifyType = disableLoggingVerifyType;
     }
 
+    public void setShouldLogBeyondMaxLookback(boolean shouldLogBeyondMaxLookback) {
+        this.shouldLogBeyondMaxLookback = shouldLogBeyondMaxLookback;
+    }
+
     public static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) {
         byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
         byte[] rowKey;
@@ -173,9 +189,11 @@
     public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs,
                                           long indexRowTs,
                                           String errorMsg, byte[] expectedValue, byte[] actualValue,
-                                          long scanMaxTs, byte[] tableName, boolean isBeforeRebuild)
+                                          long scanMaxTs, byte[] tableName,
+                                          boolean isBeforeRebuild,
+                                          IndexVerificationErrorType errorType)
         throws IOException {
-        if (shouldLogOutput(isBeforeRebuild)) {
+        if (shouldLogOutput(isBeforeRebuild, errorType)) {
             byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey);
             Put put = new Put(rowKey);
             put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES, tableName);
@@ -193,6 +211,8 @@
                 errorMessageBytes = Bytes.toBytes(errorMsg);
             }
             put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, errorMessageBytes);
+            put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_TYPE_BYTES,
+                Bytes.toBytes(errorType.toString()));
             if (isBeforeRebuild) {
                 put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, PHASE_BEFORE_VALUE);
             } else {
@@ -202,7 +222,12 @@
         }
     }
 
-    public boolean shouldLogOutput(boolean isBeforeRebuild) {
+    public boolean shouldLogOutput(boolean isBeforeRebuild, IndexVerificationErrorType errorType) {
+        return shouldLogOutputForVerifyType(isBeforeRebuild) &&
+            shouldLogOutputForErrorType(errorType);
+    }
+
+    private boolean shouldLogOutputForVerifyType(boolean isBeforeRebuild) {
         if (disableLoggingVerifyType.equals(IndexTool.IndexDisableLoggingType.BOTH)) {
             return false;
         }
@@ -219,6 +244,15 @@
         return false;
     }
 
+    private boolean shouldLogOutputForErrorType(IndexVerificationErrorType errorType) {
+        if (errorType != null &&
+            (errorType.equals(IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID) ||
+                errorType.equals(IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING))){
+            return shouldLogBeyondMaxLookback;
+        }
+        return true;
+    }
+
     public static byte[] getErrorMessageBytes(String errorMsg, byte[] expectedValue, byte[] actualValue) {
         byte[] errorMessageBytes;
         errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
@@ -297,6 +331,18 @@
         builder.setExpectedValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES));
         builder.setActualValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES));
         builder.setPhaseValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES));
+        IndexVerificationErrorType errorType;
+        try {
+          errorType =
+              IndexVerificationErrorType.valueOf(
+                  Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_TYPE_BYTES)));
+        } catch (Throwable e) {
+            //in case we have a cast exception because an incompatible version of the enum produced
+            //the row, or an earlier version that didn't record error types, it's better to mark
+            // the error type unknown and move on rather than fail
+            errorType = IndexVerificationErrorType.UNKNOWN;
+        }
+        builder.setErrorType(errorType);
         return builder.build();
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
index 8c54796..4dad9b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.mapreduce.index;
 
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType;
 
 import java.util.Arrays;
 import java.util.Objects;
@@ -35,13 +36,14 @@
     private byte[] expectedValue;
     private byte[] actualValue;
     private byte[] phaseValue;
+    private IndexVerificationErrorType errorType;
 
     private IndexVerificationOutputRow(String dataTableName, String indexTableName,
                                        byte[] dataTableRowKey, Long scanMaxTimestamp,
                                       byte[] indexTableRowKey,
                                        long dataTableRowTimestamp, long indexTableRowTimestamp,
                                       String errorMessage, byte[] expectedValue, byte[] actualValue,
-                                      byte[] phaseValue) {
+                                      byte[] phaseValue, IndexVerificationErrorType errorType) {
         this.dataTableName = dataTableName;
         this.indexTableName = indexTableName;
         this.scanMaxTimestamp = scanMaxTimestamp;
@@ -53,6 +55,7 @@
         this.expectedValue = expectedValue;
         this.actualValue = actualValue;
         this.phaseValue = phaseValue;
+        this.errorType = errorType;
     }
 
     public String getDataTableName() {
@@ -119,7 +122,8 @@
             Objects.equals(errorMessage, otherRow.getErrorMessage()) &&
             Arrays.equals(expectedValue, otherRow.getExpectedValue()) &&
             Arrays.equals(actualValue, otherRow.getActualValue()) &&
-            Arrays.equals(phaseValue, otherRow.getPhaseValue());
+            Arrays.equals(phaseValue, otherRow.getPhaseValue()) &&
+            Objects.equals(errorType, otherRow.getErrorType());
     }
 
     @Override
@@ -143,9 +147,14 @@
         sb.append(IndexVerificationOutputRepository.ACTUAL_VALUE + ": ").append(Bytes.toString(actualValue)).append(
             ",");
         sb.append(IndexVerificationOutputRepository.VERIFICATION_PHASE + ": ").append(Bytes.toString(phaseValue));
+        sb.append(IndexVerificationOutputRepository.ERROR_TYPE + ": " ).append(Objects.toString(errorType));
         return sb.toString();
     }
 
+    public IndexVerificationErrorType getErrorType() {
+        return errorType;
+    }
+
     public static class IndexVerificationOutputRowBuilder {
         private String dataTableName;
         private String indexTableName;
@@ -158,6 +167,7 @@
         private byte[] expectedValue;
         private byte[] actualValue;
         private byte[] phaseValue;
+        private IndexVerificationErrorType errorType;
 
         public IndexVerificationOutputRowBuilder setDataTableName(String dataTableName) {
             this.dataTableName = dataTableName;
@@ -214,10 +224,15 @@
             return this;
         }
 
+        public IndexVerificationOutputRowBuilder setErrorType(IndexVerificationErrorType errorType) {
+            this.errorType = errorType;
+            return this;
+        }
+
         public IndexVerificationOutputRow build() {
             return new IndexVerificationOutputRow(dataTableName, indexTableName, dataTableRowKey,
                 scanMaxTimestamp, indexTableRowKey, dataTableRowTimestamp, indexTableRowTimestamp,
-                errorMessage, expectedValue, actualValue, phaseValue);
+                errorMessage, expectedValue, actualValue, phaseValue, errorType);
         }
     }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 1b8ed55..86f39b9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -35,6 +35,7 @@
 import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PTable;
@@ -279,10 +280,13 @@
         doNothing().when(rebuildScanner)
                 .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
                         Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
-                        Matchers.<byte[]>any(), Matchers.<byte[]>any(), Matchers.anyBoolean());
+                        Matchers.<byte[]>any(), Matchers.<byte[]>any(), Matchers.anyBoolean(),
+                    Mockito.<IndexVerificationOutputRepository.IndexVerificationErrorType>any());
         doNothing().when(rebuildScanner)
                 .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
-                        Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(), Matchers.anyBoolean());
+                        Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
+                    Matchers.anyBoolean(),
+                    Mockito.<IndexVerificationOutputRepository.IndexVerificationErrorType>any());
 
         //populate the local map to use to create actual mutations
         indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);