blob: a5f72b8fca9215fb6b992d800d3c89786578f334 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.index;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.execute.CommitException;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.transaction.PhoenixTransactionProvider;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
*
* Test for failure of region server to write to index table.
* For some reason dropping tables after running this test
* fails unless it runs its own mini cluster.
*
*/
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class MutableIndexFailureIT extends BaseTest {
public static volatile boolean FAIL_WRITE = false;
public static volatile String fullTableName;
private String tableName;
private String indexName;
private String fullIndexName;
private final boolean transactional;
private final PhoenixTransactionProvider transactionProvider;
private final boolean localIndex;
private final String tableDDLOptions;
private final boolean isNamespaceMapped;
private final boolean leaveIndexActiveOnFailure;
private final boolean failRebuildTask;
private final boolean throwIndexWriteFailure;
private String schema = generateUniqueName();
private List<CommitException> exceptions = Lists.newArrayList();
protected static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
protected static final int forwardOverlapMs = 1000;
protected static final int disableTimestampThresholdMs = 10000;
protected static final int numRpcRetries = 2;
public MutableIndexFailureIT(String transactionProvider, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) {
this.transactional = transactionProvider != null;
this.transactionProvider = transactionProvider == null ? null :
TransactionFactory.getTransactionProvider(TransactionFactory.Provider.valueOf(transactionProvider));
this.localIndex = localIndex;
this.tableDDLOptions = " SALT_BUCKETS=2, COLUMN_ENCODED_BYTES=NONE" + (transactional ? (",TRANSACTIONAL=true,TRANSACTION_PROVIDER='"+transactionProvider+"'") : "")
+ (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure))
+ (throwIndexWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.THROW_INDEX_WRITE_FAILURE + "=" + throwIndexWriteFailure));
this.tableName = FailingRegionObserver.FAIL_TABLE_NAME;
this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME;
fullTableName = SchemaUtil.getTableName(schema, tableName);
this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
this.isNamespaceMapped = isNamespaceMapped;
this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null ? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : disableIndexOnWriteFailure);
this.failRebuildTask = failRebuildTask;
this.throwIndexWriteFailure = ! Boolean.FALSE.equals(throwIndexWriteFailure);
}
@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> serverProps = getServerProps();
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
indexRebuildTaskRegionEnvironment = getUtility()
.getRSForFirstRegionInTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
.getRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME).get(0).getCoprocessorHost()
.findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
MetaDataRegionObserver.initRebuildIndexConnectionProps(
indexRebuildTaskRegionEnvironment.getConfiguration());
}
protected static Map<String,String> getServerProps(){
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put(IndexWriterUtils.INDEX_WRITER_RPC_PAUSE, "5000");
serverProps.put("data.tx.snapshot.dir", "/tmp");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
// need to override rpc retries otherwise test doesn't pass
serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, Long.toString(numRpcRetries));
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(forwardOverlapMs));
serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs));
/*
* Effectively disable running the index rebuild task by having an infinite delay
* because we want to control it's execution ourselves
*/
serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE));
return serverProps;
}
@Parameters(name = "MutableIndexFailureIT_transactionProvider={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}") // name is used by failsafe as file name in reports
public static synchronized Collection<Object[]> data() {
return TestUtil.filterTxParamData(
Arrays.asList(new Object[][] {
// note - can't disableIndexOnWriteFailure without throwIndexWriteFailure, PHOENIX-4130
{ null, false, false, false, false, false},
{ null, false, false, true, false, null},
{ "TEPHRA", false, false, true, false, null},
{ "OMID", false, false, true, false, null},
{ null, true, false, null, false, null},
{ "TEPHRA", true, false, true, false, null},
{ null, false, false, false, false, null},
{ null, true, false, false, false, null},
{ null, false, false, false, false, null},
{ null, false, false, true, false, null},
{ null, true, false, true, false, null},
{ null, true, false, true, false, null},
{ null, false, false, true, true, null},
{ null, false, false, false, true, false},
}), 0);
}
private void runRebuildTask(Connection conn) throws InterruptedException, SQLException {
BuildIndexScheduleTask task =
new MetaDataRegionObserver.BuildIndexScheduleTask(
indexRebuildTaskRegionEnvironment);
dumpStateOfIndexes(conn, fullTableName, true);
task.run();
dumpStateOfIndexes(conn, fullTableName, false);
Thread.sleep(forwardOverlapMs + 100);
if (failRebuildTask) {
Thread.sleep(disableTimestampThresholdMs + 100);
}
dumpStateOfIndexes(conn, fullTableName, true);
task.run();
dumpStateOfIndexes(conn, fullTableName, false);
}
private static final void dumpStateOfIndexes(Connection conn, String tableName,
boolean beforeRebuildTaskRun) throws SQLException {
PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
PTable table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), tableName));
List<PTable> indexes = table.getIndexes();
String s = beforeRebuildTaskRun ? "before rebuild run" : "after rebuild run";
System.out.println("************Index state in connection " + s + "******************");
for (PTable idx : indexes) {
System.out.println(
"Index Name: " + idx.getName().getString() + " State: " + idx.getIndexState()
+ " Disable timestamp: " + idx.getIndexDisableTimestamp());
}
System.out.println("************Index state from server " + s + "******************");
table = PhoenixRuntime.getTableNoCache(phxConn, fullTableName);
for (PTable idx : table.getIndexes()) {
System.out.println(
"Index Name: " + idx.getName().getString() + " State: " + idx.getIndexState()
+ " Disable timestamp: " + idx.getIndexDisableTimestamp());
}
}
@Test
public void testIndexWriteFailure() throws Exception {
String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
String thirdIndexName = "C_IDX";
String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
try (Connection conn = driver.connect(url, props)) {
String query;
ResultSet rs;
conn.setAutoCommit(false);
if (isNamespaceMapped) {
conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schema);
}
conn.createStatement().execute("CREATE TABLE " + fullTableName
+ " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
FailingRegionObserver.FAIL_WRITE = false;
conn.createStatement().execute(
"CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
// Create other index which should be local/global if the other index is global/local to
// check the drop index.
conn.createStatement().execute(
"CREATE " + ((!localIndex &&
(transactionProvider == null
|| !transactionProvider.isUnsupported(PhoenixTransactionProvider.Feature.ALLOW_LOCAL_INDEX)))
? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)");
conn.createStatement().execute(
"CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
initializeTable(conn, fullTableName);
addRowsInTableDuringRetry(fullTableName);
// Verify the metadata for index is correct.
rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), null,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
assertTrue(rs.next());
assertEquals(secondIndexName, rs.getString(3));
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
assertTrue(rs.next());
assertEquals(thirdIndexName, rs.getString(3));
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
// we should be able to write to ACTIVE index even in case of disable index on failure policy
addRowToTable(conn, fullTableName);
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+ SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals("y", rs.getString(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals("z", rs.getString(2));
assertFalse(rs.next());
updateTable(conn, true);
// Verify the metadata for index is correct.
rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName),
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
// the index is only disabled for non-txn tables upon index table write failure
String indexState = rs.getString("INDEX_STATE");
if (transactional || leaveIndexActiveOnFailure || localIndex) {
assertTrue(PIndexState.ACTIVE.toString().equalsIgnoreCase(indexState) || PIndexState.PENDING_ACTIVE.toString().equalsIgnoreCase(indexState));
} else {
assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
// non-failing index should remain active
ResultSet thirdRs = conn.createStatement().executeQuery(getSysCatQuery(thirdIndexName));
assertTrue(thirdRs.next());
assertEquals(PIndexState.ACTIVE.getSerializedValue(), thirdRs.getString(1));
}
assertFalse(rs.next());
// If the table is transactional the write to both the data and index table will fail
// in an all or none manner. If the table is not transactional, then the data writes
// would have succeeded while the index writes would have failed.
if (!transactional) {
updateTableAgain(conn, false);
// Verify previous writes succeeded to data table
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+ SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertEquals("x2", rs.getString(2));
assertTrue(rs.next());
assertEquals("a3", rs.getString(1));
assertEquals("x3", rs.getString(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals("z", rs.getString(2));
assertTrue(rs.next());
assertEquals("d", rs.getString(1));
assertEquals("d", rs.getString(2));
assertFalse(rs.next());
}
IndexScrutiny.scrutinizeIndex(conn, fullTableName, thirdFullIndexName);
if (!failRebuildTask) {
// re-enable index table
FailingRegionObserver.FAIL_WRITE = false;
runRebuildTask(conn);
// wait for index to be rebuilt automatically
checkStateAfterRebuild(conn, fullIndexName, PIndexState.ACTIVE);
// Verify UPSERT on data table still works after index table is caught up
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "a3");
stmt.setString(2, "x4");
stmt.setString(3, "4");
stmt.execute();
conn.commit();
// verify index table has correct data (note that second index has been dropped)
validateDataWithIndex(conn, fullTableName, fullIndexName, localIndex);
} else {
// Wait for index to be rebuilt automatically. This should fail because
// we haven't flipped the FAIL_WRITE flag to false and as a result this
// should cause index rebuild to fail too.
runRebuildTask(conn);
checkStateAfterRebuild(conn, fullIndexName, PIndexState.DISABLE);
// verify that the index was marked as disabled and the index disable
// timestamp set to 0
String q = getSysCatQuery(indexName);
try (ResultSet r = conn.createStatement().executeQuery(q)) {
assertTrue(r.next());
assertEquals(PIndexState.DISABLE.getSerializedValue(), r.getString(1));
assertEquals(0, r.getLong(2));
assertFalse(r.next());
}
}
} finally {
FAIL_WRITE = false;
}
}
private String getSysCatQuery(String iName) {
String q =
"SELECT INDEX_STATE, INDEX_DISABLE_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = '"
+ schema + "' AND TABLE_NAME = '" + iName + "'"
+ " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL";
return q;
}
private void checkStateAfterRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
if (!transactional) {
assertTrue(TestUtil.checkIndexState(conn,fullIndexName, expectedIndexState, 0l));
}
}
private void initializeTable(Connection conn, String tableName) throws SQLException {
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
stmt.setString(1, "a");
stmt.setString(2, "x");
stmt.setString(3, "1");
stmt.execute();
conn.commit();
}
private void addRowToTable(Connection conn, String tableName) throws SQLException {
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
stmt.setString(1, "c");
stmt.setString(2, "z");
stmt.setString(3, "3");
stmt.execute();
conn.commit();
}
private void addRowsInTableDuringRetry(final String tableName)
throws SQLException, InterruptedException, ExecutionException {
int threads=10;
boolean wasFailWrite = FailingRegionObserver.FAIL_WRITE;
boolean wasToggleFailWriteForRetry = FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY;
try {
Callable callable = new Callable() {
@Override
public Boolean call() {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
try (Connection conn = driver.connect(url, props)) {
// In case of disable index on failure policy, INDEX will be in PENDING_DISABLE on first retry
// but will
// become active if retry is successfull
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
stmt.setString(1, "b");
stmt.setString(2, "y");
stmt.setString(3, "2");
stmt.execute();
if (!leaveIndexActiveOnFailure && !transactional) {
FailingRegionObserver.FAIL_WRITE = true;
FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = true;
}
conn.commit();
} catch (SQLException e) {
return false;
}
return true;
}
};
ExecutorService executor = Executors.newFixedThreadPool(threads);
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
for (int i = 0; i < threads; i++) {
futures.add(executor.submit(callable));
}
for (Future<Boolean> future : futures) {
Boolean isSuccess = future.get();
// transactions can have conflict so ignoring the check for them
if (!transactional) {
assertTrue(isSuccess);
}
}
executor.shutdown();
} finally {
FailingRegionObserver.FAIL_WRITE = wasFailWrite;
FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = wasToggleFailWriteForRetry;
}
}
private void validateDataWithIndex(Connection conn, String fullTableName, String fullIndexName, boolean localIndex) throws Exception {
String query = "SELECT /*+ INDEX(" + fullTableName + " " + SchemaUtil.getTableNameFromFullName(fullIndexName) + ") */ k,v1 FROM " + fullTableName;
ResultSet rs = conn.createStatement().executeQuery(query);
String expectedPlan = " OVER "
+ (localIndex
? Bytes.toString(
SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName())
: SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString());
String explainPlan = QueryUtil.getExplainPlan(conn.createStatement().executeQuery("EXPLAIN " + query));
assertTrue(explainPlan, explainPlan.contains(expectedPlan));
if (transactional) { // failed commit does not get retried
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertEquals("x", rs.getString(2));
assertTrue(rs.next());
assertEquals("a3", rs.getString(1));
assertEquals("x4", rs.getString(2));
assertTrue(rs.next());
assertEquals("b", rs.getString(1));
assertEquals("y", rs.getString(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals("z", rs.getString(2));
assertFalse(rs.next());
} else { // failed commit eventually succeeds
assertTrue(rs.next());
assertEquals("d", rs.getString(1));
assertEquals("d", rs.getString(2));
assertTrue(rs.next());
assertEquals("a", rs.getString(1));
assertEquals("x2", rs.getString(2));
assertTrue(rs.next());
assertEquals("a3", rs.getString(1));
assertEquals("x4", rs.getString(2));
assertTrue(rs.next());
assertEquals("c", rs.getString(1));
assertEquals("z", rs.getString(2));
assertFalse(rs.next());
}
}
private void updateTable(Connection conn, boolean commitShouldFail) throws Exception {
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
// Insert new row
stmt.setString(1, "d");
stmt.setString(2, "d");
stmt.setString(3, "4");
stmt.execute();
// Update existing row
stmt.setString(1, "a");
stmt.setString(2, "x2");
stmt.setString(3, "2");
stmt.execute();
// Delete existing row
stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
stmt.setString(1, "b");
stmt.execute();
// Set to fail after the DELETE, since transactional tables will write
// uncommitted data when the DELETE is executed.
FailingRegionObserver.FAIL_WRITE = true;
try {
FailingRegionObserver.FAIL_NEXT_WRITE = localIndex && transactional;
conn.commit();
if (commitShouldFail && (!localIndex || transactional) && this.throwIndexWriteFailure) {
fail();
}
} catch (CommitException e) {
if (!commitShouldFail || !this.throwIndexWriteFailure) {
throw e;
}
exceptions.add(e);
}
}
private void updateTableAgain(Connection conn, boolean commitShouldFail) throws SQLException {
// Verify UPSERT on data table still work after index is disabled
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "a3");
stmt.setString(2, "x3");
stmt.setString(3, "3");
stmt.execute();
try {
conn.commit();
if (commitShouldFail && !localIndex && this.throwIndexWriteFailure) {
fail();
}
} catch (CommitException e) {
if (!commitShouldFail || !this.throwIndexWriteFailure) {
throw e;
}
exceptions.add(e);
}
}
public static class FailingRegionObserver extends SimpleRegionObserver {
public static boolean TOGGLE_FAIL_WRITE_FOR_RETRY = false;
public static volatile boolean FAIL_WRITE = false;
public static volatile boolean FAIL_NEXT_WRITE = false;
public static final String FAIL_INDEX_NAME = "FAIL_IDX";
public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
boolean throwException = false;
if (FAIL_NEXT_WRITE) {
throwException = true;
FAIL_NEXT_WRITE = false;
} else if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
&& FAIL_WRITE) {
throwException = true;
if (TOGGLE_FAIL_WRITE_FOR_RETRY) {
FAIL_WRITE = !FAIL_WRITE;
}
} else {
// When local index updates are atomic with data updates, testing a write failure to a local
// index won't make sense.
Mutation operation = miniBatchOp.getOperation(0);
if (FAIL_WRITE) {
Map<byte[],List<Cell>>cellMap = operation.getFamilyCellMap();
for (Map.Entry<byte[],List<Cell>> entry : cellMap.entrySet()) {
byte[] family = entry.getKey();
if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
int regionStartKeyLen = c.getEnvironment().getRegionInfo().getStartKey().length;
Cell firstCell = entry.getValue().get(0);
long indexId = MetaDataUtil.getViewIndexIdDataType().getCodec().decodeLong(firstCell.getRowArray(), firstCell.getRowOffset() + regionStartKeyLen, SortOrder.getDefault());
// Only throw for first local index as the test may have multiple local indexes
if (indexId == Short.MIN_VALUE) {
throwException = true;
break;
}
}
}
}
}
if (throwException) {
if (!TOGGLE_FAIL_WRITE_FOR_RETRY) {
dropIndex(c);
}
throw new DoNotRetryIOException();
}
}
private void dropIndex(ObserverContext<RegionCoprocessorEnvironment> c) {
try {
Connection connection =
QueryUtil.getConnection(c.getEnvironment().getConfiguration());
connection.createStatement().execute(
"DROP INDEX IF EXISTS " + "B_" + FAIL_INDEX_NAME + " ON "
+ fullTableName);
} catch (ClassNotFoundException e) {
} catch (SQLException e) {
}
}
}
}