HELIX-705: Implement ViewClusterRefresher logic and tests

RB=1213810
BUG=HELIX-776
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang,jxue
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index 8331f30..2870112 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -46,6 +46,10 @@
   protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
   protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
   protected ExternalViewCache _externalViewCache;
+  protected Map<String, LiveInstance> _liveInstanceMap;
+  protected Map<String, InstanceConfig> _instanceConfigMap;
+  protected Map<String, ExternalView> _externalViewMap;
+  private final PropertyType _sourceDataType;
 
   protected String _clusterName;
 
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
index e770302..58a35e6 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -19,32 +19,39 @@
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
 import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
 import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Contains logics to refresh view cluster based on information from source cluster data providers
+ * This class contains logics to refresh view cluster based on information from source cluster data
+ * providers.
+ * This class assumes SourceClusterDataProviders have its caches refreshed already.
  */
 public class ViewClusterRefresher {
   private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
   private String _viewClusterName;
   private HelixDataAccessor _viewClusterDataAccessor;
-  private PropertyKey.Builder _viewClusterKeyBuilder;
   private Map<String, SourceClusterDataProvider> _dataProviderMap;
 
   // These 3 caches stores objects that are pushed to view cluster (write-through) cache,
   // thus we don't need to read from view cluster everytime we refresh it.
-  private Map<String, LiveInstance> _viewClusterLiveInstanceCache;
-  private Map<String, InstanceConfig> _viewClusterInstanceConfigCache;
-  private Map<String, ExternalView> _viewClusterExternalViewCache;
+  private Map<String, HelixProperty> _viewClusterLiveInstanceCache;
+  private Map<String, HelixProperty> _viewClusterInstanceConfigCache;
+  private Map<String, HelixProperty> _viewClusterExternalViewCache;
 
   public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor,
       Map<String, SourceClusterDataProvider> dataProviderMap) {
@@ -56,38 +63,135 @@
     _viewClusterExternalViewCache = new HashMap<>();
   }
 
-  /**
-   * Create / update current live instances; remove dead live instances.
-   * This function assumes that all data providers have been refreshed.
-   */
-  public void refreshLiveInstancesInViewCluster() {
-    // TODO: implement it
+  private class ClusterPropertyDiff {
+    /**
+     * List of names of objects to set (create or modify)
+     */
+    List<String> _keysToSet;
+
+    /**
+     * List of actual objects represented by _keysToSet
+     */
+    List<HelixProperty> _propertiesToSet;
+
+    /**
+     * List of names of objects to delete
+     */
+    List<String> _keysToDelete;
+
+    public ClusterPropertyDiff() {
+      _keysToSet = new ArrayList<>();
+      _propertiesToSet = new ArrayList<>();
+      _keysToDelete = new ArrayList<>();
+    }
+
+    public void addPropertyToSet(String key, HelixProperty obj) {
+      // batch setChildren API needs keys and objects to have corresponding order
+      _keysToSet.add(key);
+      _propertiesToSet.add(obj);
+    }
+
+    public void addPropertiesToDelete(Collection<? extends String> keys) {
+      _keysToDelete.addAll(keys);
+    }
+
+    public List<String> getKeysToSet() {
+      return Collections.unmodifiableList(_keysToSet);
+    }
+
+    public List<HelixProperty> getPropertiesToSet() {
+      return Collections.unmodifiableList(_propertiesToSet);
+    }
+
+    public List<String> getKeysToDelete() {
+      return Collections.unmodifiableList(_keysToDelete);
+    }
   }
 
   /**
-   * Create / update current instance configs; remove dead instance configs
-   * This function assumes that all data providers have been refreshed.
+   * Create / update / delete property of given type in view cluster, based on data change from
+   * source clusters.
+   *
+   * @param propertyType type of property to refresh in view cluster
+   * @return true if successfully refreshed all instances of the given property else false
+   * @throws IllegalArgumentException throws exception when give type is not supported
    */
-  public void refreshInstanceConfigsInViewCluster() {
-    // TODO: implement it
-  }
+  public boolean refreshPropertiesInViewCluster(PropertyType propertyType)
+      throws IllegalArgumentException {
+    boolean ok = false;
+    Set<String> listedNamesInView;
+    Set<String> listedNamesInSource = new HashSet<>();
+    Map<String, HelixProperty> sourceProperties = new HashMap<>();
+    Map<String, HelixProperty> viewClusterPropertyCache =
+        getViewClusterPropertyCache(propertyType);
+    if (viewClusterPropertyCache == null) {
+      throw new IllegalArgumentException(
+          "Cannot find view cluster property cache. Property: " + propertyType.name());
+    }
 
-  /**
-   * Create / update currently existing external views; delete outdated external views.
-   * This function assumes that all data providers have been refreshed.
-   */
-  public void refreshExtrenalViewsInViewCluster() {
-    // TODO: implement it
+    try {
+      listedNamesInView =
+          new HashSet<>(_viewClusterDataAccessor.getChildNames(getPropertyKey(propertyType, null)));
+      // Prepare data
+      for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
+        if (!provider.getPropertiesToAggregate().contains(propertyType)) {
+          logger.info(String
+              .format("SourceCluster %s does not need to aggregate %s, skip.", provider.getName(),
+                  propertyType.name()));
+          continue;
+        }
+        switch (propertyType) {
+        case INSTANCES:
+          listedNamesInSource.addAll(provider.getInstanceConfigNames());
+          sourceProperties.putAll(provider.getInstanceConfigMap());
+          break;
+        case LIVEINSTANCES:
+          listedNamesInSource.addAll(provider.getLiveInstanceNames());
+          sourceProperties.putAll(provider.getLiveInstances());
+          break;
+        case EXTERNALVIEW:
+          listedNamesInSource.addAll(provider.getExternalViewNames());
+          for (Map.Entry<String, ExternalView> entry : provider.getExternalViews().entrySet()) {
+            String resourceName = entry.getKey();
+            ExternalView resourceEV = entry.getValue();
+            if (sourceProperties.containsKey(resourceName)) {
+              // Merge external views if we already have a record
+              mergeExternalViews((ExternalView) sourceProperties.get(resourceName), resourceEV);
+            } else {
+              // merging simple fields and list fields are meaningless and would cause confusion
+              resourceEV.getRecord().getSimpleFields().clear();
+              resourceEV.getRecord().getListFields().clear();
+              sourceProperties.put(resourceName, resourceEV);
+            }
+          }
+          break;
+        default:
+          // Will NOT come here as for unsupported property type, exception will be thrown out
+          // earlier
+          break;
+        }
+      }
+
+      // Perform refresh
+      ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource, sourceProperties,
+          viewClusterPropertyCache);
+    } catch (Exception e) {
+      logger.warn(String
+          .format("Caught exception during refreshing %s for view cluster %s", propertyType.name(),
+              _viewClusterName), e);
+    }
+    return ok;
   }
 
   /**
    * Merge external view "toMerge" into external view "source":
    *  - if partition in toMerge does not exist in source, we add it into source
    *  - if partition exist in both external views, we add all map fields from toMerge to source
+   *
    * @param source
    * @param toMerge
    */
-  public static void mergeExternalViews(ExternalView source, ExternalView toMerge)
+  private static void mergeExternalViews(ExternalView source, ExternalView toMerge)
       throws IllegalArgumentException {
     if (!source.getId().equals(toMerge.getId())) {
       throw new IllegalArgumentException(String
@@ -104,4 +208,220 @@
       }
     }
   }
+
+  /**
+   * Based on property names in view cluster, property names in source clusters, and all cached
+   * properties in source clusters, generate ClusterPropertyDiff that contains information about
+   * what to add / update or delete
+   *
+   * @param viewPropertyNames names of all properties (i.e. liveInstances) in view cluster
+   * @param sourcePropertyNames names of all properties (i.e. liveInstances) in all source clusters
+   * @param cachedSourceProperties all cached properties from source clusters
+   * @param viewClusterPropertyCache all properties that are previously set successfully to view cluster
+   * @param <T> extends HelixProperty
+   * @return ClusterPropertyDiff object contains diff information
+   */
+  private <T extends HelixProperty> ClusterPropertyDiff calculatePropertyDiff(
+      Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
+      Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+    ClusterPropertyDiff diff = new ClusterPropertyDiff();
+
+    // items whose names are in view cluster but not in source should be removed for sure
+    Set<String> toDelete = new HashSet<>(viewPropertyNames);
+    toDelete.removeAll(sourcePropertyNames);
+    diff.addPropertiesToDelete(toDelete);
+
+    for (Map.Entry<String, T> sourceProperty : cachedSourceProperties.entrySet()) {
+      String name = sourceProperty.getKey();
+      HelixProperty property = sourceProperty.getValue();
+
+      // cache refresh happens earlier than list curNames, so if cache is still in curNames,
+      // we confirm that this is a valid live instance. This is necessary because ZK
+      // can possibly not return all children content in a refresh, but list child names
+      // will reliably return all children names.
+      //
+      // Else, either this child is already deleted, or we fail to retrieve information
+      // from a cache refresh. either way, we will leave it to next ViewClusterRefresh cycle
+      // to confirm state
+      if (property != null && sourcePropertyNames.contains(name) && (
+          !viewClusterPropertyCache.containsKey(name) || !viewClusterPropertyCache.get(name)
+              .getRecord().equals(property.getRecord()))) {
+        diff.addPropertyToSet(name, property);
+      }
+    }
+    return diff;
+  }
+
+  /**
+   * Refresh view cluster regarding a particular property based given source of truths.
+   * Steps are:
+   *  - Calculate diff based on propertyNamesInView, propertyNamesInSource and
+   *    cachedSourceProperties
+   *  - Generate property keys for properties to set / delete
+   *  - Delete properties
+   *  - Set properties
+   *
+   * @param propertyType type of property to refresh
+   * @param viewPropertyNames all names of the target properties in view cluster
+   * @param sourcePropertyNames all names of the target properties in source clusters
+   * @param cachedSourceProperties all up-to-date cached properties in source cluster
+   * @param viewClusterPropertyCache view cluster cache
+   * @param <T> extends HelixProperty
+   * @return true if all required refreshes are successful, else false
+   */
+  private <T extends HelixProperty> boolean doRefresh(PropertyType propertyType,
+      Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
+      Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+    boolean ok = true;
+
+    // Calculate diff
+    ClusterPropertyDiff diff =
+        calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties,
+            viewClusterPropertyCache);
+
+    // Generate property keys
+    List<PropertyKey> keysToSet = new ArrayList<>();
+    List<PropertyKey> keysToDelete = new ArrayList<>();
+    for (String name : diff.getKeysToSet()) {
+      PropertyKey key = getPropertyKey(propertyType, name);
+      if (key != null) {
+        keysToSet.add(key);
+      }
+    }
+
+    for (String name : diff.getKeysToDelete()) {
+      PropertyKey key = getPropertyKey(propertyType, name);
+      if (key != null) {
+        keysToDelete.add(key);
+      }
+    }
+
+    // Delete outdated properties
+    if (!deleteProperties(keysToDelete, viewClusterPropertyCache)) {
+      ok = false;
+    }
+
+    // Add or update changed properties
+    if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet(), viewClusterPropertyCache)) {
+      ok = false;
+    }
+    return ok;
+  }
+
+  /**
+   * Based on type and property name, generate property key
+   * @param propertyType type of the property
+   * @param propertyName name of the property. If null, return key of the parent of
+   *                     all properties of given type
+   * @return property key
+   */
+  private PropertyKey getPropertyKey(PropertyType propertyType, String propertyName) {
+    switch (propertyType) {
+    case INSTANCES:
+      return propertyName == null
+          ? _viewClusterDataAccessor.keyBuilder().instanceConfigs()
+          : _viewClusterDataAccessor.keyBuilder().instanceConfig(propertyName);
+    case LIVEINSTANCES:
+      return propertyName == null
+          ? _viewClusterDataAccessor.keyBuilder().liveInstances()
+          : _viewClusterDataAccessor.keyBuilder().liveInstance(propertyName);
+    case EXTERNALVIEW:
+      return propertyName == null
+          ? _viewClusterDataAccessor.keyBuilder().externalViews()
+          : _viewClusterDataAccessor.keyBuilder().externalView(propertyName);
+    default:
+      return null;
+    }
+  }
+
+  private Map<String, HelixProperty> getViewClusterPropertyCache(PropertyType propertyType) {
+    switch (propertyType) {
+    case INSTANCES:
+      return _viewClusterInstanceConfigCache;
+    case LIVEINSTANCES:
+      return _viewClusterLiveInstanceCache;
+    case EXTERNALVIEW:
+      return _viewClusterExternalViewCache;
+    default:
+      return null;
+    }
+  }
+
+  /**
+   * Create or Update properties in ZK specified by a list of property keys. Update the given cache
+   * for the objects that got successfully created or updated in ZK
+   * @param keysToAddOrUpdate
+   * @param objects
+   * @param cache
+   * @param <T> HelixProperty
+   *
+   * @return true if all objects are successfully created or updated, else false
+   */
+  private <T extends HelixProperty> boolean addOrUpdateProperties(
+      List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects, Map<String, T> cache) {
+    boolean ok = true;
+    logger.info(String.format("AddOrUpdate objects: %s", keysToAddOrUpdate));
+    boolean[] addOrUpdateResults = _viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
+    for (int i = 0; i < addOrUpdateResults.length; i++) {
+      if (!addOrUpdateResults[i]) {
+        // Don't add item to cache yet - will retry during next refresh
+        logger.warn(String.format("Failed to create or update live instance %s, will retry later",
+            keysToAddOrUpdate.get(i).getPath()));
+        ok = false;
+      } else {
+        // Successfully updated ViewClusterZK, proceed to update cache
+        @SuppressWarnings("unchecked")
+        T property = (T) objects.get(i);
+        cache.put(getBaseObjectNameFromPropertyKey(keysToAddOrUpdate.get(i)), property);
+      }
+    }
+    return ok;
+  }
+
+  /**
+   * Delete properties in ZK specified by a list of property keys. Update the given cache
+   * for the objects that got successfully deleted in ZK
+   * @param keysToDelete
+   * @param cache
+   * @param <T> HelixProperty
+   * @return true if all objects got successfully deleted else false
+   */
+  private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete,
+      Map<String, T> cache) {
+    boolean ok = true;
+    logger.info(String.format("Deleting objects: %s", keysToDelete));
+    for (PropertyKey key : keysToDelete) {
+      if (!_viewClusterDataAccessor.removeProperty(key)) {
+        // Don't remove item from cache yet - will retry during next refresh
+        ok = false;
+        logger.warn(String.format("Failed to create or update live instance %s, will retry later",
+            key.getPath()));
+      } else {
+        // Successfully updated ViewClusterZK, proceed to update cache
+        cache.remove(getBaseObjectNameFromPropertyKey(key));
+      }
+    }
+    return ok;
+  }
+
+  /**
+   * Parse property key and get the id of the ZNode that property key represents
+   * @param key
+   * @return
+   */
+  private static String getBaseObjectNameFromPropertyKey(PropertyKey key) {
+    String[] params = key.getParams();
+    return params[params.length - 1];
+  }
+
+  private void logRefreshResult(PropertyType type, boolean ok) {
+    if (!ok) {
+      logger.warn(String
+          .format("Failed to refresh all %s for view cluster %s, will retry",
+              type.name(), _viewClusterName));
+    } else {
+      logger.info(String.format("Successfully refreshed all %s for view cluster %s",
+          type.name(), _viewClusterName));
+    }
+  }
 }
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
index 7b148e0..3c590a6 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -52,9 +52,9 @@
     implements InstanceConfigChangeListener, LiveInstanceChangeListener,
     ExternalViewChangeListener {
   private final HelixManager _helixManager;
-  private final ViewClusterSourceConfig _sourceClusterConfig;
   private final ClusterEventProcessor _eventProcessor;
 
+  protected ViewClusterSourceConfig _sourceClusterConfig;
   private HelixDataAccessor _dataAccessor;
   private PropertyKey.Builder _propertyKeyBuilder;
 
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
new file mode 100644
index 0000000..0d0af22
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
@@ -0,0 +1,335 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.MockAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.mock.MockSourceClusterDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestViewClusterRefresher {
+  private static final String viewClusterName = "viewCluster";
+  private static final int numSourceCluster = 3;
+  private static final int numInstancePerSourceCluster = 2;
+  private static final int numExternalViewPerSourceCluster = 3;
+  private static final int numPartition = 3;
+  private static final List<PropertyType> defaultProperties = Arrays.asList(
+      new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES,
+          PropertyType.EXTERNALVIEW
+      });
+
+  private class CounterBasedMockAccessor extends MockAccessor {
+    private int _setCount;
+    private int _removeCount;
+
+    public CounterBasedMockAccessor(String clusterName) {
+      super(clusterName);
+      resetCounters();
+    }
+
+    public void resetCounters() {
+      _setCount = 0;
+      _removeCount = 0;
+    }
+
+    @Override
+    public boolean setProperty(PropertyKey key, HelixProperty value) {
+      _setCount += 1;
+      return super.setProperty(key, value);
+    }
+
+    @Override
+    public boolean removeProperty(PropertyKey key) {
+      _removeCount += 1;
+      return super.removeProperty(key);
+    }
+
+    public int getSetCount() {
+      return _setCount;
+    }
+
+    public int getRemoveCount() {
+      return _removeCount;
+    }
+  }
+
+  @Test
+  public void testRefreshWithNoChange() {
+    CounterBasedMockAccessor viewClusterDataAccessor =
+        new CounterBasedMockAccessor(viewClusterName);
+    Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+    createMockDataProviders(dataProviderMap);
+
+    ViewClusterRefresher refresher =
+        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+
+    // Refresh an empty view cluster
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+    viewClusterDataAccessor.resetCounters();
+
+    // Refresh again without change
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+    Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 0);
+  }
+
+  @Test
+  public void testRefreshWithInstanceChange() {
+    CounterBasedMockAccessor viewClusterDataAccessor =
+        new CounterBasedMockAccessor(viewClusterName);
+    Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+    createMockDataProviders(dataProviderMap);
+
+    ViewClusterRefresher refresher =
+        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+    MockSourceClusterDataProvider sampleProvider =
+        (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
+
+    // Refresh an empty view cluster
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    // multiply by 2 for both instance config and live instances
+    Assert.assertEquals(viewClusterDataAccessor.getSetCount(),
+        numSourceCluster * numInstancePerSourceCluster * 2);
+    verifyInstances(viewClusterDataAccessor, dataProviderMap);
+
+    // Source cluster has instances deleted
+    viewClusterDataAccessor.resetCounters();
+    sampleProvider.clearCache(HelixConstants.ChangeType.LIVE_INSTANCE);
+    sampleProvider.clearCache(HelixConstants.ChangeType.INSTANCE_CONFIG);
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numInstancePerSourceCluster * 2);
+
+    verifyInstances(viewClusterDataAccessor, dataProviderMap);
+
+    // Source cluster has live instances added
+    viewClusterDataAccessor.resetCounters();
+    List<LiveInstance> liveInstances =
+        new ArrayList<>(sampleProvider.getLiveInstances().values());
+    liveInstances.add(new LiveInstance("newLiveInstance"));
+    sampleProvider.setLiveInstances(liveInstances);
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 1);
+
+    verifyInstances(viewClusterDataAccessor, dataProviderMap);
+  }
+
+  @Test
+  public void testRefreshWithExternalViewChange() {
+    CounterBasedMockAccessor accessor =
+        new CounterBasedMockAccessor(viewClusterName);
+    Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+    createMockDataProviders(dataProviderMap);
+
+    ViewClusterRefresher refresher =
+        new ViewClusterRefresher(viewClusterName, accessor, dataProviderMap);
+    MockSourceClusterDataProvider sampleProvider =
+        (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
+
+    // Refresh an empty view cluster
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+    // We only have 2 resource names so should be setting 2 evs
+    Assert.assertEquals(accessor.getSetCount(), numExternalViewPerSourceCluster);
+    Assert.assertEquals(accessor.getRemoveCount(), 0);
+    // Partition count should be maintained
+    // Since we are creating 1 replica per source cluster so replica should be aggregated.
+    verifyExternalView(accessor, numExternalViewPerSourceCluster, numPartition, numSourceCluster);
+
+    // One cluster deletes all its EVs, number of partition should reflect in changes
+    accessor.resetCounters();
+    sampleProvider.clearCache(HelixConstants.ChangeType.EXTERNAL_VIEW);
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+    Assert.assertEquals(accessor.getSetCount(), numExternalViewPerSourceCluster);
+    Assert.assertEquals(accessor.getRemoveCount(), 0);
+    verifyExternalView(accessor, numExternalViewPerSourceCluster, numPartition,
+        numSourceCluster - 1);
+
+    // All EVs are deleted, and all EVs should be removed from view cluster
+    for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+      provider.clearCache(HelixConstants.ChangeType.EXTERNAL_VIEW);
+    }
+    accessor.resetCounters();
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+    Assert.assertEquals(accessor.getSetCount(), 0);
+    Assert.assertEquals(accessor.getRemoveCount(), numExternalViewPerSourceCluster);
+    verifyExternalView(accessor, 0, 0, 0);
+  }
+
+  @Test
+  public void testRefreshWithProviderChange() {
+    CounterBasedMockAccessor viewClusterDataAccessor =
+        new CounterBasedMockAccessor(viewClusterName);
+    Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+    createMockDataProviders(dataProviderMap);
+
+    ViewClusterRefresher refresher =
+        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+    MockSourceClusterDataProvider sampleProvider =
+        (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
+
+    // Refresh an empty view cluster
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+    verifyInstances(viewClusterDataAccessor, dataProviderMap);
+    verifyExternalView(viewClusterDataAccessor, numExternalViewPerSourceCluster, numPartition,
+        numSourceCluster);
+    viewClusterDataAccessor.resetCounters();
+
+    // remove InstanceConfig and ExternalView requirement from sample provider
+    sampleProvider.getConfig()
+        .setProperties(Arrays.asList(new PropertyType[] { PropertyType.LIVEINSTANCES }));
+
+    // Refresh again
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+
+    // Removing external view from config of 1 cluster will not result in removing any external view
+    // but to update external views
+    Assert.assertEquals(viewClusterDataAccessor.getSetCount(), numExternalViewPerSourceCluster);
+    Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numInstancePerSourceCluster);
+    verifyExternalView(viewClusterDataAccessor, numExternalViewPerSourceCluster, numPartition,
+        numSourceCluster - 1);
+    verifyInstances(viewClusterDataAccessor, dataProviderMap);
+
+    // Remove external view from all source clusters, will resulting in removing all external views
+    viewClusterDataAccessor.resetCounters();
+    for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+      MockSourceClusterDataProvider mockProvider = (MockSourceClusterDataProvider) provider;
+      if (mockProvider != sampleProvider) {
+        mockProvider.getConfig().setProperties(Arrays
+            .asList(new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES }));
+      }
+    }
+
+    // Refresh again
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+    Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+
+    // No change in instances, but external views are all removed
+    Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 0);
+    Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numExternalViewPerSourceCluster);
+    verifyExternalView(viewClusterDataAccessor, 0, 0, 0);
+    verifyInstances(viewClusterDataAccessor, dataProviderMap);
+  }
+
+  private void verifyExternalView(HelixDataAccessor accessor, int expectedResourceCnt,
+      int expectedPartitionPerResource, int expectedReplicaPerPartition) {
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertEquals(accessor.getChildNames(keyBuilder.externalViews()).size(),
+        expectedResourceCnt);
+    for (String resourceName : accessor.getChildNames(keyBuilder.externalViews())) {
+      ExternalView ev = accessor.getProperty(keyBuilder.externalView(resourceName));
+
+      Assert.assertEquals(ev.getPartitionSet().size(), expectedPartitionPerResource);
+      for (String partitionName : ev.getPartitionSet()) {
+        Assert.assertEquals(ev.getStateMap(partitionName).size(), expectedReplicaPerPartition);
+      }
+    }
+  }
+
+  private void verifyInstances(HelixDataAccessor accessor,
+      Map<String, SourceClusterDataProvider> dataProviderMap) {
+    // Verify live instances - since we don't modify ZNode's internal content,
+    // we just verify names
+    Set<String> fetchedNames = new HashSet<>(accessor
+        .getChildNames(accessor.keyBuilder().liveInstances()));
+    Set<String> expectedNames = new HashSet<>();
+    for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+      if (provider.getPropertiesToAggregate().contains(PropertyType.LIVEINSTANCES)) {
+        expectedNames.addAll(provider.getLiveInstanceNames());
+      }
+    }
+    Assert.assertEquals(fetchedNames, expectedNames);
+
+    // Verify instance configs - since we don't modify ZNode's internal content,
+    // we just verify names
+    fetchedNames = new HashSet<>(accessor
+        .getChildNames(accessor.keyBuilder().instanceConfigs()));
+    expectedNames = new HashSet<>();
+    for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+      if (provider.getPropertiesToAggregate().contains(PropertyType.INSTANCES)) {
+        expectedNames.addAll(provider.getInstanceConfigNames());
+      }
+    }
+    Assert.assertEquals(fetchedNames, expectedNames);
+  }
+
+  private void createMockDataProviders(Map<String, SourceClusterDataProvider> dataProviderMap) {
+    for (int i = 0; i < numSourceCluster; i++) {
+      String sourceClusterName = "cluster" + i;
+      ViewClusterSourceConfig sourceConfig =
+          new ViewClusterSourceConfig(sourceClusterName, "", defaultProperties);
+      MockSourceClusterDataProvider provider =
+          new MockSourceClusterDataProvider(sourceConfig, null);
+      List<LiveInstance> liveInstanceList = new ArrayList<>();
+      List<InstanceConfig> instanceConfigList = new ArrayList<>();
+      List<ExternalView> externalViewList = new ArrayList<>();
+
+      // InstanceConfig and LiveInstance
+      for (int j = 0; j < numInstancePerSourceCluster; j++) {
+        String instanceName = String.format("%s-instance%s", sourceClusterName, j);
+        liveInstanceList.add(new LiveInstance(instanceName));
+        instanceConfigList.add(new InstanceConfig(instanceName));
+      }
+      provider.setLiveInstances(liveInstanceList);
+      provider.setInstanceConfigs(instanceConfigList);
+
+      // ExternalView
+      for (int j = 0; j < numExternalViewPerSourceCluster; j++) {
+        String resourceName = String.format("Resource%s", j);
+        ExternalView ev = new ExternalView(resourceName);
+        for (int k = 0; k < numPartition; k++) {
+          String partitionName = String.format("Partition%s", k);
+          Map<String, String> stateMap = new HashMap<>();
+          stateMap.put(String.format("%s-instance", sourceClusterName), "MASTER");
+          ev.setStateMap(partitionName, stateMap);
+        }
+        externalViewList.add(ev);
+      }
+      provider.setExternalViews(externalViewList);
+      dataProviderMap.put(sourceClusterName, provider);
+    }
+  }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
new file mode 100644
index 0000000..d7d0599
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
@@ -0,0 +1,84 @@
+package org.apache.helix.view.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+
+public class MockSourceClusterDataProvider extends SourceClusterDataProvider {
+
+  public MockSourceClusterDataProvider(ViewClusterSourceConfig config,
+      ClusterEventProcessor processor) {
+    super(config, processor);
+  }
+
+  @Override
+  public void setup() {}
+
+  @Override
+  public void refreshCache() {}
+
+  @Override
+  public List<String> getInstanceConfigNames() {
+    return new ArrayList<>(getInstanceConfigMap().keySet());
+  }
+
+  @Override
+  public List<String> getLiveInstanceNames() {
+    return new ArrayList<>(getLiveInstances().keySet());
+  }
+
+  @Override
+  public List<String> getExternalViewNames() {
+    return new ArrayList<>(getExternalViews().keySet());
+  }
+
+  public void setConfig(ViewClusterSourceConfig config) {
+    _sourceClusterConfig = config;
+  }
+
+  public ViewClusterSourceConfig getConfig() {
+    return _sourceClusterConfig;
+  }
+
+  public void setInstanceConfigs(List<InstanceConfig> instanceConfigList) {
+    for (InstanceConfig config : instanceConfigList) {
+      _instanceConfigMap.put(config.getInstanceName(), config);
+    }
+  }
+
+  public void setLiveInstances(List<LiveInstance> liveInstanceList) {
+    for (LiveInstance instance : liveInstanceList) {
+      _liveInstanceMap.put(instance.getInstanceName(), instance);
+    }
+  }
+
+  public void setExternalViews(List<ExternalView> externalViewList) {
+    for (ExternalView ev : externalViewList) {
+      _externalViewMap.put(ev.getResourceName(), ev);
+    }
+  }
+}
diff --git a/helix-view-aggregator/src/test/resources/log4j.properties b/helix-view-aggregator/src/test/resources/log4j.properties
index 24b6d10..8e9c0c0 100644
--- a/helix-view-aggregator/src/test/resources/log4j.properties
+++ b/helix-view-aggregator/src/test/resources/log4j.properties
@@ -22,11 +22,11 @@
 # A1 is set to be a ConsoleAppender.
 log4j.appender.C=org.apache.log4j.ConsoleAppender
 log4j.appender.C.layout=org.apache.log4j.PatternLayout
-log4j.appender.C.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.C.layout.ConversionPattern=[%d] [%-5p] [%t] [%c:%L] - %m%n
 
 log4j.appender.R=org.apache.log4j.RollingFileAppender
 log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%5p [%C:%M] (%F:%L) - %m%n
+log4j.appender.R.layout.ConversionPattern=[%d] [%-5p] [%t] [%c:%L] - %m%n
 log4j.appender.R.File=target/ClusterManagerLogs/log.txt
 
 log4j.appender.STATUSDUMP=org.apache.log4j.RollingFileAppender
@@ -35,6 +35,7 @@
 
 log4j.logger.org.I0Itec=ERROR
 log4j.logger.org.apache=ERROR
+log4j.logger.org.apache.helix.view=INFO
 log4j.logger.com.noelios=ERROR
 log4j.logger.org.restlet=ERROR