PHOENIX-5743 Concurrent read repairs on the same index row should be idempotent
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 144bcf1..65262ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -300,6 +300,67 @@
}
@Test
+ public void testUnverifiedRowRepair() throws Exception {
+ if (async) {
+ // No need to run the same test twice one for async = true and the other for async = false
+ return;
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ conn.createStatement().execute("create table " + dataTableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+ String indexTableName = generateUniqueName();
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)");
+ // Configure IndexRegionObserver to fail the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab','abc')");
+ commitWithException(conn);
+ // The above upsert will create an unverified index row
+ // Configure IndexRegionObserver to allow the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ // Verify that this row is not visible
+ String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'ab'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ ResultSet rs = conn.createStatement().executeQuery(selectSql);
+ // Verify that we do not read from the unverified row
+ assertFalse(rs.next());
+ // Insert the same row with a value for val3
+ conn.createStatement().execute("upsert into " + dataTableName + " values ('a', 'ab','abc', 'abcd')");
+ conn.commit();
+ // At this moment val3 in the data table row should not have null value
+ selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 'ab'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("abcd", rs.getString(1));
+ assertFalse(rs.next());
+ // Configure IndexRegionObserver to fail the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab','abc')");
+ commitWithException(conn);
+ // The above upsert will create an unverified index row
+ // Configure IndexRegionObserver to allow the data write phase
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ // Verify that this row is still read back correctly
+ for (int i = 0; i < 2; i++) {
+ selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 'ab'";
+ // Verify that we will read from the index table
+ assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+ rs = conn.createStatement().executeQuery(selectSql);
+ // Verify that the repair of the unverified row did not affect the valid index row
+ assertTrue(rs.next());
+ assertEquals("abcd", rs.getString(1));
+ assertFalse(rs.next());
+ }
+ // Add rows and check everything is still okay
+ verifyTableHealth(conn, dataTableName, indexTableName);
+ }
+ }
+
+ @Test
public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
String dataTableName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 74ae7ce..a65d63c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -247,18 +247,9 @@
if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
Delete del = new Delete(indexRowKey, ts);
if (specific) {
- // Get all the cells of this row
- deleteRowScan.withStartRow(indexRowKey, true);
- deleteRowScan.withStopRow(indexRowKey, true);
- deleteRowScan.setTimeRange(0, ts + 1);
- deleteRowScanner = region.getScanner(deleteRowScan);
- row.clear();
- deleteRowScanner.next(row);
- deleteRowScanner.close();
- // We are deleting a specific version of a row so the flowing loop is for that
- for (Cell cell : row) {
- del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
- }
+ del.addFamilyVersion(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
+ } else {
+ del.addFamily(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), ts);
}
Mutation[] mutations = new Mutation[]{del};
region.batchMutate(mutations);