| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import static org.hamcrest.CoreMatchers.containsString; |
| import static org.hamcrest.Matchers.allOf; |
| import static org.hamcrest.Matchers.hasItem; |
| import static org.hamcrest.Matchers.is; |
| import static org.junit.Assert.*; |
| |
| import java.io.IOException; |
| import java.io.StringWriter; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseTestingUtil; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.LocalHBaseCluster; |
| import org.apache.hadoop.hbase.MatcherPredicate; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; |
| import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; |
| import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException; |
| import org.apache.hadoop.hbase.master.HMaster; |
| import org.apache.hadoop.hbase.master.ServerManager; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.zookeeper.KeeperException; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| @Category(LargeTests.class) |
| public class TestRegionServerReportForDuty { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestRegionServerReportForDuty.class); |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerReportForDuty.class); |
| |
| private static final long SLEEP_INTERVAL = 500; |
| |
| private HBaseTestingUtil testUtil; |
| private LocalHBaseCluster cluster; |
| private RegionServerThread rs; |
| private RegionServerThread rs2; |
| private MasterThread master; |
| private MasterThread backupMaster; |
| |
| @Before |
| public void setUp() throws Exception { |
| testUtil = new HBaseTestingUtil(); |
| testUtil.startMiniDFSCluster(1); |
| testUtil.startMiniZKCluster(1); |
| testUtil.createRootDir(); |
| cluster = new LocalHBaseCluster(testUtil.getConfiguration(), 0, 0); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| cluster.shutdown(); |
| cluster.join(); |
| testUtil.shutdownMiniZKCluster(); |
| testUtil.shutdownMiniDFSCluster(); |
| } |
| |
| private static class LogCapturer { |
| private StringWriter sw = new StringWriter(); |
| private org.apache.logging.log4j.core.appender.WriterAppender appender; |
| private org.apache.logging.log4j.core.Logger logger; |
| |
| LogCapturer(org.apache.logging.log4j.core.Logger logger) { |
| this.logger = logger; |
| this.appender = org.apache.logging.log4j.core.appender.WriterAppender.newBuilder() |
| .setName("test").setTarget(sw).build(); |
| this.logger.addAppender(this.appender); |
| } |
| |
| String getOutput() { |
| return sw.toString(); |
| } |
| |
| public void stopCapturing() { |
| this.logger.removeAppender(this.appender); |
| } |
| } |
| |
| /** |
| * This test HMaster class will always throw ServerNotRunningYetException if checked. |
| */ |
| public static class NeverInitializedMaster extends HMaster { |
| public NeverInitializedMaster(Configuration conf) throws IOException { |
| super(conf); |
| } |
| |
| @Override |
| protected void checkServiceStarted() throws ServerNotRunningYetException { |
| throw new ServerNotRunningYetException("Server is not running yet"); |
| } |
| } |
| |
| /** |
| * Tests region server should backoff to report for duty if master is not ready. |
| */ |
| @Test |
| public void testReportForDutyBackoff() throws IOException, InterruptedException { |
| cluster.getConfiguration().set(HConstants.MASTER_IMPL, NeverInitializedMaster.class.getName()); |
| master = cluster.addMaster(); |
| master.start(); |
| |
| LogCapturer capturer = |
| new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager |
| .getLogger(HRegionServer.class)); |
| // Set sleep interval relatively low so that exponential backoff is more demanding. |
| int msginterval = 100; |
| cluster.getConfiguration().setInt("hbase.regionserver.msginterval", msginterval); |
| rs = cluster.addRegionServer(); |
| rs.start(); |
| |
| int interval = 10_000; |
| Thread.sleep(interval); |
| capturer.stopCapturing(); |
| String output = capturer.getOutput(); |
| LOG.info("{}", output); |
| String failMsg = "reportForDuty failed;"; |
| int count = StringUtils.countMatches(output, failMsg); |
| |
| // Following asserts the actual retry number is in range (expectedRetry/2, expectedRetry*2). |
| // Ideally we can assert the exact retry count. We relax here to tolerate contention error. |
| int expectedRetry = (int) Math.ceil(Math.log(interval - msginterval)); |
| assertTrue(String.format("reportForDuty retries %d times, less than expected min %d", count, |
| expectedRetry / 2), count > expectedRetry / 2); |
| assertTrue(String.format("reportForDuty retries %d times, more than expected max %d", count, |
| expectedRetry * 2), count < expectedRetry * 2); |
| } |
| |
| /** |
| * Tests region sever reportForDuty with backup master becomes primary master after the first |
| * master goes away. |
| */ |
| @Test |
| public void testReportForDutyWithMasterChange() throws Exception { |
| |
| // Start a master and wait for it to become the active/primary master. |
| // Use a random unique port |
| cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); |
| master = cluster.addMaster(); |
| rs = cluster.addRegionServer(); |
| LOG.debug("Starting master: " + master.getMaster().getServerName()); |
| master.start(); |
| rs.start(); |
| |
| waitForClusterOnline(master); |
| |
| // Add a 2nd region server |
| cluster.getConfiguration().set(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); |
| rs2 = cluster.addRegionServer(); |
| // Start the region server. This region server will refresh RPC connection |
| // from the current active master to the next active master before completing |
| // reportForDuty |
| LOG.debug("Starting 2nd region server: " + rs2.getRegionServer().getServerName()); |
| rs2.start(); |
| |
| waitForSecondRsStarted(); |
| |
| // Stop the current master. |
| master.getMaster().stop("Stopping master"); |
| |
| // Start a new master and use another random unique port |
| // Also let it wait for exactly 2 region severs to report in. |
| // TODO: Add handling bindexception. Random port is not enough!!! Flakie test! |
| cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 2); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 2); |
| backupMaster = cluster.addMaster(); |
| LOG.debug("Starting new master: " + backupMaster.getMaster().getServerName()); |
| backupMaster.start(); |
| |
| waitForClusterOnline(backupMaster); |
| |
| // Do some checking/asserts here. |
| assertTrue(backupMaster.getMaster().isActiveMaster()); |
| assertTrue(backupMaster.getMaster().isInitialized()); |
| assertEquals(backupMaster.getMaster().getServerManager().getOnlineServersList().size(), 2); |
| |
| } |
| |
| /** |
| * Tests region sever reportForDuty with RS RPC retry |
| */ |
| @Test |
| public void testReportForDutyWithRSRpcRetry() throws Exception { |
| ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, |
| new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true) |
| .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); |
| |
| // Start a master and wait for it to become the active/primary master. |
| // Use a random unique port |
| cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); |
| // Override the default RS RPC retry interval of 100ms to 300ms |
| cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 300); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); |
| master = cluster.addMaster(); |
| rs = cluster.addRegionServer(); |
| LOG.debug("Starting master: " + master.getMaster().getServerName()); |
| master.start(); |
| // Delay the RS start so that the meta assignment fails in first attempt and goes to retry block |
| scheduledThreadPoolExecutor.schedule(new Runnable() { |
| @Override |
| public void run() { |
| rs.start(); |
| } |
| }, 1000, TimeUnit.MILLISECONDS); |
| |
| waitForClusterOnline(master); |
| } |
| |
| /** |
| * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is |
| * configured to reject decommissioned hosts and when there is a match for the joining |
| * RegionServer in the list of decommissioned servers. Test case for HBASE-28342. |
| */ |
| @Test |
| public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts() |
| throws Exception { |
| // Start a master and wait for it to become the active/primary master. |
| // Use a random unique port |
| cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); |
| |
| // Set the cluster to reject decommissioned hosts |
| cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true); |
| |
| master = cluster.addMaster(); |
| rs = cluster.addRegionServer(); |
| master.start(); |
| rs.start(); |
| waitForClusterOnline(master); |
| |
| // Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty |
| LogCapturer capturer = |
| new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager |
| .getLogger(HRegionServer.class)); |
| |
| rs2 = cluster.addRegionServer(); |
| master.getMaster().decommissionRegionServers( |
| Collections.singletonList(rs2.getRegionServer().getServerName()), false); |
| rs2.start(); |
| |
| // Assert that the second regionserver has aborted |
| testUtil.waitFor(TimeUnit.SECONDS.toMillis(90), |
| new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true))); |
| |
| // Assert that the log messages for DecommissionedHostRejectedException exist in the logs |
| capturer.stopCapturing(); |
| |
| assertThat(capturer.getOutput(), |
| containsString("Master rejected startup because the host is considered decommissioned")); |
| |
| /** |
| * Assert that the following log message occurred (one line): |
| * "org.apache.hadoop.hbase.master.DecommissionedHostRejectedException: |
| * org.apache.hadoop.hbase.master.DecommissionedHostRejectedException: Host localhost exists in |
| * the list of decommissioned servers and Master is configured to reject decommissioned hosts" |
| */ |
| assertThat(Arrays.asList(capturer.getOutput().split("\n")), |
| hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()), |
| containsString(DecommissionedHostRejectedException.class.getSimpleName()), |
| containsString("Host " + rs2.getRegionServer().getServerName().getHostname() |
| + " exists in the list of decommissioned servers and Master is configured to reject" |
| + " decommissioned hosts")))); |
| |
| assertThat(Arrays.asList(capturer.getOutput().split("\n")), |
| hasItem( |
| allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()), |
| containsString("Unhandled"), |
| containsString(DecommissionedHostRejectedException.class.getSimpleName()), |
| containsString("Host " + rs2.getRegionServer().getServerName().getHostname() |
| + " exists in the list of decommissioned servers and Master is configured to reject" |
| + " decommissioned hosts")))); |
| } |
| |
| /** |
| * Tests region sever reportForDuty with a non-default environment edge |
| */ |
| @Test |
| public void testReportForDutyWithEnvironmentEdge() throws Exception { |
| // Start a master and wait for it to become the active/primary master. |
| // Use a random unique port |
| cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort()); |
| // Set the dispatch and retry delay to 0 since we want the rpc request to be sent immediately |
| cluster.getConfiguration().setInt("hbase.procedure.remote.dispatcher.delay.msec", 0); |
| cluster.getConfiguration().setLong("hbase.regionserver.rpc.retry.interval", 0); |
| |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); |
| cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); |
| |
| // Inject non-default environment edge |
| IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); |
| EnvironmentEdgeManager.injectEdge(edge); |
| master = cluster.addMaster(); |
| rs = cluster.addRegionServer(); |
| LOG.debug("Starting master: " + master.getMaster().getServerName()); |
| master.start(); |
| rs.start(); |
| waitForClusterOnline(master); |
| } |
| |
| private void waitForClusterOnline(MasterThread master) throws InterruptedException { |
| while (true) { |
| if (master.getMaster().isInitialized()) { |
| break; |
| } |
| Thread.sleep(SLEEP_INTERVAL); |
| LOG.debug("Waiting for master to come online ..."); |
| } |
| rs.waitForServerOnline(); |
| } |
| |
| private void waitForSecondRsStarted() throws InterruptedException { |
| while (true) { |
| if (((MyRegionServer) rs2.getRegionServer()).getRpcStubCreatedFlag() == true) { |
| break; |
| } |
| Thread.sleep(SLEEP_INTERVAL); |
| LOG.debug("Waiting 2nd RS to be started ..."); |
| } |
| } |
| |
| // Create a Region Server that provide a hook so that we can wait for the master switch over |
| // before continuing reportForDuty to the mater. |
| // The idea is that we get a RPC connection to the first active master, then we wait. |
| // The first master goes down, the second master becomes the active master. The region |
| // server continues reportForDuty. It should succeed with the new master. |
| public static class MyRegionServer extends MiniHBaseClusterRegionServer { |
| |
| private ServerName sn; |
| // This flag is to make sure this rs has obtained the rpcStub to the first master. |
| // The first master will go down after this. |
| private boolean rpcStubCreatedFlag = false; |
| private boolean masterChanged = false; |
| |
| public MyRegionServer(Configuration conf) |
| throws IOException, KeeperException, InterruptedException { |
| super(conf); |
| } |
| |
| @Override |
| protected synchronized ServerName createRegionServerStatusStub(boolean refresh) { |
| sn = super.createRegionServerStatusStub(refresh); |
| rpcStubCreatedFlag = true; |
| |
| // Wait for master switch over. Only do this for the second region server. |
| while (!masterChanged) { |
| ServerName newSn = super.getMasterAddressTracker().getMasterAddress(true); |
| if (newSn != null && !newSn.equals(sn)) { |
| masterChanged = true; |
| break; |
| } |
| try { |
| Thread.sleep(SLEEP_INTERVAL); |
| } catch (InterruptedException e) { |
| return null; |
| } |
| LOG.debug("Waiting for master switch over ... "); |
| } |
| return sn; |
| } |
| |
| public boolean getRpcStubCreatedFlag() { |
| return rpcStubCreatedFlag; |
| } |
| } |
| } |