YARN-321. Forwarding YARN-321 branch to latest trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/YARN-321@1561449 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index b221d9d..09eaf6bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -148,6 +148,8 @@
HDFS-5721. sharedEditsImage in Namenode#initializeSharedEdits() should be
closed before method returns. (Ted Yu via junping_du)
+ HDFS-5138. Support HDFS upgrade in HA. (atm via todd)
+
OPTIMIZATIONS
BUG FIXES
@@ -594,6 +596,9 @@
HDFS-5806. balancer should set SoTimeout to avoid indefinite hangs.
(Nathan Roberts via Andrew Wang).
+ HDFS-5728. Block recovery will fail if the metafile does not have crc
+ for all chunks of the block (Vinay via kihwal)
+
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
HDFS-4985. Add storage type to the protocol and expose it in block report
@@ -1134,6 +1139,9 @@
HDFS-5789. Some of snapshot APIs missing checkOperation double check in fsn. (umamahesh)
+ HDFS-5343. When cat command is issued on snapshot files getting unexpected result.
+ (Sathish via umamahesh)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index f971107..028e64c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -361,5 +361,10 @@
<Class name="org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor" />
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.DFSUtil"/>
+ <Method name="assertAllResultsEqual" />
+ <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
+ </Match>
</FindBugsFilter>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
index 5dc1281..91ccc54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.contrib.bkjournal;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
@@ -660,6 +662,37 @@
}
@Override
+ public void doPreUpgrade() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doUpgrade(Storage storage) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getJournalCTime() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doFinalize() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+ int targetLayoutVersion) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doRollback() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void close() throws IOException {
try {
bkc.close();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
index 0a14e78..5611bb8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java
@@ -316,7 +316,7 @@
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
GenericTestUtils.assertExceptionContains(
- "Cannot start an HA namenode with name dirs that need recovery", ioe);
+ "storage directory does not exist or is not accessible", ioe);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d90317d..73861bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -792,6 +792,9 @@
currentNode = blockSeekTo(pos);
}
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+ if (locatedBlocks.isLastBlockComplete()) {
+ realLen = (int) Math.min(realLen, locatedBlocks.getFileLength());
+ }
int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
if (result >= 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 1d0421e..abddf1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -43,6 +43,7 @@
import java.security.SecureRandom;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -574,10 +575,24 @@
}
return ret;
}
+
+ /**
+ * Get all of the RPC addresses of the individual NNs in a given nameservice.
+ *
+ * @param conf Configuration
+ * @param nsId the nameservice whose NNs addresses we want.
+ * @param defaultValue default address to return in case key is not found.
+ * @return A map from nnId -> RPC address of each NN in the nameservice.
+ */
+ public static Map<String, InetSocketAddress> getRpcAddressesForNameserviceId(
+ Configuration conf, String nsId, String defaultValue) {
+ return getAddressesForNameserviceId(conf, nsId, defaultValue,
+ DFS_NAMENODE_RPC_ADDRESS_KEY);
+ }
private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue,
- String[] keys) {
+ String... keys) {
Collection<String> nnIds = getNameNodeIds(conf, nsId);
Map<String, InetSocketAddress> ret = Maps.newHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
@@ -1670,4 +1685,32 @@
}
return builder;
}
+
+ /**
+ * Assert that all objects in the collection are equal. Returns silently if
+ * so, throws an AssertionError if any object is not equal. All null values
+ * are considered equal.
+ *
+ * @param objects the collection of objects to check for equality.
+ */
+ public static void assertAllResultsEqual(Collection<?> objects) {
+ Object[] resultsArray = objects.toArray();
+
+ if (resultsArray.length == 0)
+ return;
+
+ for (int i = 0; i < resultsArray.length; i++) {
+ if (i == 0)
+ continue;
+ else {
+ Object currElement = resultsArray[i];
+ Object lastElement = resultsArray[i - 1];
+ if ((currElement == null && currElement != lastElement) ||
+ (currElement != null && !currElement.equals(lastElement))) {
+ throw new AssertionError("Not all elements match in results: " +
+ Arrays.toString(resultsArray));
+ }
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
index 7d53fb9..47ea821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
@@ -26,22 +26,29 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
@@ -305,4 +312,55 @@
DFSClient dfsClient = dfs.getClient();
return RPC.getServerAddress(dfsClient.getNamenode());
}
+
+ /**
+ * Get an RPC proxy for each NN in an HA nameservice. Used when a given RPC
+ * call should be made on every NN in an HA nameservice, not just the active.
+ *
+ * @param conf configuration
+ * @param nsId the nameservice to get all of the proxies for.
+ * @return a list of RPC proxies for each NN in the nameservice.
+ * @throws IOException in the event of error.
+ */
+ public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice(
+ Configuration conf, String nsId) throws IOException {
+ Map<String, InetSocketAddress> nnAddresses =
+ DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
+
+ List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>();
+ for (InetSocketAddress nnAddress : nnAddresses.values()) {
+ NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+ proxyInfo = NameNodeProxies.createNonHAProxy(conf,
+ nnAddress, ClientProtocol.class,
+ UserGroupInformation.getCurrentUser(), false);
+ namenodes.add(proxyInfo.getProxy());
+ }
+ return namenodes;
+ }
+
+ /**
+ * Used to ensure that at least one of the given HA NNs is currently in the
+   * active state.
+ *
+ * @param namenodes list of RPC proxies for each NN to check.
+ * @return true if at least one NN is active, false if all are in the standby state.
+ * @throws IOException in the event of error.
+ */
+ public static boolean isAtLeastOneActive(List<ClientProtocol> namenodes)
+ throws IOException {
+ for (ClientProtocol namenode : namenodes) {
+ try {
+ namenode.getFileInfo("/");
+ return true;
+ } catch (RemoteException re) {
+ IOException cause = re.unwrapRemoteException();
+ if (cause instanceof StandbyException) {
+ // This is expected to happen for a standby NN.
+ } else {
+ throw re;
+ }
+ }
+ }
+ return false;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
index 2f6baa3..a3a6387 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -151,4 +152,17 @@
* StringBuilder. This is displayed on the NN web UI.
*/
public void appendReport(StringBuilder sb);
+
+ public ListenableFuture<Void> doPreUpgrade();
+
+ public ListenableFuture<Void> doUpgrade(StorageInfo sInfo);
+
+ public ListenableFuture<Void> doFinalize();
+
+ public ListenableFuture<Boolean> canRollBack(StorageInfo storage,
+ StorageInfo prevStorage, int targetLayoutVersion);
+
+ public ListenableFuture<Void> doRollback();
+
+ public ListenableFuture<Long> getJournalCTime();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index d891858..66a03c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -308,4 +309,71 @@
}
return QuorumCall.create(calls);
}
+
+ QuorumCall<AsyncLogger, Void> doPreUpgrade() {
+ Map<AsyncLogger, ListenableFuture<Void>> calls =
+ Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ ListenableFuture<Void> future =
+ logger.doPreUpgrade();
+ calls.put(logger, future);
+ }
+ return QuorumCall.create(calls);
+ }
+
+ public QuorumCall<AsyncLogger, Void> doUpgrade(StorageInfo sInfo) {
+ Map<AsyncLogger, ListenableFuture<Void>> calls =
+ Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ ListenableFuture<Void> future =
+ logger.doUpgrade(sInfo);
+ calls.put(logger, future);
+ }
+ return QuorumCall.create(calls);
+ }
+
+ public QuorumCall<AsyncLogger, Void> doFinalize() {
+ Map<AsyncLogger, ListenableFuture<Void>> calls =
+ Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ ListenableFuture<Void> future =
+ logger.doFinalize();
+ calls.put(logger, future);
+ }
+ return QuorumCall.create(calls);
+ }
+
+ public QuorumCall<AsyncLogger, Boolean> canRollBack(StorageInfo storage,
+ StorageInfo prevStorage, int targetLayoutVersion) {
+ Map<AsyncLogger, ListenableFuture<Boolean>> calls =
+ Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ ListenableFuture<Boolean> future =
+ logger.canRollBack(storage, prevStorage, targetLayoutVersion);
+ calls.put(logger, future);
+ }
+ return QuorumCall.create(calls);
+ }
+
+ public QuorumCall<AsyncLogger, Void> doRollback() {
+ Map<AsyncLogger, ListenableFuture<Void>> calls =
+ Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ ListenableFuture<Void> future =
+ logger.doRollback();
+ calls.put(logger, future);
+ }
+ return QuorumCall.create(calls);
+ }
+
+ public QuorumCall<AsyncLogger, Long> getJournalCTime() {
+ Map<AsyncLogger, ListenableFuture<Long>> calls =
+ Maps.newHashMap();
+ for (AsyncLogger logger : loggers) {
+ ListenableFuture<Long> future = logger.getJournalCTime();
+ calls.put(logger, future);
+ }
+ return QuorumCall.create(calls);
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 3731f5a..2f1bff1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -564,6 +565,72 @@
}
});
}
+
+ @Override
+ public ListenableFuture<Void> doPreUpgrade() {
+ return executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ getProxy().doPreUpgrade(journalId);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) {
+ return executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ getProxy().doUpgrade(journalId, sInfo);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<Void> doFinalize() {
+ return executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ getProxy().doFinalize(journalId);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canRollBack(final StorageInfo storage,
+ final StorageInfo prevStorage, final int targetLayoutVersion) {
+ return executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws IOException {
+ return getProxy().canRollBack(journalId, storage, prevStorage,
+ targetLayoutVersion);
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<Void> doRollback() {
+ return executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ getProxy().doRollback(journalId);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<Long> getJournalCTime() {
+ return executor.submit(new Callable<Long>() {
+ @Override
+ public Long call() throws IOException {
+ return getProxy().getJournalCTime(journalId);
+ }
+ });
+ }
@Override
public String toString() {
@@ -636,4 +703,5 @@
private boolean hasHttpServerEndPoint() {
return httpServerURL != null;
}
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 9f2cd56..befb876 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -34,10 +34,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
@@ -77,8 +80,14 @@
// Since these don't occur during normal operation, we can
// use rather lengthy timeouts, and don't need to make them
// configurable.
- private static final int FORMAT_TIMEOUT_MS = 60000;
- private static final int HASDATA_TIMEOUT_MS = 60000;
+ private static final int FORMAT_TIMEOUT_MS = 60000;
+ private static final int HASDATA_TIMEOUT_MS = 60000;
+ private static final int CAN_ROLL_BACK_TIMEOUT_MS = 60000;
+ private static final int FINALIZE_TIMEOUT_MS = 60000;
+ private static final int PRE_UPGRADE_TIMEOUT_MS = 60000;
+ private static final int ROLL_BACK_TIMEOUT_MS = 60000;
+ private static final int UPGRADE_TIMEOUT_MS = 60000;
+ private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
private final Configuration conf;
private final URI uri;
@@ -492,4 +501,131 @@
return loggers;
}
+ @Override
+ public void doPreUpgrade() throws IOException {
+ QuorumCall<AsyncLogger, Void> call = loggers.doPreUpgrade();
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0, PRE_UPGRADE_TIMEOUT_MS,
+ "doPreUpgrade");
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException("Could not do pre-upgrade of one or more JournalNodes");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for doPreUpgrade() response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for doPreUpgrade() response");
+ }
+ }
+
+ @Override
+ public void doUpgrade(Storage storage) throws IOException {
+ QuorumCall<AsyncLogger, Void> call = loggers.doUpgrade(storage);
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0, UPGRADE_TIMEOUT_MS,
+ "doUpgrade");
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException("Could not perform upgrade of one or more JournalNodes");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for doUpgrade() response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for doUpgrade() response");
+ }
+ }
+
+ @Override
+ public void doFinalize() throws IOException {
+ QuorumCall<AsyncLogger, Void> call = loggers.doFinalize();
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0, FINALIZE_TIMEOUT_MS,
+ "doFinalize");
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException("Could not finalize one or more JournalNodes");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for doFinalize() response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for doFinalize() response");
+ }
+ }
+
+ @Override
+ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+ int targetLayoutVersion) throws IOException {
+ QuorumCall<AsyncLogger, Boolean> call = loggers.canRollBack(storage,
+ prevStorage, targetLayoutVersion);
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0, CAN_ROLL_BACK_TIMEOUT_MS,
+ "lockSharedStorage");
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException("Could not check if roll back possible for"
+ + " one or more JournalNodes");
+ }
+
+ // Either they all return the same thing or this call fails, so we can
+ // just return the first result.
+ DFSUtil.assertAllResultsEqual(call.getResults().values());
+ for (Boolean result : call.getResults().values()) {
+ return result;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for lockSharedStorage() " +
+ "response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for lockSharedStorage() " +
+ "response");
+ }
+
+ throw new AssertionError("Unreachable code.");
+ }
+
+ @Override
+ public void doRollback() throws IOException {
+ QuorumCall<AsyncLogger, Void> call = loggers.doRollback();
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0, ROLL_BACK_TIMEOUT_MS,
+ "doRollback");
+
+ if (call.countExceptions() > 0) {
+ call.rethrowException("Could not perform rollback of one or more JournalNodes");
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for doFinalize() response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for doFinalize() response");
+ }
+ }
+
+ @Override
+ public long getJournalCTime() throws IOException {
+ QuorumCall<AsyncLogger, Long> call = loggers.getJournalCTime();
+ try {
+ call.waitFor(loggers.size(), loggers.size(), 0,
+ GET_JOURNAL_CTIME_TIMEOUT_MS, "getJournalCTime");
+
+ if (call.countExceptions() > 0) {
+        call.rethrowException("Could not get journal CTime for one or "
+            + "more JournalNodes");
+ }
+
+ // Either they all return the same thing or this call fails, so we can
+ // just return the first result.
+ DFSUtil.assertAllResultsEqual(call.getResults().values());
+ for (Long result : call.getResults().values()) {
+ return result;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for getJournalCTime() " +
+ "response");
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out waiting for getJournalCTime() " +
+ "response");
+ }
+
+ throw new AssertionError("Unreachable code.");
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
index 41600ac..c7ab691 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.KerberosInfo;
@@ -143,4 +144,17 @@
*/
public void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
+
+ public void doPreUpgrade(String journalId) throws IOException;
+
+ public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException;
+
+ public void doFinalize(String journalId) throws IOException;
+
+ public Boolean canRollBack(String journalId, StorageInfo storage,
+ StorageInfo prevStorage, int targetLayoutVersion) throws IOException;
+
+ public void doRollback(String journalId) throws IOException;
+
+ public Long getJournalCTime(String journalId) throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
index 47d8100..b118aba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
@@ -17,17 +17,35 @@
*/
package org.apache.hadoop.hdfs.qjournal.protocolPB;
+import java.io.IOException;
+import java.net.URL;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@@ -39,8 +57,6 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
@@ -48,13 +64,11 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
-import java.io.IOException;
-import java.net.URL;
-
/**
* Implementation for protobuf service that forwards requests
* received on {@link JournalProtocolPB} to the
@@ -244,4 +258,79 @@
reqInfo.hasCommittedTxId() ?
reqInfo.getCommittedTxId() : HdfsConstants.INVALID_TXID);
}
+
+
+ @Override
+ public DoPreUpgradeResponseProto doPreUpgrade(RpcController controller,
+ DoPreUpgradeRequestProto request) throws ServiceException {
+ try {
+ impl.doPreUpgrade(convert(request.getJid()));
+ return DoPreUpgradeResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public DoUpgradeResponseProto doUpgrade(RpcController controller,
+ DoUpgradeRequestProto request) throws ServiceException {
+ try {
+ impl.doUpgrade(convert(request.getJid()),
+ PBHelper.convert(request.getSInfo()));
+ return DoUpgradeResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public DoFinalizeResponseProto doFinalize(RpcController controller,
+ DoFinalizeRequestProto request) throws ServiceException {
+ try {
+ impl.doFinalize(convert(request.getJid()));
+ return DoFinalizeResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public CanRollBackResponseProto canRollBack(RpcController controller,
+ CanRollBackRequestProto request) throws ServiceException {
+ try {
+ Boolean result = impl.canRollBack(convert(request.getJid()),
+ PBHelper.convert(request.getStorage()),
+ PBHelper.convert(request.getPrevStorage()),
+ request.getTargetLayoutVersion());
+ return CanRollBackResponseProto.newBuilder()
+ .setCanRollBack(result)
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request)
+ throws ServiceException {
+ try {
+ impl.doRollback(convert(request.getJid()));
+ return DoRollbackResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public GetJournalCTimeResponseProto getJournalCTime(RpcController controller,
+ GetJournalCTimeRequestProto request) throws ServiceException {
+ try {
+ Long resultCTime = impl.getJournalCTime(convert(request.getJid()));
+ return GetJournalCTimeResponseProto.newBuilder()
+ .setResultCTime(resultCTime)
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
index f111933..6127478 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
@@ -23,13 +23,23 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.CanRollBackResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoFinalizeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoPreUpgradeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoRollbackRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.DoUpgradeRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalCTimeResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
@@ -39,7 +49,6 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
-import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
@@ -47,6 +56,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.ProtobufHelper;
@@ -277,4 +287,85 @@
RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
}
+ @Override
+ public void doPreUpgrade(String jid) throws IOException {
+ try {
+ rpcProxy.doPreUpgrade(NULL_CONTROLLER,
+ DoPreUpgradeRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+ try {
+ rpcProxy.doUpgrade(NULL_CONTROLLER,
+ DoUpgradeRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId))
+ .setSInfo(PBHelper.convert(sInfo))
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void doFinalize(String jid) throws IOException {
+ try {
+ rpcProxy.doFinalize(NULL_CONTROLLER,
+ DoFinalizeRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public Boolean canRollBack(String journalId, StorageInfo storage,
+ StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+ try {
+ CanRollBackResponseProto response = rpcProxy.canRollBack(
+ NULL_CONTROLLER,
+ CanRollBackRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId))
+ .setStorage(PBHelper.convert(storage))
+ .setPrevStorage(PBHelper.convert(prevStorage))
+ .setTargetLayoutVersion(targetLayoutVersion)
+ .build());
+ return response.getCanRollBack();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void doRollback(String journalId) throws IOException {
+ try {
+ rpcProxy.doRollback(NULL_CONTROLLER,
+ DoRollbackRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId))
+ .build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public Long getJournalCTime(String journalId) throws IOException {
+ try {
+ GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
+ NULL_CONTROLLER,
+ GetJournalCTimeRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId))
+ .build());
+ return response.getResultCTime();
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
index 36135cb..2571670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
@@ -139,20 +140,26 @@
private boolean checkStorageInfoOrSendError(JNStorage storage,
HttpServletRequest request, HttpServletResponse response)
throws IOException {
- String myStorageInfoString = storage.toColonSeparatedString();
+ int myNsId = storage.getNamespaceID();
+ String myClusterId = storage.getClusterID();
+
String theirStorageInfoString = StringEscapeUtils.escapeHtml(
request.getParameter(STORAGEINFO_PARAM));
- if (theirStorageInfoString != null
- && !myStorageInfoString.equals(theirStorageInfoString)) {
- String msg = "This node has storage info '" + myStorageInfoString
- + "' but the requesting node expected '"
- + theirStorageInfoString + "'";
-
- response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
- LOG.warn("Received an invalid request file transfer request from " +
- request.getRemoteAddr() + ": " + msg);
- return false;
+ if (theirStorageInfoString != null) {
+ int theirNsId = StorageInfo.getNsIdFromColonSeparatedString(
+ theirStorageInfoString);
+ String theirClusterId = StorageInfo.getClusterIdFromColonSeparatedString(
+ theirStorageInfoString);
+ if (myNsId != theirNsId || !myClusterId.equals(theirClusterId)) {
+ String msg = "This node has namespaceId '" + myNsId + " and clusterId '"
+ + myClusterId + "' but the requesting node expected '" + theirNsId
+ + "' and '" + theirClusterId + "'";
+ response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
+ LOG.warn("Received an invalid request file transfer request from " +
+ request.getRemoteAddr() + ": " + msg);
+ return false;
+ }
}
return true;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 347ac53..e972fe0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -130,6 +130,10 @@
return new File(sd.getCurrentDir(), "paxos");
}
+ File getRoot() {
+ return sd.getRoot();
+ }
+
/**
* Remove any log files and associated paxos files which are older than
* the given txid.
@@ -182,12 +186,15 @@
unlockAll();
sd.clearDirectory();
writeProperties(sd);
+ createPaxosDir();
+ analyzeStorage();
+ }
+
+ void createPaxosDir() throws IOException {
if (!getPaxosDir().mkdirs()) {
throw new IOException("Could not create paxos dir: " + getPaxosDir());
}
- analyzeStorage();
}
-
void analyzeStorage() throws IOException {
this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index cf83cef..e1bb69d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -37,12 +37,14 @@
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
@@ -73,7 +75,7 @@
* Each such journal is entirely independent despite being hosted by
* the same JVM.
*/
-class Journal implements Closeable {
+public class Journal implements Closeable {
static final Log LOG = LogFactory.getLog(Journal.class);
@@ -122,8 +124,8 @@
*/
private BestEffortLongFile committedTxnId;
- private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
- private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
+ public static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
+ public static final String LAST_WRITER_EPOCH = "last-writer-epoch";
private static final String COMMITTED_TXID_FILENAME = "committed-txid";
private final FileJournalManager fjm;
@@ -627,7 +629,7 @@
}
/**
- * @see QJournalProtocol#getEditLogManifest(String, long)
+ * @see QJournalProtocol#getEditLogManifest(String, long, boolean)
*/
public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
boolean inProgressOk) throws IOException {
@@ -728,7 +730,7 @@
}
/**
- * @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
+ * @see QJournalProtocol#acceptRecovery(RequestInfo, QJournalProtocolProtos.SegmentStateProto, URL)
*/
public synchronized void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto segment, URL fromUrl)
@@ -980,4 +982,62 @@
}
}
}
+
+ public synchronized void doPreUpgrade() throws IOException {
+ storage.getJournalManager().doPreUpgrade();
+ }
+
+ public synchronized void doUpgrade(StorageInfo sInfo) throws IOException {
+ long oldCTime = storage.getCTime();
+ storage.cTime = sInfo.cTime;
+ int oldLV = storage.getLayoutVersion();
+ storage.layoutVersion = sInfo.layoutVersion;
+ LOG.info("Starting upgrade of edits directory: "
+ + ".\n old LV = " + oldLV
+ + "; old CTime = " + oldCTime
+ + ".\n new LV = " + storage.getLayoutVersion()
+ + "; new CTime = " + storage.getCTime());
+ storage.getJournalManager().doUpgrade(storage);
+ storage.createPaxosDir();
+
+ // Copy over the contents of the epoch data files to the new dir.
+ File currentDir = storage.getSingularStorageDir().getCurrentDir();
+ File previousDir = storage.getSingularStorageDir().getPreviousDir();
+
+ PersistentLongFile prevLastPromisedEpoch = new PersistentLongFile(
+ new File(previousDir, LAST_PROMISED_FILENAME), 0);
+ PersistentLongFile prevLastWriterEpoch = new PersistentLongFile(
+ new File(previousDir, LAST_WRITER_EPOCH), 0);
+
+ lastPromisedEpoch = new PersistentLongFile(
+ new File(currentDir, LAST_PROMISED_FILENAME), 0);
+ lastWriterEpoch = new PersistentLongFile(
+ new File(currentDir, LAST_WRITER_EPOCH), 0);
+
+ lastPromisedEpoch.set(prevLastPromisedEpoch.get());
+ lastWriterEpoch.set(prevLastWriterEpoch.get());
+ }
+
+ public synchronized void doFinalize() throws IOException {
+ LOG.info("Finalizing upgrade for journal "
+ + storage.getRoot() + "."
+ + (storage.getLayoutVersion()==0 ? "" :
+ "\n cur LV = " + storage.getLayoutVersion()
+ + "; cur CTime = " + storage.getCTime()));
+ storage.getJournalManager().doFinalize();
+ }
+
+ public Boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+ int targetLayoutVersion) throws IOException {
+ return this.storage.getJournalManager().canRollBack(storage, prevStorage,
+ targetLayoutVersion);
+ }
+
+ public void doRollback() throws IOException {
+ storage.getJournalManager().doRollback();
+ }
+
+ public Long getJournalCTime() throws IOException {
+ return storage.getJournalManager().getJournalCTime();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
index c43edb9..9f1665b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
@@ -285,4 +286,31 @@
StringUtils.startupShutdownMessage(JournalNode.class, args, LOG);
System.exit(ToolRunner.run(new JournalNode(), args));
}
+
+ public void doPreUpgrade(String journalId) throws IOException {
+ getOrCreateJournal(journalId).doPreUpgrade();
+ }
+
+ public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+ getOrCreateJournal(journalId).doUpgrade(sInfo);
+ }
+
+ public void doFinalize(String journalId) throws IOException {
+ getOrCreateJournal(journalId).doFinalize();
+ }
+
+ public Boolean canRollBack(String journalId, StorageInfo storage,
+ StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+ return getOrCreateJournal(journalId).canRollBack(storage, prevStorage,
+ targetLayoutVersion);
+ }
+
+ public void doRollback(String journalId) throws IOException {
+ getOrCreateJournal(journalId).doRollback();
+ }
+
+ public Long getJournalCTime(String journalId) throws IOException {
+ return getOrCreateJournal(journalId).getJournalCTime();
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index 5749cc1..2dbda7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -205,4 +206,35 @@
.acceptRecovery(reqInfo, log, fromUrl);
}
+ @Override
+ public void doPreUpgrade(String journalId) throws IOException {
+ jn.doPreUpgrade(journalId);
+ }
+
+ @Override
+ public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
+ jn.doUpgrade(journalId, sInfo);
+ }
+
+ @Override
+ public void doFinalize(String journalId) throws IOException {
+ jn.doFinalize(journalId);
+ }
+
+ @Override
+ public Boolean canRollBack(String journalId, StorageInfo storage,
+ StorageInfo prevStorage, int targetLayoutVersion)
+ throws IOException {
+ return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion);
+ }
+
+ @Override
+ public void doRollback(String journalId) throws IOException {
+ jn.doRollback(journalId);
+ }
+
+ @Override
+ public Long getJournalCTime(String journalId) throws IOException {
+ return jn.getJournalCTime(journalId);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 3d661f1..f6189b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.common;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -26,26 +25,23 @@
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
-import java.util.List;
import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.VersionInfo;
-import com.google.common.base.Preconditions;
-
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
@@ -82,7 +78,6 @@
public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
public static final String STORAGE_FILE_LOCK = "in_use.lock";
- protected static final String STORAGE_FILE_VERSION = "VERSION";
public static final String STORAGE_DIR_CURRENT = "current";
public static final String STORAGE_DIR_PREVIOUS = "previous";
public static final String STORAGE_TMP_REMOVED = "removed.tmp";
@@ -126,22 +121,24 @@
private class DirIterator implements Iterator<StorageDirectory> {
StorageDirType dirType;
+ boolean includeShared;
int prevIndex; // for remove()
int nextIndex; // for next()
- DirIterator(StorageDirType dirType) {
+ DirIterator(StorageDirType dirType, boolean includeShared) {
this.dirType = dirType;
this.nextIndex = 0;
this.prevIndex = 0;
+ this.includeShared = includeShared;
}
@Override
public boolean hasNext() {
if (storageDirs.isEmpty() || nextIndex >= storageDirs.size())
return false;
- if (dirType != null) {
+ if (dirType != null || !includeShared) {
while (nextIndex < storageDirs.size()) {
- if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+ if (shouldReturnNextDir())
break;
nextIndex++;
}
@@ -156,9 +153,9 @@
StorageDirectory sd = getStorageDir(nextIndex);
prevIndex = nextIndex;
nextIndex++;
- if (dirType != null) {
+ if (dirType != null || !includeShared) {
while (nextIndex < storageDirs.size()) {
- if (getStorageDir(nextIndex).getStorageDirType().isOfType(dirType))
+ if (shouldReturnNextDir())
break;
nextIndex++;
}
@@ -172,6 +169,12 @@
storageDirs.remove(prevIndex); // remove last returned element
hasNext(); // reset nextIndex to correct place
}
+
+ private boolean shouldReturnNextDir() {
+ StorageDirectory sd = getStorageDir(nextIndex);
+ return (dirType == null || sd.getStorageDirType().isOfType(dirType)) &&
+ (includeShared || !sd.isShared());
+ }
}
/**
@@ -203,7 +206,27 @@
* them via the Iterator
*/
public Iterator<StorageDirectory> dirIterator(StorageDirType dirType) {
- return new DirIterator(dirType);
+ return dirIterator(dirType, true);
+ }
+
+ /**
+ * Return all entries in storageDirs, potentially excluding shared dirs.
+ * @param includeShared whether or not to include shared dirs.
+ * @return an iterator over the configured storage dirs.
+ */
+ public Iterator<StorageDirectory> dirIterator(boolean includeShared) {
+ return dirIterator(null, includeShared);
+ }
+
+ /**
+ * @param dirType all entries will be of this type of dir
+ * @param includeShared true to include any shared directories,
+ * false otherwise
+ * @return an iterator over the configured storage dirs.
+ */
+ public Iterator<StorageDirectory> dirIterator(StorageDirType dirType,
+ boolean includeShared) {
+ return new DirIterator(dirType, includeShared);
}
public Iterable<StorageDirectory> dirIterable(final StorageDirType dirType) {
@@ -233,7 +256,9 @@
@InterfaceAudience.Private
public static class StorageDirectory implements FormatConfirmable {
final File root; // root directory
- final boolean useLock; // flag to enable storage lock
+ // whether or not this dir is shared between two separate NNs for HA, or
+ // between multiple block pools in the case of federation.
+ final boolean isShared;
final StorageDirType dirType; // storage dir type
FileLock lock; // storage lock
@@ -241,11 +266,11 @@
public StorageDirectory(File dir) {
// default dirType is null
- this(dir, null, true);
+ this(dir, null, false);
}
public StorageDirectory(File dir, StorageDirType dirType) {
- this(dir, dirType, true);
+ this(dir, dirType, false);
}
public void setStorageUuid(String storageUuid) {
@@ -260,14 +285,14 @@
* Constructor
* @param dir directory corresponding to the storage
* @param dirType storage directory type
- * @param useLock true - enables locking on the storage directory and false
- * disables locking
+ * @param isShared whether or not this dir is shared between two NNs. true
+ * disables locking on the storage directory, false enables locking
*/
- public StorageDirectory(File dir, StorageDirType dirType, boolean useLock) {
+ public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) {
this.root = dir;
this.lock = null;
this.dirType = dirType;
- this.useLock = useLock;
+ this.isShared = isShared;
}
/**
@@ -621,6 +646,10 @@
return true;
}
+
+ public boolean isShared() {
+ return isShared;
+ }
/**
@@ -635,7 +664,7 @@
* @throws IOException if locking fails
*/
public void lock() throws IOException {
- if (!useLock) {
+ if (isShared()) {
LOG.info("Locking is disabled");
return;
}
@@ -890,22 +919,6 @@
}
/**
- * Get common storage fields.
- * Should be overloaded if additional fields need to be get.
- *
- * @param props
- * @throws IOException
- */
- protected void setFieldsFromProperties(
- Properties props, StorageDirectory sd) throws IOException {
- setLayoutVersion(props, sd);
- setNamespaceID(props, sd);
- setStorageType(props, sd);
- setcTime(props, sd);
- setClusterId(props, layoutVersion, sd);
- }
-
- /**
* Set common storage fields into the given properties object.
* Should be overloaded if additional fields need to be set.
*
@@ -923,22 +936,29 @@
}
props.setProperty("cTime", String.valueOf(cTime));
}
-
+
/**
- * Read properties from the VERSION file in the given storage directory.
+ * Get common storage fields.
+ * Should be overloaded if additional fields need to be get.
+ *
+ * @param props
+ * @throws IOException
*/
- public void readProperties(StorageDirectory sd) throws IOException {
- Properties props = readPropertiesFile(sd.getVersionFile());
- setFieldsFromProperties(props, sd);
+ protected void setFieldsFromProperties(
+ Properties props, StorageDirectory sd) throws IOException {
+ super.setFieldsFromProperties(props, sd);
+ setStorageType(props, sd);
}
-
- /**
- * Read properties from the the previous/VERSION file in the given storage directory.
- */
- public void readPreviousVersionProperties(StorageDirectory sd)
- throws IOException {
- Properties props = readPropertiesFile(sd.getPreviousVersionFile());
- setFieldsFromProperties(props, sd);
+
+ /** Validate and set storage type from {@link Properties}*/
+ protected void setStorageType(Properties props, StorageDirectory sd)
+ throws InconsistentFSStateException {
+ NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
+ if (!storageType.equals(type)) {
+ throw new InconsistentFSStateException(sd.root,
+ "node type is incompatible with others.");
+ }
+ storageType = type;
}
/**
@@ -947,10 +967,15 @@
public void writeProperties(StorageDirectory sd) throws IOException {
writeProperties(sd.getVersionFile(), sd);
}
-
+
public void writeProperties(File to, StorageDirectory sd) throws IOException {
Properties props = new Properties();
setPropertiesFromFields(props, sd);
+ writeProperties(to, sd, props);
+ }
+
+ public static void writeProperties(File to, StorageDirectory sd,
+ Properties props) throws IOException {
RandomAccessFile file = new RandomAccessFile(to, "rws");
FileOutputStream out = null;
try {
@@ -977,23 +1002,6 @@
file.close();
}
}
-
- public static Properties readPropertiesFile(File from) throws IOException {
- RandomAccessFile file = new RandomAccessFile(from, "rws");
- FileInputStream in = null;
- Properties props = new Properties();
- try {
- in = new FileInputStream(file.getFD());
- file.seek(0);
- props.load(in);
- } finally {
- if (in != null) {
- in.close();
- }
- file.close();
- }
- return props;
- }
public static void rename(File from, File to) throws IOException {
if (!from.renameTo(to))
@@ -1044,69 +1052,6 @@
+ "-" + Long.toString(storage.getCTime());
}
- String getProperty(Properties props, StorageDirectory sd,
- String name) throws InconsistentFSStateException {
- String property = props.getProperty(name);
- if (property == null) {
- throw new InconsistentFSStateException(sd.root, "file "
- + STORAGE_FILE_VERSION + " has " + name + " missing.");
- }
- return property;
- }
-
- /** Validate and set storage type from {@link Properties}*/
- protected void setStorageType(Properties props, StorageDirectory sd)
- throws InconsistentFSStateException {
- NodeType type = NodeType.valueOf(getProperty(props, sd, "storageType"));
- if (!storageType.equals(type)) {
- throw new InconsistentFSStateException(sd.root,
- "node type is incompatible with others.");
- }
- storageType = type;
- }
-
- /** Validate and set ctime from {@link Properties}*/
- protected void setcTime(Properties props, StorageDirectory sd)
- throws InconsistentFSStateException {
- cTime = Long.parseLong(getProperty(props, sd, "cTime"));
- }
-
- /** Validate and set clusterId from {@link Properties}*/
- protected void setClusterId(Properties props, int layoutVersion,
- StorageDirectory sd) throws InconsistentFSStateException {
- // Set cluster ID in version that supports federation
- if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
- String cid = getProperty(props, sd, "clusterID");
- if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
- throw new InconsistentFSStateException(sd.getRoot(),
- "cluster Id is incompatible with others.");
- }
- clusterID = cid;
- }
- }
-
- /** Validate and set layout version from {@link Properties}*/
- protected void setLayoutVersion(Properties props, StorageDirectory sd)
- throws IncorrectVersionException, InconsistentFSStateException {
- int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
- if (lv < HdfsConstants.LAYOUT_VERSION) { // future version
- throw new IncorrectVersionException(lv, "storage directory "
- + sd.root.getAbsolutePath());
- }
- layoutVersion = lv;
- }
-
- /** Validate and set namespaceID version from {@link Properties}*/
- protected void setNamespaceID(Properties props, StorageDirectory sd)
- throws InconsistentFSStateException {
- int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
- if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
- throw new InconsistentFSStateException(sd.root,
- "namespaceID is incompatible with others.");
- }
- namespaceID = nsId;
- }
-
public static boolean is203LayoutVersion(int layoutVersion) {
for (int lv203 : LAYOUT_VERSIONS_203) {
if (lv203 == layoutVersion) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 1dc8340..59f3d99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -17,9 +17,17 @@
*/
package org.apache.hadoop.hdfs.server.common;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Properties;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import com.google.common.base.Joiner;
@@ -34,6 +42,8 @@
public int namespaceID; // id of the file system
public String clusterID; // id of the cluster
public long cTime; // creation time of the file system state
+
+ protected static final String STORAGE_FILE_VERSION = "VERSION";
public StorageInfo () {
this(0, 0, "", 0L);
@@ -96,4 +106,113 @@
return Joiner.on(":").join(
layoutVersion, namespaceID, cTime, clusterID);
}
+
+ public static int getNsIdFromColonSeparatedString(String in) {
+ return Integer.parseInt(in.split(":")[1]);
+ }
+
+ public static String getClusterIdFromColonSeparatedString(String in) {
+ return in.split(":")[3];
+ }
+
+ /**
+ * Read properties from the VERSION file in the given storage directory.
+ */
+ public void readProperties(StorageDirectory sd) throws IOException {
+ Properties props = readPropertiesFile(sd.getVersionFile());
+ setFieldsFromProperties(props, sd);
+ }
+
+ /**
+ * Read properties from the the previous/VERSION file in the given storage directory.
+ */
+ public void readPreviousVersionProperties(StorageDirectory sd)
+ throws IOException {
+ Properties props = readPropertiesFile(sd.getPreviousVersionFile());
+ setFieldsFromProperties(props, sd);
+ }
+
+ /**
+ * Get common storage fields.
+ * Should be overloaded if additional fields need to be get.
+ *
+ * @param props
+ * @throws IOException
+ */
+ protected void setFieldsFromProperties(
+ Properties props, StorageDirectory sd) throws IOException {
+ setLayoutVersion(props, sd);
+ setNamespaceID(props, sd);
+ setcTime(props, sd);
+ setClusterId(props, layoutVersion, sd);
+ }
+
+ /** Validate and set ctime from {@link Properties}*/
+ protected void setcTime(Properties props, StorageDirectory sd)
+ throws InconsistentFSStateException {
+ cTime = Long.parseLong(getProperty(props, sd, "cTime"));
+ }
+
+ /** Validate and set clusterId from {@link Properties}*/
+ protected void setClusterId(Properties props, int layoutVersion,
+ StorageDirectory sd) throws InconsistentFSStateException {
+ // Set cluster ID in version that supports federation
+ if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
+ String cid = getProperty(props, sd, "clusterID");
+ if (!(clusterID.equals("") || cid.equals("") || clusterID.equals(cid))) {
+ throw new InconsistentFSStateException(sd.getRoot(),
+ "cluster Id is incompatible with others.");
+ }
+ clusterID = cid;
+ }
+ }
+
+ /** Validate and set layout version from {@link Properties}*/
+ protected void setLayoutVersion(Properties props, StorageDirectory sd)
+ throws IncorrectVersionException, InconsistentFSStateException {
+ int lv = Integer.parseInt(getProperty(props, sd, "layoutVersion"));
+ if (lv < HdfsConstants.LAYOUT_VERSION) { // future version
+ throw new IncorrectVersionException(lv, "storage directory "
+ + sd.root.getAbsolutePath());
+ }
+ layoutVersion = lv;
+ }
+
+ /** Validate and set namespaceID version from {@link Properties}*/
+ protected void setNamespaceID(Properties props, StorageDirectory sd)
+ throws InconsistentFSStateException {
+ int nsId = Integer.parseInt(getProperty(props, sd, "namespaceID"));
+ if (namespaceID != 0 && nsId != 0 && namespaceID != nsId) {
+ throw new InconsistentFSStateException(sd.root,
+ "namespaceID is incompatible with others.");
+ }
+ namespaceID = nsId;
+ }
+
+ static String getProperty(Properties props, StorageDirectory sd,
+ String name) throws InconsistentFSStateException {
+ String property = props.getProperty(name);
+ if (property == null) {
+ throw new InconsistentFSStateException(sd.root, "file "
+ + STORAGE_FILE_VERSION + " has " + name + " missing.");
+ }
+ return property;
+ }
+
+ public static Properties readPropertiesFile(File from) throws IOException {
+ RandomAccessFile file = new RandomAccessFile(from, "rws");
+ FileInputStream in = null;
+ Properties props = new Properties();
+ try {
+ in = new FileInputStream(file.getFD());
+ file.seek(0);
+ props.load(in);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ file.close();
+ }
+ return props;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index 8558e95..2497621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -103,7 +103,7 @@
dataDirs.size());
for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
- StorageDirectory sd = new StorageDirectory(dataDir, null, false);
+ StorageDirectory sd = new StorageDirectory(dataDir, null, true);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, this);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index ce3dccd..ed9ba58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -23,6 +23,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.RandomAccessFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
@@ -191,7 +192,7 @@
blockFile.length(), genStamp, volume, blockFile.getParentFile());
} else {
newReplica = new ReplicaWaitingToBeRecovered(blockId,
- validateIntegrity(blockFile, genStamp),
+ validateIntegrityAndSetLength(blockFile, genStamp),
genStamp, volume, blockFile.getParentFile());
}
@@ -214,7 +215,7 @@
* @param genStamp generation stamp of the block
* @return the number of valid bytes
*/
- private long validateIntegrity(File blockFile, long genStamp) {
+ private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
DataInputStream checksumIn = null;
InputStream blockIn = null;
try {
@@ -257,11 +258,25 @@
IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
checksum.update(buf, 0, lastChunkSize);
+ long validFileLength;
if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
- return lastChunkStartPos + lastChunkSize;
+ validFileLength = lastChunkStartPos + lastChunkSize;
} else { // last chunck is corrupt
- return lastChunkStartPos;
+ validFileLength = lastChunkStartPos;
}
+
+ // truncate if extra bytes are present without CRC
+ if (blockFile.length() > validFileLength) {
+ RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+ try {
+ // truncate blockFile
+ blockRAF.setLength(validFileLength);
+ } finally {
+ blockRAF.close();
+ }
+ }
+
+ return validFileLength;
} catch (IOException e) {
FsDatasetImpl.LOG.warn(e);
return 0;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
index 5420b12..2547d88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.util.Collection;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -97,4 +99,35 @@
public String toString() {
return "BackupJournalManager";
}
+
+ @Override
+ public void doPreUpgrade() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doUpgrade(Storage storage) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doFinalize() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+ int targetLayoutVersion) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doRollback() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getJournalCTime() throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index 5e7740c..6a1afb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
@@ -367,7 +368,7 @@
} else {
nsInfo.validateStorage(storage);
}
- bnImage.initEditLog();
+ bnImage.initEditLog(StartupOption.REGULAR);
setRegistration();
NamenodeRegistration nnReg = null;
while(!isStopRequested()) {
@@ -423,7 +424,8 @@
return DFSUtil.getBackupNameServiceId(conf);
}
- protected HAState createHAState() {
+ @Override
+ protected HAState createHAState(StartupOption startOpt) {
return new BackupState();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index d98dad1..d43a55bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
@@ -252,10 +253,12 @@
if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
StorageDirectory sd = storage.getStorageDirectory(u);
if (sd != null) {
- journalSet.add(new FileJournalManager(conf, sd, storage), required);
+ journalSet.add(new FileJournalManager(conf, sd, storage),
+ required, sharedEditsDirs.contains(u));
}
} else {
- journalSet.add(createJournal(u), required);
+ journalSet.add(createJournal(u), required,
+ sharedEditsDirs.contains(u));
}
}
@@ -1339,6 +1342,58 @@
}
}
+ public long getSharedLogCTime() throws IOException {
+ for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+ if (jas.isShared()) {
+ return jas.getManager().getJournalCTime();
+ }
+ }
+ throw new IOException("No shared log found.");
+ }
+
+ public synchronized void doPreUpgradeOfSharedLog() throws IOException {
+ for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+ if (jas.isShared()) {
+ jas.getManager().doPreUpgrade();
+ }
+ }
+ }
+
+ public synchronized void doUpgradeOfSharedLog() throws IOException {
+ for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+ if (jas.isShared()) {
+ jas.getManager().doUpgrade(storage);
+ }
+ }
+ }
+
+ public synchronized void doFinalizeOfSharedLog() throws IOException {
+ for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+ if (jas.isShared()) {
+ jas.getManager().doFinalize();
+ }
+ }
+ }
+
+ public synchronized boolean canRollBackSharedLog(Storage prevStorage,
+ int targetLayoutVersion) throws IOException {
+ for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+ if (jas.isShared()) {
+ return jas.getManager().canRollBack(storage, prevStorage,
+ targetLayoutVersion);
+ }
+ }
+ throw new IOException("No shared log found.");
+ }
+
+ public synchronized void doRollback() throws IOException {
+ for (JournalAndStream jas : journalSet.getAllJournalStreams()) {
+ if (jas.isShared()) {
+ jas.getManager().doRollback();
+ }
+ }
+ }
+
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) throws IOException {
@@ -1470,4 +1525,5 @@
+ uri, e);
}
}
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index cc4ca0c..166ffb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -178,7 +178,8 @@
* @return true if the image needs to be saved or false otherwise
*/
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
- MetaRecoveryContext recovery) throws IOException {
+ MetaRecoveryContext recovery)
+ throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
@@ -252,14 +253,14 @@
doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
- doRollback();
- break;
+ throw new AssertionError("Rollback is now a standalone command, " +
+ "NameNode should not be starting with this option.");
case REGULAR:
default:
// just load the image
}
- return loadFSImage(target, recovery);
+ return loadFSImage(target, recovery, startOpt);
}
/**
@@ -272,17 +273,15 @@
private boolean recoverStorageDirs(StartupOption startOpt,
Map<StorageDirectory, StorageState> dataDirStates) throws IOException {
boolean isFormatted = false;
+ // This loop needs to be over all storage dirs, even shared dirs, to make
+ // sure that we properly examine their state, but we make sure we don't
+ // mutate the shared dir below in the actual loop.
for (Iterator<StorageDirectory> it =
storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, storage);
- String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
- if (curState != StorageState.NORMAL && HAUtil.isHAEnabled(conf, nameserviceId)) {
- throw new IOException("Cannot start an HA namenode with name dirs " +
- "that need recovery. Dir: " + sd + " state: " + curState);
- }
// sd is locked but not opened
switch(curState) {
case NON_EXISTENT:
@@ -294,7 +293,7 @@
case NORMAL:
break;
default: // recovery is possible
- sd.doRecover(curState);
+ sd.doRecover(curState);
}
if (curState != StorageState.NOT_FORMATTED
&& startOpt != StartupOption.ROLLBACK) {
@@ -315,10 +314,10 @@
return isFormatted;
}
- private void doUpgrade(FSNamesystem target) throws IOException {
+ void doUpgrade(FSNamesystem target) throws IOException {
// Upgrade is allowed only if there are
- // no previous fs states in any of the directories
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ // no previous fs states in any of the local directories
+ for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
if (sd.getPreviousDir().exists())
throw new InconsistentFSStateException(sd.getRoot(),
@@ -327,9 +326,9 @@
}
// load the latest image
- this.loadFSImage(target, null);
-
// Do upgrade for each directory
+ this.loadFSImage(target, null, StartupOption.UPGRADE);
+
long oldCTime = storage.getCTime();
storage.cTime = now(); // generate new cTime for the state
int oldLV = storage.getLayoutVersion();
@@ -337,28 +336,17 @@
List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>());
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ assert !editLog.isSegmentOpen() : "Edits log must not be open.";
+ LOG.info("Starting upgrade of local storage directories."
+ + "\n old LV = " + oldLV
+ + "; old CTime = " + oldCTime
+ + ".\n new LV = " + storage.getLayoutVersion()
+ + "; new CTime = " + storage.getCTime());
+ // Do upgrade for each directory
+ for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
- LOG.info("Starting upgrade of image directory " + sd.getRoot()
- + ".\n old LV = " + oldLV
- + "; old CTime = " + oldCTime
- + ".\n new LV = " + storage.getLayoutVersion()
- + "; new CTime = " + storage.getCTime());
try {
- File curDir = sd.getCurrentDir();
- File prevDir = sd.getPreviousDir();
- File tmpDir = sd.getPreviousTmp();
- assert curDir.exists() : "Current directory must exist.";
- assert !prevDir.exists() : "previous directory must not exist.";
- assert !tmpDir.exists() : "previous.tmp directory must not exist.";
- assert !editLog.isSegmentOpen() : "Edits log must not be open.";
-
- // rename current to tmp
- NNStorage.rename(curDir, tmpDir);
-
- if (!curDir.mkdir()) {
- throw new IOException("Cannot create directory " + curDir);
- }
+ NNUpgradeUtil.doPreUpgrade(sd);
} catch (Exception e) {
LOG.error("Failed to move aside pre-upgrade storage " +
"in image directory " + sd.getRoot(), e);
@@ -366,41 +354,38 @@
continue;
}
}
+ if (target.isHaEnabled()) {
+ editLog.doPreUpgradeOfSharedLog();
+ }
storage.reportErrorsOnDirectories(errorSDs);
errorSDs.clear();
saveFSImageInAllDirs(target, editLog.getLastWrittenTxId());
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
try {
- // Write the version file, since saveFsImage above only makes the
- // fsimage_<txid>, and the directory is otherwise empty.
- storage.writeProperties(sd);
-
- File prevDir = sd.getPreviousDir();
- File tmpDir = sd.getPreviousTmp();
- // rename tmp to previous
- NNStorage.rename(tmpDir, prevDir);
+ NNUpgradeUtil.doUpgrade(sd, storage);
} catch (IOException ioe) {
- LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
errorSDs.add(sd);
continue;
}
- LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
+ }
+ if (target.isHaEnabled()) {
+ editLog.doUpgradeOfSharedLog();
}
storage.reportErrorsOnDirectories(errorSDs);
-
+
isUpgradeFinalized = false;
if (!storage.getRemovedStorageDirs().isEmpty()) {
- //during upgrade, it's a fatal error to fail any storage directory
+ // during upgrade, it's a fatal error to fail any storage directory
throw new IOException("Upgrade failed in "
+ storage.getRemovedStorageDirs().size()
+ " storage directory(ies), previously logged.");
}
}
- private void doRollback() throws IOException {
+ void doRollback(FSNamesystem fsns) throws IOException {
// Rollback is allowed only if there is
// a previous fs states in at least one of the storage directories.
// Directories that don't have previous state do not rollback
@@ -408,85 +393,46 @@
FSImage prevState = new FSImage(conf);
try {
prevState.getStorage().layoutVersion = HdfsConstants.LAYOUT_VERSION;
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
- File prevDir = sd.getPreviousDir();
- if (!prevDir.exists()) { // use current directory then
- LOG.info("Storage directory " + sd.getRoot()
- + " does not contain previous fs state.");
- // read and verify consistency with other directories
- storage.readProperties(sd);
+ if (!NNUpgradeUtil.canRollBack(sd, storage, prevState.getStorage(),
+ HdfsConstants.LAYOUT_VERSION)) {
continue;
}
-
- // read and verify consistency of the prev dir
- prevState.getStorage().readPreviousVersionProperties(sd);
-
- if (prevState.getLayoutVersion() != HdfsConstants.LAYOUT_VERSION) {
- throw new IOException(
- "Cannot rollback to storage version " +
- prevState.getLayoutVersion() +
- " using this version of the NameNode, which uses storage version " +
- HdfsConstants.LAYOUT_VERSION + ". " +
- "Please use the previous version of HDFS to perform the rollback.");
- }
canRollback = true;
}
+
+ if (fsns.isHaEnabled()) {
+ // If HA is enabled, check if the shared log can be rolled back as well.
+ editLog.initJournalsForWrite();
+ canRollback |= editLog.canRollBackSharedLog(prevState.getStorage(),
+ HdfsConstants.LAYOUT_VERSION);
+ }
+
if (!canRollback)
throw new IOException("Cannot rollback. None of the storage "
+ "directories contain previous fs state.");
-
+
// Now that we know all directories are going to be consistent
// Do rollback for each directory containing previous state
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+ for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
- File prevDir = sd.getPreviousDir();
- if (!prevDir.exists())
- continue;
-
LOG.info("Rolling back storage directory " + sd.getRoot()
- + ".\n new LV = " + prevState.getStorage().getLayoutVersion()
- + "; new CTime = " + prevState.getStorage().getCTime());
- File tmpDir = sd.getRemovedTmp();
- assert !tmpDir.exists() : "removed.tmp directory must not exist.";
- // rename current to tmp
- File curDir = sd.getCurrentDir();
- assert curDir.exists() : "Current directory must exist.";
- NNStorage.rename(curDir, tmpDir);
- // rename previous to current
- NNStorage.rename(prevDir, curDir);
-
- // delete tmp dir
- NNStorage.deleteDir(tmpDir);
- LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
+ + ".\n new LV = " + prevState.getStorage().getLayoutVersion()
+ + "; new CTime = " + prevState.getStorage().getCTime());
+ NNUpgradeUtil.doRollBack(sd);
}
+ if (fsns.isHaEnabled()) {
+ // If HA is enabled, try to roll back the shared log as well.
+ editLog.doRollback();
+ }
+
isUpgradeFinalized = true;
} finally {
prevState.close();
}
}
- private void doFinalize(StorageDirectory sd) throws IOException {
- File prevDir = sd.getPreviousDir();
- if (!prevDir.exists()) { // already discarded
- LOG.info("Directory " + prevDir + " does not exist.");
- LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
- return;
- }
- LOG.info("Finalizing upgrade for storage directory "
- + sd.getRoot() + "."
- + (storage.getLayoutVersion()==0 ? "" :
- "\n cur LV = " + storage.getLayoutVersion()
- + "; cur CTime = " + storage.getCTime()));
- assert sd.getCurrentDir().exists() : "Current directory must exist.";
- final File tmpDir = sd.getFinalizedTmp();
- // rename previous to tmp and remove
- NNStorage.rename(prevDir, tmpDir);
- NNStorage.deleteDir(tmpDir);
- isUpgradeFinalized = true;
- LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
- }
-
/**
* Load image from a checkpoint directory and save it into the current one.
* @param target the NameSystem to import into
@@ -521,7 +467,7 @@
// return back the real image
realImage.getStorage().setStorageInfo(ckptImage.getStorage());
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
- realImage.initEditLog();
+ realImage.initEditLog(StartupOption.IMPORT);
target.dir.fsImage = realImage;
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
@@ -530,12 +476,23 @@
saveNamespace(target);
getStorage().writeAll();
}
-
- void finalizeUpgrade() throws IOException {
- for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
+
+ void finalizeUpgrade(boolean finalizeEditLog) throws IOException {
+ LOG.info("Finalizing upgrade for local dirs. " +
+ (storage.getLayoutVersion() == 0 ? "" :
+ "\n cur LV = " + storage.getLayoutVersion()
+ + "; cur CTime = " + storage.getCTime()));
+ for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
- doFinalize(sd);
+ NNUpgradeUtil.doFinalize(sd);
}
+ if (finalizeEditLog) {
+ // We only do this in the case that HA is enabled and we're active. In any
+ // other case the NN will have done the upgrade of the edits directories
+ // already by virtue of the fact that they're local.
+ editLog.doFinalizeOfSharedLog();
+ }
+ isUpgradeFinalized = true;
}
boolean isUpgradeFinalized() {
@@ -582,8 +539,8 @@
* @return whether the image should be saved
* @throws IOException
*/
- boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
- throws IOException {
+ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery,
+ StartupOption startOpt) throws IOException {
FSImageStorageInspector inspector = storage.readAndInspectDirs();
FSImageFile imageFile = null;
@@ -600,7 +557,7 @@
Iterable<EditLogInputStream> editStreams = null;
- initEditLog();
+ initEditLog(startOpt);
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) {
@@ -682,14 +639,30 @@
}
}
- public void initEditLog() {
+ public void initEditLog(StartupOption startOpt) throws IOException {
Preconditions.checkState(getNamespaceID() != 0,
"Must know namespace ID before initting edit log");
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
if (!HAUtil.isHAEnabled(conf, nameserviceId)) {
+ // If this NN is not HA
editLog.initJournalsForWrite();
editLog.recoverUnclosedStreams();
+ } else if (HAUtil.isHAEnabled(conf, nameserviceId) &&
+ startOpt == StartupOption.UPGRADE) {
+ // This NN is HA, but we're doing an upgrade so init the edit log for
+ // write.
+ editLog.initJournalsForWrite();
+ long sharedLogCTime = editLog.getSharedLogCTime();
+ if (this.storage.getCTime() < sharedLogCTime) {
+ throw new IOException("It looks like the shared log is already " +
+ "being upgraded but this NN has not been upgraded yet. You " +
+ "should restart this NameNode with the '" +
+ StartupOption.BOOTSTRAPSTANDBY.getName() + "' option to bring " +
+ "this NN in sync with the other.");
+ }
+ editLog.recoverUnclosedStreams();
} else {
+ // This NN is HA and we're not doing an upgrade.
editLog.initSharedJournalsForRead();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 537aa72..f1bf5d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -549,6 +549,10 @@
return leaseManager;
}
+ boolean isHaEnabled() {
+ return haEnabled;
+ }
+
/**
* Check the supplied configuration for correctness.
* @param conf Supplies the configuration to validate.
@@ -878,7 +882,7 @@
}
// This will start a new log segment and write to the seen_txid file, so
// we shouldn't do it when coming up in standby state
- if (!haEnabled) {
+ if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
fsImage.openEditLogForWrite();
}
success = true;
@@ -1005,6 +1009,7 @@
dir.fsImage.editLog.openForWrite();
}
+
if (haEnabled) {
// Renew all of the leases before becoming active.
// This is because, while we were in standby mode,
@@ -1031,14 +1036,17 @@
}
}
+ private boolean inActiveState() {
+ return haContext != null &&
+ haContext.getState().getServiceState() == HAServiceState.ACTIVE;
+ }
+
/**
* @return Whether the namenode is transitioning to active state and is in the
* middle of the {@link #startActiveServices()}
*/
public boolean inTransitionToActive() {
- return haEnabled && haContext != null
- && haContext.getState().getServiceState() == HAServiceState.ACTIVE
- && startingActiveService;
+ return haEnabled && inActiveState() && startingActiveService;
}
private boolean shouldUseDelegationTokens() {
@@ -4512,11 +4520,11 @@
void finalizeUpgrade() throws IOException {
checkSuperuserPrivilege();
- checkOperation(OperationCategory.WRITE);
+ checkOperation(OperationCategory.UNCHECKED);
writeLock();
try {
- checkOperation(OperationCategory.WRITE);
- getFSImage().finalizeUpgrade();
+ checkOperation(OperationCategory.UNCHECKED);
+ getFSImage().finalizeUpgrade(this.isHaEnabled() && inActiveState());
} finally {
writeUnlock();
}
@@ -7421,5 +7429,6 @@
logger.addAppender(asyncAppender);
}
}
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index 09bddef..4c78add 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -33,14 +33,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -489,4 +490,49 @@
isInProgress(), hasCorruptHeader);
}
}
+
+ @Override
+ public void doPreUpgrade() throws IOException {
+ LOG.info("Starting upgrade of edits directory " + sd.getRoot());
+ try {
+ NNUpgradeUtil.doPreUpgrade(sd);
+ } catch (IOException ioe) {
+ LOG.error("Failed to move aside pre-upgrade storage " +
+ "in edits directory " + sd.getRoot(), ioe);
+ throw ioe;
+ }
+ }
+
+ /**
+ * This method assumes that the fields of the {@link Storage} object have
+ * already been updated to the appropriate new values for the upgrade.
+ */
+ @Override
+ public void doUpgrade(Storage storage) throws IOException {
+ NNUpgradeUtil.doUpgrade(sd, storage);
+ }
+
+ @Override
+ public void doFinalize() throws IOException {
+ NNUpgradeUtil.doFinalize(sd);
+ }
+
+ @Override
+ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+ int targetLayoutVersion) throws IOException {
+ return NNUpgradeUtil.canRollBack(sd, storage,
+ prevStorage, targetLayoutVersion);
+ }
+
+ @Override
+ public void doRollback() throws IOException {
+ NNUpgradeUtil.doRollBack(sd);
+ }
+
+ @Override
+ public long getJournalCTime() throws IOException {
+ StorageInfo sInfo = new StorageInfo();
+ sInfo.readProperties(sd);
+ return sInfo.getCTime();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
index 785c1fe..a50b3aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
@@ -22,7 +22,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
/**
@@ -64,6 +66,54 @@
* Recover segments which have not been finalized.
*/
void recoverUnfinalizedSegments() throws IOException;
+
+ /**
+ * Perform any steps that must succeed across all JournalManagers involved in
+ * an upgrade before proceeding onto the actual upgrade stage. If a call to
+ * any JM's doPreUpgrade method fails, then doUpgrade will not be called for
+ * any JM.
+ */
+ void doPreUpgrade() throws IOException;
+
+ /**
+ * Perform the actual upgrade of the JM. After this is completed, the NN can
+ * begin to use the new upgraded metadata. This metadata may later be either
+ * finalized or rolled back to the previous state.
+ *
+ * @param storage info about the new upgraded versions.
+ */
+ void doUpgrade(Storage storage) throws IOException;
+
+ /**
+ * Finalize the upgrade. JMs should purge any state that they had been keeping
+ * around during the upgrade process. After this is completed, rollback is no
+ * longer allowed.
+ */
+ void doFinalize() throws IOException;
+
+ /**
+ * Return true if this JM can roll back to the previous storage state, false
+ * otherwise. The NN will refuse to run the rollback operation unless at least
+ * one JM or fsimage storage directory can roll back.
+ *
+ * @param storage the storage info for the current state
+ * @param prevStorage the storage info for the previous (unupgraded) state
+ * @param targetLayoutVersion the layout version we intend to roll back to
+ * @return true if this JM can roll back, false otherwise.
+ */
+ boolean canRollBack(StorageInfo storage, StorageInfo prevStorage,
+ int targetLayoutVersion) throws IOException;
+
+ /**
+ * Perform the rollback to the previous FS state. JMs which do not need to
+ * roll back their state should just return without error.
+ */
+ void doRollback() throws IOException;
+
+ /**
+ * @return the CTime of the journal manager.
+ */
+ long getJournalCTime() throws IOException;
/**
* Close the journal manager, freeing any resources it may hold.
@@ -84,4 +134,5 @@
super(reason);
}
}
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index b117606..7bf5015 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -33,6 +33,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -77,11 +79,14 @@
private final JournalManager journal;
private boolean disabled = false;
private EditLogOutputStream stream;
- private boolean required = false;
+ private final boolean required;
+ private final boolean shared;
- public JournalAndStream(JournalManager manager, boolean required) {
+ public JournalAndStream(JournalManager manager, boolean required,
+ boolean shared) {
this.journal = manager;
this.required = required;
+ this.shared = shared;
}
public void startLogSegment(long txId) throws IOException {
@@ -163,6 +168,10 @@
public boolean isRequired() {
return required;
}
+
+ public boolean isShared() {
+ return shared;
+ }
}
// COW implementation is necessary since some users (eg the web ui) call
@@ -178,7 +187,7 @@
@Override
public void format(NamespaceInfo nsInfo) throws IOException {
- // The iteration is done by FSEditLog itself
+ // The operation is done by FSEditLog itself
throw new UnsupportedOperationException();
}
@@ -537,9 +546,13 @@
}
return jList;
}
-
+
void add(JournalManager j, boolean required) {
- JournalAndStream jas = new JournalAndStream(j, required);
+ add(j, required, false);
+ }
+
+ void add(JournalManager j, boolean required, boolean shared) {
+ JournalAndStream jas = new JournalAndStream(j, required, shared);
journals.add(jas);
}
@@ -655,4 +668,40 @@
}
return buf.toString();
}
+
+ @Override
+ public void doPreUpgrade() throws IOException {
+ // This operation is handled by FSEditLog directly.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doUpgrade(Storage storage) throws IOException {
+ // This operation is handled by FSEditLog directly.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doFinalize() throws IOException {
+ // This operation is handled by FSEditLog directly.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+ // This operation is handled by FSEditLog directly.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void doRollback() throws IOException {
+ // This operation is handled by FSEditLog directly.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getJournalCTime() throws IOException {
+ // This operation is handled by FSEditLog directly.
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index 21c56c2..ce23c57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -299,7 +299,7 @@
if(dirName.getScheme().compareTo("file") == 0) {
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
dirType,
- !sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
+ sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
}
}
@@ -310,7 +310,7 @@
// URI is of type file://
if(dirName.getScheme().compareTo("file") == 0)
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
- NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
+ NameNodeDirType.EDITS, sharedEditsDirs.contains(dirName)));
}
}
@@ -976,7 +976,7 @@
StringBuilder layoutVersions = new StringBuilder();
// First determine what range of layout versions we're going to inspect
- for (Iterator<StorageDirectory> it = dirIterator();
+ for (Iterator<StorageDirectory> it = dirIterator(false);
it.hasNext();) {
StorageDirectory sd = it.next();
if (!sd.getVersionFile().exists()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
new file mode 100644
index 0000000..1c491e5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNUpgradeUtil.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+
+abstract class NNUpgradeUtil {
+
+ private static final Log LOG = LogFactory.getLog(NNUpgradeUtil.class);
+
+ /**
+ * Return true if this storage dir can roll back to the previous storage
+ * state, false otherwise. The NN will refuse to run the rollback operation
+ * unless at least one JM or fsimage storage directory can roll back.
+ *
+ * @param storage the storage info for the current state
+ * @param prevStorage the storage info for the previous (unupgraded) state
+ * @param targetLayoutVersion the layout version we intend to roll back to
+ * @return true if this JM can roll back, false otherwise.
+ * @throws IOException in the event of error
+ */
+ static boolean canRollBack(StorageDirectory sd, StorageInfo storage,
+ StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
+ File prevDir = sd.getPreviousDir();
+ if (!prevDir.exists()) { // use current directory then
+ LOG.info("Storage directory " + sd.getRoot()
+ + " does not contain previous fs state.");
+ // read and verify consistency with other directories
+ storage.readProperties(sd);
+ return false;
+ }
+
+ // read and verify consistency of the prev dir
+ prevStorage.readPreviousVersionProperties(sd);
+
+ if (prevStorage.getLayoutVersion() != targetLayoutVersion) {
+ throw new IOException(
+ "Cannot rollback to storage version " +
+ prevStorage.getLayoutVersion() +
+ " using this version of the NameNode, which uses storage version " +
+ targetLayoutVersion + ". " +
+ "Please use the previous version of HDFS to perform the rollback.");
+ }
+
+ return true;
+ }
+
+ /**
+ * Finalize the upgrade. The previous dir, if any, will be renamed and
+ * removed. After this is completed, rollback is no longer allowed.
+ *
+ * @param sd the storage directory to finalize
+ * @throws IOException in the event of error
+ */
+ static void doFinalize(StorageDirectory sd) throws IOException {
+ File prevDir = sd.getPreviousDir();
+ if (!prevDir.exists()) { // already discarded
+ LOG.info("Directory " + prevDir + " does not exist.");
+ LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
+ return;
+ }
+ LOG.info("Finalizing upgrade of storage directory " + sd.getRoot());
+ assert sd.getCurrentDir().exists() : "Current directory must exist.";
+ final File tmpDir = sd.getFinalizedTmp();
+ // rename previous to tmp and remove
+ NNStorage.rename(prevDir, tmpDir);
+ NNStorage.deleteDir(tmpDir);
+ LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
+ }
+
+ /**
+ * Perform any steps that must succeed across all storage dirs/JournalManagers
+ * involved in an upgrade before proceeding onto the actual upgrade stage. If
+ * a call to any JM's or local storage dir's doPreUpgrade method fails, then
+ * doUpgrade will not be called for any JM. The existing current dir is
+ * renamed to previous.tmp, and then a new, empty current dir is created.
+ *
+ * @param sd the storage directory to perform the pre-upgrade procedure.
+ * @throws IOException in the event of error
+ */
+ static void doPreUpgrade(StorageDirectory sd) throws IOException {
+ LOG.info("Starting upgrade of storage directory " + sd.getRoot());
+ File curDir = sd.getCurrentDir();
+ File prevDir = sd.getPreviousDir();
+ File tmpDir = sd.getPreviousTmp();
+ assert curDir.exists() : "Current directory must exist.";
+ assert !prevDir.exists() : "previous directory must not exist.";
+ assert !tmpDir.exists() : "previous.tmp directory must not exist.";
+
+ // rename current to tmp
+ NNStorage.rename(curDir, tmpDir);
+
+ if (!curDir.mkdir()) {
+ throw new IOException("Cannot create directory " + curDir);
+ }
+ }
+
+ /**
+ * Perform the upgrade of the storage dir to the given storage info. The new
+ * storage info is written into the current directory, and the previous.tmp
+ * directory is renamed to previous.
+ *
+ * @param sd the storage directory to upgrade
+ * @param storage info about the new upgraded versions.
+ * @throws IOException in the event of error
+ */
+ static void doUpgrade(StorageDirectory sd, Storage storage) throws
+ IOException {
+ LOG.info("Performing upgrade of storage directory " + sd.getRoot());
+ try {
+ // Write the version file, since saveFsImage only makes the
+ // fsimage_<txid>, and the directory is otherwise empty.
+ storage.writeProperties(sd);
+
+ File prevDir = sd.getPreviousDir();
+ File tmpDir = sd.getPreviousTmp();
+ // rename tmp to previous
+ NNStorage.rename(tmpDir, prevDir);
+ } catch (IOException ioe) {
+ LOG.error("Unable to rename temp to previous for " + sd.getRoot(), ioe);
+ throw ioe;
+ }
+ }
+
+ /**
+ * Perform rollback of the storage dir to the previous state. The existing
+ * current dir is removed, and the previous dir is renamed to current.
+ *
+ * @param sd the storage directory to roll back.
+ * @throws IOException in the event of error
+ */
+ static void doRollBack(StorageDirectory sd)
+ throws IOException {
+ File prevDir = sd.getPreviousDir();
+ if (!prevDir.exists())
+ return;
+
+ File tmpDir = sd.getRemovedTmp();
+ assert !tmpDir.exists() : "removed.tmp directory must not exist.";
+ // rename current to tmp
+ File curDir = sd.getCurrentDir();
+ assert curDir.exists() : "Current directory must exist.";
+ NNStorage.rename(curDir, tmpDir);
+ // rename previous to current
+ NNStorage.rename(prevDir, curDir);
+
+ // delete tmp dir
+ NNStorage.deleteDir(tmpDir);
+ LOG.info("Rollback of " + sd.getRoot() + " is complete.");
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index eb3755b..1d02e01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -648,7 +648,7 @@
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
- state = createHAState();
+ state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
@@ -670,8 +670,12 @@
}
}
- protected HAState createHAState() {
- return !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
+ protected HAState createHAState(StartupOption startOpt) {
+ if (!haEnabled || startOpt == StartupOption.UPGRADE) {
+ return ACTIVE_STATE;
+ } else {
+ return STANDBY_STATE;
+ }
}
protected HAContext createHAContext() {
@@ -1023,26 +1027,28 @@
}
}
}
-
- private static boolean finalize(Configuration conf,
- boolean isConfirmationNeeded
- ) throws IOException {
+
+ @VisibleForTesting
+ public static boolean doRollback(Configuration conf,
+ boolean isConfirmationNeeded) throws IOException {
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId);
FSNamesystem nsys = new FSNamesystem(conf, new FSImage(conf));
System.err.print(
- "\"finalize\" will remove the previous state of the files system.\n"
- + "Recent upgrade will become permanent.\n"
- + "Rollback option will not be available anymore.\n");
+ "\"rollBack\" will remove the current state of the file system,\n"
+ "returning you to the state prior to initiating your recent\n"
+ + "upgrade. This action is permanent and cannot be undone. If you\n"
+ + "are performing a rollback in an HA environment, you should be\n"
+ + "certain that no NameNode process is running on any host.");
if (isConfirmationNeeded) {
- if (!confirmPrompt("Finalize filesystem state?")) {
- System.err.println("Finalize aborted.");
+ if (!confirmPrompt("Roll back file system state?")) {
+ System.err.println("Rollback aborted.");
return true;
}
}
- nsys.dir.fsImage.finalizeUpgrade();
+ nsys.dir.fsImage.doRollback(nsys);
return false;
}
@@ -1206,14 +1212,6 @@
return null;
}
setStartupOption(conf, startOpt);
-
- if (HAUtil.isHAEnabled(conf, DFSUtil.getNamenodeNameServiceId(conf)) &&
- (startOpt == StartupOption.UPGRADE ||
- startOpt == StartupOption.ROLLBACK ||
- startOpt == StartupOption.FINALIZE)) {
- throw new HadoopIllegalArgumentException("Invalid startup option. " +
- "Cannot perform DFS upgrade with HA enabled.");
- }
switch (startOpt) {
case FORMAT: {
@@ -1229,10 +1227,17 @@
return null;
}
case FINALIZE: {
- boolean aborted = finalize(conf, true);
- terminate(aborted ? 1 : 0);
+ System.err.println("Use of the argument '" + StartupOption.FINALIZE +
+ "' is no longer supported. To finalize an upgrade, start the NN " +
+ "and then run `hdfs dfsadmin -finalizeUpgrade'");
+ terminate(1);
return null; // avoid javac warning
}
+ case ROLLBACK: {
+ boolean aborted = doRollback(conf, true);
+ terminate(aborted ? 1 : 0);
+ return null; // avoid warning
+ }
case BOOTSTRAPSTANDBY: {
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
index ac0761d..29b43cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -39,6 +39,7 @@
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -192,7 +193,7 @@
FSImage image = new FSImage(conf);
try {
image.getStorage().setStorageInfo(storage);
- image.initEditLog();
+ image.initEditLog(StartupOption.REGULAR);
assert image.getEditLog().isOpenForRead() :
"Expected edit log to be open for read";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 09a4d70..9b5b2ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -46,6 +47,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -769,7 +771,24 @@
*/
public int finalizeUpgrade() throws IOException {
DistributedFileSystem dfs = getDFS();
- dfs.finalizeUpgrade();
+
+ Configuration dfsConf = dfs.getConf();
+ URI dfsUri = dfs.getUri();
+ boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
+ if (isHaEnabled) {
+ // In the case of HA, run finalizeUpgrade for all NNs in this nameservice
+ String nsId = dfsUri.getHost();
+ List<ClientProtocol> namenodes =
+ HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf, nsId);
+ if (!HAUtil.isAtLeastOneActive(namenodes)) {
+ throw new IOException("Cannot finalize with no NameNode active");
+ }
+ for (ClientProtocol haNn : namenodes) {
+ haNn.finalizeUpgrade();
+ }
+ } else {
+ dfs.finalizeUpgrade();
+ }
return 0;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
index ae96375..cff6439 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
@@ -134,6 +134,72 @@
}
/**
+ * getJournalCTime()
+ */
+message GetJournalCTimeRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message GetJournalCTimeResponseProto {
+ required int64 resultCTime = 1;
+}
+
+/**
+ * doPreUpgrade()
+ */
+message DoPreUpgradeRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message DoPreUpgradeResponseProto {
+}
+
+/**
+ * doUpgrade()
+ */
+message DoUpgradeRequestProto {
+ required JournalIdProto jid = 1;
+ required StorageInfoProto sInfo = 2;
+}
+
+message DoUpgradeResponseProto {
+}
+
+/**
+ * doFinalize()
+ */
+message DoFinalizeRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message DoFinalizeResponseProto {
+}
+
+/**
+ * canRollBack()
+ */
+message CanRollBackRequestProto {
+ required JournalIdProto jid = 1;
+ required StorageInfoProto storage = 2;
+ required StorageInfoProto prevStorage = 3;
+ required int32 targetLayoutVersion = 4;
+}
+
+message CanRollBackResponseProto {
+ required bool canRollBack = 1;
+}
+
+/**
+ * doRollback()
+ */
+message DoRollbackRequestProto {
+ required JournalIdProto jid = 1;
+}
+
+message DoRollbackResponseProto {
+}
+
+/**
* getJournalState()
*/
message GetJournalStateRequestProto {
@@ -236,6 +302,18 @@
service QJournalProtocolService {
rpc isFormatted(IsFormattedRequestProto) returns (IsFormattedResponseProto);
+ rpc getJournalCTime(GetJournalCTimeRequestProto) returns (GetJournalCTimeResponseProto);
+
+ rpc doPreUpgrade(DoPreUpgradeRequestProto) returns (DoPreUpgradeResponseProto);
+
+ rpc doUpgrade(DoUpgradeRequestProto) returns (DoUpgradeResponseProto);
+
+ rpc doFinalize(DoFinalizeRequestProto) returns (DoFinalizeResponseProto);
+
+ rpc canRollBack(CanRollBackRequestProto) returns (CanRollBackResponseProto);
+
+ rpc doRollback(DoRollbackRequestProto) returns (DoRollbackResponseProto);
+
rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm
index 2aefc35..eccd705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm
@@ -765,3 +765,49 @@
Even if automatic failover is configured, you may initiate a manual failover
using the same <<<hdfs haadmin>>> command. It will perform a coordinated
failover.
+
+* HDFS Upgrade/Finalization/Rollback with HA Enabled
+
+ When moving between versions of HDFS, sometimes the newer software can simply
+ be installed and the cluster restarted. Sometimes, however, upgrading the
+ version of HDFS you're running may require changing on-disk data. In this case,
+ one must use the HDFS Upgrade/Finalize/Rollback facility after installing the
+ new software. This process is made more complex in an HA environment, since the
+ on-disk metadata that the NN relies upon is by definition distributed, both on
+ the two HA NNs in the pair, and on the JournalNodes in the case that QJM is
+ being used for the shared edits storage. This documentation section describes
+ the procedure to use the HDFS Upgrade/Finalize/Rollback facility in an HA setup.
+
+ <<To perform an HA upgrade>>, the operator must do the following:
+
+ [[1]] Shut down all of the NNs as normal, and install the newer software.
+
+ [[2]] Start one of the NNs with the <<<'-upgrade'>>> flag.
+
+ [[3]] On start, this NN will not enter the standby state as usual in an HA
+ setup. Rather, this NN will immediately enter the active state, perform an
+ upgrade of its local storage dirs, and also perform an upgrade of the shared
+ edit log.
+
+ [[4]] At this point the other NN in the HA pair will be out of sync with
+ the upgraded NN. In order to bring it back in sync and once again have a highly
+ available setup, you should re-bootstrap this NameNode by running the NN with
+ the <<<'-bootstrapStandby'>>> flag. It is an error to start this second NN with
+ the <<<'-upgrade'>>> flag.
+
+ Note that if at any time you want to restart the NameNodes before finalizing
+ or rolling back the upgrade, you should start the NNs as normal, i.e. without
+ any special startup flag.
+
+ <<To finalize an HA upgrade>>, the operator will use the <<<`hdfs
+ dfsadmin -finalizeUpgrade'>>> command while the NNs are running and one of them
+ is active. The active NN at the time this happens will perform the finalization
+ of the shared log, and the NN whose local storage directories contain the
+ previous FS state will delete its local state.
+
+ <<To perform a rollback>> of an upgrade, both NNs should first be shut down.
+ The operator should run the roll back command on the NN where they initiated
+ the upgrade procedure, which will perform the rollback on the local dirs there,
+ as well as on the shared log, either NFS or on the JNs. Afterward, this NN
+ should be started and the operator should run <<<`-bootstrapStandby'>>> on the
+ other NN to bring the two NNs in sync with this rolled-back file system state.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index ea78be1..3e9b614 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -100,7 +100,6 @@
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@@ -147,6 +146,7 @@
private boolean enableManagedDfsDirsRedundancy = true;
private boolean manageDataDfsDirs = true;
private StartupOption option = null;
+ private StartupOption dnOption = null;
private String[] racks = null;
private String [] hosts = null;
private long [] simulatedCapacities = null;
@@ -241,6 +241,14 @@
this.option = val;
return this;
}
+
+ /**
+ * Default: null
+ */
+ public Builder dnStartupOption(StartupOption val) {
+ this.dnOption = val;
+ return this;
+ }
/**
* Default: null
@@ -357,6 +365,7 @@
builder.enableManagedDfsDirsRedundancy,
builder.manageDataDfsDirs,
builder.option,
+ builder.dnOption,
builder.racks,
builder.hosts,
builder.simulatedCapacities,
@@ -406,18 +415,24 @@
/**
* Stores the information related to a namenode in the cluster
*/
- static class NameNodeInfo {
+ public static class NameNodeInfo {
final NameNode nameNode;
final Configuration conf;
final String nameserviceId;
final String nnId;
+ StartupOption startOpt;
NameNodeInfo(NameNode nn, String nameserviceId, String nnId,
- Configuration conf) {
+ StartupOption startOpt, Configuration conf) {
this.nameNode = nn;
this.nameserviceId = nameserviceId;
this.nnId = nnId;
+ this.startOpt = startOpt;
this.conf = conf;
}
+
+ public void setStartOpt(StartupOption startOpt) {
+ this.startOpt = startOpt;
+ }
}
/**
@@ -603,8 +618,8 @@
long[] simulatedCapacities) throws IOException {
this.nameNodes = new NameNodeInfo[1]; // Single namenode in the cluster
initMiniDFSCluster(conf, numDataNodes, StorageType.DEFAULT, format,
- manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
- operation, racks, hosts,
+ manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
+ operation, null, racks, hosts,
simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false, false);
}
@@ -613,7 +628,8 @@
Configuration conf,
int numDataNodes, StorageType storageType, boolean format, boolean manageNameDfsDirs,
boolean manageNameDfsSharedDirs, boolean enableManagedDfsDirsRedundancy,
- boolean manageDataDfsDirs, StartupOption operation, String[] racks,
+ boolean manageDataDfsDirs, StartupOption startOpt,
+ StartupOption dnStartOpt, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
@@ -662,7 +678,7 @@
createNameNodesAndSetConf(
nnTopology, manageNameDfsDirs, manageNameDfsSharedDirs,
enableManagedDfsDirsRedundancy,
- format, operation, clusterId, conf);
+ format, startOpt, clusterId, conf);
} catch (IOException ioe) {
LOG.error("IOE creating namenodes. Permissions dump:\n" +
createPermissionsDiagnosisString(data_dir));
@@ -675,13 +691,15 @@
}
}
- if (operation == StartupOption.RECOVER) {
+ if (startOpt == StartupOption.RECOVER) {
return;
}
// Start the DataNodes
- startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs, operation, racks,
- hosts, simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, checkDataNodeHostConfig);
+ startDataNodes(conf, numDataNodes, storageType, manageDataDfsDirs,
+ dnStartOpt != null ? dnStartOpt : startOpt,
+ racks, hosts, simulatedCapacities, setupHostsFile,
+ checkDataNodeAddrConfig, checkDataNodeHostConfig);
waitClusterUp();
//make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@@ -759,6 +777,8 @@
if (manageNameDfsSharedDirs) {
URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1);
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
+ // Clean out the shared edits dir completely, including all subdirectories.
+ FileUtil.fullyDelete(new File(sharedEditsUri));
}
}
@@ -858,7 +878,8 @@
URI srcDir = Lists.newArrayList(srcDirs).get(0);
FileSystem dstFS = FileSystem.getLocal(dstConf).getRaw();
for (URI dstDir : dstDirs) {
- Preconditions.checkArgument(!dstDir.equals(srcDir));
+ Preconditions.checkArgument(!dstDir.equals(srcDir),
+ "src and dst are the same: " + dstDir);
File dstDirF = new File(dstDir);
if (dstDirF.exists()) {
if (!FileUtil.fullyDelete(dstDirF)) {
@@ -892,6 +913,14 @@
conf.set(key, "127.0.0.1:" + nnConf.getIpcPort());
}
+ private static String[] createArgs(StartupOption operation) {
+ String[] args = (operation == null ||
+ operation == StartupOption.FORMAT ||
+ operation == StartupOption.REGULAR) ?
+ new String[] {} : new String[] {operation.getName()};
+ return args;
+ }
+
private void createNameNode(int nnIndex, Configuration conf,
int numDataNodes, boolean format, StartupOption operation,
String clusterId, String nameserviceId,
@@ -906,10 +935,7 @@
}
// Start the NameNode
- String[] args = (operation == null ||
- operation == StartupOption.FORMAT ||
- operation == StartupOption.REGULAR) ?
- new String[] {} : new String[] {operation.getName()};
+ String[] args = createArgs(operation);
NameNode nn = NameNode.createNameNode(args, conf);
if (operation == StartupOption.RECOVER) {
return;
@@ -931,7 +957,7 @@
DFSUtil.setGenericConf(conf, nameserviceId, nnId,
DFS_NAMENODE_HTTP_ADDRESS_KEY);
nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId,
- new Configuration(conf));
+ operation, new Configuration(conf));
}
/**
@@ -1499,7 +1525,7 @@
nn.stop();
nn.join();
Configuration conf = nameNodes[nnIndex].conf;
- nameNodes[nnIndex] = new NameNodeInfo(null, null, null, conf);
+ nameNodes[nnIndex] = new NameNodeInfo(null, null, null, null, conf);
}
}
@@ -1545,10 +1571,12 @@
throws IOException {
String nameserviceId = nameNodes[nnIndex].nameserviceId;
String nnId = nameNodes[nnIndex].nnId;
+ StartupOption startOpt = nameNodes[nnIndex].startOpt;
Configuration conf = nameNodes[nnIndex].conf;
shutdownNameNode(nnIndex);
- NameNode nn = NameNode.createNameNode(new String[] {}, conf);
- nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, conf);
+ NameNode nn = NameNode.createNameNode(createArgs(startOpt), conf);
+ nameNodes[nnIndex] = new NameNodeInfo(nn, nameserviceId, nnId, startOpt,
+ conf);
if (waitActive) {
waitClusterUp();
LOG.info("Restarted the namenode");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
index 7715041..f88edcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Test;
@@ -97,10 +98,10 @@
* Attempts to start a NameNode with the given operation. Starting
* the NameNode should throw an exception.
*/
- void startNameNodeShouldFail(StartupOption operation, String searchString) {
+ void startNameNodeShouldFail(String searchString) {
try {
+ NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
- .startupOption(operation)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
@@ -149,24 +150,19 @@
log("Normal NameNode rollback", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
- .format(false)
- .manageDataDfsDirs(false)
- .manageNameDfsDirs(false)
- .startupOption(StartupOption.ROLLBACK)
- .build();
+ NameNode.doRollback(conf, false);
checkResult(NAME_NODE, nameNodeDirs);
- cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("Normal DataNode rollback", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+ NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
- .startupOption(StartupOption.ROLLBACK)
+ .dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
@@ -179,11 +175,12 @@
log("Normal BlockPool rollback", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+ NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
- .startupOption(StartupOption.ROLLBACK)
+ .dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
UpgradeUtilities.createBlockPoolStorageDirs(dataNodeDirs, "current",
@@ -217,10 +214,10 @@
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
-
+
log("NameNode rollback without existing previous dir", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
- startNameNodeShouldFail(StartupOption.ROLLBACK,
+ startNameNodeShouldFail(
"None of the storage directories contain previous fs state");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
@@ -237,15 +234,16 @@
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
-
+
log("DataNode rollback with future stored layout version in previous", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+ NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
- .startupOption(StartupOption.ROLLBACK)
+ .dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
baseDirs = UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "previous");
@@ -266,11 +264,12 @@
log("DataNode rollback with newer fsscTime in previous", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+ NameNode.doRollback(conf, false);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false)
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
- .startupOption(StartupOption.ROLLBACK)
+ .dnStartupOption(StartupOption.ROLLBACK)
.build();
UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
@@ -287,21 +286,19 @@
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
UpgradeUtilities.createEmptyDirs(dataNodeDirs);
-
+
log("NameNode rollback with no edits file", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
deleteMatchingFiles(baseDirs, "edits.*");
- startNameNodeShouldFail(StartupOption.ROLLBACK,
- "Gap in transactions");
+ startNameNodeShouldFail("Gap in transactions");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with no image file", numDirs);
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
deleteMatchingFiles(baseDirs, "fsimage_.*");
- startNameNodeShouldFail(StartupOption.ROLLBACK,
- "No valid image files found");
+ startNameNodeShouldFail("No valid image files found");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with corrupt version file", numDirs);
@@ -313,8 +310,7 @@
"layoutVersion".getBytes(Charsets.UTF_8),
"xxxxxxxxxxxxx".getBytes(Charsets.UTF_8));
}
- startNameNodeShouldFail(StartupOption.ROLLBACK,
- "file VERSION has layoutVersion missing");
+ startNameNodeShouldFail("file VERSION has layoutVersion missing");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
@@ -328,8 +324,7 @@
UpgradeUtilities.createNameNodeVersionFile(conf, baseDirs,
storageInfo, UpgradeUtilities.getCurrentBlockPoolID(cluster));
- startNameNodeShouldFail(StartupOption.ROLLBACK,
- "Cannot rollback to storage version 1 using this version");
+ startNameNodeShouldFail("Cannot rollback to storage version 1 using this version");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
} // end numDir loop
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index 3a0134e..cf72e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.junit.Assume;
import org.junit.Before;
@@ -764,4 +765,37 @@
assertEquals(4*24*60*60*1000l, DFSUtil.parseRelativeTime("4d"));
assertEquals(999*24*60*60*1000l, DFSUtil.parseRelativeTime("999d"));
}
+
+ @Test
+ public void testAssertAllResultsEqual() {
+ checkAllResults(new Long[]{}, true);
+ checkAllResults(new Long[]{1l}, true);
+ checkAllResults(new Long[]{1l, 1l}, true);
+ checkAllResults(new Long[]{1l, 1l, 1l}, true);
+ checkAllResults(new Long[]{new Long(1), new Long(1)}, true);
+ checkAllResults(new Long[]{null, null, null}, true);
+
+ checkAllResults(new Long[]{1l, 2l}, false);
+ checkAllResults(new Long[]{2l, 1l}, false);
+ checkAllResults(new Long[]{1l, 2l, 1l}, false);
+ checkAllResults(new Long[]{2l, 1l, 1l}, false);
+ checkAllResults(new Long[]{1l, 1l, 2l}, false);
+ checkAllResults(new Long[]{1l, null}, false);
+ checkAllResults(new Long[]{null, 1l}, false);
+ checkAllResults(new Long[]{1l, null, 1l}, false);
+ }
+
+ private static void checkAllResults(Long[] toCheck, boolean shouldSucceed) {
+ if (shouldSucceed) {
+ DFSUtil.assertAllResultsEqual(Arrays.asList(toCheck));
+ } else {
+ try {
+ DFSUtil.assertAllResultsEqual(Arrays.asList(toCheck));
+ fail("Should not have succeeded with input: " +
+ Arrays.toString(toCheck));
+ } catch (AssertionError ae) {
+ GenericTestUtils.assertExceptionContains("Not all elements match", ae);
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
index 2dbe6e8..91c9ba8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
@@ -19,20 +19,28 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestLeaseRecovery {
@@ -148,4 +156,55 @@
if (cluster != null) {cluster.shutdown();}
}
}
+
+ /**
+ * Block Recovery when the meta file not having crcs for all chunks in block
+ * file
+ */
+ @Test
+ public void testBlockRecoveryWithLessMetafile() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ Path file = new Path("/testRecoveryFile");
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ FSDataOutputStream out = dfs.create(file);
+ int count = 0;
+ while (count < 2 * 1024 * 1024) {
+ out.writeBytes("Data");
+ count += 4;
+ }
+ out.hsync();
+ // abort the original stream
+ ((DFSOutputStream) out.getWrappedStream()).abort();
+
+ LocatedBlocks locations = cluster.getNameNodeRpc().getBlockLocations(
+ file.toString(), 0, count);
+ ExtendedBlock block = locations.get(0).getBlock();
+ DataNode dn = cluster.getDataNodes().get(0);
+ BlockLocalPathInfo localPathInfo = dn.getBlockLocalPathInfo(block, null);
+ File metafile = new File(localPathInfo.getMetaPath());
+ assertTrue(metafile.exists());
+
+ // reduce the block meta file size
+ RandomAccessFile raf = new RandomAccessFile(metafile, "rw");
+ raf.setLength(metafile.length() - 20);
+ raf.close();
+
+ // restart DN to make replica to RWR
+ DataNodeProperties dnProp = cluster.stopDataNode(0);
+ cluster.restartDataNode(dnProp, true);
+
+ // try to recover the lease
+ DistributedFileSystem newdfs = (DistributedFileSystem) FileSystem
+ .newInstance(cluster.getConfiguration(0));
+ count = 0;
+ while (++count < 10 && !newdfs.recoverLease(file)) {
+ Thread.sleep(1000);
+ }
+ assertTrue("File should be closed", newdfs.recoverLease(file));
+
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
index fab83b4..a4f67e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
@@ -167,8 +167,16 @@
return new File(baseDir, "journalnode-" + idx).getAbsoluteFile();
}
+ public File getJournalDir(int idx, String jid) {
+ return new File(getStorageDir(idx), jid);
+ }
+
public File getCurrentDir(int idx, String jid) {
- return new File(new File(getStorageDir(idx), jid), "current");
+ return new File(getJournalDir(idx, jid), "current");
+ }
+
+ public File getPreviousDir(int idx, String jid) {
+ return new File(getJournalDir(idx, jid), "previous");
}
public JournalNode getJournalNode(int i) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index e5b636c..e782da2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@@ -47,6 +48,7 @@
public static class Builder {
private final Configuration conf;
+ private StartupOption startOpt = null;
private final MiniDFSCluster.Builder dfsBuilder;
public Builder(Configuration conf) {
@@ -61,6 +63,10 @@
public MiniQJMHACluster build() throws IOException {
return new MiniQJMHACluster(this);
}
+
+ public void startupOption(StartupOption startOpt) {
+ this.startOpt = startOpt;
+ }
}
public static MiniDFSNNTopology createDefaultTopology() {
@@ -95,6 +101,9 @@
Configuration confNN0 = cluster.getConfiguration(0);
NameNode.initializeSharedEdits(confNN0, true);
+ cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt);
+ cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt);
+
// restart the cluster
cluster.restartNameNodes();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
index 43bd7a4..28d1dac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.junit.Test;
@@ -191,6 +193,29 @@
shouldPromptCalled = true;
return false;
}
+
+ @Override
+ public void doPreUpgrade() throws IOException {}
+
+ @Override
+ public void doUpgrade(Storage storage) throws IOException {}
+
+ @Override
+ public void doFinalize() throws IOException {}
+
+ @Override
+ public boolean canRollBack(StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public void doRollback() throws IOException {}
+
+ @Override
+ public long getJournalCTime() throws IOException {
+ return -1;
+ }
}
public static class BadConstructorJournalManager extends DummyJournalManager {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
index 502c9de..7abc502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestBootstrapStandby.java
@@ -91,7 +91,7 @@
fail("Did not throw");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
- "Cannot start an HA namenode with name dirs that need recovery",
+ "storage directory does not exist or is not accessible",
ioe);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java
index 4f213b2..c3a8674 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSUpgradeWithHA.java
@@ -1,89 +1,506 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hdfs.server.namenode.ha;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster.Builder;
+import org.apache.hadoop.hdfs.qjournal.server.Journal;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
import org.junit.Test;
-import com.google.common.collect.Lists;
+import com.google.common.base.Joiner;
/**
* Tests for upgrading with HA enabled.
*/
public class TestDFSUpgradeWithHA {
-
- private static final Log LOG = LogFactory.getLog(TestDFSUpgradeWithHA.class);
+ private static final Log LOG = LogFactory.getLog(TestDFSUpgradeWithHA.class);
+
+ private Configuration conf;
+
+ @Before
+ public void createConfiguration() {
+ conf = new HdfsConfiguration();
+ // Turn off persistent IPC, so that the DFSClient can survive NN restart
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+ 0);
+ }
+
+ private static void assertCTimesEqual(MiniDFSCluster cluster) {
+ long nn1CTime = cluster.getNamesystem(0).getFSImage().getStorage().getCTime();
+ long nn2CTime = cluster.getNamesystem(1).getFSImage().getStorage().getCTime();
+ assertEquals(nn1CTime, nn2CTime);
+ }
+
+ private static void checkClusterPreviousDirExistence(MiniDFSCluster cluster,
+ boolean shouldExist) {
+ for (int i = 0; i < 2; i++) {
+ checkNnPreviousDirExistence(cluster, i, shouldExist);
+ }
+ }
+
+ private static void checkNnPreviousDirExistence(MiniDFSCluster cluster,
+ int index, boolean shouldExist) {
+ Collection<URI> nameDirs = cluster.getNameDirs(index);
+ for (URI nnDir : nameDirs) {
+ checkPreviousDirExistence(new File(nnDir), shouldExist);
+ }
+ }
+
+ private static void checkJnPreviousDirExistence(MiniQJMHACluster jnCluster,
+ boolean shouldExist) throws IOException {
+ for (int i = 0; i < 3; i++) {
+ checkPreviousDirExistence(
+ jnCluster.getJournalCluster().getJournalDir(i, "ns1"), shouldExist);
+ }
+ if (shouldExist) {
+ assertEpochFilesCopied(jnCluster);
+ }
+ }
+
+ private static void assertEpochFilesCopied(MiniQJMHACluster jnCluster)
+ throws IOException {
+ for (int i = 0; i < 3; i++) {
+ File journalDir = jnCluster.getJournalCluster().getJournalDir(i, "ns1");
+ File currDir = new File(journalDir, "current");
+ File prevDir = new File(journalDir, "previous");
+ for (String fileName : new String[]{ Journal.LAST_PROMISED_FILENAME,
+ Journal.LAST_WRITER_EPOCH }) {
+ File prevFile = new File(prevDir, fileName);
+ // Possible the prev file doesn't exist, e.g. if there has never been a
+ // writer before the upgrade.
+ if (prevFile.exists()) {
+ PersistentLongFile prevLongFile = new PersistentLongFile(prevFile, -10);
+ PersistentLongFile currLongFile = new PersistentLongFile(new File(currDir,
+ fileName), -11);
+ assertTrue("Value in " + fileName + " has decreased on upgrade in "
+ + journalDir, prevLongFile.get() <= currLongFile.get());
+ }
+ }
+ }
+ }
+
+ private static void checkPreviousDirExistence(File rootDir,
+ boolean shouldExist) {
+ File previousDir = new File(rootDir, "previous");
+ if (shouldExist) {
+ assertTrue(previousDir + " does not exist", previousDir.exists());
+ } else {
+ assertFalse(previousDir + " does exist", previousDir.exists());
+ }
+ }
+
+ private void runFinalizeCommand(MiniDFSCluster cluster)
+ throws IOException {
+ HATestUtil.setFailoverConfigurations(cluster, conf);
+ new DFSAdmin(conf).finalizeUpgrade();
+ }
+
/**
- * Make sure that an HA NN refuses to start if given an upgrade-related
- * startup option.
+ * Ensure that an admin cannot finalize an HA upgrade without at least one NN
+ * being active.
*/
@Test
- public void testStartingWithUpgradeOptionsFails() throws IOException {
- for (StartupOption startOpt : Lists.newArrayList(new StartupOption[] {
- StartupOption.UPGRADE, StartupOption.FINALIZE,
- StartupOption.ROLLBACK })) {
- MiniDFSCluster cluster = null;
+ public void testCannotFinalizeIfNoActive() throws IOException,
+ URISyntaxException {
+ MiniDFSCluster cluster = null;
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(0)
+ .build();
+
+ File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+
+ // No upgrade is in progress at the moment.
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+ checkPreviousDirExistence(sharedDir, false);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkPreviousDirExistence(sharedDir, true);
+
+ // NN0 should come up in the active state when given the -upgrade option,
+ // so no need to transition it to active.
+ assertTrue(fs.mkdirs(new Path("/foo2")));
+
+ // Restart NN0 without the -upgrade flag, to make sure that works.
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+ cluster.restartNameNode(0, false);
+
+ // Make sure we can still do FS ops after upgrading.
+ cluster.transitionToActive(0);
+ assertTrue(fs.mkdirs(new Path("/foo3")));
+
+ // Now bootstrap the standby with the upgraded info.
+ int rc = BootstrapStandby.run(
+ new String[]{"-force"},
+ cluster.getConfiguration(1));
+ assertEquals(0, rc);
+
+ // Now restart NN1 and make sure that we can do ops against that as well.
+ cluster.restartNameNode(1);
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+ assertTrue(fs.mkdirs(new Path("/foo4")));
+
+ assertCTimesEqual(cluster);
+
+ // Now there's no active NN.
+ cluster.transitionToStandby(1);
+
try {
- cluster = new MiniDFSCluster.Builder(new Configuration())
- .nnTopology(MiniDFSNNTopology.simpleHATopology())
- .startupOption(startOpt)
- .numDataNodes(0)
- .build();
- fail("Should not have been able to start an HA NN in upgrade mode");
- } catch (IllegalArgumentException iae) {
+ runFinalizeCommand(cluster);
+ fail("Should not have been able to finalize upgrade with no NN active");
+ } catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
- "Cannot perform DFS upgrade with HA enabled.", iae);
- LOG.info("Got expected exception", iae);
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
+ "Cannot finalize with no NameNode active", ioe);
+ }
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Make sure that an HA NN with NFS-based HA can successfully start and
+ * upgrade.
+ */
+ @Test
+ public void testNfsUpgrade() throws IOException, URISyntaxException {
+ MiniDFSCluster cluster = null;
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(0)
+ .build();
+
+ File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+
+ // No upgrade is in progress at the moment.
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+ checkPreviousDirExistence(sharedDir, false);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkPreviousDirExistence(sharedDir, true);
+
+ // NN0 should come up in the active state when given the -upgrade option,
+ // so no need to transition it to active.
+ assertTrue(fs.mkdirs(new Path("/foo2")));
+
+ // Restart NN0 without the -upgrade flag, to make sure that works.
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+ cluster.restartNameNode(0, false);
+
+ // Make sure we can still do FS ops after upgrading.
+ cluster.transitionToActive(0);
+ assertTrue(fs.mkdirs(new Path("/foo3")));
+
+ // Now bootstrap the standby with the upgraded info.
+ int rc = BootstrapStandby.run(
+ new String[]{"-force"},
+ cluster.getConfiguration(1));
+ assertEquals(0, rc);
+
+ // Now restart NN1 and make sure that we can do ops against that as well.
+ cluster.restartNameNode(1);
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+ assertTrue(fs.mkdirs(new Path("/foo4")));
+
+ assertCTimesEqual(cluster);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Make sure that an HA NN can successfully upgrade when configured using
+ * JournalNodes.
+ */
+ @Test
+ public void testUpgradeWithJournalNodes() throws IOException,
+ URISyntaxException {
+ MiniQJMHACluster qjCluster = null;
+ FileSystem fs = null;
+ try {
+ Builder builder = new MiniQJMHACluster.Builder(conf);
+ builder.getDfsBuilder()
+ .numDataNodes(0);
+ qjCluster = builder.build();
+
+ MiniDFSCluster cluster = qjCluster.getDfsCluster();
+
+ // No upgrade is in progress at the moment.
+ checkJnPreviousDirExistence(qjCluster, false);
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkJnPreviousDirExistence(qjCluster, true);
+
+ // NN0 should come up in the active state when given the -upgrade option,
+ // so no need to transition it to active.
+ assertTrue(fs.mkdirs(new Path("/foo2")));
+
+ // Restart NN0 without the -upgrade flag, to make sure that works.
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+ cluster.restartNameNode(0, false);
+
+ // Make sure we can still do FS ops after upgrading.
+ cluster.transitionToActive(0);
+ assertTrue(fs.mkdirs(new Path("/foo3")));
+
+ // Now bootstrap the standby with the upgraded info.
+ int rc = BootstrapStandby.run(
+ new String[]{"-force"},
+ cluster.getConfiguration(1));
+ assertEquals(0, rc);
+
+ // Now restart NN1 and make sure that we can do ops against that as well.
+ cluster.restartNameNode(1);
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+ assertTrue(fs.mkdirs(new Path("/foo4")));
+
+ assertCTimesEqual(cluster);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (qjCluster != null) {
+ qjCluster.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testFinalizeWithJournalNodes() throws IOException,
+ URISyntaxException {
+ MiniQJMHACluster qjCluster = null;
+ FileSystem fs = null;
+ try {
+ Builder builder = new MiniQJMHACluster.Builder(conf);
+ builder.getDfsBuilder()
+ .numDataNodes(0);
+ qjCluster = builder.build();
+
+ MiniDFSCluster cluster = qjCluster.getDfsCluster();
+
+ // No upgrade is in progress at the moment.
+ checkJnPreviousDirExistence(qjCluster, false);
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ assertTrue(fs.mkdirs(new Path("/foo2")));
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkJnPreviousDirExistence(qjCluster, true);
+
+ // Now bootstrap the standby with the upgraded info.
+ int rc = BootstrapStandby.run(
+ new String[]{"-force"},
+ cluster.getConfiguration(1));
+ assertEquals(0, rc);
+
+ cluster.restartNameNode(1);
+
+ runFinalizeCommand(cluster);
+
+ checkClusterPreviousDirExistence(cluster, false);
+ checkJnPreviousDirExistence(qjCluster, false);
+ assertCTimesEqual(cluster);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (qjCluster != null) {
+ qjCluster.shutdown();
}
}
}
/**
- * Make sure that an HA NN won't start if a previous upgrade was in progress.
+ * Make sure that even if the NN which initiated the upgrade is in the standby
+ * state that we're allowed to finalize.
*/
@Test
- public void testStartingWithUpgradeInProgressFails() throws Exception {
+ public void testFinalizeFromSecondNameNodeWithJournalNodes()
+ throws IOException, URISyntaxException {
+ MiniQJMHACluster qjCluster = null;
+ FileSystem fs = null;
+ try {
+ Builder builder = new MiniQJMHACluster.Builder(conf);
+ builder.getDfsBuilder()
+ .numDataNodes(0);
+ qjCluster = builder.build();
+
+ MiniDFSCluster cluster = qjCluster.getDfsCluster();
+
+ // No upgrade is in progress at the moment.
+ checkJnPreviousDirExistence(qjCluster, false);
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkJnPreviousDirExistence(qjCluster, true);
+
+ // Now bootstrap the standby with the upgraded info.
+ int rc = BootstrapStandby.run(
+ new String[]{"-force"},
+ cluster.getConfiguration(1));
+ assertEquals(0, rc);
+
+ cluster.restartNameNode(1);
+
+ // Make the second NN (not the one that initiated the upgrade) active when
+ // the finalize command is run.
+ cluster.transitionToStandby(0);
+ cluster.transitionToActive(1);
+
+ runFinalizeCommand(cluster);
+
+ checkClusterPreviousDirExistence(cluster, false);
+ checkJnPreviousDirExistence(qjCluster, false);
+ assertCTimesEqual(cluster);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (qjCluster != null) {
+ qjCluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Make sure that an HA NN will start if a previous upgrade was in progress.
+ */
+ @Test
+ public void testStartingWithUpgradeInProgressSucceeds() throws Exception {
MiniDFSCluster cluster = null;
try {
- cluster = new MiniDFSCluster.Builder(new Configuration())
+ cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
-
+
// Simulate an upgrade having started.
for (int i = 0; i < 2; i++) {
for (URI uri : cluster.getNameDirs(i)) {
@@ -92,18 +509,226 @@
assertTrue(prevTmp.mkdirs());
}
}
-
+
cluster.restartNameNodes();
- fail("Should not have been able to start an HA NN with an in-progress upgrade");
- } catch (IOException ioe) {
- GenericTestUtils.assertExceptionContains(
- "Cannot start an HA namenode with name dirs that need recovery.",
- ioe);
- LOG.info("Got expected exception", ioe);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
+
+ /**
+ * Test rollback with NFS shared dir.
+ */
+ @Test
+ public void testRollbackWithNfs() throws Exception {
+ MiniDFSCluster cluster = null;
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(0)
+ .build();
+
+ File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+
+ // No upgrade is in progress at the moment.
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+ checkPreviousDirExistence(sharedDir, false);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkPreviousDirExistence(sharedDir, true);
+
+ // NN0 should come up in the active state when given the -upgrade option,
+ // so no need to transition it to active.
+ assertTrue(fs.mkdirs(new Path("/foo2")));
+
+ // Now bootstrap the standby with the upgraded info.
+ int rc = BootstrapStandby.run(
+ new String[]{"-force"},
+ cluster.getConfiguration(1));
+ assertEquals(0, rc);
+
+ cluster.restartNameNode(1);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkPreviousDirExistence(sharedDir, true);
+ assertCTimesEqual(cluster);
+
+ // Now shut down the cluster and do the rollback.
+ Collection<URI> nn1NameDirs = cluster.getNameDirs(0);
+ cluster.shutdown();
+
+ conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
+ NameNode.doRollback(conf, false);
+
+ // The rollback operation should have rolled back the first NN's local
+ // dirs, and the shared dir, but not the other NN's dirs. Those have to be
+ // done by bootstrapping the standby.
+ checkNnPreviousDirExistence(cluster, 0, false);
+ checkPreviousDirExistence(sharedDir, false);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testRollbackWithJournalNodes() throws IOException,
+ URISyntaxException {
+ MiniQJMHACluster qjCluster = null;
+ FileSystem fs = null;
+ try {
+ Builder builder = new MiniQJMHACluster.Builder(conf);
+ builder.getDfsBuilder()
+ .numDataNodes(0);
+ qjCluster = builder.build();
+
+ MiniDFSCluster cluster = qjCluster.getDfsCluster();
+
+ // No upgrade is in progress at the moment.
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+ checkJnPreviousDirExistence(qjCluster, false);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkJnPreviousDirExistence(qjCluster, true);
+
+ // NN0 should come up in the active state when given the -upgrade option,
+ // so no need to transition it to active.
+ assertTrue(fs.mkdirs(new Path("/foo2")));
+
+ // Now bootstrap the standby with the upgraded info.
+ int rc = BootstrapStandby.run(
+ new String[]{"-force"},
+ cluster.getConfiguration(1));
+ assertEquals(0, rc);
+
+ cluster.restartNameNode(1);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkJnPreviousDirExistence(qjCluster, true);
+ assertCTimesEqual(cluster);
+
+ // Shut down the NNs, but deliberately leave the JNs up and running.
+ Collection<URI> nn1NameDirs = cluster.getNameDirs(0);
+ cluster.shutdown();
+
+ conf.setStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, Joiner.on(",").join(nn1NameDirs));
+ NameNode.doRollback(conf, false);
+
+ // The rollback operation should have rolled back the first NN's local
+ // dirs, and the JNs' shared edits, but not the other NN's dirs. Those have
+ // to be done by bootstrapping the standby.
+ checkNnPreviousDirExistence(cluster, 0, false);
+ checkJnPreviousDirExistence(qjCluster, false);
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (qjCluster != null) {
+ qjCluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Make sure that starting a second NN with the -upgrade flag fails if the
+ * other NN has already done that.
+ */
+ @Test
+ public void testCannotUpgradeSecondNameNode() throws IOException,
+ URISyntaxException {
+ MiniDFSCluster cluster = null;
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleHATopology())
+ .numDataNodes(0)
+ .build();
+
+ File sharedDir = new File(cluster.getSharedEditsDir(0, 1));
+
+ // No upgrade is in progress at the moment.
+ checkClusterPreviousDirExistence(cluster, false);
+ assertCTimesEqual(cluster);
+ checkPreviousDirExistence(sharedDir, false);
+
+ // Transition NN0 to active and do some FS ops.
+ cluster.transitionToActive(0);
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ assertTrue(fs.mkdirs(new Path("/foo1")));
+
+ // Do the upgrade. Shut down NN1 and then restart NN0 with the upgrade
+ // flag.
+ cluster.shutdownNameNode(1);
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.UPGRADE);
+ cluster.restartNameNode(0, false);
+
+ checkNnPreviousDirExistence(cluster, 0, true);
+ checkNnPreviousDirExistence(cluster, 1, false);
+ checkPreviousDirExistence(sharedDir, true);
+
+ // NN0 should come up in the active state when given the -upgrade option,
+ // so no need to transition it to active.
+ assertTrue(fs.mkdirs(new Path("/foo2")));
+
+ // Restart NN0 without the -upgrade flag, to make sure that works.
+ cluster.getNameNodeInfos()[0].setStartOpt(StartupOption.REGULAR);
+ cluster.restartNameNode(0, false);
+
+ // Make sure we can still do FS ops after upgrading.
+ cluster.transitionToActive(0);
+ assertTrue(fs.mkdirs(new Path("/foo3")));
+
+ // Make sure that starting the second NN with the -upgrade flag fails.
+ cluster.getNameNodeInfos()[1].setStartOpt(StartupOption.UPGRADE);
+ try {
+ cluster.restartNameNode(1, false);
+ fail("Should not have been able to start second NN with -upgrade");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains(
+ "It looks like the shared log is already being upgraded", ioe);
+ }
+ } finally {
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
index b534c03..272e543 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
@@ -96,7 +96,7 @@
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
GenericTestUtils.assertExceptionContains(
- "Cannot start an HA namenode with name dirs that need recovery", ioe);
+ "storage directory does not exist or is not accessible", ioe);
}
try {
cluster.restartNameNode(1, false);
@@ -104,7 +104,7 @@
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
GenericTestUtils.assertExceptionContains(
- "Cannot start an HA namenode with name dirs that need recovery", ioe);
+ "storage directory does not exist or is not accessible", ioe);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java
index cd316ab..817a2de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotFileLength.java
@@ -17,22 +17,26 @@
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
-import java.util.Random;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
import org.apache.hadoop.fs.FileStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ToolRunner;
public class TestSnapshotFileLength {
@@ -112,4 +116,61 @@
assertThat(bytesRead, is(BLOCKSIZE));
fis.close();
}
+
+ /**
+ * Added as part of HDFS-5343.
+ * Verify that the cat command, when issued on a snapshot path,
+ * cannot read a file beyond the snapshot file length.
+ * @throws Exception
+ */
+ @Test (timeout = 600000)
+ public void testSnapshotFileLengthWithCatCommand() throws Exception {
+
+ FSDataInputStream fis = null;
+ FileStatus fileStatus = null;
+
+ int bytesRead;
+ byte[] buffer = new byte[BLOCKSIZE * 8];
+
+ hdfs.mkdirs(sub);
+ Path file1 = new Path(sub, file1Name);
+ DFSTestUtil.createFile(hdfs, file1, BLOCKSIZE, REPLICATION, SEED);
+
+ hdfs.allowSnapshot(sub);
+ hdfs.createSnapshot(sub, snapshot1);
+
+ DFSTestUtil.appendFile(hdfs, file1, BLOCKSIZE);
+
+ // Make sure we can read the entire file via its non-snapshot path.
+ fileStatus = hdfs.getFileStatus(file1);
+ assertEquals(fileStatus.getLen(), BLOCKSIZE * 2);
+ fis = hdfs.open(file1);
+ bytesRead = fis.read(buffer, 0, buffer.length);
+ assertEquals(bytesRead, BLOCKSIZE * 2);
+ fis.close();
+
+ Path file1snap1 =
+ SnapshotTestHelper.getSnapshotPath(sub, snapshot1, file1Name);
+ fis = hdfs.open(file1snap1);
+ fileStatus = hdfs.getFileStatus(file1snap1);
+ assertEquals(fileStatus.getLen(), BLOCKSIZE);
+ // Make sure we can only read up to the snapshot length.
+ bytesRead = fis.read(buffer, 0, buffer.length);
+ assertEquals(bytesRead, BLOCKSIZE);
+ fis.close();
+
+ PrintStream psBackup = System.out;
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bao));
+ System.setErr(new PrintStream(bao));
+ // Make sure we can cat the file up to the snapshot length
+ FsShell shell = new FsShell();
+ try{
+ ToolRunner.run(conf, shell, new String[] { "-cat",
+ "/TestSnapshotFileLength/sub1/.snapshot/snapshot1/file1" });
+ assertEquals(bao.size(), BLOCKSIZE);
+ }finally{
+ System.setOut(psBackup);
+ }
+ }
}