blob: 18c140b9eca51eee410963d5ba924deb50864421 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.INDEX_TOOL_RUN_STATUS_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.ROW_KEY_SEPARATOR;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RUN_STATUS_EXECUTED;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RUN_STATUS_SKIPPED;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT {
public static final int MAX_LOOKBACK_AGE = 3600;
private final String tableDDLOptions;
private boolean directApi = true;
private boolean useSnapshot = false;
private boolean mutable;
@Rule
public ExpectedException exceptionRule = ExpectedException.none();
public IndexToolForNonTxGlobalIndexIT(boolean mutable) {
StringBuilder optionBuilder = new StringBuilder();
this.mutable = mutable;
if (!mutable) {
optionBuilder.append(" IMMUTABLE_ROWS=true ");
}
optionBuilder.append(" SPLIT ON(1,2)");
this.tableDDLOptions = optionBuilder.toString();
}
@Parameterized.Parameters(name = "mutable={0}")
public static synchronized Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{true},
{false} });
}
@BeforeClass
public static synchronized void setup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
serverProps.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Long.toString(MAX_LOOKBACK_AGE));
serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB,
Long.toString(Long.MAX_VALUE));
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
destroyDriver();
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
new ReadOnlyProps(clientProps.entrySet().iterator()));
//IndexToolIT.runIndexTool pulls from the minicluster's config directly
getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, "1");
}
@After
public void cleanup() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
deleteAllRows(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES));
deleteAllRows(conn,
TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME));
}
EnvironmentEdgeManager.reset();
}
@Test
public void testWithSetNull() throws Exception {
// This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
// and after that the index table is still rebuilt correctly
if(!this.mutable) {
return;
}
final int NROWS = 2 * 3 * 5 * 7;
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ tableDDLOptions);
String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(upsertStmt);
IndexToolIT.setEveryNthRowWithNull(NROWS, 2, stmt);
conn.commit();
IndexToolIT.setEveryNthRowWithNull(NROWS, 3, stmt);
conn.commit();
conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ", indexTableName, dataTableFullName));
// Run the index MR job and verify that the index table is built correctly
IndexTool
indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertTrue("Index rebuild failed!", indexTool.getJob().isSuccessful());
TestUtil.assertIndexState(conn, indexTableFullName, PIndexState.ACTIVE, null);
long actualRowCount = IndexScrutiny
.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
IndexToolIT.setEveryNthRowWithNull(NROWS, 5, stmt);
conn.commit();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
IndexToolIT.setEveryNthRowWithNull(NROWS, 7, stmt);
conn.commit();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
0, IndexTool.IndexVerifyType.ONLY, new String[0]);
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
}
}
@Test
public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
// Insert a row
conn.createStatement()
.execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
conn.commit();
conn.createStatement()
.execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
indexTableName, dataTableFullName));
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
IndexTool.IndexVerifyType.ONLY);
Cell cell =
IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
indexTableFullName);
try {
String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW;
String actualErrorMsg = Bytes
.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
assertTrue(expectedErrorMsg.equals(actualErrorMsg));
} catch(Exception ex) {
Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
}
// Run the index tool to populate the index while verifying rows
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
IndexTool.IndexVerifyType.AFTER);
// Set ttl of index table ridiculously low so that all data is expired
Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
TableName indexTable = TableName.valueOf(indexTableFullName);
HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
desc.setTimeToLive(1);
admin.modifyColumn(indexTable, desc);
Thread.sleep(1000);
Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
int retry = 0;
while (retry < 20 && status.getFirst() != 0) {
Thread.sleep(2000);
status = admin.getAlterStatus(indexTable);
}
assertTrue(status.getFirst() == 0);
TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
admin.disableTable(indexToolOutputTable);
admin.deleteTable(indexToolOutputTable);
// Run the index tool using the only-verify option, verify it gives no mismatch
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
IndexTool.IndexVerifyType.ONLY);
Scan scan = new Scan();
Table hIndexToolTable =
conn.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(indexToolOutputTable.getName());
Result r = hIndexToolTable.getScanner(scan).next();
assertTrue(r == null);
}
}
@Test
public void testSecondaryGlobalIndexFailure() throws Exception {
if (!mutable) {
return; //nothing in this test is mutable specific, so no need to run twice
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String stmString1 =
"CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ tableDDLOptions;
conn.createStatement().execute(stmString1);
conn.commit();
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
// Insert two rows
IndexToolIT.upsertRow(stmt1, 1);
IndexToolIT.upsertRow(stmt1, 2);
conn.commit();
String stmtString2 =
String.format(
"CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ", indexTableName, dataTableFullName);
conn.createStatement().execute(stmtString2);
conn.commit();
String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
// Verify that the index table is in the ACTIVE state
assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
Admin admin = queryServices.getAdmin();
TableName tn = TableName.valueOf(Bytes.toBytes(dataTableFullName));
HTableDescriptor td =
admin.getTableDescriptor(tn);
//add the fast fail coproc and make sure it goes first
td.addCoprocessor(FastFailRegionObserver.class.getName(), null, 1, null);
admin.modifyTable(tn, td);
// Run the index MR job and it should fail (return -1)
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, -1, new String[0]);
// Verify that the index table should be still in the ACTIVE state
assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
}
}
@Test
public void testBuildSecondaryIndexAndScrutinize() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String stmString1 =
"CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ tableDDLOptions;
conn.createStatement().execute(stmString1);
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
// Insert NROWS rows
final int NROWS = 1000;
for (int i = 0; i < NROWS; i++) {
IndexToolIT.upsertRow(stmt1, i);
}
conn.commit();
String stmtString2 =
String.format(
"CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName);
conn.createStatement().execute(stmtString2);
// Run the index MR job and verify that the index table is built correctly
IndexTool indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
// Add more rows and make sure that these rows will be visible to IndexTool
for (int i = NROWS; i < 2 * NROWS; i++) {
IndexToolIT.upsertRow(stmt1, i);
}
conn.commit();
indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(2 * NROWS, actualRowCount);
}
}
@Test
public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String viewName = generateUniqueName();
String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ tableDDLOptions);
conn.commit();
conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
conn.commit();
// Insert a row
conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
conn.commit();
conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, IndexToolIT.MutationCountingRegionObserver.class);
// Run the index MR job and verify that the index table rebuild succeeds
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
null, 0, IndexTool.IndexVerifyType.AFTER);
assertEquals(1, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
IndexToolIT.MutationCountingRegionObserver.setMutationCount(0);
// Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
// write any index rows
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
null, 0, IndexTool.IndexVerifyType.BEFORE);
assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
// The "-v BOTH" option should not write any index rows either
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
null, 0, IndexTool.IndexVerifyType.BOTH);
assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
}
}
@Test
public void testIndexToolVerifyAfterOption() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String viewName = generateUniqueName();
String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ tableDDLOptions);
conn.commit();
conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
conn.commit();
// Insert a row
conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
conn.commit();
// Configure IndexRegionObserver to fail the first write phase. This should not
// lead to any change on index and thus the index verify during index rebuild should fail
IndexRebuildRegionScanner.setIgnoreIndexRebuildForTesting(true);
conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
// Run the index MR job and verify that the index table rebuild fails
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
null, -1, IndexTool.IndexVerifyType.AFTER);
// The index tool output table should report that there is a missing index row
Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
try {
String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW;
String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
assertTrue(expectedErrorMsg.equals(actualErrorMsg));
} catch(Exception ex){
Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
}
IndexRebuildRegionScanner.setIgnoreIndexRebuildForTesting(false);
}
}
@Test
public void testIndexToolOnlyVerifyOption() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
// Insert a row
conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
conn.commit();
conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
// Run the index MR job to only verify that each data table row has a corresponding index row
// IndexTool will go through each data table row and record the mismatches in the output table
// called PHOENIX_INDEX_TOOL
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY);
Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
try {
String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW;
String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
assertTrue(expectedErrorMsg.equals(actualErrorMsg));
} catch(Exception ex) {
Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
}
// VERIFY option should not change the index state.
Assert.assertEquals(PIndexState.BUILDING, TestUtil.getIndexState(conn, indexTableFullName));
// Delete the output table for the next test
deleteAllRows(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
// Run the index tool to populate the index while verifying rows
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.AFTER);
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY);
}
}
@Test
public void testIndexToolForIncrementalRebuild() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
List<String> expectedStatus = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
//this only seems to work if you truncate, rather than use deleteAllRows, and I'm
//not sure why because deleteAllRows seems to work fine for the output table
Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
admin.disableTable(TableName.valueOf(RESULT_TABLE_NAME));
admin.truncateTable(TableName.valueOf(RESULT_TABLE_NAME), true);
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) "+tableDDLOptions);
conn.createStatement().execute(String.format(
"CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, dataTableFullName));
conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
conn.createStatement().execute("upsert into " + dataTableFullName + " values (2, 'Phoenix1', 'B')");
conn.commit();
IndexTool it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.AFTER);
Long scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L);
expectedStatus.add(RUN_STATUS_EXECUTED);
expectedStatus.add(RUN_STATUS_EXECUTED);
expectedStatus.add(RUN_STATUS_EXECUTED);
try {
verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, expectedStatus);
} catch (AssertionError ae) {
TestUtil.dumpTable(conn, TableName.valueOf(RESULT_TABLE_NAME));
throw ae;
}
deleteOneRowFromResultTable(conn, scn, indexTableFullName);
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", Long.toString(scn));
scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L);
expectedStatus.set(0, RUN_STATUS_EXECUTED);
expectedStatus.set(1, RUN_STATUS_SKIPPED);
expectedStatus.set(2, RUN_STATUS_SKIPPED);
verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 5, expectedStatus);
deleteAllRows(conn, TableName.valueOf(indexTableFullName));
expectedStatus.set(0, RUN_STATUS_SKIPPED);
expectedStatus.set(1, RUN_STATUS_SKIPPED);
expectedStatus.set(2, RUN_STATUS_SKIPPED);
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", Long.toString(scn));
scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L);
verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 8, expectedStatus);
ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + indexTableFullName);
Assert.assertFalse(rs.next());
//testing the dependent method
Assert.assertFalse(it.isValidLastVerifyTime(10L));
Assert.assertFalse(it.isValidLastVerifyTime(EnvironmentEdgeManager.currentTimeMillis() - 1000L));
Assert.assertTrue(it.isValidLastVerifyTime(scn));
}
}
@Test
public void testIndexToolForIncrementalVerify() throws Exception {
ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String viewName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
String indexTableName = generateUniqueName();
String viewIndexName = generateUniqueName();
long waitForUpsert = 2;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
conn.createStatement().execute("CREATE TABLE "+dataTableFullName+" "
+ "(key1 BIGINT NOT NULL, key2 BIGINT NOT NULL, val1 VARCHAR, val2 BIGINT, "
+ "val3 BIGINT, val4 DOUBLE, val5 BIGINT, val6 VARCHAR "
+ "CONSTRAINT my_pk PRIMARY KEY(key1, key2)) "+tableDDLOptions);
conn.createStatement().execute("CREATE VIEW "+viewFullName+" AS SELECT * FROM "+dataTableFullName);
conn.createStatement().execute(String.format(
"CREATE INDEX "+viewIndexName+" ON "+viewFullName+" (val3) INCLUDE(val5)"));
conn.createStatement().execute(String.format(
"CREATE INDEX "+indexTableName+" ON "+dataTableFullName+" (val3) INCLUDE(val5)"));
customEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(customEdge);
long t0 = customEdge.currentTime();
customEdge.incrementValue(waitForUpsert);
conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val1, val2) VALUES (4,5,'abc',3)");
customEdge.incrementValue(waitForUpsert);
long t1 = customEdge.currentTime();
customEdge.incrementValue(waitForUpsert);
conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val1, val2) VALUES (1,2,'abc',3)");
customEdge.incrementValue(waitForUpsert);
long t2 = customEdge.currentTime();
customEdge.incrementValue(waitForUpsert);
conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val3, val4) VALUES (1,2,4,1.2)");
customEdge.incrementValue(waitForUpsert);
long t3 = customEdge.currentTime();
customEdge.incrementValue(waitForUpsert);
conn.createStatement().execute("UPSERT INTO "+viewFullName+"(key1, key2, val5, val6) VALUES (1,2,5,'def')");
customEdge.incrementValue(waitForUpsert);
long t4 = customEdge.currentTime();
customEdge.incrementValue(waitForUpsert);
conn.createStatement().execute("DELETE FROM "+viewFullName+" WHERE key1=4");
customEdge.incrementValue(waitForUpsert);
long t5 = customEdge.currentTime();
customEdge.incrementValue(10);
long t6 = customEdge.currentTime();
IndexTool it;
if(!mutable) {
// job with 2 rows
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2));
verifyCounters(it, 2, 2);
//increment time between rebuilds so that PHOENIX_INDEX_TOOL and
// PHOENIX_INDEX_TOOL_RESULT tables get unique keys for each run
customEdge.incrementValue(waitForUpsert);
// only one row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t2));
verifyCounters(it, 1, 1);
customEdge.incrementValue(waitForUpsert);
// no rows
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6));
verifyCounters(it, 0, 0);
customEdge.incrementValue(waitForUpsert);
//view index
// job with 2 rows
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2));
verifyCounters(it, 2, 2);
customEdge.incrementValue(waitForUpsert);
// only one row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t2));
verifyCounters(it, 1, 1);
customEdge.incrementValue(waitForUpsert);
// no rows
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, viewIndexName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6));
verifyCounters(it, 0, 0);
customEdge.incrementValue(waitForUpsert);
return;
}
// regular job without delete row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t4));
verifyCounters(it, 2, 3);
customEdge.incrementValue(waitForUpsert);
// job with 2 rows
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t0),"-et", String.valueOf(t2));
verifyCounters(it, 2, 2);
customEdge.incrementValue(waitForUpsert);
// job with update on only one row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),"-et", String.valueOf(t3));
verifyCounters(it, 1, 2);
customEdge.incrementValue(waitForUpsert);
// job with update on only one row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t2),"-et", String.valueOf(t4));
verifyCounters(it, 1, 2);
customEdge.incrementValue(waitForUpsert);
// job with update on only one row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t4),"-et", String.valueOf(t5));
verifyCounters(it, 1, 1);
customEdge.incrementValue(waitForUpsert);
// job with no new updates on any row
it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t5),"-et", String.valueOf(t6));
verifyCounters(it, 0, 0);
customEdge.incrementValue(waitForUpsert);
} finally {
EnvironmentEdgeManager.reset();
}
}
@Test
public void testIndexToolForIncrementalVerify_viewIndex() throws Exception {
ManualEnvironmentEdge customeEdge = new ManualEnvironmentEdge();
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String viewName = generateUniqueName();
String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String viewIndexName = generateUniqueName();
long waitForUpsert = 2;
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
conn.createStatement().execute("CREATE TABLE " + dataTableFullName + " "
+ "(key1 BIGINT NOT NULL, key2 BIGINT NOT NULL, val1 VARCHAR, val2 BIGINT, "
+ "val3 BIGINT, val4 DOUBLE, val5 BIGINT, val6 VARCHAR "
+ "CONSTRAINT my_pk PRIMARY KEY(key1, key2)) COLUMN_ENCODED_BYTES=0");
conn.createStatement().execute(
"CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName + " WHERE val6 = 'def'");
conn.createStatement().execute(String.format(
"CREATE INDEX " + viewIndexName + " ON " + viewFullName
+ " (val3) INCLUDE(val5)"));
customeEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(customeEdge);
long t1 = customeEdge.currentTime();
customeEdge.incrementValue(waitForUpsert);
conn.createStatement()
.execute("UPSERT INTO " + viewFullName + " VALUES (5,6,'abc',8,4,1.3,6,'def')");
customeEdge.incrementValue(waitForUpsert);
long t2 = customeEdge.currentTime();
customeEdge.incrementValue(waitForUpsert);
conn.createStatement()
.execute("UPSERT INTO " + viewFullName + " VALUES (1,2,'abc',3,4,1.2,5,'def')");
customeEdge.incrementValue(waitForUpsert);
long t3 = customeEdge.currentTime();
customeEdge.incrementValue(waitForUpsert);
conn.createStatement().execute("DELETE FROM " + viewFullName + " WHERE key1=5");
customeEdge.incrementValue(waitForUpsert);
long t4 = customeEdge.currentTime();
customeEdge.incrementValue(10);
long t5 = customeEdge.currentTime();
IndexTool it;
// regular job with delete row
it =
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),
"-et", String.valueOf(t4));
verifyCounters(it, 2, 2);
// job with 1 row
it =
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),
"-et", String.valueOf(t2));
verifyCounters(it, 1, 1);
// job with update on only one row
it =
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t2),
"-et", String.valueOf(t3));
verifyCounters(it, 1, 1);
// job with update on 2 rows
it =
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t1),
"-et", String.valueOf(t3));
verifyCounters(it, 2, 2);
/*
// job with update on only one row
it =
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t3),
"-et", String.valueOf(t4));
verifyCounters(it, 1, 1);
*/
// job with no new updates on any row
it =
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName,
viewIndexName, null, 0, IndexTool.IndexVerifyType.ONLY, "-st", String.valueOf(t4),
"-et", String.valueOf(t5));
verifyCounters(it, 0, 0);
} finally {
EnvironmentEdgeManager.reset();
}
}
private void verifyCounters(IndexTool it, int scanned, int valid) throws IOException {
assertEquals(scanned, it.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(valid, it.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
assertEquals(0, it.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
}
@Test
public void testDisableOutputLogging() throws Exception {
if (!mutable || useSnapshot) {
return;
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
deleteAllRows(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
String stmString1 =
"CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ tableDDLOptions;
conn.createStatement().execute(stmString1);
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
// insert two rows
IndexToolIT.upsertRow(stmt1, 1);
IndexToolIT.upsertRow(stmt1, 2);
conn.commit();
//create ASYNC
String stmtString2 =
String.format(
"CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
indexTableName, dataTableFullName);
conn.createStatement().execute(stmtString2);
conn.commit();
// run the index MR job as ONLY so the index doesn't get rebuilt. Should be 2 missing
//rows. We pass in --disable-logging BEFORE to silence the output logging to
// PHOENIX_INDEX_TOOL, since ONLY logs BEFORE the (non-existent in this case)
// rebuild
assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.ONLY,
IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName,
indexTableFullName, 0);
// disabling logging AFTER on an AFTER run should leave no output rows
assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.AFTER,
IndexTool.IndexDisableLoggingType.AFTER, null, schemaName, dataTableName,
indexTableName,
indexTableFullName, 0);
//disabling logging BEFORE on a BEFORE run should leave no output rows
assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BEFORE,
IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName,
indexTableFullName, 0);
//now clear out all the rebuilt index rows
deleteAllRows(conn, TableName.valueOf(indexTableFullName));
//now check that disabling logging AFTER leaves only the BEFORE logs on a BOTH run
assertDisableLogging(conn, 2, IndexTool.IndexVerifyType.BOTH,
IndexTool.IndexDisableLoggingType.AFTER,
IndexVerificationOutputRepository.PHASE_BEFORE_VALUE, schemaName,
dataTableName, indexTableName,
indexTableFullName, -1);
//clear out both the output table and the index
deleteAllRows(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
deleteAllRows(conn, TableName.valueOf(indexTableFullName));
//now check that disabling logging BEFORE creates only the AFTER logs on a BOTH run
assertDisableLogging(conn, 2, IndexTool.IndexVerifyType.BOTH,
IndexTool.IndexDisableLoggingType.BEFORE,
IndexVerificationOutputRepository.PHASE_AFTER_VALUE, schemaName,
dataTableName, indexTableName,
indexTableFullName, -1);
deleteAllRows(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
deleteAllRows(conn, TableName.valueOf(indexTableFullName));
//now check that disabling logging BOTH creates no logs on a BOTH run
assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BOTH,
IndexTool.IndexDisableLoggingType.BOTH,
IndexVerificationOutputRepository.PHASE_BEFORE_VALUE, schemaName,
dataTableName, indexTableName,
indexTableFullName, -1);
}
}
@Test
public void testEnableOutputLoggingForMaxLookback() throws Exception {
//by default we don't log invalid or missing rows past max lookback age to the
// PHOENIX_INDEX_TOOL table. Verify that we can flip that logging on from the client-side
// using a system property (such as from the command line) and have it log rows on the
// server-side
if (!mutable) {
return;
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
String oldProperty =
System.getProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS);
try(Connection conn = DriverManager.getConnection(getUrl())) {
System.setProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS, "true");
ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(injectEdge);
deleteAllRows(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
String stmString1 =
"CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) ";
conn.createStatement().execute(stmString1);
injectEdge.incrementValue(1L);
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
// insert two rows
IndexToolIT.upsertRow(stmt1, 1);
IndexToolIT.upsertRow(stmt1, 2);
conn.commit();
injectEdge.incrementValue(1L); //we have to increment time to see our writes
//now create an index async so it won't have the two rows in the base table
String stmtString2 =
String.format(
"CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
indexTableName, dataTableFullName);
conn.createStatement().execute(stmtString2);
conn.commit();
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
deleteAllRows(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
IndexTool it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName,
dataTableName,
indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY);
TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
Counters counters = it.getJob().getCounters();
System.out.println(counters.toString());
assertEquals(2L,
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
IndexVerificationOutputRepository outputRepository =
new IndexVerificationOutputRepository(Bytes.toBytes(indexTableFullName), conn);
List<IndexVerificationOutputRow> outputRows = outputRepository.getAllOutputRows();
assertEquals(0, outputRows.size());
} finally {
if (oldProperty != null) {
System.setProperty(IndexRebuildRegionScanner.PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS, oldProperty);
}
EnvironmentEdgeManager.reset();
}
}
@Test
public void testUpdatablePKFilterViewIndexRebuild() throws Exception {
if (!mutable) {
return;
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String view1Name = generateUniqueName();
String view1FullName = SchemaUtil.getTableName(schemaName, view1Name);
String view2Name = generateUniqueName();
String view2FullName = SchemaUtil.getTableName(schemaName, view2Name);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
// Create Table and Views. Note the view is on a non leading PK data table column
String createTable =
"CREATE TABLE IF NOT EXISTS " + dataTableFullName + " (\n"
+ " ORGANIZATION_ID VARCHAR NOT NULL,\n"
+ " KEY_PREFIX CHAR(3) NOT NULL,\n" + " CREATED_BY VARCHAR,\n"
+ " CONSTRAINT PK PRIMARY KEY (\n" + " ORGANIZATION_ID,\n"
+ " KEY_PREFIX\n" + " )\n"
+ ") VERSIONS=1, COLUMN_ENCODED_BYTES=0";
conn.createStatement().execute(createTable);
String createView1 =
"CREATE VIEW IF NOT EXISTS " + view1FullName + " (\n"
+ " VIEW_COLA VARCHAR NOT NULL,\n"
+ " VIEW_COLB CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COLA\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE KEY_PREFIX = 'aaa'";
conn.createStatement().execute(createView1);
String createView2 =
"CREATE VIEW IF NOT EXISTS " + view2FullName + " (\n"
+ " VIEW_COL1 VARCHAR NOT NULL,\n"
+ " VIEW_COL2 CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COL1\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE KEY_PREFIX = 'ccc'";
conn.createStatement().execute(createView2);
// We want to verify if deletes and set null result in expected rebuild of view index
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'A', 'G')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'C', 'I')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'D', 'J')");
conn.createStatement().execute("UPSERT INTO " + view2FullName
+ "(ORGANIZATION_ID, VIEW_COL1, VIEW_COL2) VALUES('ORG2', 'B', 'H')");
conn.commit();
conn.createStatement().execute("DELETE FROM " + view1FullName
+ " WHERE ORGANIZATION_ID = 'ORG1' AND VIEW_COLA = 'C'");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'D', NULL)");
conn.commit();
String createViewIndex =
"CREATE INDEX IF NOT EXISTS " + indexTableName + " ON " + view1FullName
+ " (VIEW_COLB) ASYNC";
conn.createStatement().execute(createViewIndex);
conn.commit();
// Rebuild using index tool
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, view1Name, indexTableName);
ResultSet rs =
conn.createStatement()
.executeQuery("SELECT COUNT(*) FROM " + indexTableFullName);
rs.next();
assertEquals(2, rs.getInt(1));
Pair<Integer, Integer> putsAndDeletes =
countPutsAndDeletes("_IDX_" + dataTableFullName);
assertEquals(4, (int) putsAndDeletes.getFirst());
assertEquals(2, (int) putsAndDeletes.getSecond());
}
}
@Test
public void testUpdatableNonPkFilterViewIndexRebuild() throws Exception {
if (!mutable) {
return;
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String view1Name = generateUniqueName();
String view1FullName = SchemaUtil.getTableName(schemaName, view1Name);
String view2Name = generateUniqueName();
String view2FullName = SchemaUtil.getTableName(schemaName, view2Name);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
// Create Table and Views. Note the view is on a non PK data table column
String createTable =
"CREATE TABLE IF NOT EXISTS " + dataTableFullName + " (\n"
+ " ORGANIZATION_ID VARCHAR NOT NULL,\n"
+ " KEY_PREFIX CHAR(3) NOT NULL,\n" + " CREATED_BY VARCHAR,\n"
+ " CONSTRAINT PK PRIMARY KEY (\n" + " ORGANIZATION_ID,\n"
+ " KEY_PREFIX\n" + " )\n"
+ ") VERSIONS=1, COLUMN_ENCODED_BYTES=0";
conn.createStatement().execute(createTable);
String createView1 =
"CREATE VIEW IF NOT EXISTS " + view1FullName + " (\n"
+ " VIEW_COLA VARCHAR NOT NULL,\n"
+ " VIEW_COLB CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COLA\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE CREATED_BY = 'foo'";
conn.createStatement().execute(createView1);
String createView2 =
"CREATE VIEW IF NOT EXISTS " + view2FullName + " (\n"
+ " VIEW_COL1 VARCHAR NOT NULL,\n"
+ " VIEW_COL2 CHAR(1) CONSTRAINT PKVIEW PRIMARY KEY (\n"
+ " VIEW_COL1\n" + " )) AS \n" + " SELECT * FROM " + dataTableFullName
+ " WHERE CREATED_BY = 'bar'";
conn.createStatement().execute(createView2);
// We want to verify if deletes and set null result in expected rebuild of view index
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'aaa', 'A', 'G')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'ccc', 'C', 'I')");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'ddd', 'D', 'J')");
conn.createStatement().execute("UPSERT INTO " + view2FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COL1, VIEW_COL2) VALUES('ORG2', 'bbb', 'B', 'H')");
conn.commit();
conn.createStatement().execute("DELETE FROM " + view1FullName
+ " WHERE ORGANIZATION_ID = 'ORG1' AND VIEW_COLA = 'C'");
conn.createStatement().execute("UPSERT INTO " + view1FullName
+ "(ORGANIZATION_ID, KEY_PREFIX, VIEW_COLA, VIEW_COLB) VALUES('ORG1', 'ddd', 'D', NULL)");
conn.commit();
String createViewIndex =
"CREATE INDEX IF NOT EXISTS " + indexTableName + " ON " + view1FullName
+ " (VIEW_COLB) ASYNC";
conn.createStatement().execute(createViewIndex);
conn.commit();
// Rebuild using index tool
IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, view1Name, indexTableName);
ResultSet rs =
conn.createStatement()
.executeQuery("SELECT COUNT(*) FROM " + indexTableFullName);
rs.next();
assertEquals(2, rs.getInt(1));
Pair<Integer, Integer> putsAndDeletes =
countPutsAndDeletes("_IDX_" + dataTableFullName);
assertEquals(4, (int) putsAndDeletes.getFirst());
assertEquals(2, (int) putsAndDeletes.getSecond());
}
}
private Pair<Integer, Integer> countPutsAndDeletes(String tableName) throws Exception {
int numPuts = 0;
int numDeletes = 0;
try (org.apache.hadoop.hbase.client.Connection hcon =
ConnectionFactory.createConnection(config)) {
Table htable = hcon.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.setRaw(true);
ResultScanner scanner = htable.getScanner(scan);
for (Result result = scanner.next(); result != null; result = scanner.next()) {
for (Cell cell : result.rawCells()) {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
numPuts++;
} else if (KeyValue.Type
.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamily) {
numDeletes++;
}
}
}
}
return new Pair<Integer, Integer>(numPuts, numDeletes);
}
public void deleteAllRows(Connection conn, TableName tableName) throws SQLException,
IOException, InterruptedException {
Scan scan = new Scan();
HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().
getAdmin();
HConnection hbaseConn = admin.getConnection();
HTableInterface table = hbaseConn.getTable(tableName);
boolean deletedRows = false;
try (ResultScanner scanner = table.getScanner(scan)) {
for (Result r : scanner) {
Delete del = new Delete(r.getRow());
table.delete(del);
deletedRows = true;
}
} catch (Exception e) {
//if the table doesn't exist, we have no rows to delete. Easier to catch
//than to pre-check for existence
}
//don't flush/compact if we didn't write anything, because we'll hang forever
if (deletedRows) {
getUtility().getHBaseAdmin().flush(tableName);
TestUtil.majorCompact(getUtility(), tableName);
}
}
private void assertDisableLogging(Connection conn, int expectedRows,
IndexTool.IndexVerifyType verifyType,
IndexTool.IndexDisableLoggingType disableLoggingType,
byte[] expectedPhase,
String schemaName, String dataTableName,
String indexTableName, String indexTableFullName,
int expectedStatus) throws Exception {
IndexTool tool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName,
indexTableName,
null,
expectedStatus, verifyType, "-dl", disableLoggingType.toString());
assertNotNull(tool);
byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
IndexVerificationOutputRepository outputRepository =
new IndexVerificationOutputRepository(indexTableFullNameBytes, conn);
List<IndexVerificationOutputRow> rows =
outputRepository.getAllOutputRows();
try {
assertEquals(expectedRows, rows.size());
} catch (AssertionError e) {
TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
throw e;
}
if (expectedRows > 0) {
assertArrayEquals(expectedPhase, rows.get(0).getPhaseValue());
}
}
private void deleteOneRowFromResultTable(Connection conn, Long scn, String indexTable)
throws SQLException, IOException {
Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(RESULT_TABLE_NAME_BYTES);
Scan s = new Scan();
s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", scn, ROW_KEY_SEPARATOR, indexTable)));
ResultScanner rs = hIndexToolTable.getScanner(s);
hIndexToolTable.delete(new Delete(rs.next().getRow()));
}
private List<String> verifyRunStatusFromResultTable(Connection conn, Long scn, String indexTable, int totalRows, List<String> expectedStatus) throws SQLException, IOException {
Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(RESULT_TABLE_NAME_BYTES);
Assert.assertEquals(totalRows, TestUtil.getRowCount(hIndexToolTable, false));
List<String> output = new ArrayList<>();
Scan s = new Scan();
s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", scn, ROW_KEY_SEPARATOR, indexTable)));
ResultScanner rs = hIndexToolTable.getScanner(s);
int count =0;
for(Result r : rs) {
Assert.assertTrue(r != null);
List<Cell> cells = r.getColumnCells(RESULT_TABLE_COLUMN_FAMILY, INDEX_TOOL_RUN_STATUS_BYTES);
Assert.assertEquals(cells.size(), 1);
Assert.assertTrue(Bytes.toString(cells.get(0).getRow()).startsWith(String.valueOf(scn)));
output.add(Bytes.toString(cells.get(0).getValue()));
count++;
}
//for each region
Assert.assertEquals(3, count);
for(int i=0; i< count; i++) {
Assert.assertEquals(expectedStatus.get(i), output.get(i));
}
return output;
}
public static class FastFailRegionObserver extends BaseRegionObserver {
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan,
final RegionScanner s) throws IOException {
throw new DoNotRetryIOException("I'm just a coproc that's designed to fail fast");
}
}
}