HDFS-16690. Automatically format unformatted JNs with JournalNodeSyncer (#6925). Contributed by Aswin M Prabhu.
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
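
This change lets a JournalNode whose journal is unformatted (for example
after a disk replacement) format itself automatically from its peers.
When dfs.journalnode.enable.sync.format is set to true, the
JournalNodeSyncer fetches a peer's StorageInfo through the new
getStorageInfo RPC on InterQJournalProtocol and uses it to format the
local journal before resuming edit log sync. Peers that report no edit
logs are skipped to avoid racing with "hdfs namenode -format". The
feature is disabled by default.

A minimal sketch of enabling the feature, following the new test's setup
(the interval value is illustrative; the keys are the ones added or used
by this patch):

    Configuration conf = new HdfsConfiguration();
    // Enable the JournalNode syncer and shorten its interval for tests.
    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
    // Allow the syncer to format an unformatted journal from a peer.
    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true);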
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index b9f8e07..dd3193f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1471,6 +1471,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
"dfs.journalnode.sync.interval";
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
+ public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY =
+ "dfs.journalnode.enable.sync.format";
+ public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false;
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
"dfs.journalnode.edit-cache-size.bytes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
index f1f7e9c..c3eed14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.security.KerberosInfo;
@@ -51,4 +52,13 @@ GetEditLogManifestResponseProto getEditLogManifestFromJournal(
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
throws IOException;
+ /**
+ * Get the storage info for the specified journal.
+ * @param jid the journal identifier
+ * @param nameServiceId the name service id
+ * @return the storage info object
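+ * @throws IOException if the storage info cannot be retrieved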
+ */
+ StorageInfoProto getStorageInfo(String jid, String nameServiceId)
+ throws IOException;
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
index ba5ddb1..ac67bcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java
@@ -24,6 +24,8 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
@@ -60,4 +62,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
throw new ServiceException(e);
}
}
+
+ @Override
+ public StorageInfoProto getStorageInfo(
+ RpcController controller, GetStorageInfoRequestProto request)
+ throws ServiceException {
+ try {
+ return impl.getStorageInfo(
+ request.getJid().getIdentifier(),
+ request.hasNameServiceId() ? request.getNameServiceId() : null
+ );
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
index 4544308..49ae53f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hdfs.qjournal.protocolPB;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -75,6 +77,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
req.build()));
}
+ @Override
+ public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
+ throws IOException {
+ InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req =
+ InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder()
+ .setJid(convertJournalId(jid));
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
+ }
+ return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build()));
+ }
+
private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(jid)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index 7e33ab5..b09d09a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.qjournal.server;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.slf4j.Logger;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
this.jn = jn;
-
+
Configuration confCopy = new Configuration(conf);
-
+
// Ensure that nagling doesn't kick in, which could cause latency issues.
confCopy.setBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
true);
-
+
InetSocketAddress addr = getAddress(confCopy);
String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
if (bindHost == null) {
@@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
this.handlerCount = confHandlerCount;
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
this.handlerCount);
-
+
this.server = new RPC.Builder(confCopy)
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
@@ -149,15 +150,15 @@ void start() {
public InetSocketAddress getAddress() {
return server.getListenerAddress();
}
-
+
void join() throws InterruptedException {
this.server.join();
}
-
+
void stop() {
this.server.stop();
}
-
+
static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
@@ -211,7 +212,7 @@ public void journal(RequestInfo reqInfo,
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}
-
+
@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
@@ -245,10 +246,10 @@ public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {
-
+
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk);
-
+
return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest))
.setHttpPort(jn.getBoundHttpAddress().getPort())
@@ -257,6 +258,13 @@ public GetEditLogManifestResponseProto getEditLogManifest(
}
@Override
+ public StorageInfoProto getStorageInfo(String jid,
+ String nameServiceId) throws IOException {
+ StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage();
+ return PBHelper.convert(storage);
+ }
+
+ @Override
public GetJournaledEditsResponseProto getJournaledEdits(String jid,
String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
return jn.getOrCreateJournal(jid, nameServiceId)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
index f451b46..7501059 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.qjournal.server;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -79,6 +82,7 @@ public class JournalNodeSyncer {
private int numOtherJNs;
private int journalNodeIndexForSync = 0;
private final long journalSyncInterval;
+ private final boolean tryFormatting;
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
private final JournalMetrics metrics;
@@ -98,6 +102,9 @@ public class JournalNodeSyncer {
logSegmentTransferTimeout = conf.getInt(
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
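+ // Whether the syncer may format an unformatted local journal
+ // using storage info fetched from a peer JournalNode.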
+ tryFormatting = conf.getBoolean(
+ DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
+ DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
throttler = getThrottler(conf);
metrics = journal.getMetrics();
journalSyncerStarted = false;
@@ -171,6 +178,8 @@ private void startSyncJournalsDaemon() {
// Wait for journal to be formatted to create edits.sync directory
while(!journal.isFormatted()) {
try {
+ // Try to format the journal using namespace info fetched from the other JNs.
+ formatWithSyncer();
Thread.sleep(journalSyncInterval);
} catch (InterruptedException e) {
LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
@@ -187,7 +196,15 @@ private void startSyncJournalsDaemon() {
while(shouldSync) {
try {
if (!journal.isFormatted()) {
- LOG.warn("Journal cannot sync. Not formatted.");
+ LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer");
+ formatWithSyncer();
+ if (journal.isFormatted() && !createEditsSyncDir()) {
+ LOG.error("Failed to create directory for downloading log " +
+ "segments: {}. Stopping Journal Node Sync.",
+ journal.getStorage().getEditsSyncDir());
+ return;
+ }
+ continue;
} else {
syncJournals();
}
@@ -233,6 +250,68 @@ private void syncJournals() {
journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
}
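+ /**
+ * Try to format the local journal using StorageInfo fetched from a peer
+ * JournalNode. This is a no-op unless dfs.journalnode.enable.sync.format
+ * is set to true.
+ */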
+ private void formatWithSyncer() {
+ if (!tryFormatting) {
+ return;
+ }
+ LOG.info("Trying to format the journal with the syncer");
+ try {
+ StorageInfo storage = null;
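+ // Ask each peer JournalNode for its StorageInfo until a usable one is found.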
+ for (JournalNodeProxy jnProxy : otherJNProxies) {
+ if (!hasEditLogs(jnProxy)) {
+ // Skip peers that have no edit logs yet. This avoids a race condition
+ // between `hdfs namenode -format` and the JN syncer, in which storage
+ // info could be copied from a peer that has only just been formatted.
+ continue;
+ }
+ try {
+ HdfsServerProtos.StorageInfoProto storageInfoResponse =
+ jnProxy.jnProxy.getStorageInfo(jid, nameServiceId);
+ storage = PBHelper.convert(
+ storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE
+ );
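+ // A namespace ID of 0 means the peer's storage has not been
+ // formatted with real namespace info; keep looking.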
+ if (storage.getNamespaceID() == 0) {
+ LOG.error("Got invalid StorageInfo from " + jnProxy);
+ storage = null;
+ continue;
+ }
+ LOG.info("Got StorageInfo " + storage + " from " + jnProxy);
+ break;
+ } catch (IOException e) {
+ LOG.error("Could not get StorageInfo from " + jnProxy, e);
+ }
+ }
+ if (storage == null) {
+ LOG.error("Could not get StorageInfo from any JournalNode. " +
+ "JournalNodeSyncer cannot format the journal.");
+ return;
+ }
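+ // Format the local journal with the namespace info obtained from the peer.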
+ NamespaceInfo nsInfo = new NamespaceInfo(storage);
+ journal.format(nsInfo, true);
+ } catch (IOException e) {
+ LOG.error("Exception in formatting the journal with the syncer", e);
+ }
+ }
+
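+ /**
+ * Check whether the given peer JournalNode has at least one edit log for
+ * this journal. Peers without edit logs are not used as a source of
+ * StorageInfo, since they may themselves have just been formatted.
+ */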
+ private boolean hasEditLogs(JournalNodeProxy journalProxy) {
+ GetEditLogManifestResponseProto editLogManifest;
+ try {
+ editLogManifest = journalProxy.jnProxy.getEditLogManifestFromJournal(
+ jid, nameServiceId, 0, false);
+ } catch (IOException e) {
+ LOG.error("Could not get edit log manifest from " + journalProxy, e);
+ return false;
+ }
+
+ List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
+ editLogManifest.getManifest()).getLogs();
+ if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
+ LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs");
+ return false;
+ }
+
+ return true;
+ }
+
private void syncWithJournalAtIndex(int index) {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with "
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
index 1c78423..5510eeb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto
@@ -31,8 +31,15 @@
import "HdfsServer.proto";
import "QJournalProtocol.proto";
+message GetStorageInfoRequestProto {
+ required JournalIdProto jid = 1;
+ optional string nameServiceId = 2;
+}
service InterQJournalProtocolService {
rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto);
+
+ rpc getStorageInfo(GetStorageInfoRequestProto)
+ returns (StorageInfoProto);
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d6fefa4..1295c0d 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5072,6 +5072,16 @@
</property>
<property>
+ <name>dfs.journalnode.enable.sync.format</name>
+ <value>false</value>
+ <description>
+ If true, the JournalNode syncer daemon that syncs edit logs between
+ JournalNodes will also try to format its journal if it is not yet
+ formatted, by querying the other JournalNodes for the required storage info.
+ </description>
+</property>
+
+<property>
<name>dfs.journalnode.edit-cache-size.bytes</name>
<value></value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
index 28e36e0..ac250ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java
@@ -20,6 +20,7 @@
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.function.Supplier;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -75,6 +76,7 @@ public void setUpMiniCluster() throws IOException {
conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L);
+ conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true);
if (testName.getMethodName().equals(
"testSyncAfterJNdowntimeWithoutQJournalQueue")) {
conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0);
@@ -478,6 +480,33 @@ public void testSyncDuringRollingUpgrade() throws Exception {
}
}
+ @Test(timeout=300_000)
+ public void testFormatWithSyncer() throws Exception {
+ File firstJournalDir = jCluster.getJournalDir(0, jid);
+ File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
+ .getCurrentDir();
+
+ // Generate some edit logs
+ long firstTxId = generateEditLog();
+
+ // Delete them from the JN01
+ List<File> missingLogs = Lists.newArrayList();
+ missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId));
+
+ // Wait to ensure the syncer has started, then delete the storage directory
+ // itself to simulate a disk wipe, and re-analyze storage so that the
+ // in-memory formatted state of JNStorage is updated
+ Thread.sleep(2000);
+ FileUtils.deleteDirectory(firstJournalDir);
+ jCluster.getJournalNode(0).getOrCreateJournal(jid).getStorage().analyzeStorage();
+
+ // Wait for JN formatting with Syncer
+ GenericTestUtils.waitFor(jnFormatted(0), 500, 30000);
+ // Generate some more edit log so that the JN updates its committed tx id
+ generateEditLog();
+ // Check that the missing edit logs have been synced
+ GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000);
+ }
+
private File deleteEditLog(File currentDir, long startTxId)
throws IOException {
EditLogFile logFile = getLogFile(currentDir, startTxId);
@@ -581,4 +610,19 @@ public Boolean get() {
};
return supplier;
}
+
+ private Supplier<Boolean> jnFormatted(int jnIndex) throws Exception {
+ Supplier<Boolean> supplier = new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ return jCluster.getJournalNode(jnIndex).getOrCreateJournal(jid)
+ .isFormatted();
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ };
+ return supplier;
+ }
}