Fix the race condition while Helix refreshes the cluster status cache. (#363)

* Fix the race condition while Helix refreshes the cluster status cache.

This change fixes issue #331.
The design relies on reading the change flags only once, so that no locking is needed during change notification. However, a later update introduced an additional read. Because notification is lock-free, the two reads may return different results, which leaves the cache in an inconsistent state. The impact is that an expected rebalance might not happen.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 9402029..0024177 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -19,17 +19,6 @@
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
@@ -54,6 +43,19 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * Common building block for controller to cache their data. This common building block contains
  * information about cluster config, instance config, resource config, ideal states, current state,
@@ -82,7 +84,7 @@
   private ExecutorService _asyncTasksThreadPool;
 
   // A map recording what data has changed
-  protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
+  protected Map<HelixConstants.ChangeType, AtomicBoolean> _propertyDataChangedMap;
 
   // Property caches
   private final PropertyCache<ResourceConfig> _resourceConfigCache;
@@ -112,7 +114,8 @@
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       // refresh every type when it is initialized
-      _propertyDataChangedMap.put(type, true);
+      _propertyDataChangedMap
+          .put(type, new AtomicBoolean(true));
     }
 
     // initialize caches
@@ -217,11 +220,11 @@
     _instanceMessagesCache = new InstanceMessagesCache(_clusterName);
   }
 
-  private void refreshIdealState(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.IDEAL_STATE, false);
-
+  private void refreshIdealState(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE).getAndSet(false)) {
       _idealStateCache.refresh(accessor);
+      refreshedType.add(HelixConstants.ChangeType.IDEAL_STATE);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No ideal state change for %s cluster, %s pipeline", _clusterName,
@@ -229,11 +232,12 @@
     }
   }
 
-  private void refreshLiveInstances(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, false);
+  private void refreshLiveInstances(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE).getAndSet(false)) {
       _liveInstanceCache.refresh(accessor);
       _updateInstanceOfflineTime = true;
+      refreshedType.add(HelixConstants.ChangeType.LIVE_INSTANCE);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No live instance change for %s cluster, %s pipeline", _clusterName,
@@ -241,13 +245,14 @@
     }
   }
 
-  private void refreshInstanceConfigs(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, false);
+  private void refreshInstanceConfigs(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG).getAndSet(false)) {
       _instanceConfigCache.refresh(accessor);
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("Reloaded InstanceConfig for cluster %s, %s pipeline. Keys: %s", _clusterName,
               getPipelineName(), _instanceConfigCache.getPropertyMap().keySet()));
+      refreshedType.add(HelixConstants.ChangeType.INSTANCE_CONFIG);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No instance config change for %s cluster, %s pipeline", _clusterName,
@@ -255,13 +260,14 @@
     }
   }
 
-  private void refreshResourceConfig(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.RESOURCE_CONFIG, false);
+  private void refreshResourceConfig(final HelixDataAccessor accessor,
+      Set<HelixConstants.ChangeType> refreshedType) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG).getAndSet(false)) {
       _resourceConfigCache.refresh(accessor);
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("Reloaded ResourceConfig for cluster %s, %s pipeline. Cnt: %s", _clusterName,
               getPipelineName(), _resourceConfigCache.getPropertyMap().keySet().size()));
+      refreshedType.add(HelixConstants.ChangeType.RESOURCE_CONFIG);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String
           .format("No resource config change for %s cluster, %s pipeline", _clusterName,
@@ -290,12 +296,22 @@
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
+    doRefresh(accessor);
+  }
+
+  /**
+   * @param accessor
+   * @return The types that have been updated during the refresh.
+   */
+  protected synchronized Set<HelixConstants.ChangeType> doRefresh(HelixDataAccessor accessor) {
+    Set<HelixConstants.ChangeType> refreshedTypes = new HashSet<>();
+
     // Refresh raw data
     _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
-    refreshIdealState(accessor);
-    refreshLiveInstances(accessor);
-    refreshInstanceConfigs(accessor);
-    refreshResourceConfig(accessor);
+    refreshIdealState(accessor, refreshedTypes);
+    refreshLiveInstances(accessor, refreshedTypes);
+    refreshInstanceConfigs(accessor, refreshedTypes);
+    refreshResourceConfig(accessor, refreshedTypes);
     _stateModelDefinitionCache.refresh(accessor);
     _clusterConstraintsCache.refresh(accessor);
     updateMaintenanceInfo(accessor);
@@ -314,6 +330,8 @@
 
     updateIdealRuleMap();
     updateDisabledInstances();
+
+    return refreshedTypes;
   }
 
   protected void dumpDebugInfo() {
@@ -595,9 +613,13 @@
 
   /**
    * Notify the cache that some part of the cluster data has been changed.
+   *
+   * Don't lock the propertyDataChangedMap here because the refresh process, which also reads this map,
+   * may take a long time to finish. If a lock were required, the notification might be blocked by refresh.
+   * In that case, the delayed notification processing might cause performance issues.
    */
   public void notifyDataChange(HelixConstants.ChangeType changeType) {
-    _propertyDataChangedMap.put(changeType, true);
+    _propertyDataChangedMap.get(changeType).set(true);
   }
 
   /**
@@ -669,7 +691,7 @@
   public void requireFullRefresh() {
     for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
       if (!_noFullRefreshProperty.contains(type)) {
-        _propertyDataChangedMap.put(type, true);
+        _propertyDataChangedMap.get(type).set(true);
       }
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index cf21230..59c973b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -21,8 +21,11 @@
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
@@ -109,17 +112,17 @@
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
 
-    // Invalidate cached information
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
+    // Refresh base
+    Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+
+    // Invalidate cached information if any of the important data has been refreshed
+    if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE)
+        || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
+        || propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
+        || propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) {
       clearCachedResourceAssignments();
     }
 
-    // Refresh base
-    super.refresh(accessor);
-
     // Refresh resource controller specific property caches
     refreshExternalViews(accessor);
     refreshTargetExternalViews(accessor);
@@ -140,20 +143,16 @@
   private void refreshExternalViews(final HelixDataAccessor accessor) {
     // As we are not listening on external view change, external view will be
     // refreshed once during the cache's first refresh() call, or when full refresh is required
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW).getAndSet(false)) {
       _externalViewCache.refresh(accessor);
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, false);
     }
   }
 
   private void refreshTargetExternalViews(final HelixDataAccessor accessor) {
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
+    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW).getAndSet(false)) {
       if (getClusterConfig() != null && getClusterConfig().isTargetExternalViewEnabled()) {
+        // Only refresh with data accessor for the first time
         _targetExternalViewCache.refresh(accessor);
-
-        // Only set the change type back once we get refreshed with data accessor for the
-        // first time
-        _propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, false);
       }
     }
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
index 2eee09e..e637e3d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java
@@ -19,9 +19,6 @@
  * under the License.
  */
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.ZNRecord;
@@ -41,6 +38,11 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Data provider for workflow controller.
@@ -69,24 +71,22 @@
     _taskDataCache = new TaskDataCache(this);
   }
 
-  private void refreshClusterStateChangeFlags() {
+  private void refreshClusterStateChangeFlags(Set<HelixConstants.ChangeType> propertyRefreshed) {
     // This is for targeted jobs' task assignment. It needs to watch for current state changes for
     // when targeted resources' state transitions complete
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE)
-        || _propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
-      _existsLiveInstanceOrCurrentStateChange = true;
-
-      // BaseControllerDataProvider will take care of marking live instance change
-      _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
-    } else {
-      _existsLiveInstanceOrCurrentStateChange = false;
-    }
+    _existsLiveInstanceOrCurrentStateChange =
+        // TODO read and update CURRENT_STATE in the BaseControllerDataProvider as well.
+        // This check (and set) is necessary for now since the current state flag in _propertyDataChangedMap is not used by the BaseControllerDataProvider for now.
+        _propertyDataChangedMap.get(HelixConstants.ChangeType.CURRENT_STATE).getAndSet(false)
+            || propertyRefreshed.contains(HelixConstants.ChangeType.CURRENT_STATE)
+            || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
   public synchronized void refresh(HelixDataAccessor accessor) {
     long startTime = System.currentTimeMillis();
-    refreshClusterStateChangeFlags();
-    super.refresh(accessor);
+    Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor);
+
+    refreshClusterStateChangeFlags(propertyRefreshed);
 
     // Refresh TaskCache
     _taskDataCache.refresh(accessor, getResourceConfigMap());