blob: 7fc95523455762bacd9937d388d12d8cd94fbaff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Exercises HBase client operations against a mini cluster whose client-side ZooKeeper settings
 * ({@link HConstants#CLIENT_ZOOKEEPER_QUORUM} and {@link HConstants#CLIENT_ZOOKEEPER_CLIENT_PORT})
 * point at a separate {@link MiniZooKeeperCluster} that this class starts itself.
 */
@Category({ ClientTests.class, MediumTests.class })
public class TestSeparateClientZKCluster {
private static final Logger LOG = LoggerFactory.getLogger(TestSeparateClientZKCluster.class);
// Testing utility shared by all tests; its configuration is edited in beforeAllTests() before
// the mini cluster is started.
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
// Directory passed to the client ZK cluster on every startup() call; deleted in afterAllTests().
private static final File clientZkDir =
new File(TEST_UTIL.getDataTestDir("TestSeparateClientZKCluster").toString());
// Value written to HConstants.ZK_SESSION_TIMEOUT (presumably milliseconds -- confirm).
private static final int ZK_SESSION_TIMEOUT = 5000;
// The separate ZooKeeper cluster that the client configuration is pointed at.
private static MiniZooKeeperCluster clientZkCluster;
// Column family, qualifier, row key and cell values used by the test methods.
private final byte[] family = Bytes.toBytes("cf");
private final byte[] qualifier = Bytes.toBytes("c1");
private final byte[] row = Bytes.toBytes("row");
private final byte[] value = Bytes.toBytes("v1");
private final byte[] newVal = Bytes.toBytes("v2");
// Source of the table name each test obtains through getTableName().
@Rule
public TableNameTestRule name = new TableNameTestRule();
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSeparateClientZKCluster.class);
/**
 * Starts the separate client ZooKeeper cluster, points the client-side ZooKeeper settings at it
 * and then starts an HBase mini cluster with 2 masters, 3 region servers and 3 data nodes.
 */
@BeforeClass
public static void beforeAllTests() throws Exception {
  int preferredClientZkPort = 21828;
  clientZkCluster = new MiniZooKeeperCluster(TEST_UTIL.getConfiguration());
  clientZkCluster.setDefaultClientPort(preferredClientZkPort);
  // startup() returns the client port the server actually bound. Use that value rather than the
  // preferred one so the client configuration stays correct if the preferred port was taken.
  int clientZkPort = clientZkCluster.startup(clientZkDir);
  LOG.info("Client ZK cluster started on port {} (preferred {})", clientZkPort,
    preferredClientZkPort);
  // reduce the retry number and start log counter
  TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
  TEST_UTIL.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
  TEST_UTIL.getConfiguration().setInt("zookeeper.recovery.retry", 1);
  // core settings for testing client ZK cluster
  TEST_UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
    ZKConnectionRegistry.class, ConnectionRegistry.class);
  TEST_UTIL.getConfiguration().set(HConstants.CLIENT_ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
  TEST_UTIL.getConfiguration().setInt(HConstants.CLIENT_ZOOKEEPER_CLIENT_PORT, clientZkPort);
  // reduce the zk session timeout to make session expiration easier to trigger
  TEST_UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, ZK_SESSION_TIMEOUT);
  // Start a cluster with 2 masters and 3 regionservers.
  StartMiniClusterOption option =
    StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
  TEST_UTIL.startMiniCluster(option);
}
/**
 * Shuts down the HBase mini cluster and the client ZooKeeper cluster, then deletes the client
 * ZooKeeper data directory. Each step runs even if an earlier one throws.
 */
@AfterClass
public static void afterAllTests() throws Exception {
  try {
    TEST_UTIL.shutdownMiniCluster();
  } finally {
    try {
      // clientZkCluster is still null if beforeAllTests() failed before assigning it.
      if (clientZkCluster != null) {
        clientZkCluster.shutdown();
      }
    } finally {
      FileUtils.deleteDirectory(clientZkDir);
    }
  }
}
/**
 * Creates a table with a single column family, writes one cell and reads it back through the
 * testing utility's connection.
 */
@Test
public void testBasicOperation() throws Exception {
  TableName tn = name.getTableName();
  Connection conn = TEST_UTIL.getConnection();
  try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
    // create table
    ColumnFamilyDescriptorBuilder cfDescBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(family);
    TableDescriptorBuilder tableDescBuilder =
      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
    admin.createTable(tableDescBuilder.build());
    // test simple put and get
    Put put = new Put(row);
    put.addColumn(family, qualifier, value);
    table.put(put);
    Result result = table.get(new Get(row));
    byte[] actual = result.getValue(family, qualifier);
    LOG.debug("Result: {}", Bytes.toString(actual));
    assertArrayEquals(value, actual);
  }
}
/**
 * Stops the active master, waits until another master reports itself initialized, and verifies
 * that an {@link Admin} obtained before the switch can still issue a request.
 */
@Test
public void testMasterSwitch() throws Exception {
  // get an admin instance and issue some request first
  Connection conn = TEST_UTIL.getConnection();
  try (Admin admin = conn.getAdmin()) {
    LOG.debug("Tables: {}", admin.listTableDescriptors());
    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    // switch active master
    HMaster stoppedMaster = cluster.getMaster();
    stoppedMaster.stopMaster();
    LOG.info("Stopped master {}", stoppedMaster.getServerName());
    while (!stoppedMaster.isShutDown()) {
      Thread.sleep(200);
    }
    LOG.info("Shutdown master {}", stoppedMaster.getServerName());
    // Read the master once per poll so the null check and the dereference see the same value.
    HMaster currentMaster = cluster.getMaster();
    while (currentMaster == null || !currentMaster.isInitialized()) {
      LOG.info("Get master {}", currentMaster == null ? "null" : currentMaster.getServerName());
      Thread.sleep(200);
      currentMaster = cluster.getMaster();
    }
    LOG.info("Got master {}", currentMaster.getServerName());
    // confirm client access still works
    assertTrue(admin.balance(false));
  }
}
/**
 * Moves the meta region and then the user region to other region servers after the client has
 * already issued requests, and verifies that a subsequent put/get still succeeds.
 */
@Test
public void testMetaRegionMove() throws Exception {
  TableName tn = name.getTableName();
  Connection conn = TEST_UTIL.getConnection();
  try (Admin admin = conn.getAdmin();
    Table table = conn.getTable(tn);
    RegionLocator locator = conn.getRegionLocator(tn)) {
    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    // create table
    ColumnFamilyDescriptorBuilder cfDescBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(family);
    TableDescriptorBuilder tableDescBuilder =
      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
    admin.createTable(tableDescBuilder.build());
    // issue some requests to cache the region location
    Put put = new Put(row);
    put.addColumn(family, qualifier, value);
    table.put(put);
    Get get = new Get(row);
    table.get(get);
    // move meta region and confirm client could detect
    ServerName metaDest = firstLiveServerOtherThan(cluster, cluster.getServerHoldingMeta(), null);
    admin.move(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), metaDest);
    LOG.debug("Finished moving meta");
    // move the user region so the location cached by the client becomes stale
    RegionInfo region = locator.getRegionLocation(row).getRegion();
    ServerName currentServer = cluster.getServerHoldingRegion(tn, region.getRegionName());
    // Falls back to the meta destination when no other live server is found.
    ServerName regionDest = firstLiveServerOtherThan(cluster, currentServer, metaDest);
    admin.move(region.getEncodedNameAsBytes(), regionDest);
    LOG.debug("Finished moving user region");
    put = new Put(row);
    put.addColumn(family, qualifier, newVal);
    table.put(put);
    Result result = table.get(get);
    byte[] actual = result.getValue(family, qualifier);
    LOG.debug("Result: {}", Bytes.toString(actual));
    assertArrayEquals(newVal, actual);
  }
}

/**
 * Returns the name of the first live region server that is not {@code excluded}.
 * @param cluster the mini cluster whose live region server threads are scanned
 * @param excluded server name to skip; may be null, in which case the first live server matches
 * @param fallback value returned when every live region server equals {@code excluded}
 * @return the first matching server name, or {@code fallback} if there is none
 */
private static ServerName firstLiveServerOtherThan(MiniHBaseCluster cluster, ServerName excluded,
  ServerName fallback) {
  for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
    ServerName candidate = rst.getRegionServer().getServerName();
    if (!candidate.equals(excluded)) {
      return candidate;
    }
  }
  return fallback;
}
/**
 * Shuts the client ZooKeeper cluster down, stops the region server hosting meta, waits for meta
 * to be assigned again, restarts the client ZooKeeper cluster and verifies that a new read
 * request succeeds.
 */
@Test
public void testMetaMoveDuringClientZkClusterRestart() throws Exception {
  TableName tn = name.getTableName();
  Connection conn = TEST_UTIL.getConnection();
  try (Admin admin = conn.getAdmin(); Table table = conn.getTable(tn)) {
    // create table
    ColumnFamilyDescriptorBuilder cfDescBuilder =
      ColumnFamilyDescriptorBuilder.newBuilder(family);
    TableDescriptorBuilder tableDescBuilder =
      TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
    admin.createTable(tableDescBuilder.build());
    // put some data
    Put put = new Put(row);
    put.addColumn(family, qualifier, value);
    table.put(put);
    // invalidate the connection's region location cache
    conn.clearRegionLocationCache();
    // stop client zk cluster
    clientZkCluster.shutdown();
    try {
      // stop current meta server and confirm the server shutdown process
      // is not affected by client ZK crash
      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
      int metaServerId = cluster.getServerWithMeta();
      HRegionServer metaServer = cluster.getRegionServer(metaServerId);
      metaServer.stop("Stop current RS holding meta region");
      while (!metaServer.isShutDown()) {
        Thread.sleep(200);
      }
      // wait for meta region online
      AssignmentTestingUtil.waitForAssignment(cluster.getMaster().getAssignmentManager(),
        RegionInfoBuilder.FIRST_META_REGIONINFO);
      // wait a long time to make sure we keep retrying the sync to client ZK until the data is set
      Thread.sleep(10000);
    } finally {
      // The client ZK cluster is shared by every test in this class, so bring it back even when
      // the steps above fail; otherwise one failure here cascades into the remaining tests.
      clientZkCluster.startup(clientZkDir);
    }
    // new request should pass
    Result result = table.get(new Get(row));
    byte[] actual = result.getValue(family, qualifier);
    LOG.debug("Result: {}", Bytes.toString(actual));
    assertArrayEquals(value, actual);
  }
}
/**
 * Creates a table, writes one cell and reads it back using an {@link AsyncConnection} built from
 * the test configuration.
 */
@Test
public void testAsyncTable() throws Exception {
  TableName tn = name.getTableName();
  ColumnFamilyDescriptorBuilder cfDescBuilder = ColumnFamilyDescriptorBuilder.newBuilder(family);
  TableDescriptorBuilder tableDescBuilder =
    TableDescriptorBuilder.newBuilder(tn).setColumnFamily(cfDescBuilder.build());
  try (AsyncConnection asyncConn =
    ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
    asyncConn.getAdmin().createTable(tableDescBuilder.build()).get();
    AsyncTable<?> table = asyncConn.getTable(tn);
    // put some data
    Put put = new Put(row);
    put.addColumn(family, qualifier, value);
    table.put(put).get();
    // get and verify
    Result result = table.get(new Get(row)).get();
    byte[] actual = result.getValue(family, qualifier);
    LOG.debug("Result: {}", Bytes.toString(actual));
    assertArrayEquals(value, actual);
  }
}
/**
 * Changes the replica count of the meta table to 3, then 2, then 1, and after each change waits
 * (up to 30000 per step) until the meta region locator reports that many locations.
 */
@Test
public void testChangeMetaReplicaCount() throws Exception {
  Admin admin = TEST_UTIL.getAdmin();
  try (RegionLocator metaLocator =
    TEST_UTIL.getConnection().getRegionLocator(TableName.META_TABLE_NAME)) {
    // starting point: a single meta location
    assertEquals(1, metaLocator.getAllRegionLocations().size());
    for (int replicaCount : new int[] { 3, 2, 1 }) {
      HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, replicaCount);
      TEST_UTIL.waitFor(30000, () -> metaLocator.getAllRegionLocations().size() == replicaCount);
    }
  }
}
}