Implement ViewClusterRefresher caching synchronization (#2199)

Implement ViewClusterRefresher caching synchronization
Implement a refresh mechanism to update view cluster local cache from remote source of truth data in case of external data change.
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
index b1d34b9..4a44579 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -75,7 +75,7 @@
     _viewClusterManager = HelixManagerFactory
         .getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
             InstanceType.SPECTATOR, zkAddr);
-    _refreshViewCluster = new AtomicBoolean(false);
+    _refreshViewCluster = new AtomicBoolean(true);
     _monitor = new ViewAggregatorMonitor(viewClusterName);
     _aggregator = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(_viewClusterName,
         "Aggregator") {
@@ -191,7 +191,11 @@
         _refreshViewCluster.set(true);
         break;
       case PeriodicViewRefresh:
-        if (!_refreshViewCluster.get()) {
+        // refresh local view cluster data cache
+        boolean cacheChanged = _viewClusterRefresher.refreshViewClusterDataCache();
+        if (cacheChanged) {
+          logger.info("Detected change in view cluster data, trigger a refresh");
+        } else if (!_refreshViewCluster.get()) {
           logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
           return;
         }
@@ -199,6 +203,7 @@
         // least some of the elements in view cluster
         logger.info("Refreshing cluster based on event " + event.getEventType().name());
         refreshViewCluster();
+        _viewClusterRefresher.refreshViewClusterDataCache();
         break;
       default:
         logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
index 1885a73..c6df908 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -34,6 +34,7 @@
 import org.apache.helix.PropertyType;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.dataprovider.ViewClusterDataCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,25 +45,18 @@
  */
 public class ViewClusterRefresher {
   private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
-  private String _viewClusterName;
-  private HelixDataAccessor _viewClusterDataAccessor;
+  private final String _viewClusterName;
+  private final HelixDataAccessor _viewClusterDataAccessor;
+  private final ViewClusterDataCache _viewClusterDataCache;
   private Set<SourceClusterDataProvider> _dataProviderView;
 
-  // These 3 caches stores objects that are pushed to view cluster (write-through) cache,
-  // thus we don't need to read from view cluster everytime we refresh it.
-  private Map<String, HelixProperty> _viewClusterLiveInstanceCache;
-  private Map<String, HelixProperty> _viewClusterInstanceConfigCache;
-  private Map<String, HelixProperty> _viewClusterExternalViewCache;
-
   public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor) {
     _viewClusterName = viewClusterName;
     _viewClusterDataAccessor = viewClusterDataAccessor;
-    _viewClusterLiveInstanceCache = new HashMap<>();
-    _viewClusterInstanceConfigCache = new HashMap<>();
-    _viewClusterExternalViewCache = new HashMap<>();
+    _viewClusterDataCache = new ViewClusterDataCache(viewClusterName);
   }
 
-  private class ClusterPropertyDiff {
+  private static class ClusterPropertyDiff {
     /**
      * List of names of objects to set (create or modify)
      */
@@ -126,7 +120,7 @@
     Set<String> listedNamesInSource = new HashSet<>();
     Map<String, HelixProperty> sourceProperties = new HashMap<>();
     Map<String, HelixProperty> viewClusterPropertyCache =
-        getViewClusterPropertyCache(propertyType);
+        (Map<String, HelixProperty>) getViewClusterPropertyCache(propertyType);
     if (viewClusterPropertyCache == null) {
       throw new IllegalArgumentException(
           "Cannot find view cluster property cache. Property: " + propertyType.name());
@@ -170,8 +164,7 @@
       }
 
       // Perform refresh
-      ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource, sourceProperties,
-          viewClusterPropertyCache);
+      ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource, sourceProperties, viewClusterPropertyCache);
     } catch (Exception e) {
       logger.warn(String
           .format("Caught exception during refreshing %s for view cluster %s", propertyType.name(),
@@ -215,12 +208,11 @@
    * @param sourcePropertyNames names of all properties (i.e. liveInstances) in all source clusters
    * @param cachedSourceProperties all cached properties from source clusters
    * @param viewClusterPropertyCache all properties that are previously set successfully to view cluster
-   * @param <T> extends HelixProperty
    * @return ClusterPropertyDiff object contains diff information
    */
-  private <T extends HelixProperty> ClusterPropertyDiff calculatePropertyDiff(
+  private ClusterPropertyDiff calculatePropertyDiff(
       Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
-      Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+      Map<String, HelixProperty> cachedSourceProperties, Map<String, HelixProperty> viewClusterPropertyCache) {
     ClusterPropertyDiff diff = new ClusterPropertyDiff();
 
     // items whose names are in view cluster but not in source should be removed for sure
@@ -228,7 +220,7 @@
     toDelete.removeAll(sourcePropertyNames);
     diff.addPropertiesToDelete(toDelete);
 
-    for (Map.Entry<String, T> sourceProperty : cachedSourceProperties.entrySet()) {
+    for (Map.Entry<String, HelixProperty> sourceProperty : cachedSourceProperties.entrySet()) {
       String name = sourceProperty.getKey();
       HelixProperty property = sourceProperty.getValue();
 
@@ -263,17 +255,15 @@
    * @param sourcePropertyNames all names of the target properties in source clusters
    * @param cachedSourceProperties all up-to-date cached properties in source cluster
    * @param viewClusterPropertyCache view cluster cache
-   * @param <T> extends HelixProperty
    * @return true if all required refreshes are successful, else false
    */
-  private <T extends HelixProperty> boolean doRefresh(PropertyType propertyType,
+  private boolean doRefresh(PropertyType propertyType,
       Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
-      Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+      Map<String, HelixProperty> cachedSourceProperties, Map<String, HelixProperty> viewClusterPropertyCache) {
     boolean ok = true;
     // Calculate diff
     ClusterPropertyDiff diff =
-        calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties,
-            viewClusterPropertyCache);
+        calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties, viewClusterPropertyCache);
 
     // Generate property keys
     List<PropertyKey> keysToSet = new ArrayList<>();
@@ -293,12 +283,12 @@
     }
 
     // Delete outdated properties
-    if (!deleteProperties(keysToDelete, viewClusterPropertyCache)) {
+    if (!deleteProperties(keysToDelete)) {
       ok = false;
     }
 
     // Add or update changed properties
-    if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet(), viewClusterPropertyCache)) {
+    if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet())) {
       ok = false;
     }
     return ok;
@@ -330,14 +320,22 @@
     }
   }
 
-  private Map<String, HelixProperty> getViewClusterPropertyCache(PropertyType propertyType) {
+  /**
+   * Refresh view cluster data cache and return true if there is data update.
+   * @return true if new change is fetched from remote
+   */
+  boolean refreshViewClusterDataCache() {
+    return _viewClusterDataCache.updateCache(_viewClusterDataAccessor);
+  }
+
+  private Map<String, ? extends HelixProperty> getViewClusterPropertyCache(PropertyType propertyType) {
     switch (propertyType) {
     case INSTANCES:
-      return _viewClusterInstanceConfigCache;
+      return _viewClusterDataCache.getInstanceConfigMap();
     case LIVEINSTANCES:
-      return _viewClusterLiveInstanceCache;
+      return _viewClusterDataCache.getLiveInstances();
     case EXTERNALVIEW:
-      return _viewClusterExternalViewCache;
+      return _viewClusterDataCache.getExternalViews();
     default:
       return null;
     }
@@ -348,28 +346,21 @@
    * for the objects that got successfully created or updated in ZK
    * @param keysToAddOrUpdate
    * @param objects
-   * @param cache
    * @param <T> HelixProperty
    *
    * @return true if all objects are successfully created or updated, else false
    */
   private <T extends HelixProperty> boolean addOrUpdateProperties(
-      List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects, Map<String, T> cache) {
+      List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects) {
     boolean ok = true;
     logger.info(
         String.format("AddOrUpdate %s objects: %s", keysToAddOrUpdate.size(), keysToAddOrUpdate));
     boolean[] addOrUpdateResults = _viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
     for (int i = 0; i < addOrUpdateResults.length; i++) {
       if (!addOrUpdateResults[i]) {
-        // Don't add item to cache yet - will retry during next refresh
         logger.warn(String.format("Failed to create or update live instance %s, will retry later",
             keysToAddOrUpdate.get(i).getPath()));
         ok = false;
-      } else {
-        // Successfully updated ViewClusterZK, proceed to update cache
-        @SuppressWarnings("unchecked")
-        T property = (T) objects.get(i);
-        cache.put(getBaseObjectNameFromPropertyKey(keysToAddOrUpdate.get(i)), property);
       }
     }
     return ok;
@@ -379,38 +370,22 @@
    * Delete properties in ZK specified by a list of property keys. Update the given cache
    * for the objects that got successfully deleted in ZK
    * @param keysToDelete
-   * @param cache
    * @param <T> HelixProperty
    * @return true if all objects got successfully deleted else false
    */
-  private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete,
-      Map<String, T> cache) {
+  private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete) {
     boolean ok = true;
     logger.info(String.format("Deleting %s objects: %s", keysToDelete.size(), keysToDelete));
     for (PropertyKey key : keysToDelete) {
       if (!_viewClusterDataAccessor.removeProperty(key)) {
-        // Don't remove item from cache yet - will retry during next refresh
         ok = false;
         logger.warn(String.format("Failed to create or update live instance %s, will retry later",
             key.getPath()));
-      } else {
-        // Successfully updated ViewClusterZK, proceed to update cache
-        cache.remove(getBaseObjectNameFromPropertyKey(key));
       }
     }
     return ok;
   }
 
-  /**
-   * Parse property key and get the id of the ZNode that property key represents
-   * @param key
-   * @return
-   */
-  private static String getBaseObjectNameFromPropertyKey(PropertyKey key) {
-    String[] params = key.getParams();
-    return params[params.length - 1];
-  }
-
   private void logRefreshResult(PropertyType type, boolean ok) {
     if (!ok) {
       logger.warn(String
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java
new file mode 100644
index 0000000..f9b6a72
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java
@@ -0,0 +1,53 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.common.caches.BasicClusterDataCache;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+
+public class ViewClusterDataCache extends BasicClusterDataCache {
+
+  public ViewClusterDataCache(String viewClusterName) {
+    super(viewClusterName);
+  }
+
+  /**
+   * Update cache and return true if any local data is changed compared to before refresh.
+   * @param dataAccessor the data accessor used to fetch data.
+   * @return true if there is a change to local cache.
+   */
+  public boolean updateCache(HelixDataAccessor dataAccessor) {
+    Map<String, LiveInstance> liveInstanceSnapshot = new HashMap<>(getLiveInstances());
+    Map<String, ExternalView> externalViewSnapshot = new HashMap<>(getExternalViews());
+    Map<String, InstanceConfig> instanceConfigSnapshot = new HashMap<>(getInstanceConfigMap());
+    requireFullRefresh();
+    refresh(dataAccessor);
+
+    return !(liveInstanceSnapshot.equals(getLiveInstances())
+        && externalViewSnapshot.equals(getExternalViews())
+        && instanceConfigSnapshot.equals(getInstanceConfigMap()));
+  }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
index 2a069ee..e642a30 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
@@ -103,6 +103,7 @@
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
     viewClusterDataAccessor.resetCounters();
+    refresher.refreshViewClusterDataCache();
 
     // Refresh again without change
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
@@ -147,6 +148,7 @@
         new ArrayList<>(sampleProvider.getLiveInstances().values());
     liveInstances.add(new LiveInstance("newLiveInstance"));
     sampleProvider.setLiveInstances(liveInstances);
+    refresher.refreshViewClusterDataCache();
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
     Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 1);
@@ -219,6 +221,7 @@
     // remove InstanceConfig and ExternalView requirement from sample provider
     sampleProvider.getConfig()
         .setProperties(Collections.singletonList(PropertyType.LIVEINSTANCES));
+    refresher.refreshViewClusterDataCache();
 
     // Refresh again
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java
new file mode 100644
index 0000000..006cd02
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java
@@ -0,0 +1,68 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestViewClusterDataCache extends ZkTestBase {
+  private static final int NUM_INSTANCE = 5;
+
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private final Set<String> _instances = new HashSet<>();
+
+  @BeforeClass
+  public void beforeClass() {
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < NUM_INSTANCE; i++) {
+      String instanceName = String.format("%s-%s-%s", CLUSTER_NAME, PARTICIPANT_PREFIX, i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+      _instances.add(instanceName);
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      participant.syncStart();
+    }
+  }
+
+  @Test
+  public void testCacheUpdate() {
+    ViewClusterDataCache cache = new ViewClusterDataCache(CLUSTER_NAME);
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+    Assert.assertTrue(cache.updateCache(accessor));
+    Assert.assertEquals(cache.getLiveInstances().size(), NUM_INSTANCE);
+    Assert.assertEquals(cache.getInstanceConfigMap().size(), NUM_INSTANCE);
+    for (String instance : _instances) {
+      Assert.assertTrue(cache.getLiveInstances().containsKey(instance));
+    }
+    // update again, expect no cache change
+    Assert.assertFalse(cache.updateCache(accessor));
+    // clear cache and update again
+    cache.clearCache(HelixConstants.ChangeType.LIVE_INSTANCE);
+    cache.clearCache(HelixConstants.ChangeType.INSTANCE_CONFIG);
+    Assert.assertTrue(cache.updateCache(accessor));
+  }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
index adf16ec..b98b684 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
@@ -26,12 +26,14 @@
 import java.util.Set;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModelParser;
@@ -214,6 +216,25 @@
     triggerViewAggregatorStateTransition("LEADER", "STANDBY");
   }
 
+  @Test(dependsOnMethods = "testHelixViewAggregator")
+  public void testRemoteDataRemovalAndRefresh() throws Exception {
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(viewClusterName, _baseAccessor);
+    // Start view aggregator
+    triggerViewAggregatorStateTransition("STANDBY", "LEADER");
+    // Wait for refresh and verify
+    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+    // remove live instances from view cluster zk data, wait for next refresh trigger
+    Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().liveInstances()));
+    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+    Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().liveInstances()).size() > 0);
+
+    Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().externalViews()));
+    Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+    Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().externalViews()).size() > 0);
+    // Stop view aggregator
+    triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+  }
+
   private void resetViewClusterConfig(int refreshPeriod, List<PropertyType> properties) {
     List<ViewClusterSourceConfig> sourceConfigs = new ArrayList<>();
     for (String sourceCluster : _allSourceClusters) {