PHOENIX-5951 - IndexTool output logging for past-max-lookback rows should be configurable
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index 1ddab5a..18c140b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -115,6 +116,7 @@
@RunWith(Parameterized.class)
public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT {
+ public static final int MAX_LOOKBACK_AGE = 3600;
private final String tableDDLOptions;
private boolean directApi = true;
private boolean useSnapshot = false;
@@ -147,7 +149,7 @@
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
- serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Long.toString(3600));
+ serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Long.toString(MAX_LOOKBACK_AGE));
serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
@@ -558,6 +560,11 @@
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
List<String> expectedStatus = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ //this only seems to work if you truncate, rather than use deleteAllRows, and I'm
+ //not sure why because deleteAllRows seems to work fine for the output table
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ admin.disableTable(TableName.valueOf(RESULT_TABLE_NAME));
+ admin.truncateTable(TableName.valueOf(RESULT_TABLE_NAME), true);
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) "+tableDDLOptions);
conn.createStatement().execute(String.format(
@@ -574,8 +581,12 @@
expectedStatus.add(RUN_STATUS_EXECUTED);
expectedStatus.add(RUN_STATUS_EXECUTED);
expectedStatus.add(RUN_STATUS_EXECUTED);
-
- verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, expectedStatus);
+ try {
+ verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, expectedStatus);
+ } catch (AssertionError ae) {
+ TestUtil.dumpTable(conn, TableName.valueOf(RESULT_TABLE_NAME));
+ throw ae;
+ }
deleteOneRowFromResultTable(conn, scn, indexTableFullName);
@@ -933,6 +944,91 @@
}
@Test
+ public void testEnableOutputLoggingForMaxLookback() throws Exception {
+ // By default, invalid or missing index rows that are older than the max lookback age are
+ // not logged to the PHOENIX_INDEX_TOOL output table. This test flips that logging on from
+ // the client side via a JVM system property (as one would from the command line) and checks
+ // the server-side verification results.
+ if (!mutable) {
+ return;
+ }
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+ // Remember the prior value so the property can be restored (or cleared) afterwards;
+ // system properties are JVM-wide and would otherwise leak into later tests.
+ String oldProperty =
+ System.getProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ System.setProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS, "true");
+ ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+ injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ deleteAllRows(conn,
+ TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ String stmString1 =
+ "CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) ";
+ conn.createStatement().execute(stmString1);
+
+ injectEdge.incrementValue(1L);
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+ try (PreparedStatement stmt1 = conn.prepareStatement(upsertQuery)) {
+ // insert two rows
+ IndexToolIT.upsertRow(stmt1, 1);
+ IndexToolIT.upsertRow(stmt1, 2);
+ conn.commit();
+ }
+ injectEdge.incrementValue(1L); //we have to increment time to see our writes
+
+ // Create the index ASYNC so it is not populated with the two existing data rows; the
+ // verification run below is expected to find both index rows missing.
+ String stmtString2 =
+ String.format(
+ "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
+ indexTableName, dataTableFullName);
+ conn.createStatement().execute(stmtString2);
+ conn.commit();
+ // Advance the clock by the configured max lookback age (seconds -> millis) so the
+ // missing rows presumably fall beyond the max lookback window.
+ injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000L);
+ deleteAllRows(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ IndexTool it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName,
+ dataTableName,
+ indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY);
+ Counters counters = it.getJob().getCounters();
+ try {
+ assertEquals(counters.toString(), 2L,
+ counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+
+ IndexVerificationOutputRepository outputRepository =
+ new IndexVerificationOutputRepository(Bytes.toBytes(indexTableFullName), conn);
+ List<IndexVerificationOutputRow> outputRows = outputRepository.getAllOutputRows();
+ // NOTE(review): the comment at the top says logging is switched on, yet this asserts that
+ // nothing was written to the output table. Confirm whether the expected count should be
+ // 2 (one row per missing index row).
+ assertEquals(0, outputRows.size());
+ } catch (AssertionError ae) {
+ // Dump the output table only on failure so successful runs stay quiet.
+ TestUtil.dumpTable(conn,
+ TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ throw ae;
+ }
+ } finally {
+ // Put the JVM-wide property back exactly as it was before this test ran.
+ if (oldProperty != null) {
+ System.setProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS, oldProperty);
+ } else {
+ System.clearProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS);
+ }
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ @Test
public void testUpdatablePKFilterViewIndexRebuild() throws Exception {
if (!mutable) {
return;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 6f39837..f7864b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -123,9 +123,11 @@
@VisibleForTesting
public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
- final RegionCoprocessorEnvironment env) throws IOException {
+ final RegionCoprocessorEnvironment env,
+ IndexTool.IndexVerifyType verifyType) throws IOException {
super(innerScanner, region, scan, env);
this.env = env;
+ this.verifyType = verifyType;
indexRowKeyforReadRepair = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
if (indexRowKeyforReadRepair != null) {
setReturnCodeForSingleRowRebuild();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index af09c3c..a1d5a83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1103,13 +1103,13 @@
if (oldCoproc) {
return new IndexerRegionScanner(scanner, region, scan, env, this);
} else {
- return new IndexRebuildRegionScanner(scanner, region, scan, env);
+ return new IndexRebuildRegionScanner(scanner, region, scan, env, verifyType);
}
}
if (oldCoproc) {
return new IndexerRegionScanner(innerScanner, region, scan, env, this);
} else {
- return new IndexRebuildRegionScanner(innerScanner, region, scan, env);
+ return new IndexRebuildRegionScanner(innerScanner, region, scan, env, verifyType);
}
}