blob: 4840047ea205e3e9569547a2629d8059621aa5d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(Parameterized.class)
public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
private final boolean localIndex;
private final boolean mutable;
private final boolean transactional;
private final boolean directApi;
private final String tableDDLOptions;
private final boolean useSnapshot;
private final boolean useTenantId;

/**
 * Creates one parameterized instance of the test.
 *
 * @param transactionProvider transaction provider name, or {@code null} for a non-transactional table
 * @param mutable whether the data table is mutable; when false the DDL gets {@code IMMUTABLE_ROWS=true}
 * @param localIndex whether the tests create LOCAL indexes
 * @param directApi passed through to the IndexTool runs
 * @param useSnapshot passed through to the IndexTool runs
 * @param useTenantId whether this is the tenant-id parameterization
 */
public IndexToolIT(String transactionProvider, boolean mutable, boolean localIndex,
        boolean directApi, boolean useSnapshot, boolean useTenantId) {
    this.localIndex = localIndex;
    this.mutable = mutable;
    this.transactional = transactionProvider != null;
    this.directApi = directApi;
    this.useSnapshot = useSnapshot;
    this.useTenantId = useTenantId;

    // Assemble the table options appended to every CREATE TABLE issued by the tests.
    StringBuilder ddl = new StringBuilder();
    if (!mutable) {
        ddl.append(" IMMUTABLE_ROWS=true ");
    }
    if (this.transactional) {
        // Separate from a preceding option with a comma only when one was emitted.
        if (ddl.length() > 0) {
            ddl.append(",");
        }
        ddl.append(" TRANSACTIONAL=true,TRANSACTION_PROVIDER='")
                .append(transactionProvider)
                .append("'");
    }
    // Every table is pre-split on the first two key values.
    ddl.append(" SPLIT ON(1,2)");
    this.tableDDLOptions = ddl.toString();
}
/**
 * Starts the shared mini-cluster test driver with the server- and client-side overrides used by
 * every test in this class.
 */
@BeforeClass
public static synchronized void setup() throws Exception {
    // Four server-side overrides are inserted below; size the map for them.
    Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(4);
    serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
    serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
    serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
            QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
    // NOTE(review): presumably a small page size so server-side rebuilds span several pages - confirm.
    serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
    // Four client-side overrides are inserted below; size the map for them.
    Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(4);
    clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
    clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
    clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
    clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
    setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
            new ReadOnlyProps(clientProps.entrySet().iterator()));
}
/**
 * Builds the parameter tuples for this test class.
 *
 * <p>Each tuple is {@code {transactionProvider, mutable, localIndex, directApi, useSnapshot, useTenantId}}.
 * Local-index combinations are skipped for transaction providers that do not support local
 * indexes. Global indexes always use the direct API. One extra tuple enables the tenant-id
 * variant. The display name includes every parameter, so no two tuples share a test name.
 *
 * @return the parameter tuples, filtered by {@link TestUtil#filterTxParamData}
 */
@Parameters(
        name = "transactionProvider={0},mutable={1},localIndex={2},directApi={3},useSnapshot={4},useTenantId={5}")
public static synchronized Collection<Object[]> data() {
    List<Object[]> list = Lists.newArrayListWithExpectedSize(48);
    boolean[] booleans = new boolean[] { false, true };
    for (String transactionProvider : new String[] {"TEPHRA", "OMID", null}) {
        for (boolean mutable : booleans) {
            for (boolean localIndex : booleans) {
                if (!localIndex
                        || transactionProvider == null
                        || !TransactionFactory.getTransactionProvider(
                                TransactionFactory.Provider.valueOf(transactionProvider))
                                .isUnsupported(Feature.ALLOW_LOCAL_INDEX)) {
                    if (localIndex) {
                        for (boolean directApi : booleans) {
                            list.add(new Object[]{transactionProvider, mutable, localIndex,
                                    directApi, false, false});
                        }
                    } else {
                        // Due to PHOENIX-5375 and PHOENIX-5376, the snapshot and bulk load options are ignored for global indexes
                        list.add(new Object[]{transactionProvider, mutable, localIndex,
                                true, false, false});
                    }
                }
            }
        }
    }
    // Add the usetenantId
    list.add(new Object[] { null, false, false, true, false, true});
    return TestUtil.filterTxParamData(list,0);
}
/**
 * Upserts rows 1..nrows through {@code stmt} as (ID, VAL1, VAL2) = (i, i + 1, i * i), except that
 * VAL2 is bound to SQL NULL on every row whose ID is a multiple of {@code nthRowNull}.
 * Each row is executed immediately. Committing is left to the caller.
 *
 * @param nrows number of rows to upsert
 * @param nthRowNull rows with {@code i % nthRowNull == 0} get a NULL third column
 * @param stmt a prepared three-parameter UPSERT statement
 */
private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
    int row = 1;
    while (row <= nrows) {
        stmt.setInt(1, row);
        stmt.setInt(2, row + 1);
        boolean nullRow = row % nthRowNull == 0;
        if (nullRow) {
            stmt.setNull(3, Types.INTEGER);
        } else {
            stmt.setInt(3, row * row);
        }
        stmt.execute();
        row++;
    }
}
/**
 * Verifies that a global index rebuilt by IndexTool stays consistent when a column alternates
 * between NULL and non-NULL values across successive upserts of the same rows.
 * Only runs for the non-transactional, mutable, global-index, direct-API parameterization.
 * Ends by running IndexTool in ONLY-verify mode and asserting that every index row is valid.
 */
@Test
public void testWithSetNull() throws Exception {
// This test is for building non-transactional mutable global indexes with direct api
if (localIndex || transactional || !directApi || useSnapshot || useTenantId || !mutable) {
return;
}
// This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
// and after that the index table is still rebuilt correctly
// Row count chosen as 2*3*5*7 so it is divisible by every null stride used below.
final int NROWS = 2 * 3 * 5 * 7;
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ tableDDLOptions);
String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
PreparedStatement stmt = conn.prepareStatement(upsertStmt);
// Write every row twice with different NULL strides before the index exists.
setEveryNthRowWithNull(NROWS, 2, stmt);
conn.commit();
setEveryNthRowWithNull(NROWS, 3, stmt);
conn.commit();
conn.createStatement().execute(String.format(
"CREATE %s INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ",
(localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName));
// Run the index MR job and verify that the index table is built correctly
IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
// NOTE(review): this second scrutiny repeats the one just above with no intervening writes.
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
// Further NULL/non-NULL flips after the index is built.
setEveryNthRowWithNull(NROWS, 5, stmt);
conn.commit();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
setEveryNthRowWithNull(NROWS, 7, stmt);
conn.commit();
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
// NOTE(review): duplicate of the scrutiny immediately above.
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
// Verify-only run: nothing should be rebuilt and every index row should be reported valid.
indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
0, IndexTool.IndexVerifyType.ONLY, new String[0]);
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
// Remove the IndexTool output/result tables so later tests start clean.
dropIndexToolTables(conn);
}
}
/**
 * Builds a functional index with IndexTool and checks that queries switch from a data-table
 * scan to the index once it is built.
 *
 * <p>For transactional tables, two extra rows are written in an uncommitted transaction
 * beforehand, so the test also checks that those rows are not visible to other connections.
 * For the non-transactional, global, non-snapshot, non-tenant parameterizations it then reruns
 * IndexTool with BEFORE verification and asserts that all four rows are already valid.
 */
@Test
public void testSecondaryIndex() throws Exception {
    String schemaName = generateUniqueName();
    String dataTableName = generateUniqueName();
    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
    String indexTableName = generateUniqueName();
    String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String stmString1 =
                "CREATE TABLE " + dataTableFullName
                        + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
                        + tableDDLOptions;
        conn.createStatement().execute(stmString1);
        String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
        PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
        // insert two rows
        upsertRow(stmt1, 1);
        upsertRow(stmt1, 2);
        conn.commit();
        if (transactional) {
            // insert two rows in another connection without committing so that they are not
            // visible to other transactions
            try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
                conn2.setAutoCommit(false);
                PreparedStatement stmt2 = conn2.prepareStatement(upsertQuery);
                upsertRow(stmt2, 5);
                upsertRow(stmt2, 6);
                ResultSet rs =
                        conn.createStatement()
                                .executeQuery("SELECT count(*) from " + dataTableFullName);
                assertTrue(rs.next());
                assertEquals("Unexpected row count ", 2, rs.getInt(1));
                assertFalse(rs.next());
                rs =
                        conn2.createStatement()
                                .executeQuery("SELECT count(*) from " + dataTableFullName);
                assertTrue(rs.next());
                assertEquals("Unexpected row count ", 4, rs.getInt(1));
                assertFalse(rs.next());
            }
        }
        String stmtString2 =
                String.format(
                        "CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
                        (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
        conn.createStatement().execute(stmtString2);
        // verify rows are fetched from data table.
        String selectSql =
                String.format(
                        "SELECT ID FROM %s WHERE LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz' = 'xxUNAME2_xyz'",
                        dataTableFullName);
        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
        // assert we are pulling from data table.
        assertEquals(String.format(
                "CLIENT PARALLEL 1-WAY FULL SCAN OVER %s\n"
                        + "    SERVER FILTER BY (LPAD(UPPER(NAME, 'en_US'), 8, 'x') || '_xyz') = 'xxUNAME2_xyz'",
                dataTableFullName), actualExplainPlan);
        // JDBC forbids Statement.executeQuery(String) on a PreparedStatement, so run this ad-hoc
        // query through a plain Statement instead of stmt1.
        rs = conn.createStatement().executeQuery(selectSql);
        assertTrue(rs.next());
        assertEquals(2, rs.getInt(1));
        assertFalse(rs.next());
        conn.commit();
        // run the index MR job.
        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
        // insert two more rows
        upsertRow(stmt1, 3);
        upsertRow(stmt1, 4);
        conn.commit();
        // assert we are pulling from index table.
        rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
        actualExplainPlan = QueryUtil.getExplainPlan(rs);
        assertExplainPlan(localIndex, actualExplainPlan, dataTableFullName, indexTableFullName);
        rs = conn.createStatement().executeQuery(selectSql);
        assertTrue(rs.next());
        assertEquals(2, rs.getInt(1));
        assertFalse(rs.next());
        if (localIndex || transactional || useTenantId || useSnapshot) {
            return;
        }
        // Run the index MR job and verify that the global index table is built correctly
        IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
        assertEquals(4, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
        assertEquals(4, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
        assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
        assertEquals(4, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
        assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
        assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
        assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
        assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
        assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
    }
}
/**
 * Disables and deletes the IndexTool verification output table and result table, so the next
 * test starts without rows left over from a previous IndexTool run.
 *
 * @param conn a Phoenix connection whose query services supply the HBase {@link Admin}
 */
private void dropIndexToolTables(Connection conn) throws Exception {
    // getAdmin() hands back an Admin that owns client resources; close it when done.
    try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
        TableName indexToolOutputTable =
                TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
        admin.disableTable(indexToolOutputTable);
        admin.deleteTable(indexToolOutputTable);
        TableName indexToolResultTable =
                TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES);
        admin.disableTable(indexToolResultTable);
        admin.deleteTable(indexToolResultTable);
    }
}
/**
 * Builds a global index over 1000 pre-existing rows with IndexTool (BEFORE verification) and
 * scrutinizes it. It then adds 1000 more rows and reruns IndexTool with BOTH verification,
 * expecting every row to already be valid and nothing to be rebuilt.
 * Only runs for the non-transactional, global-index, direct-API, non-snapshot, non-tenant
 * parameterizations.
 */
@Test
public void testBuildSecondaryIndexAndScrutinize() throws Exception {
// This test is for building non-transactional global indexes with direct api
if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
return;
}
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String stmString1 =
"CREATE TABLE " + dataTableFullName
+ " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ tableDDLOptions;
conn.createStatement().execute(stmString1);
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
// Insert NROWS rows
final int NROWS = 1000;
for (int i = 0; i < NROWS; i++) {
upsertRow(stmt1, i);
}
conn.commit();
String stmtString2 =
String.format(
"CREATE %s INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ",
(localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
conn.createStatement().execute(stmtString2);
// Run the index MR job and verify that the index table is built correctly
// The ASYNC index is empty at this point, so the BEFORE pass is expected to report every row as a
// missing index row beyond max lookback, and all NROWS rows are rebuilt.
IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
// Add more rows and make sure that these rows will be visible to IndexTool
// NOTE(review): presumably these rows reach the index through the regular write path, since the
// index is built by now; the assertions below expect all 2*NROWS to verify as valid.
for (int i = NROWS; i < 2 * NROWS; i++) {
upsertRow(stmt1, i);
}
conn.commit();
indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
// With nothing rebuilt, every AFTER_REBUILD counter is expected to stay at zero.
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(2 * NROWS, actualRowCount);
// Remove the IndexTool output/result tables so later tests start clean.
dropIndexToolTables(conn);
}
}
/**
 * Region observer that counts the mutations passing through {@code preBatchMutate} on any region
 * it is attached to. The count is held in a single static counter shared by all instances.
 */
public static class MutationCountingRegionObserver extends SimpleRegionObserver {
    public static AtomicInteger mutationCount = new AtomicInteger(0);

    /**
     * Sets the shared mutation counter to {@code value}. The previous implementation ignored the
     * argument and always reset to 0; existing callers pass 0 and behave the same.
     *
     * @param value the new counter value
     */
    public static void setMutationCount(int value) {
        mutationCount.set(value);
    }

    /** Returns the number of mutations counted since the last reset. */
    public static int getMutationCount() {
        return mutationCount.get();
    }

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
        // Count every operation in the batch; the batch itself is left untouched.
        mutationCount.addAndGet(miniBatchOp.size());
    }
}
/**
 * Asserts that an IndexTool output/result table row key has the common prefix
 * "timestamp | index table name | ".
 *
 * <p>The row key for the output table: timestamp | index table name | data row key.
 * The row key for the result table: timestamp | index table name | data table region name |
 * scan start row | scan stop row. Only the common prefix is verified, since the remaining fields
 * may themselves contain the separator byte.
 *
 * @param rowKey the row key to check
 * @param indexTableFullName the index table name expected right after the first separator
 */
private void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) {
    int separatorIndex = Bytes.indexOf(rowKey, IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE);
    // Without this check a missing separator (-1) would silently turn into offset 0.
    assertTrue("Row key separator not found in " + Bytes.toStringBinary(rowKey), separatorIndex >= 0);
    int offset = separatorIndex + 1;
    byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
    // The name plus its trailing separator must fit in the key; otherwise the checks below would
    // fail with an index error rather than a clear assertion.
    assertTrue("Row key too short: " + Bytes.toStringBinary(rowKey),
            rowKey.length > offset + indexTableFullNameBytes.length);
    assertEquals(0, Bytes.compareTo(rowKey, offset, indexTableFullNameBytes.length,
            indexTableFullNameBytes, 0, indexTableFullNameBytes.length));
    assertEquals(IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0],
            rowKey[offset + indexTableFullNameBytes.length]);
}
/**
 * Scans the IndexTool output table and returns the (last seen) error-message cell.
 *
 * <p>While scanning, it asserts that every cell is in the output column family and that the
 * data-table-name and index-table-name cells carry the expected names. It also checks the
 * row-key prefix of the error cell and of the first row of the IndexTool result table.
 *
 * @param conn Phoenix connection used to reach the HBase tables
 * @param dataTableFullName expected value of the data-table-name column
 * @param indexTableFullName expected value of the index-table-name column and row-key prefix
 * @return the error-message cell found in the output table (asserted non-null)
 */
private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
        throws Exception {
    byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
    byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
    ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
    boolean dataTableNameCheck = false;
    boolean indexTableNameCheck = false;
    Cell errorMessageCell = null;
    // Table handles and scanners hold client-side resources; close them once the rows are read.
    try (Table outputTable = queryServices.getTable(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
            ResultScanner scanner = outputTable.getScanner(new Scan())) {
        for (Result result = scanner.next(); result != null; result = scanner.next()) {
            for (Cell cell : result.rawCells()) {
                assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
                        IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY, 0,
                        IndexVerificationOutputRepository.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
                if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                        IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.DATA_TABLE_NAME_BYTES.length) == 0) {
                    dataTableNameCheck = true;
                    assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                            dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0);
                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                        IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES, 0, IndexVerificationOutputRepository.INDEX_TABLE_NAME_BYTES.length) == 0) {
                    indexTableNameCheck = true;
                    assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
                            indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0);
                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                        IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES, 0, IndexVerificationOutputRepository.ERROR_MESSAGE_BYTES.length) == 0) {
                    errorMessageCell = cell;
                }
            }
        }
    }
    assertTrue( "DataTableNameCheck was false", dataTableNameCheck);
    assertTrue("IndexTableNameCheck was false", indexTableNameCheck);
    assertTrue("Error message cell was null", errorMessageCell != null);
    verifyIndexTableRowKey(CellUtil.cloneRow(errorMessageCell), indexTableFullName);
    try (Table resultTable = queryServices.getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES);
            ResultScanner scanner = resultTable.getScanner(new Scan())) {
        Result result = scanner.next();
        // A JUnit assertion is always checked; the Java 'assert' keyword is skipped without -ea.
        assertTrue("IndexTool result table has no rows", result != null);
        verifyIndexTableRowKey(CellUtil.cloneRow(result.rawCells()[0]), indexTableFullName);
    }
    return errorMessageCell;
}
/**
 * Checks that once a view index has been rebuilt (AFTER verification), rerunning IndexTool with
 * BEFORE or BOTH verification writes no further index mutations.
 * Mutations are counted by {@link MutationCountingRegionObserver} attached to the view-index table.
 */
@Test
public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
    // This test is for building non-transactional global indexes with direct api
    if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
        return;
    }
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String schemaName = generateUniqueName();
        String dataTableName = generateUniqueName();
        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
        String indexTableName = generateUniqueName();
        String viewName = generateUniqueName();
        String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
        conn.createStatement().execute("CREATE TABLE " + dataTableFullName
                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
                + tableDDLOptions);
        conn.commit();
        conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
        conn.commit();
        // Insert a row
        conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
        conn.commit();
        conn.createStatement().execute(String.format(
                "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
        TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, MutationCountingRegionObserver.class);
        // The counter is static and shared across parameterized runs; start from a known value so
        // leftovers from an earlier (possibly failed) run cannot skew the exact count below.
        MutationCountingRegionObserver.setMutationCount(0);
        // Run the index MR job and verify that the index table rebuild succeeds
        runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                null, 0, IndexTool.IndexVerifyType.AFTER);
        assertEquals(1, MutationCountingRegionObserver.getMutationCount());
        MutationCountingRegionObserver.setMutationCount(0);
        // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
        // write any index rows
        runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                null, 0, IndexTool.IndexVerifyType.BEFORE);
        assertEquals(0, MutationCountingRegionObserver.getMutationCount());
        // The "-v BOTH" option should not write any index rows either
        runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                null, 0, IndexTool.IndexVerifyType.BOTH);
        assertEquals(0, MutationCountingRegionObserver.getMutationCount());
        dropIndexToolTables(conn);
    }
}
/**
 * Forces the index rebuild to write nothing (via
 * {@code IndexRegionObserver.setIgnoreIndexRebuildForTesting}), runs IndexTool with AFTER
 * verification, and expects the output table to report a missing index row beyond max lookback.
 */
@Test
public void testIndexToolVerifyAfterOption() throws Exception {
    // This test is for building non-transactional global indexes with direct api
    if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
        return;
    }
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String schemaName = generateUniqueName();
        String dataTableName = generateUniqueName();
        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
        String indexTableName = generateUniqueName();
        String viewName = generateUniqueName();
        String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
        conn.createStatement().execute("CREATE TABLE " + dataTableFullName
                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
                + tableDDLOptions);
        conn.commit();
        conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
        conn.commit();
        // Insert a row
        conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
        conn.commit();
        // Configure IndexRegionObserver to fail the first write phase. This should not
        // lead to any change on index and thus the index verify during index rebuild should fail
        IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
        try {
            conn.createStatement().execute(String.format(
                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
            // Run the index MR job and verify that the index table rebuild fails
            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
                    null, -1, IndexTool.IndexVerifyType.AFTER);
            // The index tool output table should report that there is a missing index row
            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
            String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
            String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            assertEquals(expectedErrorMsg, actualErrorMsg);
        } finally {
            // The flag is global; always restore it so a failure here cannot leak into later tests.
            IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
        }
        dropIndexToolTables(conn);
    }
}
@Test
public void testIndexToolOnlyVerifyOption() throws Exception {
    // Only exercised for non-transactional global indexes built through the direct API,
    // without snapshots and without a tenant id.
    boolean unsupportedConfig = localIndex || transactional || !directApi || useSnapshot || useTenantId;
    if (unsupportedConfig) {
        return;
    }
    final String schema = generateUniqueName();
    final String dataTable = generateUniqueName();
    final String qualifiedDataTable = SchemaUtil.getTableName(schema, dataTable);
    final String indexTable = generateUniqueName();
    final String qualifiedIndexTable = SchemaUtil.getTableName(schema, indexTable);
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
        String createTable = "CREATE TABLE " + qualifiedDataTable
                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0";
        connection.createStatement().execute(createTable);
        // Seed the data table with a single row
        connection.createStatement().execute("upsert into " + qualifiedDataTable + " values (1, 'Phoenix', 'A')");
        connection.commit();
        String createIndex = String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
                indexTable, qualifiedDataTable);
        connection.createStatement().execute(createIndex);
        // Verify-only run: the index was created ASYNC and never built, so every data row
        // should be reported as missing its index row in the PHOENIX_INDEX_TOOL output table,
        // and the tool is expected to exit with -1.
        runIndexTool(directApi, useSnapshot, schema, dataTable, indexTable,
                null, -1, IndexTool.IndexVerifyType.ONLY);
        Cell errorCell = getErrorMessageFromIndexToolOutputTable(connection, qualifiedDataTable, qualifiedIndexTable);
        try {
            String actual = Bytes.toString(errorCell.getValueArray(), errorCell.getValueOffset(),
                    errorCell.getValueLength());
            assertTrue(IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK.equals(actual));
        } catch (Exception ex) {
            Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
        }
        // Clear the output table before the next phase
        dropIndexToolTables(connection);
        // Build the index with post-build verification, then a verify-only pass must succeed
        runIndexTool(directApi, useSnapshot, schema, dataTable, indexTable,
                null, 0, IndexTool.IndexVerifyType.AFTER);
        runIndexTool(directApi, useSnapshot, schema, dataTable, indexTable,
                null, 0, IndexTool.IndexVerifyType.ONLY);
        dropIndexToolTables(connection);
    }
}
@Test
public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
    // This test is for building non-transactional global indexes with direct api
    if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
        return;
    }
    String schemaName = generateUniqueName();
    String dataTableName = generateUniqueName();
    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
    String indexTableName = generateUniqueName();
    String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        conn.createStatement().execute("CREATE TABLE " + dataTableFullName
                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
        // Insert a row
        conn.createStatement()
                .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
        conn.commit();
        conn.createStatement()
                .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
                        indexTableName, dataTableFullName));
        // The index has not been built yet, so a verify-only run must report a missing index row
        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, -1,
                IndexTool.IndexVerifyType.ONLY);
        Cell cell =
                getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
                        indexTableFullName);
        try {
            String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
            String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            assertEquals(expectedErrorMsg, actualErrorMsg);
        } catch (Exception ex) {
            Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
        }
        // Run the index tool to populate the index while verifying rows
        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
                IndexTool.IndexVerifyType.AFTER);
        TableName indexToolOutputTable =
                TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
        try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
            // Set ttl of index table ridiculously low so that all data is expired
            TableName indexTable = TableName.valueOf(indexTableFullName);
            HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
            desc.setTimeToLive(1);
            admin.modifyColumn(indexTable, desc);
            Thread.sleep(1000);
            // Poll until the alter has been applied to every region. The retry counter must
            // advance, otherwise the bound is ineffective and a stuck alter would hang the test.
            Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
            for (int retry = 0; retry < 20 && status.getFirst() != 0; retry++) {
                Thread.sleep(2000);
                status = admin.getAlterStatus(indexTable);
            }
            assertTrue(status.getFirst() == 0);
            // Start from an empty output table for the final verification run
            admin.disableTable(indexToolOutputTable);
            admin.deleteTable(indexToolOutputTable);
        }
        // Run the index tool using the only-verify option, verify it gives no mismatch
        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
                IndexTool.IndexVerifyType.ONLY);
        try (Table hIndexToolTable =
                     conn.unwrap(PhoenixConnection.class).getQueryServices()
                             .getTable(indexToolOutputTable.getName());
             ResultScanner scanner = hIndexToolTable.getScanner(new Scan())) {
            Result r = scanner.next();
            assertTrue(r == null);
        }
        dropIndexToolTables(conn);
    }
}
@Test
public void testIndexToolWithTenantId() throws Exception {
    if (!useTenantId) {
        return;
    }
    String tenantId = generateUniqueName();
    String schemaName = generateUniqueName();
    String dataTableName = generateUniqueName();
    String viewTenantName = generateUniqueName();
    String indexNameGlobal = generateUniqueName();
    String indexNameTenant = generateUniqueName();
    String viewIndexTableName = "_IDX_" + dataTableName;
    // Separate property sets so the global connection is never handed a Properties object
    // that is mutated afterwards.
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
    String createTblStr = "CREATE TABLE %s (TENANT_ID VARCHAR(15) NOT NULL,ID INTEGER NOT NULL"
            + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (TENANT_ID, ID)) MULTI_TENANT=true";
    String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s";
    String upsertQueryStr = "UPSERT INTO %s (TENANT_ID, ID, NAME) VALUES('%s' , %d, '%s')";
    String createIndexStr = "CREATE INDEX %s ON %s (NAME) ";
    // Both connections are closed even if opening the tenant connection fails
    try (Connection connGlobal = DriverManager.getConnection(getUrl(), props);
         Connection connTenant = DriverManager.getConnection(getUrl(), tenantProps)) {
        String tableStmtGlobal = String.format(createTblStr, dataTableName);
        connGlobal.createStatement().execute(tableStmtGlobal);
        String viewStmtTenant = String.format(createViewStr, viewTenantName, dataTableName);
        connTenant.createStatement().execute(viewStmtTenant);
        String idxStmtTenant = String.format(createIndexStr, indexNameTenant, viewTenantName);
        connTenant.createStatement().execute(idxStmtTenant);
        connTenant.createStatement()
                .execute(String.format(upsertQueryStr, viewTenantName, tenantId, 1, "x"));
        connTenant.commit();
        runIndexTool(true, false, "", viewTenantName, indexNameTenant,
                tenantId, 0, new String[0]);
        String selectSql = String.format("SELECT ID FROM %s WHERE NAME='x'", viewTenantName);
        ResultSet rs = connTenant.createStatement().executeQuery("EXPLAIN " + selectSql);
        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
        assertExplainPlan(false, actualExplainPlan, "", viewIndexTableName);
        rs = connTenant.createStatement().executeQuery(selectSql);
        assertTrue(rs.next());
        assertEquals(1, rs.getInt(1));
        assertFalse(rs.next());
        // Remove from tenant view index and build.
        ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices();
        TableName tableName = TableName.valueOf(viewIndexTableName);
        try (Admin admin = queryServices.getAdmin()) {
            admin.disableTable(tableName);
            admin.truncateTable(tableName, false);
        }
        runIndexTool(true, false, "", viewTenantName, indexNameTenant,
                tenantId, 0, new String[0]);
        try (Table htable = queryServices.getTable(Bytes.toBytes(viewIndexTableName))) {
            // Confirm the rebuild wrote exactly one index row
            assertEquals(1, getUtility().countRows(htable));
        }
        selectSql = String.format("SELECT /*+ INDEX(%s) */ COUNT(*) FROM %s",
                indexNameTenant, viewTenantName);
        rs = connTenant.createStatement().executeQuery(selectSql);
        assertTrue(rs.next());
        assertEquals(1, rs.getInt(1));
        assertFalse(rs.next());
        String idxStmtGlobal =
                String.format(createIndexStr, indexNameGlobal, dataTableName);
        connGlobal.createStatement().execute(idxStmtGlobal);
        // run the index MR job this time with tenant id.
        // We expect it to return -1 because indexTable is not correct for this tenant.
        runIndexTool(true, false, schemaName, dataTableName, indexNameGlobal,
                tenantId, -1, new String[0]);
    }
}
@Test
public void testSecondaryGlobalIndexFailure() throws Exception {
    // This test is for building non-transactional global indexes with direct api
    if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
        return;
    }
    String schemaName = generateUniqueName();
    String dataTableName = generateUniqueName();
    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
    String indexTableName = generateUniqueName();
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String stmString1 =
                "CREATE TABLE " + dataTableFullName
                        + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
                        + tableDDLOptions;
        conn.createStatement().execute(stmString1);
        String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
        try (PreparedStatement stmt1 = conn.prepareStatement(upsertQuery)) {
            // Insert two rows
            upsertRow(stmt1, 1);
            upsertRow(stmt1, 2);
            conn.commit();
        }
        String stmtString2 =
                String.format(
                        "CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
                        (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
        conn.createStatement().execute(stmtString2);
        // Run the index MR job.
        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
        String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
        // Verify that the index table is in the ACTIVE state
        assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
        ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
        TableName tableName = TableName.valueOf(qIndexTableName);
        // Disable the physical index table so the next rebuild cannot write to it
        try (Admin admin = queryServices.getAdmin()) {
            admin.disableTable(tableName);
        }
        // Run the index MR job and it should fail (return -1)
        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
                null, -1, new String[0]);
        // Verify that the index table should be still in the ACTIVE state
        assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
    }
}
@Test
public void testSaltedVariableLengthPK() throws Exception {
    // Only runs for mutable, non-transactional configurations
    if (!mutable || transactional) {
        return;
    }
    final String schema = generateUniqueName();
    final String dataTable = generateUniqueName();
    final String qualifiedDataTable = SchemaUtil.getTableName(schema, dataTable);
    final String indexTable = generateUniqueName();
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
        // Salted table whose primary key is a single variable-length VARCHAR
        StringBuilder ddl = new StringBuilder();
        ddl.append("CREATE TABLE ").append(qualifiedDataTable).append("(\n");
        ddl.append("ID VARCHAR NOT NULL PRIMARY KEY,\n");
        ddl.append("\"info\".CAR_NUM VARCHAR(18) NULL,\n");
        ddl.append("\"info\".CAP_DATE VARCHAR NULL,\n");
        ddl.append("\"info\".ORG_ID BIGINT NULL,\n");
        ddl.append("\"info\".ORG_NAME VARCHAR(255) NULL\n");
        ddl.append(") SALT_BUCKETS=3");
        connection.createStatement().execute(ddl.toString());

        connection.createStatement().execute("UPSERT INTO " + qualifiedDataTable
                + "(ID,CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES('1','car1','2016-01-01 00:00:00',11,'orgname1')");
        connection.commit();

        String indexType = localIndex ? "LOCAL" : "";
        connection.createStatement().execute(String.format(
                "CREATE %s INDEX %s on %s (\"info\".CAR_NUM,\"info\".CAP_DATE) ASYNC",
                indexType, indexTable, qualifiedDataTable));
        runIndexTool(directApi, useSnapshot, schema, dataTable, indexTable);

        // A query filtering on the indexed columns must still find the row after the build
        String query = "SELECT ORG_ID,CAP_DATE,CAR_NUM,ORG_NAME FROM " + qualifiedDataTable
                + " WHERE CAR_NUM='car1' AND CAP_DATE>='2016-01-01' AND CAP_DATE<='2016-05-02' LIMIT 10";
        ResultSet results = connection.createStatement().executeQuery(query);
        assertTrue(results.next());
        assertEquals(11, results.getInt(1));
    }
}
/**
 * Test presplitting an index table: the data table is pre-split into a known number of regions,
 * the IndexTool is run with sampling/split options, and the index table is then expected to have
 * the same region count with every index region holding at least one row.
 */
@Test
public void testSplitIndex() throws Exception {
    if (localIndex) return; // can't split local indexes
    if (!mutable) return;
    if (transactional) return;
    String schemaName = generateUniqueName();
    String dataTableName = generateUniqueName();
    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
    final TableName dataTN = TableName.valueOf(dataTableFullName);
    String indexTableName = generateUniqueName();
    String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
    TableName indexTN = TableName.valueOf(indexTableFullName);
    try (Connection conn =
                 DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES));
         HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
        String dataDDL =
                "CREATE TABLE " + dataTableFullName + "(\n"
                        + "ID VARCHAR NOT NULL PRIMARY KEY,\n"
                        + "\"info\".CAR_NUM VARCHAR(18) NULL,\n"
                        + "\"test\".CAR_NUM VARCHAR(18) NULL,\n"
                        + "\"info\".CAP_DATE VARCHAR NULL,\n" + "\"info\".ORG_ID BIGINT NULL,\n"
                        + "\"info\".ORG_NAME VARCHAR(255) NULL\n" + ") COLUMN_ENCODED_BYTES = 0";
        conn.createStatement().execute(dataDDL);
        String[] carNumPrefixes = new String[] {"a", "b", "c", "d"};
        // split the data table, as the tool splits the index table to have the same # of regions
        // doesn't really matter what the split points are, we just want a target # of regions
        int numSplits = carNumPrefixes.length;
        int targetNumRegions = numSplits + 1;
        byte[][] splitPoints = new byte[numSplits][];
        for (String prefix : carNumPrefixes) {
            splitPoints[--numSplits] = Bytes.toBytes(prefix);
        }
        // Recreate the data table with the split points
        HTableDescriptor dataTD = admin.getTableDescriptor(dataTN);
        admin.disableTable(dataTN);
        admin.deleteTable(dataTN);
        admin.createTable(dataTD, splitPoints);
        assertEquals(targetNumRegions, admin.getTableRegions(dataTN).size());
        // insert data where index column values start with a, b, c, d
        int idCounter = 1;
        try (PreparedStatement ps = conn.prepareStatement("UPSERT INTO " + dataTableFullName
                + "(ID,\"info\".CAR_NUM,\"test\".CAR_NUM,CAP_DATE,ORG_ID,ORG_NAME) VALUES(?,?,?,'2016-01-01 00:00:00',11,'orgname1')")) {
            for (String carNum : carNumPrefixes) {
                for (int i = 0; i < 100; i++) {
                    ps.setString(1, idCounter++ + "");
                    ps.setString(2, carNum + "_" + i);
                    ps.setString(3, "test-" + carNum + "_ " + i);
                    ps.addBatch();
                }
            }
            ps.executeBatch();
            conn.commit();
        }
        String indexDDL =
                String.format(
                        "CREATE INDEX %s on %s (\"info\".CAR_NUM,\"test\".CAR_NUM,\"info\".CAP_DATE) ASYNC",
                        indexTableName, dataTableFullName);
        conn.createStatement().execute(indexDDL);
        // run with 50% sampling rate, split if data table more than 3 regions
        runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, "-sp", "50", "-spa", "3");
        assertEquals(targetNumRegions, admin.getTableRegions(indexTN).size());
        List<Cell> values = new ArrayList<>();
        // every index region should have been written to, if the index table was properly split uniformly
        for (HRegion region : getUtility().getHBaseCluster().getRegions(indexTN)) {
            values.clear();
            // Close each region scanner once the first row has been read; it was previously leaked
            try (RegionScanner scanner = region.getScanner(new Scan())) {
                scanner.next(values);
            }
            if (values.isEmpty()) {
                fail("Region did not have any results: " + region.getRegionInfo());
            }
        }
    }
}
/**
 * Asserts that an explain plan performs the expected range scan.
 *
 * @param localIndex whether the index under test is a local index; if so, the plan is expected
 *        to range-scan the data table starting with a "[1," key prefix, otherwise the index table
 * @param actualExplainPlan the explain plan text to check
 * @param dataTableFullName data table name, used only when {@code localIndex} is true
 * @param indexTableFullName index table name, used only when {@code localIndex} is false
 */
public static void assertExplainPlan(boolean localIndex, String actualExplainPlan,
        String dataTableFullName, String indexTableFullName) {
    final String expectedExplainPlan = localIndex
            ? String.format(" RANGE SCAN OVER %s [1,", dataTableFullName)
            : String.format(" RANGE SCAN OVER %s", indexTableFullName);
    String failureMessage = actualExplainPlan + "\n expected to contain \n" + expectedExplainPlan;
    assertTrue(failureMessage, actualExplainPlan.contains(expectedExplainPlan));
}
/**
 * Builds the IndexTool command line for the given options.
 *
 * Always adds data/index table names, the verify mode, foreground execution and a random
 * output path under /tmp. Schema, direct API, snapshot, tenant and time-range flags are added
 * only when requested / non-null.
 */
private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
        String dataTable, String indxTable, String tenantId,
        IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
    List<String> argv = Lists.newArrayList();
    if (schemaName != null) {
        argv.addAll(Arrays.asList("-s", schemaName));
    }
    argv.addAll(Arrays.asList("-dt", dataTable, "-it", indxTable));
    if (directApi) {
        argv.add("-direct");
    }
    // verify index rows inline using the requested verification type
    argv.add("-v" + verifyType.getValue());
    // Run in the foreground so the test sees a finished job deterministically
    argv.add("-runfg");
    if (useSnapshot) {
        argv.add("-snap");
    }
    if (tenantId != null) {
        argv.addAll(Arrays.asList("-tenant", tenantId));
    }
    if (startTime != null) {
        argv.addAll(Arrays.asList("-st", String.valueOf(startTime)));
    }
    if (endTime != null) {
        argv.addAll(Arrays.asList("-et", String.valueOf(endTime)));
    }
    argv.addAll(Arrays.asList("-op", "/tmp/" + UUID.randomUUID().toString()));
    return argv;
}
/**
 * Returns the IndexTool command-line arguments with no start/end time bounds.
 */
public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
        String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
    final Long noStartTime = null;
    final Long noEndTime = null;
    return getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
            tenantId, verifyType, noStartTime, noEndTime).toArray(new String[0]);
}
/**
 * Returns the IndexTool command-line arguments, optionally bounded by start/end time
 * (either bound may be null to omit it).
 */
public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
        String dataTable, String indexTable, String tenantId,
        IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
    return getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
            tenantId, verifyType, startTime, endTime).toArray(new String[0]);
}
/**
 * Binds the three parameters of a prepared upsert from {@code i} and executes it.
 * Parameter 1 is {@code i}, parameter 2 is {@code "uname" + i}, parameter 3 is {@code 95050 + i}
 * (ID, NAME, ZIP for the tables created in this file).
 */
public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
    final String name = "uname" + i;
    final int zip = 95050 + i;
    stmt.setInt(1, i);
    stmt.setString(2, name);
    stmt.setInt(3, zip);
    stmt.executeUpdate();
}
/**
 * Runs the IndexTool with no extra arguments and expects it to succeed.
 */
public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
        String dataTableName, String indexTableName) throws Exception {
    final String[] noExtraArgs = new String[0];
    runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, noExtraArgs);
}
/**
 * Asserts which mapper class the IndexTool job used.
 *
 * {@link PhoenixServerBuildIndexMapper} is expected when no snapshot is used and the index is
 * local or the data table is non-transactional; otherwise {@link PhoenixIndexImportDirectMapper}.
 * The {@code directApi} flag is accepted but not consulted here.
 */
private static void verifyMapper(Job job, boolean directApi, boolean useSnapshot, String schemaName,
        String dataTableName, String indexTableName, String tenantId) throws Exception {
    Properties connProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    if (tenantId != null) {
        connProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
    }
    try (Connection connection = DriverManager.getConnection(getUrl(), connProps)) {
        PTable data = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName, dataTableName));
        PTable index = PhoenixRuntime.getTable(connection, SchemaUtil.getTableName(schemaName, indexTableName));
        boolean txnData = data.isTransactional();
        boolean isLocal = PTable.IndexType.LOCAL.equals(index.getIndexType());
        boolean serverSideBuild = !useSnapshot && (isLocal || !txnData);
        Class<?> expectedMapper = serverSideBuild
                ? PhoenixServerBuildIndexMapper.class
                : PhoenixIndexImportDirectMapper.class;
        assertEquals(job.getMapperClass(), expectedMapper);
    }
}
/**
 * Runs the IndexTool without a tenant id, appending {@code additionalArgs}, and expects success.
 */
public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
        String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
    final String noTenant = null;
    final int expectSuccess = 0;
    runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
            noTenant, expectSuccess, additionalArgs);
}
/**
 * Runs the IndexTool with verification disabled ({@code IndexVerifyType.NONE}) and asserts the
 * exit status equals {@code expectedStatus}.
 *
 * @return the IndexTool instance that was run
 */
public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
        String dataTableName, String indexTableName, String tenantId, int expectedStatus,
        String... additionalArgs) throws Exception {
    final IndexTool.IndexVerifyType noVerify = IndexTool.IndexVerifyType.NONE;
    return runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
            tenantId, expectedStatus, noVerify, additionalArgs);
}
/**
 * Runs the IndexTool with the given options and checks its exit status.
 *
 * @param expectedStatus exit status the tool must return; when 0, the mapper class used by
 *        the job is additionally verified
 * @param verifyType index verification mode passed via {@code -v}
 * @param additionalArgs extra command-line arguments appended after the generated ones
 * @return the IndexTool instance that was run
 */
public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
        String dataTableName, String indexTableName, String tenantId,
        int expectedStatus, IndexTool.IndexVerifyType verifyType,
        String... additionalArgs) throws Exception {
    IndexTool indexingTool = new IndexTool();
    Configuration conf = new Configuration(getUtility().getConfiguration());
    conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
    indexingTool.setConf(conf);
    final String[] cmdArgs = getArgValues(directApi, useSnapshot, schemaName, dataTableName,
            indexTableName, tenantId, verifyType);
    List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
    cmdArgList.addAll(Arrays.asList(additionalArgs));
    int status = indexingTool.run(cmdArgList.toArray(new String[0]));
    // Check the exit status before inspecting the job, so an unexpected tool failure is
    // reported as such rather than being masked by an error from the mapper check.
    assertEquals("Unexpected IndexTool exit status", expectedStatus, status);
    if (expectedStatus == 0) {
        verifyMapper(indexingTool.getJob(), directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
    }
    return indexingTool;
}
}