blob: b408229f59fad07479f67b0fee00307565fdc8f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.*;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@Category(LargeTests.class)
public class TestRegionServerReportForDuty {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class);
private static final long SLEEP_INTERVAL = 500;
private HBaseTestingUtil testUtil;
private LocalHBaseCluster cluster;
private RegionServerThread rs;
private RegionServerThread rs2;
private MasterThread master;
private MasterThread backupMaster;
@Before
public void setUp() throws Exception {
testUtil = new HBaseTestingUtil();
testUtil.startMiniDFSCluster(1);
testUtil.startMiniZKCluster(1);
testUtil.createRootDir();
cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0);
}
@After
public void tearDown() throws Exception {
cluster.shutdown();
cluster.join();
testUtil.shutdownMiniZKCluster();
testUtil.shutdownMiniDFSCluster();
}
private static class LogCapturer {
private StringWriter sw = new StringWriter();
private org.apache.logging.log4j.core.appender.WriterAppender appender;
private org.apache.logging.log4j.core.Logger logger;
LogCapturer(org.apache.logging.log4j.core.Logger logger) {
this.logger = logger;
this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder()
.setName("test").setTarget(sw).build();
this.logger.addAppender(this.appender);
}
String getOutput() {
return sw.toString();
}
public void stopCapturing() {
this.logger.removeAppender(this.appender);
}
}
/**
* This test HMaster class will always throw ServerNotRunningYetException if checked.
*/
public static class NeverInitializedMaster extends HMaster {
public NeverInitializedMaster(Configuration conf) throws IOException {
super(conf);
}
@Override
protected void checkServiceStarted() throws ServerNotRunningYetException {
throw new ServerNotRunningYetException("Server is not running yet");
}
}
/**
* Tests region server should backoff to report for duty if master is not ready.
*/
@Test
public void testReportForDutyBackoff() throws IOException, InterruptedException {
cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName());
master = cluster.addMaster();
master.start();
LogCapturer capturer =
new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
.getLogger(HRegionServer.class));
// Set sleep interval relatively low so that exponential backoff is more demanding.
int msginterval = 100;
cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval);
rs = cluster.addRegionServer();
rs.start();
int interval = 10_000;
Thread.sleep(interval);
capturer.stopCapturing();
String output = capturer.getOutput();
LOG.info("{}", output);
String failMsg = "reportForDuty failed;";
int count = StringUtils.countMatches(output, failMsg);
// Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2).
// Ideally we can assert the exact retry count. We relax here to tolerate contention error.
int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval));
assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count,
expectedRetry / 2), count > expectedRetry / 2);
assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count,
expectedRetry * 2), count < expectedRetry * 2);
}
/**
* Tests region sever reportForDuty with backup master becomes primary master after the first
* master goes away.
*/
@Test
public void testReportForDutyWithMasterChange() throws Exception {
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
master = cluster.addMaster();
rs = cluster.addRegionServer();
LOG.debug("Starting master: " + master.getMaster().getServerName());
master.start();
rs.start();
waitForClusterOnline(master);
// Add a 2nd region server
cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
rs2 = cluster.addRegionServer();
// Start the region server. This region server will refresh RPC connection
// from the current active master to the next active master before completing
// reportForDuty
LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName());
rs2.start();
waitForSecondRsStarted();
// Stop the current master.
master.getMaster().stop("Stopping master");
// Start a new master and use another random unique port
// Also let it wait for exactly 2 region severs to report in.
// TODO: Add handling bindexception. Random port is not enough!!! Flakie test!
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2);
backupMaster = cluster.addMaster();
LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName());
backupMaster.start();
waitForClusterOnline(backupMaster);
// Do some checking/asserts here.
assertTrue(backupMaster.getMaster().isActiveMaster());
assertTrue(backupMaster.getMaster().isInitialized());
assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2);
}
/**
* Tests region sever reportForDuty with RS RPC retry
*/
@Test
public void testReportForDutyWithRSRpcRetry() throws Exception {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
// Override the default RS RPC retry interval of 100ms to 300ms
cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
master = cluster.addMaster();
rs = cluster.addRegionServer();
LOG.debug("Starting master: " + master.getMaster().getServerName());
master.start();
// Delay the RS start so that the meta assignment fails in first attempt and goes to retry block
scheduledThreadPoolExecutor.schedule(new Runnable() {
@Override
public void run() {
rs.start();
}
}, 1000, TimeUnit.MILLISECONDS);
waitForClusterOnline(master);
}
/**
* Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
* configured to reject decommissioned hosts and when there is a match for the joining
* RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
*/
@Test
public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
throws Exception {
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
// Set the cluster to reject decommissioned hosts
cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);
master = cluster.addMaster();
rs = cluster.addRegionServer();
master.start();
rs.start();
waitForClusterOnline(master);
// Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty
LogCapturer capturer =
new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
.getLogger(HRegionServer.class));
rs2 = cluster.addRegionServer();
master.getMaster().decommissionRegionServers(
Collections.singletonList(rs2.getRegionServer().getServerName()), false);
rs2.start();
// Assert that the second regionserver has aborted
testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));
// Assert that the log messages for DecommissionedHostRejectedException exist in the logs
capturer.stopCapturing();
assertThat(capturer.getOutput(),
containsString("Master rejected startup because the host is considered decommissioned"));
/**
* Assert that the following log message occurred (one line):
* "org.apache.hadoop.hbase.master.DecommissionedHostRejectedException:
* org.apache.hadoop.hbase.master.DecommissionedHostRejectedException: Host localhost exists in
* the list of decommissioned servers and Master is configured to reject decommissioned hosts"
*/
assertThat(Arrays.asList(capturer.getOutput().split("\n")),
hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+ " exists in the list of decommissioned servers and Master is configured to reject"
+ " decommissioned hosts"))));
assertThat(Arrays.asList(capturer.getOutput().split("\n")),
hasItem(
allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
containsString("Unhandled"),
containsString(DecommissionedHostRejectedException.class.getSimpleName()),
containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+ " exists in the list of decommissioned servers and Master is configured to reject"
+ " decommissioned hosts"))));
}
/**
* Tests region sever reportForDuty with a non-default environment edge
*/
@Test
public void testReportForDutyWithEnvironmentEdge() throws Exception {
// Start a master and wait for it to become the active/primary master.
// Use a random unique port
cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
// Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately
cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0);
cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
// Inject non-default environment edge
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(edge);
master = cluster.addMaster();
rs = cluster.addRegionServer();
LOG.debug("Starting master: " + master.getMaster().getServerName());
master.start();
rs.start();
waitForClusterOnline(master);
}
private void waitForClusterOnline(MasterThread master) throws InterruptedException {
while (true) {
if (master.getMaster().isInitialized()) {
break;
}
Thread.sleep(SLEEP_INTERVAL);
LOG.debug("Waiting for master to come online ...");
}
rs.waitForServerOnline();
}
private void waitForSecondRsStarted() throws InterruptedException {
while (true) {
if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) {
break;
}
Thread.sleep(SLEEP_INTERVAL);
LOG.debug("Waiting 2nd RS to be started ...");
}
}
// Create a Region Server that provide a hook so that we can wait for the master switch over
// before continuing reportForDuty to the mater.
// The idea is that we get a RPC connection to the first active master, then we wait.
// The first master goes down, the second master becomes the active master. The region
// server continues reportForDuty. It should succeed with the new master.
public static class MyRegionServer extends MiniHBaseClusterRegionServer {
private ServerName sn;
// This flag is to make sure this rs has obtained the rpcStub to the first master.
// The first master will go down after this.
private boolean rpcStubCreatedFlag = false;
private boolean masterChanged = false;
public MyRegionServer(Configuration conf)
throws IOException, KeeperException, InterruptedException {
super(conf);
}
@Override
protected synchronized ServerName createRegionServerStatusStub(boolean refresh) {
sn = super.createRegionServerStatusStub(refresh);
rpcStubCreatedFlag = true;
// Wait for master switch over. Only do this for the second region server.
while (!masterChanged) {
ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true);
if (newSn != null && !newSn.equals(sn)) {
masterChanged = true;
break;
}
try {
Thread.sleep(SLEEP_INTERVAL);
} catch (InterruptedException e) {
return null;
}
LOG.debug("Waiting for master switch over ... ");
}
return sn;
}
public boolean getRpcStubCreatedFlag() {
return rpcStubCreatedFlag;
}
}
}