/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector.Level;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
/**
* This class is for testing {@link Connection}.
*/
@Category({ LargeTests.class })
public class TestConnection {
// JUnit class rule obtained from HBaseClassTestRule.forClass for this test class.
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestConnection.class);
private static final Logger LOG = LoggerFactory.getLogger(TestConnection.class);
// Shared testing utility; its mini cluster is started in setUpBeforeClass and shut down in
// tearDownAfterClass.
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Column family passed to TEST_UTIL.createTable by the put/get tests.
private static final byte[] FAM_NAM = Bytes.toBytes("f");
// Row key used by the put/get tests; the puts also reuse it as qualifier and value.
private static final byte[] ROW = Bytes.toBytes("bbb");
// Value stored under HConstants.HBASE_CLIENT_RETRIES_NUMBER in the shared configuration.
private static final int RPC_RETRY = 5;
// Gives access to the running test method's name, which several tests use as the table name.
@Rule
public TestName name = new TestName();
/**
 * Sets netty's {@link ResourceLeakDetector} to {@code PARANOID}, applies the settings shared by
 * every test to the utility's configuration, and then calls {@code startMiniCluster(2)}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  ResourceLeakDetector.setLevel(Level.PARANOID);
  final Configuration conf = TEST_UTIL.getConfiguration();
  conf.setBoolean(HConstants.STATUS_PUBLISHED, true);
  // Up the handlers; this test needs more than usual.
  conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
  conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
  conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 3);
  TEST_UTIL.startMiniCluster(2);
}
/**
* Shuts down the mini cluster that {@link #setUpBeforeClass()} started.
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Runs after every test and calls {@code balancerSwitch(true, true)} on the shared admin. Several
* tests call {@code balancerSwitch(false, true)} and do not undo it themselves.
* @throws IOException if the admin call fails
*/
@After
public void tearDown() throws IOException {
TEST_UTIL.getAdmin().balancerSwitch(true, true);
}
/**
 * Naive test to check that Connection#getAdmin returns a properly constructed Admin object:
 * the admin must report the connection it was obtained from and the configuration the
 * connection was created with.
 * <p>
 * Both the connection and the admin are opened in a try-with-resources statement so that they
 * are released even when one of the assertions fails.
 * @throws IOException Unable to construct admin
 */
@Test
public void testAdminFactory() throws IOException {
  try (Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    Admin admin = con1.getAdmin()) {
    // Identity, not equality: the admin must hand back the very same objects.
    Assert.assertSame(con1, admin.getConnection());
    Assert.assertSame(TEST_UTIL.getConfiguration(), admin.getConfiguration());
  }
}
/**
* Test that we can handle connection close: it will trigger a retry, but the calls will finish.
* Runs the shared {@code testConnectionClose} scenario with {@code allowsInterrupt} set to true.
*/
@Test
public void testConnectionCloseAllowsInterrupt() throws Exception {
testConnectionClose(true);
}
/**
* Runs the shared {@code testConnectionClose} scenario with {@code allowsInterrupt} set to false.
*/
@Test
public void testConnectionNotAllowsInterrupt() throws Exception {
testConnectionClose(false);
}
/**
 * Shared body of the connection-close tests. A background thread keeps issuing gets through a
 * dedicated connection while this thread repeatedly calls {@code cancelConnections} for the
 * server hosting the row; the test passes when the background thread finishes without having
 * recorded any exception.
 * <p>
 * The connection and the table are opened in a try-with-resources statement, and the background
 * thread is released from its loop and joined in a {@code finally} block, so nothing is left
 * running or open when a wait or an RPC throws.
 * @param allowsInterrupt value stored under {@code RpcClient.SPECIFIC_WRITE_THREAD} in the client
 *          configuration; it is also appended to the table name so both variants get their own
 *          table
 * @throws Exception if the setup, a wait, or the final assertion fails
 */
private void testConnectionClose(boolean allowsInterrupt) throws Exception {
  TableName tableName = TableName.valueOf("HCM-testConnectionClose" + allowsInterrupt);
  TEST_UTIL.createTable(tableName, FAM_NAM).close();
  TEST_UTIL.getAdmin().balancerSwitch(false, true);
  Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
  // We want to work on a separate connection.
  c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
  c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot
  c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); // don't wait between retries.
  c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
  c2.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, allowsInterrupt);
  // to avoid the client to be stuck when do the Get
  c2.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 10000);
  c2.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 10000);
  c2.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
  // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
  final AtomicInteger step = new AtomicInteger(0);
  final AtomicReference<Throwable> failed = new AtomicReference<>(null);
  try (Connection connection = ConnectionFactory.createConnection(c2);
    Table table = connection.getTable(tableName)) {
    Put put = new Put(ROW);
    put.addColumn(FAM_NAM, ROW, ROW);
    table.put(put);
    Thread getter = new Thread("testConnectionCloseThread") {
      @Override
      public void run() {
        int done = 0;
        try {
          step.set(1);
          while (step.get() == 1) {
            Get get = new Get(ROW);
            table.get(get);
            done++;
            if (done % 100 == 0) {
              LOG.info("done=" + done);
            }
            // without the sleep, will cause the exception for too many files in
            // org.apache.hadoop.hdfs.server.datanode.DataXceiver
            Thread.sleep(100);
          }
        } catch (Throwable e) {
          // Remember the failure so the main thread can report it in its final assertion.
          failed.set(e);
          LOG.error(e.toString(), e);
        }
        step.set(3);
      }
    };
    getter.start();
    try {
      TEST_UTIL.waitFor(20000, () -> step.get() == 1);
      ServerName sn;
      try (RegionLocator rl = connection.getRegionLocator(tableName)) {
        sn = rl.getRegionLocation(ROW).getServerName();
      }
      RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient;
      LOG.info("Going to cancel connections. connection=" + connection.toString() + ", sn=" + sn);
      for (int i = 0; i < 500; i++) {
        rpcClient.cancelConnections(sn);
        Thread.sleep(50);
      }
      step.compareAndSet(1, 2);
      // The test may fail here if the thread doing the gets is stuck. The way to find
      // out what's happening is to look for the thread named 'testConnectionCloseThread'
      TEST_UTIL.waitFor(40000, () -> step.get() == 3);
    } finally {
      // Whatever happened above, release the getter from its loop and give it a bounded amount
      // of time to finish before the table and the connection are closed underneath it.
      step.compareAndSet(1, 2);
      getter.join(10000);
    }
  }
  Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
}
/**
 * Test that connection can become idle without breaking everything.
 * <p>
 * A {@link ManualEnvironmentEdge} is injected so the test can move the clock past the configured
 * idle time between gets. The edge is reset and the previous balancer state is restored in a
 * {@code finally} block, and the connection and table are opened in a try-with-resources
 * statement, so a failing get neither leaks them nor leaves the manual clock installed for the
 * tests that run afterwards.
 */
@Test
public void testConnectionIdle() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  TEST_UTIL.createTable(tableName, FAM_NAM).close();
  int idleTime = 20000;
  boolean previousBalance = TEST_UTIL.getAdmin().balancerSwitch(false, true);
  try {
    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
    // We want to work on a separate connection.
    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
    c2.setInt(RpcClient.IDLE_TIME, idleTime);
    try (Connection connection = ConnectionFactory.createConnection(c2);
      Table table = connection.getTable(tableName)) {
      Put put = new Put(ROW);
      put.addColumn(FAM_NAM, ROW, ROW);
      table.put(put);
      ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
      mee.setValue(System.currentTimeMillis());
      EnvironmentEdgeManager.injectEdge(mee);
      LOG.info("first get");
      table.get(new Get(ROW));
      LOG.info("first get - changing the time & sleeping");
      mee.incValue(idleTime + 1000);
      Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
      // 1500 = sleep time in RpcClient#waitForWork + a margin
      LOG.info("second get - connection has been marked idle in the middle");
      // To check that the connection actually became idle would need to read some private
      // fields of RpcClient.
      table.get(new Get(ROW));
      mee.incValue(idleTime + 1000);
      LOG.info("third get - connection is idle, but the reader doesn't know yet");
      // We're testing here a special case:
      // time limit reached BUT connection not yet reclaimed AND a new call.
      // in this situation, we don't close the connection, instead we use it immediately.
      // If we're very unlucky we can have a race condition in the test: the connection is already
      // under closing when we do the get, so we have an exception, and we don't retry as the
      // retry number is 1. The probability is very very low, and seems acceptable for now. It's
      // a test issue only.
      table.get(new Get(ROW));
      LOG.info("we're done - time will change back");
    }
  } finally {
    // Runs after the table and connection have been closed, as in the success path before, but
    // now also when a get or an assertion throws.
    EnvironmentEdgeManager.reset();
    TEST_UTIL.getAdmin().balancerSwitch(previousBalance, true);
  }
}
/**
 * Checks that two connections created from the same configuration are distinct objects and that
 * closing one of them does not close the other.
 */
@Test
public void testClosing() throws Exception {
  final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
  final int instanceId = ThreadLocalRandom.current().nextInt();
  conf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(instanceId));
  // as connection caching is going away, now we're just testing
  // that closed connection does actually get closed.
  final Connection first = ConnectionFactory.createConnection(conf);
  final Connection second = ConnectionFactory.createConnection(conf);
  // no caching, different connections
  assertFalse(first == second);
  // closing independently: the first close must not affect the second connection
  first.close();
  assertTrue(first.isClosed());
  assertTrue(!second.isClosed());
  second.close();
  assertTrue(second.isClosed());
}
/**
 * Trivial test to verify that nobody messes with
 * {@link ConnectionFactory#createConnection(Configuration)}: two calls with the same
 * configuration must return two distinct connections that both report the same configuration
 * object.
 * <p>
 * Both connections are opened in a try-with-resources statement so that they are closed when the
 * test ends, whether or not the assertions pass.
 */
@Test
public void testCreateConnection() throws Exception {
  Configuration configuration = TEST_UTIL.getConfiguration();
  try (Connection c1 = ConnectionFactory.createConnection(configuration);
    Connection c2 = ConnectionFactory.createConnection(configuration)) {
    // created from the same configuration, yet they are different
    Assert.assertNotSame(c1, c2);
    Assert.assertSame(c1.getConfiguration(), c2.getConfiguration());
  }
}
/*
Disabled test: with MasterRegistry, connections cannot outlast the masters' lifetime.
@Test
public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
Configuration config = new Configuration(TEST_UTIL.getConfiguration());
final TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.createTable(tableName, new byte[][] { FAM_NAM }).close();
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(tableName);
// this will cache the meta location and table's region location
table.get(new Get(Bytes.toBytes("foo")));
// restart HBase
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.restartHBaseCluster(2);
// this should be able to discover new locations for meta and table's region
table.get(new Get(Bytes.toBytes("foo")));
TEST_UTIL.deleteTable(tableName);
table.close();
connection.close();
}
*/
/**
 * Creates a table with a region replication of 3 and checks that
 * {@code RegionLocator#getAllRegionLocations()} returns exactly three locations whose replica
 * ids are 0, 1 and 2, each appearing once. The table is deleted afterwards.
 */
@Test
public void testLocateRegionsWithRegionReplicas() throws IOException {
  final int replicaCount = 3;
  final byte[] family = Bytes.toBytes("cf");
  final TableName tableName = TableName.valueOf(name.getMethodName());
  // Create a table with region replicas
  TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
  descriptorBuilder = descriptorBuilder.setRegionReplication(replicaCount);
  descriptorBuilder = descriptorBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
  TEST_UTIL.getAdmin().createTable(descriptorBuilder.build());
  try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    RegionLocator locator = conn.getRegionLocator(tableName)) {
    // Get locations of the regions of the table
    final List<HRegionLocation> found = locator.getAllRegionLocations();
    // The size of the returned locations should be 3
    assertEquals(replicaCount, found.size());
    // The replicaIds of the returned locations should be 0, 1 and 2; every id is removed from
    // the pending set when it is seen, so a duplicate or unknown id fails the assertion.
    final Set<Integer> pendingReplicaIds =
      IntStream.range(0, replicaCount).boxed().collect(Collectors.toSet());
    found.stream().map(location -> location.getRegion().getReplicaId())
      .forEach(replicaId -> assertTrue(pendingReplicaIds.remove(replicaId)));
  } finally {
    TEST_UTIL.deleteTable(tableName);
  }
}
/**
 * Passes only if a {@link DoNotRetryIOException} is thrown. The test creates a table with the
 * {@link MultiRowMutationEndpoint} coprocessor, performs one get through a connection, closes
 * that connection, and then issues a coprocessor service call through the closed connection; the
 * callback itself throws if it is ever invoked.
 */
@Test(expected = DoNotRetryIOException.class)
public void testClosedConnection() throws ServiceException, Throwable {
  final byte[] family = Bytes.toBytes("cf");
  final TableName tableName = TableName.valueOf(name.getMethodName());
  TableDescriptorBuilder descriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);
  descriptorBuilder = descriptorBuilder.setCoprocessor(MultiRowMutationEndpoint.class.getName());
  descriptorBuilder = descriptorBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
  TEST_UTIL.getAdmin().createTable(descriptorBuilder.build());
  final Connection closedConn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
  try {
    // cache the location
    try (Table warmUp = closedConn.getTable(tableName)) {
      final Get probe = new Get(Bytes.toBytes(0));
      warmUp.get(probe);
    }
  } finally {
    closedConn.close();
  }
  final Batch.Call<MultiRowMutationService, MutateRowsResponse> neverInvoked = service -> {
    throw new RuntimeException("Should not arrive here");
  };
  // From here on the connection is closed; the call below is the one under test.
  final Table staleTable = closedConn.getTable(tableName);
  staleTable.coprocessorService(MultiRowMutationService.class, HConstants.EMPTY_START_ROW,
    HConstants.EMPTY_END_ROW, neverInvoked);
}
/**
 * There is no assertion, but you need to confirm that there is no resource leak output from
 * netty. The test performs one get, cancels the RPC connections to the server returned by
 * {@code getRSForFirstRegionInTable}, and requests a garbage collection between two one-second
 * sleeps.
 */
@Test
public void testCancelConnectionMemoryLeak() throws IOException, InterruptedException {
  final long pauseMillis = 1000;
  final TableName tableName = TableName.valueOf(name.getMethodName());
  TEST_UTIL.createTable(tableName, FAM_NAM).close();
  TEST_UTIL.getAdmin().balancerSwitch(false, true);
  try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    Table table = connection.getTable(tableName)) {
    final Get probe = new Get(Bytes.toBytes("1"));
    table.get(probe);
    final ServerName hostingServer =
      TEST_UTIL.getRSForFirstRegionInTable(tableName).getServerName();
    final AsyncConnectionImpl asyncConnection =
      (AsyncConnectionImpl) connection.toAsyncConnection();
    asyncConnection.rpcClient.cancelConnections(hostingServer);
    Thread.sleep(pauseMillis);
    System.gc();
    Thread.sleep(pauseMillis);
  }
}
}