YARN-5677. RM should transition to standby when connection is lost for an extended period. (Daniel Templeton via kasha)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
index 72327e8..88d2e10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,8 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -54,6 +57,10 @@
 
   private byte[] localActiveNodeInfo;
   private ActiveStandbyElector elector;
+  private long zkSessionTimeout;
+  private Timer zkDisconnectTimer;
+  @VisibleForTesting
+  final Object zkDisconnectLock = new Object();
 
   EmbeddedElectorService(RMContext rmContext) {
     super(EmbeddedElectorService.class.getName());
@@ -80,7 +87,7 @@
         YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
     String electionZNode = zkBasePath + "/" + clusterId;
 
-    long zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
         YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
 
     List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
@@ -123,6 +130,8 @@
 
   @Override
   public void becomeActive() throws ServiceFailedException {
+    cancelDisconnectTimer();
+
     try {
       rmContext.getRMAdminService().transitionToActive(req);
     } catch (Exception e) {
@@ -132,6 +141,8 @@
 
   @Override
   public void becomeStandby() {
+    cancelDisconnectTimer();
+
     try {
       rmContext.getRMAdminService().transitionToStandby(req);
     } catch (Exception e) {
@@ -139,13 +150,53 @@
     }
   }
 
+  /**
+   * Stop the disconnect timer.  Any running tasks will be allowed to complete.
+   */
+  private void cancelDisconnectTimer() {
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer != null) {
+        zkDisconnectTimer.cancel();
+        zkDisconnectTimer = null;
+      }
+    }
+  }
+
+  /**
+   * When the ZK client loses contact with ZK, this method will be called to
+   * allow the RM to react. Because the loss of connection can be noticed
+   * before the session timeout happens, it is undesirable to transition
+   * immediately. Instead the method starts a timer that will wait
+   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
+   * initiating the transition into standby state.
+   */
   @Override
   public void enterNeutralMode() {
-    /**
-     * Possibly due to transient connection issues. Do nothing.
-     * TODO: Might want to keep track of how long in this state and transition
-     * to standby.
-     */
+    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
+        + zkSessionTimeout + " ms if connection is not reestablished.");
+
+    // If we've just become disconnected, start a timer.  When the time's up,
+    // we'll transition to standby.
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer == null) {
+        final Timer timer = new Timer("Zookeeper disconnect timer");
+        zkDisconnectTimer = timer;
+        timer.schedule(new TimerTask() {
+          @Override
+          public void run() {
+            synchronized (zkDisconnectLock) {
+              // Only run if this timer is still the current one.  Checking
+              // for non-null is not enough: this timer may have been
+              // cancelled and a newer one installed while this task was
+              // waiting for the lock.
+              if (zkDisconnectTimer == timer) {
+                becomeStandby();
+              }
+            }
+          }
+        }, zkSessionTimeout);
+      }
+    }
   }
 
   @SuppressWarnings(value = "unchecked")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index 20b1c0e..bfd0b4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -28,6 +28,14 @@
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestRMEmbeddedElector extends ClientBaseWithFixes {
   private static final Log LOG =
@@ -41,6 +49,14 @@
   private Configuration conf;
   private AtomicBoolean callbackCalled;
 
+  private enum SyncTestType {
+    ACTIVE,
+    STANDBY,
+    NEUTRAL,
+    ACTIVE_TIMING,
+    STANDBY_TIMING
+  }
+
   @Before
   public void setup() throws IOException {
     conf = new YarnConfiguration();
@@ -79,6 +95,181 @@
     LOG.info("Stopped RM");
   }
 
+  /**
+   * Test that neutral mode plays well with all other transitions.
+   *
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  @Test
+  public void testCallbackSynchronization()
+      throws IOException, InterruptedException {
+    testCallbackSynchronization(SyncTestType.ACTIVE);
+    testCallbackSynchronization(SyncTestType.STANDBY);
+    testCallbackSynchronization(SyncTestType.NEUTRAL);
+    testCallbackSynchronization(SyncTestType.ACTIVE_TIMING);
+    testCallbackSynchronization(SyncTestType.STANDBY_TIMING);
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with other transitions.
+   *
+   * @param type the type of test to run
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronization(SyncTestType type)
+      throws IOException, InterruptedException {
+    AdminService as = mock(AdminService.class);
+    RMContext rc = mock(RMContext.class);
+    Configuration myConf = new Configuration(conf);
+
+    myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
+    when(rc.getRMAdminService()).thenReturn(as);
+
+    EmbeddedElectorService ees = new EmbeddedElectorService(rc);
+    ees.init(myConf);
+
+    ees.enterNeutralMode();
+
+    switch (type) {
+    case ACTIVE:
+      testCallbackSynchronizationActive(as, ees);
+      break;
+    case STANDBY:
+      testCallbackSynchronizationStandby(as, ees);
+      break;
+    case NEUTRAL:
+      testCallbackSynchronizationNeutral(as, ees);
+      break;
+    case ACTIVE_TIMING:
+      testCallbackSynchronizationTimingActive(as, ees);
+      break;
+    case STANDBY_TIMING:
+      testCallbackSynchronizationTimingStandby(as, ees);
+      break;
+    default:
+      fail("Unknown test type: " + type);
+      break;
+    }
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with an active
+   * transition.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationActive(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    ees.becomeActive();
+
+    Thread.sleep(100);
+
+    verify(as).transitionToActive(any());
+    verify(as, never()).transitionToStandby(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with a standby
+   * transition.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationStandby(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    ees.becomeStandby();
+
+    Thread.sleep(100);
+
+    verify(as, atLeast(1)).transitionToStandby(any());
+    verify(as, atMost(1)).transitionToStandby(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode plays well with itself.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationNeutral(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    ees.enterNeutralMode();
+
+    Thread.sleep(100);
+
+    verify(as, atLeast(1)).transitionToStandby(any());
+    verify(as, atMost(1)).transitionToStandby(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode does not race with an active
+   * transition.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationTimingActive(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    synchronized (ees.zkDisconnectLock) {
+      // Sleep while holding the lock so that the timer thread can't do
+      // anything when it runs.  Sleep until we're pretty sure the timer thread
+      // has tried to run.
+      Thread.sleep(100);
+      // While still holding the lock cancel the timer by transitioning. This
+      // simulates a race where the callback goes to cancel the timer while the
+      // timer is trying to run.
+      ees.becomeActive();
+    }
+
+    // Sleep just a little more so that the timer thread can do whatever it's
+    // going to do, hopefully nothing.
+    Thread.sleep(50);
+
+    verify(as).transitionToActive(any());
+    verify(as, never()).transitionToStandby(any());
+  }
+
+  /**
+   * Helper method to test that neutral mode does not race with a standby
+   * transition.
+   *
+   * @param as the admin service
+   * @param ees the embedded elector service
+   * @throws IOException if there's an issue transitioning
+   * @throws InterruptedException if interrupted
+   */
+  private void testCallbackSynchronizationTimingStandby(AdminService as,
+      EmbeddedElectorService ees) throws IOException, InterruptedException {
+    synchronized (ees.zkDisconnectLock) {
+      // Sleep while holding the lock so that the timer thread can't do
+      // anything when it runs.  Sleep until we're pretty sure the timer thread
+      // has tried to run.
+      Thread.sleep(100);
+      // While still holding the lock cancel the timer by transitioning. This
+      // simulates a race where the callback goes to cancel the timer while the
+      // timer is trying to run.
+      ees.becomeStandby();
+    }
+
+    // Sleep just a little more so that the timer thread can do whatever it's
+    // going to do, hopefully nothing.
+    Thread.sleep(50);
+
+    verify(as, atLeast(1)).transitionToStandby(any());
+    verify(as, atMost(1)).transitionToStandby(any());
+  }
+
   private class MockRMWithElector extends MockRM {
     private long delayMs = 0;