PHOENIX-5791 Eliminate false invalid row detection due to concurrent updates
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 912945e..ac557ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -524,11 +524,13 @@
errorMsg, null, null);
return false;
}
+ int expectedCellCount = 0;
for (List<Cell> cells : expected.getFamilyCellMap().values()) {
if (cells == null) {
continue;
}
for (Cell expectedCell : cells) {
+ expectedCellCount++;
byte[] family = CellUtil.cloneFamily(expectedCell);
byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
Cell actualCell = getCell(actual, family, qualifier);
@@ -540,14 +542,6 @@
return false;
}
if (!CellUtil.matchingValue(actualCell, expectedCell)) {
- if (verifyType == IndexTool.IndexVerifyType.ONLY &&
- (Bytes.compareTo(family, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary()) == 0 &&
- Bytes.compareTo(qualifier, indexMaintainer.getEmptyKeyValueQualifier()) == 0) &&
- (Bytes.compareTo(actualCell.getValueArray(), actualCell.getValueOffset(), actualCell.getValueLength(),
- UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0)) {
- // We do not flag this as mismatch as we can have unverified but still valid rows
- continue;
- }
String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
@@ -556,6 +550,20 @@
}
}
}
+ int actualCellCount = 0;
+ for (List<Cell> cells : actual.getFamilyCellMap().values()) {
+ if (cells == null) {
+ continue;
+ }
+ actualCellCount += cells.size();
+ }
+ if (expectedCellCount != actualCellCount) {
+ String errorMsg = "Index has extra cells (in iteration " + iteration + ")";
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
+ logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
+ errorMsg);
+ return false;
+ }
return true;
}
@@ -639,39 +647,138 @@
}
return getMutationsWithSameTS(put, del);
}
+ /**
+ * In this method, the actual list is repaired in memory using the expected list, which is the output of
+ * rebuilding the index table row. The result of this repair is used only for verification.
+ */
+ private void repairActualMutationList(List<Mutation> actualMutationList, List<Mutation> expectedMutationList)
+ throws IOException {
+ // Find the first actual unverified put; it is the latest one when the list is sorted by descending timestamp
+ List<Mutation> repairedMutationList = new ArrayList<>(expectedMutationList.size());
+ for (Mutation actual : actualMutationList) {
+ if (actual instanceof Put && !isVerified((Put) actual)) {
+ long ts = getTimestamp(actual);
+ int expectedIndex;
+ int expectedListSize = expectedMutationList.size();
+ for (expectedIndex = 0; expectedIndex < expectedListSize; expectedIndex++) {
+ if (getTimestamp(expectedMutationList.get(expectedIndex)) <= ts) { // first expected mutation at or before ts
+ if (expectedIndex > 0) {
+ expectedIndex--; // step back so the next newer expected mutation is included as well
+ }
+ break;
+ }
+ }
+ if (expectedIndex == expectedListSize) { // every expected mutation is newer than ts: nothing to copy
+ continue;
+ }
+ for (; expectedIndex < expectedListSize; expectedIndex++) {
+ Mutation mutation = expectedMutationList.get(expectedIndex);
+ if (mutation instanceof Put) { // copy, so the actual list does not share objects with the expected list
+ mutation = new Put((Put) mutation);
+ } else {
+ mutation = new Delete((Delete) mutation);
+ }
+ repairedMutationList.add(mutation);
+ }
+ // The copies cover the entire history down to the oldest expected mutation, so repair only once
+ break;
+ }
+ }
+ if (repairedMutationList.isEmpty()) { // nothing was repaired; leave the actual list untouched
+ return;
+ }
+ actualMutationList.addAll(repairedMutationList);
+ Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR); // restore timestamp order after merging
+ }
+
+ private void cleanUpActualMutationList(List<Mutation> actualMutationList) // drops skippable/duplicate mutations
+ throws IOException {
+ Iterator<Mutation> iterator = actualMutationList.iterator();
+ Mutation previous = null; // the most recent mutation that was kept in the list
+ while (iterator.hasNext()) {
+ Mutation mutation = iterator.next();
+ if ((mutation instanceof Put && !isVerified((Put) mutation)) || // an unverified put, or
+ (mutation instanceof Delete && isDeleteFamilyVersion(mutation))) { // a delete family version marker
+ iterator.remove();
+ } else {
+ if (previous != null && getTimestamp(previous) == getTimestamp(mutation) && // same timestamp as the
+ ((previous instanceof Put && mutation instanceof Put) || // previously kept mutation and the same
+ previous instanceof Delete && mutation instanceof Delete)) { // type (put/put or delete/delete)
+ iterator.remove(); // e.g. a copy merged in by repairActualMutationList
+ } else {
+ previous = mutation;
+ }
+ }
+ }
+ }
/**
- * indexRow is the set of all cells of all the row version of an index row from the index table. These are actual
- * cells. We group these cells based on timestamp and type (put vs delete), and form the actual set of
- * index mutations. indexKeyToMutationMap is a map from an index row key to a set of mutations that are generated
- * using the rebuild process (i.e., by replaying raw data table mutations). These sets are sets of expected
- * index mutations, one set for each index row key. Since not all mutations in the index table have both phase
- * (i.e., pre and post data phase) mutations, we cannot compare actual index mutations with expected one by one
- * and expect to find them identical. We need to consider concurrent data mutation effects, data table row write
- * failures, post index write failures. Thus, we need to allow some expected and actual mutations to be skipped
- * during comparing actual mutations to index mutations.
+ * There are two types of verification: without repair and with repair. Without-repair verification is done before
+ * or after index rebuild. It is done before index rebuild to identify the rows to be rebuilt. It is done after
+ * index rebuild to verify the rows that have been rebuilt. With-repair verification can be done anytime using
+ * the {@code -v ONLY} option to check the consistency of the index table. Note that with-repair verification simulates
+ * read repair in-memory for the purpose of verification, but does not actually repair the data in the index.
*
- * The main idea for the verification algorithm used here is to match every expected verified put with an actual
- * put such that these two mutations are the same except that actual mutation can be unverified.
+ * Unverified Rows
*
- * Some background on why we can skip some of the actual unverified puts and delete markers due to concurrent data
- * table updates is as follows:
+ * For each mutable data table mutation during regular data table updates, two operations are done on the data table.
+ * One is to read the existing row state, and the second is to update the data table for this row. The processing of
+ * concurrent data mutations is serialized once for reading the existing row states, and then serialized again
+ * for updating the data table. In other words, they go through locking twice, i.e., [lock, read, unlock] and
+ * [lock, write, unlock]. Because the lock is released in between, for a pair of concurrent mutations (for the same row),
+ * the same row state can be read from the data table. This means the same existing index row can be made unverified
+ * twice with different timestamps, one for each concurrent mutation. These unverified mutations can be repaired
+ * from the data table later during HBase scans using the index read repair process. This is one of the reasons
+ * for having extra unverified rows in the index table. The other reason is the data table write failures.
+ * When a data table write fails, it leaves an unverified index row behind. These rows are never returned to clients,
+ * instead they are repaired, which means either they are rebuilt from their data table rows or they are deleted if
+ * their data table rows do not exist.
*
- * For each data table mutation, two operations are done on the data table. One is to read the existing row state,
- * and the second is to write to the data table. The processing of concurrent data mutations are serialized once
- * for reading the existing row states, and then serialized again for updating data table. In other words,
- * they go through locking twice, i.e., [lock, read, unlock] and [lock, write, unlock]. Because of this two phase
- * locking, for a pair of concurrent mutations (for the same row), the same row state can be read from the data
- * table. This means the same existing index row can be made unverified twice with different timestamps, one for
- * each concurrent mutation. These unverified mutation are then repaired from the data table. Since expected
- * mutations are used for rebuild (which is also used by the read repair), skipping these unverified put mutations
- * that are not matching with expected mutation are safe as they will go through the same process during
- * read repair and will be skipped and eventually cleaned up by the read repair. We can skip the delete markers
- * safely too as they are placed to clean up these unverified mutations. When the data table rows are rebuilt,
- * the rebuild process generates the delete family markers. The timestamp of delete markers are the timestamp of
- * the data table mutation for which the delete marker is added. Thus, the timestamp of these delete markers will be
- * higher than the timestamp of index row to be deleted.
+ * Delete Family Version Markers
+ *
+ * The family version delete markers are generated by the read repair to remove extra unverified rows. They only
+ * show up in the actual mutation list since they are not generated for regular table updates or index rebuilds.
+ * For the verification purpose, these delete markers can be treated as extra unverified rows and can be safely
+ * skipped.
+ *
+ * Delete Family Markers
+ * Delete family markers are generated during read repair, regular table updates and index rebuilds to delete index
+ * table rows. The read repair generates them to delete extra unverified rows. During regular table updates or
+ * index rebuilds, the delete family markers are used to delete index rows due to data table row deletes or
+ * data table row overwrites.
+ *
+ * Verification Algorithm
+ *
+ * IndexTool verification generates an expected list of index mutations from the data table rows and uses this list
+ * to check if index table rows are consistent with the data table.
+ *
+ * The expected list is generated using the index rebuild algorithm. This means that, for a given row, the list can include
+ * a number of put and delete mutations such that the following hold:
+ *
+ * - Every mutation includes a set of cells with the same timestamp.
+ * - Every mutation has a different timestamp.
+ * - A delete mutation includes only delete family cells, and it deletes the entire row and its versions.
+ * - Every put mutation is verified.
+ *
+ * For both verification types, after the expected list of index mutations is constructed for a given data table row,
+ * another list called the actual list of index mutations is constructed by reading the index table row using HBase
+ * raw scan and all versions of the cells of the row are retrieved.
+ *
+ * As in the construction for the expected list, the cells are grouped into a put and a delete set. The put and
+ * delete sets for a given row are further grouped based on their timestamps into put and delete mutations such that
+ * all the cells in a mutation have the same timestamp. The put and delete mutations are then sorted within a single
+ * list in descending order of their timestamps (latest first). This list is the actual list.
+ *
+ * For the without-repair verification, unverified mutations and family version delete markers are removed from
+ * the actual list and then the list is compared with the expected list.
+ *
+ * In case of the with-repair verification, the actual list is first repaired, then unverified mutations and family
+ * version delete markers are removed from the actual list and finally the list is compared with the expected list.
+ *
+ * The actual list is repaired as follows: the latest unverified put mutation is repaired in memory, simulating read
+ * repair, by merging in copies of the expected mutations from around its timestamp down to the oldest one.
*/
+
@VisibleForTesting
public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult)
throws IOException {
@@ -685,6 +792,10 @@
}
Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
+ if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+ repairActualMutationList(actualMutationList, expectedMutationList);
+ }
+ cleanUpActualMutationList(actualMutationList);
long currentTime = EnvironmentEdgeManager.currentTime();
int actualIndex = 0;
int expectedIndex = 0;
@@ -708,49 +819,30 @@
}
actual = actualMutationList.get(actualIndex);
if (expected instanceof Put) {
- if (previousExpected == null || previousExpected instanceof Put) {
- // This expected put is either the first mutation or a put just comes after another expected mutation
- // on the expected mutation list which is sorted by the mutation timestamps. The cell timestamps
- // within each mutation here are the same.
- // Go down the list of actual mutations and find the corresponding actual put mutation with the same
- // timestamp. Stop if a verified put or delete family mutation is encountered on the way. Skip
- // unverified puts or delete family version delete markers.
- while (getTimestamp(actual) > getTimestamp(expected) &&
- ((actual instanceof Put && !isVerified((Put) actual)) ||
- (actual instanceof Delete && isDeleteFamilyVersion(actual)))) {
+ if (previousExpected instanceof Delete) {
+ // Between an expected delete and put, there can be one or more deletes due to
+ // concurrent mutations or data table write failures. Skip all of them if any
+ while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) {
actualIndex++;
if (actualIndex == actualSize) {
break;
}
actual = actualMutationList.get(actualIndex);
}
- } else { // previousExpected instanceof Delete
- // Between an expected delete and put, there cannot be any types of mutation even verified put
- while (getTimestamp(actual) > getTimestamp(expected)) {
- actualIndex++;
- if (actualIndex == actualSize) {
- break;
- }
- actual = actualMutationList.get(actualIndex);
+ if (actualIndex == actualSize) {
+ break;
}
}
- if (actualIndex == actualSize) {
- break;
- }
- // Now the expected and actual mutations should match
if (isMatchingMutation(expected, actual, expectedIndex)) {
expectedIndex++;
actualIndex++;
matchingCount++;
continue;
}
- verificationPhaseResult.invalidIndexRowCount++;
- return false;
} else { // expected instanceof Delete
- // Between put and delete, delete and delete, or before first delete, there can be other deletes and
- // unverified puts. Skip all of them if any
- while (getTimestamp(actual) > getTimestamp(expected) &&
- ((actual instanceof Put && !isVerified((Put) actual)) || actual instanceof Delete)) {
+ // Between put and delete, delete and delete, or before first delete, there can be other deletes.
+ // Skip all of them if any
+ while (getTimestamp(actual) > getTimestamp(expected) && actual instanceof Delete) {
actualIndex++;
if (actualIndex == actualSize) {
break;
@@ -760,34 +852,34 @@
if (actualIndex == actualSize) {
break;
}
- // If this is first expected mutation is delete, there should be an actual delete mutation with the
- // same timestamp or an unverified put with the same or older timestamp
if (getTimestamp(actual) == getTimestamp(expected) &&
(actual instanceof Delete && isDeleteFamily(actual))) {
expectedIndex++;
actualIndex++;
matchingCount++;
continue;
- } else if (getTimestamp(actual) <= getTimestamp(expected) &&
- (actual instanceof Put && !isVerified((Put) actual))) {
- expectedIndex++;
- if (previousExpected == null) {
- matchingCount++;
- }
- continue;
}
- if (previousExpected == null) {
- String errorMsg = "First delete check failure";
- byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
- logToIndexToolOutputTable(dataKey, indexRow.getRow(),
- getTimestamp(expected),
- getTimestamp(actual), errorMsg);
- verificationPhaseResult.invalidIndexRowCount++;
- return false;
- }
+ String errorMsg = "Delete check failure";
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+ logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+ getTimestamp(expected),
+ getTimestamp(actual), errorMsg);
}
+ verificationPhaseResult.invalidIndexRowCount++;
+ return false;
}
if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
+ for (; expectedIndex < expectedSize; expectedIndex++) {
+ expected = expectedMutationList.get(expectedIndex);
+ // Check if cell expired as per the current server's time and data table ttl
+ // Index table should have the same ttl as the data table, hence we might not
+ // get a value back from index if it has already expired between our rebuild and
+ // verify
+ // TODO: have a metric to update for these cases
+ if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+ verificationPhaseResult.expiredIndexRowCount++;
+ }
+ }
if (matchingCount > 0) {
if (verifyType != IndexTool.IndexVerifyType.ONLY) {
// We do not consider this as a verification issue but log it for further information.
@@ -803,8 +895,7 @@
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
String errorMsg = "Not matching index row";
logToIndexToolOutputTable(dataKey, indexRow.getRow(),
- getTimestamp(expectedMutationList.get(0)),
- getTimestamp(actualMutationList.get(0)), errorMsg);
+ getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
verificationPhaseResult.invalidIndexRowCount++;
return false;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index fa42ce9..76ef06d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1060,7 +1060,7 @@
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env) throws IOException {
- if (!scan.isRaw() && scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY) == null) {
+ if (!scan.isRaw()) {
Scan rawScan = new Scan(scan);
rawScan.setRaw(true);
rawScan.setMaxVersions();
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 2506609..013578b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -41,6 +41,7 @@
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -542,7 +543,7 @@
case INVALID_CELL_VALUE:
if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
newCell = getCellWithPut(c);
- emptyCell = getUnverifiedEmptyCell(c);
+ emptyCell = getVerifiedEmptyCell(c);
newCellList.add(newCell);
newCellList.add(emptyCell);
} else {
@@ -562,24 +563,24 @@
break;
case INVALID_EXTRA_CELL:
newCell = getCellWithPut(c);
- emptyCell = getUnverifiedEmptyCell(c);
+ emptyCell = getVerifiedEmptyCell(c);
newCellList.add(newCell);
newCellList.add(emptyCell);
newCellList.add(c);
}
}
- private Cell getUnverifiedEmptyCell(Cell c) {
+ private Cell getVerifiedEmptyCell(Cell c) {
return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
indexMaintainer.getEmptyKeyValueQualifier(),
EnvironmentEdgeManager.currentTimeMillis(),
- KeyValue.Type.Put.getCode(), UNVERIFIED_BYTES);
+ KeyValue.Type.Put.getCode(), VERIFIED_BYTES);
}
private Cell getCellWithPut(Cell c) {
return CellUtil.createCell(CellUtil.cloneRow(c),
CellUtil.cloneFamily(c), Bytes.toBytes(INCLUDED_COLUMN),
- EnvironmentEdgeManager.currentTimeMillis(), KeyValue.Type.Put.getCode(),
+ c.getTimestamp(), KeyValue.Type.Put.getCode(),
Bytes.toBytes("zxcv"));
}