PHOENIX-5951 - Index rebuild output logging for past-max-lookback rows should be configurable
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
index f24f49d..5731c6a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
@@ -31,6 +31,7 @@
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -53,6 +54,9 @@
import java.util.Map;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.DEFAULT_LOG_TTL;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE;
@@ -90,21 +94,23 @@
long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
indexRowTs, expectedErrorMessage, expectedValue, actualValue,
- scanMaxTs, tableNameBytes, true);
+ scanMaxTs, tableNameBytes, true,
+ INVALID_ROW);
//now increment the scan time by 1 and do it again
outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
indexRowTs, expectedErrorMessage, expectedValue, actualValue,
- scanMaxTs +1, tableNameBytes, false);
+ scanMaxTs +1, tableNameBytes, false,
+ INVALID_ROW);
//make sure we only get the first row back
IndexVerificationOutputRow expectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs,
indexRowTs, expectedErrorMessage, expectedValue, actualValue,
- scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE);
+ scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE, INVALID_ROW);
verifyOutputRow(outputRepository, scanMaxTs, indexNameBytes, expectedRow);
//make sure we get the second row back
IndexVerificationOutputRow secondExpectedRow = buildVerificationRow(dataRowKey,
indexRowKey, dataRowTs,
indexRowTs, expectedErrorMessage, expectedValue, actualValue,
- scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE);
+ scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE, INVALID_ROW);
verifyOutputRow(outputRepository, scanMaxTs+1, indexNameBytes, secondExpectedRow);
}
@@ -128,7 +134,8 @@
TestUtil.assertTableHasTtl(conn, TableName.valueOf(OUTPUT_TABLE_NAME_BYTES), DEFAULT_LOG_TTL);
outputRepository.logToIndexToolOutputTable(mockStringBytes, mockStringBytes,
1, 2, mockString, mockStringBytes, mockStringBytes,
- EnvironmentEdgeManager.currentTimeMillis(), mockStringBytes, true);
+ EnvironmentEdgeManager.currentTimeMillis(), mockStringBytes, true,
+ INVALID_ROW);
Assert.assertEquals(1, TestUtil.getRowCount(hTable, false));
@@ -147,7 +154,7 @@
IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.BEFORE;
boolean expectedBefore = false;
boolean expectedAfter = true;
- verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
}
@Test
@@ -155,7 +162,7 @@
IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.AFTER;
boolean expectedBefore = true;
boolean expectedAfter = false;
- verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
}
@Test
@@ -163,7 +170,7 @@
IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.BOTH;
boolean expectedBefore = false;
boolean expectedAfter = false;
- verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
}
@Test
@@ -171,16 +178,46 @@
IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE;
boolean expectedBefore = true;
boolean expectedAfter = true;
- verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter);
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, INVALID_ROW);
}
- public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType, boolean expectedBefore, boolean expectedAfter) throws SQLException, IOException {
+ @Test
+ public void testDisableLoggingBeyondMaxLookback() throws SQLException, IOException {
+ IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE;
+ boolean expectedBefore = false;
+ boolean expectedAfter = false;
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+ BEYOND_MAX_LOOKBACK_INVALID, false);
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+ BEYOND_MAX_LOOKBACK_MISSING, false);
+
+ expectedBefore = true;
+ expectedAfter = true;
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+ BEYOND_MAX_LOOKBACK_INVALID, true);
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter,
+ BEYOND_MAX_LOOKBACK_MISSING, true);
+ }
+
+ public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType,
+ boolean expectedBefore, boolean expectedAfter,
+ IndexVerificationErrorType errorType) throws SQLException, IOException {
+ verifyDisableLogging(disableLoggingVerifyType, expectedBefore, expectedAfter, errorType,
+ true);
+ }
+
+ public void verifyDisableLogging(IndexTool.IndexDisableLoggingType disableLoggingVerifyType,
+ boolean expectedBefore, boolean expectedAfter,
+ IndexVerificationErrorType errorType,
+ boolean shouldLogBeyondMaxLookback) throws SQLException,
+ IOException {
Table mockOutputTable = Mockito.mock(Table.class);
Table mockIndexTable = Mockito.mock(Table.class);
when(mockIndexTable.getName()).thenReturn(TableName.valueOf("testDisableLoggingIndexName"));
IndexVerificationOutputRepository outputRepository =
new IndexVerificationOutputRepository(mockOutputTable,
mockIndexTable, disableLoggingVerifyType);
+ outputRepository.setShouldLogBeyondMaxLookback(shouldLogBeyondMaxLookback);
byte[] dataRowKey = Bytes.toBytes("dataRowKey");
byte[] indexRowKey = Bytes.toBytes("indexRowKey");
long dataRowTs = EnvironmentEdgeManager.currentTimeMillis();
@@ -192,9 +229,9 @@
byte[] tableName = Bytes.toBytes("testDisableLoggingTableName");
outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs
- , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, true);
+ , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, true, errorType);
outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs
- , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, false);
+ , errorMsg, expectedValue, actualValue, scanMaxTs, tableName, false, errorType);
int expectedRowsLogged = 0;
if (expectedBefore && expectedAfter) {
expectedRowsLogged = 2;
@@ -222,7 +259,8 @@
long scanMaxTs,
byte[] tableNameBytes,
byte[] indexNameBytes,
- byte[] phaseBeforeValue) {
+ byte[] phaseBeforeValue,
+ IndexVerificationErrorType errorType) {
IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder =
new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder();
return builder.setDataTableRowKey(dataRowKey).
@@ -239,6 +277,7 @@
setDataTableName(Bytes.toString(tableNameBytes)).
setIndexTableName(Bytes.toString(indexNameBytes)).
setPhaseValue(phaseBeforeValue).
+ setErrorType(errorType).
build();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 6f82db3..6f39837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -20,6 +20,12 @@
import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_CELLS;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.MISSING_ROW;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -40,6 +46,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -47,6 +54,7 @@
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -92,6 +100,12 @@
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
+ public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS =
+ "phoenix.index.mr.log.beyond.max.lookback.errors";
+ public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
+ private boolean useProto = true;
+ private byte[] indexRowKey;
+ private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private static boolean ignoreIndexRebuildForTesting = false;
public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; }
private byte[] indexRowKeyforReadRepair;
@@ -123,6 +137,9 @@
}
}
if (verify) {
+ boolean shouldLogBeyondMaxLookbackInvalidRows =
+ env.getConfiguration().getBoolean(PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS,
+ DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS);
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
byte[] disableLoggingValueBytes =
scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_DISABLE_LOGGING_VERIFY_TYPE);
@@ -133,6 +150,7 @@
verificationOutputRepository =
new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName()
, hTableFactory, disableLoggingVerifyType);
+ verificationOutputRepository.setShouldLogBeyondMaxLookback(shouldLogBeyondMaxLookbackInvalidRows);
verificationResult = new IndexToolVerificationResult(scan);
verificationResultRepository =
new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
@@ -257,17 +275,19 @@
}
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
- String errorMsg, boolean isBeforeRebuild) throws IOException {
- logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null, isBeforeRebuild);
+ String errorMsg, boolean isBeforeRebuild,
+ IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
+ logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null,
+ null, isBeforeRebuild, errorType);
}
@VisibleForTesting
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
- String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild)
- throws IOException {
+ String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild,
+ IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(),
- region.getRegionInfo().getTable().getName(), isBeforeRebuild);
+ region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType);
}
private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
@@ -288,7 +308,7 @@
String errorMsg = "Not matching timestamp";
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
- errorMsg, null, null, isBeforeRebuild);
+ errorMsg, null, null, isBeforeRebuild, INVALID_ROW);
return;
}
int expectedCellCount = 0;
@@ -305,7 +325,8 @@
!CellUtil.matchingType(expectedCell, actualCell)) {
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
- logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg,isBeforeRebuild);
+ logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected),
+ getTimestamp(actual), errorMsg, isBeforeRebuild, INVALID_ROW);
verificationPhaseResult.setIndexHasMissingCellsCount(verificationPhaseResult.getIndexHasMissingCellsCount() + 1);
return;
}
@@ -313,7 +334,8 @@
String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
- errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell), isBeforeRebuild);
+ errorMsg, CellUtil.cloneValue(expectedCell),
+ CellUtil.cloneValue(actualCell), isBeforeRebuild, INVALID_ROW);
return;
}
}
@@ -329,7 +351,7 @@
String errorMsg = "Index has extra cells (in iteration " + iteration + ")";
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
- errorMsg, isBeforeRebuild);
+ errorMsg, isBeforeRebuild, EXTRA_CELLS);
verificationPhaseResult.setIndexHasExtraCellsCount(verificationPhaseResult.getIndexHasExtraCellsCount() + 1);
}
}
@@ -719,7 +741,7 @@
actualSize);
logToIndexToolOutputTable(dataKey, indexRow.getRow(),
getTimestamp(expectedMutationList.get(expectedIndex)),
- 0, errorMsg, isBeforeRebuild);
+ 0, errorMsg, isBeforeRebuild, BEYOND_MAX_LOOKBACK_INVALID);
return false;
}
else {
@@ -730,7 +752,8 @@
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
String errorMsg = "Not matching index row";
logToIndexToolOutputTable(dataKey, indexRow.getRow(),
- getTimestamp(expectedMutationList.get(0)), 0L, errorMsg, isBeforeRebuild);
+ getTimestamp(expectedMutationList.get(0)), 0L, errorMsg, isBeforeRebuild,
+ INVALID_ROW);
}
verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1);
return false;
@@ -791,17 +814,21 @@
}
currentTime = EnvironmentEdgeManager.currentTimeMillis();
String errorMsg;
+ IndexVerificationOutputRepository.IndexVerificationErrorType errorType;
if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(mutation))){
errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+ errorType = BEYOND_MAX_LOOKBACK_MISSING;
verificationPhaseResult.
setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
}
else {
errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW;
+ errorType = MISSING_ROW;
verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1);
}
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexKey), viewConstants);
- logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg, isBeforeRebuild);
+ logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg,
+ isBeforeRebuild, errorType);
}
// Leave the invalid and missing rows in indexKeyToMutationMap
indexKeyToMutationMap.putAll(invalidIndexRows);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
index 3cf5446..549f876 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -52,6 +52,7 @@
private Table outputTable;
private IndexTool.IndexDisableLoggingType disableLoggingVerifyType =
IndexTool.IndexDisableLoggingType.NONE;
+ private boolean shouldLogBeyondMaxLookback = true;
public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
@@ -71,6 +72,8 @@
public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
public final static String ERROR_MESSAGE = "Error";
public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
+ public static final String ERROR_TYPE = "ErrorType";
+ public static final byte[] ERROR_TYPE_BYTES = Bytes.toBytes(ERROR_TYPE);
public static String VERIFICATION_PHASE = "Phase";
public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
@@ -85,6 +88,15 @@
public static final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
public static final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
+
+ public enum IndexVerificationErrorType {
+ INVALID_ROW,
+ MISSING_ROW,
+ EXTRA_CELLS,
+ BEYOND_MAX_LOOKBACK_INVALID,
+ BEYOND_MAX_LOOKBACK_MISSING,
+ UNKNOWN
+ }
/**
* Only usable for the create table / read path or for testing. Use setOutputTable and
* setIndexTable first to write.
@@ -117,6 +129,10 @@
this.disableLoggingVerifyType = disableLoggingVerifyType;
}
+ public void setShouldLogBeyondMaxLookback(boolean shouldLogBeyondMaxLookback) {
+ this.shouldLogBeyondMaxLookback = shouldLogBeyondMaxLookback;
+ }
+
public static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) {
byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
byte[] rowKey;
@@ -173,9 +189,11 @@
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs,
long indexRowTs,
String errorMsg, byte[] expectedValue, byte[] actualValue,
- long scanMaxTs, byte[] tableName, boolean isBeforeRebuild)
+ long scanMaxTs, byte[] tableName,
+ boolean isBeforeRebuild,
+ IndexVerificationErrorType errorType)
throws IOException {
- if (shouldLogOutput(isBeforeRebuild)) {
+ if (shouldLogOutput(isBeforeRebuild, errorType)) {
byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey);
Put put = new Put(rowKey);
put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES, tableName);
@@ -193,6 +211,8 @@
errorMessageBytes = Bytes.toBytes(errorMsg);
}
put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, errorMessageBytes);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_TYPE_BYTES,
+ Bytes.toBytes(errorType.toString()));
if (isBeforeRebuild) {
put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, PHASE_BEFORE_VALUE);
} else {
@@ -202,7 +222,12 @@
}
}
- public boolean shouldLogOutput(boolean isBeforeRebuild) {
+ public boolean shouldLogOutput(boolean isBeforeRebuild, IndexVerificationErrorType errorType) {
+ return shouldLogOutputForVerifyType(isBeforeRebuild) &&
+ shouldLogOutputForErrorType(errorType);
+ }
+
+ private boolean shouldLogOutputForVerifyType(boolean isBeforeRebuild) {
if (disableLoggingVerifyType.equals(IndexTool.IndexDisableLoggingType.BOTH)) {
return false;
}
@@ -219,6 +244,15 @@
return false;
}
+ private boolean shouldLogOutputForErrorType(IndexVerificationErrorType errorType) {
+ if (errorType != null &&
+ (errorType.equals(IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID) ||
+ errorType.equals(IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING))){
+ return shouldLogBeyondMaxLookback;
+ }
+ return true;
+ }
+
public static byte[] getErrorMessageBytes(String errorMsg, byte[] expectedValue, byte[] actualValue) {
byte[] errorMessageBytes;
errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
@@ -297,6 +331,18 @@
builder.setExpectedValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES));
builder.setActualValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES));
builder.setPhaseValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES));
+ IndexVerificationErrorType errorType;
+ try {
+ errorType =
+ IndexVerificationErrorType.valueOf(
+ Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_TYPE_BYTES)));
+ } catch (Throwable e) {
+ //in case we have a cast exception because an incompatible version of the enum produced
+ //the row, or an earlier version that didn't record error types, it's better to mark
+ // the error type unknown and move on rather than fail
+ errorType = IndexVerificationErrorType.UNKNOWN;
+ }
+ builder.setErrorType(errorType);
return builder.build();
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
index 8c54796..4dad9b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.mapreduce.index;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType;
import java.util.Arrays;
import java.util.Objects;
@@ -35,13 +36,14 @@
private byte[] expectedValue;
private byte[] actualValue;
private byte[] phaseValue;
+ private IndexVerificationErrorType errorType;
private IndexVerificationOutputRow(String dataTableName, String indexTableName,
byte[] dataTableRowKey, Long scanMaxTimestamp,
byte[] indexTableRowKey,
long dataTableRowTimestamp, long indexTableRowTimestamp,
String errorMessage, byte[] expectedValue, byte[] actualValue,
- byte[] phaseValue) {
+ byte[] phaseValue, IndexVerificationErrorType errorType) {
this.dataTableName = dataTableName;
this.indexTableName = indexTableName;
this.scanMaxTimestamp = scanMaxTimestamp;
@@ -53,6 +55,7 @@
this.expectedValue = expectedValue;
this.actualValue = actualValue;
this.phaseValue = phaseValue;
+ this.errorType = errorType;
}
public String getDataTableName() {
@@ -119,7 +122,8 @@
Objects.equals(errorMessage, otherRow.getErrorMessage()) &&
Arrays.equals(expectedValue, otherRow.getExpectedValue()) &&
Arrays.equals(actualValue, otherRow.getActualValue()) &&
- Arrays.equals(phaseValue, otherRow.getPhaseValue());
+ Arrays.equals(phaseValue, otherRow.getPhaseValue()) &&
+ Objects.equals(errorType, otherRow.getErrorType());
}
@Override
@@ -143,9 +147,14 @@
sb.append(IndexVerificationOutputRepository.ACTUAL_VALUE + ": ").append(Bytes.toString(actualValue)).append(
",");
sb.append(IndexVerificationOutputRepository.VERIFICATION_PHASE + ": ").append(Bytes.toString(phaseValue));
+ sb.append(IndexVerificationOutputRepository.ERROR_TYPE + ": " ).append(Objects.toString(errorType));
return sb.toString();
}
+ public IndexVerificationErrorType getErrorType() {
+ return errorType;
+ }
+
public static class IndexVerificationOutputRowBuilder {
private String dataTableName;
private String indexTableName;
@@ -158,6 +167,7 @@
private byte[] expectedValue;
private byte[] actualValue;
private byte[] phaseValue;
+ private IndexVerificationErrorType errorType;
public IndexVerificationOutputRowBuilder setDataTableName(String dataTableName) {
this.dataTableName = dataTableName;
@@ -214,10 +224,15 @@
return this;
}
+ public IndexVerificationOutputRowBuilder setErrorType(IndexVerificationErrorType errorType) {
+ this.errorType = errorType;
+ return this;
+ }
+
public IndexVerificationOutputRow build() {
return new IndexVerificationOutputRow(dataTableName, indexTableName, dataTableRowKey,
scanMaxTimestamp, indexTableRowKey, dataTableRowTimestamp, indexTableRowTimestamp,
- errorMessage, expectedValue, actualValue, phaseValue);
+ errorMessage, expectedValue, actualValue, phaseValue, errorType);
}
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 1b8ed55..86f39b9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -35,6 +35,7 @@
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
@@ -279,10 +280,13 @@
doNothing().when(rebuildScanner)
.logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
- Matchers.<byte[]>any(), Matchers.<byte[]>any(), Matchers.anyBoolean());
+ Matchers.<byte[]>any(), Matchers.<byte[]>any(), Matchers.anyBoolean(),
+ Mockito.<IndexVerificationOutputRepository.IndexVerificationErrorType>any());
doNothing().when(rebuildScanner)
.logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
- Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(), Matchers.anyBoolean());
+ Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
+ Matchers.anyBoolean(),
+ Mockito.<IndexVerificationOutputRepository.IndexVerificationErrorType>any());
//populate the local map to use to create actual mutations
indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);