PHOENIX-5597 No read repair happens when scans filter rows based on a covered column
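
The failure scenario is a two-phase global index write whose third (post data table write) phase does not complete: the index row is left with only the empty column cell carrying the "unverified" status. A scan that is served from the index and also filters on a covered (included) column never matches such a row, so read repair is never triggered and the row is silently missing from the results even though the data table row was committed.

The fix swaps what the two phases carry for index puts. The first phase now writes the entire index mutation with the unverified status, and the third phase only writes the empty column cell with the verified status; this change is made in both MutationState and IndexRegionObserver, where the post index update preparation is also factored out into preparePostIndexMutations(). Because an unverified row now carries all covered columns, a covered-column filter still matches it and read repair can resolve it. In addition, the unverified marker now uses the mutation timestamp instead of timestamp - 1, the one-millisecond sleep after the data table write (and the dataWriteStartTime bookkeeping) is dropped, and DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS is raised from 10 minutes to 7 days. GlobalIndexCheckerIT now filters on the covered column val2, and the tests asserting that an unverified row contains only the empty cell are removed since that is no longer the expected shape of a phase-one write.

A minimal sketch of the affected query shape, adapted from the updated GlobalIndexCheckerIT test; the table and index names and literal values are placeholders, and getUrl() is the test harness helper:

    try (Connection conn = DriverManager.getConnection(getUrl())) {
        conn.createStatement().execute("create table T (id varchar(10) not null primary key, "
                + "val1 varchar(10), val2 varchar(10), val3 varchar(10))");
        conn.createStatement().execute("CREATE INDEX I on T (val1) include (val2, val3)");
        // The query is served from index I and filters on the covered column val2.
        // Before this patch, an unverified index row held no val2 cell, so the filter
        // could drop the row before read repair had a chance to run on it.
        ResultSet rs = conn.createStatement().executeQuery(
                "SELECT val2, val3 from T WHERE val1 = 'ab' and val2 = 'abcc'");
    }
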
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 2cf3f1a..9a86bb4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.end2end.index;
-import static org.apache.phoenix.end2end.index.ImmutableIndexIT.verifyRowsForEmptyColValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -35,23 +34,12 @@
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -258,7 +246,7 @@
conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('c', 'cd','cde')");
conn.commit();
IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
- String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'ab'";
+ String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'ab' and val2 = 'abcc'";
// Verify that we will read from the index table
assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
ResultSet rs = conn.createStatement().executeQuery(selectSql);
@@ -271,47 +259,6 @@
}
}
- public static void checkUnverifiedCellCount(Connection conn, String indexTableName) throws Exception {
- Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(Bytes.toBytes(indexTableName));
- long indexCnt = TestUtil.getRowCount(hIndexTable, false);
- assertEquals(1, indexCnt);
- assertEquals(true, verifyRowsForEmptyColValue(conn, indexTableName, IndexRegionObserver.UNVERIFIED_BYTES));
- Scan s = new Scan();
- int cntCellValues = 0;
- try (ResultScanner scanner = hIndexTable.getScanner(s)) {
- Result result;
- while ((result = scanner.next()) != null) {
- CellScanner cellScanner = result.cellScanner();
- while (cellScanner.advance()) {
- cntCellValues++;
- }
- }
- }
- assertEquals(1, cntCellValues);
- }
- @Test
- public void testUnverifiedRowIncludesOnlyEmptyCell() throws Exception {
- String dataTableName = generateUniqueName();
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("create table " + dataTableName +
- " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
- String indexTableName = generateUniqueName();
- conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
- dataTableName + " (val1) include (val2, val3)");
- // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase)
- IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
- conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
- conn.commit();
- IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
- // check that in the first phase we don't send the full row.
- // We count the num of cells for this
- checkUnverifiedCellCount(conn, indexTableName);
- // Add rows and check everything is still okay
- verifyTableHealth(conn, dataTableName, indexTableName);
- }
- }
-
@Test
public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl())) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 8d6b650..f1ada39 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -422,39 +422,6 @@
}
}
- @Test
- public void testGlobalImmutableIndexUnverifiedOnlyInPhase1() throws Exception {
- if (localIndex || transactionProvider != null) {
- return;
- }
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- String tableName = "TBL_" + generateUniqueName();
- String indexName = "IND_" + generateUniqueName();
- String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
- String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
- TABLE_NAME = fullTableName;
- try (Connection conn = DriverManager.getConnection(getUrl(), props);
- Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();) {
- conn.setAutoCommit(true);
- createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, 0, null);
-
- // Now force fail
- TestUtil.removeCoprocessor(conn, fullTableName, IndexRegionObserver.class);
- TestUtil.addCoprocessor(conn, fullTableName, UpsertFailingRegionObserver.class);
- try {
- upsertRows(conn, fullTableName, 1);
- } catch (Exception e) {
- // ignore this since we force the fail
- }
-
- ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
- assertTrue(rs.next());
- assertEquals(0, rs.getInt(1));
-
- GlobalIndexCheckerIT.checkUnverifiedCellCount(conn, fullIndexName);
- }
- }
-
public static class DeleteFailingRegionObserver extends SimpleRegionObserver {
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8c38f1e..95a354b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1233,7 +1233,7 @@
}
if (m instanceof Delete) {
Put put = new Put(m.getRow());
- put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.getMaxTimestamp(m) - 1,
+ put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.getMaxTimestamp(m),
IndexRegionObserver.UNVERIFIED_BYTES);
// The Delete gets marked as unverified in Phase 1 and gets deleted on Phase 3.
addToMap(unverifiedIndexMutations, tableInfo, put);
@@ -1241,19 +1241,17 @@
} else if (m instanceof Put) {
long timestamp = IndexRegionObserver.getMaxTimestamp(m);
- // Phase 1 index mutations are set to unverified.
- // Just send empty with Unverified
- Put unverifiedPut = new Put(m.getRow());
- unverifiedPut.addColumn(emptyCF, emptyCQ, timestamp - 1, IndexRegionObserver.UNVERIFIED_BYTES);
- addToMap(unverifiedIndexMutations, tableInfo, unverifiedPut);
-
- // Phase 3 mutations are verified
- // Send entire mutation with verified
+ // Phase 1 index mutations are set to unverified
+ // Send entire mutation with the unverified status
// Remove the empty column prepared by Index codec as we need to change its value
IndexRegionObserver.removeEmptyColumn(m, emptyCF, emptyCQ);
- ((Put) m).addColumn(emptyCF, emptyCQ, timestamp,
- IndexRegionObserver.VERIFIED_BYTES);
- addToMap(verifiedOrDeletedIndexMutations, tableInfo, m);
+ ((Put) m).addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.UNVERIFIED_BYTES);
+ addToMap(unverifiedIndexMutations, tableInfo, m);
+
+ // Phase 3 mutations are verified
+ Put verifiedPut = new Put(m.getRow());
+ verifiedPut.addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.VERIFIED_BYTES);
+ addToMap(verifiedOrDeletedIndexMutations, tableInfo, verifiedPut);
} else {
addToMap(unverifiedIndexMutations, tableInfo, m);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a33a3ee..fa4375c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -18,13 +18,10 @@
package org.apache.phoenix.hbase.index;
import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
-import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
-import static org.apache.phoenix.util.ServerUtil.wrapInDoNotRetryIOException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -157,10 +154,9 @@
// the verified column) will have the value true ("verified") on the put mutations
private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates;
// The collection of candidate index mutations that will be applied after the data table mutations
- private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates;
+ private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates;
private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
- private long dataWriteStartTime;
private boolean rebuild;
private BatchMutateContext(int clientVersion) {
this.clientVersion = clientVersion;
@@ -551,10 +547,10 @@
current = NullSpan.INSTANCE;
}
// get the index updates for all elements in this batch
- context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
- this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData);
+ context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+ this.builder.getIndexUpdates(context.indexUpdates, miniBatchOp, mutations, indexMetaData);
current.addTimelineAnnotation("Built index updates, doing preStep");
- handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates);
+ handleLocalIndexUpdates(c, miniBatchOp, context.indexUpdates);
context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
int updateCount = 0;
for (IndexMaintainer indexMaintainer : maintainers) {
@@ -563,14 +559,11 @@
byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
HTableInterfaceReference hTableInterfaceReference =
new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
- Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
- context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
- while (indexUpdatesItr.hasNext()) {
- Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+ List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+ for (Pair<Mutation, byte[]> update : updates) {
// add the VERIFIED cell, which is the empty cell
- Mutation m = next.getFirst();
+ Mutation m = update.getFirst();
if (context.rebuild) {
- indexUpdatesItr.remove();
if (m instanceof Put) {
long ts = getMaxTimestamp(m);
// Remove the empty column prepared by Index codec as we need to change its value
@@ -579,15 +572,20 @@
}
context.preIndexUpdates.put(hTableInterfaceReference, m);
} else {
- // For this mutation whether it is put or delete, set the status of the index row "unverified"
- // This will be done before the data table row is updated (i.e., in the first write phase)
- Put unverifiedPut = new Put(m.getRow());
- unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
- context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
if (m instanceof Put) {
// Remove the empty column prepared by Index codec as we need to change its value
removeEmptyColumn(m, emptyCF, emptyCQ);
- ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+ // Set the status of the index row to "unverified"
+ ((Put) m).addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+ // This will be done before the data table row is updated (i.e., in the first write phase)
+ context.preIndexUpdates.put(hTableInterfaceReference, m);
+ }
+ else {
+ // Set the status of the index row to "unverified"
+ Put unverifiedPut = new Put(m.getRow());
+ unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+ // This will be done before the data table row is updated (i.e., in the first write phase)
+ context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
}
}
}
@@ -609,6 +607,45 @@
return (PhoenixIndexMetaData)indexMetaData;
}
+ private void preparePostIndexMutations(
+ BatchMutateContext context,
+ long now,
+ PhoenixIndexMetaData indexMetaData) throws Throwable {
+ context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+ List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
+ // Check if we need to skip post index update for any of the rows
+ for (IndexMaintainer indexMaintainer : maintainers) {
+ byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+ byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+ HTableInterfaceReference hTableInterfaceReference =
+ new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+ List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
+ for (Pair<Mutation, byte[]> update : updates) {
+ // Are there concurrent updates on the data table row? If so, skip post index updates
+ // and let read repair resolve conflicts
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+ PendingRow pendingRow = pendingRows.get(rowKey);
+ if (!pendingRow.isConcurrent()) {
+ Mutation m = update.getFirst();
+ if (m instanceof Put) {
+ Put verifiedPut = new Put(m.getRow());
+ // Set the status of the index row to "verified"
+ verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+ context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
+ }
+ else {
+ context.postIndexUpdates.put(hTableInterfaceReference, m);
+ }
+
+ }
+ }
+ }
+ // We are done with handling concurrent mutations. So we can remove the rows of this batch from
+ // the collection of pending rows
+ removePendingRows(context);
+ context.indexUpdates.clear();
+ }
+
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
ignoreAtomicOperations(miniBatchOp);
@@ -658,37 +695,15 @@
}
// Do the first phase index updates
doPre(c, context, miniBatchOp);
- context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
if (!context.rebuild) {
- List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
// Acquire the locks again before letting the region proceed with data table updates
List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
for (RowLock rowLock : context.rowLocks) {
rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
}
- context.dataWriteStartTime = EnvironmentEdgeManager.currentTimeMillis();
context.rowLocks.clear();
context.rowLocks = rowLocks;
- // Check if we need to skip post index update for any of the row
- for (IndexMaintainer indexMaintainer : maintainers) {
- HTableInterfaceReference hTableInterfaceReference =
- new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
- Iterator<Pair<Mutation, byte[]>> iterator =
- context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
- while (iterator.hasNext()) {
- // Are there concurrent updates on the data table row? if so, skip post index updates
- // and let read repair resolve conflicts
- Pair<Mutation, byte[]> update = iterator.next();
- ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
- PendingRow pendingRow = pendingRows.get(rowKey);
- if (!pendingRow.isConcurrent()) {
- context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst());
- }
- }
- }
- // We are done with handling concurrent mutations. So we can remove the rows of this batch from
- // the collection of pending rows
- removePendingRows(context);
+ preparePostIndexMutations(context, now, indexMetaData);
}
if (failDataTableUpdatesForTesting) {
throw new DoNotRetryIOException("Simulating the data table write failure");
@@ -721,23 +736,6 @@
for (RowLock rowLock : context.rowLocks) {
rowLock.release();
}
- // Sleep for one millisecond if we have done the data table updates in less than 1 ms. The sleep is necessary
- // not to allow another data table write on the same row within the same ms. The sleep is very rare to happen.
- // Assume that a data table write completes at timestamp t. Now assume another write happens on the same data
- // row at timestamp t+1. The mutation for unverifying the index table row(s) for the previous write will have
- // timestamp t. If this happens before the last phase of the first write (with timestamp t) completes, then
- // the index row(s) for the first write will have the verified status. We do not want this to happen as the
- // updates for the first write are supposed to be overwritten (unverified) by the second write.
-
- if (!context.rowLocks.isEmpty() && context.dataWriteStartTime == EnvironmentEdgeManager.currentTimeMillis()) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- wrapInDoNotRetryIOException("Thread sleep is interrupted after data write", e,
- EnvironmentEdgeManager.currentTimeMillis());
- }
- LOG.debug("After data write, slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
- }
this.builder.batchCompleted(miniBatchOp);
if (success) { // The pre-index and data table updates are successful, and now, do post index updates
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 441cd36..8a2cbec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -355,7 +355,7 @@
public static final long DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS = 30*60*1000; // 30 min
public static final long DEFAULT_TASK_HANDLING_INITIAL_DELAY_MS = 10*1000; // 10 sec
- public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 10*60*1000; /* 10 minutes */
+ public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 16*1024;