YARN-5677. RM should transition to standby when connection is lost for an extended period. (Daniel Templeton via kasha)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
index 72327e8..88d2e10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,8 @@
import java.io.IOException;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -54,6 +57,10 @@
private byte[] localActiveNodeInfo;
private ActiveStandbyElector elector;
+  /**
+   * ZooKeeper session timeout in milliseconds, read from the configuration
+   * key {@link YarnConfiguration#RM_ZK_TIMEOUT_MS}. It is also the delay
+   * before a disconnected RM transitions itself to standby.
+   */
+  private long zkSessionTimeout;
+  /**
+   * Pending timer that moves the RM to standby after contact with ZooKeeper
+   * is lost, or {@code null} when no such countdown is running. Only read or
+   * written while holding {@link #zkDisconnectLock}.
+   */
+  private Timer zkDisconnectTimer;
+  /**
+   * Monitor guarding {@link #zkDisconnectTimer}. Package-visible so tests can
+   * hold it to simulate a race between the timer and an elector callback.
+   */
+  @VisibleForTesting
+  final Object zkDisconnectLock = new Object();
EmbeddedElectorService(RMContext rmContext) {
super(EmbeddedElectorService.class.getName());
@@ -80,7 +87,7 @@
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;
- long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+ zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
@@ -123,6 +130,8 @@
@Override
public void becomeActive() throws ServiceFailedException {
+ cancelDisconnectTimer();
+
try {
rmContext.getRMAdminService().transitionToActive(req);
} catch (Exception e) {
@@ -132,6 +141,8 @@
@Override
public void becomeStandby() {
+ cancelDisconnectTimer();
+
try {
rmContext.getRMAdminService().transitionToStandby(req);
} catch (Exception e) {
@@ -139,13 +150,49 @@
}
}
+  /**
+   * Cancel and forget the pending disconnect timer, if there is one. A timer
+   * task that is already executing is not interrupted and may still finish.
+   */
+  private void cancelDisconnectTimer() {
+    synchronized (zkDisconnectLock) {
+      Timer pending = zkDisconnectTimer;
+      zkDisconnectTimer = null;
+      if (pending != null) {
+        pending.cancel();
+      }
+    }
+  }
+
+  /**
+   * When the ZK client loses contact with ZK, this method will be called to
+   * allow the RM to react. Because the loss of connection can be noticed
+   * before the session timeout happens, it is undesirable to transition
+   * immediately. Instead the method starts a timer that will wait
+   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
+   * initiating the transition into standby state.
+   * <p>
+   * Only the first call while no timer is pending starts a countdown (and
+   * logs a warning); later calls return without doing anything. A pending
+   * timer is discarded by {@link #becomeActive()} and
+   * {@link #becomeStandby()}.
+   */
   @Override
   public void enterNeutralMode() {
-    /**
-     * Possibly due to transient connection issues. Do nothing.
-     * TODO: Might want to keep track of how long in this state and transition
-     * to standby.
-     */
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer != null) {
+        // A countdown is already running for this disconnect. Don't restart
+        // it, and don't repeat a warning whose "in N ms" is no longer true.
+        return;
+      }
+
+      LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
+          + zkSessionTimeout + " ms if connection is not reestablished.");
+
+      // java.util.Timer threads are non-daemon by default; use a daemon
+      // thread so a pending countdown can never keep the JVM from exiting.
+      final Timer timer = new Timer("Zookeeper disconnect timer", true);
+      zkDisconnectTimer = timer;
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          synchronized (zkDisconnectLock) {
+            // Act only if this task's timer is still the current one. While
+            // this task waited for the lock, a transition may have cancelled
+            // it (field is null), or a new disconnect may have installed a
+            // fresh timer. A plain null check would then fire too early.
+            // becomeStandby() itself cancels the timer, ending its thread.
+            if (zkDisconnectTimer == timer) {
+              becomeStandby();
+            }
+          }
+        }
+      }, zkSessionTimeout);
+    }
   }
@SuppressWarnings(value = "unchecked")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index 20b1c0e..bfd0b4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -28,6 +28,14 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestRMEmbeddedElector extends ClientBaseWithFixes {
private static final Log LOG =
@@ -41,6 +49,14 @@
private Configuration conf;
private AtomicBoolean callbackCalled;
+  /** Scenarios run, in declaration order, by testCallbackSynchronization(). */
+  private enum SyncTestType {
+    ACTIVE, STANDBY, NEUTRAL, ACTIVE_TIMING, STANDBY_TIMING
+  }
+
@Before
public void setup() throws IOException {
conf = new YarnConfiguration();
@@ -79,6 +95,181 @@
LOG.info("Stopped RM");
}
+  /**
+   * Test that neutral mode plays well with all other transitions by running
+   * every {@link SyncTestType} scenario, in declaration order.
+   *
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  @Test
+  public void testCallbackSynchronization()
+      throws IOException, InterruptedException {
+    for (SyncTestType scenario : SyncTestType.values()) {
+      testCallbackSynchronization(scenario);
+    }
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with other transitions.
+   * <p>
+   * Builds a fresh elector service backed by mocks, with a 50 ms ZK timeout,
+   * and puts it into neutral mode. It then runs the scenario selected by
+   * {@code type} and always stops the service afterwards.
+   *
+   * @param type the type of test to run
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronization(SyncTestType type)
+      throws IOException, InterruptedException {
+    AdminService as = mock(AdminService.class);
+    RMContext rc = mock(RMContext.class);
+    Configuration myConf = new Configuration(conf);
+
+    // A short timeout lets the disconnect timer fire well within the sleeps
+    // used by the scenario helpers.
+    myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
+    when(rc.getRMAdminService()).thenReturn(as);
+
+    EmbeddedElectorService ees = new EmbeddedElectorService(rc);
+    ees.init(myConf);
+
+    try {
+      ees.enterNeutralMode();
+
+      switch (type) {
+      case ACTIVE:
+        testCallbackSynchronizationActive(as, ees);
+        break;
+      case STANDBY:
+        testCallbackSynchronizationStandby(as, ees);
+        break;
+      case NEUTRAL:
+        testCallbackSynchronizationNeutral(as, ees);
+        break;
+      case ACTIVE_TIMING:
+        testCallbackSynchronizationTimingActive(as, ees);
+        break;
+      case STANDBY_TIMING:
+        testCallbackSynchronizationTimingStandby(as, ees);
+        break;
+      default:
+        fail("Unknown test type: " + type);
+        break;
+      }
+    } finally {
+      // Stop the service even if a verification failed. Whatever init() set
+      // up (the elector and, presumably, its ZooKeeper connection; confirm)
+      // would otherwise leak into the next scenario.
+      ees.stop();
+    }
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with an active
+   * transition. Becoming active must discard the pending disconnect timer, so
+   * the RM goes active once and is never sent to standby.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationActive(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    ees.becomeActive();
+
+    // Wait past the disconnect timeout configured by the caller, so that a
+    // timer which was not cancelled would have had time to fire.
+    final long settleMillis = 100;
+    Thread.sleep(settleMillis);
+
+    verify(as, never()).transitionToStandby(any());
+    verify(as).transitionToActive(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with a standby
+   * transition. Becoming standby must discard the pending disconnect timer,
+   * so the explicit transition is the only standby transition.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationStandby(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    ees.becomeStandby();
+
+    // Give a timer that was wrongly left running enough time to fire.
+    final long settleMillis = 100;
+    Thread.sleep(settleMillis);
+
+    // A bare verify(mock) means times(1): exactly one standby transition.
+    verify(as).transitionToStandby(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with itself. A second
+   * neutral notification while the disconnect timer is pending must not
+   * schedule another countdown, so the RM goes to standby exactly once.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationNeutral(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    ees.enterNeutralMode();
+
+    // Let the pending timer fire and complete its transition.
+    final long settleMillis = 100;
+    Thread.sleep(settleMillis);
+
+    // A bare verify(mock) means times(1): exactly one standby transition.
+    verify(as).transitionToStandby(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode does not race with an active
+   * transition.
+   * <p>
+   * The test holds {@code zkDisconnectLock} past the disconnect timeout
+   * configured by the caller, so the timer task fires and then blocks on the
+   * lock. Becoming active while still holding the lock cancels the timer, and
+   * the blocked task must then do nothing once it gets the lock.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationTimingActive(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    synchronized (ees.zkDisconnectLock) {
+      // Sleep while holding the lock so that the timer thread can't do
+      // anything when it runs. Sleep until we're pretty sure the timer thread
+      // has tried to run.
+      Thread.sleep(100);
+      // While still holding the lock cancel the timer by transitioning. This
+      // simulates a race where the callback goes to cancel the timer while the
+      // timer is trying to run.
+      ees.becomeActive();
+    }
+
+    // Sleep just a little more so that the timer thread can do whatever it's
+    // going to do, hopefully nothing.
+    Thread.sleep(50);
+
+    // Exactly one active transition, and the late timer task must not have
+    // sent the RM to standby.
+    verify(as).transitionToActive(any());
+    verify(as, never()).transitionToStandby(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode does not race with a standby
+   * transition.
+   * <p>
+   * The test holds {@code zkDisconnectLock} past the disconnect timeout
+   * configured by the caller, so the timer task fires and then blocks on the
+   * lock. Becoming standby while still holding the lock cancels the timer, and
+   * the blocked task must not add a second standby transition.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationTimingStandby(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    synchronized (ees.zkDisconnectLock) {
+      // Sleep while holding the lock so that the timer thread can't do
+      // anything when it runs. Sleep until we're pretty sure the timer thread
+      // has tried to run.
+      Thread.sleep(100);
+      // While still holding the lock cancel the timer by transitioning. This
+      // simulates a race where the callback goes to cancel the timer while the
+      // timer is trying to run.
+      ees.becomeStandby();
+    }
+
+    // Sleep just a little more so that the timer thread can do whatever it's
+    // going to do, hopefully nothing.
+    Thread.sleep(50);
+
+    // Together these pin exactly one standby transition: the explicit one.
+    verify(as, atLeast(1)).transitionToStandby(any());
+    verify(as, atMost(1)).transitionToStandby(any());
+  }
+
private class MockRMWithElector extends MockRM {
private long delayMs = 0;