/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.phoenix.end2end.index;

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
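
/**
 * Tests that the PENDING_DISABLE_COUNT column for an index row in SYSTEM.CATALOG is maintained
 * correctly as the index moves through DISABLE, INACTIVE, PENDING_DISABLE and ACTIVE states
 * during a background rebuild.
 */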
public class IndexRebuildIncrementDisableCountIT extends BaseUniqueNamesOwnClusterIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IndexRebuildIncrementDisableCountIT.class);
    private static final String ORG_PREFIX = "ORG";
    // Shared state written by the watcher thread started in checkIndexPendingDisableCount.
    private static long pendingDisableCount = 0;
    private static Result pendingDisableCountResult = null;
    private static String indexState = null;
private static final Random RAND = new Random(5);
private static final int WAIT_AFTER_DISABLED = 5000;
private static final long REBUILD_PERIOD = 50000;
private static final long REBUILD_INTERVAL = 2000;
private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
private static String schemaName;
private static String tableName;
private static String fullTableName;
private static String indexName;
private static String fullIndexName;
private static Connection conn;
private static PhoenixConnection phoenixConn;
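
    // Configures aggressive index-rebuild settings on the mini cluster, then captures the
    // MetaDataRegionObserver coprocessor environment of the SYSTEM.CATALOG region so the
    // rebuild task can be driven manually from the tests.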
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
Boolean.TRUE.toString());
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
Long.toString(REBUILD_INTERVAL));
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD,
Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB,
Long.toString(WAIT_AFTER_DISABLED));
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
new ReadOnlyProps(clientProps.entrySet().iterator()));
indexRebuildTaskRegionEnvironment =
(RegionCoprocessorEnvironment) getUtility()
.getRSForFirstRegionInTable(
PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
.getRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
.get(0).getCoprocessorHost()
.findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
MetaDataRegionObserver.initRebuildIndexConnectionProps(
indexRebuildTaskRegionEnvironment.getConfiguration());
schemaName = generateUniqueName();
tableName = generateUniqueName();
fullTableName = SchemaUtil.getTableName(schemaName, tableName);
indexName = generateUniqueName();
fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
conn = DriverManager.getConnection(getUrl());
phoenixConn = conn.unwrap(PhoenixConnection.class);
}
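
    /**
     * Reads the PENDING_DISABLE_COUNT and INDEX_STATE cells for the given index directly from
     * SYSTEM.CATALOG, caches the raw Result, and returns the counter value (0 on any failure).
     */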
static long getPendingDisableCount(PhoenixConnection conn, String indexTableName) {
byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
Get get = new Get(indexTableKey);
get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
try {
pendingDisableCountResult =
conn.getQueryServices()
.getTable(SchemaUtil.getPhysicalTableName(
PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
conn.getQueryServices().getProps()).getName())
.get(get);
return Bytes.toLong(pendingDisableCountResult.getValue(TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES));
} catch (Exception e) {
LOGGER.error("Exception in getPendingDisableCount: " + e);
return 0;
}
}
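
    /**
     * Starts a background thread that polls SYSTEM.CATALOG until the index becomes ACTIVE,
     * recording the last observed INDEX_STATE and PENDING_DISABLE_COUNT in static fields.
     */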
private static void checkIndexPendingDisableCount(final PhoenixConnection conn,
final String indexTableName) throws Exception {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
while (!TestUtil.checkIndexState(conn, indexTableName, PIndexState.ACTIVE,
0L)) {
long count = getPendingDisableCount(conn, indexTableName);
if (count > 0) {
indexState =
new String(
pendingDisableCountResult.getValue(TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_STATE_BYTES));
pendingDisableCount = count;
}
Thread.sleep(100);
}
} catch (Exception e) {
LOGGER.error("Error in checkPendingDisableCount : " + e);
}
}
};
Thread t1 = new Thread(runnable);
t1.start();
}
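
    // Helpers for building org_id primary key values of the form "ORG-<n>".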
static String getOrgId(long id) {
return ORG_PREFIX + "-" + id;
}
static String getRandomOrgId(int maxOrgId) {
        // Use the seeded Random so the generated data is reproducible across runs.
        return getOrgId(RAND.nextInt(maxOrgId + 1));
}
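
    // Issues 10,000 upserts against random org ids so the rebuilder has writes to replay.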
private static void mutateRandomly(Connection conn, String tableName, int maxOrgId) {
try {
Statement stmt = conn.createStatement();
for (int i = 0; i < 10000; i++) {
stmt.executeUpdate(
"UPSERT INTO " + tableName + " VALUES('" + getRandomOrgId(maxOrgId) + "'," + i
+ "," + (i + 1) + "," + (i + 2) + ")");
}
conn.commit();
} catch (Exception e) {
LOGGER.error("Client side exception:" + e);
}
}
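
    /**
     * Writes a new index state for the given index into SYSTEM.CATALOG via IndexUtil and returns
     * the resulting mutation code.
     */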
private static MutationCode updateIndexState(PhoenixConnection phoenixConn,
String fullIndexName, PIndexState state) throws Throwable {
Table metaTable =
phoenixConn.getQueryServices()
.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
long ts = EnvironmentEdgeManager.currentTimeMillis();
return IndexUtil.updateIndexState(fullIndexName, ts, metaTable, state).getMutationCode();
}
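
    /**
     * Disables the index, loads data into the base table, then drives the rebuild task until the
     * index is ACTIVE again, asserting that the index passed through the INACTIVE ('i') state with
     * PENDING_DISABLE_COUNT at the inactive-state count, and that the count is 0 once ACTIVE.
     */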
@Test
public void testIndexStateTransitions() throws Throwable {
        // create the data table and a single index on v1
String createTableSql =
"CREATE TABLE " + fullTableName
+ "(org_id VARCHAR NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER, v3 INTEGER)";
conn.createStatement().execute(createTableSql);
conn.createStatement()
.execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
conn.commit();
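        // Disable the index, then write data that the rebuilder will have to replay.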
updateIndexState(phoenixConn, fullIndexName, PIndexState.DISABLE);
mutateRandomly(conn, fullTableName, 20);
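        // Watch the pending-disable counter in the background while the rebuild task runs.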
        checkIndexPendingDisableCount(phoenixConn, fullIndexName);
        do {
            runIndexRebuilder(Collections.<String> singletonList(fullTableName));
        } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
assertTrue("Index state is inactive ", indexState.equals("i"));
assertTrue("pendingDisable count is incremented when index is inactive",
pendingDisableCount == MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
assertTrue("pending disable count is 0 when index is active: ", getPendingDisableCount(phoenixConn, fullIndexName) == 0);
}
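
    /**
     * Verifies that PENDING_DISABLE_COUNT is reset when the index moves from ACTIVE to
     * PENDING_DISABLE, but is preserved when it moves from INACTIVE to PENDING_DISABLE.
     */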
@Test
public void checkIndexPendingDisableResetCounter() throws Throwable {
        IndexUtil.incrementCounterForIndex(phoenixConn, fullIndexName,
            MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
        updateIndexState(phoenixConn, fullIndexName, PIndexState.PENDING_DISABLE);
        assertTrue("Pending disable count should reset when the index moves from ACTIVE to PENDING_DISABLE",
            getPendingDisableCount(phoenixConn, fullIndexName) == 0);
        IndexUtil.incrementCounterForIndex(phoenixConn, fullIndexName,
            MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
        updateIndexState(phoenixConn, fullIndexName, PIndexState.INACTIVE);
        updateIndexState(phoenixConn, fullIndexName, PIndexState.PENDING_DISABLE);
        assertTrue("Pending disable count should not reset when the index moves from INACTIVE to PENDING_DISABLE",
            getPendingDisableCount(phoenixConn, fullIndexName)
                == MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
}
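
    // Runs a single pass of the MetaDataRegionObserver rebuild task over the given tables.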
private static void runIndexRebuilder(List<String> tables)
throws InterruptedException, SQLException {
BuildIndexScheduleTask task =
new MetaDataRegionObserver.BuildIndexScheduleTask(indexRebuildTaskRegionEnvironment,
tables);
task.run();
}
}