Add concurrent control to DistClusterControllerStateModel._controller access to avoid NPE. (#1753)

This PR fixes a potential NPE Exception that may be thrown in the DistClusterControllerStateModel state transition methods.
When this error happens, a follower-to-leader state transition might be interrupted. The controller instance partition will be set with the ERROR state but the controller instance may have connected to the Zookeeper already. This causes inconsistency and leakage since the controller instance won't be properly cleaned up when it is dropped directly from the ERROR state.
diff --git a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
index 6877dc5..430f47f 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/DistClusterControllerStateModel.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+import java.util.Optional;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
@@ -32,20 +33,18 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@StateModelInfo(initialState = "OFFLINE", states = {
-    "LEADER", "STANDBY"
-})
+
+@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"})
 public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel {
   private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class);
-  protected HelixManager _controller = null;
+  protected Optional<HelixManager> _controllerOpt = Optional.empty();
   private final Set<Pipeline.Type> _enabledPipelineTypes;
 
   public DistClusterControllerStateModel(String zkAddr) {
     this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK));
   }
 
-  public DistClusterControllerStateModel(String zkAddr,
-      Set<Pipeline.Type> enabledPipelineTypes) {
+  public DistClusterControllerStateModel(String zkAddr, Set<Pipeline.Type> enabledPipelineTypes) {
     super(zkAddr);
     _enabledPipelineTypes = enabledPipelineTypes;
   }
@@ -63,19 +62,20 @@
 
     logger.info(controllerName + " becoming leader from standby for " + clusterName);
 
-    if (_controller == null) {
-      _controller =
-          HelixManagerFactory.getZKHelixManager(clusterName, controllerName,
-              InstanceType.CONTROLLER, _zkAddr);
-      _controller.setEnabledControlPipelineTypes(_enabledPipelineTypes);
-      _controller.connect();
-      _controller.startTimerTasks();
-      logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
-    } else {
-      logger.error("controller already exists:" + _controller.getInstanceName() + " for "
-          + clusterName);
+    synchronized (_controllerOpt) {
+      if (!_controllerOpt.isPresent()) {
+        HelixManager newController = HelixManagerFactory
+            .getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr);
+        newController.setEnabledControlPipelineTypes(_enabledPipelineTypes);
+        newController.connect();
+        newController.startTimerTasks();
+        _controllerOpt = Optional.of(newController);
+        logStateTransition("STANDBY", "LEADER", clusterName, controllerName);
+      } else {
+        logger.error("controller already exists:" + _controllerOpt.get().getInstanceName() + " for "
+            + clusterName);
+      }
     }
-
   }
 
   @Override
@@ -85,7 +85,7 @@
 
     logger.info(controllerName + " becoming standby from leader for " + clusterName);
 
-    if (_controller != null) {
+    if (_controllerOpt.isPresent()) {
       reset();
       logStateTransition("LEADER", "STANDBY", clusterName, controllerName);
     } else {
@@ -112,10 +112,11 @@
 
   @Override
   public void reset() {
-    if (_controller != null) {
-      _controller.disconnect();
-      _controller = null;
+    synchronized (_controllerOpt) {
+      if (_controllerOpt.isPresent()) {
+        _controllerOpt.get().disconnect();
+        _controllerOpt = Optional.empty();
+      }
     }
-
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
index 1eab81b..6a55264 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestClusterStatusMonitorLifecycle.java
@@ -288,16 +288,13 @@
     cleanupControllers();
     // Check if any MBeans leftover.
     // Note that MessageQueueStatus is not bound with controller only. So it will still exist.
-    final QueryExp exp2 = Query.and(
-        Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))),
-        exp1);
+    final QueryExp exp2 = Query
+        .and(Query.not(Query.match(Query.attr("SensorName"), Query.value("MessageQueueStatus.*"))),
+            exp1);
 
-    // Note, the _asyncTasksThreadPool shutting down logic in GenericHelixController is best effort
-    // there is not guarantee that all threads in the pool is gone. Mossstly they will, but not always.
-    // see https://github.com/apache/helix/issues/1280
     boolean result = TestHelper.verify(() -> ManagementFactory.getPlatformMBeanServer()
         .queryMBeans(new ObjectName("ClusterStatus:*"), exp2).isEmpty(), TestHelper.WAIT_DURATION);
-    Assert.assertTrue(result,
-        "A small chance this may fail due to _asyncThread pool in controller may not shutdown in time. Please check issue 1280 to verify if this is the case.");
+    Assert.assertTrue(result, "Remaining MBeans: " + ManagementFactory.getPlatformMBeanServer()
+        .queryMBeans(new ObjectName("ClusterStatus:*"), exp2).toString());
   }
 }