/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end.index;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseOwnClusterIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Maps;

/**
 * Tests the behavior of a region server failing to write to an index table.
 * For some reason, dropping tables after running this test fails unless the
 * test runs against its own mini cluster.
 *
 * @since 2.1
 */
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class ReadOnlyIndexFailureIT extends BaseOwnClusterIT {
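    // Toggled by the test so FailingRegionObserver can reject index writes on demand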
public static volatile boolean FAIL_WRITE = false;
public static final String INDEX_NAME = "IDX";
private String tableName;
private String indexName;
private String fullTableName;
private String fullIndexName;
private final boolean localIndex;
public ReadOnlyIndexFailureIT(boolean localIndex) {
this.localIndex = localIndex;
this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
this.indexName = INDEX_NAME;
this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
}
@Parameters(name = "ReadOnlyIndexFailureIT_localIndex={0}") // name is used by failsafe as file name in reports
public static Collection<Boolean[]> data() {
return Arrays.asList(new Boolean[][] { { false }, { true } });
}
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
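        // Fail fast: limit client retries and pauses, and effectively disable the balancer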
serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put("hbase.client.pause", "5000");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
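        // Install the failing observer on every region; don't abort the region
        // server when the coprocessor throws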
serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
serverProps.put("hbase.coprocessor.abortonerror", "false");
serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
ReadOnlyProps.EMPTY_PROPS);
}
@Test
public void testWriteFailureReadOnlyIndex() throws Exception {
helpTestWriteFailureReadOnlyIndex();
}
public void helpTestWriteFailureReadOnlyIndex() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = driver.connect(url, props)) {
String query;
ResultSet rs;
conn.setAutoCommit(false);
conn.createStatement().execute(
"CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
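            // Allow index writes while the index is created and the first row is committed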
FAIL_WRITE = false;
            if (localIndex) {
conn.createStatement().execute(
"CREATE LOCAL INDEX " + indexName + " ON " + fullTableName
+ " (v1) INCLUDE (v2)");
} else {
conn.createStatement().execute(
"CREATE INDEX " + indexName + " ON " + fullTableName
+ " (v1) INCLUDE (v2)");
}
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
            // Verify that the metadata for the index is correct
rs = conn.getMetaData().getTables(null,
StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
assertFalse(rs.next());
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+ " VALUES(?,?,?)");
stmt.setString(1, "1");
stmt.setString(2, "aaa");
stmt.setString(3, "a1");
stmt.execute();
conn.commit();
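            // From this point on, the observer rejects writes to the index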
FAIL_WRITE = true;
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "2");
stmt.setString(2, "bbb");
stmt.setString(3, "b2");
stmt.execute();
            try {
                conn.commit();
                fail("Expected commit to fail because the index write failed");
            } catch (SQLException e) {
                // Expected: the induced index write failure aborts the commit
            }
            // Only the successfully committed row should be visible
query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("aaa", rs.getString(1));
assertFalse(rs.next());
            // Verify that the metadata for the index is correct
            rs = conn.getMetaData().getTables(null,
                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
                    new String[] { PTableType.INDEX.toString() });
            assertTrue(rs.next());
            assertEquals(indexName, rs.getString(3));
            // With INDEX_FAILURE_BLOCK_WRITE enabled, the index remains ACTIVE after an
            // index table write failure; data table writes are blocked instead
            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
            assertFalse(rs.next());
            // If the table were transactional, the write to the index table would fail
            // because the index has not been disabled
            // Verify that UPSERT on the data table is blocked after the index write failure
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "3");
stmt.setString(2, "ccc");
stmt.setString(3, "3c");
            try {
                stmt.execute();
                // Writes to the data table are blocked until the index failure is resolved
                conn.commit();
                fail("Expected UPSERT to be blocked after the index write failure");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(),
                        e.getErrorCode());
            }
            FAIL_WRITE = false;
            // Writes can succeed again; wait for the background rebuilder to replay the
            // failed index updates and clear the index disable timestamp
            int retries = 0;
            do {
                Thread.sleep(5 * 1000); // sleep 5 secs
                if (!hasIndexDisableTimestamp(conn, indexName)) {
                    break;
                }
                if (++retries == 5) {
                    fail("Failed to rebuild index within the allowed time");
                }
            } while (true);
            // Verify that UPSERT on the data table still works after the index is rebuilt
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "4");
stmt.setString(2, "ddd");
stmt.setString(3, "4d");
stmt.execute();
conn.commit();
            // Verify that the index table has the data
query = "SELECT count(1) FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
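            // Rows 1, 2, and 4 are present: row 2's index entry was restored by the
            // rebuild, while row 3 was blocked and never written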
query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("aaa", rs.getString(1));
assertTrue(rs.next());
assertEquals("bbb", rs.getString(1));
assertTrue(rs.next());
assertEquals("ddd", rs.getString(1));
assertFalse(rs.next());
query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("aaa", rs.getString(1));
assertTrue(rs.next());
assertEquals("bbb", rs.getString(1));
assertTrue(rs.next());
assertEquals("ddd", rs.getString(1));
assertFalse(rs.next());
}
}
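    /**
     * Returns true while SYSTEM.CATALOG still records a nonzero INDEX_DISABLE_TIMESTAMP
     * for the given index, i.e. while the background rebuild has not yet completed.
     */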
private static boolean hasIndexDisableTimestamp(Connection conn, String indexName) throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP +
" FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
" WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" +
" AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" +
" AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" +
" AND " + PhoenixDatabaseMetaData.TABLE_NAME + " = '" + indexName + "'");
assertTrue(rs.next());
long ts = rs.getLong(1);
return (!rs.wasNull() && ts > 0);
}
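    /**
     * Region observer that rejects index writes whenever FAIL_WRITE is set, simulating
     * a region server that cannot write to the index table. Handles both global index
     * tables and the local index column family of the data table.
     */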
public static class FailingRegionObserver extends SimpleRegionObserver {
@Override
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
            // Global index: fail any write to a region of the index table itself
            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString()
                    .contains(INDEX_NAME) && FAIL_WRITE) {
                throw new DoNotRetryIOException();
            }
            // Local index: fail writes to the local index column family, which lives
            // in the same region as the data table
            Mutation operation = miniBatchOp.getOperation(0);
            Set<byte[]> keySet = operation.getFamilyMap().keySet();
            for (byte[] family : keySet) {
                if (Bytes.toString(family).startsWith(
                        QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
                    throw new DoNotRetryIOException();
                }
            }
        }
}
}
}