blob: e612f49c028e74303788be74e9b22d5292d080ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.index;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Maps;
/**
 * Tests that a region server's failure to write to an index table disables the
 * index (for non-transactional tables) and that the index becomes active again
 * once writes succeed. Dropping tables after running this test fails, for
 * reasons not yet understood, unless the test runs on its own mini cluster.
 */
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class MutableIndexFailureIT extends BaseTest {
public static final String INDEX_NAME = "IDX";
// NOTE(review): duplicates FailingRegionObserver.FAIL_WRITE; the observer reads
// its own copy, so toggling this field alone does not affect failure injection.
public static volatile boolean FAIL_WRITE = false;
// Static (not instance) so the server-side FailingRegionObserver can read the
// current run's table name when issuing its DROP INDEX statement.
public static volatile String fullTableName;
private String tableName;
private String indexName;
private String fullIndexName;
// Parameterized run flags (see data()).
private final boolean transactional;
private final boolean localIndex;
private final String tableDDLOptions;
private final boolean isNamespaceMapped;
// Unique schema per instance; all tables and indexes are created under it.
private String schema = generateUniqueName();
/** Shuts down the mini cluster after all parameterized runs complete. */
@AfterClass
public static void doTeardown() throws Exception {
    tearDownMiniCluster();
}
public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) {
this.transactional = transactional;
this.localIndex = localIndex;
this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "");
this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
+ (isNamespaceMapped ? "_NM" : "");
this.indexName = FailingRegionObserver.INDEX_NAME;
fullTableName = SchemaUtil.getTableName(schema, tableName);
this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
this.isNamespaceMapped = isNamespaceMapped;
}
/**
 * Boots the mini cluster with the failing observer installed and with short
 * retry/timeout settings so injected index-write failures surface quickly.
 */
@BeforeClass
public static void doSetup() throws Exception {
    Map<String, String> serverSettings = Maps.newHashMapWithExpectedSize(10);
    // Install the failure-injecting coprocessor on every region.
    serverSettings.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
    serverSettings.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
    serverSettings.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
    serverSettings.put("hbase.client.pause", "5000");
    serverSettings.put("data.tx.snapshot.dir", "/tmp");
    // Effectively disable the balancer so regions stay put during the test.
    serverSettings.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
    Map<String, String> clientSettings =
            Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
    NUM_SLAVES_BASE = 4;
    setUpTestDriver(new ReadOnlyProps(serverSettings.entrySet().iterator()),
            new ReadOnlyProps(clientSettings.entrySet().iterator()));
}
/** All eight combinations of (transactional, localIndex, isNamespaceMapped). */
@Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2}") // name is used by failsafe as file name in reports
public static Collection<Boolean[]> data() {
    Boolean[][] flagCombinations = {
            { false, false, true }, { false, false, false },
            { false, true, true }, { false, true, false },
            { true, false, true }, { true, true, true },
            { true, false, false }, { true, true, false },
    };
    return Arrays.asList(flagCombinations);
}
/** Entry point; the scenario lives in helpTestWriteFailureDisablesIndex. */
@Test
public void testWriteFailureDisablesIndex() throws Exception {
    helpTestWriteFailureDisablesIndex();
}
/**
 * Core scenario: create two data tables with indexes, seed rows, enable
 * failure injection, write again and verify that the index is disabled
 * (non-transactional) or the whole commit fails atomically (transactional),
 * then turn injection off and verify the index catches back up and serves
 * correct data.
 *
 * @throws Exception on any JDBC or timing failure; JUnit reports it
 */
public void helpTestWriteFailureDisablesIndex() throws Exception {
    String secondTableName = fullTableName + "_2";
    String secondIndexName = indexName + "_2";
    String secondFullIndexName = fullIndexName + "_2";
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
    try (Connection conn = driver.connect(url, props)) {
        String query;
        ResultSet rs;
        conn.setAutoCommit(false);
        if (isNamespaceMapped) {
            conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schema);
        }
        conn.createStatement().execute("CREATE TABLE " + fullTableName
                + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
        conn.createStatement().execute("CREATE TABLE " + secondTableName
                + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
        query = "SELECT * FROM " + fullTableName;
        rs = conn.createStatement().executeQuery(query);
        assertFalse(rs.next());
        // Index creation must happen with failure injection off.
        FailingRegionObserver.FAIL_WRITE = false;
        conn.createStatement().execute(
                "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
        // Create other index which should be local/global if the other index is global/local to
        // check the drop index.
        conn.createStatement().execute(
                "CREATE " + (!localIndex ? "LOCAL " : "") + "INDEX " + indexName + "_3" + " ON "
                        + fullTableName + " (v2) INCLUDE (v1)");
        conn.createStatement().execute(
                "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)");
        query = "SELECT * FROM " + fullIndexName;
        rs = conn.createStatement().executeQuery(query);
        assertFalse(rs.next());
        // Verify the metadata for the indexes: all three start out ACTIVE.
        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName + "%",
                new String[] { PTableType.INDEX.toString() });
        assertTrue(rs.next());
        assertEquals(indexName, rs.getString(3));
        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
        assertTrue(rs.next());
        assertEquals(secondIndexName, rs.getString(3));
        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
        assertTrue(rs.next());
        assertEquals(indexName + "_3", rs.getString(3));
        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
        initializeTable(conn, fullTableName);
        initializeTable(conn, secondTableName);
        query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
        String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
                + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped) + "\nCLIENT MERGE SORT";
        assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
        rs = conn.createStatement().executeQuery(query);
        assertTrue(rs.next());
        assertEquals("a", rs.getString(1));
        assertEquals("x", rs.getString(2));
        assertTrue(rs.next());
        assertEquals("b", rs.getString(1));
        assertEquals("y", rs.getString(2));
        assertTrue(rs.next());
        assertEquals("c", rs.getString(1));
        assertEquals("z", rs.getString(2));
        assertFalse(rs.next());
        // From here on, writes that reach an index table fail server-side.
        FailingRegionObserver.FAIL_WRITE = true;
        updateTable(conn, fullTableName);
        updateTable(conn, secondTableName);
        // Verify the metadata for index is correct.
        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
                new String[] { PTableType.INDEX.toString() });
        assertTrue(rs.next());
        assertEquals(indexName, rs.getString(3));
        // the index is only disabled for non-txn tables upon index table write failure
        if (transactional) {
            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
        } else {
            String indexState = rs.getString("INDEX_STATE");
            assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
        }
        assertFalse(rs.next());
        // If the table is transactional the write to both the data and index table will fail
        // in an all or none manner. If the table is not transactional, then the data writes
        // would have succeeded while the index writes would have failed.
        if (!transactional) {
            // Verify UPSERT on data table still work after index is disabled
            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
            stmt.setString(1, "a3");
            stmt.setString(2, "x3");
            stmt.setString(3, "3");
            stmt.execute();
            conn.commit();
            stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
            stmt.setString(1, "a3");
            stmt.setString(2, "x3");
            stmt.setString(3, "3");
            stmt.execute();
            conn.commit();
            // Verify previous writes succeeded to data table
            query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
            expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
                    + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped) + "\nCLIENT MERGE SORT";
            assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
            rs = conn.createStatement().executeQuery(query);
            assertTrue(rs.next());
            assertEquals("a", rs.getString(1));
            assertEquals("x2", rs.getString(2));
            assertTrue(rs.next());
            assertEquals("a3", rs.getString(1));
            assertEquals("x3", rs.getString(2));
            assertTrue(rs.next());
            assertEquals("c", rs.getString(1));
            assertEquals("z", rs.getString(2));
            assertTrue(rs.next());
            assertEquals("d", rs.getString(1));
            assertEquals("d", rs.getString(2));
            assertFalse(rs.next());
        }
        // re-enable index table
        FailingRegionObserver.FAIL_WRITE = false;
        waitForIndexToBeActive(conn, indexName);
        // NOTE(review): secondIndexName == indexName + "_2", so the next two waits
        // poll the same index twice; IDX_3 is not waited on because the observer
        // drops it when injecting the failure.
        waitForIndexToBeActive(conn, indexName + "_2");
        waitForIndexToBeActive(conn, secondIndexName);
        // Verify UPSERT on data table still work after index table is recreated
        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
        stmt.setString(1, "a3");
        stmt.setString(2, "x4");
        stmt.setString(3, "4");
        stmt.execute();
        conn.commit();
        stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
        stmt.setString(1, "a3");
        stmt.setString(2, "x4");
        stmt.setString(3, "4");
        stmt.execute();
        conn.commit();
        // To clear the index name from connection.
        PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
        phoenixConn.getMetaDataCache().removeTable(null, fullTableName, null, HConstants.LATEST_TIMESTAMP);
        // verify index table has correct data
        validateDataWithIndex(conn, fullTableName, fullIndexName);
        validateDataWithIndex(conn, secondTableName, secondFullIndexName);
    } finally {
        // BUGFIX: reset BOTH flags. The coprocessor reads its own static copy, so
        // resetting only the outer FAIL_WRITE (as before) left failure injection
        // enabled for subsequent parameterized runs when this test failed mid-way.
        FAIL_WRITE = false;
        FailingRegionObserver.FAIL_WRITE = false;
    }
}
/**
 * Polls the index metadata until the given index reports ACTIVE, sleeping 15s
 * between checks for at most 4 attempts (~60s), then asserts it became active.
 * No-op for transactional tables, whose indexes are not disabled on write failure.
 *
 * @param conn  open Phoenix connection
 * @param index unqualified index name to poll within this test's schema
 */
private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException {
    if (transactional) {
        return;
    }
    final int maxTries = 4;
    boolean active = false;
    for (int attempt = 0; attempt < maxTries && !active; attempt++) {
        Thread.sleep(15 * 1000); // give the server time to rebuild/re-enable the index
        ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index,
                new String[] { PTableType.INDEX.toString() });
        assertTrue(rs.next());
        active = PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"));
    }
    assertTrue(active);
}
/**
 * Seeds {@code tableName} with the rows (a,x,1), (b,y,2), (c,z,3) and commits.
 *
 * @param conn      open Phoenix connection (autocommit off)
 * @param tableName fully qualified table to populate
 */
private void initializeTable(Connection conn, String tableName) throws SQLException {
    String[][] seedRows = { { "a", "x", "1" }, { "b", "y", "2" }, { "c", "z", "3" } };
    PreparedStatement upsert = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
    for (String[] row : seedRows) {
        for (int col = 0; col < row.length; col++) {
            upsert.setString(col + 1, row[col]);
        }
        upsert.execute();
    }
    conn.commit();
}
/**
 * Verifies that an index-hinted query uses the expected physical table and
 * returns the expected rows after the failure/recovery cycle.
 *
 * @param conn      open Phoenix connection
 * @param tableName fully qualified data table
 * @param indexName fully qualified index to hint
 */
private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException {
    String query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + tableName;
    ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
    // Local indexes live in the data table's physical table; global indexes have their own.
    String physicalName = localIndex
            ? Bytes.toString(
                    SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName())
            : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString();
    String expectedPlan = " OVER " + physicalName;
    String explainPlan = QueryUtil.getExplainPlan(rs);
    assertTrue(explainPlan, explainPlan.contains(expectedPlan));
    rs = conn.createStatement().executeQuery(query);
    // Transactional: the failed commit is not retried, so the update/delete batch
    // never lands. Non-transactional: the failed batch eventually succeeds.
    String[][] expectedRows = transactional
            ? new String[][] { { "a", "x" }, { "a3", "x4" }, { "b", "y" }, { "c", "z" } }
            : new String[][] { { "d", "d" }, { "a", "x2" }, { "a3", "x4" }, { "c", "z" } };
    for (String[] row : expectedRows) {
        assertTrue(rs.next());
        assertEquals(row[0], rs.getString(1));
        assertEquals(row[1], rs.getString(2));
    }
    assertFalse(rs.next());
}
/**
 * Applies an insert, an update and a delete to {@code tableName} and commits.
 * Failure injection is expected to be ON, so the commit MUST throw (the region
 * observer rejects the index write); the method fails the test otherwise.
 *
 * @param conn      open Phoenix connection (autocommit off)
 * @param tableName fully qualified data table to mutate
 */
private void updateTable(Connection conn, String tableName) throws SQLException {
    PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
    // Insert new row
    stmt.setString(1, "d");
    stmt.setString(2, "d");
    stmt.setString(3, "4");
    stmt.execute();
    // Update existing row
    stmt.setString(1, "a");
    stmt.setString(2, "x2");
    stmt.setString(3, "2");
    stmt.execute();
    // Delete existing row
    stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE k=?");
    stmt.setString(1, "b");
    stmt.execute();
    try {
        conn.commit();
        fail("commit should have failed because index writes are being rejected");
    } catch (Exception e) {
        // Expected: the injected index-write failure aborts the commit. The
        // original code had two empty catch blocks (SQLException, Exception);
        // one documented catch of Exception is equivalent and intentional here.
    }
}
/**
 * Region observer installed on every region (see doSetup). While
 * {@link #FAIL_WRITE} is set it rejects any batch destined for an index —
 * either a separate global-index table (name contains {@link #INDEX_NAME}) or
 * a local-index column family on the data table — after first dropping the
 * "_3" index to exercise DROP INDEX from inside a failing write path.
 */
public static class FailingRegionObserver extends SimpleRegionObserver {
    public static volatile boolean FAIL_WRITE = false;
    public static final String INDEX_NAME = "IDX";

    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
        if (!FAIL_WRITE) {
            return; // fast path: injection disabled, let every write through
        }
        // Global index writes target a table whose name contains the index name.
        if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME)) {
            dropIndex(c);
            throw new DoNotRetryIOException();
        }
        // Local index writes target the data table but use the local-index family prefix.
        Mutation operation = miniBatchOp.getOperation(0);
        Set<byte[]> keySet = operation.getFamilyMap().keySet();
        for (byte[] family : keySet) {
            if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
                dropIndex(c);
                throw new DoNotRetryIOException();
            }
        }
    }

    /**
     * Best-effort DROP INDEX of the "_3" index on the current test table.
     * BUGFIX: the connection is now closed via try-with-resources; the original
     * leaked one JDBC connection per injected failure.
     */
    private void dropIndex(ObserverContext<RegionCoprocessorEnvironment> c) {
        try (Connection connection =
                QueryUtil.getConnection(c.getEnvironment().getConfiguration())) {
            connection.createStatement().execute(
                    "DROP INDEX IF EXISTS " + INDEX_NAME + "_3" + " ON "
                            + fullTableName);
        } catch (ClassNotFoundException | SQLException e) {
            // Best effort only: never let a drop failure escape the coprocessor hook.
        }
    }
}
}