PHOENIX-5951 - IndexTool output logging for past-max-lookback rows should be configurable
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index 1ddab5a..18c140b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -115,6 +116,7 @@
 @RunWith(Parameterized.class)
 public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT {
 
+    public static final int MAX_LOOKBACK_AGE = 3600;
     private final String tableDDLOptions;
     private boolean directApi = true;
     private boolean useSnapshot = false;
@@ -147,7 +149,7 @@
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
                 QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
         serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
-        serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Long.toString(3600));
+        serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Long.toString(MAX_LOOKBACK_AGE));
         serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
             Long.toString(Long.MAX_VALUE));
         serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
@@ -558,6 +560,11 @@
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         List<String> expectedStatus = new ArrayList<>();
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            //clearing the result table only seems to work with truncate, not deleteAllRows; it's
+            //unclear why, since deleteAllRows works fine for the output table
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            admin.disableTable(TableName.valueOf(RESULT_TABLE_NAME));
+            admin.truncateTable(TableName.valueOf(RESULT_TABLE_NAME), true);
             conn.createStatement().execute("CREATE TABLE " + dataTableFullName
                     + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) "+tableDDLOptions);
             conn.createStatement().execute(String.format(
@@ -574,8 +581,12 @@
             expectedStatus.add(RUN_STATUS_EXECUTED);
             expectedStatus.add(RUN_STATUS_EXECUTED);
             expectedStatus.add(RUN_STATUS_EXECUTED);
-
-            verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, expectedStatus);
+            try {
+                verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, expectedStatus);
+            } catch (AssertionError ae) {
+                TestUtil.dumpTable(conn, TableName.valueOf(RESULT_TABLE_NAME));
+                throw ae;
+            }
 
             deleteOneRowFromResultTable(conn, scn, indexTableFullName);
 
@@ -933,6 +944,83 @@
     }
 
     @Test
+    public void testEnableOutputLoggingForMaxLookback() throws Exception {
+        //by default we don't log invalid or missing rows past max lookback age to the
+        // PHOENIX_INDEX_TOOL table. Verify that we can flip that logging on from the client-side
+        // using a system property (such as from the command line) and have it log rows on the
+        // server-side
+        if (!mutable) {
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+        // remember the pre-test value so the finally block can restore or clear it
+        String oldProperty =
+            System.getProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS);
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            System.setProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS, "true");
+            ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+            injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            deleteAllRows(conn,
+                TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+            String stmString1 =
+                "CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) ";
+            conn.createStatement().execute(stmString1);
+
+            injectEdge.incrementValue(1L);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // insert two rows
+            IndexToolIT.upsertRow(stmt1, 1);
+            IndexToolIT.upsertRow(stmt1, 2);
+            conn.commit();
+            injectEdge.incrementValue(1L); //we have to increment time to see our writes
+            //now create an index async so it won't have the two rows in the base table
+
+            String stmtString2 =
+                String.format(
+                    "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
+                    indexTableName, dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+            conn.commit();
+            //advance the injected clock by MAX_LOOKBACK_AGE * 1000 (presumably seconds -> ms)
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            deleteAllRows(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+            IndexTool it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName,
+                dataTableName,
+                indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY);
+            TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+            Counters counters = it.getJob().getCounters();
+            System.out.println(counters.toString());
+            assertEquals(2L,
+                counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+
+            IndexVerificationOutputRepository outputRepository =
+                new IndexVerificationOutputRepository(Bytes.toBytes(indexTableFullName), conn);
+            List<IndexVerificationOutputRow> outputRows = outputRepository.getAllOutputRows();
+            //logging was switched on above, so each of the two missing index rows counted by
+            // the job is expected to show up in the output table
+            assertEquals(2, outputRows.size());
+        } finally {
+            //restore the previous value, or clear the property when it was unset, so the
+            // "true" set above cannot leak into other tests running in this JVM
+            if (oldProperty != null) {
+                System.setProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS, oldProperty);
+            } else {
+                System.clearProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS);
+            }
+            EnvironmentEdgeManager.reset();
+        }
+    }
+
+    @Test
     public void testUpdatablePKFilterViewIndexRebuild() throws Exception {
         if (!mutable) {
             return;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 6f39837..f7864b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -123,9 +123,11 @@
 
     @VisibleForTesting
     public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
-                              final RegionCoprocessorEnvironment env) throws IOException {
+                                     final RegionCoprocessorEnvironment env,
+                                     IndexTool.IndexVerifyType verifyType) throws IOException {
         super(innerScanner, region, scan, env);
         this.env = env;
+        this.verifyType = verifyType;
         indexRowKeyforReadRepair = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
         if (indexRowKeyforReadRepair != null) {
             setReturnCodeForSingleRowRebuild();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index af09c3c..a1d5a83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1103,13 +1103,13 @@
             if (oldCoproc) {
                 return new IndexerRegionScanner(scanner, region, scan, env, this);
             } else {
-                return new IndexRebuildRegionScanner(scanner, region, scan, env);
+                return new IndexRebuildRegionScanner(scanner, region, scan, env, verifyType);
             }
         }
         if (oldCoproc) {
             return new IndexerRegionScanner(innerScanner, region, scan, env, this);
         } else {
-            return new IndexRebuildRegionScanner(innerScanner, region, scan, env);
+            return new IndexRebuildRegionScanner(innerScanner, region, scan, env, verifyType);
         }
     }