HDFS-16684. Exclude the current JournalNode (#4723)
Exclude bound local addresses, including the use of a wildcard address in the bound host configurations.
* Allow sync attempts with unresolved addresses
* Update the comments.
* Remove unused import
Signed-off-by: stack <stack@apache.org>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
index df1314a..3e8831d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
@@ -125,6 +125,11 @@
}
@VisibleForTesting
+ public JournalNodeSyncer getJournalSyncer(String jid) {
+ return journalSyncersById.get(jid);
+ }
+
+ @VisibleForTesting
public boolean getJournalSyncerStatus(String jid) {
if (journalSyncersById.get(jid) != null) {
return journalSyncersById.get(jid).isJournalSyncerStarted();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index fd29c84..f451b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,10 +52,10 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* A Journal Sync thread runs through the lifetime of the JN. It periodically
@@ -153,6 +155,9 @@
LOG.warn("Could not add proxy for Journal at addresss " + addr, e);
}
}
+ // Check if there are any other JournalNodes before starting the sync. Although some proxies
+ // may be unresolved now, the act of attempting to sync will instigate resolution when the
+ // servers become available.
if (otherJNProxies.isEmpty()) {
LOG.error("Cannot sync as there is no other JN available for sync.");
return false;
@@ -310,12 +315,24 @@
return null;
}
- private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
+ @VisibleForTesting
+ protected List<InetSocketAddress> getJournalAddrList(String uriStr) throws
URISyntaxException,
IOException {
URI uri = new URI(uriStr);
- return Util.getLoggerAddresses(uri,
- new HashSet<>(Arrays.asList(jn.getBoundIpcAddress())), conf);
+
+ InetSocketAddress boundIpcAddress = jn.getBoundIpcAddress();
+ Set<InetSocketAddress> excluded = Sets.newHashSet(boundIpcAddress);
+ List<InetSocketAddress> addrList = Util.getLoggerAddresses(uri, excluded, conf);
+
+ // Exclude the current JournalNode instance (a local address and the same port). If the address
+ // is bound to a local address on the same port, then remove it to handle scenarios where a
+ // wildcard address (e.g. "0.0.0.0") is used. We can't simply exclude all local addresses
+ // since we may be running multiple servers on the same host.
+ addrList.removeIf(addr -> !addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()
+ && boundIpcAddress.getPort() == addr.getPort());
+
+ return addrList;
}
private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index 1564e41..28e36e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.qjournal.server;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -34,6 +36,7 @@
import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager
.getLogFile;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
@@ -96,12 +99,45 @@
}
}
+ /**
+ * Test that the "self exclusion" works when there are multiple JournalNode instances running on
+ * the same server, but on different ports.
+ */
+ @Test
+  public void testJournalNodeExcludesSelfMultiplePorts() throws URISyntaxException, IOException {
+ String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
+ JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
+
+ // Test: Get the Journal address list for the default configuration
+ List<InetSocketAddress> addrList = syncer.getJournalAddrList(uri);
+
+ // Verify: One of the addresses should be excluded so that the node isn't syncing with itself
+ assertEquals(2, addrList.size());
+ }
+
+ /**
+   * Test that the "self exclusion" works when a host uses a wildcard address.
+ */
+ @Test
+ public void testJournalNodeExcludesSelfWildCard() throws URISyntaxException, IOException {
+ String uri = qjmhaCluster.getJournalCluster().getQuorumJournalURI("ns1").toString();
+ JournalNodeSyncer syncer = jCluster.getJournalNode(0).getJournalSyncer("ns1");
+
+ // Test: Request the same Journal address list, but using the IPv4 "0.0.0.0" which is commonly
+ // used as a bind host.
+ String boundHostUri = uri.replaceAll("127.0.0.1", "0.0.0.0");
+ List<InetSocketAddress> boundHostAddrList = syncer.getJournalAddrList(boundHostUri);
+
+    // Verify: One of the addresses should be excluded so that the node isn't syncing with itself
+ assertEquals(2, boundHostAddrList.size());
+ }
+
@Test(timeout=30000)
public void testJournalNodeSync() throws Exception {
//As by default 3 journal nodes are started;
for(int i=0; i<3; i++) {
- Assert.assertEquals(true,
+ assertEquals(true,
jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
}
@@ -386,13 +422,13 @@
HdfsConstants.RollingUpgradeAction.PREPARE);
//query rolling upgrade
- Assert.assertEquals(info, dfsActive.rollingUpgrade(
+ assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
// Restart the Standby NN with rollingUpgrade option
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
- Assert.assertEquals(info, dfsActive.rollingUpgrade(
+ assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
// Do some edits and delete some edit logs
@@ -420,7 +456,7 @@
// Restart the current standby NN (previously active)
dfsCluster.restartNameNode(standbyNNindex, true,
"-rollingUpgrade", "started");
- Assert.assertEquals(info, dfsActive.rollingUpgrade(
+ assertEquals(info, dfsActive.rollingUpgrade(
HdfsConstants.RollingUpgradeAction.QUERY));
dfsCluster.waitActive();