HBASE-28342 Decommissioned hosts should be rejected by the HMaster (#5681)

Signed-off by: Nick Dimiduk <ndimiduk@apache.org>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 5b53d2b..9597ec2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1622,6 +1622,20 @@
    */
   public final static boolean HBASE_SERVER_USEIP_ENABLED_DEFAULT = false;
 
+  /**
+   * Should the HMaster reject hosts of decommissioned RegionServers, bypass matching their port and
+   * startcode parts of their ServerName or not? When True, the HMaster will reject a RegionServer's
+   * request to `reportForDuty` if it's hostname exists in the list of decommissioned RegionServers
+   * it maintains internally. Added in HBASE-28342.
+   */
+  public final static String REJECT_DECOMMISSIONED_HOSTS_KEY =
+    "hbase.master.reject.decommissioned.hosts";
+
+  /**
+   * Default value of {@link #REJECT_DECOMMISSIONED_HOSTS_KEY}
+   */
+  public final static boolean REJECT_DECOMMISSIONED_HOSTS_DEFAULT = false;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DecommissionedHostRejectedException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DecommissionedHostRejectedException.java
new file mode 100644
index 0000000..3d28b1e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/DecommissionedHostRejectedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class DecommissionedHostRejectedException extends HBaseIOException {
+  public DecommissionedHostRejectedException(String message) {
+    super(message);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 88b82f0..ddef3e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -546,7 +546,6 @@
         HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
 
       // Do we publish the status?
-
       boolean shouldPublish =
         conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT);
       Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
@@ -997,7 +996,10 @@
     masterRegion = MasterRegionFactory.create(this);
     rsListStorage = new MasterRegionServerList(masterRegion, this);
 
+    // Initialize the ServerManager and register it as a configuration observer
     this.serverManager = createServerManager(this, rsListStorage);
+    this.configurationManager.registerObserver(this.serverManager);
+
     this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
     if (
       !conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index 2afd48c..a2ed4da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -51,6 +52,7 @@
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
@@ -100,7 +102,7 @@
  * only after the handler is fully enabled and has completed the handling.
  */
 @InterfaceAudience.Private
-public class ServerManager {
+public class ServerManager implements ConfigurationObserver {
   public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
     "hbase.master.wait.on.regionservers.maxtostart";
 
@@ -172,6 +174,9 @@
   /** Listeners that are called on server events. */
   private List<ServerListener> listeners = new CopyOnWriteArrayList<>();
 
+  /** Configured value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY */
+  private volatile boolean rejectDecommissionedHostsConfig;
+
   /**
    * Constructor.
    */
@@ -183,6 +188,35 @@
     warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
     persistFlushedSequenceId =
       c.getBoolean(PERSIST_FLUSHEDSEQUENCEID, PERSIST_FLUSHEDSEQUENCEID_DEFAULT);
+    rejectDecommissionedHostsConfig = getRejectDecommissionedHostsConfig(c);
+  }
+
+  /**
+   * Implementation of the ConfigurationObserver interface. We are interested in live-loading the
+   * configuration value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY
+   * @param conf Server configuration instance
+   */
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    final boolean newValue = getRejectDecommissionedHostsConfig(conf);
+    if (rejectDecommissionedHostsConfig == newValue) {
+      // no-op
+      return;
+    }
+
+    LOG.info("Config Reload for RejectDecommissionedHosts. previous value: {}, new value: {}",
+      rejectDecommissionedHostsConfig, newValue);
+
+    rejectDecommissionedHostsConfig = newValue;
+  }
+
+  /**
+   * Reads the value of HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY from the config and returns it
+   * @param conf Configuration instance of the Master
+   */
+  public boolean getRejectDecommissionedHostsConfig(Configuration conf) {
+    return conf.getBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY,
+      HConstants.REJECT_DECOMMISSIONED_HOSTS_DEFAULT);
   }
 
   /**
@@ -227,11 +261,14 @@
     final String hostname =
       request.hasUseThisHostnameInstead() ? request.getUseThisHostnameInstead() : isaHostName;
     ServerName sn = ServerName.valueOf(hostname, request.getPort(), request.getServerStartCode());
+
+    // Check if the host should be rejected based on it's decommissioned status
+    checkRejectableDecommissionedStatus(sn);
+
     checkClockSkew(sn, request.getServerCurrentTime());
     checkIsDead(sn, "STARTUP");
     if (!checkAndRecordNewServer(sn, ServerMetricsBuilder.of(sn, versionNumber, version))) {
-      LOG.warn(
-        "THIS SHOULD NOT HAPPEN, RegionServerStartup" + " could not record the server: " + sn);
+      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup could not record the server: {}", sn);
     }
     storage.started(sn);
     return sn;
@@ -294,6 +331,42 @@
   }
 
   /**
+   * Checks if the Master is configured to reject decommissioned hosts or not. When it's configured
+   * to do so, any RegionServer trying to join the cluster will have it's host checked against the
+   * list of hosts of currently decommissioned servers and potentially get prevented from reporting
+   * for duty; otherwise, we do nothing and we let them pass to the next check. See HBASE-28342 for
+   * details.
+   * @param sn The ServerName to check for
+   * @throws DecommissionedHostRejectedException if the Master is configured to reject
+   *                                             decommissioned hosts and this host exists in the
+   *                                             list of the decommissioned servers
+   */
+  private void checkRejectableDecommissionedStatus(ServerName sn)
+    throws DecommissionedHostRejectedException {
+    LOG.info("Checking decommissioned status of RegionServer {}", sn.getServerName());
+
+    // If the Master is not configured to reject decommissioned hosts, return early.
+    if (!rejectDecommissionedHostsConfig) {
+      return;
+    }
+
+    // Look for a match for the hostname in the list of decommissioned servers
+    for (ServerName server : getDrainingServersList()) {
+      if (Objects.equals(server.getHostname(), sn.getHostname())) {
+        // Found a match and master is configured to reject decommissioned hosts, throw exception!
+        LOG.warn(
+          "Rejecting RegionServer {} from reporting for duty because Master is configured "
+            + "to reject decommissioned hosts and this host was marked as such in the past.",
+          sn.getServerName());
+        throw new DecommissionedHostRejectedException(String.format(
+          "Host %s exists in the list of decommissioned servers and Master is configured to "
+            + "reject decommissioned hosts",
+          sn.getHostname()));
+      }
+    }
+  }
+
+  /**
    * Check is a server of same host and port already exists, if not, or the existed one got a
    * smaller start code, record it.
    * @param serverName the server to check and record
@@ -647,13 +720,8 @@
    * Remove the server from the drain list.
    */
   public synchronized boolean removeServerFromDrainList(final ServerName sn) {
-    // Warn if the server (sn) is not online. ServerName is of the form:
-    // <hostname> , <port> , <startcode>
+    LOG.info("Removing server {} from the draining list.", sn);
 
-    if (!this.isServerOnline(sn)) {
-      LOG.warn("Server " + sn + " is not currently online. "
-        + "Removing from draining list anyway, as requested.");
-    }
     // Remove the server from the draining servers lists.
     return this.drainingServers.remove(sn);
   }
@@ -663,22 +731,23 @@
    * @return True if the server is added or the server is already on the drain list.
    */
   public synchronized boolean addServerToDrainList(final ServerName sn) {
-    // Warn if the server (sn) is not online. ServerName is of the form:
-    // <hostname> , <port> , <startcode>
-
-    if (!this.isServerOnline(sn)) {
-      LOG.warn("Server " + sn + " is not currently online. "
-        + "Ignoring request to add it to draining list.");
+    // If master is not rejecting decommissioned hosts, warn if the server (sn) is not online.
+    // However, we want to add servers even if they're not online if the master is configured
+    // to reject decommissioned hosts
+    if (!rejectDecommissionedHostsConfig && !this.isServerOnline(sn)) {
+      LOG.warn("Server {} is not currently online. Ignoring request to add it to draining list.",
+        sn);
       return false;
     }
-    // Add the server to the draining servers lists, if it's not already in
-    // it.
+
+    // Add the server to the draining servers lists, if it's not already in it.
     if (this.drainingServers.contains(sn)) {
-      LOG.warn("Server " + sn + " is already in the draining server list."
-        + "Ignoring request to add it again.");
+      LOG.warn(
+        "Server {} is already in the draining server list. Ignoring request to add it again.", sn);
       return true;
     }
-    LOG.info("Server " + sn + " added to draining server list.");
+
+    LOG.info("Server {} added to draining server list.", sn);
     return this.drainingServers.add(sn);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index dfb8e2a..c71859e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -120,6 +120,7 @@
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -2664,6 +2665,11 @@
         LOG.error(HBaseMarkers.FATAL, "Master rejected startup because clock is out of sync", ioe);
         // Re-throw IOE will cause RS to abort
         throw ioe;
+      } else if (ioe instanceof DecommissionedHostRejectedException) {
+        LOG.error(HBaseMarkers.FATAL,
+          "Master rejected startup because the host is considered decommissioned", ioe);
+        // Re-throw IOE will cause RS to abort
+        throw ioe;
       } else if (ioe instanceof ServerNotRunningYetException) {
         LOG.debug("Master is not running yet");
       } else {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
index 4917d6f..b408229 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReportForDuty.java
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
@@ -30,9 +35,11 @@
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
+import org.apache.hadoop.hbase.MatcherPredicate;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
+import org.apache.hadoop.hbase.master.DecommissionedHostRejectedException;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -242,6 +249,72 @@
   }
 
   /**
+   * Tests that the RegionServer's reportForDuty gets rejected by the master when the master is
+   * configured to reject decommissioned hosts and when there is a match for the joining
+   * RegionServer in the list of decommissioned servers. Test case for HBASE-28342.
+   */
+  @Test
+  public void testReportForDutyGetsRejectedByMasterWhenConfiguredToRejectDecommissionedHosts()
+    throws Exception {
+    // Start a master and wait for it to become the active/primary master.
+    // Use a random unique port
+    cluster.getConfiguration().setInt(HConstants.MASTER_PORT, HBaseTestingUtil.randomFreePort());
+    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+    cluster.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
+
+    // Set the cluster to reject decommissioned hosts
+    cluster.getConfiguration().setBoolean(HConstants.REJECT_DECOMMISSIONED_HOSTS_KEY, true);
+
+    master = cluster.addMaster();
+    rs = cluster.addRegionServer();
+    master.start();
+    rs.start();
+    waitForClusterOnline(master);
+
+    // Add a second decommissioned region server to the cluster, wait for it to fail reportForDuty
+    LogCapturer capturer =
+      new LogCapturer((org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager
+        .getLogger(HRegionServer.class));
+
+    rs2 = cluster.addRegionServer();
+    master.getMaster().decommissionRegionServers(
+      Collections.singletonList(rs2.getRegionServer().getServerName()), false);
+    rs2.start();
+
+    // Assert that the second regionserver has aborted
+    testUtil.waitFor(TimeUnit.SECONDS.toMillis(90),
+      new MatcherPredicate<>(() -> rs2.getRegionServer().isAborted(), is(true)));
+
+    // Assert that the log messages for DecommissionedHostRejectedException exist in the logs
+    capturer.stopCapturing();
+
+    assertThat(capturer.getOutput(),
+      containsString("Master rejected startup because the host is considered decommissioned"));
+
+    /**
+     * Assert that the following log message occurred (one line):
+     * "org.apache.hadoop.hbase.master.DecommissionedHostRejectedException:
+     * org.apache.hadoop.hbase.master.DecommissionedHostRejectedException: Host localhost exists in
+     * the list of decommissioned servers and Master is configured to reject decommissioned hosts"
+     */
+    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
+      hasItem(allOf(containsString(DecommissionedHostRejectedException.class.getSimpleName()),
+        containsString(DecommissionedHostRejectedException.class.getSimpleName()),
+        containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+          + " exists in the list of decommissioned servers and Master is configured to reject"
+          + " decommissioned hosts"))));
+
+    assertThat(Arrays.asList(capturer.getOutput().split("\n")),
+      hasItem(
+        allOf(containsString("ABORTING region server " + rs2.getRegionServer().getServerName()),
+          containsString("Unhandled"),
+          containsString(DecommissionedHostRejectedException.class.getSimpleName()),
+          containsString("Host " + rs2.getRegionServer().getServerName().getHostname()
+            + " exists in the list of decommissioned servers and Master is configured to reject"
+            + " decommissioned hosts"))));
+  }
+
+  /**
    * Tests region sever reportForDuty with a non-default environment edge
    */
   @Test