// blob: 4bfdcad029ae3162fb6db2d56e4e3bc3df9aff6a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MultiActionResultTooLarge;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@Category({MediumTests.class, ClientTests.class})
public class TestMetaCache {
/** Class-level JUnit rule created for this test class. */
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
  HBaseClassTestRule.forClass(TestMetaCache.class);

/** Shared testing utility that owns the mini cluster used by every test in this class. */
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

/** Table the tests operate on; the exception injectors only act on requests for this table. */
private static final TableName TABLE_NAME = TableName.valueOf("test_table");
/** Column family of the test table. */
private static final byte[] FAMILY = Bytes.toBytes("fam1");
/** Column qualifier used by all mutations issued from the tests. */
private static final byte[] QUALIFIER = Bytes.toBytes("qual");

/** The single region server of the mini cluster; its RPC services inject the exceptions. */
private static HRegionServer badRS;

/** Per-test connection, created by setupConnection and closed in tearDown. */
private Connection conn;
/** Client-side metrics of the current connection. */
private MetricsConnection metrics;
/** Region locator of the current connection, used to inspect the cached locations. */
private AsyncRegionLocator locator;
/**
 * Starts a one-node mini cluster whose region server is a
 * {@link RegionServerWithFakeRpcServices}, waits for the master and the meta table to be
 * ready, and creates the test table with a single column family.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Have the mini cluster instantiate our region server subclass so RPCs can be intercepted.
  TEST_UTIL.getConfiguration().setStrings(HConstants.REGION_SERVER_IMPL,
    RegionServerWithFakeRpcServices.class.getName());

  TEST_UTIL.startMiniCluster(1);
  TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
  TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);

  // Sanity check: the configured implementation must really be in use.
  badRS = TEST_UTIL.getHBaseCluster().getRegionServer(0);
  assertTrue(badRS.getRSRpcServices() instanceof FakeRSRpcServices);

  TableDescriptor desc = TableDescriptorBuilder.newBuilder(TABLE_NAME)
    .setColumnFamily(
      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(2).build())
    .build();
  TEST_UTIL.createTable(desc, null);
}
/**
 * Shuts down the mini cluster that was started in setUpBeforeClass().
 */
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
 * Closes the connection opened by the test that just ran.
 */
@After
public void tearDown() throws IOException {
  // Best-effort close: ask Closeables to swallow any IOException raised while closing.
  final boolean swallowIOException = true;
  Closeables.close(conn, swallowIOException);
}
/**
 * Opens a fresh connection for the current test and captures its region locator and
 * client-side metrics in the corresponding fields.
 * @param retry value to use for {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}
 * @throws IOException if the connection cannot be created
 */
private void setupConnection(int retry) throws IOException {
  // Work on a copy so the per-test settings do not leak into the shared cluster configuration.
  Configuration clientConf = new Configuration(TEST_UTIL.getConfiguration());
  // Metrics must be enabled, otherwise there is nothing to read back below.
  clientConf.setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
  clientConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retry);

  conn = ConnectionFactory.createConnection(clientConf);

  AsyncConnectionImpl asyncConn = (AsyncConnectionImpl) conn.toAsyncConnection();
  locator = asyncConn.getLocator();
  metrics = asyncConn.getConnectionMetrics().get();
}
/**
 * Runs a fixed sequence of operations against the test table 50 times while the region server
 * injects exceptions in round-robin fashion, and after every iteration checks the cached
 * region location: it must be gone when the iteration ended with a meta-clearing exception,
 * and it must still be present when the leading put succeeded and no meta-clearing exception
 * followed.
 */
@Test
public void testPreserveMetaCacheOnException() throws Exception {
  FakeRSRpcServices fakeRpcServices = (FakeRSRpcServices) badRS.getRSRpcServices();
  fakeRpcServices.setExceptionInjector(new RoundRobinExceptionInjector());
  setupConnection(1);

  try (Table table = conn.getTable(TABLE_NAME)) {
    final byte[] row = Bytes.toBytes("row1");

    Put put = new Put(row);
    put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));

    Get get = new Get(row);

    Append append = new Append(row);
    append.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(11));

    Increment increment = new Increment(row);
    increment.addColumn(FAMILY, QUALIFIER, 10);

    Delete delete = new Delete(row);
    delete.addColumn(FAMILY, QUALIFIER);

    RowMutations mutations = new RowMutations(row);
    mutations.add(put);
    mutations.add(delete);

    for (int attempt = 0; attempt < 50; attempt++) {
      boolean putSucceeded = false;
      Exception cacheRelevantFailure = null;
      try {
        table.put(put);
        // Once a single operation went through, the region location must have been cached.
        putSucceeded = true;
        table.get(get);
        table.append(append);
        table.increment(increment);
        table.delete(delete);
        table.mutateRow(mutations);
      } catch (IOException ex) {
        // Remember the failure only if it had a chance to affect the meta cache.
        if (ClientExceptionsUtil.isMetaClearingException(ex) || putSucceeded) {
          cacheRelevantFailure = ex;
        }
      }

      boolean cacheShouldBeCleared = cacheRelevantFailure != null
        && ClientExceptionsUtil.isMetaClearingException(cacheRelevantFailure);
      // Iterations that never touched the meta cache are not checked at all.
      if (cacheShouldBeCleared) {
        assertNull(locator.getRegionLocationInCache(TABLE_NAME, row));
      } else if (putSucceeded) {
        assertNotNull(locator.getRegionLocationInCache(TABLE_NAME, row));
      }
    }
  }
}
/**
 * Verifies that a get which keeps failing with {@link CallQueueTooBigException} does not
 * clear any meta cache entries: the region-clear and server-clear counters of the client
 * metrics must be unchanged after the failed get.
 */
@Test
public void testCacheClearingOnCallQueueTooBig() throws Exception {
  ((FakeRSRpcServices) badRS.getRSRpcServices())
    .setExceptionInjector(new CallQueueTooBigExceptionInjector());
  setupConnection(2);

  // Close the table handle when the test is done; the connection itself is closed in tearDown().
  try (Table table = conn.getTable(TABLE_NAME)) {
    byte[] row = Bytes.toBytes("row1");

    // The injector only fails gets, so this put succeeds and populates the meta cache.
    Put put = new Put(row);
    put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
    table.put(put);

    // Snapshot the client metrics before the failing get.
    long preGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
    long preGetServerClears = metrics.metaCacheNumClearServer.getCount();

    // Attempt a get on the test table; every attempt is rejected by the injector.
    Get get = new Get(row);
    try {
      table.get(get);
      fail("Expected CallQueueTooBigException");
    } catch (RetriesExhaustedException ree) {
      // expected
    }

    // Verify that no cache clearing took place.
    long postGetRegionClears = metrics.metaCacheNumClearRegion.getCount();
    long postGetServerClears = metrics.metaCacheNumClearServer.getCount();
    assertEquals("region cache entries must not be cleared on CallQueueTooBigException",
      preGetRegionClears, postGetRegionClears);
    assertEquals("server cache entries must not be cleared on CallQueueTooBigException",
      preGetServerClears, postGetServerClears);
  }
}
/**
 * Builds the exceptions that {@link RoundRobinExceptionInjector} rotates through as its
 * "meta cache preserving" exceptions.
 * @return a fixed-size list holding one new instance of each exception type
 */
public static List<Throwable> metaCachePreservingExceptions() {
  Throwable[] preserving = {
    new RegionOpeningException(" "),
    new RegionTooBusyException("Some old message"),
    new RpcThrottlingException(" "),
    new MultiActionResultTooLarge(" "),
    new RetryImmediatelyException(" "),
    new CallQueueTooBigException(),
  };
  return Arrays.asList(preserving);
}
/**
 * Region server that installs a {@link FakeRSRpcServices} as its RPC services so that tests
 * can inject exceptions into get, mutate and scan calls.
 */
public static class RegionServerWithFakeRpcServices extends HRegionServer {
  // NOTE(review): createRpcServices() is presumably invoked from the HRegionServer
  // constructor; if so, this field must not get an initializer, which would run after
  // super(conf) and overwrite the assigned value - confirm before changing.
  private FakeRSRpcServices rsRpcServices;

  public RegionServerWithFakeRpcServices(Configuration conf)
    throws IOException, InterruptedException {
    super(conf);
  }

  /**
   * Creates the fake RPC services and remembers them for {@link #setExceptionInjector}.
   */
  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    FakeRSRpcServices fake = new FakeRSRpcServices(this);
    this.rsRpcServices = fake;
    return fake;
  }

  /**
   * Replaces the injector consulted by the fake RPC services.
   * @param injector the injector to use from now on
   */
  public void setExceptionInjector(ExceptionInjector injector) {
    rsRpcServices.setExceptionInjector(injector);
  }
}
/**
 * RPC services that consult an {@link ExceptionInjector} before delegating get, mutate and
 * scan calls to the real implementation. The injector may throw instead of returning, in
 * which case the real call is never made.
 */
public static class FakeRSRpcServices extends RSRpcServices {

  /** Injector consulted on every intercepted call; a round-robin injector by default. */
  private ExceptionInjector exceptions;

  public FakeRSRpcServices(HRegionServer rs) throws IOException {
    super(rs);
    this.exceptions = new RoundRobinExceptionInjector();
  }

  /**
   * Swaps the injector consulted by subsequent calls.
   * @param injector the injector to use from now on
   */
  public void setExceptionInjector(ExceptionInjector injector) {
    exceptions = injector;
  }

  @Override
  public GetResponse get(final RpcController controller, final ClientProtos.GetRequest request)
    throws ServiceException {
    // Give the injector the chance to fail the request before it is served.
    exceptions.throwOnGet(this, request);
    return super.get(controller, request);
  }

  @Override
  public ClientProtos.MutateResponse mutate(final RpcController controller,
    final ClientProtos.MutateRequest request) throws ServiceException {
    exceptions.throwOnMutate(this, request);
    return super.mutate(controller, request);
  }

  @Override
  public ClientProtos.ScanResponse scan(final RpcController controller,
    final ClientProtos.ScanRequest request) throws ServiceException {
    exceptions.throwOnScan(this, request);
    return super.scan(controller, request);
  }
}
/**
 * Strategy deciding whether an intercepted get, mutate or scan request should fail. An
 * implementation signals a failure by throwing a {@link ServiceException}; returning
 * normally lets the request proceed.
 */
public static abstract class ExceptionInjector {

  /**
   * Tells whether the region addressed by a request belongs to the test table.
   * @param rpcServices services used to resolve the region
   * @param regionSpec region specifier taken from the request
   * @return {@code true} if the region's table is the test table
   * @throws ServiceException wrapping the IOException raised while resolving the region
   */
  protected boolean isTestTable(FakeRSRpcServices rpcServices,
    HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
    TableName regionTable;
    try {
      regionTable = rpcServices.getRegion(regionSpec).getTableDescriptor().getTableName();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
    return TABLE_NAME.equals(regionTable);
  }

  /** Called before a get is served; throw to fail the request. */
  public abstract void throwOnGet(FakeRSRpcServices rpcServices,
    ClientProtos.GetRequest request) throws ServiceException;

  /** Called before a mutate is served; throw to fail the request. */
  public abstract void throwOnMutate(FakeRSRpcServices rpcServices,
    ClientProtos.MutateRequest request) throws ServiceException;

  /** Called before a scan is served; throw to fail the request. */
  public abstract void throwOnScan(FakeRSRpcServices rpcServices,
    ClientProtos.ScanRequest request) throws ServiceException;
}
/**
 * Rotates through cache clearing and non cache clearing exceptions for requests that target
 * the test table. Requests for other tables are never failed.
 */
public static class RoundRobinExceptionInjector extends ExceptionInjector {
  /** Index of the last counted request for the test table; -1 before the first one. */
  private int numReqs = -1;
  /** Index of the last meta cache preserving exception thrown; -1 before the first one. */
  private int expCount = -1;
  private List<Throwable> metaCachePreservingExceptions = metaCachePreservingExceptions();

  @Override
  public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
    throws ServiceException {
    throwSomeExceptions(rpcServices, request.getRegion());
  }

  @Override
  public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
    throws ServiceException {
    throwSomeExceptions(rpcServices, request.getRegion());
  }

  @Override
  public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
    throws ServiceException {
    // Only initial scan requests are handled; calls that carry a scanner id pass through.
    if (request.hasScannerId()) {
      return;
    }
    throwSomeExceptions(rpcServices, request.getRegion());
  }

  /**
   * Decides the fate of one request for the test table based on its position in a repeating
   * cycle of five: the first request of each cycle succeeds, the next two fail with
   * NotServingRegionException (which clears the meta cache), and the remaining two fail with
   * the next meta cache preserving exception in rotation.
   * @throws ServiceException wrapping the injected exception, unless the request is let
   *           through
   */
  private void throwSomeExceptions(FakeRSRpcServices rpcServices,
    HBaseProtos.RegionSpecifier regionSpec) throws ServiceException {
    if (!isTestTable(rpcServices, regionSpec)) {
      return;
    }

    numReqs++;
    switch (numReqs % 5) {
      case 0:
        // Let this request succeed.
        return;
      case 1:
      case 2:
        throw new ServiceException(new NotServingRegionException());
      default:
        break;
    }

    // Round robin between the different special exceptions. The exception type is not tied
    // to the operation being performed, but it does not matter here if, for example, a
    // MultiActionResultTooLarge is thrown for a single get.
    expCount++;
    int next = expCount % metaCachePreservingExceptions.size();
    throw new ServiceException(metaCachePreservingExceptions.get(next));
  }
}
/**
 * Fails every get against the test table with a CallQueueTooBigException and lets all
 * mutate and scan requests through.
 */
public static class CallQueueTooBigExceptionInjector extends ExceptionInjector {
  @Override
  public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request)
    throws ServiceException {
    if (!isTestTable(rpcServices, request.getRegion())) {
      // Gets for other tables are served normally.
      return;
    }
    throw new ServiceException(new CallQueueTooBigException());
  }

  @Override
  public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request)
    throws ServiceException {
    // Intentionally empty: mutations are never failed by this injector.
  }

  @Override
  public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request)
    throws ServiceException {
    // Intentionally empty: scans are never failed by this injector.
  }
}
}