Merge trunk into auto-HA branch
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1340622 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
new file mode 100644
index 0000000..8bf3d88
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
@@ -0,0 +1,29 @@
+Changes for HDFS-3042 branch.
+
+This change list will be merged into the trunk CHANGES.txt when the HDFS-3042
+branch is merged.
+------------------------------
+
+HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd)
+
+HADOOP-8228. Auto HA: Refactor tests and add stress tests. (todd)
+
+HADOOP-8215. Security support for ZK Failover controller (todd)
+
+HADOOP-8245. Fix flakiness in TestZKFailoverController (todd)
+
+HADOOP-8257. TestZKFailoverControllerStress occasionally fails with Mockito error (todd)
+
+HADOOP-8260. Replace ClientBaseWithFixes with our own modified copy of the class (todd)
+
+HADOOP-8246. Auto-HA: automatically scope znode by nameservice ID (todd)
+
+HADOOP-8247. Add a config to enable auto-HA, which disables manual FailoverController (todd)
+
+HADOOP-8306. ZKFC: improve error message when ZK is not running. (todd)
+
+HADOOP-8279. Allow manual failover to be invoked when auto-failover is enabled. (todd)
+
+HADOOP-8276. Auto-HA: add config for java options to pass to zkfc daemon (todd via eli)
+
+HADOOP-8405. ZKFC tests leak ZK instances. (todd)
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index 44092c0..c1c3cd2 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -290,5 +290,9 @@
<!-- protobuf generated code -->
<Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
</Match>
+ <Match>
+ <!-- protobuf generated code -->
+ <Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/>
+ </Match>
</FindBugsFilter>
diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh
index 80393a5..9d19250 100755
--- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh
+++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-daemon.sh
@@ -141,7 +141,7 @@
echo starting $command, logging to $log
cd "$HADOOP_PREFIX"
case $command in
- namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer)
+ namenode|secondarynamenode|datanode|dfs|dfsadmin|fsck|balancer|zkfc)
if [ -z "$HADOOP_HDFS_HOME" ]; then
hdfsScript="$HADOOP_PREFIX"/bin/hdfs
else
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 52cb1f3..e2955ab 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -116,6 +116,8 @@
"security.refresh.user.mappings.protocol.acl";
public static final String
SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
+ public static final String
+ SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl";
public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
"hadoop.security.token.service.use_ip";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
index ef05456..a4ed255 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException;
@@ -81,9 +82,15 @@
*/
public interface ActiveStandbyElectorCallback {
/**
- * This method is called when the app becomes the active leader
+ * This method is called when the app becomes the active leader.
+ * If the service fails to become active, it should throw
+ * ServiceFailedException. This will cause the elector to
+ * sleep for a short period, then re-join the election.
+ *
+ * Callback implementations are expected to manage their own
+ * timeouts (e.g. when making an RPC to a remote node).
*/
- void becomeActive();
+ void becomeActive() throws ServiceFailedException;
/**
* This method is called when the app becomes a standby
@@ -134,7 +141,8 @@
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
- private static final int NUM_RETRIES = 3;
+ static int NUM_RETRIES = 3;
+ private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
private static enum ConnectionState {
DISCONNECTED, CONNECTED, TERMINATED
@@ -154,6 +162,7 @@
private final String zkHostPort;
private final int zkSessionTimeout;
private final List<ACL> zkAcl;
+ private final List<ZKAuthInfo> zkAuthInfo;
private byte[] appData;
private final String zkLockFilePath;
private final String zkBreadCrumbPath;
@@ -185,6 +194,8 @@
* znode under which to create the lock
* @param acl
* ZooKeeper ACL's
+ * @param authInfo a list of authentication credentials to add to the
+ * ZK connection
* @param app
* reference to callback interface object
* @throws IOException
@@ -192,6 +203,7 @@
*/
public ActiveStandbyElector(String zookeeperHostPorts,
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
+ List<ZKAuthInfo> authInfo,
ActiveStandbyElectorCallback app) throws IOException,
HadoopIllegalArgumentException {
if (app == null || acl == null || parentZnodeName == null
@@ -201,6 +213,7 @@
zkHostPort = zookeeperHostPorts;
zkSessionTimeout = zookeeperSessionTimeout;
zkAcl = acl;
+ zkAuthInfo = authInfo;
appClient = app;
znodeWorkingDir = parentZnodeName;
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
@@ -227,8 +240,6 @@
public synchronized void joinElection(byte[] data)
throws HadoopIllegalArgumentException {
- LOG.debug("Attempting active election");
-
if (data == null) {
throw new HadoopIllegalArgumentException("data cannot be null");
}
@@ -236,6 +247,7 @@
appData = new byte[data.length];
System.arraycopy(data, 0, appData, 0, data.length);
+ LOG.debug("Attempting active election for " + this);
joinElectionInternal();
}
@@ -259,6 +271,9 @@
*/
public synchronized void ensureParentZNode()
throws IOException, InterruptedException {
+ Preconditions.checkState(!wantToBeInElection,
+ "ensureParentZNode() may not be called while in the election");
+
String pathParts[] = znodeWorkingDir.split("/");
Preconditions.checkArgument(pathParts.length >= 1 &&
"".equals(pathParts[0]),
@@ -292,6 +307,9 @@
*/
public synchronized void clearParentZNode()
throws IOException, InterruptedException {
+ Preconditions.checkState(!wantToBeInElection,
+ "clearParentZNode() may not be called while in the election");
+
try {
LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
@@ -360,7 +378,7 @@
createConnection();
}
Stat stat = new Stat();
- return zkClient.getData(zkLockFilePath, false, stat);
+ return getDataWithRetries(zkLockFilePath, false, stat);
} catch(KeeperException e) {
Code code = e.code();
if (isNodeDoesNotExist(code)) {
@@ -380,13 +398,17 @@
String name) {
if (isStaleClient(ctx)) return;
LOG.debug("CreateNode result: " + rc + " for path: " + path
- + " connectionState: " + zkConnectionState);
+ + " connectionState: " + zkConnectionState +
+ " for " + this);
Code code = Code.get(rc);
if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring
- becomeActive();
- monitorActiveStatus();
+ if (becomeActive()) {
+ monitorActiveStatus();
+ } else {
+ reJoinElectionAfterFailureToBecomeActive();
+ }
return;
}
@@ -433,8 +455,13 @@
public synchronized void processResult(int rc, String path, Object ctx,
Stat stat) {
if (isStaleClient(ctx)) return;
+
+ assert wantToBeInElection :
+ "Got a StatNode result after quitting election";
+
LOG.debug("StatNode result: " + rc + " for path: " + path
- + " connectionState: " + zkConnectionState);
+ + " connectionState: " + zkConnectionState + " for " + this);
+
Code code = Code.get(rc);
if (isSuccess(code)) {
@@ -442,7 +469,9 @@
// creation was retried
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
// we own the lock znode. so we are the leader
- becomeActive();
+ if (!becomeActive()) {
+ reJoinElectionAfterFailureToBecomeActive();
+ }
} else {
// we dont own the lock znode. so we are a standby.
becomeStandby();
@@ -470,20 +499,37 @@
}
errorMessage = errorMessage
+ ". Not retrying further znode monitoring connection errors.";
+ } else if (isSessionExpired(code)) {
+ // This isn't fatal - the client Watcher will re-join the election
+ LOG.warn("Lock monitoring failed because session was lost");
+ return;
}
fatalError(errorMessage);
}
/**
- * interface implementation of Zookeeper watch events (connection and node)
+ * We failed to become active. Re-join the election, but
+ * sleep for a few seconds after terminating our existing
+ * session, so that other nodes have a chance to become active.
+ * The failure to become active is already logged inside
+ * becomeActive().
+ */
+ private void reJoinElectionAfterFailureToBecomeActive() {
+ reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE);
+ }
+
+ /**
+ * interface implementation of Zookeeper watch events (connection and node),
+ * proxied by {@link WatcherWithClientRef}.
*/
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
Event.EventType eventType = event.getType();
if (isStaleClient(zk)) return;
LOG.debug("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath()
- + " connectionState: " + zkConnectionState);
+ + " connectionState: " + zkConnectionState
+ + " for " + this);
if (eventType == Event.EventType.None) {
// the connection state has changed
@@ -494,7 +540,8 @@
// be undone
ConnectionState prevConnectionState = zkConnectionState;
zkConnectionState = ConnectionState.CONNECTED;
- if (prevConnectionState == ConnectionState.DISCONNECTED) {
+ if (prevConnectionState == ConnectionState.DISCONNECTED &&
+ wantToBeInElection) {
monitorActiveStatus();
}
break;
@@ -511,7 +558,7 @@
// call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode();
- reJoinElection();
+ reJoinElection(0);
break;
default:
fatalError("Unexpected Zookeeper watch event state: "
@@ -559,16 +606,21 @@
protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
zk.register(new WatcherWithClientRef(zk));
+ for (ZKAuthInfo auth : zkAuthInfo) {
+ zk.addAuthInfo(auth.getScheme(), auth.getAuth());
+ }
return zk;
}
private void fatalError(String errorMessage) {
+ LOG.fatal(errorMessage);
reset();
appClient.notifyFatalError(errorMessage);
}
private void monitorActiveStatus() {
- LOG.debug("Monitoring active leader");
+ assert wantToBeInElection;
+ LOG.debug("Monitoring active leader for " + this);
statRetryCount = 0;
monitorLockNodeAsync();
}
@@ -586,7 +638,7 @@
createLockNodeAsync();
}
- private void reJoinElection() {
+ private void reJoinElection(int sleepTime) {
LOG.info("Trying to re-establish ZK session");
// Some of the test cases rely on expiring the ZK sessions and
@@ -599,12 +651,30 @@
sessionReestablishLockForTests.lock();
try {
terminateConnection();
+ sleepFor(sleepTime);
+
joinElectionInternal();
} finally {
sessionReestablishLockForTests.unlock();
}
}
-
+
+ /**
+ * Sleep for the given number of milliseconds.
+ * This is non-static, and separated out, so that unit tests
+ * can override the behavior not to sleep.
+ */
+ @VisibleForTesting
+ protected void sleepFor(int sleepMs) {
+ if (sleepMs > 0) {
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
@VisibleForTesting
void preventSessionReestablishmentForTests() {
sessionReestablishLockForTests.lock();
@@ -616,8 +686,12 @@
}
@VisibleForTesting
- long getZKSessionIdForTests() {
- return zkClient.getSessionId();
+ synchronized long getZKSessionIdForTests() {
+ if (zkClient != null) {
+ return zkClient.getSessionId();
+ } else {
+ return -1;
+ }
}
@VisibleForTesting
@@ -629,17 +703,13 @@
int connectionRetryCount = 0;
boolean success = false;
while(!success && connectionRetryCount < NUM_RETRIES) {
- LOG.debug("Establishing zookeeper connection");
+ LOG.debug("Establishing zookeeper connection for " + this);
try {
createConnection();
success = true;
} catch(IOException e) {
LOG.warn(e);
- try {
- Thread.sleep(5000);
- } catch(InterruptedException e1) {
- LOG.warn(e1);
- }
+ sleepFor(5000);
}
++connectionRetryCount;
}
@@ -647,14 +717,24 @@
}
private void createConnection() throws IOException {
+ if (zkClient != null) {
+ try {
+ zkClient.close();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while closing ZK",
+ e);
+ }
+ zkClient = null;
+ }
zkClient = getNewZooKeeper();
+ LOG.debug("Created new connection for " + this);
}
- private void terminateConnection() {
+ void terminateConnection() {
if (zkClient == null) {
return;
}
- LOG.debug("Terminating ZK connection");
+ LOG.debug("Terminating ZK connection for " + this);
ZooKeeper tempZk = zkClient;
zkClient = null;
try {
@@ -670,20 +750,24 @@
terminateConnection();
}
- private void becomeActive() {
+ private boolean becomeActive() {
assert wantToBeInElection;
- if (state != State.ACTIVE) {
- try {
- Stat oldBreadcrumbStat = fenceOldActive();
- writeBreadCrumbNode(oldBreadcrumbStat);
- } catch (Exception e) {
- LOG.warn("Exception handling the winning of election", e);
- reJoinElection();
- return;
- }
- LOG.debug("Becoming active");
- state = State.ACTIVE;
+ if (state == State.ACTIVE) {
+ // already active
+ return true;
+ }
+ try {
+ Stat oldBreadcrumbStat = fenceOldActive();
+ writeBreadCrumbNode(oldBreadcrumbStat);
+
+ LOG.debug("Becoming active for " + this);
appClient.becomeActive();
+ state = State.ACTIVE;
+ return true;
+ } catch (Exception e) {
+ LOG.warn("Exception handling the winning of election", e);
+ // Caller will handle quitting and rejoining the election.
+ return false;
}
}
@@ -779,7 +863,7 @@
private void becomeStandby() {
if (state != State.STANDBY) {
- LOG.debug("Becoming standby");
+ LOG.debug("Becoming standby for " + this);
state = State.STANDBY;
appClient.becomeStandby();
}
@@ -787,7 +871,7 @@
private void enterNeutralMode() {
if (state != State.NEUTRAL) {
- LOG.debug("Entering neutral mode");
+ LOG.debug("Entering neutral mode for " + this);
state = State.NEUTRAL;
appClient.enterNeutralMode();
}
@@ -814,6 +898,15 @@
});
}
+ private byte[] getDataWithRetries(final String path, final boolean watch,
+ final Stat stat) throws InterruptedException, KeeperException {
+ return zkDoWithRetries(new ZKAction<byte[]>() {
+ public byte[] run() throws KeeperException, InterruptedException {
+ return zkClient.getData(path, watch, stat);
+ }
+ });
+ }
+
private Stat setDataWithRetries(final String path, final byte[] data,
final int version) throws InterruptedException, KeeperException {
return zkDoWithRetries(new ZKAction<Stat>() {
@@ -884,8 +977,14 @@
@Override
public void process(WatchedEvent event) {
- ActiveStandbyElector.this.processWatchEvent(
- zk, event);
+ try {
+ ActiveStandbyElector.this.processWatchEvent(
+ zk, event);
+ } catch (Throwable t) {
+ fatalError(
+ "Failed to process watcher event " + event + ": " +
+ StringUtils.stringifyException(t));
+ }
}
}
@@ -913,5 +1012,13 @@
}
return false;
}
+
+ @Override
+ public String toString() {
+ return "elector id=" + System.identityHashCode(this) +
+ " appData=" +
+ ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) +
+ " cb=" + appClient;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
index 22f245a..b1d2c7e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ipc.RPC;
import com.google.common.base.Preconditions;
@@ -48,9 +50,12 @@
private final Configuration conf;
+ private final RequestSource requestSource;
- public FailoverController(Configuration conf) {
+ public FailoverController(Configuration conf,
+ RequestSource source) {
this.conf = conf;
+ this.requestSource = source;
this.gracefulFenceTimeout = getGracefulFenceTimeout(conf);
this.rpcTimeoutToNewActive = getRpcTimeoutToNewActive(conf);
@@ -100,7 +105,7 @@
toSvcStatus = toSvc.getServiceStatus();
} catch (IOException e) {
String msg = "Unable to get service state for " + target;
- LOG.error(msg, e);
+ LOG.error(msg + ": " + e.getLocalizedMessage());
throw new FailoverFailedException(msg, e);
}
@@ -122,7 +127,7 @@
}
try {
- HAServiceProtocolHelper.monitorHealth(toSvc);
+ HAServiceProtocolHelper.monitorHealth(toSvc, createReqInfo());
} catch (HealthCheckFailedException hce) {
throw new FailoverFailedException(
"Can't failover to an unhealthy service", hce);
@@ -132,7 +137,10 @@
}
}
-
+ private StateChangeRequestInfo createReqInfo() {
+ return new StateChangeRequestInfo(requestSource);
+ }
+
/**
* Try to get the HA state of the node at the given address. This
* function is guaranteed to be "quick" -- ie it has a short timeout
@@ -143,7 +151,7 @@
HAServiceProtocol proxy = null;
try {
proxy = svc.getProxy(conf, gracefulFenceTimeout);
- proxy.transitionToStandby();
+ proxy.transitionToStandby(createReqInfo());
return true;
} catch (ServiceFailedException sfe) {
LOG.warn("Unable to gracefully make " + svc + " standby (" +
@@ -198,7 +206,8 @@
Throwable cause = null;
try {
HAServiceProtocolHelper.transitionToActive(
- toSvc.getProxy(conf, rpcTimeoutToNewActive));
+ toSvc.getProxy(conf, rpcTimeoutToNewActive),
+ createReqInfo());
} catch (ServiceFailedException sfe) {
LOG.error("Unable to make " + toSvc + " active (" +
sfe.getMessage() + "). Failing back.");
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
index a3d898c..7d85c01 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
@@ -19,11 +19,11 @@
import java.io.IOException;
import java.io.PrintStream;
+import java.util.Arrays;
import java.util.Map;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
@@ -33,9 +33,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
/**
@@ -49,6 +52,13 @@
private static final String FORCEFENCE = "forcefence";
private static final String FORCEACTIVE = "forceactive";
+
+ /**
+ * Undocumented flag which allows an administrator to use manual failover
+ * state transitions even when auto-failover is enabled. This is an unsafe
+ * operation, which is why it is not documented in the usage below.
+ */
+ private static final String FORCEMANUAL = "forcemanual";
private static final Log LOG = LogFactory.getLog(HAAdmin.class);
private int rpcTimeoutForChecks = -1;
@@ -79,6 +89,7 @@
/** Output stream for errors, for use in tests */
protected PrintStream errOut = System.err;
PrintStream out = System.out;
+ private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
protected abstract HAServiceTarget resolveTarget(String string);
@@ -106,63 +117,83 @@
errOut.println("Usage: HAAdmin [" + cmd + " " + usage.args + "]");
}
- private int transitionToActive(final String[] argv)
+ private int transitionToActive(final CommandLine cmd)
throws IOException, ServiceFailedException {
- if (argv.length != 2) {
+ String[] argv = cmd.getArgs();
+ if (argv.length != 1) {
errOut.println("transitionToActive: incorrect number of arguments");
printUsage(errOut, "-transitionToActive");
return -1;
}
-
- HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+ HAServiceTarget target = resolveTarget(argv[0]);
+ if (!checkManualStateManagementOK(target)) {
+ return -1;
+ }
+ HAServiceProtocol proto = target.getProxy(
getConf(), 0);
- HAServiceProtocolHelper.transitionToActive(proto);
+ HAServiceProtocolHelper.transitionToActive(proto, createReqInfo());
return 0;
}
- private int transitionToStandby(final String[] argv)
+ private int transitionToStandby(final CommandLine cmd)
throws IOException, ServiceFailedException {
- if (argv.length != 2) {
+ String[] argv = cmd.getArgs();
+ if (argv.length != 1) {
errOut.println("transitionToStandby: incorrect number of arguments");
printUsage(errOut, "-transitionToStandby");
return -1;
}
- HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
- getConf(), 0);
- HAServiceProtocolHelper.transitionToStandby(proto);
- return 0;
- }
-
- private int failover(final String[] argv)
- throws IOException, ServiceFailedException {
- boolean forceFence = false;
- boolean forceActive = false;
-
- Options failoverOpts = new Options();
- // "-failover" isn't really an option but we need to add
- // it to appease CommandLineParser
- failoverOpts.addOption("failover", false, "failover");
- failoverOpts.addOption(FORCEFENCE, false, "force fencing");
- failoverOpts.addOption(FORCEACTIVE, false, "force failover");
-
- CommandLineParser parser = new GnuParser();
- CommandLine cmd;
-
- try {
- cmd = parser.parse(failoverOpts, argv);
- forceFence = cmd.hasOption(FORCEFENCE);
- forceActive = cmd.hasOption(FORCEACTIVE);
- } catch (ParseException pe) {
- errOut.println("failover: incorrect arguments");
- printUsage(errOut, "-failover");
+ HAServiceTarget target = resolveTarget(argv[0]);
+ if (!checkManualStateManagementOK(target)) {
return -1;
}
-
+ HAServiceProtocol proto = target.getProxy(
+ getConf(), 0);
+ HAServiceProtocolHelper.transitionToStandby(proto, createReqInfo());
+ return 0;
+ }
+ /**
+ * Ensure that we are allowed to manually manage the HA state of the target
+ * service. If automatic failover is configured, then the automatic
+ * failover controllers should be doing state management, and it is generally
+ * an error to use the HAAdmin command line to do so.
+ *
+ * @param target the target to check
+ * @return true if manual state management is allowed
+ */
+ private boolean checkManualStateManagementOK(HAServiceTarget target) {
+ if (target.isAutoFailoverEnabled()) {
+ if (requestSource != RequestSource.REQUEST_BY_USER_FORCED) {
+ errOut.println(
+ "Automatic failover is enabled for " + target + "\n" +
+ "Refusing to manually manage HA state, since it may cause\n" +
+ "a split-brain scenario or other incorrect state.\n" +
+ "If you are very sure you know what you are doing, please \n" +
+ "specify the " + FORCEMANUAL + " flag.");
+ return false;
+ } else {
+ LOG.warn("Proceeding with manual HA state management even though\n" +
+ "automatic failover is enabled for " + target);
+ return true;
+ }
+ }
+ return true;
+ }
+
+ private StateChangeRequestInfo createReqInfo() {
+ return new StateChangeRequestInfo(requestSource);
+ }
+
+ private int failover(CommandLine cmd)
+ throws IOException, ServiceFailedException {
+ boolean forceFence = cmd.hasOption(FORCEFENCE);
+ boolean forceActive = cmd.hasOption(FORCEACTIVE);
+
int numOpts = cmd.getOptions() == null ? 0 : cmd.getOptions().length;
final String[] args = cmd.getArgs();
- if (numOpts > 2 || args.length != 2) {
+ if (numOpts > 3 || args.length != 2) {
errOut.println("failover: incorrect arguments");
printUsage(errOut, "-failover");
return -1;
@@ -171,7 +202,30 @@
HAServiceTarget fromNode = resolveTarget(args[0]);
HAServiceTarget toNode = resolveTarget(args[1]);
- FailoverController fc = new FailoverController(getConf());
+ // Check that auto-failover is consistently configured for both nodes.
+ Preconditions.checkState(
+ fromNode.isAutoFailoverEnabled() ==
+ toNode.isAutoFailoverEnabled(),
+ "Inconsistent auto-failover configs between %s and %s!",
+ fromNode, toNode);
+
+ if (fromNode.isAutoFailoverEnabled()) {
+ if (forceFence || forceActive) {
+ // -forceActive doesn't make sense with auto-HA, since, if the node
+ // is not healthy, then its ZKFC will immediately quit the election
+ // again the next time a health check runs.
+ //
+ // -forceFence doesn't seem to have any real use cases with auto-HA
+ // so it isn't implemented.
+ errOut.println(FORCEFENCE + " and " + FORCEACTIVE + " flags not " +
+ "supported with auto-failover enabled.");
+ return -1;
+ }
+ return gracefulFailoverThroughZKFCs(toNode);
+ }
+
+ FailoverController fc = new FailoverController(getConf(),
+ requestSource);
try {
fc.failover(fromNode, toNode, forceFence, forceActive);
@@ -182,19 +236,44 @@
}
return 0;
}
+
- private int checkHealth(final String[] argv)
+ /**
+ * Initiate a graceful failover by talking to the target node's ZKFC.
+ * This sends an RPC to the ZKFC, which coordinates the failover.
+ *
+ * @param toNode the node to fail to
+ * @return status code (0 for success)
+ * @throws IOException if failover does not succeed
+ */
+ private int gracefulFailoverThroughZKFCs(HAServiceTarget toNode)
+ throws IOException {
+
+ int timeout = FailoverController.getRpcTimeoutToNewActive(getConf());
+ ZKFCProtocol proxy = toNode.getZKFCProxy(getConf(), timeout);
+ try {
+ proxy.gracefulFailover();
+ out.println("Failover to " + toNode + " successful");
+ } catch (ServiceFailedException sfe) {
+ errOut.println("Failover failed: " + sfe.getLocalizedMessage());
+ return -1;
+ }
+
+ return 0;
+ }
+
+ private int checkHealth(final CommandLine cmd)
throws IOException, ServiceFailedException {
- if (argv.length != 2) {
+ String[] argv = cmd.getArgs();
+ if (argv.length != 1) {
errOut.println("checkHealth: incorrect number of arguments");
printUsage(errOut, "-checkHealth");
return -1;
}
-
- HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+ HAServiceProtocol proto = resolveTarget(argv[0]).getProxy(
getConf(), rpcTimeoutForChecks);
try {
- HAServiceProtocolHelper.monitorHealth(proto);
+ HAServiceProtocolHelper.monitorHealth(proto, createReqInfo());
} catch (HealthCheckFailedException e) {
errOut.println("Health check failed: " + e.getLocalizedMessage());
return -1;
@@ -202,15 +281,16 @@
return 0;
}
- private int getServiceState(final String[] argv)
+ private int getServiceState(final CommandLine cmd)
throws IOException, ServiceFailedException {
- if (argv.length != 2) {
+ String[] argv = cmd.getArgs();
+ if (argv.length != 1) {
errOut.println("getServiceState: incorrect number of arguments");
printUsage(errOut, "-getServiceState");
return -1;
}
- HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+ HAServiceProtocol proto = resolveTarget(argv[0]).getProxy(
getConf(), rpcTimeoutForChecks);
out.println(proto.getServiceStatus().getState());
return 0;
@@ -263,26 +343,101 @@
printUsage(errOut);
return -1;
}
-
- if ("-transitionToActive".equals(cmd)) {
- return transitionToActive(argv);
- } else if ("-transitionToStandby".equals(cmd)) {
- return transitionToStandby(argv);
- } else if ("-failover".equals(cmd)) {
- return failover(argv);
- } else if ("-getServiceState".equals(cmd)) {
- return getServiceState(argv);
- } else if ("-checkHealth".equals(cmd)) {
- return checkHealth(argv);
- } else if ("-help".equals(cmd)) {
- return help(argv);
- } else {
+
+ if (!USAGE.containsKey(cmd)) {
errOut.println(cmd.substring(1) + ": Unknown command");
printUsage(errOut);
return -1;
+ }
+
+ Options opts = new Options();
+
+ // Add command-specific options
+ if ("-failover".equals(cmd)) {
+ addFailoverCliOpts(opts);
+ }
+ // Mutative commands take FORCEMANUAL option
+ if ("-transitionToActive".equals(cmd) ||
+ "-transitionToStandby".equals(cmd) ||
+ "-failover".equals(cmd)) {
+ opts.addOption(FORCEMANUAL, false,
+ "force manual control even if auto-failover is enabled");
+ }
+
+ CommandLine cmdLine = parseOpts(cmd, opts, argv);
+ if (cmdLine == null) {
+ // error already printed
+ return -1;
+ }
+
+ if (cmdLine.hasOption(FORCEMANUAL)) {
+ if (!confirmForceManual()) {
+ LOG.fatal("Aborted");
+ return -1;
+ }
+ // Instruct the NNs to honor this request even if they're
+ // configured for manual failover.
+ requestSource = RequestSource.REQUEST_BY_USER_FORCED;
+ }
+
+ if ("-transitionToActive".equals(cmd)) {
+ return transitionToActive(cmdLine);
+ } else if ("-transitionToStandby".equals(cmd)) {
+ return transitionToStandby(cmdLine);
+ } else if ("-failover".equals(cmd)) {
+ return failover(cmdLine);
+ } else if ("-getServiceState".equals(cmd)) {
+ return getServiceState(cmdLine);
+ } else if ("-checkHealth".equals(cmd)) {
+ return checkHealth(cmdLine);
+ } else if ("-help".equals(cmd)) {
+ return help(argv);
+ } else {
+ // we already checked command validity above, so getting here
+ // would be a coding error
+ throw new AssertionError("Should not get here, command: " + cmd);
}
}
+ private boolean confirmForceManual() throws IOException {
+ return ToolRunner.confirmPrompt(
+ "You have specified the " + FORCEMANUAL + " flag. This flag is " +
+ "dangerous, as it can induce a split-brain scenario that WILL " +
+ "CORRUPT your HDFS namespace, possibly irrecoverably.\n" +
+ "\n" +
+ "It is recommended not to use this flag, but instead to shut down the " +
+ "cluster and disable automatic failover if you prefer to manually " +
+ "manage your HA state.\n" +
+ "\n" +
+ "You may abort safely by answering 'n' or hitting ^C now.\n" +
+ "\n" +
+ "Are you sure you want to continue?");
+ }
+
+ /**
+ * Add CLI options which are specific to the failover command and no
+ * others.
+ */
+ private void addFailoverCliOpts(Options failoverOpts) {
+ failoverOpts.addOption(FORCEFENCE, false, "force fencing");
+ failoverOpts.addOption(FORCEACTIVE, false, "force failover");
+ // Don't add FORCEMANUAL, since that's added separately for all commands
+ // that change state.
+ }
+
+ private CommandLine parseOpts(String cmdName, Options opts, String[] argv) {
+ try {
+ // Strip off the first arg, since that's just the command name
+ argv = Arrays.copyOfRange(argv, 1, argv.length);
+ return new GnuParser().parse(opts, argv);
+ } catch (ParseException pe) {
+ errOut.println(cmdName.substring(1) +
+ ": incorrect arguments");
+ printUsage(errOut, cmdName);
+ return null;
+ }
+ }
+
private int help(String[] argv) {
if (argv.length != 2) {
printUsage(errOut, "-help");
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
index b086382..d4ae089 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
@@ -60,6 +60,31 @@
return name;
}
}
+
+ public static enum RequestSource {
+ REQUEST_BY_USER,
+ REQUEST_BY_USER_FORCED,
+ REQUEST_BY_ZKFC;
+ }
+
+ /**
+ * Information describing the source for a request to change state.
+ * This is used to differentiate requests from automatic vs CLI
+ * failover controllers, and in the future may include epoch
+ * information.
+ */
+ public static class StateChangeRequestInfo {
+ private final RequestSource source;
+
+ public StateChangeRequestInfo(RequestSource source) {
+ super();
+ this.source = source;
+ }
+
+ public RequestSource getSource() {
+ return source;
+ }
+ }
/**
* Monitor the health of service. This periodically called by the HA
@@ -95,7 +120,8 @@
* @throws IOException
* if other errors happen
*/
- public void transitionToActive() throws ServiceFailedException,
+ public void transitionToActive(StateChangeRequestInfo reqInfo)
+ throws ServiceFailedException,
AccessControlException,
IOException;
@@ -110,7 +136,8 @@
* @throws IOException
* if other errors happen
*/
- public void transitionToStandby() throws ServiceFailedException,
+ public void transitionToStandby(StateChangeRequestInfo reqInfo)
+ throws ServiceFailedException,
AccessControlException,
IOException;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
index b8ee717..58d4a7f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocolHelper.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ipc.RemoteException;
/**
@@ -30,7 +31,8 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HAServiceProtocolHelper {
- public static void monitorHealth(HAServiceProtocol svc)
+ public static void monitorHealth(HAServiceProtocol svc,
+ StateChangeRequestInfo reqInfo)
throws IOException {
try {
svc.monitorHealth();
@@ -39,19 +41,21 @@
}
}
- public static void transitionToActive(HAServiceProtocol svc)
+ public static void transitionToActive(HAServiceProtocol svc,
+ StateChangeRequestInfo reqInfo)
throws IOException {
try {
- svc.transitionToActive();
+ svc.transitionToActive(reqInfo);
} catch (RemoteException e) {
throw e.unwrapRemoteException(ServiceFailedException.class);
}
}
- public static void transitionToStandby(HAServiceProtocol svc)
+ public static void transitionToStandby(HAServiceProtocol svc,
+ StateChangeRequestInfo reqInfo)
throws IOException {
try {
- svc.transitionToStandby();
+ svc.transitionToStandby(reqInfo);
} catch (RemoteException e) {
throw e.unwrapRemoteException(ServiceFailedException.class);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
index 00edfa0..56678b4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils;
import com.google.common.collect.Maps;
@@ -49,6 +50,11 @@
public abstract InetSocketAddress getAddress();
/**
+ * @return the IPC address of the ZKFC on the target node
+ */
+ public abstract InetSocketAddress getZKFCAddress();
+
+ /**
* @return a Fencer implementation configured for this target node
*/
public abstract NodeFencer getFencer();
@@ -76,6 +82,20 @@
confCopy, factory, timeoutMs);
}
+ /**
+ * @return a proxy to the ZKFC which is associated with this HA service.
+ */
+ public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs)
+ throws IOException {
+ Configuration confCopy = new Configuration(conf);
+ // Lower the timeout so we quickly fail to connect
+ confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+ SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
+ return new ZKFCProtocolClientSideTranslatorPB(
+ getZKFCAddress(),
+ confCopy, factory, timeoutMs);
+ }
+
public final Map<String, String> getFencingParameters() {
Map<String, String> ret = Maps.newHashMap();
addFencingParameters(ret);
@@ -99,4 +119,11 @@
ret.put(HOST_SUBST_KEY, getAddress().getHostName());
ret.put(PORT_SUBST_KEY, String.valueOf(getAddress().getPort()));
}
+
+ /**
+ * @return true if auto failover should be considered enabled
+ */
+ public boolean isAutoFailoverEnabled() {
+ return false;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAZKUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAZKUtil.java
new file mode 100644
index 0000000..093b878
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAZKUtil.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * Utilities for working with ZooKeeper.
+ */
+@InterfaceAudience.Private
+public class HAZKUtil {
+
+ /**
+ * Parse ACL permission string, partially borrowed from
+ * ZooKeeperMain private method
+ */
+ private static int getPermFromString(String permString) {
+ int perm = 0;
+ for (int i = 0; i < permString.length(); i++) {
+ char c = permString.charAt(i);
+ switch (c) {
+ case 'r':
+ perm |= ZooDefs.Perms.READ;
+ break;
+ case 'w':
+ perm |= ZooDefs.Perms.WRITE;
+ break;
+ case 'c':
+ perm |= ZooDefs.Perms.CREATE;
+ break;
+ case 'd':
+ perm |= ZooDefs.Perms.DELETE;
+ break;
+ case 'a':
+ perm |= ZooDefs.Perms.ADMIN;
+ break;
+ default:
+ throw new BadAclFormatException(
+ "Invalid permission '" + c + "' in permission string '" +
+ permString + "'");
+ }
+ }
+ return perm;
+ }
+
+ /**
+ * Parse comma separated list of ACL entries to secure generated nodes, e.g.
+ * <code>sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa</code>
+ *
+ * @return ACL list
+ * @throws HadoopIllegalArgumentException if an ACL is invalid
+ */
+ public static List<ACL> parseACLs(String aclString) {
+ List<ACL> acl = Lists.newArrayList();
+ if (aclString == null) {
+ return acl;
+ }
+
+ List<String> aclComps = Lists.newArrayList(
+ Splitter.on(',').omitEmptyStrings().trimResults()
+ .split(aclString));
+ for (String a : aclComps) {
+ // from ZooKeeperMain private method
+ int firstColon = a.indexOf(':');
+ int lastColon = a.lastIndexOf(':');
+ if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
+ throw new BadAclFormatException(
+ "ACL '" + a + "' not of expected form scheme:id:perm");
+ }
+
+ ACL newAcl = new ACL();
+ newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
+ firstColon + 1, lastColon)));
+ newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
+ acl.add(newAcl);
+ }
+
+ return acl;
+ }
+
+ /**
+ * Parse a comma-separated list of authentication mechanisms. Each
+ * such mechanism should be of the form 'scheme:auth' -- the same
+ * syntax used for the 'addAuth' command in the ZK CLI.
+ *
+ * @param authString the comma-separated auth mechanisms
+ * @return a list of parsed authentications
+ */
+ public static List<ZKAuthInfo> parseAuth(String authString) {
+ List<ZKAuthInfo> ret = Lists.newArrayList();
+ if (authString == null) {
+ return ret;
+ }
+
+ List<String> authComps = Lists.newArrayList(
+ Splitter.on(',').omitEmptyStrings().trimResults()
+ .split(authString));
+
+ for (String comp : authComps) {
+ String parts[] = comp.split(":", 2);
+ if (parts.length != 2) {
+ throw new BadAuthFormatException(
+ "Auth '" + comp + "' not of expected form scheme:auth");
+ }
+ ret.add(new ZKAuthInfo(parts[0],
+ parts[1].getBytes(Charsets.UTF_8)));
+ }
+ return ret;
+ }
+
+ /**
+ * Because ZK ACLs and authentication information may be secret,
+ * allow the configuration values to be indirected through a file
+ * by specifying the configuration as "@/path/to/file". If this
+ * syntax is used, this function will return the contents of the file
+ * as a String.
+ *
+ * @param valInConf the value from the Configuration
+ * @return either the same value, or the contents of the referenced
+ * file if the configured value starts with "@"
+ * @throws IOException if the file cannot be read
+ */
+ public static String resolveConfIndirection(String valInConf)
+ throws IOException {
+ if (valInConf == null) return null;
+ if (!valInConf.startsWith("@")) {
+ return valInConf;
+ }
+ String path = valInConf.substring(1).trim();
+ return Files.toString(new File(path), Charsets.UTF_8).trim();
+ }
+
+ /**
+ * An authentication token passed to ZooKeeper.addAuthInfo
+ */
+ static class ZKAuthInfo {
+ private final String scheme;
+ private final byte[] auth;
+
+ public ZKAuthInfo(String scheme, byte[] auth) {
+ super();
+ this.scheme = scheme;
+ this.auth = auth;
+ }
+
+ String getScheme() {
+ return scheme;
+ }
+
+ byte[] getAuth() {
+ return auth;
+ }
+ }
+
+ static class BadAclFormatException extends HadoopIllegalArgumentException {
+ private static final long serialVersionUID = 1L;
+
+ public BadAclFormatException(String message) {
+ super(message);
+ }
+ }
+
+ static class BadAuthFormatException extends HadoopIllegalArgumentException {
+ private static final long serialVersionUID = 1L;
+
+ public BadAuthFormatException(String message) {
+ super(message);
+ }
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
index 7533529..a349626 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
@@ -22,6 +22,7 @@
import java.util.LinkedList;
import java.util.List;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,7 +44,8 @@
* Classes which need callbacks should implement the {@link Callback}
* interface.
*/
-class HealthMonitor {
+@InterfaceAudience.Private
+public class HealthMonitor {
private static final Log LOG = LogFactory.getLog(
HealthMonitor.class);
@@ -75,7 +77,8 @@
private HAServiceStatus lastServiceState = new HAServiceStatus(
HAServiceState.INITIALIZING);
- enum State {
+ @InterfaceAudience.Private
+ public enum State {
/**
* The health monitor is still starting up.
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java
new file mode 100644
index 0000000..02342f4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.KerberosInfo;
+
+import java.io.IOException;
+
+/**
+ * Protocol exposed by the ZKFailoverController, allowing for graceful
+ * failover.
+ */
+@KerberosInfo(
+ serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ZKFCProtocol {
+ /**
+ * Initial version of the protocol
+ */
+ public static final long versionID = 1L;
+
+ /**
+ * Request that this service yield from the active node election for the
+ * specified time period.
+ *
+ * If the node is not currently active, it simply prevents any attempts
+ * to become active for the specified time period. Otherwise, it first
+ * tries to transition the local service to standby state, and then quits
+ * the election.
+ *
+ * If the attempt to transition to standby succeeds, then the ZKFC receiving
+ * this RPC will delete its own breadcrumb node in ZooKeeper. Thus, the
+ * next node to become active will not run any fencing process. Otherwise,
+ * the breadcrumb will be left, such that the next active will fence this
+ * node.
+ *
+ * After the specified time period elapses, the node will attempt to re-join
+ * the election, provided that its service is healthy.
+ *
+ * If the node has previously been instructed to cede active, and is still
+ * within the specified time period, the later command's time period will
+ * take precedence, resetting the timer.
+ *
+ * A call to cedeActive which specifies a 0 or negative time period will
+ * allow the target node to immediately rejoin the election, so long as
+ * it is healthy.
+ *
+ * @param millisToCede period for which the node should not attempt to
+ * become active
+ * @throws IOException if the operation fails
+ * @throws AccessControlException if the operation is disallowed
+ */
+ @Idempotent
+ public void cedeActive(int millisToCede)
+ throws IOException, AccessControlException;
+
+ /**
+ * Request that this node try to become active through a graceful failover.
+ *
+ * If the node is already active, this is a no-op and simply returns success
+ * without taking any further action.
+ *
+ * If the node is not healthy, it will throw an exception indicating that it
+ * is not able to become active.
+ *
+ * If the node is healthy and not active, it will try to initiate a graceful
+ * failover to become active, returning only when it has successfully become
+ * active. See {@link ZKFailoverController#gracefulFailoverToYou()} for the
+ * implementation details.
+ *
+ * If the node fails to successfully coordinate the failover, throws an
+ * exception indicating the reason for failure.
+ *
+ * @throws IOException if graceful failover fails
+ * @throws AccessControlException if the operation is disallowed
+ */
+ @Idempotent
+ public void gracefulFailover()
+ throws IOException, AccessControlException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java
new file mode 100644
index 0000000..2077a86
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolPB;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+
+import com.google.protobuf.BlockingService;
+
+@InterfaceAudience.LimitedPrivate("HDFS")
+@InterfaceStability.Evolving
+public class ZKFCRpcServer implements ZKFCProtocol {
+
+ private static final int HANDLER_COUNT = 3;
+ private final ZKFailoverController zkfc;
+ private Server server;
+
+ ZKFCRpcServer(Configuration conf,
+ InetSocketAddress bindAddr,
+ ZKFailoverController zkfc,
+ PolicyProvider policy) throws IOException {
+ this.zkfc = zkfc;
+
+ RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
+ ProtobufRpcEngine.class);
+ ZKFCProtocolServerSideTranslatorPB translator =
+ new ZKFCProtocolServerSideTranslatorPB(this);
+ BlockingService service = ZKFCProtocolService
+ .newReflectiveBlockingService(translator);
+ this.server = RPC.getServer(
+ ZKFCProtocolPB.class,
+ service, bindAddr.getHostName(),
+ bindAddr.getPort(), HANDLER_COUNT, false, conf,
+ null /*secretManager*/);
+
+ // set service-level authorization security policy
+ if (conf.getBoolean(
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+ server.refreshServiceAcl(conf, policy);
+ }
+
+ }
+
+ void start() {
+ this.server.start();
+ }
+
+ public InetSocketAddress getAddress() {
+ return server.getListenerAddress();
+ }
+
+ void stopAndJoin() throws InterruptedException {
+ this.server.stop();
+ this.server.join();
+ }
+
+ @Override
+ public void cedeActive(int millisToCede) throws IOException,
+ AccessControlException {
+ zkfc.checkRpcAdminAccess();
+ zkfc.cedeActive(millisToCede);
+ }
+
+ @Override
+ public void gracefulFailover() throws IOException, AccessControlException {
+ zkfc.checkRpcAdminAccess();
+ zkfc.gracefulFailoverToYou();
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
index 9a50fe6..c02fe0d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
@@ -18,79 +18,143 @@
package org.apache.hadoop.ha;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.data.ACL;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.LimitedPrivate("HDFS")
-public abstract class ZKFailoverController implements Tool {
+public abstract class ZKFailoverController {
static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
- // TODO: this should be namespace-scoped
public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
+ public static final String ZK_ACL_KEY = "ha.zookeeper.acl";
+ private static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+ public static final String ZK_AUTH_KEY = "ha.zookeeper.auth";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
+ /**
+ * All of the conf keys used by the ZKFC. This is used in order to allow
+ * them to be overridden on a per-nameservice or per-namenode basis.
+ */
+ protected static final String[] ZKFC_CONF_KEYS = new String[] {
+ ZK_QUORUM_KEY,
+ ZK_SESSION_TIMEOUT_KEY,
+ ZK_PARENT_ZNODE_KEY,
+ ZK_ACL_KEY,
+ ZK_AUTH_KEY
+ };
+
+
/** Unable to format the parent znode in ZK */
static final int ERR_CODE_FORMAT_DENIED = 2;
/** The parent znode doesn't exist in ZK */
static final int ERR_CODE_NO_PARENT_ZNODE = 3;
/** Fencing is not properly configured */
static final int ERR_CODE_NO_FENCER = 4;
+ /** Automatic failover is not enabled */
+ static final int ERR_CODE_AUTO_FAILOVER_NOT_ENABLED = 5;
+ /** Cannot connect to ZooKeeper */
+ static final int ERR_CODE_NO_ZK = 6;
- private Configuration conf;
+ protected Configuration conf;
+ private String zkQuorum;
+ protected final HAServiceTarget localTarget;
private HealthMonitor healthMonitor;
private ActiveStandbyElector elector;
-
- private HAServiceTarget localTarget;
-
- private String parentZnode;
+ protected ZKFCRpcServer rpcServer;
private State lastHealthState = State.INITIALIZING;
/** Set if a fatal error occurs */
private String fatalError = null;
- @Override
- public void setConf(Configuration conf) {
+ /**
+ * A future nanotime before which the ZKFC will not join the election.
+ * This is used during graceful failover.
+ */
+ private long delayJoiningUntilNanotime = 0;
+
+ /** Executor on which {@link #scheduleRecheck(long)} schedules events */
+ private ScheduledExecutorService delayExecutor =
+ Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("ZKFC Delay timer #%d")
+ .build());
+
+ private ActiveAttemptRecord lastActiveAttemptRecord;
+ private Object activeAttemptRecordLock = new Object();
+
+ protected ZKFailoverController(Configuration conf, HAServiceTarget localTarget) {
+ this.localTarget = localTarget;
this.conf = conf;
- localTarget = getLocalTarget();
}
protected abstract byte[] targetToData(HAServiceTarget target);
- protected abstract HAServiceTarget getLocalTarget();
protected abstract HAServiceTarget dataToTarget(byte[] data);
+ protected abstract void loginAsFCUser() throws IOException;
+ protected abstract void checkRpcAdminAccess()
+ throws AccessControlException, IOException;
+ protected abstract InetSocketAddress getRpcAddressToBindTo();
+ protected abstract PolicyProvider getPolicyProvider();
+ /**
+ * Return the name of a znode inside the configured parent znode in which
+ * the ZKFC will do all of its work. This is so that multiple federated
+ * nameservices can run on the same ZK quorum without having to manually
+ * configure them to separate subdirectories.
+ */
+ protected abstract String getScopeInsideParentNode();
- @Override
- public Configuration getConf() {
- return conf;
+ public HAServiceTarget getLocalTarget() {
+ return localTarget;
}
-
- @Override
+
public int run(final String[] args) throws Exception {
- // TODO: need to hook DFS here to find the NN keytab info, etc,
- // similar to what DFSHAAdmin does. Annoying that this is in common.
+ if (!localTarget.isAutoFailoverEnabled()) {
+ LOG.fatal("Automatic failover is not enabled for " + localTarget + "." +
+ " Please ensure that automatic failover is enabled in the " +
+ "configuration before running the ZK failover controller.");
+ return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
+ }
+ loginAsFCUser();
try {
return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
@Override
@@ -99,6 +163,10 @@
return doRun(args);
} catch (Exception t) {
throw new RuntimeException(t);
+ } finally {
+ if (elector != null) {
+ elector.terminateConnection();
+ }
}
}
});
@@ -107,6 +175,7 @@
}
}
+
private int doRun(String[] args)
throws HadoopIllegalArgumentException, IOException, InterruptedException {
initZK();
@@ -129,11 +198,23 @@
}
}
- if (!elector.parentZNodeExists()) {
- LOG.fatal("Unable to start failover controller. " +
- "Parent znode does not exist.\n" +
- "Run with -formatZK flag to initialize ZooKeeper.");
- return ERR_CODE_NO_PARENT_ZNODE;
+ try {
+ if (!elector.parentZNodeExists()) {
+ LOG.fatal("Unable to start failover controller. " +
+ "Parent znode does not exist.\n" +
+ "Run with -formatZK flag to initialize ZooKeeper.");
+ return ERR_CODE_NO_PARENT_ZNODE;
+ }
+ } catch (IOException ioe) {
+ if (ioe.getCause() instanceof KeeperException.ConnectionLossException) {
+ LOG.fatal("Unable to start failover controller. Unable to connect " +
+ "to ZooKeeper quorum at " + zkQuorum + ". Please check the " +
+ "configured value for " + ZK_QUORUM_KEY + " and ensure that " +
+ "ZooKeeper is running.");
+ return ERR_CODE_NO_ZK;
+ } else {
+ throw ioe;
+ }
}
try {
@@ -145,8 +226,18 @@
return ERR_CODE_NO_FENCER;
}
+ initRPC();
initHM();
- mainLoop();
+ startRPC();
+ try {
+ mainLoop();
+ } finally {
+ rpcServer.stopAndJoin();
+
+ elector.quitElection(true);
+ healthMonitor.shutdown();
+ healthMonitor.join();
+ }
return 0;
}
@@ -181,6 +272,7 @@
}
private boolean confirmFormat() {
+ String parentZnode = getParentZnode();
System.err.println(
"===============================================\n" +
"The configured parent znode " + parentZnode + " already exists.\n" +
@@ -206,16 +298,40 @@
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.start();
}
+
+ protected void initRPC() throws IOException {
+ InetSocketAddress bindAddr = getRpcAddressToBindTo();
+ rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
+ }
+
+ protected void startRPC() throws IOException {
+ rpcServer.start();
+ }
+
private void initZK() throws HadoopIllegalArgumentException, IOException {
- String zkQuorum = conf.get(ZK_QUORUM_KEY);
+ zkQuorum = conf.get(ZK_QUORUM_KEY);
int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
ZK_SESSION_TIMEOUT_DEFAULT);
- parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
- ZK_PARENT_ZNODE_DEFAULT);
- // TODO: need ZK ACL support in config, also maybe auth!
- List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
+ // Parse ACLs from configuration.
+ String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
+ zkAclConf = HAZKUtil.resolveConfIndirection(zkAclConf);
+ List<ACL> zkAcls = HAZKUtil.parseACLs(zkAclConf);
+ if (zkAcls.isEmpty()) {
+ zkAcls = Ids.CREATOR_ALL_ACL;
+ }
+
+ // Parse authentication from configuration.
+ String zkAuthConf = conf.get(ZK_AUTH_KEY);
+ zkAuthConf = HAZKUtil.resolveConfIndirection(zkAuthConf);
+ List<ZKAuthInfo> zkAuths;
+ if (zkAuthConf != null) {
+ zkAuths = HAZKUtil.parseAuth(zkAuthConf);
+ } else {
+ zkAuths = Collections.emptyList();
+ }
+ // Sanity check configuration.
Preconditions.checkArgument(zkQuorum != null,
"Missing required configuration '%s' for ZooKeeper quorum",
ZK_QUORUM_KEY);
@@ -224,9 +340,19 @@
elector = new ActiveStandbyElector(zkQuorum,
- zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
+ zkTimeout, getParentZnode(), zkAcls, zkAuths,
+ new ElectorCallbacks());
}
+ private String getParentZnode() {
+ String znode = conf.get(ZK_PARENT_ZNODE_KEY,
+ ZK_PARENT_ZNODE_DEFAULT);
+ if (!znode.endsWith("/")) {
+ znode += "/";
+ }
+ return znode + getScopeInsideParentNode();
+ }
+
private synchronized void mainLoop() throws InterruptedException {
while (fatalError == null) {
wait();
@@ -242,16 +368,30 @@
notifyAll();
}
- private synchronized void becomeActive() {
+ private synchronized void becomeActive() throws ServiceFailedException {
LOG.info("Trying to make " + localTarget + " active...");
try {
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
- conf, FailoverController.getRpcTimeoutToNewActive(conf)));
- LOG.info("Successfully transitioned " + localTarget +
- " to active state");
+ conf, FailoverController.getRpcTimeoutToNewActive(conf)),
+ createReqInfo());
+ String msg = "Successfully transitioned " + localTarget +
+ " to active state";
+ LOG.info(msg);
+ recordActiveAttempt(new ActiveAttemptRecord(true, msg));
+
} catch (Throwable t) {
- LOG.fatal("Couldn't make " + localTarget + " active", t);
- elector.quitElection(true);
+ String msg = "Couldn't make " + localTarget + " active";
+ LOG.fatal(msg, t);
+
+ recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" +
+ StringUtils.stringifyException(t)));
+
+ if (t instanceof ServiceFailedException) {
+ throw (ServiceFailedException)t;
+ } else {
+ throw new ServiceFailedException("Couldn't transition to active",
+ t);
+ }
/*
* TODO:
* we need to make sure that if we get fenced and then quickly restarted,
@@ -264,12 +404,79 @@
}
}
+ /**
+ * Store the results of the last attempt to become active.
+ * This is used so that, during manually initiated failover,
+ * we can report back the results of the attempt to become active
+ * to the initiator of the failover.
+ */
+ private void recordActiveAttempt(
+ ActiveAttemptRecord record) {
+ synchronized (activeAttemptRecordLock) {
+ lastActiveAttemptRecord = record;
+ activeAttemptRecordLock.notifyAll();
+ }
+ }
+
+ /**
+ * Wait until one of the following events:
+ * <ul>
+ * <li>Another thread publishes the results of an attempt to become active
+ * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
+ * <li>The node enters bad health status</li>
+ * <li>The specified timeout elapses</li>
+ * </ul>
+ *
+ * @param timeoutMillis number of millis to wait
+ * @return the published record, or null if the timeout elapses or the
+ * service becomes unhealthy
+ * @throws InterruptedException if the thread is interrupted.
+ */
+ private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)
+ throws InterruptedException {
+ long st = System.nanoTime();
+ long waitUntil = st + TimeUnit.NANOSECONDS.convert(
+ timeoutMillis, TimeUnit.MILLISECONDS);
+
+ do {
+ // periodically check health state, because entering an
+ // unhealthy state could prevent us from ever attempting to
+ // become active. We can detect this and respond to the user
+ // immediately.
+ synchronized (this) {
+ if (lastHealthState != State.SERVICE_HEALTHY) {
+ // early out if service became unhealthy
+ return null;
+ }
+ }
+
+ synchronized (activeAttemptRecordLock) {
+ if ((lastActiveAttemptRecord != null &&
+ lastActiveAttemptRecord.nanoTime >= st)) {
+ return lastActiveAttemptRecord;
+ }
+ // Only wait 1sec so that we periodically recheck the health state
+ // above.
+ activeAttemptRecordLock.wait(1000);
+ }
+ } while (System.nanoTime() < waitUntil);
+
+ // Timeout elapsed.
+ LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " +
+ "to become active");
+ return null;
+ }
+
+ private StateChangeRequestInfo createReqInfo() {
+ return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
+ }
+
private synchronized void becomeStandby() {
LOG.info("ZK Election indicated that " + localTarget +
" should become standby");
try {
int timeout = FailoverController.getGracefulFenceTimeout(conf);
- localTarget.getProxy(conf, timeout).transitionToStandby();
+ localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
LOG.info("Successfully transitioned " + localTarget +
" to standby state");
} catch (Exception e) {
@@ -279,27 +486,336 @@
// at the same time.
}
}
+
+
+ private synchronized void fenceOldActive(byte[] data) {
+ HAServiceTarget target = dataToTarget(data);
+
+ try {
+ doFence(target);
+ } catch (Throwable t) {
+ recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
+ Throwables.propagate(t);
+ }
+ }
+
+ private void doFence(HAServiceTarget target) {
+ LOG.info("Should fence: " + target);
+ boolean gracefulWorked = new FailoverController(conf,
+ RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
+ if (gracefulWorked) {
+ // It's possible that it's in standby but just about to go into active,
+ // no? Is there some race here?
+ LOG.info("Successfully transitioned " + target + " to standby " +
+ "state without fencing");
+ return;
+ }
+
+ try {
+ target.checkFencingConfigured();
+ } catch (BadFencingConfigurationException e) {
+ LOG.error("Couldn't fence old active " + target, e);
+ recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
+ throw new RuntimeException(e);
+ }
+
+ if (!target.getFencer().fence(target)) {
+ throw new RuntimeException("Unable to fence " + target);
+ }
+ }
+
+
+ /**
+ * Request from graceful failover to cede active role. Causes
+ * this ZKFC to transition its local node to standby, then quit
+ * the election for the specified period of time, after which it
+ * will rejoin iff it is healthy.
+ */
+ void cedeActive(final int millisToCede)
+ throws AccessControlException, ServiceFailedException, IOException {
+ try {
+ UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doCedeActive(millisToCede);
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void doCedeActive(int millisToCede)
+ throws AccessControlException, ServiceFailedException, IOException {
+ int timeout = FailoverController.getGracefulFenceTimeout(conf);
+
+ // Lock elector to maintain lock ordering of elector -> ZKFC
+ synchronized (elector) {
+ synchronized (this) {
+ if (millisToCede <= 0) {
+ delayJoiningUntilNanotime = 0;
+ recheckElectability();
+ return;
+ }
+
+ LOG.info("Requested by " + UserGroupInformation.getCurrentUser() +
+ " at " + Server.getRemoteAddress() + " to cede active role.");
+ boolean needFence = false;
+ try {
+ localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
+ LOG.info("Successfully ensured local node is in standby mode");
+ } catch (IOException ioe) {
+ LOG.warn("Unable to transition local node to standby: " +
+ ioe.getLocalizedMessage());
+ LOG.warn("Quitting election but indicating that fencing is " +
+ "necessary");
+ needFence = true;
+ }
+ delayJoiningUntilNanotime = System.nanoTime() +
+ TimeUnit.MILLISECONDS.toNanos(millisToCede);
+ elector.quitElection(needFence);
+ }
+ }
+ recheckElectability();
+ }
+
+ /**
+ * Coordinate a graceful failover to this node.
+ * @throws ServiceFailedException if the node fails to become active
+ * @throws IOException some other error occurs
+ */
+ void gracefulFailoverToYou() throws ServiceFailedException, IOException {
+ try {
+ UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doGracefulFailover();
+ return null;
+ }
+
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Coordinate a graceful failover. This proceeds in several phases:
+ * 1) Pre-flight checks: ensure that the local node is healthy, and
+ * thus a candidate for failover.
+ * 2) Determine the current active node. If it is the local node, no
+ * need to failover - return success.
+ * 3) Ask that node to yield from the election for a number of seconds.
+ * 4) Allow the normal election path to run in other threads. Wait until
+ * we either become unhealthy or we see an election attempt recorded by
+ * the normal code path.
+ * 5) Allow the old active to rejoin the election, so a future
+ * failback is possible.
+ */
+ private void doGracefulFailover()
+ throws ServiceFailedException, IOException, InterruptedException {
+ int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
+
+ // Phase 1: pre-flight checks
+ checkEligibleForFailover();
+
+ // Phase 2: determine old/current active node. Check that we're not
+ // ourselves active, etc.
+ HAServiceTarget oldActive = getCurrentActive();
+ if (oldActive == null) {
+ // No node is currently active. So, if we aren't already
+ // active ourselves by means of a normal election, then there's
+ // probably something preventing us from becoming active.
+ throw new ServiceFailedException(
+ "No other node is currently active.");
+ }
+
+ if (oldActive.getAddress().equals(localTarget.getAddress())) {
+ LOG.info("Local node " + localTarget + " is already active. " +
+ "No need to failover. Returning success.");
+ return;
+ }
+
+ // Phase 3: ask the old active to yield from the election.
+ LOG.info("Asking " + oldActive + " to cede its active state for " +
+ timeout + "ms");
+ ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
+ oldZkfc.cedeActive(timeout);
+
+ // Phase 4: wait for the normal election to make the local node
+ // active.
+ ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
+
+ if (attempt == null) {
+ // We didn't even make an attempt to become active.
+ synchronized(this) {
+ if (lastHealthState != State.SERVICE_HEALTHY) {
+ throw new ServiceFailedException("Unable to become active. " +
+ "Service became unhealthy while trying to failover.");
+ }
+ }
+
+ throw new ServiceFailedException("Unable to become active. " +
+ "Local node did not get an opportunity to do so from ZooKeeper, " +
+ "or the local node took too long to transition to active.");
+ }
+
+ // Phase 5. At this point, we made some attempt to become active. So we
+ // can tell the old active to rejoin if it wants. This allows a quick
+ // fail-back if we immediately crash.
+ oldZkfc.cedeActive(-1);
+
+ if (attempt.succeeded) {
+ LOG.info("Successfully became active. " + attempt.status);
+ } else {
+ // Propagate failure
+ String msg = "Failed to become active. " + attempt.status;
+ throw new ServiceFailedException(msg);
+ }
+ }
+
+ /**
+ * Ensure that the local node is in a healthy state, and thus
+ * eligible for graceful failover.
+ * @throws ServiceFailedException if the node is unhealthy
+ */
+ private synchronized void checkEligibleForFailover()
+ throws ServiceFailedException {
+ // Check health
+ if (this.getLastHealthState() != State.SERVICE_HEALTHY) {
+ throw new ServiceFailedException(
+ localTarget + " is not currently healthy. " +
+ "Cannot be failover target");
+ }
+ }
+
+ /**
+ * @return an {@link HAServiceTarget} for the current active node
+ * in the cluster, or null if no node is active.
+ * @throws IOException if a ZK-related issue occurs
+ * @throws InterruptedException if thread is interrupted
+ */
+ private HAServiceTarget getCurrentActive()
+ throws IOException, InterruptedException {
+ synchronized (elector) {
+ synchronized (this) {
+ byte[] activeData;
+ try {
+ activeData = elector.getActiveData();
+ } catch (ActiveNotFoundException e) {
+ return null;
+ } catch (KeeperException ke) {
+ throw new IOException(
+ "Unexpected ZooKeeper issue fetching active node info", ke);
+ }
+
+ HAServiceTarget oldActive = dataToTarget(activeData);
+ return oldActive;
+ }
+ }
+ }
+
+ /**
+ * Check the current state of the service, and join the election
+ * if it should be in the election.
+ */
+ private void recheckElectability() {
+ // Maintain lock ordering of elector -> ZKFC
+ synchronized (elector) {
+ synchronized (this) {
+ boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
+
+ long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
+ if (remainingDelay > 0) {
+ if (healthy) {
+ LOG.info("Would have joined master election, but this node is " +
+ "prohibited from doing so for " +
+ TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
+ }
+ scheduleRecheck(remainingDelay);
+ return;
+ }
+
+ switch (lastHealthState) {
+ case SERVICE_HEALTHY:
+ elector.joinElection(targetToData(localTarget));
+ break;
+
+ case INITIALIZING:
+ LOG.info("Ensuring that " + localTarget + " does not " +
+ "participate in active master election");
+ elector.quitElection(false);
+ break;
+
+ case SERVICE_UNHEALTHY:
+ case SERVICE_NOT_RESPONDING:
+ LOG.info("Quitting master election for " + localTarget +
+ " and marking that fencing is necessary");
+ elector.quitElection(true);
+ break;
+
+ case HEALTH_MONITOR_FAILED:
+ fatalError("Health monitor failed!");
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule a call to {@link #recheckElectability()} in the future.
+ */
+ private void scheduleRecheck(long whenNanos) {
+ delayExecutor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ recheckElectability();
+ } catch (Throwable t) {
+ fatalError("Failed to recheck electability: " +
+ StringUtils.stringifyException(t));
+ }
+ }
+ },
+ whenNanos, TimeUnit.NANOSECONDS);
+ }
/**
* @return the last health state passed to the FC
* by the HealthMonitor.
*/
@VisibleForTesting
- State getLastHealthState() {
+ synchronized State getLastHealthState() {
return lastHealthState;
}
+
+ private synchronized void setLastHealthState(HealthMonitor.State newState) {
+ LOG.info("Local service " + localTarget +
+ " entered state: " + newState);
+ lastHealthState = newState;
+ }
@VisibleForTesting
ActiveStandbyElector getElectorForTests() {
return elector;
}
+
+ @VisibleForTesting
+ ZKFCRpcServer getRpcServerForTests() {
+ return rpcServer;
+ }
/**
* Callbacks from elector
*/
class ElectorCallbacks implements ActiveStandbyElectorCallback {
@Override
- public void becomeActive() {
+ public void becomeActive() throws ServiceFailedException {
ZKFailoverController.this.becomeActive();
}
@@ -319,31 +835,13 @@
@Override
public void fenceOldActive(byte[] data) {
- HAServiceTarget target = dataToTarget(data);
-
- LOG.info("Should fence: " + target);
- boolean gracefulWorked = new FailoverController(conf)
- .tryGracefulFence(target);
- if (gracefulWorked) {
- // It's possible that it's in standby but just about to go into active,
- // no? Is there some race here?
- LOG.info("Successfully transitioned " + target + " to standby " +
- "state without fencing");
- return;
- }
-
- try {
- target.checkFencingConfigured();
- } catch (BadFencingConfigurationException e) {
- LOG.error("Couldn't fence old active " + target, e);
- // TODO: see below todo
- throw new RuntimeException(e);
- }
-
- if (!target.getFencer().fence(target)) {
- // TODO: this will end up in some kind of tight loop,
- // won't it? We need some kind of backoff
- throw new RuntimeException("Unable to fence " + target);
+ ZKFailoverController.this.fenceOldActive(data);
+ }
+
+ @Override
+ public String toString() {
+ synchronized (ZKFailoverController.this) {
+ return "Elector callbacks for " + localTarget;
}
}
}
@@ -354,36 +852,21 @@
class HealthCallbacks implements HealthMonitor.Callback {
@Override
public void enteredState(HealthMonitor.State newState) {
- LOG.info("Local service " + localTarget +
- " entered state: " + newState);
- switch (newState) {
- case SERVICE_HEALTHY:
- LOG.info("Joining master election for " + localTarget);
- elector.joinElection(targetToData(localTarget));
- break;
-
- case INITIALIZING:
- LOG.info("Ensuring that " + localTarget + " does not " +
- "participate in active master election");
- elector.quitElection(false);
- break;
-
- case SERVICE_UNHEALTHY:
- case SERVICE_NOT_RESPONDING:
- LOG.info("Quitting master election for " + localTarget +
- " and marking that fencing is necessary");
- elector.quitElection(true);
- break;
-
- case HEALTH_MONITOR_FAILED:
- fatalError("Health monitor failed!");
- break;
-
- default:
- throw new IllegalArgumentException("Unhandled state:" + newState);
- }
-
- lastHealthState = newState;
+ setLastHealthState(newState);
+ recheckElectability();
}
}
+
+ private static class ActiveAttemptRecord {
+ private final boolean succeeded;
+ private final String status;
+ private final long nanoTime;
+
+ public ActiveAttemptRecord(boolean succeeded, String status) {
+ this.succeeded = succeeded;
+ this.status = status;
+ this.nanoTime = System.nanoTime();
+ }
+ }
+
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
index c269bd6..589ccd1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
@@ -30,13 +30,14 @@
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HARequestSource;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
@@ -57,10 +58,6 @@
private final static RpcController NULL_CONTROLLER = null;
private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ =
MonitorHealthRequestProto.newBuilder().build();
- private final static TransitionToActiveRequestProto TRANSITION_TO_ACTIVE_REQ =
- TransitionToActiveRequestProto.newBuilder().build();
- private final static TransitionToStandbyRequestProto TRANSITION_TO_STANDBY_REQ =
- TransitionToStandbyRequestProto.newBuilder().build();
private final static GetServiceStatusRequestProto GET_SERVICE_STATUS_REQ =
GetServiceStatusRequestProto.newBuilder().build();
@@ -94,18 +91,25 @@
}
@Override
- public void transitionToActive() throws IOException {
+ public void transitionToActive(StateChangeRequestInfo reqInfo) throws IOException {
try {
- rpcProxy.transitionToActive(NULL_CONTROLLER, TRANSITION_TO_ACTIVE_REQ);
+ TransitionToActiveRequestProto req =
+ TransitionToActiveRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo)).build();
+
+ rpcProxy.transitionToActive(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
- public void transitionToStandby() throws IOException {
+ public void transitionToStandby(StateChangeRequestInfo reqInfo) throws IOException {
try {
- rpcProxy.transitionToStandby(NULL_CONTROLLER, TRANSITION_TO_STANDBY_REQ);
+ TransitionToStandbyRequestProto req =
+ TransitionToStandbyRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo)).build();
+ rpcProxy.transitionToStandby(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@@ -143,6 +147,27 @@
}
}
+ private HAStateChangeRequestInfoProto convert(StateChangeRequestInfo reqInfo) {
+ HARequestSource src;
+ switch (reqInfo.getSource()) {
+ case REQUEST_BY_USER:
+ src = HARequestSource.REQUEST_BY_USER;
+ break;
+ case REQUEST_BY_USER_FORCED:
+ src = HARequestSource.REQUEST_BY_USER_FORCED;
+ break;
+ case REQUEST_BY_ZKFC:
+ src = HARequestSource.REQUEST_BY_ZKFC;
+ break;
+ default:
+ throw new IllegalArgumentException("Bad source: " + reqInfo.getSource());
+ }
+ return HAStateChangeRequestInfoProto.newBuilder()
+ .setReqSource(src)
+ .build();
+ }
+
+
@Override
public void close() {
RPC.stopProxy(rpcProxy);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
index b5b6a89..63bfbca 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolServerSideTranslatorPB.java
@@ -19,12 +19,17 @@
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthResponseProto;
@@ -56,6 +61,8 @@
TransitionToActiveResponseProto.newBuilder().build();
private static final TransitionToStandbyResponseProto TRANSITION_TO_STANDBY_RESP =
TransitionToStandbyResponseProto.newBuilder().build();
+ private static final Log LOG = LogFactory.getLog(
+ HAServiceProtocolServerSideTranslatorPB.class);
public HAServiceProtocolServerSideTranslatorPB(HAServiceProtocol server) {
this.server = server;
@@ -71,13 +78,33 @@
throw new ServiceException(e);
}
}
+
+ private StateChangeRequestInfo convert(HAStateChangeRequestInfoProto proto) {
+ RequestSource src;
+ switch (proto.getReqSource()) {
+ case REQUEST_BY_USER:
+ src = RequestSource.REQUEST_BY_USER;
+ break;
+ case REQUEST_BY_USER_FORCED:
+ src = RequestSource.REQUEST_BY_USER_FORCED;
+ break;
+ case REQUEST_BY_ZKFC:
+ src = RequestSource.REQUEST_BY_ZKFC;
+ break;
+ default:
+ LOG.warn("Unknown request source: " + proto.getReqSource());
+ src = null;
+ }
+
+ return new StateChangeRequestInfo(src);
+ }
@Override
public TransitionToActiveResponseProto transitionToActive(
RpcController controller, TransitionToActiveRequestProto request)
throws ServiceException {
try {
- server.transitionToActive();
+ server.transitionToActive(convert(request.getReqInfo()));
return TRANSITION_TO_ACTIVE_RESP;
} catch(IOException e) {
throw new ServiceException(e);
@@ -89,7 +116,7 @@
RpcController controller, TransitionToStandbyRequestProto request)
throws ServiceException {
try {
- server.transitionToStandby();
+ server.transitionToStandby(convert(request.getReqInfo()));
return TRANSITION_TO_STANDBY_RESP;
} catch(IOException e) {
throw new ServiceException(e);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..62896fa8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ZKFCProtocol;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+
+public class ZKFCProtocolClientSideTranslatorPB implements
+ ZKFCProtocol, Closeable, ProtocolTranslator {
+
+ private final static RpcController NULL_CONTROLLER = null;
+ private final ZKFCProtocolPB rpcProxy;
+
+ public ZKFCProtocolClientSideTranslatorPB(
+ InetSocketAddress addr, Configuration conf,
+ SocketFactory socketFactory, int timeout) throws IOException {
+ RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
+ ProtobufRpcEngine.class);
+ rpcProxy = RPC.getProxy(ZKFCProtocolPB.class,
+ RPC.getProtocolVersion(ZKFCProtocolPB.class), addr,
+ UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
+ }
+
+ @Override
+ public void cedeActive(int millisToCede) throws IOException,
+ AccessControlException {
+ try {
+ CedeActiveRequestProto req = CedeActiveRequestProto.newBuilder()
+ .setMillisToCede(millisToCede)
+ .build();
+ rpcProxy.cedeActive(NULL_CONTROLLER, req);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+ @Override
+ public void gracefulFailover() throws IOException, AccessControlException {
+ try {
+ rpcProxy.gracefulFailover(NULL_CONTROLLER,
+ GracefulFailoverRequestProto.getDefaultInstance());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
+
+ @Override
+ public void close() {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ @Override
+ public Object getUnderlyingProxyObject() {
+ return rpcProxy;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java
new file mode 100644
index 0000000..348004f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+@KerberosInfo(
+ serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@ProtocolInfo(protocolName = "org.apache.hadoop.ha.ZKFCProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ZKFCProtocolPB extends
+ ZKFCProtocolService.BlockingInterface, VersionedProtocol {
+ /**
+ * If any methods need annotation, it can be added here
+ */
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..5494998
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.ZKFCProtocol;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveResponseProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverResponseProto;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ZKFCProtocolServerSideTranslatorPB implements
+ ZKFCProtocolPB {
+ private final ZKFCProtocol server;
+
+ public ZKFCProtocolServerSideTranslatorPB(ZKFCProtocol server) {
+ this.server = server;
+ }
+
+ @Override
+ public CedeActiveResponseProto cedeActive(RpcController controller,
+ CedeActiveRequestProto request) throws ServiceException {
+ try {
+ server.cedeActive(request.getMillisToCede());
+ return CedeActiveResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public GracefulFailoverResponseProto gracefulFailover(
+ RpcController controller, GracefulFailoverRequestProto request)
+ throws ServiceException {
+ try {
+ server.gracefulFailover();
+ return GracefulFailoverResponseProto.getDefaultInstance();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return RPC.getProtocolVersion(ZKFCProtocolPB.class);
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ if (!protocol.equals(RPC.getProtocolName(ZKFCProtocolPB.class))) {
+ throw new IOException("Serverside implements " +
+ RPC.getProtocolName(ZKFCProtocolPB.class) +
+ ". The following requested protocol is unknown: " + protocol);
+ }
+
+ return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+ RPC.getProtocolVersion(ZKFCProtocolPB.class),
+ HAServiceProtocolPB.class);
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh b/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh
index 8fea863..d8c731e 100644
--- a/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh
+++ b/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-env.sh
@@ -53,6 +53,10 @@
export HADOOP_SECONDARYNAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_SECONDARYNAMENODE_OPTS"
+# The ZKFC does not need a large heap, and keeping it small avoids
+# any potential for long GC pauses
+export HADOOP_ZKFC_OPTS="-Xmx256m $HADOOP_ZKFC_OPTS"
+
# The following applies to multiple commands (fs, dfs, fsck, distcp etc)
export HADOOP_CLIENT_OPTS="-Xmx128m $HADOOP_CLIENT_OPTS"
#HADOOP_JAVA_PLATFORM_OPTS="-XX:-UsePerfData $HADOOP_JAVA_PLATFORM_OPTS"
diff --git a/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml b/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
index 2fd9f8d..131fecf 100644
--- a/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
+++ b/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
@@ -223,6 +223,12 @@
<description>ACL for HAService protocol used by HAAdmin to manage the
active and stand-by states of namenode.</description>
</property>
+ <property>
+ <name>security.zkfc.protocol.acl</name>
+ <value>*</value>
+    <description>ACL for access to the ZK Failover Controller.
+    </description>
+ </property>
<property>
<name>security.mrhs.client.protocol.acl</name>
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto b/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
index 70ba82b..70ecac8 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/HAServiceProtocol.proto
@@ -27,6 +27,16 @@
STANDBY = 2;
}
+enum HARequestSource {
+ REQUEST_BY_USER = 0;
+ REQUEST_BY_USER_FORCED = 1;
+ REQUEST_BY_ZKFC = 2;
+}
+
+message HAStateChangeRequestInfoProto {
+ required HARequestSource reqSource = 1;
+}
+
/**
* void request
*/
@@ -43,6 +53,7 @@
* void request
*/
message TransitionToActiveRequestProto {
+ required HAStateChangeRequestInfoProto reqInfo = 1;
}
/**
@@ -55,6 +66,7 @@
* void request
*/
message TransitionToStandbyRequestProto {
+ required HAStateChangeRequestInfoProto reqInfo = 1;
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto b/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto
new file mode 100644
index 0000000..1037b02
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.ha.proto";
+option java_outer_classname = "ZKFCProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message CedeActiveRequestProto {
+ required uint32 millisToCede = 1;
+}
+
+message CedeActiveResponseProto {
+}
+
+message GracefulFailoverRequestProto {
+}
+
+message GracefulFailoverResponseProto {
+}
+
+
+/**
+ * Protocol provides manual control of the ZK Failover Controllers
+ */
+service ZKFCProtocolService {
+ /**
+ * Request that the service cede its active state, and quit the election
+ * for some amount of time
+ */
+ rpc cedeActive(CedeActiveRequestProto)
+ returns(CedeActiveResponseProto);
+
+
+ rpc gracefulFailover(GracefulFailoverRequestProto)
+ returns(GracefulFailoverResponseProto);
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index f94e497..e7e3c1b 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -944,4 +944,63 @@
</description>
</property>
+<property>
+ <name>ha.zookeeper.quorum</name>
+ <description>
+ A list of ZooKeeper server addresses, separated by commas, that are
+ to be used by the ZKFailoverController in automatic failover.
+ </description>
+</property>
+
+<property>
+ <name>ha.zookeeper.session-timeout.ms</name>
+ <value>5000</value>
+ <description>
+ The session timeout to use when the ZKFC connects to ZooKeeper.
+ Setting this value to a lower value implies that server crashes
+ will be detected more quickly, but risks triggering failover too
+ aggressively in the case of a transient error or network blip.
+ </description>
+</property>
+
+<property>
+ <name>ha.zookeeper.parent-znode</name>
+ <value>/hadoop-ha</value>
+ <description>
+ The ZooKeeper znode under which the ZK failover controller stores
+ its information. Note that the nameservice ID is automatically
+ appended to this znode, so it is not normally necessary to
+ configure this, even in a federated environment.
+ </description>
+</property>
+
+<property>
+ <name>ha.zookeeper.acl</name>
+ <value>world:anyone:rwcda</value>
+ <description>
+ A comma-separated list of ZooKeeper ACLs to apply to the znodes
+ used by automatic failover. These ACLs are specified in the same
+ format as used by the ZooKeeper CLI.
+
+ If the ACL itself contains secrets, you may instead specify a
+ path to a file, prefixed with the '@' symbol, and the value of
+ this configuration will be loaded from within.
+ </description>
+</property>
+
+<property>
+ <name>ha.zookeeper.auth</name>
+ <value></value>
+ <description>
+ A comma-separated list of ZooKeeper authentications to add when
+ connecting to ZooKeeper. These are specified in the same format
+ as used by the "addauth" command in the ZK CLI. It is
+ important that the authentications specified here are sufficient
+ to access znodes with the ACL specified in ha.zookeeper.acl.
+
+ If the auths contain secrets, you may instead specify a
+ path to a file, prefixed with the '@' symbol, and the value of
+ this configuration will be loaded from within.
+ </description>
+</property>
</configuration>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
index dc87ebd..bd9b40a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java
@@ -19,16 +19,25 @@
import java.util.Arrays;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
public abstract class ActiveStandbyElectorTestUtil {
+
+ private static final Log LOG = LogFactory.getLog(
+ ActiveStandbyElectorTestUtil.class);
+ private static final long LOG_INTERVAL_MS = 500;
public static void waitForActiveLockData(TestContext ctx,
ZooKeeperServer zks, String parentDir, byte[] activeData)
throws Exception {
+ long st = System.currentTimeMillis();
+ long lastPrint = st;
while (true) {
if (ctx != null) {
ctx.checkException();
@@ -42,10 +51,18 @@
Arrays.equals(activeData, data)) {
return;
}
+ if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) {
+ LOG.info("Cur data: " + StringUtils.byteToHexString(data));
+ lastPrint = System.currentTimeMillis();
+ }
} catch (NoNodeException nne) {
if (activeData == null) {
return;
}
+ if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) {
+ LOG.info("Cur data: no node");
+ lastPrint = System.currentTimeMillis();
+ }
}
Thread.sleep(50);
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
new file mode 100644
index 0000000..858162a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ha;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Copy-paste of ClientBase from ZooKeeper, but without any of the
+ * JMXEnv verification. There seems to be a bug, ZOOKEEPER-1438,
+ * which causes spurious failures in the JMXEnv verification when
+ * we run these tests with the upstream ClientBase.
+ */
+public abstract class ClientBaseWithFixes extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory.getLogger(ClientBaseWithFixes.class);
+
+ public static int CONNECTION_TIMEOUT = 30000;
+ static final File BASETEST =
+ new File(System.getProperty("build.test.dir", "build"));
+
+ protected String hostPort = "127.0.0.1:" + PortAssignment.unique();
+ protected int maxCnxns = 0;
+ protected ServerCnxnFactory serverFactory = null;
+ protected File tmpDir = null;
+
+ long initialFdCount;
+
+ public ClientBaseWithFixes() {
+ super();
+ }
+
+ /**
+ * In general don't use this. Only use in the special case that you
+ * want to ignore results (for whatever reason) in your test. Don't
+ * use empty watchers in real code!
+ *
+ */
+ protected class NullWatcher implements Watcher {
+ public void process(WatchedEvent event) { /* nada */ }
+ }
+
+ protected static class CountdownWatcher implements Watcher {
+ // XXX this doesn't need to be volatile! (Should probably be final)
+ volatile CountDownLatch clientConnected;
+ volatile boolean connected;
+
+ public CountdownWatcher() {
+ reset();
+ }
+ synchronized public void reset() {
+ clientConnected = new CountDownLatch(1);
+ connected = false;
+ }
+ synchronized public void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.SyncConnected ||
+ event.getState() == KeeperState.ConnectedReadOnly) {
+ connected = true;
+ notifyAll();
+ clientConnected.countDown();
+ } else {
+ connected = false;
+ notifyAll();
+ }
+ }
+ synchronized boolean isConnected() {
+ return connected;
+ }
+ synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
+ long expire = System.currentTimeMillis() + timeout;
+ long left = timeout;
+ while(!connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (!connected) {
+ throw new TimeoutException("Did not connect");
+
+ }
+ }
+ synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
+ long expire = System.currentTimeMillis() + timeout;
+ long left = timeout;
+ while(connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (connected) {
+ throw new TimeoutException("Did not disconnect");
+
+ }
+ }
+ }
+
+ protected TestableZooKeeper createClient()
+ throws IOException, InterruptedException
+ {
+ return createClient(hostPort);
+ }
+
+ protected TestableZooKeeper createClient(String hp)
+ throws IOException, InterruptedException
+ {
+ CountdownWatcher watcher = new CountdownWatcher();
+ return createClient(watcher, hp);
+ }
+
+ private LinkedList<ZooKeeper> allClients;
+ private boolean allClientsSetup = false;
+
+ protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
+ throws IOException, InterruptedException
+ {
+ return createClient(watcher, hp, CONNECTION_TIMEOUT);
+ }
+
+ protected TestableZooKeeper createClient(CountdownWatcher watcher,
+ String hp, int timeout)
+ throws IOException, InterruptedException
+ {
+ watcher.reset();
+ TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
+ if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
+ {
+ Assert.fail("Unable to connect to server");
+ }
+ synchronized(this) {
+ if (!allClientsSetup) {
+ LOG.error("allClients never setup");
+ Assert.fail("allClients never setup");
+ }
+ if (allClients != null) {
+ allClients.add(zk);
+ } else {
+ // test done - close the zk, not needed
+ zk.close();
+ }
+ }
+
+
+ return zk;
+ }
+
+ public static class HostPort {
+ String host;
+ int port;
+ public HostPort(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+ }
+ public static List<HostPort> parseHostPortList(String hplist) {
+ ArrayList<HostPort> alist = new ArrayList<HostPort>();
+ for (String hp: hplist.split(",")) {
+ int idx = hp.lastIndexOf(':');
+ String host = hp.substring(0, idx);
+ int port;
+ try {
+ port = Integer.parseInt(hp.substring(idx + 1));
+ } catch(RuntimeException e) {
+ throw new RuntimeException("Problem parsing " + hp + e.toString());
+ }
+ alist.add(new HostPort(host,port));
+ }
+ return alist;
+ }
+
+ /**
+ * Send the 4letterword
+ * @param host the destination host
+ * @param port the destination port
+ * @param cmd the 4letterword
+ * @return
+ * @throws IOException
+ */
+ public static String send4LetterWord(String host, int port, String cmd)
+ throws IOException
+ {
+ LOG.info("connecting to " + host + " " + port);
+ Socket sock = new Socket(host, port);
+ BufferedReader reader = null;
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write(cmd.getBytes());
+ outstream.flush();
+ // this replicates NC - close the output stream before reading
+ sock.shutdownOutput();
+
+ reader =
+ new BufferedReader(
+ new InputStreamReader(sock.getInputStream()));
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while((line = reader.readLine()) != null) {
+ sb.append(line + "\n");
+ }
+ return sb.toString();
+ } finally {
+ sock.close();
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ public static boolean waitForServerUp(String hp, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ // if there are multiple hostports, just take the first one
+ HostPort hpobj = parseHostPortList(hp).get(0);
+ String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
+ if (result.startsWith("Zookeeper version:") &&
+ !result.contains("READ-ONLY")) {
+ return true;
+ }
+ } catch (IOException e) {
+ // ignore as this is expected
+ LOG.info("server " + hp + " not up " + e);
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+ public static boolean waitForServerDown(String hp, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ HostPort hpobj = parseHostPortList(hp).get(0);
+ send4LetterWord(hpobj.host, hpobj.port, "stat");
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ public static File createTmpDir() throws IOException {
+ return createTmpDir(BASETEST);
+ }
+ static File createTmpDir(File parentDir) throws IOException {
+ File tmpFile = File.createTempFile("test", ".junit", parentDir);
+ // don't delete tmpFile - this ensures we don't attempt to create
+ // a tmpDir with a duplicate name
+ File tmpDir = new File(tmpFile + ".dir");
+        Assert.assertFalse(tmpDir.exists()); // never true if tmpfile does its job
+ Assert.assertTrue(tmpDir.mkdirs());
+
+ return tmpDir;
+ }
+ private static int getPort(String hostPort) {
+ String[] split = hostPort.split(":");
+ String portstr = split[split.length-1];
+ String[] pc = portstr.split("/");
+ if (pc.length > 1) {
+ portstr = pc[0];
+ }
+ return Integer.parseInt(portstr);
+ }
+
+ static ServerCnxnFactory createNewServerInstance(File dataDir,
+ ServerCnxnFactory factory, String hostPort, int maxCnxns)
+ throws IOException, InterruptedException
+ {
+ ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
+ final int PORT = getPort(hostPort);
+ if (factory == null) {
+ factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
+ }
+ factory.startup(zks);
+ Assert.assertTrue("waiting for server up",
+ ClientBaseWithFixes.waitForServerUp("127.0.0.1:" + PORT,
+ CONNECTION_TIMEOUT));
+
+ return factory;
+ }
+
+ static void shutdownServerInstance(ServerCnxnFactory factory,
+ String hostPort)
+ {
+ if (factory != null) {
+ ZKDatabase zkDb;
+ {
+ ZooKeeperServer zs = getServer(factory);
+
+ zkDb = zs.getZKDatabase();
+ }
+ factory.shutdown();
+ try {
+ zkDb.close();
+ } catch (IOException ie) {
+ LOG.warn("Error closing logs ", ie);
+ }
+ final int PORT = getPort(hostPort);
+
+ Assert.assertTrue("waiting for server down",
+ ClientBaseWithFixes.waitForServerDown("127.0.0.1:" + PORT,
+ CONNECTION_TIMEOUT));
+ }
+ }
+
+ /**
+ * Test specific setup
+ */
+ public static void setupTestEnv() {
+ // during the tests we run with 100K prealloc in the logs.
+ // on windows systems prealloc of 64M was seen to take ~15seconds
+ // resulting in test Assert.failure (client timeout on first session).
+ // set env and directly in order to handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100 * 1024);
+ }
+
+ protected void setUpAll() throws Exception {
+ allClients = new LinkedList<ZooKeeper>();
+ allClientsSetup = true;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ BASETEST.mkdirs();
+
+ setupTestEnv();
+
+ setUpAll();
+
+ tmpDir = createTmpDir(BASETEST);
+
+ startServer();
+
+ LOG.info("Client test setup finished");
+ }
+
+ protected void startServer() throws Exception {
+ LOG.info("STARTING server");
+ serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort, maxCnxns);
+ }
+
+ protected void stopServer() throws Exception {
+ LOG.info("STOPPING server");
+ shutdownServerInstance(serverFactory, hostPort);
+ serverFactory = null;
+ }
+
+
+ protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
+ ZooKeeperServer zs = ServerCnxnFactoryAccessor.getZkServer(fac);
+
+ return zs;
+ }
+
+ protected void tearDownAll() throws Exception {
+ synchronized (this) {
+ if (allClients != null) for (ZooKeeper zk : allClients) {
+ try {
+ if (zk != null)
+ zk.close();
+ } catch (InterruptedException e) {
+ LOG.warn("ignoring interrupt", e);
+ }
+ }
+ allClients = null;
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("tearDown starting");
+
+ tearDownAll();
+
+ stopServer();
+
+ if (tmpDir != null) {
+ Assert.assertTrue("delete " + tmpDir.toString(), recursiveDelete(tmpDir));
+ }
+
+ // This has to be set to null when the same instance of this class is reused between test cases
+ serverFactory = null;
+ }
+
+ public static boolean recursiveDelete(File d) {
+ if (d.isDirectory()) {
+ File children[] = d.listFiles();
+ for (File f : children) {
+ Assert.assertTrue("delete " + f.toString(), recursiveDelete(f));
+ }
+ }
+ return d.delete();
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
index a8cd3d6..c38bc53 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
@@ -22,6 +22,8 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.security.AccessControlException;
@@ -34,13 +36,19 @@
* a mock implementation.
*/
class DummyHAService extends HAServiceTarget {
+ public static final Log LOG = LogFactory.getLog(DummyHAService.class);
+ private static final String DUMMY_FENCE_KEY = "dummy.fence.key";
volatile HAServiceState state;
HAServiceProtocol proxy;
+ ZKFCProtocol zkfcProxy = null;
NodeFencer fencer;
InetSocketAddress address;
boolean isHealthy = true;
boolean actUnreachable = false;
- boolean failToBecomeActive;
+ boolean failToBecomeActive, failToBecomeStandby, failToFence;
+
+ DummySharedResource sharedResource;
+ public int fenceCount = 0;
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
@@ -48,7 +56,14 @@
DummyHAService(HAServiceState state, InetSocketAddress address) {
this.state = state;
this.proxy = makeMock();
- this.fencer = Mockito.mock(NodeFencer.class);
+ try {
+ Configuration conf = new Configuration();
+ conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
+ this.fencer = Mockito.spy(
+ NodeFencer.create(conf, DUMMY_FENCE_KEY));
+ } catch (BadFencingConfigurationException e) {
+ throw new RuntimeException(e);
+ }
this.address = address;
synchronized (instances) {
instances.add(this);
@@ -56,6 +71,10 @@
}
}
+ public void setSharedResource(DummySharedResource rsrc) {
+ this.sharedResource = rsrc;
+ }
+
private HAServiceProtocol makeMock() {
return Mockito.spy(new MockHAProtocolImpl());
}
@@ -66,12 +85,24 @@
}
@Override
+ public InetSocketAddress getZKFCAddress() {
+ return null;
+ }
+
+ @Override
public HAServiceProtocol getProxy(Configuration conf, int timeout)
throws IOException {
return proxy;
}
@Override
+ public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout)
+ throws IOException {
+ assert zkfcProxy != null;
+ return zkfcProxy;
+ }
+
+ @Override
public NodeFencer getFencer() {
return fencer;
}
@@ -81,6 +112,11 @@
}
@Override
+ public boolean isAutoFailoverEnabled() {
+ return true;
+ }
+
+ @Override
public String toString() {
return "DummyHAService #" + index;
}
@@ -101,20 +137,28 @@
}
@Override
- public void transitionToActive() throws ServiceFailedException,
+ public void transitionToActive(StateChangeRequestInfo req) throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
if (failToBecomeActive) {
throw new ServiceFailedException("injected failure");
}
-
+ if (sharedResource != null) {
+ sharedResource.take(DummyHAService.this);
+ }
state = HAServiceState.ACTIVE;
}
@Override
- public void transitionToStandby() throws ServiceFailedException,
+ public void transitionToStandby(StateChangeRequestInfo req) throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
+ if (failToBecomeStandby) {
+ throw new ServiceFailedException("injected failure");
+ }
+ if (sharedResource != null) {
+ sharedResource.release(DummyHAService.this);
+ }
state = HAServiceState.STANDBY;
}
@@ -138,4 +182,26 @@
public void close() throws IOException {
}
}
+
+ public static class DummyFencer implements FenceMethod {
+ public void checkArgs(String args) throws BadFencingConfigurationException {
+ }
+
+ @Override
+ public boolean tryFence(HAServiceTarget target, String args)
+ throws BadFencingConfigurationException {
+ LOG.info("tryFence(" + target + ")");
+ DummyHAService svc = (DummyHAService)target;
+ synchronized (svc) {
+ svc.fenceCount++;
+ }
+ if (svc.failToFence) {
+ LOG.info("Injected failure to fence");
+ return false;
+ }
+ svc.sharedResource.release(svc);
+ return true;
+ }
+ }
+
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
new file mode 100644
index 0000000..a7cf41d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import org.junit.Assert;
+
+/**
+ * A fake shared resource, for use in automatic failover testing.
+ * This simulates a real shared resource like a shared edit log.
+ * When the {@link DummyHAService} instances change state or get
+ * fenced, they notify the shared resource, which asserts that
+ * we never have two HA services that think they are holding the
+ * resource at the same time.
+ */
+public class DummySharedResource {
+ private DummyHAService holder = null;
+ private int violations = 0;
+
+ public synchronized void take(DummyHAService newHolder) {
+ if (holder == null || holder == newHolder) {
+ holder = newHolder;
+ } else {
+ violations++;
+ throw new IllegalStateException("already held by: " + holder);
+ }
+ }
+
+ public synchronized void release(DummyHAService oldHolder) {
+ if (holder == oldHolder) {
+ holder = null;
+ }
+ }
+
+ public synchronized void assertNoViolations() {
+ Assert.assertEquals(0, violations);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
new file mode 100644
index 0000000..1db7924
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+
+/**
+ * Harness for starting two dummy ZK FailoverControllers, associated with
+ * DummyHAServices. This harness starts two such ZKFCs, designated by
+ * indexes 0 and 1, and provides utilities for building tests around them.
+ */
+public class MiniZKFCCluster {
+ private final TestContext ctx;
+ private final ZooKeeperServer zks;
+
+ private DummyHAService svcs[];
+ private DummyZKFCThread thrs[];
+ private Configuration conf;
+
+ private DummySharedResource sharedResource = new DummySharedResource();
+
+ private static final Log LOG = LogFactory.getLog(MiniZKFCCluster.class);
+
+ public MiniZKFCCluster(Configuration conf, ZooKeeperServer zks) {
+ this.conf = conf;
+ // Fast check interval so tests run faster
+ conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+ conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+ conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+ svcs = new DummyHAService[2];
+ svcs[0] = new DummyHAService(HAServiceState.INITIALIZING,
+ new InetSocketAddress("svc1", 1234));
+ svcs[0].setSharedResource(sharedResource);
+ svcs[1] = new DummyHAService(HAServiceState.INITIALIZING,
+ new InetSocketAddress("svc2", 1234));
+ svcs[1].setSharedResource(sharedResource);
+
+ this.ctx = new TestContext();
+ this.zks = zks;
+ }
+
+ /**
+ * Set up two services and their failover controllers. svc1 is started
+ * first, so that it enters ACTIVE state, and then svc2 is started,
+ * which enters STANDBY
+ */
+ public void start() throws Exception {
+ // Format the base dir, should succeed
+ thrs = new DummyZKFCThread[2];
+ thrs[0] = new DummyZKFCThread(ctx, svcs[0]);
+ assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"}));
+ ctx.addThread(thrs[0]);
+ thrs[0].start();
+
+ LOG.info("Waiting for svc0 to enter active state");
+ waitForHAState(0, HAServiceState.ACTIVE);
+
+ LOG.info("Adding svc1");
+ thrs[1] = new DummyZKFCThread(ctx, svcs[1]);
+ thrs[1].start();
+ waitForHAState(1, HAServiceState.STANDBY);
+ }
+
+ /**
+ * Stop the services.
+ * @throws Exception if either of the services had encountered a fatal error
+ */
+ public void stop() throws Exception {
+ for (DummyZKFCThread thr : thrs) {
+ if (thr != null) {
+ thr.interrupt();
+ }
+ }
+ if (ctx != null) {
+ ctx.stop();
+ }
+ sharedResource.assertNoViolations();
+ }
+
+ /**
+ * @return the TestContext implementation used internally. This allows more
+ * threads to be added to the context, etc.
+ */
+ public TestContext getTestContext() {
+ return ctx;
+ }
+
+ public DummyHAService getService(int i) {
+ return svcs[i];
+ }
+
+ public ActiveStandbyElector getElector(int i) {
+ return thrs[i].zkfc.getElectorForTests();
+ }
+
+ public DummyZKFC getZkfc(int i) {
+ return thrs[i].zkfc;
+ }
+
+ public void setHealthy(int idx, boolean healthy) {
+ svcs[idx].isHealthy = healthy;
+ }
+
+ public void setFailToBecomeActive(int idx, boolean doFail) {
+ svcs[idx].failToBecomeActive = doFail;
+ }
+
+ public void setFailToBecomeStandby(int idx, boolean doFail) {
+ svcs[idx].failToBecomeStandby = doFail;
+ }
+
+ public void setFailToFence(int idx, boolean doFail) {
+ svcs[idx].failToFence = doFail;
+ }
+
+ public void setUnreachable(int idx, boolean unreachable) {
+ svcs[idx].actUnreachable = unreachable;
+ }
+
+ /**
+ * Wait for the given HA service to enter the given HA state.
+ */
+ public void waitForHAState(int idx, HAServiceState state)
+ throws Exception {
+ DummyHAService svc = getService(idx);
+ while (svc.state != state) {
+ ctx.checkException();
+ Thread.sleep(50);
+ }
+ }
+
+ /**
+ * Wait for the ZKFC to be notified of a change in health state.
+ */
+ public void waitForHealthState(int idx, State state)
+ throws Exception {
+ ZKFCTestUtil.waitForHealthState(thrs[idx].zkfc, state, ctx);
+ }
+
+ /**
+ * Wait for the given elector to enter the given elector state.
+ * @param idx the service index (0 or 1)
+ * @param state the state to wait for
+ * @throws Exception if it times out, or an exception occurs on one
+ * of the ZKFC threads while waiting.
+ */
+ public void waitForElectorState(int idx,
+ ActiveStandbyElector.State state) throws Exception {
+ ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
+ getElector(idx), state);
+ }
+
+
+
+ /**
+ * Expire the ZK session of the given service. This requires
+ * (and asserts) that the given service be the current active.
+ * @throws NoNodeException if no service holds the lock
+ */
+ public void expireActiveLockHolder(int idx)
+ throws NoNodeException {
+ Stat stat = new Stat();
+ byte[] data = zks.getZKDatabase().getData(
+ DummyZKFC.LOCK_ZNODE, stat, null);
+
+ assertArrayEquals(Ints.toByteArray(svcs[idx].index), data);
+ long session = stat.getEphemeralOwner();
+ LOG.info("Expiring svc " + idx + "'s zookeeper session " + session);
+ zks.closeSession(session);
+ }
+
+
+ /**
+ * Wait for the given HA service to become the active lock holder.
+ * If the passed svc is null, waits for there to be no active
+ * lock holder.
+ */
+ public void waitForActiveLockHolder(Integer idx)
+ throws Exception {
+ DummyHAService svc = idx == null ? null : svcs[idx];
+ ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
+ DummyZKFC.SCOPED_PARENT_ZNODE,
+ (idx == null) ? null : Ints.toByteArray(svc.index));
+ }
+
+
+ /**
+ * Expires the ZK session associated with service 'fromIdx', and waits
+ * until service 'toIdx' takes over.
+ * @throws Exception if the target service does not become active
+ */
+ public void expireAndVerifyFailover(int fromIdx, int toIdx)
+ throws Exception {
+ Preconditions.checkArgument(fromIdx != toIdx);
+
+ getElector(fromIdx).preventSessionReestablishmentForTests();
+ try {
+ expireActiveLockHolder(fromIdx);
+
+ waitForHAState(fromIdx, HAServiceState.STANDBY);
+ waitForHAState(toIdx, HAServiceState.ACTIVE);
+ } finally {
+ getElector(fromIdx).allowSessionReestablishmentForTests();
+ }
+ }
+
+ /**
+ * Test-thread which runs a ZK Failover Controller corresponding
+ * to a given dummy service.
+ */
+ private class DummyZKFCThread extends TestingThread {
+ private final DummyZKFC zkfc;
+
+ public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
+ super(ctx);
+ this.zkfc = new DummyZKFC(conf, svc);
+ }
+
+ @Override
+ public void doWork() throws Exception {
+ try {
+ assertEquals(0, zkfc.run(new String[0]));
+ } catch (InterruptedException ie) {
+ // Interrupted by main thread, that's OK.
+ }
+ }
+ }
+
+ static class DummyZKFC extends ZKFailoverController {
+ private static final String DUMMY_CLUSTER = "dummy-cluster";
+ public static final String SCOPED_PARENT_ZNODE =
+ ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
+ DUMMY_CLUSTER;
+ private static final String LOCK_ZNODE =
+ SCOPED_PARENT_ZNODE + "/" + ActiveStandbyElector.LOCK_FILENAME;
+ private final DummyHAService localTarget;
+
+ public DummyZKFC(Configuration conf, DummyHAService localTarget) {
+ super(conf, localTarget);
+ this.localTarget = localTarget;
+ }
+
+ @Override
+ protected byte[] targetToData(HAServiceTarget target) {
+ return Ints.toByteArray(((DummyHAService)target).index);
+ }
+
+ @Override
+ protected HAServiceTarget dataToTarget(byte[] data) {
+ int index = Ints.fromByteArray(data);
+ return DummyHAService.getInstance(index);
+ }
+
+ @Override
+ protected void loginAsFCUser() throws IOException {
+ }
+
+ @Override
+ protected String getScopeInsideParentNode() {
+ return DUMMY_CLUSTER;
+ }
+
+ @Override
+ protected void checkRpcAdminAccess() throws AccessControlException {
+ }
+
+ @Override
+ protected InetSocketAddress getRpcAddressToBindTo() {
+ return new InetSocketAddress(0);
+ }
+
+ @Override
+ protected void initRPC() throws IOException {
+ super.initRPC();
+ localTarget.zkfcProxy = this.getRpcServerForTests();
+ }
+
+ @Override
+ protected PolicyProvider getPolicyProvider() {
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
index b9786ae..2eba967 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ha;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.AsyncCallback;
@@ -40,6 +41,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
public class TestActiveStandbyElector {
@@ -51,9 +53,12 @@
private ActiveStandbyElectorTester elector;
class ActiveStandbyElectorTester extends ActiveStandbyElector {
+ private int sleptFor = 0;
+
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
- super(hostPort, timeout, parent, acl, app);
+ super(hostPort, timeout, parent, acl,
+ Collections.<ZKAuthInfo>emptyList(), app);
}
@Override
@@ -61,6 +66,14 @@
++count;
return mockZK;
}
+
+ @Override
+ protected void sleepFor(int ms) {
+ // don't sleep in unit tests! Instead, just record the amount of
+ // time slept
+ LOG.info("Would have slept for " + ms + "ms");
+ sleptFor += ms;
+ }
}
private static final String ZK_PARENT_NAME = "/parent/node";
@@ -147,6 +160,68 @@
}
/**
+ * Verify that, when the callback fails to enter active state,
+ * the elector rejoins the election after sleeping for a short period.
+ */
+ @Test
+ public void testFailToBecomeActive() throws Exception {
+ mockNoPriorActive();
+ elector.joinElection(data);
+ Assert.assertEquals(0, elector.sleptFor);
+
+ Mockito.doThrow(new ServiceFailedException("failed to become active"))
+ .when(mockApp).becomeActive();
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
+ ZK_LOCK_NAME);
+ // Should have tried to become active
+ Mockito.verify(mockApp).becomeActive();
+
+ // should re-join
+ Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+ Assert.assertEquals(2, count);
+ Assert.assertTrue(elector.sleptFor > 0);
+ }
+
+ /**
+ * Verify that, when the callback fails to enter active state, after
+ * a ZK disconnect (i.e from the StatCallback), that the elector rejoins
+ * the election after sleeping for a short period.
+ */
+ @Test
+ public void testFailToBecomeActiveAfterZKDisconnect() throws Exception {
+ mockNoPriorActive();
+ elector.joinElection(data);
+ Assert.assertEquals(0, elector.sleptFor);
+
+ elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
+ ZK_LOCK_NAME);
+ Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+
+ elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
+ ZK_LOCK_NAME);
+ verifyExistCall(1);
+
+ Stat stat = new Stat();
+ stat.setEphemeralOwner(1L);
+ Mockito.when(mockZK.getSessionId()).thenReturn(1L);
+
+ // Fake failure to become active from within the stat callback
+ Mockito.doThrow(new ServiceFailedException("fail to become active"))
+ .when(mockApp).becomeActive();
+ elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
+ Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+
+ // should re-join
+ Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
+ Assert.assertEquals(2, count);
+ Assert.assertTrue(elector.sleptFor > 0);
+ }
+
+
+ /**
* Verify that, if there is a record of a prior active node, the
* elector asks the application to fence it before becoming active.
*/
@@ -314,6 +389,7 @@
*/
@Test
public void testStatNodeRetry() {
+ elector.joinElection(data);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
(Stat) null);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
@@ -334,6 +410,7 @@
*/
@Test
public void testStatNodeError() {
+ elector.joinElection(data);
elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME,
mockZK, (Stat) null);
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
@@ -517,6 +594,8 @@
*/
@Test
public void testQuitElection() throws Exception {
+ elector.joinElection(data);
+ Mockito.verify(mockZK, Mockito.times(0)).close();
elector.quitElection(true);
Mockito.verify(mockZK, Mockito.times(1)).close();
// no watches added
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
index 3a0fa5f..d51d5fa 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
@@ -21,15 +21,16 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.io.File;
+import java.util.Collections;
import java.util.UUID;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.ActiveStandbyElector.State;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.log4j.Level;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
@@ -39,7 +40,7 @@
/**
* Test for {@link ActiveStandbyElector} using real zookeeper.
*/
-public class TestActiveStandbyElectorRealZK extends ClientBase {
+public class TestActiveStandbyElectorRealZK extends ClientBaseWithFixes {
static final int NUM_ELECTORS = 2;
static {
@@ -58,8 +59,6 @@
@Override
public void setUp() throws Exception {
- // build.test.dir is used by zookeeper
- new File(System.getProperty("build.test.dir", "build")).mkdirs();
super.setUp();
zkServer = getServer(serverFactory);
@@ -68,7 +67,8 @@
cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class);
appDatas[i] = Ints.toByteArray(i);
electors[i] = new ActiveStandbyElector(
- hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, cbs[i]);
+ hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
+ Collections.<ZKAuthInfo>emptyList(), cbs[i]);
}
}
@@ -196,4 +196,63 @@
checkFatalsAndReset();
}
+
+ @Test(timeout=15000)
+ public void testHandleSessionExpirationOfStandby() throws Exception {
+ // Let elector 0 be active
+ electors[0].ensureParentZNode();
+ electors[0].joinElection(appDatas[0]);
+ ZooKeeperServer zks = getServer(serverFactory);
+ ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+ zks, PARENT_DIR, appDatas[0]);
+ Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeActive();
+ checkFatalsAndReset();
+
+ // Let elector 1 be standby
+ electors[1].joinElection(appDatas[1]);
+ ActiveStandbyElectorTestUtil.waitForElectorState(null, electors[1],
+ State.STANDBY);
+
+ LOG.info("========================== Expiring standby's session");
+ zks.closeSession(electors[1].getZKSessionIdForTests());
+
+ // Should enter neutral mode when disconnected
+ Mockito.verify(cbs[1], Mockito.timeout(1000)).enterNeutralMode();
+
+ // Should re-join the election and go back to STANDBY
+ ActiveStandbyElectorTestUtil.waitForElectorState(null, electors[1],
+ State.STANDBY);
+ checkFatalsAndReset();
+
+ LOG.info("========================== Quitting election");
+ electors[1].quitElection(false);
+
+ // Double check that we don't accidentally re-join the election
+ // by quitting elector 0 and ensuring elector 1 doesn't become active
+ electors[0].quitElection(false);
+
+ // due to receiving the "expired" event.
+ Thread.sleep(1000);
+ Mockito.verify(cbs[1], Mockito.never()).becomeActive();
+ ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
+ zks, PARENT_DIR, null);
+
+ checkFatalsAndReset();
+ }
+
+ @Test(timeout=15000)
+ public void testDontJoinElectionOnDisconnectAndReconnect() throws Exception {
+ electors[0].ensureParentZNode();
+
+ stopServer();
+ ActiveStandbyElectorTestUtil.waitForElectorState(
+ null, electors[0], State.NEUTRAL);
+ startServer();
+ waitForServerUp(hostPort, CONNECTION_TIMEOUT);
+ // Have to sleep to allow time for the clients to reconnect.
+ Thread.sleep(2000);
+ Mockito.verify(cbs[0], Mockito.never()).becomeActive();
+ Mockito.verify(cbs[1], Mockito.never()).becomeActive();
+ checkFatalsAndReset();
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java
index 7d30bdf..791aaad 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestFailoverController.java
@@ -27,11 +27,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
import org.apache.hadoop.ha.TestNodeFencer.AlwaysFailFencer;
import static org.apache.hadoop.ha.TestNodeFencer.setupFencer;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.test.MockitoUtil;
import org.junit.Test;
import org.mockito.Mockito;
@@ -118,7 +119,8 @@
public void testFailoverToUnreadyService() throws Exception {
DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
- Mockito.doReturn(STATE_NOT_READY).when(svc2.proxy).getServiceStatus();
+ Mockito.doReturn(STATE_NOT_READY).when(svc2.proxy)
+ .getServiceStatus();
svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
try {
@@ -162,7 +164,7 @@
public void testFailoverFromFaultyServiceSucceeds() throws Exception {
DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
Mockito.doThrow(new ServiceFailedException("Failed!"))
- .when(svc1.proxy).transitionToStandby();
+ .when(svc1.proxy).transitionToStandby(anyReqInfo());
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
@@ -185,7 +187,7 @@
public void testFailoverFromFaultyServiceFencingFailure() throws Exception {
DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
Mockito.doThrow(new ServiceFailedException("Failed!"))
- .when(svc1.proxy).transitionToStandby();
+ .when(svc1.proxy).transitionToStandby(anyReqInfo());
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
svc1.fencer = svc2.fencer = setupFencer(AlwaysFailFencer.class.getName());
@@ -284,7 +286,7 @@
DummyHAService svc1 = spy(new DummyHAService(HAServiceState.ACTIVE, svc1Addr));
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
Mockito.doThrow(new ServiceFailedException("Failed!"))
- .when(svc2.proxy).transitionToActive();
+ .when(svc2.proxy).transitionToActive(anyReqInfo());
svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
try {
@@ -295,8 +297,8 @@
}
// svc1 went standby then back to active
- verify(svc1.proxy).transitionToStandby();
- verify(svc1.proxy).transitionToActive();
+ verify(svc1.proxy).transitionToStandby(anyReqInfo());
+ verify(svc1.proxy).transitionToActive(anyReqInfo());
assertEquals(HAServiceState.ACTIVE, svc1.state);
assertEquals(HAServiceState.STANDBY, svc2.state);
}
@@ -306,7 +308,7 @@
DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
Mockito.doThrow(new ServiceFailedException("Failed!"))
- .when(svc2.proxy).transitionToActive();
+ .when(svc2.proxy).transitionToActive(anyReqInfo());
svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
try {
@@ -327,7 +329,7 @@
DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
Mockito.doThrow(new ServiceFailedException("Failed!"))
- .when(svc2.proxy).transitionToActive();
+ .when(svc2.proxy).transitionToActive(anyReqInfo());
svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
AlwaysSucceedFencer.fenceCalled = 0;
@@ -346,12 +348,16 @@
assertSame(svc2, AlwaysSucceedFencer.fencedSvc);
}
+ private StateChangeRequestInfo anyReqInfo() {
+ return Mockito.<StateChangeRequestInfo>any();
+ }
+
@Test
public void testFailureToFenceOnFailbackFailsTheFailback() throws Exception {
DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
Mockito.doThrow(new IOException("Failed!"))
- .when(svc2.proxy).transitionToActive();
+ .when(svc2.proxy).transitionToActive(anyReqInfo());
svc1.fencer = svc2.fencer = setupFencer(AlwaysFailFencer.class.getName());
AlwaysFailFencer.fenceCalled = 0;
@@ -374,10 +380,10 @@
public void testFailbackToFaultyServiceFails() throws Exception {
DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
Mockito.doThrow(new ServiceFailedException("Failed!"))
- .when(svc1.proxy).transitionToActive();
+ .when(svc1.proxy).transitionToActive(anyReqInfo());
DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
Mockito.doThrow(new ServiceFailedException("Failed!"))
- .when(svc2.proxy).transitionToActive();
+ .when(svc2.proxy).transitionToActive(anyReqInfo());
svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
@@ -420,7 +426,8 @@
private void doFailover(HAServiceTarget tgt1, HAServiceTarget tgt2,
boolean forceFence, boolean forceActive) throws FailoverFailedException {
- FailoverController fc = new FailoverController(conf);
+ FailoverController fc = new FailoverController(conf,
+ RequestSource.REQUEST_BY_USER);
fc.failover(tgt1, tgt2, forceFence, forceActive);
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAZKUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAZKUtil.java
new file mode 100644
index 0000000..7b4d63a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHAZKUtil.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.ha.HAZKUtil.BadAclFormatException;
+import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.data.ACL;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class TestHAZKUtil {
+ private static final String TEST_ROOT_DIR = System.getProperty(
+ "test.build.data", "/tmp") + "/TestHAZKUtil";
+ private static final File TEST_FILE = new File(TEST_ROOT_DIR,
+ "test-file");
+
+ /** A path which is expected not to exist */
+ private static final String BOGUS_FILE = "/xxxx-this-does-not-exist";
+
+ @Test
+ public void testEmptyACL() {
+ List<ACL> result = HAZKUtil.parseACLs("");
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testNullACL() {
+ List<ACL> result = HAZKUtil.parseACLs(null);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testInvalidACLs() {
+ badAcl("a:b",
+ "ACL 'a:b' not of expected form scheme:id:perm"); // not enough parts
+ badAcl("a",
+ "ACL 'a' not of expected form scheme:id:perm"); // not enough parts
+ badAcl("password:foo:rx",
+ "Invalid permission 'x' in permission string 'rx'");
+ }
+
+ private static void badAcl(String acls, String expectedErr) {
+ try {
+ HAZKUtil.parseACLs(acls);
+ fail("Should have failed to parse '" + acls + "'");
+ } catch (BadAclFormatException e) {
+ assertEquals(expectedErr, e.getMessage());
+ }
+ }
+
+ @Test
+ public void testGoodACLs() {
+ List<ACL> result = HAZKUtil.parseACLs(
+ "sasl:hdfs/host1@MY.DOMAIN:cdrwa, sasl:hdfs/host2@MY.DOMAIN:ca");
+ ACL acl0 = result.get(0);
+ assertEquals(Perms.CREATE | Perms.DELETE | Perms.READ |
+ Perms.WRITE | Perms.ADMIN, acl0.getPerms());
+ assertEquals("sasl", acl0.getId().getScheme());
+ assertEquals("hdfs/host1@MY.DOMAIN", acl0.getId().getId());
+
+ ACL acl1 = result.get(1);
+ assertEquals(Perms.CREATE | Perms.ADMIN, acl1.getPerms());
+ assertEquals("sasl", acl1.getId().getScheme());
+ assertEquals("hdfs/host2@MY.DOMAIN", acl1.getId().getId());
+ }
+
+ @Test
+ public void testEmptyAuth() {
+ List<ZKAuthInfo> result = HAZKUtil.parseAuth("");
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testNullAuth() {
+ List<ZKAuthInfo> result = HAZKUtil.parseAuth(null);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testGoodAuths() {
+ List<ZKAuthInfo> result = HAZKUtil.parseAuth(
+ "scheme:data,\n scheme2:user:pass");
+ assertEquals(2, result.size());
+ ZKAuthInfo auth0 = result.get(0);
+ assertEquals("scheme", auth0.getScheme());
+ assertEquals("data", new String(auth0.getAuth()));
+
+ ZKAuthInfo auth1 = result.get(1);
+ assertEquals("scheme2", auth1.getScheme());
+ assertEquals("user:pass", new String(auth1.getAuth()));
+ }
+
+ @Test
+ public void testConfIndirection() throws IOException {
+ assertNull(HAZKUtil.resolveConfIndirection(null));
+ assertEquals("x", HAZKUtil.resolveConfIndirection("x"));
+
+ TEST_FILE.getParentFile().mkdirs();
+ Files.write("hello world", TEST_FILE, Charsets.UTF_8);
+ assertEquals("hello world", HAZKUtil.resolveConfIndirection(
+ "@" + TEST_FILE.getAbsolutePath()));
+
+ try {
+ HAZKUtil.resolveConfIndirection("@" + BOGUS_FILE);
+ fail("Did not throw for non-existent file reference");
+ } catch (FileNotFoundException fnfe) {
+ assertTrue(fnfe.getMessage().startsWith(BOGUS_FILE));
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
index 93f46a5..e5480e2 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
@@ -19,93 +19,58 @@
import static org.junit.Assert.*;
-import java.io.File;
-import java.net.InetSocketAddress;
+import java.security.NoSuchAlgorithmException;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
-import org.apache.hadoop.test.MultithreadedTestUtil;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
-import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import com.google.common.primitives.Ints;
-
-public class TestZKFailoverController extends ClientBase {
+public class TestZKFailoverController extends ClientBaseWithFixes {
private Configuration conf;
- private DummyHAService svc1;
- private DummyHAService svc2;
- private TestContext ctx;
- private DummyZKFCThread thr1, thr2;
+ private MiniZKFCCluster cluster;
+
+ // Set up ZK digest-based credentials for the purposes of the tests,
+ // to make sure all of our functionality works with auth and ACLs
+ // present.
+ private static final String DIGEST_USER_PASS="test-user:test-password";
+ private static final String TEST_AUTH_GOOD =
+ "digest:" + DIGEST_USER_PASS;
+ private static final String DIGEST_USER_HASH;
+ static {
+ try {
+ DIGEST_USER_HASH = DigestAuthenticationProvider.generateDigest(
+ DIGEST_USER_PASS);
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ private static final String TEST_ACL =
+ "digest:" + DIGEST_USER_HASH + ":rwcda";
static {
((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
}
- @Override
- public void setUp() throws Exception {
- // build.test.dir is used by zookeeper
- new File(System.getProperty("build.test.dir", "build")).mkdirs();
- super.setUp();
- }
-
@Before
public void setupConfAndServices() {
conf = new Configuration();
- conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
- // Fast check interval so tests run faster
- conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
- conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
- conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
- svc1 = new DummyHAService(HAServiceState.INITIALIZING,
- new InetSocketAddress("svc1", 1234));
- svc2 = new DummyHAService(HAServiceState.INITIALIZING,
- new InetSocketAddress("svc2", 1234));
- }
-
- /**
- * Set up two services and their failover controllers. svc1 is started
- * first, so that it enters ACTIVE state, and then svc2 is started,
- * which enters STANDBY
- */
- private void setupFCs() throws Exception {
- // Format the base dir, should succeed
- assertEquals(0, runFC(svc1, "-formatZK"));
+ conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL);
+ conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD);
- ctx = new MultithreadedTestUtil.TestContext();
- thr1 = new DummyZKFCThread(ctx, svc1);
- ctx.addThread(thr1);
- thr1.start();
-
- LOG.info("Waiting for svc1 to enter active state");
- waitForHAState(svc1, HAServiceState.ACTIVE);
-
- LOG.info("Adding svc2");
- thr2 = new DummyZKFCThread(ctx, svc2);
- thr2.start();
- waitForHAState(svc2, HAServiceState.STANDBY);
- }
-
- private void stopFCs() throws Exception {
- if (thr1 != null) {
- thr1.interrupt();
- }
- if (thr2 != null) {
- thr2.interrupt();
- }
- if (ctx != null) {
- ctx.stop();
- }
+ conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
+ this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
}
/**
@@ -114,20 +79,104 @@
*/
@Test(timeout=15000)
public void testFormatZK() throws Exception {
+ DummyHAService svc = cluster.getService(1);
// Run without formatting the base dir,
// should barf
assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
- runFC(svc1));
+ runFC(svc));
// Format the base dir, should succeed
- assertEquals(0, runFC(svc1, "-formatZK"));
+ assertEquals(0, runFC(svc, "-formatZK"));
// Should fail to format if already formatted
assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
- runFC(svc1, "-formatZK", "-nonInteractive"));
+ runFC(svc, "-formatZK", "-nonInteractive"));
// Unless '-force' is on
- assertEquals(0, runFC(svc1, "-formatZK", "-force"));
+ assertEquals(0, runFC(svc, "-formatZK", "-force"));
+ }
+
+ /**
+ * Test that if ZooKeeper is not running, the correct error
+ * code is returned.
+ */
+ @Test(timeout=15000)
+ public void testNoZK() throws Exception {
+ stopServer();
+ DummyHAService svc = cluster.getService(1);
+ assertEquals(ZKFailoverController.ERR_CODE_NO_ZK,
+ runFC(svc));
+ }
+
+ @Test
+ public void testFormatOneClusterLeavesOtherClustersAlone() throws Exception {
+ DummyHAService svc = cluster.getService(1);
+
+ DummyZKFC zkfcInOtherCluster = new DummyZKFC(conf, cluster.getService(1)) {
+ @Override
+ protected String getScopeInsideParentNode() {
+ return "other-scope";
+ }
+ };
+
+ // Run without formatting the base dir,
+ // should barf
+ assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
+ runFC(svc));
+
+ // Format the base dir, should succeed
+ assertEquals(0, runFC(svc, "-formatZK"));
+
+ // Run the other cluster without formatting, should barf because
+ // it uses a different parent znode
+ assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
+ zkfcInOtherCluster.run(new String[]{}));
+
+ // Should succeed in formatting the second cluster
+ assertEquals(0, zkfcInOtherCluster.run(new String[]{"-formatZK"}));
+
+ // But should not have deleted the original base node from the first
+ // cluster
+ assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
+ runFC(svc, "-formatZK", "-nonInteractive"));
+ }
+
+ /**
+ * Test that automatic failover won't run against a target that hasn't
+ * explicitly enabled the feature.
+ */
+ @Test(timeout=10000)
+ public void testWontRunWhenAutoFailoverDisabled() throws Exception {
+ DummyHAService svc = cluster.getService(1);
+ svc = Mockito.spy(svc);
+ Mockito.doReturn(false).when(svc).isAutoFailoverEnabled();
+
+ assertEquals(ZKFailoverController.ERR_CODE_AUTO_FAILOVER_NOT_ENABLED,
+ runFC(svc, "-formatZK"));
+ assertEquals(ZKFailoverController.ERR_CODE_AUTO_FAILOVER_NOT_ENABLED,
+ runFC(svc));
+ }
+
+ /**
+ * Test that, if ACLs are specified in the configuration, that
+ * it sets the ACLs when formatting the parent node.
+ */
+ @Test(timeout=15000)
+ public void testFormatSetsAcls() throws Exception {
+ // Format the base dir, should succeed
+ DummyHAService svc = cluster.getService(1);
+ assertEquals(0, runFC(svc, "-formatZK"));
+
+ ZooKeeper otherClient = createClient();
+ try {
+ // client without auth should not be able to read it
+ Stat stat = new Stat();
+ otherClient.getData(ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
+ false, stat);
+ fail("Was able to read data without authenticating!");
+ } catch (KeeperException.NoAuthException nae) {
+ // expected
+ }
}
/**
@@ -136,14 +185,14 @@
*/
@Test(timeout=15000)
public void testFencingMustBeConfigured() throws Exception {
- svc1 = Mockito.spy(svc1);
+ DummyHAService svc = Mockito.spy(cluster.getService(0));
Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
- .when(svc1).checkFencingConfigured();
+ .when(svc).checkFencingConfigured();
// Format the base dir, should succeed
- assertEquals(0, runFC(svc1, "-formatZK"));
+ assertEquals(0, runFC(svc, "-formatZK"));
// Try to run the actual FC, should fail without a fencer
assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
- runFC(svc1));
+ runFC(svc));
}
/**
@@ -155,66 +204,50 @@
@Test(timeout=15000)
public void testAutoFailoverOnBadHealth() throws Exception {
try {
- setupFCs();
+ cluster.start();
+ DummyHAService svc1 = cluster.getService(1);
- LOG.info("Faking svc1 unhealthy, should failover to svc2");
- svc1.isHealthy = false;
- LOG.info("Waiting for svc1 to enter standby state");
- waitForHAState(svc1, HAServiceState.STANDBY);
- waitForHAState(svc2, HAServiceState.ACTIVE);
+ LOG.info("Faking svc0 unhealthy, should failover to svc1");
+ cluster.setHealthy(0, false);
+
+ LOG.info("Waiting for svc0 to enter standby state");
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.waitForHAState(1, HAServiceState.ACTIVE);
- LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
+ LOG.info("Allowing svc0 to be healthy again, making svc1 unreachable " +
"and fail to gracefully go to standby");
- svc1.isHealthy = true;
- svc2.actUnreachable = true;
-
- // Allow fencing to succeed
- Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
- // Should fail back to svc1 at this point
- waitForHAState(svc1, HAServiceState.ACTIVE);
- // and fence svc2
- Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
+ cluster.setUnreachable(1, true);
+ cluster.setHealthy(0, true);
+
+ // Should fail back to svc0 at this point
+ cluster.waitForHAState(0, HAServiceState.ACTIVE);
+ // and fence svc1
+ Mockito.verify(svc1.fencer).fence(Mockito.same(svc1));
} finally {
- stopFCs();
+ cluster.stop();
}
}
@Test(timeout=15000)
public void testAutoFailoverOnLostZKSession() throws Exception {
try {
- setupFCs();
+ cluster.start();
- // Expire svc1, it should fail over to svc2
- expireAndVerifyFailover(thr1, thr2);
+ // Expire svc0, it should fail over to svc1
+ cluster.expireAndVerifyFailover(0, 1);
- // Expire svc2, it should fail back to svc1
- expireAndVerifyFailover(thr2, thr1);
+ // Expire svc1, it should fail back to svc0
+ cluster.expireAndVerifyFailover(1, 0);
LOG.info("======= Running test cases second time to test " +
"re-establishment =========");
- // Expire svc1, it should fail over to svc2
- expireAndVerifyFailover(thr1, thr2);
+ // Expire svc0, it should fail over to svc1
+ cluster.expireAndVerifyFailover(0, 1);
- // Expire svc2, it should fail back to svc1
- expireAndVerifyFailover(thr2, thr1);
+ // Expire svc1, it should fail back to svc0
+ cluster.expireAndVerifyFailover(1, 0);
} finally {
- stopFCs();
- }
- }
-
- private void expireAndVerifyFailover(DummyZKFCThread fromThr,
- DummyZKFCThread toThr) throws Exception {
- DummyHAService fromSvc = fromThr.zkfc.localTarget;
- DummyHAService toSvc = toThr.zkfc.localTarget;
-
- fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
- try {
- expireActiveLockHolder(fromSvc);
-
- waitForHAState(fromSvc, HAServiceState.STANDBY);
- waitForHAState(toSvc, HAServiceState.ACTIVE);
- } finally {
- fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+ cluster.stop();
}
}
@@ -225,33 +258,32 @@
@Test(timeout=15000)
public void testDontFailoverToUnhealthyNode() throws Exception {
try {
- setupFCs();
+ cluster.start();
- // Make svc2 unhealthy, and wait for its FC to notice the bad health.
- svc2.isHealthy = false;
- waitForHealthState(thr2.zkfc,
- HealthMonitor.State.SERVICE_UNHEALTHY);
+ // Make svc1 unhealthy, and wait for its FC to notice the bad health.
+ cluster.setHealthy(1, false);
+ cluster.waitForHealthState(1, HealthMonitor.State.SERVICE_UNHEALTHY);
- // Expire svc1
- thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
+ // Expire svc0
+ cluster.getElector(0).preventSessionReestablishmentForTests();
try {
- expireActiveLockHolder(svc1);
+ cluster.expireActiveLockHolder(0);
- LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
+ LOG.info("Expired svc0's ZK session. Waiting a second to give svc1" +
" a chance to take the lock, if it is ever going to.");
Thread.sleep(1000);
// Ensure that no one holds the lock.
- waitForActiveLockHolder(null);
+ cluster.waitForActiveLockHolder(null);
} finally {
- LOG.info("Allowing svc1's elector to re-establish its connection");
- thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
+ LOG.info("Allowing svc0's elector to re-establish its connection");
+ cluster.getElector(0).allowSessionReestablishmentForTests();
}
- // svc1 should get the lock again
- waitForActiveLockHolder(svc1);
+ // svc0 should get the lock again
+ cluster.waitForActiveLockHolder(0);
} finally {
- stopFCs();
+ cluster.stop();
}
}
@@ -262,29 +294,38 @@
@Test(timeout=15000)
public void testBecomingActiveFails() throws Exception {
try {
- setupFCs();
+ cluster.start();
+ DummyHAService svc1 = cluster.getService(1);
- LOG.info("Making svc2 fail to become active");
- svc2.failToBecomeActive = true;
+ LOG.info("Making svc1 fail to become active");
+ cluster.setFailToBecomeActive(1, true);
- LOG.info("Faking svc1 unhealthy, should NOT successfully " +
- "failover to svc2");
- svc1.isHealthy = false;
- waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
- waitForActiveLockHolder(null);
+ LOG.info("Faking svc0 unhealthy, should NOT successfully " +
+ "failover to svc1");
+ cluster.setHealthy(0, false);
+ cluster.waitForHealthState(0, State.SERVICE_UNHEALTHY);
+ cluster.waitForActiveLockHolder(null);
- Mockito.verify(svc2.proxy).transitionToActive();
-
- waitForHAState(svc1, HAServiceState.STANDBY);
- waitForHAState(svc2, HAServiceState.STANDBY);
- LOG.info("Faking svc1 healthy again, should go back to svc1");
- svc1.isHealthy = true;
- waitForHAState(svc1, HAServiceState.ACTIVE);
- waitForHAState(svc2, HAServiceState.STANDBY);
- waitForActiveLockHolder(svc1);
+ Mockito.verify(svc1.proxy, Mockito.timeout(2000).atLeastOnce())
+ .transitionToActive(Mockito.<StateChangeRequestInfo>any());
+
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.waitForHAState(1, HAServiceState.STANDBY);
+
+ LOG.info("Faking svc0 healthy again, should go back to svc0");
+ cluster.setHealthy(0, true);
+ cluster.waitForHAState(0, HAServiceState.ACTIVE);
+ cluster.waitForHAState(1, HAServiceState.STANDBY);
+ cluster.waitForActiveLockHolder(0);
+
+      // Ensure that we can fail back to svc1 once it is able
+      // to become active (e.g. the admin has restarted it)
+ LOG.info("Allowing svc1 to become active, expiring svc0");
+ svc1.failToBecomeActive = false;
+ cluster.expireAndVerifyFailover(0, 1);
} finally {
- stopFCs();
+ cluster.stop();
}
}
@@ -296,27 +337,25 @@
@Test(timeout=15000)
public void testZooKeeperFailure() throws Exception {
try {
- setupFCs();
+ cluster.start();
// Record initial ZK sessions
- long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
- long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();
+ long session0 = cluster.getElector(0).getZKSessionIdForTests();
+ long session1 = cluster.getElector(1).getZKSessionIdForTests();
LOG.info("====== Stopping ZK server");
stopServer();
waitForServerDown(hostPort, CONNECTION_TIMEOUT);
LOG.info("====== Waiting for services to enter NEUTRAL mode");
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr1.zkfc.getElectorForTests(),
+ cluster.waitForElectorState(0,
ActiveStandbyElector.State.NEUTRAL);
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr2.zkfc.getElectorForTests(),
+ cluster.waitForElectorState(1,
ActiveStandbyElector.State.NEUTRAL);
LOG.info("====== Checking that the services didn't change HA state");
- assertEquals(HAServiceState.ACTIVE, svc1.state);
- assertEquals(HAServiceState.STANDBY, svc2.state);
+ assertEquals(HAServiceState.ACTIVE, cluster.getService(0).state);
+ assertEquals(HAServiceState.STANDBY, cluster.getService(1).state);
LOG.info("====== Restarting server");
startServer();
@@ -324,134 +363,224 @@
// Nodes should go back to their original states, since they re-obtain
// the same sessions.
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr1.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.ACTIVE);
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr2.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.STANDBY);
+ cluster.waitForElectorState(0, ActiveStandbyElector.State.ACTIVE);
+ cluster.waitForElectorState(1, ActiveStandbyElector.State.STANDBY);
// Check HA states didn't change.
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr1.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.ACTIVE);
- ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
- thr2.zkfc.getElectorForTests(),
- ActiveStandbyElector.State.STANDBY);
+ cluster.waitForHAState(0, HAServiceState.ACTIVE);
+ cluster.waitForHAState(1, HAServiceState.STANDBY);
+
// Check they re-used the same sessions and didn't spuriously reconnect
+ assertEquals(session0,
+ cluster.getElector(0).getZKSessionIdForTests());
assertEquals(session1,
- thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
- assertEquals(session2,
- thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
+ cluster.getElector(1).getZKSessionIdForTests());
} finally {
- stopFCs();
- }
- }
-
- /**
- * Expire the ZK session of the given service. This requires
- * (and asserts) that the given service be the current active.
- * @throws NoNodeException if no service holds the lock
- */
- private void expireActiveLockHolder(DummyHAService expectedActive)
- throws NoNodeException {
- ZooKeeperServer zks = getServer(serverFactory);
- Stat stat = new Stat();
- byte[] data = zks.getZKDatabase().getData(
- ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
- ActiveStandbyElector.LOCK_FILENAME, stat, null);
-
- assertArrayEquals(Ints.toByteArray(expectedActive.index), data);
- long session = stat.getEphemeralOwner();
- LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session);
- zks.closeSession(session);
- }
-
- /**
- * Wait for the given HA service to enter the given HA state.
- */
- private void waitForHAState(DummyHAService svc, HAServiceState state)
- throws Exception {
- while (svc.state != state) {
- ctx.checkException();
- Thread.sleep(50);
+ cluster.stop();
}
}
/**
- * Wait for the ZKFC to be notified of a change in health state.
+ * Test that the ZKFC can gracefully cede its active status.
*/
- private void waitForHealthState(DummyZKFC zkfc, State state)
+ @Test(timeout=15000)
+ public void testCedeActive() throws Exception {
+ try {
+ cluster.start();
+ DummyZKFC zkfc = cluster.getZkfc(0);
+ // It should be in active to start.
+ assertEquals(ActiveStandbyElector.State.ACTIVE,
+ zkfc.getElectorForTests().getStateForTests());
+
+ // Ask it to cede active for 3 seconds. It should respond promptly
+ // (i.e. the RPC itself should not take 3 seconds!)
+ ZKFCProtocol proxy = zkfc.getLocalTarget().getZKFCProxy(conf, 5000);
+ long st = System.currentTimeMillis();
+ proxy.cedeActive(3000);
+ long et = System.currentTimeMillis();
+ assertTrue("RPC to cedeActive took " + (et - st) + " ms",
+ et - st < 1000);
+
+ // Should be in "INIT" state since it's not in the election
+ // at this point.
+ assertEquals(ActiveStandbyElector.State.INIT,
+ zkfc.getElectorForTests().getStateForTests());
+
+ // After the prescribed 3 seconds, should go into STANDBY state,
+ // since the other node in the cluster would have taken ACTIVE.
+ cluster.waitForElectorState(0, ActiveStandbyElector.State.STANDBY);
+ long et2 = System.currentTimeMillis();
+ assertTrue("Should take ~3 seconds to rejoin. Only took " + (et2 - et) +
+ "ms before rejoining.",
+ et2 - et > 2800);
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000)
+ public void testGracefulFailover() throws Exception {
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ cluster.waitForActiveLockHolder(1);
+ cluster.getService(0).getZKFCProxy(conf, 5000).gracefulFailover();
+ cluster.waitForActiveLockHolder(0);
+
+ assertEquals(0, cluster.getService(0).fenceCount);
+ assertEquals(0, cluster.getService(1).fenceCount);
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000)
+ public void testGracefulFailoverToUnhealthy() throws Exception {
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+
+ // Mark it unhealthy, wait for it to exit election
+ cluster.setHealthy(1, false);
+ cluster.waitForElectorState(1, ActiveStandbyElector.State.INIT);
+
+ // Ask for failover, it should fail, because it's unhealthy
+ try {
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ fail("Did not fail to graceful failover to unhealthy service!");
+ } catch (ServiceFailedException sfe) {
+ GenericTestUtils.assertExceptionContains(
+ cluster.getService(1).toString() +
+ " is not currently healthy.", sfe);
+ }
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000)
+ public void testGracefulFailoverFailBecomingActive() throws Exception {
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+ cluster.setFailToBecomeActive(1, true);
+
+ // Ask for failover, it should fail and report back to user.
+ try {
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ fail("Did not fail to graceful failover when target failed " +
+ "to become active!");
+ } catch (ServiceFailedException sfe) {
+ GenericTestUtils.assertExceptionContains(
+ "Couldn't make " + cluster.getService(1) + " active", sfe);
+ GenericTestUtils.assertExceptionContains(
+ "injected failure", sfe);
+ }
+
+ // No fencing
+ assertEquals(0, cluster.getService(0).fenceCount);
+ assertEquals(0, cluster.getService(1).fenceCount);
+
+ // Service 0 should go back to being active after the failed failover
+ cluster.waitForActiveLockHolder(0);
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000)
+ public void testGracefulFailoverFailBecomingStandby() throws Exception {
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+
+ // Ask for failover when old node fails to transition to standby.
+ // This should trigger fencing, since the cedeActive() command
+ // still works, but leaves the breadcrumb in place.
+ cluster.setFailToBecomeStandby(0, true);
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+
+ // Check that the old node was fenced
+ assertEquals(1, cluster.getService(0).fenceCount);
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000)
+ public void testGracefulFailoverFailBecomingStandbyAndFailFence()
throws Exception {
- while (zkfc.getLastHealthState() != state) {
- ctx.checkException();
- Thread.sleep(50);
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+
+ // Ask for failover when old node fails to transition to standby.
+ // This should trigger fencing, since the cedeActive() command
+ // still works, but leaves the breadcrumb in place.
+ cluster.setFailToBecomeStandby(0, true);
+ cluster.setFailToFence(0, true);
+
+ try {
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ fail("Failover should have failed when old node wont fence");
+ } catch (ServiceFailedException sfe) {
+ GenericTestUtils.assertExceptionContains(
+ "Unable to fence " + cluster.getService(0), sfe);
+ }
+ } finally {
+ cluster.stop();
}
}
/**
- * Wait for the given HA service to become the active lock holder.
- * If the passed svc is null, waits for there to be no active
- * lock holder.
+ * Test which exercises all of the inputs into ZKFC. This is particularly
+ * useful for running under jcarder to check for lock order violations.
*/
- private void waitForActiveLockHolder(DummyHAService svc)
- throws Exception {
- ZooKeeperServer zks = getServer(serverFactory);
- ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
- ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
- (svc == null) ? null : Ints.toByteArray(svc.index));
- }
+ @Test(timeout=30000)
+ public void testOneOfEverything() throws Exception {
+ try {
+ cluster.start();
+
+ // Failover by session expiration
+ LOG.info("====== Failing over by session expiration");
+ cluster.expireAndVerifyFailover(0, 1);
+ cluster.expireAndVerifyFailover(1, 0);
+
+ // Restart ZK
+ LOG.info("====== Restarting server");
+ stopServer();
+ waitForServerDown(hostPort, CONNECTION_TIMEOUT);
+ startServer();
+ waitForServerUp(hostPort, CONNECTION_TIMEOUT);
+ // Failover by bad health
+ cluster.setHealthy(0, false);
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.waitForHAState(1, HAServiceState.ACTIVE);
+ cluster.setHealthy(1, true);
+ cluster.setHealthy(0, false);
+ cluster.waitForHAState(1, HAServiceState.ACTIVE);
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.setHealthy(0, true);
+
+ cluster.waitForHealthState(0, State.SERVICE_HEALTHY);
+
+ // Graceful failovers
+ cluster.getZkfc(1).gracefulFailoverToYou();
+ cluster.getZkfc(0).gracefulFailoverToYou();
+ } finally {
+ cluster.stop();
+ }
+ }
private int runFC(DummyHAService target, String ... args) throws Exception {
- DummyZKFC zkfc = new DummyZKFC(target);
- zkfc.setConf(conf);
+ DummyZKFC zkfc = new DummyZKFC(conf, target);
return zkfc.run(args);
}
- /**
- * Test-thread which runs a ZK Failover Controller corresponding
- * to a given dummy service.
- */
- private class DummyZKFCThread extends TestingThread {
- private final DummyZKFC zkfc;
-
- public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
- super(ctx);
- this.zkfc = new DummyZKFC(svc);
- zkfc.setConf(conf);
- }
-
- @Override
- public void doWork() throws Exception {
- try {
- assertEquals(0, zkfc.run(new String[0]));
- } catch (InterruptedException ie) {
- // Interrupted by main thread, that's OK.
- }
- }
- }
-
- private static class DummyZKFC extends ZKFailoverController {
- private final DummyHAService localTarget;
-
- public DummyZKFC(DummyHAService localTarget) {
- this.localTarget = localTarget;
- }
-
- @Override
- protected byte[] targetToData(HAServiceTarget target) {
- return Ints.toByteArray(((DummyHAService)target).index);
- }
-
- @Override
- protected HAServiceTarget dataToTarget(byte[] data) {
- int index = Ints.fromByteArray(data);
- return DummyHAService.getInstance(index);
- }
-
- @Override
- protected HAServiceTarget getLocalTarget() {
- return localTarget;
- }
- }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
new file mode 100644
index 0000000..c1c2726
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.util.Random;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Stress test for ZKFailoverController.
+ * Starts multiple ZKFCs for dummy services, and then performs many automatic
+ * failovers. While doing so, ensures that a fake "shared resource"
+ * (simulating the shared edits dir) is only owned by one service at a time.
+ */
+public class TestZKFailoverControllerStress extends ClientBaseWithFixes {
+
+ private static final int STRESS_RUNTIME_SECS = 30;
+ private static final int EXTRA_TIMEOUT_SECS = 10;
+
+ private Configuration conf;
+ private MiniZKFCCluster cluster;
+
+ @Before
+ public void setupConfAndServices() throws Exception {
+ conf = new Configuration();
+ conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
+ this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
+ }
+
+ @After
+ public void stopCluster() throws Exception {
+ cluster.stop();
+ }
+
+ /**
+ * Simply fail back and forth between two services for the
+ * configured amount of time, via expiring their ZK sessions.
+ */
+ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+ public void testExpireBackAndForth() throws Exception {
+ cluster.start();
+ long st = System.currentTimeMillis();
+ long runFor = STRESS_RUNTIME_SECS * 1000;
+
+ int i = 0;
+ while (System.currentTimeMillis() - st < runFor) {
+ // flip flop the services back and forth
+ int from = i % 2;
+ int to = (i + 1) % 2;
+
+ // Expire one service, it should fail over to the other
+ LOG.info("Failing over via expiration from " + from + " to " + to);
+ cluster.expireAndVerifyFailover(from, to);
+
+ i++;
+ }
+ }
+
+ /**
+ * Randomly expire the ZK sessions of the two ZKFCs. This differs
+ * from the above test in that it is not a controlled failover -
+ * we just do random expirations and expect neither one to ever
+ * generate fatal exceptions.
+ */
+ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+ public void testRandomExpirations() throws Exception {
+ cluster.start();
+ long st = System.currentTimeMillis();
+ long runFor = STRESS_RUNTIME_SECS * 1000;
+
+ Random r = new Random();
+ while (System.currentTimeMillis() - st < runFor) {
+ cluster.getTestContext().checkException();
+ int targetIdx = r.nextInt(2);
+ ActiveStandbyElector target = cluster.getElector(targetIdx);
+ long sessId = target.getZKSessionIdForTests();
+ if (sessId != -1) {
+ LOG.info(String.format("Expiring session %x for svc %d",
+ sessId, targetIdx));
+ getServer(serverFactory).closeSession(sessId);
+ }
+ Thread.sleep(r.nextInt(300));
+ }
+ }
+
+ /**
+ * Have the services fail their health checks half the time,
+ * causing the master role to bounce back and forth in the
+ * cluster. Meanwhile, causes ZK to disconnect clients every
+ * 50ms, to trigger the retry code and failures to become active.
+ */
+ @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
+ public void testRandomHealthAndDisconnects() throws Exception {
+ long runFor = STRESS_RUNTIME_SECS * 1000;
+ Mockito.doAnswer(new RandomlyThrow(0))
+ .when(cluster.getService(0).proxy).monitorHealth();
+ Mockito.doAnswer(new RandomlyThrow(1))
+ .when(cluster.getService(1).proxy).monitorHealth();
+ ActiveStandbyElector.NUM_RETRIES = 100;
+
+ // Don't start until after the above mocking. Otherwise we can get
+ // Mockito errors if the HM calls the proxy in the middle of
+ // setting up the mock.
+ cluster.start();
+
+ long st = System.currentTimeMillis();
+ while (System.currentTimeMillis() - st < runFor) {
+ cluster.getTestContext().checkException();
+ serverFactory.closeAll();
+ Thread.sleep(50);
+ }
+ }
+
+
+ /**
+ * Randomly throw an exception half the time the method is called
+ */
+ @SuppressWarnings("rawtypes")
+ private static class RandomlyThrow implements Answer {
+ private Random r = new Random();
+ private final int svcIdx;
+ public RandomlyThrow(int svcIdx) {
+ this.svcIdx = svcIdx;
+ }
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (r.nextBoolean()) {
+ LOG.info("Throwing an exception for svc " + svcIdx);
+ throw new HealthCheckFailedException("random failure");
+ }
+ return invocation.callRealMethod();
+ }
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ZKFCTestUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ZKFCTestUtil.java
new file mode 100644
index 0000000..4a5eacd
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ZKFCTestUtil.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import org.apache.hadoop.test.MultithreadedTestUtil;
+
+public class ZKFCTestUtil {
+
+ public static void waitForHealthState(ZKFailoverController zkfc,
+ HealthMonitor.State state,
+ MultithreadedTestUtil.TestContext ctx) throws Exception {
+ while (zkfc.getLastHealthState() != state) {
+ if (ctx != null) {
+ ctx.checkException();
+ }
+ Thread.sleep(50);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3042.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3042.txt
new file mode 100644
index 0000000..394064f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3042.txt
@@ -0,0 +1,19 @@
+Changes for HDFS-3042 branch.
+
+This change list will be merged into the trunk CHANGES.txt when the HDFS-3042
+branch is merged.
+------------------------------
+
+HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
+
+HDFS-3200. Scope all ZKFC configurations by nameservice (todd)
+
+HDFS-3223. add zkfc to hadoop-daemon.sh script (todd)
+
+HDFS-3261. TestHASafeMode fails on HDFS-3042 branch (todd)
+
+HDFS-3159. Document NN auto-failover setup and configuration (todd)
+
+HDFS-3412. Fix findbugs warnings in auto-HA branch (todd)
+
+HDFS-3432. TestDFSZKFailoverController tries to fail over too early (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 31a38c7..3461d6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -6,6 +6,9 @@
<Package name="org.apache.hadoop.hdfs.protocol.proto" />
</Match>
<Match>
+ <Package name="org.apache.hadoop.hdfs.server.namenode.ha.proto" />
+ </Match>
+ <Match>
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 8bdef62..7198f9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -100,6 +100,33 @@
<artifactId>ant</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.2</version>
+ <exclusions>
+ <exclusion>
+ <!-- otherwise seems to drag in junit 3.8.1 via jline -->
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.2</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index 986849e..06b8b5a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -30,6 +30,7 @@
echo " namenode -format format the DFS filesystem"
echo " secondarynamenode run the DFS secondary namenode"
echo " namenode run the DFS namenode"
+ echo " zkfc run the ZK Failover Controller daemon"
echo " datanode run a DFS datanode"
echo " dfsadmin run a DFS admin client"
echo " haadmin run a DFS HA admin client"
@@ -76,6 +77,9 @@
if [ "$COMMAND" = "namenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
+elif [ "$COMMAND" = "zkfc" ] ; then
+ CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
+ HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
elif [ "$COMMAND" = "secondarynamenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh
index 72d9e90..0d41e55 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.sh
@@ -85,4 +85,15 @@
--script "$bin/hdfs" start secondarynamenode
fi
+#---------------------------------------------------------
+# ZK Failover controllers, if auto-HA is enabled
+AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)
+if [ "$(echo "$AUTOHA_ENABLED" | tr A-Z a-z)" = "true" ]; then
+ echo "Starting ZK Failover Controllers on NN hosts [$NAMENODES]"
+ "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
+ --config "$HADOOP_CONF_DIR" \
+ --hostnames "$NAMENODES" \
+ --script "$bin/hdfs" start zkfc
+fi
+
# eof
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e037e21..493f48e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -348,4 +348,8 @@
public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
+ public static final String DFS_HA_AUTO_FAILOVER_ENABLED_KEY = "dfs.ha.automatic-failover.enabled";
+ public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
+ public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
+ public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
index 6e21245..e8e80a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -47,6 +48,8 @@
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
HAServiceProtocol.class),
+ new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,
+ ZKFCProtocol.class),
new Service(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY,
RefreshAuthorizationPolicyProtocol.class),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 6f87fa7..7d26428 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
@@ -69,6 +70,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
@@ -145,17 +147,25 @@
}
/**
- * HDFS federation configuration can have two types of parameters:
+ * HDFS configuration can have three types of parameters:
* <ol>
- * <li>Parameter that is common for all the name services in the cluster.</li>
- * <li>Parameters that are specific to a name service. This keys are suffixed
+ * <li>Parameters that are common for all the name services in the cluster.</li>
+ * <li>Parameters that are specific to a name service. These keys are suffixed
* with nameserviceId in the configuration. For example,
* "dfs.namenode.rpc-address.nameservice1".</li>
+ * <li>Parameters that are specific to a single name node. These keys are suffixed
+ * with nameserviceId and namenodeId in the configuration. For example,
+ * "dfs.namenode.rpc-address.nameservice1.namenode1"</li>
* </ol>
*
- * Following are nameservice specific keys.
+ * In the latter cases, operators may specify the configuration without
+ * any suffix, with a nameservice suffix, or with a nameservice and namenode
+ * suffix. The more specific suffix will take precedence.
+ *
+ * These keys are specific to a given namenode, and thus may be configured
+ * globally, for a nameservice, or for a specific namenode within a nameservice.
*/
- public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
+ public static final String[] NAMENODE_SPECIFIC_KEYS = {
DFS_NAMENODE_RPC_ADDRESS_KEY,
DFS_NAMENODE_NAME_DIR_KEY,
DFS_NAMENODE_EDITS_DIR_KEY,
@@ -170,8 +180,19 @@
DFS_NAMENODE_BACKUP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY,
DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY,
+ DFS_NAMENODE_USER_NAME_KEY,
DFS_HA_FENCE_METHODS_KEY,
- DFS_NAMENODE_USER_NAME_KEY
+ DFS_HA_ZKFC_PORT_KEY,
+    // NOTE: DFS_HA_FENCE_METHODS_KEY is intentionally not repeated here; it is
+ };
+
+ /**
+ * @see #NAMENODE_SPECIFIC_KEYS
+ * These keys are specific to a nameservice, but may not be overridden
+ * for a specific namenode.
+ */
+ public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
+ DFS_HA_AUTO_FAILOVER_ENABLED_KEY
};
public long getProtocolVersion(String protocol,
@@ -1145,8 +1166,11 @@
}
DFSUtil.setGenericConf(conf, nameserviceId, namenodeId,
+ NAMENODE_SPECIFIC_KEYS);
+ DFSUtil.setGenericConf(conf, nameserviceId, null,
NAMESERVICE_SPECIFIC_KEYS);
}
+
if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
+ conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
@@ -1362,4 +1386,43 @@
public boolean isStandbyState() {
return (state.equals(STANDBY_STATE));
}
+
+ /**
+ * Check that a request to change this node's HA state is valid.
+ * In particular, verifies that, if auto failover is enabled, non-forced
+ * requests from the HAAdmin CLI are rejected, and vice versa.
+ *
+ * @param req the request to check
+ * @throws AccessControlException if the request is disallowed
+ */
+ void checkHaStateChange(StateChangeRequestInfo req)
+ throws AccessControlException {
+ boolean autoHaEnabled = conf.getBoolean(DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
+ DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
+ switch (req.getSource()) {
+ case REQUEST_BY_USER:
+ if (autoHaEnabled) {
+ throw new AccessControlException(
+ "Manual HA control for this NameNode is disallowed, because " +
+ "automatic HA is enabled.");
+ }
+ break;
+ case REQUEST_BY_USER_FORCED:
+ if (autoHaEnabled) {
+ LOG.warn("Allowing manual HA control from " +
+ Server.getRemoteAddress() +
+ " even though automatic HA is enabled, because the user " +
+ "specified the force flag");
+ }
+ break;
+ case REQUEST_BY_ZKFC:
+ if (!autoHaEnabled) {
+ throw new AccessControlException(
+ "Request from ZK failover controller at " +
+ Server.getRemoteAddress() + " denied since automatic HA " +
+ "is not enabled");
+ }
+ break;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index c153633..426f578 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -977,14 +977,16 @@
}
@Override // HAServiceProtocol
- public synchronized void transitionToActive()
+ public synchronized void transitionToActive(StateChangeRequestInfo req)
throws ServiceFailedException, AccessControlException {
+ nn.checkHaStateChange(req);
nn.transitionToActive();
}
@Override // HAServiceProtocol
- public synchronized void transitionToStandby()
+ public synchronized void transitionToStandby(StateChangeRequestInfo req)
throws ServiceFailedException, AccessControlException {
+ nn.checkHaStateChange(req);
nn.transitionToStandby();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
index da09fff..fd816a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
@@ -207,7 +207,6 @@
return 0;
}
-
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
long curTxIdOnOtherNode) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
new file mode 100644
index 0000000..b1163d6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.tools;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceTarget;
+import org.apache.hadoop.ha.ZKFailoverController;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos.ActiveNodeInfo;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+@InterfaceAudience.Private
+public class DFSZKFailoverController extends ZKFailoverController {
+
+ private static final Log LOG =
+ LogFactory.getLog(DFSZKFailoverController.class);
+ private AccessControlList adminAcl;
+  /* the same as superclass's localTarget, but with the more specific NN type */
+ private final NNHAServiceTarget localNNTarget;
+
+ @Override
+ protected HAServiceTarget dataToTarget(byte[] data) {
+ ActiveNodeInfo proto;
+ try {
+ proto = ActiveNodeInfo.parseFrom(data);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Invalid data in ZK: " +
+ StringUtils.byteToHexString(data));
+ }
+ NNHAServiceTarget ret = new NNHAServiceTarget(
+ conf, proto.getNameserviceId(), proto.getNamenodeId());
+ InetSocketAddress addressFromProtobuf = new InetSocketAddress(
+ proto.getHostname(), proto.getPort());
+
+ if (!addressFromProtobuf.equals(ret.getAddress())) {
+ throw new RuntimeException("Mismatched address stored in ZK for " +
+ ret + ": Stored protobuf was " + proto + ", address from our own " +
+ "configuration for this NameNode was " + ret.getAddress());
+ }
+
+ ret.setZkfcPort(proto.getZkfcPort());
+ return ret;
+ }
+
+ @Override
+ protected byte[] targetToData(HAServiceTarget target) {
+ InetSocketAddress addr = target.getAddress();
+
+ return ActiveNodeInfo.newBuilder()
+ .setHostname(addr.getHostName())
+ .setPort(addr.getPort())
+ .setZkfcPort(target.getZKFCAddress().getPort())
+ .setNameserviceId(localNNTarget.getNameServiceId())
+ .setNamenodeId(localNNTarget.getNameNodeId())
+ .build()
+ .toByteArray();
+ }
+
+ @Override
+ protected InetSocketAddress getRpcAddressToBindTo() {
+ int zkfcPort = getZkfcPort(conf);
+ return new InetSocketAddress(localTarget.getAddress().getAddress(),
+ zkfcPort);
+ }
+
+
+ @Override
+ protected PolicyProvider getPolicyProvider() {
+ return new HDFSPolicyProvider();
+ }
+
+ static int getZkfcPort(Configuration conf) {
+ return conf.getInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY,
+ DFSConfigKeys.DFS_HA_ZKFC_PORT_DEFAULT);
+ }
+
+ public static DFSZKFailoverController create(Configuration conf) {
+ Configuration localNNConf = DFSHAAdmin.addSecurityConfiguration(conf);
+ String nsId = DFSUtil.getNamenodeNameServiceId(conf);
+
+ if (!HAUtil.isHAEnabled(localNNConf, nsId)) {
+ throw new HadoopIllegalArgumentException(
+ "HA is not enabled for this namenode.");
+ }
+ String nnId = HAUtil.getNameNodeId(localNNConf, nsId);
+ NameNode.initializeGenericKeys(localNNConf, nsId, nnId);
+ DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS);
+
+ NNHAServiceTarget localTarget = new NNHAServiceTarget(
+ localNNConf, nsId, nnId);
+ return new DFSZKFailoverController(localNNConf, localTarget);
+ }
+
+ private DFSZKFailoverController(Configuration conf,
+ NNHAServiceTarget localTarget) {
+ super(conf, localTarget);
+ this.localNNTarget = localTarget;
+ // Setup ACLs
+ adminAcl = new AccessControlList(
+ conf.get(DFSConfigKeys.DFS_ADMIN, " "));
+ LOG.info("Failover controller configured for NameNode " +
+ localTarget);
+  }
+
+
+ @Override
+ protected void initRPC() throws IOException {
+ super.initRPC();
+ localNNTarget.setZkfcPort(rpcServer.getAddress().getPort());
+ }
+
+ @Override
+ public void loginAsFCUser() throws IOException {
+ InetSocketAddress socAddr = NameNode.getAddress(conf);
+ SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+ DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+ }
+
+ @Override
+ protected String getScopeInsideParentNode() {
+ return localNNTarget.getNameServiceId();
+ }
+
+ public static void main(String args[])
+ throws Exception {
+
+ GenericOptionsParser parser = new GenericOptionsParser(
+ new HdfsConfiguration(), args);
+ DFSZKFailoverController zkfc = DFSZKFailoverController.create(
+ parser.getConfiguration());
+
+ System.exit(zkfc.run(parser.getRemainingArgs()));
+ }
+
+ @Override
+ protected void checkRpcAdminAccess() throws IOException, AccessControlException {
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ UserGroupInformation zkfcUgi = UserGroupInformation.getLoginUser();
+ if (adminAcl.isUserAllowed(ugi) ||
+ ugi.getShortUserName().equals(zkfcUgi.getShortUserName())) {
+ LOG.info("Allowed RPC access from " + ugi + " at " + Server.getRemoteAddress());
+ return;
+ }
+ String msg = "Disallowed RPC access from " + ugi + " at " +
+ Server.getRemoteAddress() + ". Not listed in " + DFSConfigKeys.DFS_ADMIN;
+ LOG.warn(msg);
+ throw new AccessControlException(msg);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
index 1ef58e1..38f5123 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java
@@ -21,6 +21,7 @@
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
@@ -44,12 +45,14 @@
private static final String NAMENODE_ID_KEY = "namenodeid";
private final InetSocketAddress addr;
+ private InetSocketAddress zkfcAddr;
private NodeFencer fencer;
private BadFencingConfigurationException fenceConfigError;
private final String nnId;
private final String nsId;
-
- public NNHAServiceTarget(HdfsConfiguration conf,
+ private final boolean autoFailoverEnabled;
+
+ public NNHAServiceTarget(Configuration conf,
String nsId, String nnId) {
Preconditions.checkNotNull(nnId);
@@ -75,12 +78,24 @@
}
this.addr = NetUtils.createSocketAddr(serviceAddr,
NameNode.DEFAULT_PORT);
+
+ this.autoFailoverEnabled = targetConf.getBoolean(
+ DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
+ DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
+ if (autoFailoverEnabled) {
+ int port = DFSZKFailoverController.getZkfcPort(targetConf);
+ if (port != 0) {
+ setZkfcPort(port);
+ }
+ }
+
try {
this.fencer = NodeFencer.create(targetConf,
DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY);
} catch (BadFencingConfigurationException e) {
this.fenceConfigError = e;
}
+
this.nnId = nnId;
this.nsId = nsId;
}
@@ -94,10 +109,29 @@
}
@Override
+ public InetSocketAddress getZKFCAddress() {
+ Preconditions.checkState(autoFailoverEnabled,
+ "ZKFC address not relevant when auto failover is off");
+ assert zkfcAddr != null;
+
+ return zkfcAddr;
+ }
+
+ void setZkfcPort(int port) {
+ assert autoFailoverEnabled;
+
+ this.zkfcAddr = new InetSocketAddress(addr.getAddress(), port);
+ }
+
+ @Override
public void checkFencingConfigured() throws BadFencingConfigurationException {
if (fenceConfigError != null) {
throw fenceConfigError;
}
+ if (fencer == null) {
+ throw new BadFencingConfigurationException(
+ "No fencer configured for " + this);
+ }
}
@Override
@@ -125,4 +159,9 @@
ret.put(NAMESERVICE_ID_KEY, getNameServiceId());
ret.put(NAMENODE_ID_KEY, getNameNodeId());
}
+
+ @Override
+ public boolean isAutoFailoverEnabled() {
+ return autoFailoverEnabled;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto
new file mode 100644
index 0000000..364c5bd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/HAZKInfo.proto
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hdfs.server.namenode.ha.proto";
+option java_outer_classname = "HAZKInfoProtos";
+
+message ActiveNodeInfo {
+ required string nameserviceId = 1;
+ required string namenodeId = 2;
+
+ required string hostname = 3;
+ required int32 port = 4;
+ required int32 zkfcPort = 5;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 54ce2a2..209ed2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -837,6 +837,16 @@
</property>
<property>
+ <name>dfs.ha.automatic-failover.enabled</name>
+ <value>false</value>
+ <description>
+ Whether automatic failover is enabled. See the HDFS High
+ Availability documentation for details on automatic HA
+ configuration.
+ </description>
+</property>
+
+<property>
<name>dfs.support.append</name>
<value>true</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index b418fcf..8e190ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -67,8 +67,10 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocolHelper;
import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -1660,12 +1662,14 @@
public void transitionToActive(int nnIndex) throws IOException,
ServiceFailedException {
- getNameNode(nnIndex).getRpcServer().transitionToActive();
+ getNameNode(nnIndex).getRpcServer().transitionToActive(
+ new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
}
public void transitionToStandby(int nnIndex) throws IOException,
ServiceFailedException {
- getNameNode(nnIndex).getRpcServer().transitionToStandby();
+ getNameNode(nnIndex).getRpcServer().transitionToStandby(
+ new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
index a0948e6..1896877 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
@@ -274,7 +274,7 @@
conf.set(DFS_FEDERATION_NAMESERVICE_ID, nsId);
// Set the nameservice specific keys with nameserviceId in the config key
- for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+ for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
// Note: value is same as the key
conf.set(DFSUtil.addKeySuffixes(key, nsId), key);
}
@@ -284,7 +284,7 @@
// Retrieve the keys without nameserviceId and Ensure generic keys are set
// to the correct value
- for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+ for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
assertEquals(key, conf.get(key));
}
}
@@ -304,7 +304,7 @@
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + nsId, nnId);
// Set the nameservice specific keys with nameserviceId in the config key
- for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+ for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
// Note: value is same as the key
conf.set(DFSUtil.addKeySuffixes(key, nsId, nnId), key);
}
@@ -314,7 +314,7 @@
// Retrieve the keys without nameserviceId and Ensure generic keys are set
// to the correct value
- for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+ for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
assertEquals(key, conf.get(key));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java
new file mode 100644
index 0000000..52e1369
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDFSZKFailoverController.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.ha.HealthMonitor;
+import org.apache.hadoop.ha.ZKFCTestUtil;
+import org.apache.hadoop.ha.ZKFailoverController;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
+import org.apache.hadoop.hdfs.tools.DFSZKFailoverController;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import com.google.common.base.Supplier;
+
+public class TestDFSZKFailoverController extends ClientBaseWithFixes {
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private TestContext ctx;
+ private ZKFCThread thr1, thr2;
+ private FileSystem fs;
+
+ @Before
+ public void setup() throws Exception {
+ conf = new Configuration();
+ // Specify the quorum per-nameservice, to ensure that these configs
+ // can be nameservice-scoped.
+ conf.set(ZKFailoverController.ZK_QUORUM_KEY + ".ns1", hostPort);
+ conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY,
+ AlwaysSucceedFencer.class.getName());
+ conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
+
+ // Turn off IPC client caching, so that the suite can handle
+ // the restart of the daemons between test cases.
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+ 0);
+
+ conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn1", 10003);
+ conf.setInt(DFSConfigKeys.DFS_HA_ZKFC_PORT_KEY + ".ns1.nn2", 10004);
+
+ MiniDFSNNTopology topology = new MiniDFSNNTopology()
+ .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
+ .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(10001))
+ .addNN(new MiniDFSNNTopology.NNConf("nn2").setIpcPort(10002)));
+ cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(topology)
+ .numDataNodes(0)
+ .build();
+ cluster.waitActive();
+
+ ctx = new TestContext();
+ ctx.addThread(thr1 = new ZKFCThread(ctx, 0));
+ assertEquals(0, thr1.zkfc.run(new String[]{"-formatZK"}));
+
+ thr1.start();
+ waitForHAState(0, HAServiceState.ACTIVE);
+
+ ctx.addThread(thr2 = new ZKFCThread(ctx, 1));
+ thr2.start();
+
+ // Wait for the ZKFCs to fully start up
+ ZKFCTestUtil.waitForHealthState(thr1.zkfc,
+ HealthMonitor.State.SERVICE_HEALTHY, ctx);
+ ZKFCTestUtil.waitForHealthState(thr2.zkfc,
+ HealthMonitor.State.SERVICE_HEALTHY, ctx);
+
+ fs = HATestUtil.configureFailoverFs(cluster, conf);
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ cluster.shutdown();
+
+ if (thr1 != null) {
+ thr1.interrupt();
+ }
+ if (thr2 != null) {
+ thr2.interrupt();
+ }
+ if (ctx != null) {
+ ctx.stop();
+ }
+ }
+
+ /**
+ * Test that automatic failover is triggered by shutting the
+ * active NN down.
+ */
+ @Test(timeout=30000)
+ public void testFailoverAndBackOnNNShutdown() throws Exception {
+ Path p1 = new Path("/dir1");
+ Path p2 = new Path("/dir2");
+
+ // Write some data on the first NN
+ fs.mkdirs(p1);
+ // Shut it down, causing automatic failover
+ cluster.shutdownNameNode(0);
+ // Data should still exist. Write some on the new NN
+ assertTrue(fs.exists(p1));
+ fs.mkdirs(p2);
+ assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
+ thr1.zkfc.getLocalTarget().getAddress());
+
+ // Start the first node back up
+ cluster.restartNameNode(0);
+ // This should have no effect -- the new node should be STANDBY.
+ waitForHAState(0, HAServiceState.STANDBY);
+ assertTrue(fs.exists(p1));
+ assertTrue(fs.exists(p2));
+ // Shut down the second node, which should failback to the first
+ cluster.shutdownNameNode(1);
+ waitForHAState(0, HAServiceState.ACTIVE);
+
+ // First node should see what was written on the second node while it was down.
+ assertTrue(fs.exists(p1));
+ assertTrue(fs.exists(p2));
+ assertEquals(AlwaysSucceedFencer.getLastFencedService().getAddress(),
+ thr2.zkfc.getLocalTarget().getAddress());
+ }
+
+ @Test(timeout=30000)
+ public void testManualFailover() throws Exception {
+ thr2.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
+ waitForHAState(0, HAServiceState.STANDBY);
+ waitForHAState(1, HAServiceState.ACTIVE);
+
+ thr1.zkfc.getLocalTarget().getZKFCProxy(conf, 15000).gracefulFailover();
+ waitForHAState(0, HAServiceState.ACTIVE);
+ waitForHAState(1, HAServiceState.STANDBY);
+ }
+
+ @Test(timeout=30000)
+ public void testManualFailoverWithDFSHAAdmin() throws Exception {
+ DFSHAAdmin tool = new DFSHAAdmin();
+ tool.setConf(conf);
+ assertEquals(0,
+ tool.run(new String[]{"-failover", "nn1", "nn2"}));
+ waitForHAState(0, HAServiceState.STANDBY);
+ waitForHAState(1, HAServiceState.ACTIVE);
+ assertEquals(0,
+ tool.run(new String[]{"-failover", "nn2", "nn1"}));
+ waitForHAState(0, HAServiceState.ACTIVE);
+ waitForHAState(1, HAServiceState.STANDBY);
+ }
+
+ private void waitForHAState(int nnidx, final HAServiceState state)
+ throws TimeoutException, InterruptedException {
+ final NameNode nn = cluster.getNameNode(nnidx);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ return nn.getRpcServer().getServiceStatus().getState() == state;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ }, 50, 5000);
+ }
+
+ /**
+ * Test-thread which runs a ZK Failover Controller corresponding
+ * to a given NameNode in the minicluster.
+ */
+ private class ZKFCThread extends TestingThread {
+ private final DFSZKFailoverController zkfc;
+
+ public ZKFCThread(TestContext ctx, int idx) {
+ super(ctx);
+ this.zkfc = DFSZKFailoverController.create(
+ cluster.getConfiguration(idx));
+ }
+
+ @Override
+ public void doWork() throws Exception {
+ try {
+ assertEquals(0, zkfc.run(new String[0]));
+ } catch (InterruptedException ie) {
+ // Interrupted by main thread, that's OK.
+ }
+ }
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java
index a245301..79dcec4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java
@@ -71,7 +71,7 @@
// Set the first NN to active, make sure it creates edits
// in its own dirs and the shared dir. The standby
// should still have no edits!
- cluster.getNameNode(0).getRpcServer().transitionToActive();
+ cluster.transitionToActive(0);
assertEditFiles(cluster.getNameDirs(0),
NNStorage.getInProgressEditsFileName(1));
@@ -107,7 +107,7 @@
// If we restart NN0, it'll come back as standby, and we can
// transition NN1 to active and make sure it reads edits correctly at this point.
cluster.restartNameNode(0);
- cluster.getNameNode(1).getRpcServer().transitionToActive();
+ cluster.transitionToActive(1);
// NN1 should have both the edits that came before its restart, and the edits that
// came after its restart.
@@ -134,7 +134,7 @@
NNStorage.getInProgressEditsFileName(1));
// Transition one of the NNs to active
- cluster.getNameNode(0).getRpcServer().transitionToActive();
+ cluster.transitionToActive(0);
// In the transition to active, it should have read the log -- and
// hence see one of the dirs we made in the fake log.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
index 8790d0f..5af5391 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java
@@ -34,6 +34,8 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HAUtil;
@@ -129,7 +131,8 @@
DFSTestUtil
.createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3, 1L);
restartActive();
- nn0.getRpcServer().transitionToActive();
+ nn0.getRpcServer().transitionToActive(
+ new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
FSNamesystem namesystem = nn0.getNamesystem();
String status = namesystem.getSafemode();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
index 092bb5a..e44ebc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java
@@ -37,6 +37,8 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -71,6 +73,8 @@
private static final String TEST_FILE_STR = TEST_FILE_PATH.toUri().getPath();
private static final String TEST_FILE_DATA =
"Hello state transitioning world";
+ private static final StateChangeRequestInfo REQ_INFO = new StateChangeRequestInfo(
+ RequestSource.REQUEST_BY_USER_FORCED);
static {
((Log4JLogger)EditLogTailer.LOG).getLogger().setLevel(Level.ALL);
@@ -481,19 +485,19 @@
assertFalse(isDTRunning(nn));
banner("Transition 1->3. Should not start secret manager.");
- nn.getRpcServer().transitionToActive();
+ nn.getRpcServer().transitionToActive(REQ_INFO);
assertFalse(nn.isStandbyState());
assertTrue(nn.isInSafeMode());
assertFalse(isDTRunning(nn));
banner("Transition 3->1. Should not start secret manager.");
- nn.getRpcServer().transitionToStandby();
+ nn.getRpcServer().transitionToStandby(REQ_INFO);
assertTrue(nn.isStandbyState());
assertTrue(nn.isInSafeMode());
assertFalse(isDTRunning(nn));
banner("Transition 1->3->4. Should start secret manager.");
- nn.getRpcServer().transitionToActive();
+ nn.getRpcServer().transitionToActive(REQ_INFO);
NameNodeAdapter.leaveSafeMode(nn, false);
assertFalse(nn.isStandbyState());
assertFalse(nn.isInSafeMode());
@@ -514,13 +518,13 @@
for (int i = 0; i < 20; i++) {
// Loop the last check to suss out races.
banner("Transition 4->2. Should stop secret manager.");
- nn.getRpcServer().transitionToStandby();
+ nn.getRpcServer().transitionToStandby(REQ_INFO);
assertTrue(nn.isStandbyState());
assertFalse(nn.isInSafeMode());
assertFalse(isDTRunning(nn));
banner("Transition 2->4. Should start secret manager");
- nn.getRpcServer().transitionToActive();
+ nn.getRpcServer().transitionToActive(REQ_INFO);
assertFalse(nn.isStandbyState());
assertFalse(nn.isInSafeMode());
assertTrue(isDTRunning(nn));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
index b976a9c..8fc4d15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -111,7 +113,8 @@
cluster.restartNameNode(1, true);
// Make sure HA is working.
- cluster.getNameNode(0).getRpcServer().transitionToActive();
+ cluster.getNameNode(0).getRpcServer().transitionToActive(
+ new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
FileSystem fs = null;
try {
Path newPath = new Path(TEST_PATH, pathSuffix);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
index ab2a8dd..e5b53ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java
@@ -22,6 +22,8 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java
index 4c4d0f2..8b0fd0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSHAAdmin.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.*;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
@@ -32,14 +33,17 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.HealthCheckFailedException;
-import org.apache.hadoop.ha.NodeFencer;
+import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import com.google.common.base.Charsets;
@@ -52,6 +56,7 @@
private ByteArrayOutputStream errOutBytes = new ByteArrayOutputStream();
private String errOutput;
private HAServiceProtocol mockProtocol;
+ private ZKFCProtocol mockZkfcProtocol;
private static final String NSID = "ns1";
@@ -59,6 +64,9 @@
new HAServiceStatus(HAServiceState.STANDBY)
.setReadyToBecomeActive();
+ private ArgumentCaptor<StateChangeRequestInfo> reqInfoCaptor =
+ ArgumentCaptor.forClass(StateChangeRequestInfo.class);
+
private static String HOST_A = "1.2.3.1";
private static String HOST_B = "1.2.3.2";
@@ -81,6 +89,7 @@
@Before
public void setup() throws IOException {
mockProtocol = MockitoUtil.mockProtocol(HAServiceProtocol.class);
+ mockZkfcProtocol = MockitoUtil.mockProtocol(ZKFCProtocol.class);
tool = new DFSHAAdmin() {
@Override
@@ -90,7 +99,9 @@
// OVerride the target to return our mock protocol
try {
Mockito.doReturn(mockProtocol).when(spy).getProxy(
- Mockito.<Configuration>any(), Mockito.anyInt());
+ Mockito.<Configuration>any(), Mockito.anyInt());
+ Mockito.doReturn(mockZkfcProtocol).when(spy).getZKFCProxy(
+ Mockito.<Configuration>any(), Mockito.anyInt());
} catch (IOException e) {
throw new AssertionError(e); // mock setup doesn't really throw
}
@@ -139,13 +150,89 @@
@Test
public void testTransitionToActive() throws Exception {
assertEquals(0, runTool("-transitionToActive", "nn1"));
- Mockito.verify(mockProtocol).transitionToActive();
+ Mockito.verify(mockProtocol).transitionToActive(
+ reqInfoCaptor.capture());
+ assertEquals(RequestSource.REQUEST_BY_USER,
+ reqInfoCaptor.getValue().getSource());
+ }
+
+ /**
+ * Test that, if automatic HA is enabled, none of the mutative operations
+ * will succeed, unless the -forcemanual flag is specified.
+ * @throws Exception
+ */
+ @Test
+ public void testMutativeOperationsWithAutoHaEnabled() throws Exception {
+ Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();
+
+ // Turn on auto-HA in the config
+ HdfsConfiguration conf = getHAConf();
+ conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
+ conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY, "shell(true)");
+ tool.setConf(conf);
+
+ // Should fail without the forcemanual flag
+ assertEquals(-1, runTool("-transitionToActive", "nn1"));
+ assertTrue(errOutput.contains("Refusing to manually manage"));
+ assertEquals(-1, runTool("-transitionToStandby", "nn1"));
+ assertTrue(errOutput.contains("Refusing to manually manage"));
+
+ Mockito.verify(mockProtocol, Mockito.never())
+ .transitionToActive(anyReqInfo());
+ Mockito.verify(mockProtocol, Mockito.never())
+ .transitionToStandby(anyReqInfo());
+
+ // Force flag should bypass the check and change the request source
+ // for the RPC
+ setupConfirmationOnSystemIn();
+ assertEquals(0, runTool("-transitionToActive", "-forcemanual", "nn1"));
+ setupConfirmationOnSystemIn();
+ assertEquals(0, runTool("-transitionToStandby", "-forcemanual", "nn1"));
+
+ Mockito.verify(mockProtocol, Mockito.times(1)).transitionToActive(
+ reqInfoCaptor.capture());
+ Mockito.verify(mockProtocol, Mockito.times(1)).transitionToStandby(
+ reqInfoCaptor.capture());
+
+ // All of the RPCs should have had the "force" source
+ for (StateChangeRequestInfo ri : reqInfoCaptor.getAllValues()) {
+ assertEquals(RequestSource.REQUEST_BY_USER_FORCED, ri.getSource());
+ }
+ }
+
+ /**
+ * Setup System.in with a stream that feeds a "yes" answer on the
+ * next prompt.
+ */
+ private static void setupConfirmationOnSystemIn() {
+ // Answer "yes" to the prompt about transition to active
+ System.setIn(new ByteArrayInputStream("yes\n".getBytes()));
+ }
+
+ /**
+ * Test that, even if automatic HA is enabled, the monitoring operations
+ * still function correctly.
+ */
+ @Test
+ public void testMonitoringOperationsWithAutoHaEnabled() throws Exception {
+ Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();
+
+ // Turn on auto-HA
+ HdfsConfiguration conf = getHAConf();
+ conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
+ tool.setConf(conf);
+
+ assertEquals(0, runTool("-checkHealth", "nn1"));
+ Mockito.verify(mockProtocol).monitorHealth();
+
+ assertEquals(0, runTool("-getServiceState", "nn1"));
+ Mockito.verify(mockProtocol).getServiceStatus();
}
@Test
public void testTransitionToStandby() throws Exception {
assertEquals(0, runTool("-transitionToStandby", "nn1"));
- Mockito.verify(mockProtocol).transitionToStandby();
+ Mockito.verify(mockProtocol).transitionToStandby(anyReqInfo());
}
@Test
@@ -213,6 +300,19 @@
tool.setConf(conf);
assertEquals(-1, runTool("-failover", "nn1", "nn2", "--forcefence"));
}
+
+ @Test
+ public void testFailoverWithAutoHa() throws Exception {
+ Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();
+ // Turn on auto-HA in the config
+ HdfsConfiguration conf = getHAConf();
+ conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
+ conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY, "shell(true)");
+ tool.setConf(conf);
+
+ assertEquals(0, runTool("-failover", "nn1", "nn2"));
+ Mockito.verify(mockZkfcProtocol).gracefulFailover();
+ }
@Test
public void testForceFenceOptionListedBeforeArgs() throws Exception {
@@ -283,4 +383,8 @@
LOG.info("Output:\n" + errOutput);
return ret;
}
+
+ private StateChangeRequestInfo anyReqInfo() {
+ return Mockito.<StateChangeRequestInfo>any();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml
index eb3f4bd..afbf420 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/hadoop-policy.xml
@@ -116,5 +116,11 @@
<description>ACL for HAService protocol used by HAAdmin to manage the
active and stand-by states of namenode.</description>
</property>
+ <property>
+ <name>security.zkfc.protocol.acl</name>
+ <value>*</value>
+ <description>ACL for access to the ZK Failover Controller
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm
index 94fb854..d78b75b 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm
@@ -33,7 +33,7 @@
* {Background}
- Prior to Hadoop 0.23.2, the NameNode was a single point of failure (SPOF) in
+ Prior to Hadoop 2.0.0, the NameNode was a single point of failure (SPOF) in
an HDFS cluster. Each cluster had a single NameNode, and if that machine or
process became unavailable, the cluster as a whole would be unavailable
until the NameNode was either restarted or brought up on a separate machine.
@@ -90,12 +90,6 @@
prevents it from making any further edits to the namespace, allowing the new
Active to safely proceed with failover.
- <<Note:>> Currently, only manual failover is supported. This means the HA
- NameNodes are incapable of automatically detecting a failure of the Active
- NameNode, and instead rely on the operator to manually initiate a failover.
- Automatic failure detection and initiation of a failover will be implemented in
- future versions.
-
* {Hardware resources}
In order to deploy an HA cluster, you should prepare the following:
@@ -459,3 +453,263 @@
<<Note:>> This is not yet implemented, and at present will always return
success, unless the given NameNode is completely down.
+
+* {Automatic Failover}
+
+** Introduction
+
+ The above sections describe how to configure manual failover. In that mode,
+ the system will not automatically trigger a failover from the active to the
+ standby NameNode, even if the active node has failed. This section describes
+ how to configure and deploy automatic failover.
+
+** Components
+
+ Automatic failover adds two new components to an HDFS deployment: a ZooKeeper
+ quorum, and the ZKFailoverController process (abbreviated as ZKFC).
+
+ Apache ZooKeeper is a highly available service for maintaining small amounts
+ of coordination data, notifying clients of changes in that data, and
+ monitoring clients for failures. The implementation of automatic HDFS failover
+ relies on ZooKeeper for the following things:
+
+ * <<Failure detection>> - each of the NameNode machines in the cluster
+ maintains a persistent session in ZooKeeper. If the machine crashes, the
+ ZooKeeper session will expire, notifying the other NameNode that a failover
+ should be triggered.
+
+ * <<Active NameNode election>> - ZooKeeper provides a simple mechanism to
+ exclusively elect a node as active. If the current active NameNode crashes,
+ another node may take a special exclusive lock in ZooKeeper indicating that
+ it should become the next active.
+
+ The ZKFailoverController (ZKFC) is a new component: a ZooKeeper client
+ that also monitors and manages the state of the NameNode. Each of the
+ machines that runs a NameNode also runs a ZKFC, and that ZKFC is
+ responsible for:
+
+ * <<Health monitoring>> - the ZKFC pings its local NameNode on a periodic
+ basis with a health-check command. So long as the NameNode responds in a
+ timely fashion with a healthy status, the ZKFC considers the node
+ healthy. If the node has crashed, frozen, or otherwise entered an unhealthy
+ state, the health monitor will mark it as unhealthy.
+
+ * <<ZooKeeper session management>> - when the local NameNode is healthy, the
+ ZKFC holds a session open in ZooKeeper. If the local NameNode is active, it
+ also holds a special "lock" znode. This lock uses ZooKeeper's support for
+ "ephemeral" nodes; if the session expires, the lock node will be
+ automatically deleted.
+
+ * <<ZooKeeper-based election>> - if the local NameNode is healthy, and the
+ ZKFC sees that no other node currently holds the lock znode, it will itself
+ try to acquire the lock. If it succeeds, then it has "won the election", and
+ is responsible for running a failover to make its local NameNode active. The
+ failover process is similar to the manual failover described above: first,
+ the previous active is fenced if necessary, and then the local NameNode
+ transitions to active state.
+
+ For more details on the design of automatic failover, refer to the design
+ document attached to HDFS-2185 on the Apache HDFS JIRA.
+
+** Deploying ZooKeeper
+
+ In a typical deployment, ZooKeeper daemons are configured to run on three or
+ five nodes. Since ZooKeeper itself has light resource requirements, it is
+ acceptable to collocate the ZooKeeper nodes on the same hardware as the HDFS
+ NameNode and Standby Node. Many operators choose to deploy the third ZooKeeper
+ process on the same node as the YARN ResourceManager. It is advisable to
+ configure the ZooKeeper nodes to store their data on separate disk drives from
+ the HDFS metadata for best performance and isolation.
+
+ The setup of ZooKeeper is out of scope for this document. We will assume that
+ you have set up a ZooKeeper cluster running on three or more nodes, and have
+ verified its correct operation by connecting using the ZK CLI.
+
+** Before you begin
+
+ Before you begin configuring automatic failover, you should shut down your
+ cluster. It is not currently possible to transition from a manual failover
+ setup to an automatic failover setup while the cluster is running.
+
+** Configuring automatic failover
+
+ The configuration of automatic failover requires the addition of two new
+ parameters to your configuration. In your <<<hdfs-site.xml>>> file, add:
+
+----
+ <property>
+ <name>dfs.ha.automatic-failover.enabled</name>
+ <value>true</value>
+ </property>
+----
+
+ This specifies that the cluster should be set up for automatic failover.
+ In your <<<core-site.xml>>> file, add:
+
+----
+ <property>
+ <name>ha.zookeeper.quorum</name>
+ <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
+ </property>
+----
+
+ This lists the host-port pairs running the ZooKeeper service.
+
+ As with the parameters described earlier in the document, these settings may
+ be configured on a per-nameservice basis by suffixing the configuration key
+ with the nameservice ID. For example, in a cluster with federation enabled,
+ you can explicitly enable automatic failover for only one of the nameservices
+ by setting <<<dfs.ha.automatic-failover.enabled.my-nameservice-id>>>.
+
+ There are also several other configuration parameters which may be set to
+ control the behavior of automatic failover; however, they are not necessary
+ for most installations. Please refer to the configuration key specific
+ documentation for details.
+
+** Initializing HA state in ZooKeeper
+
+ After the configuration keys have been added, the next step is to initialize
+ required state in ZooKeeper. You can do so by running the following command
+ from one of the NameNode hosts.
+
+----
+$ hdfs zkfc -formatZK
+----
+
+ This will create a znode in ZooKeeper inside of which the automatic failover
+ system stores its data.
+
+** Starting the cluster with <<<start-dfs.sh>>>
+
+ Since automatic failover has been enabled in the configuration, the
+ <<<start-dfs.sh>>> script will now automatically start a ZKFC daemon on any
+ machine that runs a NameNode. When the ZKFCs start, they will automatically
+ select one of the NameNodes to become active.
+
+** Starting the cluster manually
+
+ If you manually manage the services on your cluster, you will need to manually
+ start the <<<zkfc>>> daemon on each of the machines that runs a NameNode. You
+ can start the daemon by running:
+
+----
+$ hadoop-daemon.sh start zkfc
+----
+
+** Securing access to ZooKeeper
+
+ If you are running a secure cluster, you will likely want to ensure that the
+ information stored in ZooKeeper is also secured. This prevents malicious
+ clients from modifying the metadata in ZooKeeper or potentially triggering a
+ false failover.
+
+ In order to secure the information in ZooKeeper, first add the following to
+ your <<<core-site.xml>>> file:
+
+----
+ <property>
+ <name>ha.zookeeper.auth</name>
+ <value>@/path/to/zk-auth.txt</value>
+ </property>
+ <property>
+ <name>ha.zookeeper.acl</name>
+ <value>@/path/to/zk-acl.txt</value>
+ </property>
+----
+
+ Please note the '@' character in these values -- this specifies that the
+ configurations are not inline, but rather point to a file on disk.
+
+ The first configured file specifies a list of ZooKeeper authentications, in
+ the same format as used by the ZK CLI. For example, you may specify something
+ like:
+
+----
+digest:hdfs-zkfcs:mypassword
+----
+ ...where <<<hdfs-zkfcs>>> is a unique username for ZooKeeper, and
+ <<<mypassword>>> is some unique string used as a password.
+
+ Next, generate a ZooKeeper ACL that corresponds to this authentication, using
+ a command like the following:
+
+----
+$ java -cp $ZK_HOME/lib/*:$ZK_HOME/zookeeper-3.4.2.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider hdfs-zkfcs:mypassword
+output: hdfs-zkfcs:mypassword->hdfs-zkfcs:P/OQvnYyU/nF/mGYvB/xurX8dYs=
+----
+
+ Copy and paste the section of this output after the '->' string into the file
+ <<<zk-acl.txt>>>, prefixed by the string "<<<digest:>>>". For example:
+
+----
+digest:hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=:rwcda
+----
+
+ In order for these ACLs to take effect, you should then rerun the
+ <<<zkfc -formatZK>>> command as described above.
+
+ After doing so, you may verify the ACLs from the ZK CLI as follows:
+
+----
+[zk: localhost:2181(CONNECTED) 1] getAcl /hadoop-ha
+'digest,'hdfs-zkfcs:vlUvLnd8MlacsE80rDuu6ONESbM=
+: cdrwa
+----
+
+** Verifying automatic failover
+
+ Once automatic failover has been set up, you should test its operation. To do
+ so, first locate the active NameNode. You can tell which node is active by
+ visiting the NameNode web interfaces -- each node reports its HA state at the
+ top of the page.
+
+ Once you have located your active NameNode, you may cause a failure on that
+ node. For example, you can use <<<kill -9 <pid of NN>>>> to simulate a JVM
+ crash. Or, you could power cycle the machine or unplug its network interface
+ to simulate a different kind of outage. After triggering the outage you wish
+ to test, the other NameNode should automatically become active within several
+ seconds. The amount of time required to detect a failure and trigger a
+ fail-over depends on the configuration of
+ <<<ha.zookeeper.session-timeout.ms>>>, but defaults to 5 seconds.
+
+ If the test does not succeed, you may have a misconfiguration. Check the logs
+ for the <<<zkfc>>> daemons as well as the NameNode daemons in order to further
+ diagnose the issue.
+
+
+* Automatic Failover FAQ
+
+ * <<Is it important that I start the ZKFC and NameNode daemons in any
+ particular order?>>
+
+ No. On any given node you may start the ZKFC before or after its corresponding
+ NameNode.
+
+ * <<What additional monitoring should I put in place?>>
+
+ You should add monitoring on each host that runs a NameNode to ensure that the
+ ZKFC remains running. In some types of ZooKeeper failures, for example, the
+ ZKFC may unexpectedly exit, and should be restarted to ensure that the system
+ is ready for automatic failover.
+
+ Additionally, you should monitor each of the servers in the ZooKeeper
+ quorum. If ZooKeeper crashes, then automatic failover will not function.
+
+ * <<What happens if ZooKeeper goes down?>>
+
+ If the ZooKeeper cluster crashes, no automatic failovers will be triggered.
+ However, HDFS will continue to run without any impact. When ZooKeeper is
+ restarted, HDFS will reconnect with no issues.
+
+ * <<Can I designate one of my NameNodes as primary/preferred?>>
+
+ No. Currently, this is not supported. Whichever NameNode is started first will
+ become active. You may choose to start the cluster in a specific order such
+ that your preferred node starts first.
+
+ * <<How can I initiate a manual failover when automatic failover is
+ configured?>>
+
+ Even if automatic failover is configured, you may initiate a manual failover
+ using the same <<<hdfs haadmin>>> command. It will perform a coordinated
+ failover.
\ No newline at end of file