HDFS-16684. Exclude the current JournalNode (#4723)

Exclude bound local addresses, including when a wildcard address is used in the bind host configuration.
* Allow sync attempts with unresolved addresses
* Update the comments.
* Remove unused import
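
A minimal, self-contained sketch of the exclusion rule this change adds (illustrative only, not part of the patch; the class name, helper name, and addresses below are made up). A resolved wildcard address on our own IPC port can only be this JournalNode, so it is dropped, while unresolved addresses are kept so that later sync attempts can retry resolution:

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.List;

    public class SelfExclusionSketch {
      // Mirrors the removeIf predicate added to JournalNodeSyncer#getJournalAddrList.
      static List<InetSocketAddress> excludeSelf(
          List<InetSocketAddress> candidates, InetSocketAddress boundIpcAddress) {
        List<InetSocketAddress> result = new ArrayList<>(candidates);
        result.removeIf(addr -> !addr.isUnresolved()
            && addr.getAddress().isAnyLocalAddress()
            && boundIpcAddress.getPort() == addr.getPort());
        return result;
      }

      public static void main(String[] args) {
        InetSocketAddress self = new InetSocketAddress("0.0.0.0", 8485);
        List<InetSocketAddress> candidates = new ArrayList<>();
        candidates.add(new InetSocketAddress("0.0.0.0", 8485));  // this node: dropped
        candidates.add(new InetSocketAddress("0.0.0.0", 8486));  // other port: kept
        candidates.add(InetSocketAddress.createUnresolved("jn2.example.com", 8485));  // kept
        System.out.println(excludeSelf(candidates, self).size());  // prints 2
      }
    }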

Signed-off-by: stack <stack@apache.org>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
index df1314a..3e8831d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
@@ -125,6 +125,11 @@
   }
 
   @VisibleForTesting
+  public JournalNodeSyncer getJournalSyncer(String jid) {
+    return journalSyncersById.get(jid);
+  }
+
+  @VisibleForTesting
   public boolean getJournalSyncerStatus(String jid) {
     if (journalSyncersById.get(jid) != null) {
       return journalSyncersById.get(jid).isJournalSyncerStarted();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index fd29c84..f451b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +52,10 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * A Journal Sync thread runs through the lifetime of the JN. It periodically
@@ -153,6 +155,9 @@
         LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
       }
     }
+    // Check that there is at least one other JournalNode before starting the sync. Although some
+    // proxies may be unresolved at this point, attempting to sync will trigger resolution once
+    // the servers become available.
     if (otherJNProxies.isEmpty()) {
       LOG.error("Cannot sync as there is no other JN available for sync.");
       return false;
@@ -310,12 +315,24 @@
     return null;
   }
 
-  private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
+  @VisibleForTesting
+  protected List<InetSocketAddress> getJournalAddrList(String uriStr) throws
       URISyntaxException,
       IOException {
     URI uri = new URI(uriStr);
-    return Util.getLoggerAddresses(uri,
-        new HashSet<>(Arrays.asList(jn.getBoundIpcAddress())), conf);
+
+    InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress();
+    Set<InetSocketAddress> excluded = Sets.newHashSet(boundIpcAddress);
+    List<InetSocketAddress> addrList = Util.getLoggerAddresses(uri, excluded, conf);
+
+    // Exclude the current JournalNode instance: a wildcard address (e.g. "0.0.0.0") listening on
+    // the same port as our bound IPC address can only be this node, so remove it. This handles
+    // bind-host configurations that use a wildcard address. We can't simply exclude all local
+    // addresses, since multiple JournalNodes may be running on the same host.
+    addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()
+        && boundIpcAddress.getPort() == addr.getPort());
+
+    return addrList;
   }
 
   private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index 1564e41..28e36e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -34,6 +36,7 @@
 import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
     .getLogFile;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -96,12 +99,45 @@
     }
   }
 
+  /**
+   * Test that the "self exclusion" works when there are multiple JournalNode instances running on
+   * the same server, but on different ports.
+   */
+  @Test
+  public void testJournalNodeExcludesSelfMultiplePorts() throws URISyntaxException, IOException {
+    String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
+    JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
+
+    // Test: Get the Journal address list for the default configuration
+    List<InetSocketAddress> addrList = syncer.getJournalAddrList(uri);
+
+    // Verify: One of the addresses should be excluded so that the node isn't syncing with itself
+    assertEquals(2, addrList.size());
+  }
+
+  /**
+   * Test that the "self exclusion" works when a host uses a wildcard address.
+   */
+  @Test
+  public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException {
+    String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
+    JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
+
+    // Test: Request the same Journal address list, but with the IPv4 wildcard "0.0.0.0", which is
+    // commonly used as a bind host.
+    String boundHostUri = uri.replace("127.0.0.1", "0.0.0.0");
+    List<InetSocketAddress> boundHostAddrList = syncer.getJournalAddrList(boundHostUri);
+
+    // Verify: One of the addresses should be excluded so that the node isn't syncing with itself
+    assertEquals(2, boundHostAddrList.size());
+  }
+
   @Test(timeout=30000)
   public void testJournalNodeSync() throws Exception {
 
     // By default, 3 JournalNodes are started.
     for(int i=0; i<3; i++) {
-      Assert.assertEquals(true,
+      assertEquals(true,
           jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
     }
 
@@ -386,13 +422,13 @@
           HdfsConstants.RollingUpgradeAction.PREPARE);
 
     //query rolling upgrade
-    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+    assertEquals(info, dfsActive.rollingUpgrade(
         HdfsConstants.RollingUpgradeAction.QUERY));
 
     // Restart the Standby NN with rollingUpgrade option
     dfsCluster.restartNameNode(standbyNNindex, true,
         "-rollingUpgrade", "started");
-    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+    assertEquals(info, dfsActive.rollingUpgrade(
         HdfsConstants.RollingUpgradeAction.QUERY));
 
     // Do some edits and delete some edit logs
@@ -420,7 +456,7 @@
     // Restart the current standby NN (previously active)
     dfsCluster.restartNameNode(standbyNNindex, true,
         "-rollingUpgrade", "started");
-    Assert.assertEquals(info, dfsActive.rollingUpgrade(
+    assertEquals(info, dfsActive.rollingUpgrade(
         HdfsConstants.RollingUpgradeAction.QUERY));
     dfsCluster.waitActive();