Update dependencies and fix compile errors

Unit test fix and code style improvement.
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
index ff24211..174f6a8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
@@ -19,40 +19,37 @@
  * under the License.
  */
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.helix.PropertyType;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * Represents source physical cluster information for view cluster
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ViewClusterSourceConfig {
+  private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(
+      Arrays.asList(PropertyType.INSTANCES, PropertyType.EXTERNALVIEW, PropertyType.LIVEINSTANCES));
+  private static final ObjectMapper _objectMapper = new ObjectMapper();
 
-  private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(Arrays
-      .asList(new PropertyType[] { PropertyType.INSTANCES, PropertyType.EXTERNALVIEW,
-          PropertyType.LIVEINSTANCES
-      }));
-
-  private static ObjectMapper _objectMapper = new ObjectMapper();
-
-  @JsonProperty("name")
-  private String _name;
-
-  @JsonProperty("zkAddress")
-  String _zkAddress;
-
-  @JsonProperty("properties")
+  private final String _name;
+  private final String _zkAddress;
   private List<PropertyType> _properties;
 
-  private ViewClusterSourceConfig() {
-  }
-
-  public ViewClusterSourceConfig(String name, String zkAddress, List<PropertyType> properties) {
+  @JsonCreator
+  public ViewClusterSourceConfig(
+      @JsonProperty("name") String name,
+      @JsonProperty("zkAddress") String zkAddress,
+      @JsonProperty("properties") List<PropertyType> properties
+  ) {
     _name = name;
     _zkAddress = zkAddress;
     _properties = properties;
@@ -62,14 +59,6 @@
     this(config.getName(), config.getZkAddress(), new ArrayList<>(config.getProperties()));
   }
 
-  public void setName(String name) {
-    _name = name;
-  }
-
-  public void setZkAddress(String zkAddress) {
-    _zkAddress = zkAddress;
-  }
-
   public void setProperties(List<PropertyType> properties) {
     for (PropertyType p : properties) {
       if (!_validPropertyTypes.contains(p)) {
@@ -92,8 +81,13 @@
     return _properties;
   }
 
+  @JsonIgnore
+  public static List<PropertyType> getValidPropertyTypes() {
+    return _validPropertyTypes;
+  }
+
   public String toJson() throws IOException {
-    return new ObjectMapper().writeValueAsString(this);
+    return _objectMapper.writeValueAsString(this);
   }
 
   public String toString() {
@@ -121,4 +115,4 @@
           String.format("Invalid Json: %s, Exception: %s", jsonString, e.toString()));
     }
   }
-}
+}
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
index 690cdd2..c2a48b5 100644
--- a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
@@ -42,9 +42,4 @@
   public void queueEvent(ClusterEvent event) {
     _eventQueue.put(event.getEventType(), event);
   }
-
-  public void shutdown() {
-    _eventQueue.clear();
-    this.interrupt();
-  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index 2870112..4d3ce72 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -19,11 +19,13 @@
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.common.controllers.ControlContextProvider;
 import org.apache.helix.model.ExternalView;
@@ -46,10 +48,6 @@
   protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
   protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
   protected ExternalViewCache _externalViewCache;
-  protected Map<String, LiveInstance> _liveInstanceMap;
-  protected Map<String, InstanceConfig> _instanceConfigMap;
-  protected Map<String, ExternalView> _externalViewMap;
-  private final PropertyType _sourceDataType;
 
   protected String _clusterName;
 
@@ -110,12 +108,12 @@
     LOG.info("START: BasicClusterDataCache.refresh() for cluster " + _clusterName);
     long startTime = System.currentTimeMillis();
 
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
+    if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.EXTERNAL_VIEW, false)) {
       _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, false);
       _externalViewCache.refresh(accessor);
     }
 
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
+    if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, false)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, false);
       _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, true);
@@ -124,7 +122,7 @@
           + ". Takes " + (System.currentTimeMillis() - start) + " ms");
     }
 
-    if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+    if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.INSTANCE_CONFIG, false)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, false);
       _instanceConfigPropertyCache.refresh(accessor);
@@ -196,15 +194,17 @@
    */
   public synchronized void clearCache(HelixConstants.ChangeType changeType) {
     switch (changeType) {
-    case LIVE_INSTANCE:
-    case INSTANCE_CONFIG:
-      LOG.warn("clearCache is deprecated for changeType: {}.", changeType);
-      break;
-    case EXTERNAL_VIEW:
-      _externalViewCache.clear();
-      break;
-    default:
-      break;
+      case LIVE_INSTANCE:
+        _liveInstancePropertyCache.setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
+        break;
+      case INSTANCE_CONFIG:
+        _instanceConfigPropertyCache.setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
+        break;
+      case EXTERNAL_VIEW:
+        _externalViewCache.clear();
+        break;
+      default:
+        break;
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index f35b637..98ef0c5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -93,7 +93,6 @@
     }
     return Collections.emptyList();
   }
-
   /**
    * Returns all leaf nodes that belong in the tree. Returns itself if this node is a leaf.
    *
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 12dc758..65f6bb4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -39,7 +39,6 @@
   OnDemandRebalance,
   ControllerChange,
   RetryRebalance,
-  ViewClusterPeriodicRefresh,
   StateVerifier,
   Unknown
 }
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 0456458..e279856 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,6 +38,7 @@
 import org.apache.helix.util.ConfigStringUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * Cluster configurations
@@ -85,6 +87,10 @@
     // error exceeds this limitation
     DISABLED_INSTANCES,
 
+    VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster
+    VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is
+    // ViewClusterSourceConfig JSON string
+    VIEW_CLUSTER_REFRESH_PERIOD, // In second
     // Specifies job types and used for quota allocation
     QUOTA_TYPES,
 
@@ -178,6 +184,7 @@
   public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
   private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
   private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1;
+  private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
 
   /**
    * Instantiate for a specific cluster
@@ -195,6 +202,36 @@
     super(record);
   }
 
+  public void setViewCluster() {
+    _record.setBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), true);
+  }
+
+  /**
+   * Whether this cluster is a ViewCluster
+   * @return
+   */
+  public boolean isViewCluster() {
+    return _record
+        .getBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), false);
+  }
+
+  /**
+   * Set a list of ViewClusterSourceConfig to ClusterConfig. Current source config will be
+   * overwritten
+   * @param sourceConfigList
+   */
+  public void setViewClusterSourceConfigs(List<ViewClusterSourceConfig> sourceConfigList) {
+    List<String> sourceConfigs = new ArrayList<>();
+    for (ViewClusterSourceConfig config : sourceConfigList) {
+      try {
+        sourceConfigs.add(config.toJson());
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Invalid source config. Error: " + e.toString());
+      }
+    }
+    _record.setListField(ClusterConfigProperty.VIEW_CLUSTER_SOURCES.name(), sourceConfigs);
+  }
+
   /**
    * Set task quota type with the ratio of this quota.
    * @param quotaType  String
@@ -263,6 +300,30 @@
   }
 
   /**
+   * Set view cluster max refresh period
+   * @param refreshPeriod refresh period in second
+   */
+  public void setViewClusterRefreshPeriod(int refreshPeriod) {
+    _record.setIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
+        refreshPeriod);
+  }
+
+  public List<ViewClusterSourceConfig> getViewClusterSourceConfigs() {
+    List<ViewClusterSourceConfig> sourceConfigList = new ArrayList<>();
+    for (String configJSON : _record
+        .getListField(ClusterConfigProperty.VIEW_CLUSTER_SOURCES.name())) {
+      ViewClusterSourceConfig config = ViewClusterSourceConfig.fromJson(configJSON);
+      sourceConfigList.add(config);
+    }
+    return sourceConfigList;
+  }
+
+  public int getViewClusterRefershPeriod() {
+    return _record.getIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
+        DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD);
+  }
+
+  /**
    * Whether to persist best possible assignment in a resource's idealstate.
    * @return
    */
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index e60b7ce..7de552e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -31,7 +31,6 @@
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskPartitionState;
diff --git a/helix-view-aggregator/helix-view-aggregator-0.6.10-SNAPSHOT.ivy b/helix-view-aggregator/helix-view-aggregator-1.0.3-SNAPSHOT.ivy
similarity index 84%
rename from helix-view-aggregator/helix-view-aggregator-0.6.10-SNAPSHOT.ivy
rename to helix-view-aggregator/helix-view-aggregator-1.0.3-SNAPSHOT.ivy
index 7ba8579..cf817b8 100644
--- a/helix-view-aggregator/helix-view-aggregator-0.6.10-SNAPSHOT.ivy
+++ b/helix-view-aggregator/helix-view-aggregator-1.0.3-SNAPSHOT.ivy
@@ -7,9 +7,7 @@
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at
-
   http://www.apache.org/licenses/LICENSE-2.0
-
 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +18,7 @@
 <ivy-module version="1.0">
 	<info organisation="org.apache.helix"
 		module="helix-view-aggregator"
-		revision="0.6.10-SNAPSHOT"
+		revision="1.0.3-SNAPSHOT"
 		status="integration"
 		publication="20170128141623"
 	/>
@@ -37,16 +35,19 @@
 		<artifact name="helix-view-aggregator" type="jar" ext="jar" conf="master"/>
 	</publications>
 	<dependencies>
-	  <dependency org="org.slf4j" name="slf4j-api" rev="1.7.25" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
+	  <dependency org="org.slf4j" name="slf4j-api" rev="1.7.32" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
         <artifact name="slf4j-api" ext="jar"/>
     </dependency>
-    <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.14" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
-        <artifact name="slf4j-log4j12" ext="jar"/>
+    <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
+        <artifact name="log4j-slf4j-impl" ext="jar"/>
     </dependency>
-		<dependency org="org.apache.helix" name="helix-core" rev="0.8.0-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+    <--dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.14" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
+        <artifact name="slf4j-log4j12" ext="jar"/>
+    </dependency-->
+		<dependency org="org.apache.helix" name="helix-core" rev="1.0.3-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
 		<dependency org="io.dropwizard.metrics" name="metrics-core" rev="3.2.3" conf="compile->compile(default);runtime->runtime(default);default->default"/>
 	</dependencies>
-</ivy-module>
+</ivy-module>
\ No newline at end of file
diff --git a/helix-view-aggregator/pom.xml b/helix-view-aggregator/pom.xml
index 2f05fcc..f914c61 100644
--- a/helix-view-aggregator/pom.xml
+++ b/helix-view-aggregator/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.helix</groupId>
     <artifactId>helix</artifactId>
-    <version>0.6.10-SNAPSHOT</version>
+    <version>1.0.3-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
@@ -32,9 +32,7 @@
   <properties>
     <osgi.import>
       org.apache.helix*,
-      org.codehaus.jackson*,
       org.apache.commons.cli*,
-      org.slf4j*;version="[1.6,2)",
       *
     </osgi.import>
     <osgi.export>org.apache.helix.view*;version="${project.version};-noimport:=true</osgi.export>
@@ -49,26 +47,31 @@
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
-      <version>1.7.25</version>
+      <version>1.7.32</version>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>1.7.14</version>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>2.17.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>2.17.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.helix</groupId>
       <artifactId>helix-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-core-asl</artifactId>
-      <version>1.8.5</version>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.11.0</version>
     </dependency>
     <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-mapper-asl</artifactId>
-      <version>1.8.5</version>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.11.0</version>
     </dependency>
     <dependency>
       <groupId>commons-cli</groupId>
@@ -133,4 +136,4 @@
       </plugin>
     </plugins>
   </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
index ab122db..b1d34b9 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -19,28 +19,29 @@
  * under the License.
  */
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
 import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.common.ClusterEventProcessor;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.view.common.ViewAggregatorEventAttributes;
+import org.apache.helix.view.common.ClusterViewEvent;
 import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
 import org.apache.helix.view.monitoring.ViewAggregatorMonitor;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,56 +50,58 @@
  */
 public class HelixViewAggregator implements ClusterConfigChangeListener {
   private static final Logger logger = LoggerFactory.getLogger(HelixViewAggregator.class);
-  private static final int PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS = 3 * 1000;
+  private static final long DEFAULT_INITIAL_EVENT_PROCESS_BACKOFF = 10;
+  private static final long DEFAULT_MAX_EVENT_PROCESS_BACKOFF = 5 * 1000;
   private final String _viewClusterName;
   private final HelixManager _viewClusterManager;
-  private HelixDataAccessor _dataAccessor;
+  private final Map<String, SourceClusterDataProvider> _dataProviderMap;
 
   // Worker that processes source cluster events and refresh view cluster
-  private ClusterEventProcessor _aggregator;
-  private boolean _refreshViewCluster;
+  private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _aggregator;
+  private final AtomicBoolean _refreshViewCluster;
 
   // Worker that processes view cluster config change
-  private ClusterEventProcessor _viewConfigProcessor;
+  private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _viewConfigProcessor;
+  private final ViewAggregatorMonitor _monitor;
 
-  private Map<String, SourceClusterDataProvider> _dataProviderMap;
   private ClusterConfig _curViewClusterConfig;
   private Timer _viewClusterRefreshTimer;
   private ViewClusterRefresher _viewClusterRefresher;
-  private ViewAggregatorMonitor _monitor;
+  private HelixDataAccessor _dataAccessor;
 
   public HelixViewAggregator(String viewClusterName, String zkAddr) {
     _viewClusterName = viewClusterName;
-    _dataProviderMap = new HashMap<>();
+    _dataProviderMap = new ConcurrentHashMap<>();
     _viewClusterManager = HelixManagerFactory
         .getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
             InstanceType.SPECTATOR, zkAddr);
     _refreshViewCluster = new AtomicBoolean(false);
     _monitor = new ViewAggregatorMonitor(viewClusterName);
-    _aggregator = new ClusterEventProcessor(_viewClusterName, "Aggregator") {
+    _aggregator = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(_viewClusterName,
+        "Aggregator") {
       @Override
-      public void handleEvent(ClusterEvent event) {
+      public void handleEvent(ClusterViewEvent event) {
         handleSourceClusterEvent(event);
         _monitor.recordProcessedSourceEvent();
       }
     };
 
-    _viewConfigProcessor = new ClusterEventProcessor(_viewClusterName, "ViewConfigProcessor") {
+    _viewConfigProcessor = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(_viewClusterName, "ViewConfigProcessor") {
       @Override
-      public void handleEvent(ClusterEvent event) {
+      public void handleEvent(ClusterViewEvent event) {
         handleViewClusterConfigChange(event);
       }
     };
   }
 
   public String getAggregatorInstanceName() {
-    return String
-        .format("%s::%s", _viewClusterManager.getInstanceName(), hashCode());
+    return String.format("%s::%s", _viewClusterManager.getInstanceName(), hashCode());
   }
 
   /**
    * Start controller main logic
-   * @throws Exception
+   * @throws Exception when HelixViewAggregator fails to start. Will try to shut it down before
+   *                   exception is thrown out
    */
   public void start() throws Exception {
     _monitor.register();
@@ -110,16 +113,15 @@
     // Setup manager
     try {
       _viewClusterManager.connect();
-      _viewClusterManager.addClusterfigChangeListener(this);
       _dataAccessor = _viewClusterManager.getHelixDataAccessor();
+      _viewClusterManager.addClusterfigChangeListener(this);
     } catch (Exception e) {
+      shutdown();
       throw new HelixException("Failed to connect view cluster helix manager", e);
     }
 
     // Set up view cluster refresher
-    _viewClusterRefresher =
-        new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor(),
-            _dataProviderMap);
+    _viewClusterRefresher = new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor());
   }
 
   public void shutdown() {
@@ -144,22 +146,19 @@
         logger.warn("ZK interrupted when disconnecting helix manager", zkintr);
       } catch (Exception e) {
         success = false;
-        logger.error(String
-            .format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
+        logger.error(String.format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
       }
     }
 
     // Clean up all data providers
     for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
-      logger
-          .info(String.format("Shutting data provider for source cluster %s", provider.getName()));
+      logger.info(String.format("Shutting down data provider for source cluster %s", provider.getName()));
       try {
         provider.shutdown();
       } catch (Exception e) {
         success = false;
-        logger.error(String
-            .format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
-                _viewClusterName), e);
+        logger.error(String.format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
+            _viewClusterName), e);
       }
     }
 
@@ -173,79 +172,100 @@
   @PreFetch(enabled = false)
   public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
     if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
-      _viewConfigProcessor
-          .queueEvent(new ClusterEvent(_viewClusterName, ClusterEventType.ClusterConfigChange));
+      _viewConfigProcessor.queueEvent(ClusterViewEvent.Type.ConfigChange,
+          new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.ConfigChange));
     } else {
-      logger.info(String
-          .format("Skip processing view cluster config change with notification context type %s",
-              context == null ? "NoContext" : context.getType().name()));
+      logger.info(String.format("Skip processing view cluster config change with notification context type %s",
+          context == null ? "NoContext" : context.getType().name()));
     }
   }
 
-  private void handleSourceClusterEvent(ClusterEvent event) {
-    logger.info("Processing event from source cluster " + event.getClusterName());
+  private void handleSourceClusterEvent(ClusterViewEvent event) {
+    logger.info(String
+        .format("Processing event %s from source cluster %s.", event.getEventType().name(),
+            event.getClusterName()));
     switch (event.getEventType()) {
-    case LiveInstanceChange:
-    case InstanceConfigChange:
-    case ExternalViewChange:
-      _refreshViewCluster = true;
-      break;
-    case ViewClusterPeriodicRefresh:
-      if (!_refreshViewCluster) {
-        logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
-        return;
-      }
-      // mark source cluster as changed to trigger next refresh as we failed to refresh at
-      // least some of the elements in view cluster
-      logger.info("Refreshing cluster based on event " + event.getEventType().name());
-      _refreshViewCluster = refreshViewCluster();
-      break;
-    default:
-      logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
-    }
-  }
-
-  private synchronized void handleViewClusterConfigChange(ClusterEvent event) {
-    logger.info("Processing view cluster event " + event.getEventType().name());
-    switch (event.getEventType()) {
-    case ClusterConfigChange:
-      // TODO: when clusterEventProcessor supports delayed scheduling,
-      // we should not have this head-of-line blocking but to have ClusterEventProcessor do the work.
-      // Currently it's acceptable as we can endure delay in processing view cluster config change
-      if (event.getAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name()) != null) {
-        try {
-          Thread.sleep(PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS);
-        } catch (InterruptedException e) {
-          logger.warn("Interrupted when backing off during process view config change retry", e);
+      case ExternalViewChange:
+      case InstanceConfigChange:
+      case LiveInstanceChange:
+        _refreshViewCluster.set(true);
+        break;
+      case PeriodicViewRefresh:
+        if (!_refreshViewCluster.get()) {
+          logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
+          return;
         }
+        // mark source cluster as changed to trigger next refresh as we failed to refresh at
+        // least some of the elements in view cluster
+        logger.info("Refreshing cluster based on event " + event.getEventType().name());
+        refreshViewCluster();
+        break;
+      default:
+        logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
+    }
+  }
+
+  private void handleViewClusterConfigChange(ClusterViewEvent event) {
+    logger.info(String.format("Processing event %s for view cluster %s", event.getEventType().name(),
+        _viewClusterName));
+    if (event.getEventType() == ClusterViewEvent.Type.ConfigChange) {
+      // TODO: when DedupEventProcessor supports delayed scheduling,
+      // we should not have this head-of-line blocking but to have DedupEventProcessor do the work.
+      // Currently it's acceptable as we can endure delay in processing view cluster config change
+      try {
+        Thread.sleep(event.getEventProcessBackoff());
+      } catch (InterruptedException e) {
+        logger.warn("Interrupted when backing off during process view config change retry", e);
+        Thread.currentThread().interrupt();
       }
+
       // We always compare current cluster config with most up-to-date cluster config
+      boolean success;
       ClusterConfig newClusterConfig =
           _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
-      SourceClusterConfigChangeAction action =
-          new SourceClusterConfigChangeAction(_curViewClusterConfig, newClusterConfig);
-      action.computeAction();
+
+      if (newClusterConfig == null) {
+        logger.warn("Failed to read view cluster config");
+        success = false;
+      } else {
+        SourceClusterConfigChangeAction action =
+            new SourceClusterConfigChangeAction(_curViewClusterConfig, newClusterConfig);
+        action.computeAction();
+        success = processViewClusterConfigUpdate(action);
+      }
 
       // If we fail to process action and should retry, re-queue event to retry
-      if (!processViewClusterConfigUpdate(action)) {
+      if (!success) {
         _monitor.recordViewConfigProcessFailure();
-        event.addAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name(), true);
-        _viewConfigProcessor.queueEvent(event);
+        long backoff = computeNextEventProcessBackoff(event.getEventProcessBackoff());
+        logger.info("Failed to process view cluster config change. Will retry in {} ms", backoff);
+        event.setEventProcessBackoff(backoff);
+        _viewConfigProcessor.queueEvent(event.getEventType(), event);
       } else {
         _curViewClusterConfig = newClusterConfig;
       }
-      break;
-    default:
+    } else {
       logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
     }
   }
 
+  private long computeNextEventProcessBackoff(long currentBackoff) {
+    if (currentBackoff <= 0) {
+      return DEFAULT_INITIAL_EVENT_PROCESS_BACKOFF;
+    }
+
+    // Exponential backoff with ceiling
+    return currentBackoff * 2 > DEFAULT_MAX_EVENT_PROCESS_BACKOFF
+        ? DEFAULT_MAX_EVENT_PROCESS_BACKOFF
+        : currentBackoff * 2;
+  }
+
   private class RefreshViewClusterTask extends TimerTask {
     @Override
     public void run() {
       logger.info("Triggering view cluster refresh");
-      _aggregator.queueEvent(
-          new ClusterEvent(_viewClusterName, ClusterEventType.ViewClusterPeriodicRefresh));
+      _aggregator.queueEvent(ClusterViewEvent.Type.PeriodicViewRefresh,
+          new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.PeriodicViewRefresh));
     }
   }
 
@@ -275,7 +295,13 @@
       if (_dataProviderMap.containsKey(key)) {
         try {
           _dataProviderMap.get(key).shutdown();
-          _dataProviderMap.remove(key);
+          synchronized (_dataProviderMap) {
+            _dataProviderMap.remove(key);
+            // upon successful removal of data provider, set refresh view cluster to true
+            // or if no event from source cluster happened before next refresh cycle, this
+            // removal will be missed.
+            _refreshViewCluster.set(true);
+          }
         } catch (Exception e) {
           success = false;
           logger.warn(String.format("Failed to shutdown data provider %s, will retry", key));
@@ -302,6 +328,7 @@
       }
     }
 
+
     if (action.shouldResetTimer()) {
       logger.info(
           "Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
@@ -312,7 +339,6 @@
 
   /**
    * Use ViewClusterRefresher to refresh ViewCluster.
-   * @return true if needs retry, else false
    */
   private void refreshViewCluster() {
     long startRefreshMs = System.currentTimeMillis();
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
index 7f28441..c5696a1 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
@@ -19,16 +19,104 @@
  * under the License.
  */
 
+import java.util.logging.Level;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HelixViewAggregatorMain {
   private static Logger logger = LoggerFactory.getLogger(HelixViewAggregatorMain.class);
+  private static final String HELP = "help";
+  private static final String ZK_ADDR = "zookeeper-address";
+  private static final String VIEW_CLUSTER_NAME = "view-cluster-name";
+
+  private static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.printHelp("java " + HelixViewAggregatorMain.class.getName(), cliOptions);
+  }
+
+  private static Options constructCommandLineOptions() {
+    Option helpOption =
+        OptionBuilder.withLongOpt(HELP).withDescription("Prints command-line options info")
+            .create();
+    helpOption.setArgs(0);
+    helpOption.setRequired(false);
+    helpOption.setArgName("print help message");
+
+    Option zkServerOption =
+        OptionBuilder.withLongOpt(ZK_ADDR).withDescription("Provide zookeeper address")
+            .create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZooKeeper server connection string (Required)");
+
+    Option portOption =
+        OptionBuilder.withLongOpt(VIEW_CLUSTER_NAME).withDescription("Name of the view cluster")
+            .create();
+    portOption.setArgs(1);
+    portOption.setRequired(true);
+    portOption.setArgName("Name of the view cluster");
+
+    Options options = new Options();
+    options.addOption(helpOption);
+    options.addOption(zkServerOption);
+    options.addOption(portOption);
+
+    return options;
+  }
   /**
    * @param args
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
-    // TODO: implement this function as launching HelixViewAggregator using CLI
+    java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
+    topJavaLogger.setLevel(Level.INFO);
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+    CommandLine cmd = null;
+
+    try {
+      cmd = cliParser.parse(cliOptions, args);
+    } catch (ParseException pe) {
+      logger.error("HelixViewAggregatorMain: failed to parse command-line options.", pe);
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+
+    String zkAddr, viewClusterName;
+    if (cmd.hasOption(HELP)) {
+      printUsage(cliOptions);
+      return;
+    } else {
+      zkAddr = String.valueOf(cmd.getOptionValue(ZK_ADDR));
+      viewClusterName = String.valueOf(cmd.getOptionValue(VIEW_CLUSTER_NAME));
+    }
+
+    final HelixViewAggregator aggregator = new HelixViewAggregator(viewClusterName, zkAddr);
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        aggregator.shutdown();
+      }
+    }));
+
+    try {
+      aggregator.start();
+      Thread.currentThread().join();
+    } catch (Exception e) {
+      logger.error("HelixViewAggregator caught exception.", e);
+    } finally {
+      aggregator.shutdown();
+    }
+
+    // Service should not exit successfully
+    System.exit(1);
   }
 }
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java
index 7345769..e7f28fc 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java
@@ -35,12 +35,12 @@
  * be reset or not
  */
 public class SourceClusterConfigChangeAction {
-  private ClusterConfig _oldConfig;
-  private ClusterConfig _newConfig;
+  private final List<ViewClusterSourceConfig> _toAdd;
+  private final List<ViewClusterSourceConfig> _toDelete;
+  private final ClusterConfig _oldConfig;
+  private final ClusterConfig _newConfig;
 
   private boolean _shouldResetTimer;
-  private List<ViewClusterSourceConfig> _toAdd;
-  private List<ViewClusterSourceConfig> _toDelete;
 
   public SourceClusterConfigChangeAction(ClusterConfig oldConfig, ClusterConfig newConfig) {
     _oldConfig = oldConfig;
@@ -79,22 +79,19 @@
 
     if (_oldConfig != null) {
       for (ViewClusterSourceConfig oldConfig : _oldConfig.getViewClusterSourceConfigs()) {
-        oldConfigMap
-            .put(generateConfigMapKey(oldConfig.getName(), oldConfig.getZkAddress()), oldConfig);
+        oldConfigMap.put(generateConfigMapKey(oldConfig.getName(), oldConfig.getZkAddress()), oldConfig);
       }
     }
 
     for (ViewClusterSourceConfig currentConfig : _newConfig.getViewClusterSourceConfigs()) {
-      currentConfigMap
-          .put(generateConfigMapKey(currentConfig.getName(), currentConfig.getZkAddress()),
-              currentConfig);
+      currentConfigMap.put(
+          generateConfigMapKey(currentConfig.getName(), currentConfig.getZkAddress()), currentConfig);
     }
 
     // Configs whose properties-to-aggregate got modified should have its data provider recreated
     for (Map.Entry<String, ViewClusterSourceConfig> entry : currentConfigMap.entrySet()) {
       if (oldConfigMap.containsKey(entry.getKey())) {
-        Set<PropertyType> oldPropertySet =
-            new HashSet<>(oldConfigMap.get(entry.getKey()).getProperties());
+        Set<PropertyType> oldPropertySet = new HashSet<>(oldConfigMap.get(entry.getKey()).getProperties());
         Set<PropertyType> currentPropertySet = new HashSet<>(entry.getValue().getProperties());
         if (!oldPropertySet.equals(currentPropertySet)) {
           _toAdd.add(new ViewClusterSourceConfig(entry.getValue()));
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
index 58a35e6..1885a73 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
@@ -45,7 +46,7 @@
   private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
   private String _viewClusterName;
   private HelixDataAccessor _viewClusterDataAccessor;
-  private Map<String, SourceClusterDataProvider> _dataProviderMap;
+  private Set<SourceClusterDataProvider> _dataProviderView;
 
   // These 3 caches stores objects that are pushed to view cluster (write-through) cache,
   // thus we don't need to read from view cluster everytime we refresh it.
@@ -53,11 +54,9 @@
   private Map<String, HelixProperty> _viewClusterInstanceConfigCache;
   private Map<String, HelixProperty> _viewClusterExternalViewCache;
 
-  public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor,
-      Map<String, SourceClusterDataProvider> dataProviderMap) {
+  public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor) {
     _viewClusterName = viewClusterName;
     _viewClusterDataAccessor = viewClusterDataAccessor;
-    _dataProviderMap = dataProviderMap;
     _viewClusterLiveInstanceCache = new HashMap<>();
     _viewClusterInstanceConfigCache = new HashMap<>();
     _viewClusterExternalViewCache = new HashMap<>();
@@ -108,6 +107,10 @@
     }
   }
 
+  public void updateProviderView(Set<SourceClusterDataProvider> dataProviderView) {
+    _dataProviderView = dataProviderView;
+  }
+
   /**
    * Create / update / delete property of given type in view cluster, based on data change from
    * source clusters.
@@ -133,7 +136,7 @@
       listedNamesInView =
           new HashSet<>(_viewClusterDataAccessor.getChildNames(getPropertyKey(propertyType, null)));
       // Prepare data
-      for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
+      for (SourceClusterDataProvider provider : _dataProviderView) {
         if (!provider.getPropertiesToAggregate().contains(propertyType)) {
           logger.info(String
               .format("SourceCluster %s does not need to aggregate %s, skip.", provider.getName(),
@@ -153,16 +156,10 @@
           listedNamesInSource.addAll(provider.getExternalViewNames());
           for (Map.Entry<String, ExternalView> entry : provider.getExternalViews().entrySet()) {
             String resourceName = entry.getKey();
-            ExternalView resourceEV = entry.getValue();
-            if (sourceProperties.containsKey(resourceName)) {
-              // Merge external views if we already have a record
-              mergeExternalViews((ExternalView) sourceProperties.get(resourceName), resourceEV);
-            } else {
-              // merging simple fields and list fields are meaningless and would cause confusion
-              resourceEV.getRecord().getSimpleFields().clear();
-              resourceEV.getRecord().getListFields().clear();
-              sourceProperties.put(resourceName, resourceEV);
+            if (!sourceProperties.containsKey(resourceName)) {
+              sourceProperties.put(resourceName, new ExternalView(resourceName));
             }
+            mergeExternalViews((ExternalView) sourceProperties.get(resourceName), entry.getValue());
           }
           break;
         default:
@@ -180,6 +177,8 @@
           .format("Caught exception during refreshing %s for view cluster %s", propertyType.name(),
               _viewClusterName), e);
     }
+    logRefreshResult(propertyType, ok);
+
     return ok;
   }
 
@@ -199,13 +198,11 @@
               source.getId(), toMerge.getId()));
     }
     for (String partitionName : toMerge.getPartitionSet()) {
+      // Deep copying state map to avoid modifying source cache
       if (!source.getPartitionSet().contains(partitionName)) {
-        source.setStateMap(partitionName, toMerge.getStateMap(partitionName));
-      } else {
-        Map<String, String> mergedPartitionState = source.getStateMap(partitionName);
-        mergedPartitionState.putAll(toMerge.getStateMap(partitionName));
-        source.setStateMap(partitionName, mergedPartitionState);
+        source.setStateMap(partitionName, new TreeMap<String, String>());
       }
+      source.getStateMap(partitionName).putAll(toMerge.getStateMap(partitionName));
     }
   }
 
@@ -273,7 +270,6 @@
       Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
       Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
     boolean ok = true;
-
     // Calculate diff
     ClusterPropertyDiff diff =
         calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties,
@@ -360,7 +356,8 @@
   private <T extends HelixProperty> boolean addOrUpdateProperties(
       List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects, Map<String, T> cache) {
     boolean ok = true;
-    logger.info(String.format("AddOrUpdate objects: %s", keysToAddOrUpdate));
+    logger.info(
+        String.format("AddOrUpdate %s objects: %s", keysToAddOrUpdate.size(), keysToAddOrUpdate));
     boolean[] addOrUpdateResults = _viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
     for (int i = 0; i < addOrUpdateResults.length; i++) {
       if (!addOrUpdateResults[i]) {
@@ -389,7 +386,7 @@
   private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete,
       Map<String, T> cache) {
     boolean ok = true;
-    logger.info(String.format("Deleting objects: %s", keysToDelete));
+    logger.info(String.format("Deleting %s objects: %s", keysToDelete.size(), keysToDelete));
     for (PropertyKey key : keysToDelete) {
       if (!_viewClusterDataAccessor.removeProperty(key)) {
         // Don't remove item from cache yet - will retry during next refresh
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ClusterViewEvent.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ClusterViewEvent.java
new file mode 100644
index 0000000..42cedde
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ClusterViewEvent.java
@@ -0,0 +1,56 @@
+package org.apache.helix.view.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class ClusterViewEvent {
+  public enum Type {
+    LiveInstanceChange,
+    ExternalViewChange,
+    InstanceConfigChange,
+    ConfigChange,
+    PeriodicViewRefresh
+  }
+
+  private String _clusterName;
+  private Type _eventType;
+  private long _eventProcessBackoffMs;
+
+  public ClusterViewEvent(String clusterName, Type eventType) {
+    _clusterName = clusterName;
+    _eventType = eventType;
+    _eventProcessBackoffMs = 0;
+  }
+
+  public String getClusterName() {
+    return _clusterName;
+  }
+
+  public Type getEventType() {
+    return _eventType;
+  }
+
+  public void setEventProcessBackoff(long backoffMs) {
+    _eventProcessBackoffMs = backoffMs;
+  }
+
+  public long getEventProcessBackoff() {
+    return _eventProcessBackoffMs;
+  }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
deleted file mode 100644
index f550f1c..0000000
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.view.common;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum ViewAggregatorEventAttributes {
-  EventProcessBackoff,
-}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
index 897b8ef..c581e68 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -20,10 +20,8 @@
  */
 
 import java.util.List;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -35,14 +33,13 @@
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.common.BasicClusterDataCache;
-import org.apache.helix.common.ClusterEventProcessor;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.common.caches.BasicClusterDataCache;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.common.ClusterViewEvent;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,14 +53,14 @@
   private static final Logger LOG = LoggerFactory.getLogger(SourceClusterDataProvider.class);
 
   private final HelixManager _helixManager;
-  private final ClusterEventProcessor _eventProcessor;
+  private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _eventProcessor;
 
   protected ViewClusterSourceConfig _sourceClusterConfig;
   private HelixDataAccessor _dataAccessor;
   private PropertyKey.Builder _propertyKeyBuilder;
 
   public SourceClusterDataProvider(ViewClusterSourceConfig config,
-      ClusterEventProcessor eventProcessor) {
+      DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> eventProcessor) {
     super(config.getName());
     _eventProcessor = eventProcessor;
     _sourceClusterConfig = config;
@@ -112,8 +109,8 @@
       }
       _dataAccessor = _helixManager.getHelixDataAccessor();
       _propertyKeyBuilder = _dataAccessor.keyBuilder();
-      LOG.info(String
-          .format("%s started. Source cluster detail: %s", _helixManager.getInstanceName(),
+      LOG.info(String.format("Data provider %s (%s) started. Source cluster detail: %s",
+          _helixManager.getInstanceName(), hashCode(),
               _sourceClusterConfig.toString()));
     } catch(Exception e) {
       shutdown();
@@ -130,7 +127,7 @@
       try {
         _helixManager.disconnect();
         LOG.info(
-            String.format("Data provider %s shutdown cleanly.", _helixManager.getInstanceName()));
+            String.format("Data provider %s (%s) shutdown cleanly.", _helixManager.getInstanceName(), hashCode()));
       } catch (ZkInterruptedException e) {
         // OK
       }
@@ -175,7 +172,7 @@
   @PreFetch(enabled = false)
   public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
       NotificationContext context) {
-    queueEvent(context, ClusterEventType.InstanceConfigChange,
+    queueEvent(context, ClusterViewEvent.Type.InstanceConfigChange,
         HelixConstants.ChangeType.INSTANCE_CONFIG);
   }
 
@@ -183,7 +180,7 @@
   @PreFetch(enabled = false)
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
-    queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
+    queueEvent(changeContext, ClusterViewEvent.Type.LiveInstanceChange,
         HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
@@ -191,21 +188,21 @@
   @PreFetch(enabled = false)
   public void onExternalViewChange(List<ExternalView> externalViewList,
       NotificationContext changeContext) {
-    queueEvent(changeContext, ClusterEventType.ExternalViewChange,
+    queueEvent(changeContext, ClusterViewEvent.Type.ExternalViewChange,
         HelixConstants.ChangeType.EXTERNAL_VIEW);
   }
 
-  private void queueEvent(NotificationContext context,
-      ClusterEventType clusterEventType, HelixConstants.ChangeType cacheChangeType)
+  private void queueEvent(NotificationContext context, ClusterViewEvent.Type changeType,
+      HelixConstants.ChangeType cacheChangeType)
       throws IllegalStateException {
     // TODO: in case of FINALIZE, if we are not shutdown, re-connect helix manager and report error
     if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
       notifyDataChange(cacheChangeType);
-      _eventProcessor.queueEvent(new ClusterEvent(_clusterName, clusterEventType));
+      _eventProcessor.queueEvent(changeType, new ClusterViewEvent(_clusterName, changeType));
     } else {
-      LOG.info(String.format("Skip queuing event. EventType: %s, ChangeType: %s, ContextType: %s",
-          clusterEventType.name(), cacheChangeType.name(),
-          context == null ? "NoContext" : context.getType().name()));
+      LOG.info("Skip queuing event from source cluster {}. ChangeType: {}, ContextType: {}",
+          _clusterName, cacheChangeType.name(),
+          context == null ? "NoContext" : context.getType().name());
     }
   }
 
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
index 0d0af22..2a069ee 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
@@ -21,11 +21,13 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
@@ -33,6 +35,7 @@
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -41,20 +44,18 @@
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-public class TestViewClusterRefresher {
+public class TestViewClusterRefresher extends ZkTestBase {
   private static final String viewClusterName = "viewCluster";
   private static final int numSourceCluster = 3;
   private static final int numInstancePerSourceCluster = 2;
   private static final int numExternalViewPerSourceCluster = 3;
   private static final int numPartition = 3;
-  private static final List<PropertyType> defaultProperties = Arrays.asList(
-      new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES,
-          PropertyType.EXTERNALVIEW
-      });
+  private static final List<PropertyType> defaultProperties =
+      Arrays.asList(PropertyType.LIVEINSTANCES, PropertyType.INSTANCES, PropertyType.EXTERNALVIEW);
 
-  private class CounterBasedMockAccessor extends MockAccessor {
-    private int _setCount;
-    private int _removeCount;
+  private static class CounterBasedMockAccessor extends MockAccessor {
+    private AtomicInteger _setCount = new AtomicInteger(0);
+    private AtomicInteger _removeCount = new AtomicInteger(0);
 
     public CounterBasedMockAccessor(String clusterName) {
       super(clusterName);
@@ -62,40 +63,40 @@
     }
 
     public void resetCounters() {
-      _setCount = 0;
-      _removeCount = 0;
+      _setCount.set(0);
+      _removeCount.set(0);
     }
 
     @Override
     public boolean setProperty(PropertyKey key, HelixProperty value) {
-      _setCount += 1;
+      _setCount.incrementAndGet();
       return super.setProperty(key, value);
     }
 
     @Override
     public boolean removeProperty(PropertyKey key) {
-      _removeCount += 1;
+      _removeCount.incrementAndGet();
       return super.removeProperty(key);
     }
 
     public int getSetCount() {
-      return _setCount;
+      return _setCount.get();
     }
 
     public int getRemoveCount() {
-      return _removeCount;
+      return _removeCount.get();
     }
   }
 
   @Test
   public void testRefreshWithNoChange() {
-    CounterBasedMockAccessor viewClusterDataAccessor =
-        new CounterBasedMockAccessor(viewClusterName);
+    CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
     Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
     createMockDataProviders(dataProviderMap);
 
     ViewClusterRefresher refresher =
-        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
+    refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
 
     // Refresh an empty view cluster
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
@@ -111,14 +112,14 @@
   }
 
   @Test
-  public void testRefreshWithInstanceChange() {
-    CounterBasedMockAccessor viewClusterDataAccessor =
-        new CounterBasedMockAccessor(viewClusterName);
+  public void testRefreshWithInstanceChange() throws InterruptedException {
+    CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
     Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
     createMockDataProviders(dataProviderMap);
 
     ViewClusterRefresher refresher =
-        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
+    refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
     MockSourceClusterDataProvider sampleProvider =
         (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
 
@@ -155,13 +156,13 @@
 
   @Test
   public void testRefreshWithExternalViewChange() {
-    CounterBasedMockAccessor accessor =
-        new CounterBasedMockAccessor(viewClusterName);
+    CounterBasedMockAccessor accessor = new CounterBasedMockAccessor(viewClusterName);
     Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
     createMockDataProviders(dataProviderMap);
 
     ViewClusterRefresher refresher =
-        new ViewClusterRefresher(viewClusterName, accessor, dataProviderMap);
+        new ViewClusterRefresher(viewClusterName, accessor);
+    refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
     MockSourceClusterDataProvider sampleProvider =
         (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
 
@@ -196,13 +197,13 @@
 
   @Test
   public void testRefreshWithProviderChange() {
-    CounterBasedMockAccessor viewClusterDataAccessor =
-        new CounterBasedMockAccessor(viewClusterName);
+    CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
     Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
     createMockDataProviders(dataProviderMap);
 
     ViewClusterRefresher refresher =
-        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+        new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
+    refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
     MockSourceClusterDataProvider sampleProvider =
         (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
 
@@ -217,7 +218,7 @@
 
     // remove InstanceConfig and ExternalView requirement from sample provider
     sampleProvider.getConfig()
-        .setProperties(Arrays.asList(new PropertyType[] { PropertyType.LIVEINSTANCES }));
+        .setProperties(Collections.singletonList(PropertyType.LIVEINSTANCES));
 
     // Refresh again
     Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
@@ -238,7 +239,7 @@
       MockSourceClusterDataProvider mockProvider = (MockSourceClusterDataProvider) provider;
       if (mockProvider != sampleProvider) {
         mockProvider.getConfig().setProperties(Arrays
-            .asList(new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES }));
+            .asList(PropertyType.LIVEINSTANCES, PropertyType.INSTANCES));
       }
     }
 
@@ -300,7 +301,7 @@
     for (int i = 0; i < numSourceCluster; i++) {
       String sourceClusterName = "cluster" + i;
       ViewClusterSourceConfig sourceConfig =
-          new ViewClusterSourceConfig(sourceClusterName, "", defaultProperties);
+          new ViewClusterSourceConfig(sourceClusterName, ZK_ADDR, defaultProperties);
       MockSourceClusterDataProvider provider =
           new MockSourceClusterDataProvider(sourceConfig, null);
       List<LiveInstance> liveInstanceList = new ArrayList<>();
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestDataProviderUtil.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/DataProviderTestUtil.java
similarity index 88%
rename from helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestDataProviderUtil.java
rename to helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/DataProviderTestUtil.java
index a9f8dba..3369420 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestDataProviderUtil.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/DataProviderTestUtil.java
@@ -26,13 +26,11 @@
 import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.model.ClusterConfig;
 
-public class TestDataProviderUtil {
+public class DataProviderTestUtil {
   private static final int viewClusterRefreshPeriod = 10;
   private static final String testZkAddr = "localhost:1010";
-  private static final List<PropertyType> testProperties = Arrays.asList(
-      new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW,
-          PropertyType.INSTANCES
-      });
+  private static final List<PropertyType> testProperties =
+      Arrays.asList(PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW, PropertyType.INSTANCES);
 
   public static ClusterConfig createDefaultViewClusterConfig(String viewClusterName,
       int numSourceCluster) {
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java
index db8ddc2..16a364f 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java
@@ -35,7 +35,7 @@
 
   @Test
   public void testActionComputationOnStartup() {
-    ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+    ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
     SourceClusterConfigChangeAction action = new SourceClusterConfigChangeAction(null, config);
     action.computeAction();
     Assert.assertEquals(action.getConfigsToAdd().size(),
@@ -50,7 +50,7 @@
 
   @Test
   public void testActionComputationOnConfigModified() {
-    ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+    ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
     ClusterConfig newConfig = new ClusterConfig(viewClusterName);
     newConfig.setViewCluster();
 
@@ -91,7 +91,7 @@
 
   @Test
   public void testActionComputationNoChange() {
-    ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+    ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
     SourceClusterConfigChangeAction action = new SourceClusterConfigChangeAction(config, config);
     Assert.assertEquals(action.getConfigsToAdd().size(), 0);
     Assert.assertEquals(action.getConfigsToDelete().size(), 0);
@@ -100,7 +100,7 @@
 
   @Test
   public void testActionComputationInvalidInitialization() {
-    ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+    ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
     ClusterConfig badConfig = new ClusterConfig(viewClusterName);
     try {
       SourceClusterConfigChangeAction action = new SourceClusterConfigChangeAction(config, null);
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
index eed25b9..9ffb906 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
@@ -35,7 +35,6 @@
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModelParser;
-import org.apache.helix.view.ViewAggregatorIntegrationTestBase;
 import org.apache.helix.view.mock.MockViewClusterSpectator;
 import org.apache.helix.view.statemodel.DistViewAggregatorStateModel;
 import org.testng.Assert;
@@ -172,8 +171,7 @@
     // Wait for refresh and verify
     Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
     verifyViewClusterEventChanges(false, false, true);
-    Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(),
-        0);
+    Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(), 0);
     _monitor.reset();
 
     // Simulate view aggregator service crashed and got reset
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
index 2f429e0..a8046bb 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
@@ -25,7 +25,8 @@
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.view.ViewAggregatorIntegrationTestBase;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
 import org.apache.helix.view.mock.MockClusterEventProcessor;
 import org.testng.Assert;
@@ -34,31 +35,31 @@
 public class TestSourceClusterDataProvider extends ViewAggregatorIntegrationTestBase {
   private static final int numSourceCluster = 1;
   private static final String stateModel = "MasterSlave";
-  private static final String testResource = "restResource";
+  private static final String testResource = "testResource";
 
   @Test
   public void testSourceClusterDataProviderWatchAndRefresh() throws Exception {
     String clusterName = _allSourceClusters.get(0);
-    List<PropertyType> properties = Arrays.asList(
-        new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW,
-            PropertyType.INSTANCES
-        });
+    ZkHelixClusterVerifier clusterVerifier = new BestPossibleExternalViewVerifier.Builder(clusterName)
+        .setZkAddress(ZK_ADDR)
+        .build();
 
-    ViewClusterSourceConfig sourceClusterConfig =
-        new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
+    List<PropertyType> properties = Arrays.asList(
+        PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW, PropertyType.INSTANCES);
+
+    ViewClusterSourceConfig sourceClusterConfig = new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
 
     MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
     processor.start();
 
-    SourceClusterDataProvider dataProvider =
-        new SourceClusterDataProvider(sourceClusterConfig, processor);
+    SourceClusterDataProvider dataProvider = new SourceClusterDataProvider(sourceClusterConfig, processor);
 
     // setup can be re-called
     dataProvider.setup();
     dataProvider.setup();
 
-    Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
-        new HashSet<>(properties));
+    Assert.assertTrue(clusterVerifier.verify(1000));
+    Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()), new HashSet<>(properties));
 
     // When first connected, data provider will have some initial events
     Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
@@ -67,34 +68,33 @@
     processor.resetHandledEventCount();
 
     // ListNames should work
-    Assert.assertEquals(dataProvider.getInstanceConfigNames().size(), numPaticipantCount);
-    Assert.assertEquals(dataProvider.getLiveInstanceNames().size(), numPaticipantCount);
+    Assert.assertEquals(dataProvider.getInstanceConfigNames().size(), numParticipant);
+    Assert.assertEquals(dataProvider.getLiveInstanceNames().size(), numParticipant);
     Assert.assertEquals(dataProvider.getExternalViewNames().size(), 0);
 
     processor.resetHandledEventCount();
 
     // rebalance resource to check external view related events
-    _gSetupTool.addResourceToCluster(clusterName, testResource, numPaticipantCount, stateModel);
+    _gSetupTool.addResourceToCluster(clusterName, testResource, numParticipant, stateModel);
     _gSetupTool.rebalanceResource(clusterName, testResource, 3);
-    Thread.sleep(1000);
+    Assert.assertTrue(clusterVerifier.verify(1000));
     Assert.assertTrue(processor.getHandledExternalViewChangeCount() > 0);
     Assert.assertEquals(dataProvider.getExternalViewNames().size(), 1);
 
     // refresh data provider will have correct data loaded
     dataProvider.refreshCache();
-    Assert.assertEquals(dataProvider.getLiveInstances().size(), numPaticipantCount);
-    Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), numPaticipantCount);
+    Assert.assertEquals(dataProvider.getLiveInstances().size(), numParticipant);
+    Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), numParticipant);
     Assert.assertEquals(dataProvider.getExternalViews().size(), 1);
     processor.resetHandledEventCount();
 
     // Add additional participant will have corresponding change
     String testParticipantName = "testParticipant";
     _gSetupTool.addInstanceToCluster(clusterName, testParticipantName);
-    MockParticipantManager participant =
-        new MockParticipantManager(ZK_ADDR, clusterName, testParticipantName);
+    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, testParticipantName);
     participant.syncStart();
 
-    Thread.sleep(500);
+    Assert.assertTrue(clusterVerifier.verify(500));
     Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
     Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
 
@@ -111,21 +111,20 @@
   @Test
   public void testSourceClusterDataProviderPropertyFilter() throws Exception {
     String clusterName = _allSourceClusters.get(0);
-    List<PropertyType> properties = Arrays.asList(
-        new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW });
+    ZkHelixClusterVerifier clusterVerifier = new BestPossibleExternalViewVerifier.Builder(clusterName)
+        .setZkAddress(ZK_ADDR)
+        .build();
 
-    ViewClusterSourceConfig sourceClusterConfig =
-        new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
-
+    List<PropertyType> properties = Arrays.asList(PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW);
+    ViewClusterSourceConfig sourceClusterConfig = new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
     MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
     processor.start();
 
-    SourceClusterDataProvider dataProvider =
-        new SourceClusterDataProvider(sourceClusterConfig, processor);
+    SourceClusterDataProvider dataProvider = new SourceClusterDataProvider(sourceClusterConfig, processor);
     dataProvider.setup();
 
-    Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
-        new HashSet<>(properties));
+    Assert.assertTrue(clusterVerifier.verify(1000));
+    Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()), new HashSet<>(properties));
 
     // When first connected, data provider will have some initial events, but InstanceConfig
     // will be filtered out since its not in properties
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
index 272ef7a..26afeef 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
@@ -1,4 +1,4 @@
-package org.apache.helix.view;
+package org.apache.helix.view.integration;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,15 +21,15 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
-public class ViewAggregatorIntegrationTestBase extends ZkIntegrationTestBase {
+public class ViewAggregatorIntegrationTestBase extends ZkTestBase {
   protected static final int numSourceCluster = 2;
-  protected static final int numPaticipantCount = 3;
+  protected static final int numParticipant = 3;
   protected static final String testSourceClusterNamePrefix = "testSourceCluster";
   protected static final String testParticipantNamePrefix = "testParticipant";
   protected static final String testControllerNamePrefix = "testController";
@@ -46,11 +46,11 @@
           String.format("%s-%s-%s", testSourceClusterNamePrefix, this.hashCode(), i);
       _gSetupTool.addCluster(clusterName, false);
       // Setup participants
-      for (int j = 0; j < numPaticipantCount; j++) {
-        String instanceName =
-            String.format("%s-%s-%s", testParticipantNamePrefix, clusterName, j);
+      for (int j = 0; j < numParticipant; j++) {
+        String instanceName = String.format("%s-%s-%s", clusterName, testParticipantNamePrefix, j);
         _gSetupTool.addInstanceToCluster(clusterName, instanceName);
-        MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        MockParticipantManager participant =
+            new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
         participant.syncStart();
         _allParticipants.add(participant);
       }
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
index 36f27a2..cf51cbd 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
@@ -19,14 +19,15 @@
  * under the License.
  */
 
-import org.apache.helix.common.ClusterEventProcessor;
-import org.apache.helix.controller.stages.ClusterEvent;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.view.common.ClusterViewEvent;
 
-public class MockClusterEventProcessor extends ClusterEventProcessor {
-  private int _handledClusterConfigChange;
-  private int _handledExternalViewChange;
-  private int _handledInstanceConfigChange;
-  private int _handledLiveInstancesChange;
+public class MockClusterEventProcessor extends DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> {
+  private final AtomicInteger _handledClusterConfigChange = new AtomicInteger(0);
+  private final AtomicInteger _handledExternalViewChange = new AtomicInteger(0);
+  private final AtomicInteger _handledInstanceConfigChange = new AtomicInteger(0);
+  private final AtomicInteger _handledLiveInstancesChange = new AtomicInteger(0);
 
   public MockClusterEventProcessor(String clusterName) {
     super(clusterName);
@@ -34,45 +35,45 @@
   }
 
   public int getHandledClusterConfigChangeCount() {
-    return _handledClusterConfigChange;
+    return _handledClusterConfigChange.get();
   }
 
   public int getHandledExternalViewChangeCount() {
-    return _handledExternalViewChange;
+    return _handledExternalViewChange.get();
   }
 
   public int getHandledInstanceConfigChangeCount() {
-    return _handledInstanceConfigChange;
+    return _handledInstanceConfigChange.get();
   }
 
   public int getHandledLiveInstancesChangeCount() {
-    return _handledLiveInstancesChange;
+    return _handledLiveInstancesChange.get();
   }
 
   public void resetHandledEventCount() {
-    _handledClusterConfigChange = 0;
-    _handledExternalViewChange = 0;
-    _handledInstanceConfigChange = 0;
-    _handledLiveInstancesChange = 0;
+    _handledClusterConfigChange.set(0);
+    _handledExternalViewChange.set(0);
+    _handledInstanceConfigChange.set(0);
+    _handledLiveInstancesChange.set(0);
   }
 
   @Override
-  public void handleEvent(ClusterEvent event) {
+  public void handleEvent(ClusterViewEvent event) {
     switch (event.getEventType()) {
-    case ClusterConfigChange:
-      _handledClusterConfigChange += 1;
-      break;
-    case LiveInstanceChange:
-      _handledLiveInstancesChange += 1;
-      break;
-    case InstanceConfigChange:
-      _handledInstanceConfigChange += 1;
-      break;
-    case ExternalViewChange:
-      _handledExternalViewChange += 1;
-      break;
-    default:
-      break;
+      case ConfigChange:
+        _handledClusterConfigChange.incrementAndGet();
+        break;
+      case LiveInstanceChange:
+        _handledLiveInstancesChange.incrementAndGet();
+        break;
+      case InstanceConfigChange:
+        _handledInstanceConfigChange.incrementAndGet();
+        break;
+      case ExternalViewChange:
+        _handledExternalViewChange.incrementAndGet();
+        break;
+      default:
+        break;
     }
   }
 }
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
index cc4afd6..57e9c38 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
@@ -20,12 +20,17 @@
  */
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixProperty;
 import org.apache.helix.api.config.ViewClusterSourceConfig;
-import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.common.caches.ExternalViewCache;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.common.ClusterViewEvent;
 import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
 
 public class MockSourceClusterDataProvider extends SourceClusterDataProvider {
@@ -48,7 +53,7 @@
   }
 
   public MockSourceClusterDataProvider(ViewClusterSourceConfig config,
-      ClusterEventProcessor processor) {
+      DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> processor) {
     super(config, processor);
     _externalViewCache = new MockExternalViewCache("Test");
   }
@@ -83,21 +88,14 @@
   }
 
   public void setInstanceConfigs(List<InstanceConfig> instanceConfigList) {
-    for (InstanceConfig config : instanceConfigList) {
-      _instanceConfigMap.put(config.getInstanceName(), config);
-    }
+    _instanceConfigPropertyCache.setPropertyMap(HelixProperty.convertListToMap(instanceConfigList));
   }
 
   public void setLiveInstances(List<LiveInstance> liveInstanceList) {
-    for (LiveInstance instance : liveInstanceList) {
-      _liveInstanceMap.put(instance.getInstanceName(), instance);
-    }
+    _liveInstancePropertyCache.setPropertyMap(HelixProperty.convertListToMap(liveInstanceList));
   }
 
   public void setExternalViews(List<ExternalView> externalViewList) {
-    for (ExternalView ev : externalViewList) {
-      _externalViewMap.put(ev.getResourceName(), ev);
-    }
     ((MockExternalViewCache) _externalViewCache).setExternalView(externalViewList);
   }
-}
+}
\ No newline at end of file
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockViewClusterSpectator.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockViewClusterSpectator.java
new file mode 100644
index 0000000..c95ded5
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockViewClusterSpectator.java
@@ -0,0 +1,123 @@
+package org.apache.helix.view.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.common.ClusterViewEvent;
+
+/**
+ * MockViewClusterSpectator monitors change in view cluster. When event happens, it push event to
+ * MockClusterEventProcessor which records event change count in view cluster
+ */
+public class MockViewClusterSpectator implements ExternalViewChangeListener,
+    InstanceConfigChangeListener, LiveInstanceChangeListener{
+  private final HelixManager _manager;
+  private final MockClusterEventProcessor _eventProcessor;
+  private final String _viewClusterName;
+  private final HelixDataAccessor _dataAccessor;
+
+  public MockViewClusterSpectator(String viewClusterName, String zkAddr) throws Exception {
+    _viewClusterName = viewClusterName;
+    _eventProcessor = new MockClusterEventProcessor(viewClusterName);
+    _eventProcessor.start();
+    _manager = HelixManagerFactory
+        .getZKHelixManager(viewClusterName, "MockViewClusterSpectator-" + viewClusterName,
+            InstanceType.SPECTATOR, zkAddr);
+    _manager.connect();
+    _manager.addExternalViewChangeListener(this);
+    _manager.addLiveInstanceChangeListener(this);
+    _manager.addInstanceConfigChangeListener(this);
+    _dataAccessor = _manager.getHelixDataAccessor();
+  }
+
+  public void shutdown() throws Exception {
+    _eventProcessor.interrupt();
+    if (_manager != null && _manager.isConnected()) {
+      _manager.disconnect();
+    }
+  }
+
+  @Override
+  @PreFetch(enabled = false)
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext) {
+    _eventProcessor.queueEvent(ClusterViewEvent.Type.ExternalViewChange,
+        new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.ExternalViewChange));
+  }
+
+  @Override
+  @PreFetch(enabled = false)
+  public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+      NotificationContext context) {
+    _eventProcessor.queueEvent(ClusterViewEvent.Type.InstanceConfigChange,
+        new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.InstanceConfigChange));
+  }
+
+  @Override
+  @PreFetch(enabled = false)
+  public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+      NotificationContext changeContext) {
+    _eventProcessor.queueEvent(ClusterViewEvent.Type.LiveInstanceChange,
+        new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.LiveInstanceChange));
+  }
+
+  public int getLiveInstanceChangeCount() {
+    return _eventProcessor.getHandledLiveInstancesChangeCount();
+  }
+
+  public int getInstanceConfigChangeCount() {
+    return _eventProcessor.getHandledInstanceConfigChangeCount();
+  }
+
+  public int getExternalViewChangeCount() {
+    return _eventProcessor.getHandledExternalViewChangeCount();
+  }
+
+  public synchronized void reset() {
+    _eventProcessor.resetHandledEventCount();
+  }
+
+  public List<String> getPropertyNamesFromViewCluster(PropertyType propertyType) {
+    switch (propertyType) {
+    case INSTANCES:
+      return _dataAccessor.getChildNames(_dataAccessor.keyBuilder().instanceConfigs());
+    case LIVEINSTANCES:
+      return _dataAccessor.getChildNames(_dataAccessor.keyBuilder().liveInstances());
+    case EXTERNALVIEW:
+      return _dataAccessor.getChildNames(_dataAccessor.keyBuilder().externalViews());
+    default:
+      return null;
+    }
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 1876824..07c7abe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,10 +275,10 @@
     <module>helix-admin-webapp</module>
     <module>helix-rest</module>
     <module>helix-lock</module>
-    <module>helix-view-aggregator</module>
     <module>helix-agent</module>
     <!--<module>helix-front</module>-->
     <module>recipes</module>
+    <module>helix-view-aggregator</module>
   </modules>
 
   <mailingLists>