PHOENIX-5799 - Inline Index Verification Output API
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 71cb530..4840047 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -44,6 +44,8 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
@@ -370,10 +372,11 @@
private void dropIndexToolTables(Connection conn) throws Exception {
Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+ TableName indexToolOutputTable =
+ TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
admin.disableTable(indexToolOutputTable);
admin.deleteTable(indexToolOutputTable);
- TableName indexToolResultTable = TableName.valueOf(IndexTool.RESULT_TABLE_NAME_BYTES);
+ TableName indexToolResultTable = TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES);
admin.disableTable(indexToolResultTable);
admin.deleteTable(indexToolResultTable);
}
@@ -477,12 +480,13 @@
// This method verifies the common prefix, i.e., "timestamp | index table name | ", since the rest of the
// fields may include the separator key
- int offset = Bytes.indexOf(rowKey, IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE);
+ int offset = Bytes.indexOf(rowKey, IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE);
offset++;
byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
assertEquals(Bytes.compareTo(rowKey, offset, indexTableFullNameBytes.length, indexTableFullNameBytes, 0,
indexTableFullNameBytes.length), 0);
- assertEquals(rowKey[offset + indexTableFullNameBytes.length], IndexRebuildRegionScanner.ROW_KEY_SEPARATOR_BYTE[0]);
+ assertEquals(rowKey[offset + indexTableFullNameBytes.length],
+ IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0]);
}
private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
@@ -490,7 +494,7 @@
byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+ .getTable(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
Scan scan = new Scan();
ResultScanner scanner = hIndexTable.getScanner(scan);
boolean dataTableNameCheck = false;
@@ -499,28 +503,30 @@
for (Result result = scanner.next(); result != null; result = scanner.next()) {
for (Cell cell : result.rawCells()) {
assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
- IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0,
- IndexTool.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
+ IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY, 0,
+ IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- IndexTool.DATA_TABLE_NAME_BYTES, 0, IndexTool.DATA_TABLE_NAME_BYTES.length) == 0) {
+ IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES.length) == 0) {
dataTableNameCheck = true;
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0);
} else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- IndexTool.INDEX_TABLE_NAME_BYTES, 0, IndexTool.INDEX_TABLE_NAME_BYTES.length) == 0) {
+ IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES.length) == 0) {
indexTableNameCheck = true;
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0);
} else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- IndexTool.ERROR_MESSAGE_BYTES, 0, IndexTool.ERROR_MESSAGE_BYTES.length) == 0) {
+ IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES, 0, IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES.length) == 0) {
errorMessageCell = cell;
}
}
}
- assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null);
+ assertTrue( "DataTableNameCheck was false", dataTableNameCheck);
+ assertTrue("IndexTableNameCheck was false", indexTableNameCheck);
+ assertTrue("Error message cell was null", errorMessageCell != null);
verifyIndexTableRowKey(CellUtil.cloneRow(errorMessageCell), indexTableFullName);
hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
+ .getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES);
scan = new Scan();
scanner = hIndexTable.getScanner(scan);
Result result = scanner.next();
@@ -716,7 +722,7 @@
}
assertTrue(status.getFirst() == 0);
- TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+ TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
admin.disableTable(indexToolOutputTable);
admin.deleteTable(indexToolOutputTable);
// Run the index tool using the only-verify option, verify it gives no mismatch
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
new file mode 100644
index 0000000..0b67044
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationOutputRepositoryIT.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_AFTER_VALUE;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.PHASE_BEFORE_VALUE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class IndexVerificationOutputRepositoryIT extends ParallelStatsDisabledIT {
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ Map<String, String> props = Collections.emptyMap();
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testReadIndexVerificationOutputRow() throws Exception {
+ String expectedErrorMessage = "I am an error message";
+ byte[] expectedValue = Bytes.toBytes("ab");
+ byte[] actualValue = Bytes.toBytes("ac");
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = "T" + generateUniqueName();
+ byte[] tableNameBytes = Bytes.toBytes(tableName);
+ String indexName = "I" + generateUniqueName();
+ createTableAndIndexes(conn, tableName, indexName);
+ byte[] indexNameBytes = Bytes.toBytes(indexName);
+ IndexVerificationOutputRepository outputRepository =
+ new IndexVerificationOutputRepository(indexNameBytes, conn);
+ outputRepository.createOutputTable(conn);
+ populateTable(conn, tableName);
+ byte[] dataRowKey = getRowKey(conn, tableNameBytes);
+ byte[] indexRowKey = getRowKey(conn, indexNameBytes);
+ long dataRowTs = getTimestamp(conn, tableNameBytes);
+ long indexRowTs = getTimestamp(conn, indexNameBytes);
+ long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
+ outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
+ indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+ scanMaxTs, tableNameBytes, true);
+ //now increment the scan time by 1 and do it again
+ outputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs,
+ indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+ scanMaxTs +1, tableNameBytes, false);
+ //make sure we only get the first row back
+ IndexVerificationOutputRow expectedRow = buildVerificationRow(dataRowKey, indexRowKey, dataRowTs,
+ indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+ scanMaxTs, tableNameBytes, indexNameBytes, PHASE_BEFORE_VALUE);
+ verifyOutputRow(outputRepository, scanMaxTs, indexNameBytes, expectedRow);
+ //make sure we get the second row back
+ IndexVerificationOutputRow secondExpectedRow = buildVerificationRow(dataRowKey,
+ indexRowKey, dataRowTs,
+ indexRowTs, expectedErrorMessage, expectedValue, actualValue,
+ scanMaxTs + 1, tableNameBytes, indexNameBytes, PHASE_AFTER_VALUE);
+ verifyOutputRow(outputRepository, scanMaxTs+1, indexNameBytes, secondExpectedRow);
+ }
+
+ }
+
+ public void verifyOutputRow(IndexVerificationOutputRepository outputRepository, long scanMaxTs,
+ byte[] indexNameBytes, IndexVerificationOutputRow expectedRow)
+ throws IOException {
+ List<IndexVerificationOutputRow> actualRows =
+ outputRepository.getOutputRows(scanMaxTs, indexNameBytes);
+ assertNotNull(actualRows);
+ assertEquals(1, actualRows.size());
+ assertEquals(expectedRow, actualRows.get(0));
+ }
+
+ private IndexVerificationOutputRow buildVerificationRow(byte[] dataRowKey, byte[] indexRowKey,
+ long dataRowTs, long indexRowTs,
+ String expectedErrorMessage,
+ byte[] expectedValue, byte[] actualValue,
+ long scanMaxTs,
+ byte[] tableNameBytes,
+ byte[] indexNameBytes,
+ byte[] phaseBeforeValue) {
+ IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder =
+ new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder();
+ return builder.setDataTableRowKey(dataRowKey).
+ setIndexTableRowKey(indexRowKey).
+ setScanMaxTimestamp(dataRowTs).
+ setDataTableRowTimestamp(dataRowTs).
+ setIndexTableRowTimestamp(indexRowTs).
+ setErrorMessage(Bytes.toString(
+ IndexVerificationOutputRepository.
+ getErrorMessageBytes(expectedErrorMessage, expectedValue, actualValue))).
+ setExpectedValue(expectedValue).
+ setActualValue(actualValue).
+ setScanMaxTimestamp(scanMaxTs).
+ setDataTableName(Bytes.toString(tableNameBytes)).
+ setIndexTableName(Bytes.toString(indexNameBytes)).
+ setPhaseValue(phaseBeforeValue).
+ build();
+ }
+
+ private byte[] getRowKey(Connection conn, byte[] tableNameBytes)
+ throws SQLException, IOException {
+ Scan scan = new Scan();
+ Table table =
+ conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ Result r = scanner.next();
+ return r.getRow();
+ }
+
+ private long getTimestamp(Connection conn, byte[] tableNameBytes) throws SQLException,
+ IOException {
+ Scan scan = new Scan();
+ Table table =
+ conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes);
+ ResultScanner scanner = table.getScanner(scan);
+ Result r = scanner.next();
+ return r.listCells().get(0).getTimestamp();
+ }
+
+ private void createTable(Connection conn, String tableName) throws Exception {
+ conn.createStatement().execute("create table " + tableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " +
+ "val3 varchar(10))");
+ }
+
+ private void populateTable(Connection conn, String tableName) throws Exception {
+ conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+ conn.commit();
+ }
+
+ private void createTableAndIndexes(Connection conn, String dataTableName,
+ String indexTableName) throws Exception {
+ createTable(conn, dataTableName);
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)");
+ conn.commit();
+ }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
new file mode 100644
index 0000000..0ffd13a
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexVerificationResultRepositoryIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class IndexVerificationResultRepositoryIT extends ParallelStatsDisabledIT {
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ Map<String, String> props = Collections.emptyMap();
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Test
+ public void testReadResultRow() throws Exception {
+ String tableName = "T" + generateUniqueName();
+ String indexName = "I" + generateUniqueName();
+ byte[] indexNameBytes = Bytes.toBytes(indexName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTableAndIndex(conn, tableName, indexName);
+ IndexVerificationResultRepository resultRepository =
+ new IndexVerificationResultRepository(conn, indexNameBytes);
+ resultRepository.createResultTable(conn);
+ byte[] regionOne = Bytes.toBytes("a.1.00000000000000000000");
+ byte[] regionTwo = Bytes.toBytes("a.2.00000000000000000000");
+ long scanMaxTs = EnvironmentEdgeManager.currentTimeMillis();
+ IndexToolVerificationResult expectedResult = getExpectedResult(scanMaxTs);
+ resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
+ regionOne);
+ resultRepository.logToIndexToolResultTable(expectedResult, IndexTool.IndexVerifyType.BOTH,
+ regionTwo);
+ IndexToolVerificationResult actualResult =
+ resultRepository.getVerificationResult(conn, scanMaxTs);
+ assertVerificationResult(expectedResult, actualResult);
+
+ }
+ }
+
+ private void assertVerificationResult(IndexToolVerificationResult expectedResult, IndexToolVerificationResult actualResult) {
+ assertEquals(expectedResult.getScanMaxTs(), actualResult.getScanMaxTs());
+ assertArrayEquals(expectedResult.getStartRow(), actualResult.getStartRow());
+ assertArrayEquals(actualResult.getStopRow(), actualResult.getStopRow());
+
+ //because we're combining two near-identical rows (same values, different region)
+ //we assert on 2x the expected value
+ assertEquals(2 * expectedResult.getBeforeRebuildExpiredIndexRowCount(),
+ actualResult.getBeforeRebuildExpiredIndexRowCount());
+ assertEquals(2 * expectedResult.getBeforeRebuildInvalidIndexRowCount(),
+ actualResult.getBeforeRebuildInvalidIndexRowCount());
+ assertEquals(2 * expectedResult.getBeforeRebuildMissingIndexRowCount(),
+ actualResult.getBeforeRebuildMissingIndexRowCount());
+ assertEquals(2 * expectedResult.getBeforeRebuildValidIndexRowCount(),
+ actualResult.getBeforeRebuildValidIndexRowCount());
+
+ assertEquals(2 * expectedResult.getAfterRebuildExpiredIndexRowCount(),
+ actualResult.getAfterRebuildExpiredIndexRowCount());
+ assertEquals(2 * expectedResult.getAfterRebuildInvalidIndexRowCount(),
+ actualResult.getAfterRebuildInvalidIndexRowCount());
+ assertEquals(2 * expectedResult.getAfterRebuildMissingIndexRowCount(),
+ actualResult.getAfterRebuildMissingIndexRowCount());
+ assertEquals(2 * expectedResult.getAfterRebuildValidIndexRowCount(),
+ actualResult.getAfterRebuildValidIndexRowCount());
+
+ assertEquals(2 * expectedResult.getScannedDataRowCount(),
+ actualResult.getScannedDataRowCount());
+ assertEquals(2 * expectedResult.getRebuiltIndexRowCount(),
+ actualResult.getRebuiltIndexRowCount());
+
+ }
+
+ private IndexToolVerificationResult getExpectedResult(long scanMaxTs) {
+ byte[] startRow = Bytes.toBytes("a");
+ byte[] stopRow = Bytes.toBytes("b");
+ IndexToolVerificationResult result = new IndexToolVerificationResult(startRow, stopRow,
+ scanMaxTs);
+ result.setScannedDataRowCount(1);
+ result.setRebuiltIndexRowCount(1);
+ IndexToolVerificationResult.PhaseResult before =
+ new IndexToolVerificationResult.PhaseResult();
+ populatePhaseResult(before);
+ IndexToolVerificationResult.PhaseResult after =
+ new IndexToolVerificationResult.PhaseResult();
+ populatePhaseResult(after);
+ result.setBefore(before);
+ result.setAfter(after);
+ return result;
+ }
+
+ private void populatePhaseResult(IndexToolVerificationResult.PhaseResult result){
+ result.setValidIndexRowCount(1);
+ result.setBeyondMaxLookBackInvalidIndexRowCount(1);
+ result.setBeyondMaxLookBackMissingIndexRowCount(1);
+ result.setExpiredIndexRowCount(1);
+ result.setInvalidIndexRowCount(1);
+ result.setMissingIndexRowCount(1);
+ }
+ private void createTable(Connection conn, String tableName) throws Exception {
+ conn.createStatement().execute("create table " + tableName +
+ " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), " +
+ "val3 varchar(10))");
+ }
+
+ private void createTableAndIndex(Connection conn, String dataTableName,
+ String indexTableName) throws Exception {
+ createTable(conn, dataTableName);
+ conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+ dataTableName + " (val1) include (val2, val3)");
+ conn.commit();
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 2bdaf1d..8b7e3f2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -20,7 +20,6 @@
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.*;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -82,6 +81,8 @@
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.KeyRange;
@@ -108,7 +109,6 @@
public static final String NO_EXPECTED_MUTATION = "No expected mutation";
public static final String
ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
- public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
private long pageSizeInRows = Long.MAX_VALUE;
private int rowCountPerTask;
private boolean hasMore;
@@ -125,8 +125,6 @@
private IndexMaintainer indexMaintainer;
private byte[] indexRowKey = null;
private Table indexHTable = null;
- private Table outputHTable = null;
- private Table resultHTable = null;
private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
private Map<byte[], List<Mutation>> indexKeyToMutationMap;
@@ -135,7 +133,6 @@
private TaskBatch<Boolean> tasks;
private String exceptionMessage;
private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
- private RegionCoprocessorEnvironment env;
private HTableFactory hTableFactory;
private int indexTableTTL = 0;
private IndexToolVerificationResult verificationResult;
@@ -144,6 +141,8 @@
private int singleRowRebuildReturnCode;
private Map<byte[], NavigableSet<byte[]>> familyMap;
private byte[][] viewConstants;
+ private IndexVerificationResultRepository verificationResultRepository;
+ private IndexVerificationOutputRepository verificationOutputRepository;
private long maxLookBackInMills;
@VisibleForTesting
@@ -179,7 +178,6 @@
this.innerScanner = innerScanner;
this.region = region;
- this.env = env;
this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
if (indexRowKey != null) {
@@ -188,7 +186,7 @@
}
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
- verificationResult = new IndexToolVerificationResult();
+ verificationResult = new IndexToolVerificationResult(scan);
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
@@ -197,8 +195,10 @@
hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
- outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
- resultHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.RESULT_TABLE_NAME_BYTES));
+ verificationResultRepository =
+ new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
+ verificationOutputRepository =
+ new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory);
indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
@@ -247,88 +247,21 @@
return false;
}
- private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName,
- byte[] startRow, byte[] stopRow) {
- byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
- int targetOffset = 0;
- // The row key for the result table : timestamp | index table name | datable table region name |
- // scan start row | scan stop row
- byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
- ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length +
- startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length];
- Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
- targetOffset += keyPrefix.length;
- Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
- targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
- Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
- targetOffset += indexTableName.length;
- Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
- targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
- Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length);
- targetOffset += regionName.length;
- Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
- targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
- Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length);
- targetOffset += startRow.length;
- Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
- targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
- Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length);
- return rowKey;
- }
- private void logToIndexToolResultTable() throws IOException {
- long scanMaxTs = scan.getTimeRange().getMax();
- byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexHTable.getName().toBytes(),
- Bytes.toBytes(region.getRegionInfo().getRegionNameAsString()), scan.getStartRow(), scan.getStopRow());
- Put put = new Put(rowKey);
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.scannedDataRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.rebuiltIndexRowCount)));
- if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
- verifyType == IndexTool.IndexVerifyType.ONLY) {
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.validIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.expiredIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackMissingIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackInvalidIndexRowCount)));
- }
- if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.validIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.expiredIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackMissingIndexRowCount)));
- put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackInvalidIndexRowCount)));
- }
- resultHTable.put(put);
- }
@Override
public void close() throws IOException {
innerScanner.close();
if (verify) {
try {
- logToIndexToolResultTable();
+ verificationResultRepository.logToIndexToolResultTable(verificationResult,
+ verifyType, region.getRegionInfo().getRegionName());
} finally {
this.pool.stop("IndexRebuildRegionScanner is closing");
hTableFactory.shutdown();
indexHTable.close();
- outputHTable.close();
- resultHTable.close();
+ verificationResultRepository.close();
+ verificationOutputRepository.close();
}
}
}
@@ -419,80 +352,18 @@
return true;
}
- @VisibleForTesting
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
- String errorMsg) throws IOException {
- logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
- errorMsg, null, null);
-
- }
-
- private static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) {
- byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
- byte[] rowKey;
- int targetOffset = 0;
- // The row key for the output table : timestamp | index table name | data row key
- rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
- ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length];
- Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
- targetOffset += keyPrefix.length;
- Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
- targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
- Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
- targetOffset += indexTableName.length;
- Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
- targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
- Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length);
- return rowKey;
+ String errorMsg) throws IOException {
+ logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs, errorMsg, null, null);
}
@VisibleForTesting
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
- String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
- final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
- final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
- final int PREFIX_LENGTH = 3;
- final int TOTAL_PREFIX_LENGTH = 6;
- final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
- final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
- long scanMaxTs = scan.getTimeRange().getMax();
- byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexHTable.getName().toBytes(), dataRowKey);
- Put put = new Put(rowKey);
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
- scanMaxTs, region.getRegionInfo().getTable().getName());
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
- scanMaxTs, indexMaintainer.getIndexTableName());
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
-
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
- scanMaxTs, indexRowKey);
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
- byte[] errorMessageBytes;
- if (expectedValue != null) {
- errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
- TOTAL_PREFIX_LENGTH];
- Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
- int length = errorMsg.length();
- Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
- length += PREFIX_LENGTH;
- Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
- length += expectedValue.length;
- Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
- length += PREFIX_LENGTH;
- Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
-
- } else {
- errorMessageBytes = Bytes.toBytes(errorMsg);
- }
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
- if (isBeforeRebuilt) {
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
- } else {
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
- }
- outputHTable.put(put);
+ String errorMsg, byte[] expectedVaue, byte[] actualValue)
+ throws IOException {
+ verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+ errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(),
+ region.getRegionInfo().getTable().getName(), isBeforeRebuilt);
}
private static long getMaxTimestamp(Mutation m) {
@@ -854,7 +725,7 @@
// verify
// TODO: have a metric to update for these cases
if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
- verificationPhaseResult.expiredIndexRowCount++;
+ verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
return true;
}
actual = actualMutationList.get(actualIndex);
@@ -907,7 +778,7 @@
if (expectedIndex == expectedSize ){
// every expected mutation has its matching one in the actual list.
- verificationPhaseResult.validIndexRowCount++;
+ verificationPhaseResult.setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1);
return true;
}
@@ -915,7 +786,8 @@
if (expectedIndex > 0) {
// if current expected index mutation is beyond max look back window, we only need to make sure its latest
// mutation is a matching one, as an SCN query is required.
- verificationPhaseResult.validIndexRowCount++;
+ verificationPhaseResult.
+ setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1);
return true;
}
@@ -924,7 +796,8 @@
// We report it as a failure, so "before" option can trigger the index rebuild for this row.
// This repair is required, when there is only one index row for a given data table row and the timestamp of that row
// can be beyond maxLookBack.
- verificationPhaseResult.beyondMaxLookBackInvalidIndexRowCount++;
+ verificationPhaseResult.
+ setBeyondMaxLookBackInvalidIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackInvalidIndexRowCount() + 1);
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
String errorMsg = String.format("Expect %1$s mutations but got %2$s (beyond maxLookBack)",
expectedSize,
@@ -944,7 +817,7 @@
logToIndexToolOutputTable(dataKey, indexRow.getRow(),
getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
}
- verificationPhaseResult.invalidIndexRowCount++;
+ verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1);
return false;
}
}
@@ -999,7 +872,7 @@
List<Mutation> mutationList = indexKeyToMutationMap.get(key);
if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
itr.remove();
- verificationPhaseResult.expiredIndexRowCount++;
+ verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
}
}
}
@@ -1015,11 +888,12 @@
String errorMsg;
if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){
errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
- verificationPhaseResult.beyondMaxLookBackMissingIndexRowCount++;
+ verificationPhaseResult.
+ setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
}
else {
errorMsg = "Missing index row";
- verificationPhaseResult.missingIndexRowCount++;
+ verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1);
}
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
logToIndexToolOutputTable(dataKey,
@@ -1145,12 +1019,12 @@
}
private void verifyAndOrRebuildIndex() throws IOException {
- IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult();
- nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
+ IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult(scan);
+ nextVerificationResult.setScannedDataRowCount(dataKeyToMutationMap.size());
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
// For these options we start with rebuilding index rows
rebuildIndexRows(mutations);
- nextVerificationResult.rebuiltIndexRowCount = dataKeyToMutationMap.size();
+ nextVerificationResult.setRebuiltIndexRowCount(dataKeyToMutationMap.size());
isBeforeRebuilt = false;
}
if (verifyType == IndexTool.IndexVerifyType.NONE) {
@@ -1161,7 +1035,7 @@
IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
// For these options we start with verifying index rows
parallelizeIndexVerify(verificationPhaseResult);
- nextVerificationResult.before.add(verificationPhaseResult);
+ nextVerificationResult.getBefore().add(verificationPhaseResult);
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
// For these options, we have identified the rows to be rebuilt and now need to rebuild them
@@ -1177,7 +1051,7 @@
}
}
rebuildIndexRows(mutations);
- nextVerificationResult.rebuiltIndexRowCount += dataKeyToMutationMap.size();
+ nextVerificationResult.setRebuiltIndexRowCount(nextVerificationResult.getRebuiltIndexRowCount() + dataKeyToMutationMap.size());
isBeforeRebuilt = false;
}
@@ -1189,7 +1063,7 @@
prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
}
parallelizeIndexVerify(verificationPhaseResult);
- nextVerificationResult.after.add(verificationPhaseResult);
+ nextVerificationResult.getAfter().add(verificationPhaseResult);
}
verificationResult.add(nextVerificationResult);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
index 1fbb866..989e03f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -19,63 +19,141 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.mapreduce.index.IndexTool;
-import java.io.IOException;
-import java.util.Arrays;
-
-import static org.apache.phoenix.mapreduce.index.IndexTool.*;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.SCANNED_DATA_ROW_COUNT_BYTES;
public class IndexToolVerificationResult {
+
+ public void setScannedDataRowCount(long scannedDataRowCount) {
+ this.scannedDataRowCount = scannedDataRowCount;
+ }
+
+ public void setRebuiltIndexRowCount(long rebuiltIndexRowCount) {
+ this.rebuiltIndexRowCount = rebuiltIndexRowCount;
+ }
+
+ public PhaseResult getBefore() {
+ return before;
+ }
+
+ public void setBefore(PhaseResult before) {
+ this.before = before;
+ }
+
+ public PhaseResult getAfter() {
+ return after;
+ }
+
+ public void setAfter(PhaseResult after) {
+ this.after = after;
+ }
+
+ public byte[] getStartRow() {
+ return startRow;
+ }
+
+ public byte[] getStopRow() {
+ return stopRow;
+ }
+
+ public long getScanMaxTs() {
+ return scanMaxTs;
+ }
+
+ public IndexToolVerificationResult(long scanMaxTs) {
+ this.scanMaxTs = scanMaxTs;
+ }
+
+ public IndexToolVerificationResult(byte[] startRow, byte[] stopRow, long scanMaxTs) {
+ this.setStartRow(startRow);
+ this.setStopRow(stopRow);
+ this.scanMaxTs = scanMaxTs;
+ }
+
+ public IndexToolVerificationResult(Scan scan) {
+ this.setStartRow(scan.getStartRow());
+ this.setStopRow(scan.getStopRow());
+ this.scanMaxTs = scan.getTimeRange().getMax();
+ }
+
+ public byte[] getRegion() {
+ return region;
+ }
+
+ public void setStartRow(byte[] startRow) {
+ this.startRow = startRow;
+ }
+
+ public void setStopRow(byte[] stopRow) {
+ this.stopRow = stopRow;
+ }
+
public static class PhaseResult {
- long validIndexRowCount = 0;
- long expiredIndexRowCount = 0;
- long missingIndexRowCount = 0;
- long invalidIndexRowCount = 0;
- long beyondMaxLookBackMissingIndexRowCount = 0;
- long beyondMaxLookBackInvalidIndexRowCount = 0;
+ private long validIndexRowCount = 0;
+ private long expiredIndexRowCount = 0;
+ private long missingIndexRowCount = 0;
+ private long invalidIndexRowCount = 0;
+ private long beyondMaxLookBackMissingIndexRowCount = 0;
+ private long beyondMaxLookBackInvalidIndexRowCount = 0;
public void add(PhaseResult phaseResult) {
- validIndexRowCount += phaseResult.validIndexRowCount;
- expiredIndexRowCount += phaseResult.expiredIndexRowCount;
- missingIndexRowCount += phaseResult.missingIndexRowCount;
- invalidIndexRowCount += phaseResult.invalidIndexRowCount;
- beyondMaxLookBackMissingIndexRowCount += phaseResult.beyondMaxLookBackMissingIndexRowCount;
- beyondMaxLookBackInvalidIndexRowCount += phaseResult.beyondMaxLookBackInvalidIndexRowCount;
+ setBeyondMaxLookBackMissingIndexRowCount(getBeyondMaxLookBackMissingIndexRowCount() +
+ phaseResult.getBeyondMaxLookBackMissingIndexRowCount());
+ setBeyondMaxLookBackInvalidIndexRowCount(getBeyondMaxLookBackInvalidIndexRowCount() +
+ phaseResult.getBeyondMaxLookBackInvalidIndexRowCount());
+ setValidIndexRowCount(getValidIndexRowCount() + phaseResult.getValidIndexRowCount());
+ setExpiredIndexRowCount(getExpiredIndexRowCount() + phaseResult.getExpiredIndexRowCount());
+ setMissingIndexRowCount(getMissingIndexRowCount() + phaseResult.getMissingIndexRowCount());
+ setInvalidIndexRowCount(getInvalidIndexRowCount() + phaseResult.getInvalidIndexRowCount());
}
- public PhaseResult(){}
+ public PhaseResult() {
+ }
public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
- long missingIndexRowCount, long invalidIndexRowCount,
- long beyondMaxLookBackMissingIndexRowCount, long beyondMaxLookBackInvalidIndexRowCount) {
- this.validIndexRowCount = validIndexRowCount;
- this.expiredIndexRowCount = expiredIndexRowCount;
- this.missingIndexRowCount = missingIndexRowCount;
- this.invalidIndexRowCount = invalidIndexRowCount;
- this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount;
- this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount;
+ long missingIndexRowCount, long invalidIndexRowCount,
+ long beyondMaxLookBackMissingIndexRowCount,
+ long beyondMaxLookBackInvalidIndexRowCount) {
+ this.setValidIndexRowCount(validIndexRowCount);
+ this.setExpiredIndexRowCount(expiredIndexRowCount);
+ this.setMissingIndexRowCount(missingIndexRowCount);
+ this.setInvalidIndexRowCount(invalidIndexRowCount);
+ this.setBeyondMaxLookBackInvalidIndexRowCount(beyondMaxLookBackInvalidIndexRowCount);
+ this.setBeyondMaxLookBackMissingIndexRowCount(beyondMaxLookBackMissingIndexRowCount);
}
+
public long getTotalCount() {
- return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount + beyondMaxLookBackMissingIndexRowCount + beyondMaxLookBackInvalidIndexRowCount;
+ return getValidIndexRowCount() + getExpiredIndexRowCount() + getMissingIndexRowCount() + getInvalidIndexRowCount()
+ + getBeyondMaxLookBackMissingIndexRowCount() + getBeyondMaxLookBackInvalidIndexRowCount();
}
@Override
public String toString() {
return "PhaseResult{" +
- "validIndexRowCount=" + validIndexRowCount +
- ", expiredIndexRowCount=" + expiredIndexRowCount +
- ", missingIndexRowCount=" + missingIndexRowCount +
- ", invalidIndexRowCount=" + invalidIndexRowCount +
- ", beyondMaxLookBackMissingIndexRowCount=" + beyondMaxLookBackMissingIndexRowCount +
- ", beyondMaxLookBackInvalidIndexRowCount=" + beyondMaxLookBackInvalidIndexRowCount;
+ "validIndexRowCount=" + getValidIndexRowCount() +
+ ", expiredIndexRowCount=" + getExpiredIndexRowCount() +
+ ", missingIndexRowCount=" + getMissingIndexRowCount() +
+ ", invalidIndexRowCount=" + getInvalidIndexRowCount() +
+ ", beyondMaxLookBackMissingIndexRowCount=" + getBeyondMaxLookBackMissingIndexRowCount() +
+ ", beyondMaxLookBackInvalidIndexRowCount=" + getBeyondMaxLookBackInvalidIndexRowCount();
}
@Override
@@ -87,39 +165,91 @@
return false;
}
PhaseResult pr = (PhaseResult) o;
- return this.expiredIndexRowCount == pr.expiredIndexRowCount
- && this.validIndexRowCount == pr.validIndexRowCount
- && this.invalidIndexRowCount == pr.invalidIndexRowCount
- && this.missingIndexRowCount == pr.missingIndexRowCount
- && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount
- && this.beyondMaxLookBackMissingIndexRowCount == pr.beyondMaxLookBackMissingIndexRowCount;
+ return this.getExpiredIndexRowCount() == pr.getExpiredIndexRowCount()
+ && this.getValidIndexRowCount() == pr.getValidIndexRowCount()
+ && this.getInvalidIndexRowCount() == pr.getInvalidIndexRowCount()
+ && this.getMissingIndexRowCount() == pr.getMissingIndexRowCount()
+ && this.getBeyondMaxLookBackInvalidIndexRowCount() == pr.getBeyondMaxLookBackInvalidIndexRowCount()
+ && this.getBeyondMaxLookBackMissingIndexRowCount() == pr.getBeyondMaxLookBackMissingIndexRowCount();
}
@Override
public int hashCode() {
long result = 17;
- result = 31 * result + expiredIndexRowCount;
- result = 31 * result + validIndexRowCount;
- result = 31 * result + missingIndexRowCount;
- result = 31 * result + invalidIndexRowCount;
- result = 31 * result + beyondMaxLookBackMissingIndexRowCount;
- result = 31 * result + beyondMaxLookBackInvalidIndexRowCount;
- return (int)result;
+ result = 31 * result + getExpiredIndexRowCount();
+ result = 31 * result + getValidIndexRowCount();
+ result = 31 * result + getMissingIndexRowCount();
+ result = 31 * result + getInvalidIndexRowCount();
+ result = 31 * result + getBeyondMaxLookBackMissingIndexRowCount();
+ result = 31 * result + getBeyondMaxLookBackInvalidIndexRowCount();
+ return (int) result;
+ }
+
+ public long getValidIndexRowCount() {
+ return validIndexRowCount;
+ }
+
+ public void setValidIndexRowCount(long validIndexRowCount) {
+ this.validIndexRowCount = validIndexRowCount;
+ }
+
+ public long getExpiredIndexRowCount() {
+ return expiredIndexRowCount;
+ }
+
+ public void setExpiredIndexRowCount(long expiredIndexRowCount) {
+ this.expiredIndexRowCount = expiredIndexRowCount;
+ }
+
+ public long getMissingIndexRowCount() {
+ return missingIndexRowCount;
+ }
+
+ public void setMissingIndexRowCount(long missingIndexRowCount) {
+ this.missingIndexRowCount = missingIndexRowCount;
+ }
+
+ public long getInvalidIndexRowCount() {
+ return invalidIndexRowCount;
+ }
+
+ public void setInvalidIndexRowCount(long invalidIndexRowCount) {
+ this.invalidIndexRowCount = invalidIndexRowCount;
+ }
+
+ public long getBeyondMaxLookBackMissingIndexRowCount() {
+ return beyondMaxLookBackMissingIndexRowCount;
+ }
+
+ public void setBeyondMaxLookBackMissingIndexRowCount(long beyondMaxLookBackMissingIndexRowCount) {
+ this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount;
+ }
+
+ public long getBeyondMaxLookBackInvalidIndexRowCount() {
+ return beyondMaxLookBackInvalidIndexRowCount;
+ }
+
+ public void setBeyondMaxLookBackInvalidIndexRowCount(long beyondMaxLookBackInvalidIndexRowCount) {
+ this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount;
}
}
- long scannedDataRowCount = 0;
- long rebuiltIndexRowCount = 0;
- PhaseResult before = new PhaseResult();
- PhaseResult after = new PhaseResult();
+ private long scannedDataRowCount = 0;
+ private long rebuiltIndexRowCount = 0;
+ private byte[] startRow;
+ private byte[] stopRow;
+ private long scanMaxTs;
+ private byte[] region;
+ private PhaseResult before = new PhaseResult();
+ private PhaseResult after = new PhaseResult();
@Override
public String toString() {
return "VerificationResult{" +
- "scannedDataRowCount=" + scannedDataRowCount +
- ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
- ", before=" + before +
- ", after=" + after +
+ "scannedDataRowCount=" + getScannedDataRowCount() +
+ ", rebuiltIndexRowCount=" + getRebuiltIndexRowCount() +
+ ", before=" + getBefore() +
+ ", after=" + getAfter() +
'}';
}
@@ -132,107 +262,107 @@
}
public long getBeforeRebuildValidIndexRowCount() {
- return before.validIndexRowCount;
+ return getBefore().getValidIndexRowCount();
}
public long getBeforeRebuildExpiredIndexRowCount() {
- return before.expiredIndexRowCount;
+ return getBefore().getExpiredIndexRowCount();
}
public long getBeforeRebuildInvalidIndexRowCount() {
- return before.invalidIndexRowCount;
+ return getBefore().getInvalidIndexRowCount();
}
public long getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount() {
- return before.beyondMaxLookBackMissingIndexRowCount;
- };
+ return before.getBeyondMaxLookBackMissingIndexRowCount();
+ }
public long getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount() {
- return before.beyondMaxLookBackInvalidIndexRowCount;
- };
+ return before.getBeyondMaxLookBackInvalidIndexRowCount();
+ }
public long getBeforeRebuildMissingIndexRowCount() {
- return before.missingIndexRowCount;
+ return getBefore().getMissingIndexRowCount();
}
public long getAfterRebuildValidIndexRowCount() {
- return after.validIndexRowCount;
+ return getAfter().getValidIndexRowCount();
}
public long getAfterRebuildExpiredIndexRowCount() {
- return after.expiredIndexRowCount;
+ return getAfter().getExpiredIndexRowCount();
}
public long getAfterRebuildInvalidIndexRowCount() {
- return after.invalidIndexRowCount;
+ return getAfter().getInvalidIndexRowCount();
}
public long getAfterRebuildMissingIndexRowCount() {
- return after.missingIndexRowCount;
+ return getAfter().getMissingIndexRowCount();
}
public long getAfterRebuildBeyondMaxLookBackMissingIndexRowCount() {
- return after.beyondMaxLookBackMissingIndexRowCount;
- };
+ return after.getBeyondMaxLookBackMissingIndexRowCount();
+ }
public long getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount() {
- return after.beyondMaxLookBackInvalidIndexRowCount;
- };
+ return after.getBeyondMaxLookBackInvalidIndexRowCount();
+ }
private void addScannedDataRowCount(long count) {
- this.scannedDataRowCount += count;
+ this.setScannedDataRowCount(this.getScannedDataRowCount() + count);
}
private void addRebuiltIndexRowCount(long count) {
- this.rebuiltIndexRowCount += count;
+ this.setRebuiltIndexRowCount(this.getRebuiltIndexRowCount() + count);
}
private void addBeforeRebuildValidIndexRowCount(long count) {
- before.validIndexRowCount += count;
+ getBefore().setValidIndexRowCount(getBefore().getValidIndexRowCount() + count);
}
private void addBeforeRebuildExpiredIndexRowCount(long count) {
- before.expiredIndexRowCount += count;
+ getBefore().setExpiredIndexRowCount(getBefore().getExpiredIndexRowCount() + count);
}
private void addBeforeRebuildMissingIndexRowCount(long count) {
- before.missingIndexRowCount += count;
+ getBefore().setMissingIndexRowCount(getBefore().getMissingIndexRowCount() + count);
}
private void addBeforeRebuildInvalidIndexRowCount(long count) {
- before.invalidIndexRowCount += count;
+ getBefore().setInvalidIndexRowCount(getBefore().getInvalidIndexRowCount() + count);
}
private void addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
- before.beyondMaxLookBackMissingIndexRowCount += count;
+ before.setBeyondMaxLookBackMissingIndexRowCount(before.getBeyondMaxLookBackMissingIndexRowCount() + count);
}
private void addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
- before.beyondMaxLookBackInvalidIndexRowCount += count;
+ before.setBeyondMaxLookBackInvalidIndexRowCount(before.getBeyondMaxLookBackInvalidIndexRowCount() + count);
}
private void addAfterRebuildValidIndexRowCount(long count) {
- after.validIndexRowCount += count;
+ getAfter().setValidIndexRowCount(getAfter().getValidIndexRowCount() + count);
}
private void addAfterRebuildExpiredIndexRowCount(long count) {
- after.expiredIndexRowCount += count;
+ getAfter().setExpiredIndexRowCount(getAfter().getExpiredIndexRowCount() + count);
}
private void addAfterRebuildMissingIndexRowCount(long count) {
- after.missingIndexRowCount += count;
+ getAfter().setMissingIndexRowCount(getAfter().getMissingIndexRowCount() + count);
}
private void addAfterRebuildInvalidIndexRowCount(long count) {
- after.invalidIndexRowCount += count;
+ getAfter().setInvalidIndexRowCount(getAfter().getInvalidIndexRowCount() + count);
}
private void addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
- after.beyondMaxLookBackMissingIndexRowCount += count;
+ after.setBeyondMaxLookBackMissingIndexRowCount(after.getBeyondMaxLookBackMissingIndexRowCount() + count);
}
private void addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
- after.beyondMaxLookBackInvalidIndexRowCount += count;
+ after.setBeyondMaxLookBackInvalidIndexRowCount(after.getBeyondMaxLookBackInvalidIndexRowCount() + count);
}
private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
@@ -249,7 +379,7 @@
cell.getValueOffset(), cell.getValueLength()));
}
- private void update(Cell cell) {
+ public void update(Cell cell) {
if (CellUtil
.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
addScannedDataRowCount(getValue(cell));
@@ -282,57 +412,17 @@
}
}
- public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
- // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
- // Search for the place where the trailing 0xFFs start
- int offset = rowKeyPrefix.length;
- while (offset > 0) {
- if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
- break;
- }
- offset--;
- }
- if (offset == 0) {
- // We got an 0xFFFF... (only FFs) stopRow value which is
- // the last possible prefix before the end of the table.
- // So set it to stop at the 'end of the table'
- return HConstants.EMPTY_END_ROW;
- }
- // Copy the right length of the original
- byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
- // And increment the last one
- newStopRow[newStopRow.length - 1]++;
- return newStopRow;
- }
-
- public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts)
- throws IOException {
- IndexToolVerificationResult verificationResult = new IndexToolVerificationResult();
- byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
- byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
- Scan scan = new Scan();
- scan.setStartRow(startRowKey);
- scan.setStopRow(stopRowKey);
- ResultScanner scanner = hTable.getScanner(scan);
- for (Result result = scanner.next(); result != null; result = scanner.next()) {
- for (Cell cell : result.rawCells()) {
- verificationResult.update(cell);
- }
- }
- return verificationResult;
- }
-
public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
return false;
} else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
- if (before.invalidIndexRowCount + before.missingIndexRowCount
- + before.beyondMaxLookBackInvalidIndexRowCount + before.beyondMaxLookBackMissingIndexRowCount > 0) {
+ if (getBefore().getInvalidIndexRowCount() + getBefore().getMissingIndexRowCount() +
+ before.getBeyondMaxLookBackInvalidIndexRowCount() + before.getBeyondMaxLookBackMissingIndexRowCount() > 0) {
return true;
}
} else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
- if (after.invalidIndexRowCount + after.missingIndexRowCount
- + after.beyondMaxLookBackInvalidIndexRowCount + after.beyondMaxLookBackMissingIndexRowCount > 0) {
+ if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount() +
+ after.getBeyondMaxLookBackInvalidIndexRowCount() + after.getBeyondMaxLookBackMissingIndexRowCount() > 0) {
return true;
}
}
@@ -340,9 +430,9 @@
}
public void add(IndexToolVerificationResult verificationResult) {
- scannedDataRowCount += verificationResult.scannedDataRowCount;
- rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
- before.add(verificationResult.before);
- after.add(verificationResult.after);
+ setScannedDataRowCount(getScannedDataRowCount() + verificationResult.getScannedDataRowCount());
+ setRebuiltIndexRowCount(getRebuiltIndexRowCount() + verificationResult.getRebuiltIndexRowCount());
+ getBefore().add(verificationResult.getBefore());
+ getAfter().add(verificationResult.getAfter());
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 95703a4..e30da37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -50,7 +50,6 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
@@ -73,7 +72,6 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -89,7 +87,6 @@
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
@@ -155,57 +152,6 @@
}
}
- public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
- public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
- public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
- public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
- public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
- public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
- public final static String DATA_TABLE_NAME = "DTName";
- public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
- public static String INDEX_TABLE_NAME = "ITName";
- public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
- public static String DATA_TABLE_ROW_KEY = "DTRowKey";
- public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
- public static String INDEX_TABLE_ROW_KEY = "ITRowKey";
- public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
- public static String DATA_TABLE_TS = "DTTS";
- public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
- public static String INDEX_TABLE_TS = "ITTS";
- public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
- public static String ERROR_MESSAGE = "Error";
- public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
- public static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
- public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
- public static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount";
- public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT);
- public static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT = "BeforeRebuildValidIndexRowCount";
- public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT);
- public static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT = "BeforeRebuildExpiredIndexRowCount";
- public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT);
- public static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT = "BeforeRebuildMissingIndexRowCount";
- public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
- public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount";
- public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
- public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount";
- public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
- public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount";
- public static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
- public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount";
- public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
- public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount";
- public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT);
- public static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT = "AfterRebuildMissingIndexRowCount";
- public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
- public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount";
- public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
- public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackMissingIndexRowCount";
- public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
- public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount";
- public static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
- public static String VERIFICATION_PHASE = "Phase";
- public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
-
private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);
private String schemaName;
@@ -703,23 +649,10 @@
}
private void createIndexToolTables(Connection connection) throws Exception {
- ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
- Admin admin = queryServices.getAdmin();
- if (!admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
- HTableDescriptor tableDescriptor = new
- HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
- tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
- HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
- tableDescriptor.addFamily(columnDescriptor);
- admin.createTable(tableDescriptor);
- }
- if (!admin.tableExists(TableName.valueOf(RESULT_TABLE_NAME))) {
- HTableDescriptor tableDescriptor = new
- HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME));
- tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
- HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY);
- tableDescriptor.addFamily(columnDescriptor);
- admin.createTable(tableDescriptor);
+ try (IndexVerificationResultRepository resultRepo = new IndexVerificationResultRepository();
+ IndexVerificationOutputRepository outputRepo = new IndexVerificationOutputRepository()){
+ resultRepo.createResultTable(connection);
+ outputRepo.createOutputTable(connection);
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
new file mode 100644
index 0000000..bcc2e73
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class IndexVerificationOutputRepository implements AutoCloseable {
+ public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+
+ private Table indexTable;
+ private byte[] indexName;
+ private Table outputTable;
+ public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
+ public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
+ public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+
+ public final static String DATA_TABLE_NAME = "DTName";
+ public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
+ public final static String INDEX_TABLE_NAME = "ITName";
+ public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
+ public final static String DATA_TABLE_ROW_KEY = "DTRowKey";
+ public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
+ public final static String INDEX_TABLE_ROW_KEY = "ITRowKey";
+ public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
+ public final static String DATA_TABLE_TS = "DTTS";
+ public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
+ public final static String INDEX_TABLE_TS = "ITTS";
+ public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
+ public final static String ERROR_MESSAGE = "Error";
+ public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
+
+ public static String VERIFICATION_PHASE = "Phase";
+ public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
+ public final static String EXPECTED_VALUE = "ExpectedValue";
+ public final static byte[] EXPECTED_VALUE_BYTES = Bytes.toBytes(EXPECTED_VALUE);
+ public final static String ACTUAL_VALUE = "ActualValue";
+ public final static byte[] ACTUAL_VALUE_BYTES = Bytes.toBytes(ACTUAL_VALUE);
+ public static final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
+ public static final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
+ public static final int PREFIX_LENGTH = 3;
+ public static final int TOTAL_PREFIX_LENGTH = 6;
+ public static final byte[] PHASE_BEFORE_VALUE = Bytes.toBytes("BEFORE");
+ public static final byte[] PHASE_AFTER_VALUE = Bytes.toBytes("AFTER");
+
+ /**
+ * Only usable for the create table / read path or for testing. Use setOutputTable and
+ * setIndexTable first to write.
+ */
+ public IndexVerificationOutputRepository() {
+
+ }
+
+ @VisibleForTesting
+ public IndexVerificationOutputRepository(byte[] indexName, Connection conn) throws SQLException {
+ ConnectionQueryServices queryServices =
+ conn.unwrap(PhoenixConnection.class).getQueryServices();
+ outputTable = queryServices.getTable(OUTPUT_TABLE_NAME_BYTES);
+ indexTable = queryServices.getTable(indexName);
+ }
+
+ public IndexVerificationOutputRepository(byte[] indexName, HTableFactory hTableFactory) throws IOException {
+ this.indexName = indexName;
+ outputTable = hTableFactory.getTable(new ImmutableBytesPtr(OUTPUT_TABLE_NAME_BYTES));
+ indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName));
+ }
+
+ public static byte[] generateOutputTableRowKey(long ts, byte[] indexTableName, byte[] dataRowKey ) {
+ byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+ byte[] rowKey;
+ int targetOffset = 0;
+ // The row key for the output table : timestamp | index table name | data row key
+ rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
+ ROW_KEY_SEPARATOR_BYTE.length + dataRowKey.length];
+ Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+ targetOffset += keyPrefix.length;
+ Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+ targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+ Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
+ targetOffset += indexTableName.length;
+ Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+ targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+ Bytes.putBytes(rowKey, targetOffset, dataRowKey, 0, dataRowKey.length);
+ return rowKey;
+ }
+
+ /**
+ * Generates partial row key for use in a Scan to get all rows for an index verification
+ */
+ private static byte[] generatePartialOutputTableRowKey(long ts, byte[] indexTableName){
+ byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+ byte[] partialRowKey;
+ int targetOffset = 0;
+ // The row key for the output table : timestamp | index table name | data row key
+ partialRowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length];
+ Bytes.putBytes(partialRowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+ targetOffset += keyPrefix.length;
+ Bytes.putBytes(partialRowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+ targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+ Bytes.putBytes(partialRowKey, targetOffset, indexTableName, 0, indexTableName.length);
+ return partialRowKey;
+ }
+
+ public void createOutputTable(Connection connection) throws IOException, SQLException {
+ ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+ Admin admin = queryServices.getAdmin();
+ TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME);
+ if (!admin.tableExists(outputTableName)) {
+ HTableDescriptor tableDescriptor = new
+ HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
+ tableDescriptor.setValue(HColumnDescriptor.TTL,
+ String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+ HColumnDescriptor columnDescriptor = new HColumnDescriptor(OUTPUT_TABLE_COLUMN_FAMILY);
+ tableDescriptor.addFamily(columnDescriptor);
+ admin.createTable(tableDescriptor);
+ outputTable = admin.getConnection().getTable(outputTableName);
+ }
+ }
+
+ @VisibleForTesting
+ public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs,
+ long indexRowTs,
+ String errorMsg, byte[] expectedValue, byte[] actualValue,
+ long scanMaxTs, byte[] tableName, boolean isBeforeRebuild)
+ throws IOException {
+ byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey);
+ Put put = new Put(rowKey);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES,
+ scanMaxTs, tableName);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES,
+ scanMaxTs, indexName);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
+
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES,
+ scanMaxTs, indexRowKey);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
+ byte[] errorMessageBytes;
+ if (expectedValue != null) {
+ errorMessageBytes = getErrorMessageBytes(errorMsg, expectedValue, actualValue);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES, expectedValue);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES, actualValue);
+ } else {
+ errorMessageBytes = Bytes.toBytes(errorMsg);
+ }
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+ if (isBeforeRebuild) {
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
+ } else {
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
+ }
+ outputTable.put(put);
+ }
+
+ public static byte[] getErrorMessageBytes(String errorMsg, byte[] expectedValue, byte[] actualValue) {
+ byte[] errorMessageBytes;
+ errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
+ TOTAL_PREFIX_LENGTH];
+ Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
+ int length = errorMsg.length();
+ Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+ length += expectedValue.length;
+ Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
+ return errorMessageBytes;
+ }
+
+ public List<IndexVerificationOutputRow> getOutputRows(long ts, byte[] indexName)
+ throws IOException {
+ Iterator<IndexVerificationOutputRow> iter = getOutputRowIterator(ts, indexName);
+ List<IndexVerificationOutputRow> outputRowList = new ArrayList<IndexVerificationOutputRow>();
+ while (iter.hasNext()){
+ outputRowList.add(iter.next());
+ }
+ return outputRowList;
+ }
+
+ public Iterator<IndexVerificationOutputRow> getOutputRowIterator(long ts, byte[] indexName)
+ throws IOException {
+ Scan scan = new Scan();
+ byte[] partialKey = generatePartialOutputTableRowKey(ts, indexName);
+ scan.withStartRow(partialKey);
+ scan.withStopRow(ByteUtil.calculateTheClosestNextRowKeyForPrefix(partialKey));
+ ResultScanner scanner = outputTable.getScanner(scan);
+ return new IndexVerificationOutputRowIterator(scanner.iterator());
+ }
+
+ public static IndexVerificationOutputRow getOutputRowFromResult(Result result) {
+ IndexVerificationOutputRow.IndexVerificationOutputRowBuilder builder =
+ new IndexVerificationOutputRow.IndexVerificationOutputRowBuilder();
+ byte[] rowKey = result.getRow();
+ //rowkey is scanTs + SEPARATOR_BYTE + indexTableName + SEPARATOR_BYTE + dataTableRowKey
+ byte[][] rowKeySplit = ByteUtil.splitArrayBySeparator(rowKey, ROW_KEY_SEPARATOR_BYTE[0]);
+ builder.setScanMaxTimestamp(Long.parseLong(Bytes.toString(rowKeySplit[0])));
+ builder.setIndexTableName(Bytes.toString(rowKeySplit[1]));
+ builder.setDataTableRowKey(rowKeySplit[2]);
+
+ builder.setDataTableName(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+ DATA_TABLE_NAME_BYTES)));
+ builder.setIndexTableRowKey(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+ INDEX_TABLE_ROW_KEY_BYTES));
+ builder.setDataTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+ DATA_TABLE_TS_BYTES))));
+ builder.setIndexTableRowTimestamp(Long.parseLong(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+ INDEX_TABLE_TS_BYTES))));
+ builder.setErrorMessage(Bytes.toString(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY,
+ ERROR_MESSAGE_BYTES)));
+ //actual and expected value might not be present, but will just set to null if not
+ builder.setExpectedValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, EXPECTED_VALUE_BYTES));
+ builder.setActualValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, ACTUAL_VALUE_BYTES));
+ builder.setPhaseValue(result.getValue(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES));
+ return builder.build();
+ }
+
+ public void close() throws IOException {
+ if (outputTable != null) {
+ outputTable.close();
+ }
+ if (indexTable != null) {
+ indexTable.close();
+ }
+ }
+
+ public class IndexVerificationOutputRowIterator implements Iterator<IndexVerificationOutputRow> {
+ Iterator<Result> delegate;
+ public IndexVerificationOutputRowIterator(Iterator<Result> delegate){
+ this.delegate = delegate;
+ }
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public IndexVerificationOutputRow next() {
+ Result result = delegate.next();
+ if (result == null) {
+ return null;
+ } else {
+ return getOutputRowFromResult(result);
+ }
+ }
+
+ @Override
+ public void remove() {
+ delegate.remove();
+ }
+
+ }
+
+ public void setIndexTable(Table indexTable) {
+ this.indexTable = indexTable;
+ }
+
+ public void setOutputTable(Table outputTable) {
+ this.outputTable = outputTable;
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
new file mode 100644
index 0000000..8c54796
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRow.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class IndexVerificationOutputRow {
+ public static final String SCAN_MAX_TIMESTAMP = "ScanMaxTimestamp: ";
+ private String dataTableName;
+ private String indexTableName;
+ private Long scanMaxTimestamp;
+ private byte[] dataTableRowKey;
+ private byte[] indexTableRowKey;
+ private Long dataTableRowTimestamp;
+ private Long indexTableRowTimestamp;
+ private String errorMessage;
+ private byte[] expectedValue;
+ private byte[] actualValue;
+ private byte[] phaseValue;
+
+ private IndexVerificationOutputRow(String dataTableName, String indexTableName,
+ byte[] dataTableRowKey, Long scanMaxTimestamp,
+ byte[] indexTableRowKey,
+ long dataTableRowTimestamp, long indexTableRowTimestamp,
+ String errorMessage, byte[] expectedValue, byte[] actualValue,
+ byte[] phaseValue) {
+ this.dataTableName = dataTableName;
+ this.indexTableName = indexTableName;
+ this.scanMaxTimestamp = scanMaxTimestamp;
+ this.dataTableRowKey = dataTableRowKey;
+ this.indexTableRowKey = indexTableRowKey;
+ this.dataTableRowTimestamp = dataTableRowTimestamp;
+ this.indexTableRowTimestamp = indexTableRowTimestamp;
+ this.errorMessage = errorMessage;
+ this.expectedValue = expectedValue;
+ this.actualValue = actualValue;
+ this.phaseValue = phaseValue;
+ }
+
+ public String getDataTableName() {
+ return dataTableName;
+ }
+
+ public String getIndexTableName() {
+ return indexTableName;
+ }
+
+ public Long getScanMaxTimestamp() {
+ return scanMaxTimestamp;
+ }
+
+ public byte[] getIndexTableRowKey() {
+ return indexTableRowKey;
+ }
+
+ public long getIndexTableRowTimestamp() {
+ return indexTableRowTimestamp;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public byte[] getExpectedValue() {
+ return expectedValue;
+ }
+
+ public byte[] getActualValue() {
+ return actualValue;
+ }
+
+ public byte[] getPhaseValue() {
+ return phaseValue;
+ }
+
+ public byte[] getDataTableRowKey() {
+ return dataTableRowKey;
+ }
+
+ public Long getDataTableRowTimestamp() {
+ return dataTableRowTimestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null ) {
+ return false;
+ }
+ if (!(o instanceof IndexVerificationOutputRow)) {
+ return false;
+ }
+ IndexVerificationOutputRow otherRow = (IndexVerificationOutputRow) o;
+
+ return Objects.equals(dataTableName, otherRow.getDataTableName()) &&
+ Objects.equals(indexTableName, otherRow.getIndexTableName()) &&
+ Objects.equals(scanMaxTimestamp, otherRow.getScanMaxTimestamp()) &&
+ Arrays.equals(dataTableRowKey, otherRow.getDataTableRowKey()) &&
+ Arrays.equals(indexTableRowKey, otherRow.getIndexTableRowKey()) &&
+ Objects.equals(dataTableRowTimestamp, otherRow.getDataTableRowTimestamp()) &&
+ Objects.equals(indexTableRowTimestamp, otherRow.getIndexTableRowTimestamp()) &&
+ Objects.equals(errorMessage, otherRow.getErrorMessage()) &&
+ Arrays.equals(expectedValue, otherRow.getExpectedValue()) &&
+ Arrays.equals(actualValue, otherRow.getActualValue()) &&
+ Arrays.equals(phaseValue, otherRow.getPhaseValue());
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hashCode(scanMaxTimestamp) ^ Objects.hashCode(indexTableName) ^
+ Arrays.hashCode(dataTableRowKey);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(IndexVerificationOutputRepository.DATA_TABLE_NAME + ": ").append(dataTableName).append(",");
+ sb.append(IndexVerificationOutputRepository.INDEX_TABLE_NAME + ": ").append(indexTableName).append(",");
+ sb.append(SCAN_MAX_TIMESTAMP).append(": ").append(scanMaxTimestamp).append(",");
+ sb.append(IndexVerificationOutputRepository.DATA_TABLE_ROW_KEY + ": ").append(Bytes.toString(dataTableRowKey)).append(",");
+ sb.append(IndexVerificationOutputRepository.INDEX_TABLE_ROW_KEY + ": ").append(Bytes.toString(indexTableRowKey)).append(",");
+ sb.append(IndexVerificationOutputRepository.DATA_TABLE_TS + ": ").append(dataTableRowTimestamp).append(",");
+ sb.append(IndexVerificationOutputRepository.INDEX_TABLE_TS + ": ").append(indexTableRowTimestamp).append(",");
+ sb.append(IndexVerificationOutputRepository.ERROR_MESSAGE + ": ").append(errorMessage).append(",");
+ sb.append(IndexVerificationOutputRepository.EXPECTED_VALUE + ": ").append(Bytes.toString(expectedValue)).append(",");
+ sb.append(IndexVerificationOutputRepository.ACTUAL_VALUE + ": ").append(Bytes.toString(actualValue)).append(
+ ",");
+ sb.append(IndexVerificationOutputRepository.VERIFICATION_PHASE + ": ").append(Bytes.toString(phaseValue));
+ return sb.toString();
+ }
+
+ public static class IndexVerificationOutputRowBuilder {
+ private String dataTableName;
+ private String indexTableName;
+ private Long scanMaxTimestamp;
+ private byte[] dataTableRowKey;
+ private byte[] indexTableRowKey;
+ private long dataTableRowTimestamp;
+ private long indexTableRowTimestamp;
+ private String errorMessage;
+ private byte[] expectedValue;
+ private byte[] actualValue;
+ private byte[] phaseValue;
+
+ public IndexVerificationOutputRowBuilder setDataTableName(String dataTableName) {
+ this.dataTableName = dataTableName;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setIndexTableName(String indexTableName) {
+ this.indexTableName = indexTableName;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setScanMaxTimestamp(Long scanMaxTimestamp) {
+ this.scanMaxTimestamp = scanMaxTimestamp;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setIndexTableRowKey(byte[] indexTableRowKey) {
+ this.indexTableRowKey = indexTableRowKey;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setDataTableRowKey(byte[] dataTableRowKey){
+ this.dataTableRowKey = dataTableRowKey;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setDataTableRowTimestamp(long dataTableRowTimestamp) {
+ this.dataTableRowTimestamp = dataTableRowTimestamp;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setIndexTableRowTimestamp(long indexTableRowTimestamp) {
+ this.indexTableRowTimestamp = indexTableRowTimestamp;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setExpectedValue(byte[] expectedValue) {
+ this.expectedValue = expectedValue;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setActualValue(byte[] actualValue) {
+ this.actualValue = actualValue;
+ return this;
+ }
+
+ public IndexVerificationOutputRowBuilder setPhaseValue(byte[] phaseValue) {
+ this.phaseValue = phaseValue;
+ return this;
+ }
+
+ public IndexVerificationOutputRow build() {
+ return new IndexVerificationOutputRow(dataTableName, indexTableName, dataTableRowKey,
+ scanMaxTimestamp, indexTableRowKey, dataTableRowTimestamp, indexTableRowTimestamp,
+ errorMessage, expectedValue, actualValue, phaseValue);
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
new file mode 100644
index 0000000..ca8b129
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.ByteUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class IndexVerificationResultRepository implements AutoCloseable {
+
+ private Table resultTable;
+ private Table indexTable;
+ public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+ public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
+ public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
+ public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+ public final static String SCANNED_DATA_ROW_COUNT = "ScannedDataRowCount";
+ public final static byte[] SCANNED_DATA_ROW_COUNT_BYTES = Bytes.toBytes(SCANNED_DATA_ROW_COUNT);
+ public final static String REBUILT_INDEX_ROW_COUNT = "RebuiltIndexRowCount";
+ public final static byte[] REBUILT_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(REBUILT_INDEX_ROW_COUNT);
+ public final static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT =
+ "BeforeRebuildValidIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT);
+ public final static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT =
+ "BeforeRebuildExpiredIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+ public final static String BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT =
+ "BeforeRebuildMissingIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
+ public final static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT =
+ "BeforeRebuildInvalidIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
+ public final static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT =
+ "AfterValidExpiredIndexRowCount";
+ public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
+ public final static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT =
+ "AfterRebuildExpiredIndexRowCount";
+ public final static byte[] AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT);
+ public final static String AFTER_REBUILD_MISSING_INDEX_ROW_COUNT =
+ "AfterRebuildMissingIndexRowCount";
+ public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
+ public final static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT =
+ "AfterRebuildInvalidIndexRowCount";
+ public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
+ public final static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT =
+ "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+ public final static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT =
+ "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
+
+ public final static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT =
+ "AfterRebuildBeyondMaxLookBackMissingIndexRowCount";
+ public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+ public final static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT =
+ "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount";
+ public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
+
+ /***
+ * Only usable for read / create methods. To write use setResultTable and setIndexTable first
+ */
+ public IndexVerificationResultRepository(){
+
+ }
+
+ public IndexVerificationResultRepository(Connection conn, byte[] indexNameBytes) throws SQLException {
+ resultTable = getTable(conn, RESULT_TABLE_NAME_BYTES);
+ indexTable = getTable(conn, indexNameBytes);
+ }
+
+ public IndexVerificationResultRepository(byte[] indexName,
+ HTableFactory hTableFactory) throws IOException {
+ resultTable = hTableFactory.getTable(new ImmutableBytesPtr(RESULT_TABLE_NAME_BYTES));
+ indexTable = hTableFactory.getTable(new ImmutableBytesPtr(indexName));
+ }
+
+ public void createResultTable(Connection connection) throws IOException, SQLException {
+ ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+ Admin admin = queryServices.getAdmin();
+ TableName resultTableName = TableName.valueOf(RESULT_TABLE_NAME);
+ if (!admin.tableExists(resultTableName)) {
+ HTableDescriptor tableDescriptor = new
+ HTableDescriptor(TableName.valueOf(RESULT_TABLE_NAME));
+ tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+ HColumnDescriptor columnDescriptor = new HColumnDescriptor(RESULT_TABLE_COLUMN_FAMILY);
+ tableDescriptor.addFamily(columnDescriptor);
+ admin.createTable(tableDescriptor);
+ setResultTable(admin.getConnection().getTable(resultTableName));
+ }
+ }
+ public static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName,
+ byte[] startRow, byte[] stopRow) {
+ byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
+ int targetOffset = 0;
+ // The row key for the result table : timestamp | index table name | datable table region name |
+ // scan start row | scan stop row
+ byte[] rowKey = new byte[keyPrefix.length + ROW_KEY_SEPARATOR_BYTE.length + indexTableName.length +
+ ROW_KEY_SEPARATOR_BYTE.length + regionName.length + ROW_KEY_SEPARATOR_BYTE.length +
+ startRow.length + ROW_KEY_SEPARATOR_BYTE.length + stopRow.length];
+ Bytes.putBytes(rowKey, targetOffset, keyPrefix, 0, keyPrefix.length);
+ targetOffset += keyPrefix.length;
+ Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+ targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+ Bytes.putBytes(rowKey, targetOffset, indexTableName, 0, indexTableName.length);
+ targetOffset += indexTableName.length;
+ Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+ targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+ Bytes.putBytes(rowKey, targetOffset, regionName, 0, regionName.length);
+ targetOffset += regionName.length;
+ Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+ targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+ Bytes.putBytes(rowKey, targetOffset, startRow, 0, startRow.length);
+ targetOffset += startRow.length;
+ Bytes.putBytes(rowKey, targetOffset, ROW_KEY_SEPARATOR_BYTE, 0, ROW_KEY_SEPARATOR_BYTE.length);
+ targetOffset += ROW_KEY_SEPARATOR_BYTE.length;
+ Bytes.putBytes(rowKey, targetOffset, stopRow, 0, stopRow.length);
+ return rowKey;
+ }
+
+ public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult,
+ IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException {
+ long scanMaxTs = verificationResult.getScanMaxTs();
+ byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexTable.getName().toBytes(),
+ region, verificationResult.getStartRow(),
+ verificationResult.getStopRow());
+ Put put = new Put(rowKey);
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount())));
+ if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
+ verifyType == IndexTool.IndexVerifyType.ONLY) {
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs,
+ Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackMissingIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs,
+ Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackInvalidIndexRowCount())));
+ }
+ if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs,
+ Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackMissingIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs,
+ Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackInvalidIndexRowCount())));
+ }
+ resultTable.put(put);
+ }
+
+ public IndexToolVerificationResult getVerificationResult(Connection conn, long ts) throws IOException, SQLException {
+ Table hTable = getTable(conn, RESULT_TABLE_NAME_BYTES);
+ return getVerificationResult(hTable, ts);
+ }
+
+ public Table getTable(Connection conn, byte[] tableName) throws SQLException {
+ return conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(tableName);
+ }
+
+ public IndexToolVerificationResult getVerificationResult(Table htable, long ts)
+ throws IOException {
+ byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+ byte[] stopRowKey = ByteUtil.calculateTheClosestNextRowKeyForPrefix(startRowKey);
+ IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(ts);
+ Scan scan = new Scan();
+ scan.withStartRow(startRowKey);
+ scan.withStopRow(stopRowKey);
+ ResultScanner scanner = htable.getScanner(scan);
+ for (Result result = scanner.next(); result != null; result = scanner.next()) {
+ boolean isFirst = true;
+ for (Cell cell : result.rawCells()) {
+ if (isFirst){
+ byte[][] rowKeyParts = ByteUtil.splitArrayBySeparator(result.getRow(),
+ ROW_KEY_SEPARATOR_BYTE[0]);
+ verificationResult.setStartRow(rowKeyParts[3]);
+ verificationResult.setStopRow(rowKeyParts[4]);
+ isFirst = false;
+ }
+ verificationResult.update(cell);
+ }
+ }
+ return verificationResult;
+ }
+
+ public void close() throws IOException {
+ if (resultTable != null) {
+ resultTable.close();
+ }
+ if (indexTable != null) {
+ indexTable.close();
+ }
+ }
+
+ public void setResultTable(Table resultTable) {
+ this.resultTable = resultTable;
+ }
+
+ public void setIndexTable(Table indexTable) {
+ this.indexTable = indexTable;
+ }
+}
+
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index a24e3ab..4cd2603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -25,12 +25,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -53,6 +51,7 @@
public class PhoenixIndexImportDirectReducer extends
Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable> {
private AtomicBoolean calledOnce = new AtomicBoolean(false);
+ private IndexVerificationResultRepository resultRepository;
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixIndexImportDirectReducer.class);
@@ -62,10 +61,8 @@
Configuration configuration = context.getConfiguration();
try (final Connection connection = ConnectionUtil.getInputConnection(configuration)) {
long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
- Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
IndexToolVerificationResult verificationResult =
- IndexToolVerificationResult.getVerificationResult(hTable, ts);
+ resultRepository.getVerificationResult(connection, ts);
context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
setValue(verificationResult.getScannedDataRowCount());
context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
@@ -108,6 +105,11 @@
}
@Override
+ protected void setup(Context context) throws IOException {
+ resultRepository = new IndexVerificationResultRepository();
+ }
+
+ @Override
protected void reduce(ImmutableBytesWritable arg0, Iterable<IntWritable> arg1,
Reducer<ImmutableBytesWritable, IntWritable, NullWritable, NullWritable>.Context context)
throws IOException, InterruptedException
@@ -133,6 +135,7 @@
protected void cleanup(Context context) throws IOException, InterruptedException{
try {
updateTasksTable(context);
+ resultRepository.close();
} catch (SQLException e) {
LOGGER.error(" Failed to update the tasks table");
throw new RuntimeException(e.getMessage());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
index 5a2b624..cc582f7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java
@@ -23,12 +23,14 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -606,4 +608,48 @@
}
return true;
}
+
+ public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+ // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+ // Search for the place where the trailing 0xFFs start
+ int offset = rowKeyPrefix.length;
+ while (offset > 0) {
+ if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+ break;
+ }
+ offset--;
+ }
+ if (offset == 0) {
+ // We got an 0xFFFF... (only FFs) stopRow value which is
+ // the last possible prefix before the end of the table.
+ // So set it to stop at the 'end of the table'
+ return HConstants.EMPTY_END_ROW;
+ }
+ // Copy the right length of the original
+ byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+ // And increment the last one
+ newStopRow[newStopRow.length - 1]++;
+ return newStopRow;
+ }
+
+ public static byte[][] splitArrayBySeparator(byte[] src, byte separator){
+ List<Integer> separatorLocations = new ArrayList<Integer>();
+ for (int k = 0; k < src.length; k++){
+ if (src[k] == separator){
+ separatorLocations.add(k);
+ }
+ }
+ byte[][] dst = new byte[separatorLocations.size() +1][];
+ int previousSepartor = -1;
+ for (int j = 0; j < separatorLocations.size(); j++){
+ int separatorLocation = separatorLocations.get(j);
+ dst[j] = Bytes.copy(src, previousSepartor +1, separatorLocation- previousSepartor -1);
+ previousSepartor = separatorLocation;
+ }
+ if (previousSepartor < src.length){
+ dst[separatorLocations.size()] = Bytes.copy(src,
+ previousSepartor +1, src.length - previousSepartor -1);
+ }
+ return dst;
+ }
}