Remove a potential deadlock when shutting down a RoutingTableProvider. (#1751)

This PR aims to avoid a potential deadlock that happens in a race condition when shutting down RoutingTableProvider.

The race condition happens when the RoutingTableProvider is shut down and the corresponding HelixManager is processing an onLiveInstanceChange event. Both threads require 2 locks, HelixManager lock and RoutingTableProvider._lastSeenSessions, in different orders.
To resolve this problem, this PR changes 2 logics.
1. Remove the _lastSeenSessions lock usage. So no more deadlock. The logic correctness is now ensured by the AtomicReference.getAndSet method.
2. Remove LiveInstanceChangeListener in the shutdown call before processing the other logics. This is a missing logic and should be done regardless of the deadlock problem.Moreover, this change also avoids a more complicated race condition introduced by the previous change that may lead to CurrentStates callback handler leakage.

Test cases in TestZkCallbackHandlerLeak has been updated to validate the logic changes.
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 37600ac..0d97c9f 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import javax.management.JMException;
 
 import com.google.common.collect.ImmutableMap;
@@ -325,13 +326,15 @@
     }
     _routerUpdater.shutdown();
 
-
     for (PropertyType propertyType : _monitorMap.keySet()) {
       _monitorMap.get(propertyType).unregister();
     }
 
     if (_helixManager != null) {
       PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
+      // Clean up all the listeners.
+      _helixManager.removeListener(keyBuilder.liveInstances(), this);
+      _helixManager.removeListener(keyBuilder.instanceConfigs(), this);
       for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
         switch (propertyType) {
         case EXTERNALVIEW:
@@ -350,7 +353,7 @@
         case CURRENTSTATES:
           NotificationContext context = new NotificationContext(_helixManager);
           context.setType(NotificationContext.Type.FINALIZE);
-          updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
+          updateCurrentStatesListeners(Collections.emptyList(), context);
           break;
         default:
           break;
@@ -751,51 +754,49 @@
 
     if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
       // on finalize, should remove all current-state listeners
-      logger.info("remove current-state listeners. lastSeenSessions: {}", _lastSeenSessions);
+      logger.info("remove current-state listeners. lastSeenSessions: {}", _lastSeenSessions.get());
       liveInstances = Collections.emptyList();
     }
 
-    Map<String, LiveInstance> curSessions = new HashMap<>();
-    for (LiveInstance liveInstance : liveInstances) {
-      curSessions.put(liveInstance.getEphemeralOwner(), liveInstance);
+    Map<String, LiveInstance> curSessions = liveInstances.stream().collect(
+        Collectors.toConcurrentMap(LiveInstance::getEphemeralOwner, liveInstance -> liveInstance));
+
+    // Add listeners to new live instances
+    for (String session : curSessions.keySet()) {
+      String instanceName = curSessions.get(session).getInstanceName();
+      try {
+        // Add current-state listeners for new sessions
+        // It is possible this loop attempt to re-add listeners. But the HelixManager will skip the
+        // operation if a listener has been already registered.
+        manager.addCurrentStateChangeListener(this, instanceName, session);
+        logger.info("{} added current-state listener for instance: {}, session: {}, listener: {}",
+            manager.getInstanceName(), instanceName, session, this);
+      } catch (Exception e) {
+        logger.error("Fail to add current state listener for instance: {} with session: {}",
+            instanceName, session, e);
+      }
     }
 
-    // Go though the live instance list and update CurrentState listeners
-    synchronized (_lastSeenSessions) {
-      Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
-      if (lastSessions == null) {
-        lastSessions = Collections.emptyMap();
-      }
+    // The getAndSet ensures that all the records in _lastSeenSessions map are active listeners.
+    // And the following logic ensures that all the exchanged last seen listeners are cleaned up.
+    Map<String, LiveInstance> lastSessions = _lastSeenSessions.getAndSet(curSessions);
+    if (lastSessions == null) {
+      lastSessions = Collections.emptyMap();
+    }
 
-      // add listeners to new live instances
-      for (String session : curSessions.keySet()) {
-        if (!lastSessions.containsKey(session)) {
-          String instanceName = curSessions.get(session).getInstanceName();
-          try {
-            // add current-state listeners for new sessions
-            manager.addCurrentStateChangeListener(this, instanceName, session);
-            logger.info(
-                "{} added current-state listener for instance: {}, session: {}, listener: {}",
-                manager.getInstanceName(), instanceName, session, this);
-          } catch (Exception e) {
-            logger.error("Fail to add current state listener for instance: {} with session: {}",
-                instanceName, session, e);
-          }
-        }
-      }
-
-      // remove current-state listener for expired session
-      for (String session : lastSessions.keySet()) {
-        if (!curSessions.containsKey(session)) {
-          String instanceName = lastSessions.get(session).getInstanceName();
+    // remove current-state listener for expired session
+    for (String session : lastSessions.keySet()) {
+      if (!curSessions.containsKey(session)) {
+        String instanceName = lastSessions.get(session).getInstanceName();
+        try {
           manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
           logger.info("remove current-state listener for instance: {}, session: {}", instanceName,
               session);
+        } catch (Exception e) {
+          logger.error("Fail to remove current state listener for instance: {} with session: {}",
+              instanceName, session, e);
         }
       }
-
-      // update last-seen
-      _lastSeenSessions.set(curSessions);
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index d9f8707..c09fbed 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -305,7 +305,7 @@
   }
 
   @Test
-  public void testDanglingCallbackHanlderFix() throws Exception {
+  public void testDanglingCallbackHandler() throws Exception {
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String clusterName = className + "_" + methodName;
@@ -379,6 +379,10 @@
     // clean up
     controller.syncStop();
     rp.shutdown();
+
+    Assert.assertTrue(rpManager.getHandlers().isEmpty(),
+        "HelixManager should not have any callback handlers after shutting down RoutingTableProvider");
+
     rpManager.syncStop();
     for (int i = 0; i < n; i++) {
       participants[i].syncStop();
@@ -474,6 +478,10 @@
       participants[i].syncStop();
     }
     rp.shutdown();
+
+    Assert.assertTrue(rpManager.getHandlers().isEmpty(),
+        "HelixManager should not have any callback handlers after shutting down RoutingTableProvider");
+
     rpManager.syncStop();
     TestHelper.dropCluster(clusterName, _gZkClient);