HELIX-705: create interfaces and interactions among HelixViewAggregator components

RB=1205300
BUG=HELIX-705
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang,jxue
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
new file mode 100644
index 0000000..ff24211
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
@@ -0,0 +1,124 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.PropertyType;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Represents source physical cluster information for view cluster
+ */
+public class ViewClusterSourceConfig {
+
+  private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(Arrays
+      .asList(new PropertyType[] { PropertyType.INSTANCES, PropertyType.EXTERNALVIEW,
+          PropertyType.LIVEINSTANCES
+      }));
+
+  private static ObjectMapper _objectMapper = new ObjectMapper();
+
+  @JsonProperty("name")
+  private String _name;
+
+  @JsonProperty("zkAddress")
+  String _zkAddress;
+
+  @JsonProperty("properties")
+  private List<PropertyType> _properties;
+
+  private ViewClusterSourceConfig() {
+  }
+
+  public ViewClusterSourceConfig(String name, String zkAddress, List<PropertyType> properties) {
+    _name = name;
+    _zkAddress = zkAddress;
+    _properties = properties;
+  }
+
+  public ViewClusterSourceConfig(ViewClusterSourceConfig config) {
+    this(config.getName(), config.getZkAddress(), new ArrayList<>(config.getProperties()));
+  }
+
+  public void setName(String name) {
+    _name = name;
+  }
+
+  public void setZkAddress(String zkAddress) {
+    _zkAddress = zkAddress;
+  }
+
+  public void setProperties(List<PropertyType> properties) {
+    for (PropertyType p : properties) {
+      if (!_validPropertyTypes.contains(p)) {
+        throw new IllegalArgumentException(
+            String.format("Property %s is not support in ViewCluster yet.", p));
+      }
+    }
+    _properties = properties;
+  }
+
+  public String getName() {
+    return _name;
+  }
+
+  public String getZkAddress() {
+    return _zkAddress;
+  }
+
+  public List<PropertyType> getProperties() {
+    return _properties;
+  }
+
+  public String toJson() throws IOException {
+    return new ObjectMapper().writeValueAsString(this);
+  }
+
+  public String toString() {
+    return String.format("name=%s; zkAddr=%s; properties=%s", _name, _zkAddress, _properties);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof ViewClusterSourceConfig)) {
+      return false;
+    }
+    ViewClusterSourceConfig otherConfig = (ViewClusterSourceConfig) other;
+
+    return _name.equals(otherConfig.getName()) && _zkAddress.equals(otherConfig.getZkAddress())
+        && _properties.containsAll(otherConfig.getProperties()) && otherConfig.getProperties()
+        .containsAll(_properties);
+
+  }
+
+  public static ViewClusterSourceConfig fromJson(String jsonString) {
+    try {
+      return _objectMapper.readValue(jsonString, ViewClusterSourceConfig.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException(
+          String.format("Invalid Json: %s, Exception: %s", jsonString, e.toString()));
+    }
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 65f6bb4..12dc758 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -39,6 +39,7 @@
   OnDemandRebalance,
   ControllerChange,
   RetryRebalance,
+  ViewClusterPeriodicRefresh,
   StateVerifier,
   Unknown
 }
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 72bc5dc..0456458 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -36,6 +36,7 @@
 import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.util.ConfigStringUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
 
 /**
  * Cluster configurations
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index 7de552e..e60b7ce 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -31,6 +31,7 @@
 import org.apache.helix.integration.task.WorkflowGenerator;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskPartitionState;
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
new file mode 100644
index 0000000..b361ea4
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -0,0 +1,237 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.view.common.ViewAggregatorEventAttributes;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.dataprovider.ViewClusterConfigProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main logic for Helix view aggregator
+ */
+public class HelixViewAggregator {
+  private static final Logger logger = LoggerFactory.getLogger(HelixViewAggregator.class);
+  private final String _viewClusterName;
+  private final HelixManager _viewClusterManager;
+  private ViewAggregationWorker _aggregationWorker;
+  private ViewConfigWorker _viewConfigWorker;
+  private long _lastViewClusterRefreshTimestampMs;
+  private Map<String, SourceClusterDataProvider> _dataProviderMap;
+  private ViewClusterConfigProvider _viewClusterConfigProvider;
+  private List<ViewClusterSourceConfig> _sourceConfigs;
+  private long _refreshPeriodMs;
+  private Timer _viewClusterRefreshTimer;
+  private ViewClusterRefresher _viewClusterRefresher;
+
+  public HelixViewAggregator(String viewClusterName, String zkAddr) {
+    _viewClusterName = viewClusterName;
+    _lastViewClusterRefreshTimestampMs = 0L;
+    _refreshPeriodMs = -1L;
+    _sourceConfigs = new ArrayList<>();
+    _dataProviderMap = new HashMap<>();
+    _viewClusterManager = HelixManagerFactory
+        .getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
+            InstanceType.SPECTATOR, zkAddr);
+
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+      @Override
+      public void run() {
+        shutdown();
+      }
+    }));
+  }
+
+  /**
+   * Start controller main logic
+   * @throws Exception
+   */
+  public void start() throws Exception {
+    try {
+      _viewClusterManager.connect();
+    } catch (Exception e) {
+      throw new HelixException("Failed to connect view cluster helix manager", e);
+    }
+
+    // set up view cluster refresher
+    _viewClusterRefresher =
+        new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor(),
+            _dataProviderMap);
+
+    // Start workers
+    _aggregationWorker = new ViewAggregationWorker();
+    _aggregationWorker.start();
+    _viewConfigWorker = new ViewConfigWorker();
+    _viewConfigWorker.start();
+
+    // Start cluster config provider
+    _viewClusterConfigProvider =
+        new ViewClusterConfigProvider(_viewClusterName, _viewClusterManager, _viewConfigWorker);
+    _viewClusterConfigProvider.setup();
+  }
+
+  /**
+   * Process view cluster config change events
+   */
+  private class ViewConfigWorker extends ClusterEventProcessor {
+    public ViewConfigWorker() {
+      super(_viewClusterName, "ViewConfigWorker");
+    }
+    @Override
+    public void handleEvent(ClusterEvent event) {
+      logger.info("Processing event " + event.getEventType().name());
+      switch (event.getEventType()) {
+      case ClusterConfigChange:
+        processViewClusterConfigUpdate();
+        break;
+      default:
+        logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
+      }
+    }
+  }
+
+  /**
+   * Process source cluster data change events and view cluster periodic refresh events
+   */
+  private class ViewAggregationWorker extends ClusterEventProcessor {
+    private boolean _shouldRefresh;
+
+    public ViewAggregationWorker() {
+      super(_viewClusterName, "ViewAggregationWorker");
+      _shouldRefresh = false;
+    }
+
+    @Override
+    public void handleEvent(ClusterEvent event) {
+      logger.info("Processing event " + event.getEventType().name());
+
+      switch (event.getEventType()) {
+      case LiveInstanceChange:
+      case InstanceConfigChange:
+      case ExternalViewChange:
+        _shouldRefresh = true;
+        break;
+      case ViewClusterPeriodicRefresh:
+        Boolean forceRefresh =
+            event.getAttribute(ViewAggregatorEventAttributes.ViewClusterForceRefresh.name());
+        if (!forceRefresh && !_shouldRefresh) {
+          logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
+          return;
+        }
+
+        // mark source cluster as changed to trigger next refresh as we failed to refresh at
+        // least some of the elements in view cluster
+        logger.info("Refreshing cluster based on event " + event.getEventType().name());
+        _shouldRefresh = refreshViewCluster();
+        break;
+      default:
+        logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
+      }
+    }
+  }
+
+  private class RefreshViewClusterTask extends TimerTask {
+    @Override
+    public void run() {
+      triggerViewClusterRefresh(false);
+    }
+  }
+
+  public void shutdown() {
+    if (_viewClusterManager != null) {
+      logger.info("Shutting down view cluster helix manager");
+      _viewClusterManager.disconnect();
+    }
+
+    if (_viewClusterRefreshTimer != null) {
+      logger.info("Shutting down view cluster refresh timer");
+      _viewClusterRefreshTimer.cancel();
+    }
+
+    for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
+      logger
+          .info(String.format("Shutting data provider for source cluster %s", provider.getName()));
+      provider.shutdown();
+    }
+    logger.info("HelixViewAggregator shutdown cleanly");
+  }
+
+  /**
+   * Recreate timer that triggers RefreshViewClusterTask
+   */
+  private void resetTimer() {
+    // TODO: implement
+  }
+
+  /**
+   * Use ViewClusterConfigProvider (assume its up-to-date) to compute
+   * SourceClusterConfigChangeAction, based on _sourceConfigs. Use the action object to
+   * reset timer (RefreshViewClusterTask), create/delete/update SourceClusterDataProvider in
+   * data provider map and populate new _sourceConfigs
+   */
+  private synchronized void processViewClusterConfigUpdate() {
+    // TODO: implement
+  }
+
+  /**
+   * push event to worker queue to trigger refresh. Worker might not refresh view cluster
+   * if there is no event happened since last refresh
+   * @param forceRefresh
+   */
+  private void triggerViewClusterRefresh(boolean forceRefresh) {
+    ClusterEvent event = new ClusterEvent(_viewClusterName, ClusterEventType.ViewClusterPeriodicRefresh);
+    event.addAttribute(ViewAggregatorEventAttributes.ViewClusterForceRefresh.name(),
+        Boolean.valueOf(forceRefresh));
+    _aggregationWorker.queueEvent(event);
+    logger.info("Triggering view cluster refresh, forceRefresh=" + forceRefresh);
+  }
+
+  /**
+   * Use ViewClusterRefresher to refresh ViewCluster.
+   * @return true if needs retry, else false
+   */
+  private synchronized boolean refreshViewCluster() {
+    // TODO: Implement refresh logic
+    return false;
+  }
+
+  private static String generateHelixManagerInstanceName(String viewClusterName) {
+    return String.format("HelixViewAggregator-%s", viewClusterName);
+  }
+
+  private static String generateSourceClusterDataProviderMapKey(ViewClusterSourceConfig config) {
+    return String.format("%s-%s", config.getName(), config.getZkAddress());
+  }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
new file mode 100644
index 0000000..7f28441
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
@@ -0,0 +1,34 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HelixViewAggregatorMain {
+  private static Logger logger = LoggerFactory.getLogger(HelixViewAggregatorMain.class);
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    // TODO: implement this function as launching HelixViewAggregator using CLI
+  }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
new file mode 100644
index 0000000..e770302
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -0,0 +1,107 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains logics to refresh view cluster based on information from source cluster data providers
+ */
+public class ViewClusterRefresher {
+  private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
+  private String _viewClusterName;
+  private HelixDataAccessor _viewClusterDataAccessor;
+  private PropertyKey.Builder _viewClusterKeyBuilder;
+  private Map<String, SourceClusterDataProvider> _dataProviderMap;
+
+  // These 3 caches stores objects that are pushed to view cluster (write-through) cache,
+  // thus we don't need to read from view cluster everytime we refresh it.
+  private Map<String, LiveInstance> _viewClusterLiveInstanceCache;
+  private Map<String, InstanceConfig> _viewClusterInstanceConfigCache;
+  private Map<String, ExternalView> _viewClusterExternalViewCache;
+
+  public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor,
+      Map<String, SourceClusterDataProvider> dataProviderMap) {
+    _viewClusterName = viewClusterName;
+    _viewClusterDataAccessor = viewClusterDataAccessor;
+    _dataProviderMap = dataProviderMap;
+    _viewClusterLiveInstanceCache = new HashMap<>();
+    _viewClusterInstanceConfigCache = new HashMap<>();
+    _viewClusterExternalViewCache = new HashMap<>();
+  }
+
+  /**
+   * Create / update current live instances; remove dead live instances.
+   * This function assumes that all data providers have been refreshed.
+   */
+  public void refreshLiveInstancesInViewCluster() {
+    // TODO: implement it
+  }
+
+  /**
+   * Create / update current instance configs; remove dead instance configs
+   * This function assumes that all data providers have been refreshed.
+   */
+  public void refreshInstanceConfigsInViewCluster() {
+    // TODO: implement it
+  }
+
+  /**
+   * Create / update currently existing external views; delete outdated external views.
+   * This function assumes that all data providers have been refreshed.
+   */
+  public void refreshExtrenalViewsInViewCluster() {
+    // TODO: implement it
+  }
+
+  /**
+   * Merge external view "toMerge" into external view "source":
+   *  - if partition in toMerge does not exist in source, we add it into source
+   *  - if partition exist in both external views, we add all map fields from toMerge to source
+   * @param source
+   * @param toMerge
+   */
+  public static void mergeExternalViews(ExternalView source, ExternalView toMerge)
+      throws IllegalArgumentException {
+    if (!source.getId().equals(toMerge.getId())) {
+      throw new IllegalArgumentException(String
+          .format("Cannot merge ExternalViews with different ID. SourceID: %s; ToMergeID: %s",
+              source.getId(), toMerge.getId()));
+    }
+    for (String partitionName : toMerge.getPartitionSet()) {
+      if (!source.getPartitionSet().contains(partitionName)) {
+        source.setStateMap(partitionName, toMerge.getStateMap(partitionName));
+      } else {
+        Map<String, String> mergedPartitionState = source.getStateMap(partitionName);
+        mergedPartitionState.putAll(toMerge.getStateMap(partitionName));
+        source.setStateMap(partitionName, mergedPartitionState);
+      }
+    }
+  }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
new file mode 100644
index 0000000..ca9b164
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
@@ -0,0 +1,24 @@
+package org.apache.helix.view.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public enum ViewAggregatorEventAttributes {
+  ViewClusterForceRefresh,
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
new file mode 100644
index 0000000..add2688
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -0,0 +1,241 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
+import org.apache.helix.common.BasicClusterDataCache;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+/**
+ * SourceClusterDataProvider listens to changes in 1 source cluster, notifies cluster data cache,
+ * generates event for event processor and provide methods to read data cache
+ */
+public class SourceClusterDataProvider extends BasicClusterDataCache
+    implements InstanceConfigChangeListener, LiveInstanceChangeListener,
+    ExternalViewChangeListener {
+  private HelixManager _helixManager;
+  private HelixDataAccessor _dataAccessor;
+  private PropertyKey.Builder _propertyKeyBuilder;
+  private ViewClusterSourceConfig _sourceClusterConfig;
+  private ClusterEventProcessor _eventProcessor;
+
+  public SourceClusterDataProvider(ViewClusterSourceConfig config,
+      ClusterEventProcessor eventProcessor) {
+    super(config.getName());
+    _eventProcessor = eventProcessor;
+    _sourceClusterConfig = config;
+    _helixManager = HelixManagerFactory
+        .getZKHelixManager(config.getName(), generateHelixManagerInstanceName(config.getName()),
+            InstanceType.SPECTATOR, config.getZkAddress());
+    requireFullRefreshBasedOnConfig();
+  }
+
+  public String getName() {
+    return _helixManager.getInstanceName();
+  }
+
+  public ViewClusterSourceConfig getSourceClusterConfig() {
+    return _sourceClusterConfig;
+  }
+
+  /**
+   * Set up ClusterDataProvider. After setting up, the class should start listening
+   * on change events and perform corresponding reactions
+   * @throws Exception
+   */
+  public void setup() throws Exception {
+    try {
+      LOG.info(String.format("%s setting up ...", _helixManager.getInstanceName()));
+      _helixManager.connect();
+      _helixManager.addInstanceConfigChangeListener(this);
+      _helixManager.addLiveInstanceChangeListener(this);
+      _helixManager.addExternalViewChangeListener(this);
+      _dataAccessor = _helixManager.getHelixDataAccessor();
+      _propertyKeyBuilder = _dataAccessor.keyBuilder();
+      LOG.info(String.format("%s started.", _helixManager.getInstanceName()));
+    } catch(Exception e) {
+      shutdown();
+      throw e;
+    }
+  }
+
+  public void shutdown() {
+    // Clear cache to make sure memory is released
+    for (HelixConstants.ChangeType changeType : HelixConstants.ChangeType.values()) {
+      clearCache(changeType);
+    }
+    if (_helixManager != null && _helixManager.isConnected()) {
+      _helixManager.disconnect();
+    }
+  }
+
+  public void refreshCache() {
+    refresh(_dataAccessor);
+  }
+
+  /**
+   * Re-list current instance config names. ListName is a more reliable way to find
+   * current instance config names. This is needed for ViewClusterRefresher when
+   * it is generating diffs to push to ViewCluster
+   * @return
+   */
+  public List<String> listInstanceConfigNames() {
+    return _dataAccessor.getChildNames(_propertyKeyBuilder.instanceConfigs());
+  }
+
+  /**
+   * Re-list current live instance names.
+   * @return
+   */
+  public List<String> listLiveInstanceNames() {
+    return _dataAccessor.getChildNames(_propertyKeyBuilder.liveInstances());
+  }
+
+  /**
+   * re-list external view names
+   * @return
+   */
+  public List<String> listExternalViewNames() {
+    return _dataAccessor.getChildNames(_propertyKeyBuilder.externalViews());
+  }
+
+  /**
+   * Based on current ViewClusterSourceConfig, decide whether caller should
+   * read instance config. Used by ViewClusterRefresher
+   * @return
+   */
+  public boolean shouldReadInstanceConfigs() {
+    // TODO: implement logic
+    return false;
+  }
+
+  /**
+   * Based on current ViewClusterSourceConfig, decide whether caller should
+   * read live instances. Used by ViewClusterRefresher
+   * @return
+   */
+  public boolean shouldReadLiveInstances() {
+    // TODO: implement logic
+    return false;
+  }
+
+  /**
+   * Based on current ViewClusterSourceConfig, decide whether caller should
+   * read external view. Used by ViewClusterRefresher
+   * @return
+   */
+  public boolean shouldReadExternalViews() {
+    // TODO: implement logic
+    return false;
+  }
+
+  @Override
+  @PreFetch(enabled = false)
+  public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+      NotificationContext context) {
+    queueEventToProcessor(context, ClusterEventType.InstanceConfigChange,
+        HelixConstants.ChangeType.INSTANCE_CONFIG);
+  }
+
+  @Override
+  @PreFetch(enabled = false)
+  public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+      NotificationContext changeContext) {
+    queueEventToProcessor(changeContext, ClusterEventType.LiveInstanceChange,
+        HelixConstants.ChangeType.LIVE_INSTANCE);
+  }
+
+  @Override
+  @PreFetch(enabled = false)
+  public void onExternalViewChange(List<ExternalView> externalViewList,
+      NotificationContext changeContext) {
+    queueEventToProcessor(changeContext, ClusterEventType.ExternalViewChange,
+        HelixConstants.ChangeType.EXTERNAL_VIEW);
+  }
+
+  private void queueEventToProcessor(NotificationContext context,
+      ClusterEventType clusterEventType, HelixConstants.ChangeType cacheChangeType)
+      throws IllegalStateException {
+    if (!shouldProcessEvent(cacheChangeType)) {
+      LOG.info(String.format(
+          "Skip processing event based on ViewClusterSourceConfig: ClusterName=%s; ClusterEventType=%s, ChangeType=%s",
+          _clusterName, clusterEventType, cacheChangeType));
+      return;
+    }
+    ClusterEvent event = new ClusterEvent(_clusterName, clusterEventType);
+    if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
+      notifyDataChange(cacheChangeType);
+      _eventProcessor.queueEvent(event);
+    } else {
+      LOG.info(String.format("SourceClusterDataProvider: skip queuing event %s", event));
+    }
+  }
+
+  /**
+   * Replace current source cluster config properties (A list of PropertyType we should aggregate)
+   * with the given ones, and update corresponding provider mechanisms
+   * @param properties
+   */
+  public synchronized void setSourceClusterConfigProperty(List<PropertyType> properties) {
+    // TODO: implement logic
+  }
+
+  /**
+   * Based on current ViewClusterSourceConfig, notify cache accordingly.
+   */
+  private void requireFullRefreshBasedOnConfig() {
+    // TODO: implement logic
+  }
+
+  /**
+   * Check source cluster config and decide whether this event should be processed
+   * @param sourceClusterChangeType
+   * @return
+   */
+  private synchronized boolean shouldProcessEvent(
+      HelixConstants.ChangeType sourceClusterChangeType) {
+    // TODO: implement
+    return false;
+  }
+
+  private static String generateHelixManagerInstanceName(String clusterName) {
+    return String.format("SourceClusterSpectatorHelixManager-%s", clusterName);
+  }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterConfigProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterConfigProvider.java
new file mode 100644
index 0000000..d7281c2
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterConfigProvider.java
@@ -0,0 +1,138 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.model.ClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class keeps an updated version of cluster config of the view cluster, notifies the event
+ * processor it is designated about view cluster config change, and provides methods to help adjust
+ * cluster settings.
+ */
+public class ViewClusterConfigProvider implements ClusterConfigChangeListener {
+  private static final Logger logger = LoggerFactory.getLogger(ViewClusterConfigProvider.class);
+  private String _viewClusterName;
+  private HelixManager _helixManager;
+  protected ClusterConfig _viewClusterConfig;
+  private ClusterEventProcessor _eventProcessor;
+
+  public ViewClusterConfigProvider(String clusterName, HelixManager manager,
+      ClusterEventProcessor eventProcessor) {
+    _viewClusterName = clusterName;
+    _helixManager = manager;
+    _eventProcessor = eventProcessor;
+  }
+
+  /**
+   * Set up ViewClusterConfigProvider. After setting up, the class should start listening
+   * on change events and perform corresponding reactions
+   */
+  public void setup() {
+    logger.info("Setting up ViewClusterConfigProvider for view cluster %s ...", _viewClusterName);
+    try {
+      _helixManager.addClusterfigChangeListener(this);
+    } catch (Exception e) {
+      throw new HelixException(
+          "ViewClusterConfigProvider: Failed to attach listeners to HelixManager!", e);
+    }
+  }
+
+  /**
+   * This class takes a existing list of source cluster configs and generate lists of source
+   * cluster configs to add / delete / modify, and tell caller if ViewClusterRefreshTimer should
+   * be reset or not
+   */
+  public class SourceClusterConfigChangeAction {
+    private List<ViewClusterSourceConfig> _oldConfigList;
+    private long _oldRefreshPeriodMs;
+
+    private List<ViewClusterSourceConfig> _toAdd;
+    private List<ViewClusterSourceConfig> _toDelete;
+    private List<ViewClusterSourceConfig> _toModify;
+    private boolean _refreshPeriodChanged;
+
+    public SourceClusterConfigChangeAction(List<ViewClusterSourceConfig> oldConfigList,
+        long oldRefreshPeriodMs) {
+      _oldConfigList = oldConfigList;
+      _oldRefreshPeriodMs = oldRefreshPeriodMs;
+    }
+
+    /**
+     * Compute actions and generate toAdd, toDelete toModify, and refreshPeriodChanged.
+     */
+    public void computeAction() {
+      // TODO: implement logic
+    }
+
+    public List<ViewClusterSourceConfig> getConfigsToAdd() {
+      return _toAdd;
+    }
+
+    public List<ViewClusterSourceConfig> getConfigsToDelete() {
+      return _toDelete;
+    }
+
+    public List<ViewClusterSourceConfig> getConfigsToModify() {
+      return _toModify;
+    }
+
+    public boolean shouldResetTimer() {
+      return _refreshPeriodChanged;
+    }
+  }
+
+  public synchronized long getViewClusterRefreshPeriodMs() {
+    return _viewClusterConfig.getViewClusterRefershPeriod() * 1000;
+  }
+
+  public synchronized SourceClusterConfigChangeAction getSourceConfigChangeAction(
+      List<ViewClusterSourceConfig> oldConfigList, long oldRefreshPeriodMs) {
+    return new SourceClusterConfigChangeAction(oldConfigList, oldRefreshPeriodMs);
+  }
+
+  @Override
+  public void onClusterConfigChange(ClusterConfig clusterConfig,
+      NotificationContext context) {
+    // TODO: we assume its a view cluster config here. Error handling if not?
+    refreshViewClusterConfig(clusterConfig);
+    // Source cluster config will not be aggregated so here ClusterConfigChange infers view cluster
+    ClusterEvent event = new ClusterEvent(_viewClusterName, ClusterEventType.ClusterConfigChange);
+    _eventProcessor.queueEvent(event);
+    if (logger.isDebugEnabled()) {
+      logger.debug(String.format("ViewClusterConfigProvider: queued event %s", event.toString()));
+    }
+  }
+
+  protected synchronized void refreshViewClusterConfig(ClusterConfig clusterConfig) {
+    _viewClusterConfig = clusterConfig;
+  }
+}