HELIX-705: implemented SourceClusterDataProvider's core logic and related tests

RB=1205694
BUG=HELIX-775
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
index e0595c4..7b148e0 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -51,46 +51,66 @@
 public class SourceClusterDataProvider extends BasicClusterDataCache
     implements InstanceConfigChangeListener, LiveInstanceChangeListener,
     ExternalViewChangeListener {
-  private HelixManager _helixManager;
+  private final HelixManager _helixManager;
+  private final ViewClusterSourceConfig _sourceClusterConfig;
+  private final ClusterEventProcessor _eventProcessor;
+
   private HelixDataAccessor _dataAccessor;
   private PropertyKey.Builder _propertyKeyBuilder;
-  private ViewClusterSourceConfig _sourceClusterConfig;
-  private ClusterEventProcessor _eventProcessor;
 
   public SourceClusterDataProvider(ViewClusterSourceConfig config,
       ClusterEventProcessor eventProcessor) {
     super(config.getName());
     _eventProcessor = eventProcessor;
     _sourceClusterConfig = config;
-    _helixManager = HelixManagerFactory
-        .getZKHelixManager(config.getName(), generateHelixManagerInstanceName(config.getName()),
-            InstanceType.SPECTATOR, config.getZkAddress());
-    requireFullRefreshBasedOnConfig();
+    _helixManager = HelixManagerFactory.getZKHelixManager(config.getName(),
+        generateHelixManagerInstanceName(config.getName()),
+        InstanceType.SPECTATOR, config.getZkAddress());
   }
 
   public String getName() {
     return _helixManager.getInstanceName();
   }
 
-  public ViewClusterSourceConfig getSourceClusterConfig() {
-    return _sourceClusterConfig;
-  }
-
   /**
    * Set up ClusterDataProvider. After setting up, the class should start listening
    * on change events and perform corresponding reactions
    * @throws Exception
    */
   public void setup() throws Exception {
+    if (_helixManager != null && _helixManager.isConnected()) {
+      LOG.info(String.format("Data provider %s is already setup", _helixManager.getInstanceName()));
+      return;
+    }
     try {
-      LOG.info(String.format("%s setting up ...", _helixManager.getInstanceName()));
       _helixManager.connect();
-      _helixManager.addInstanceConfigChangeListener(this);
-      _helixManager.addLiveInstanceChangeListener(this);
-      _helixManager.addExternalViewChangeListener(this);
+      for (PropertyType property : _sourceClusterConfig.getProperties()) {
+        HelixConstants.ChangeType changeType;
+        switch (property) {
+        case INSTANCES:
+          _helixManager.addInstanceConfigChangeListener(this);
+          changeType = HelixConstants.ChangeType.INSTANCE_CONFIG;
+          break;
+        case LIVEINSTANCES:
+          _helixManager.addLiveInstanceChangeListener(this);
+          changeType = HelixConstants.ChangeType.LIVE_INSTANCE;
+          break;
+        case EXTERNALVIEW:
+          _helixManager.addExternalViewChangeListener(this);
+          changeType = HelixConstants.ChangeType.EXTERNAL_VIEW;
+          break;
+        default:
+          LOG.warn(String
+              .format("Unsupported property type: %s. Skip adding listener", property.name()));
+          continue;
+        }
+        notifyDataChange(changeType);
+      }
       _dataAccessor = _helixManager.getHelixDataAccessor();
       _propertyKeyBuilder = _dataAccessor.keyBuilder();
-      LOG.info(String.format("%s started.", _helixManager.getInstanceName()));
+      LOG.info(String
+          .format("%s started. Source cluster detail: %s", _helixManager.getInstanceName(),
+              _sourceClusterConfig.toString()));
     } catch(Exception e) {
       shutdown();
       throw e;
@@ -105,6 +125,8 @@
     if (_helixManager != null && _helixManager.isConnected()) {
       try {
         _helixManager.disconnect();
+        LOG.info(
+            String.format("Data provider %s shutdown cleanly.", _helixManager.getInstanceName()));
       } catch (ZkInterruptedException e) {
         // OK
       }
@@ -116,66 +138,40 @@
   }
 
   /**
-   * Re-list current instance config names. ListName is a more reliable way to find
+   * Get current instance config names. ListName is a more reliable way to find
    * current instance config names. This is needed for ViewClusterRefresher when
    * it is generating diffs to push to ViewCluster
    * @return
    */
-  public List<String> listInstanceConfigNames() {
+  public List<String> getInstanceConfigNames() {
     return _dataAccessor.getChildNames(_propertyKeyBuilder.instanceConfigs());
   }
 
   /**
-   * Re-list current live instance names.
+   * Get current live instance names.
    * @return
    */
-  public List<String> listLiveInstanceNames() {
+  public List<String> getLiveInstanceNames() {
     return _dataAccessor.getChildNames(_propertyKeyBuilder.liveInstances());
   }
 
   /**
-   * re-list external view names
+   * Get external view names
    * @return
    */
-  public List<String> listExternalViewNames() {
+  public List<String> getExternalViewNames() {
     return _dataAccessor.getChildNames(_propertyKeyBuilder.externalViews());
   }
 
-  /**
-   * Based on current ViewClusterSourceConfig, decide whether caller should
-   * read instance config. Used by ViewClusterRefresher
-   * @return
-   */
-  public boolean shouldReadInstanceConfigs() {
-    // TODO: implement logic
-    return false;
-  }
-
-  /**
-   * Based on current ViewClusterSourceConfig, decide whether caller should
-   * read live instances. Used by ViewClusterRefresher
-   * @return
-   */
-  public boolean shouldReadLiveInstances() {
-    // TODO: implement logic
-    return false;
-  }
-
-  /**
-   * Based on current ViewClusterSourceConfig, decide whether caller should
-   * read external view. Used by ViewClusterRefresher
-   * @return
-   */
-  public boolean shouldReadExternalViews() {
-    // TODO: implement logic
-    return false;
+  public List<PropertyType> getPropertiesToAggregate() {
+    return _sourceClusterConfig.getProperties();
   }
 
   @Override
   @PreFetch(enabled = false)
   public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
       NotificationContext context) {
-    queueEventToProcessor(context, ClusterEventType.InstanceConfigChange,
+    queueEvent(context, ClusterEventType.InstanceConfigChange,
         HelixConstants.ChangeType.INSTANCE_CONFIG);
   }
 
@@ -183,7 +179,7 @@
   @PreFetch(enabled = false)
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
-    queueEventToProcessor(changeContext, ClusterEventType.LiveInstanceChange,
+    queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
         HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
@@ -191,55 +187,24 @@
   @PreFetch(enabled = false)
   public void onExternalViewChange(List<ExternalView> externalViewList,
       NotificationContext changeContext) {
-    queueEventToProcessor(changeContext, ClusterEventType.ExternalViewChange,
+    queueEvent(changeContext, ClusterEventType.ExternalViewChange,
         HelixConstants.ChangeType.EXTERNAL_VIEW);
   }
 
-  private void queueEventToProcessor(NotificationContext context,
+  private void queueEvent(NotificationContext context,
       ClusterEventType clusterEventType, HelixConstants.ChangeType cacheChangeType)
       throws IllegalStateException {
-    if (!shouldProcessEvent(cacheChangeType)) {
-      LOG.info(String.format(
-          "Skip processing event based on ViewClusterSourceConfig: ClusterName=%s; ClusterEventType=%s, ChangeType=%s",
-          _clusterName, clusterEventType, cacheChangeType));
-      return;
-    }
-    ClusterEvent event = new ClusterEvent(_clusterName, clusterEventType);
+    // TODO: in case of FINALIZE, if we are not shutdown, re-connect helix manager and report error
     if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
       notifyDataChange(cacheChangeType);
-      _eventProcessor.queueEvent(event);
+      _eventProcessor.queueEvent(new ClusterEvent(_clusterName, clusterEventType));
     } else {
-      LOG.info(String.format("SourceClusterDataProvider: skip queuing event %s", event));
+      LOG.info(String.format("Skip queuing event. EventType: %s, ChangeType: %s, ContextType: %s",
+          clusterEventType.name(), cacheChangeType.name(),
+          context == null ? "NoContext" : context.getType().name()));
     }
   }
 
-  /**
-   * Replace current source cluster config properties (A list of PropertyType we should aggregate)
-   * with the given ones, and update corresponding provider mechanisms
-   * @param properties
-   */
-  public synchronized void setSourceClusterConfigProperty(List<PropertyType> properties) {
-    // TODO: implement logic
-  }
-
-  /**
-   * Based on current ViewClusterSourceConfig, notify cache accordingly.
-   */
-  private void requireFullRefreshBasedOnConfig() {
-    // TODO: implement logic
-  }
-
-  /**
-   * Check source cluster config and decide whether this event should be processed
-   * @param sourceClusterChangeType
-   * @return
-   */
-  private synchronized boolean shouldProcessEvent(
-      HelixConstants.ChangeType sourceClusterChangeType) {
-    // TODO: implement
-    return false;
-  }
-
   private static String generateHelixManagerInstanceName(String clusterName) {
     return String.format("SourceClusterSpectatorHelixManager-%s", clusterName);
   }
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
new file mode 100644
index 0000000..2f429e0
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
@@ -0,0 +1,144 @@
+package org.apache.helix.view.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.view.ViewAggregatorIntegrationTestBase;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.mock.MockClusterEventProcessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSourceClusterDataProvider extends ViewAggregatorIntegrationTestBase {
+  private static final int numSourceCluster = 1;
+  private static final String stateModel = "MasterSlave";
+  private static final String testResource = "restResource";
+
+  @Test
+  public void testSourceClusterDataProviderWatchAndRefresh() throws Exception {
+    String clusterName = _allSourceClusters.get(0);
+    List<PropertyType> properties = Arrays.asList(
+        new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW,
+            PropertyType.INSTANCES
+        });
+
+    ViewClusterSourceConfig sourceClusterConfig =
+        new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
+
+    MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
+    processor.start();
+
+    SourceClusterDataProvider dataProvider =
+        new SourceClusterDataProvider(sourceClusterConfig, processor);
+
+    // setup can be re-called
+    dataProvider.setup();
+    dataProvider.setup();
+
+    Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
+        new HashSet<>(properties));
+
+    // When first connected, data provider will have some initial events
+    Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
+    Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
+    Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
+    processor.resetHandledEventCount();
+
+    // ListNames should work
+    Assert.assertEquals(dataProvider.getInstanceConfigNames().size(), numPaticipantCount);
+    Assert.assertEquals(dataProvider.getLiveInstanceNames().size(), numPaticipantCount);
+    Assert.assertEquals(dataProvider.getExternalViewNames().size(), 0);
+
+    processor.resetHandledEventCount();
+
+    // rebalance resource to check external view related events
+    _gSetupTool.addResourceToCluster(clusterName, testResource, numPaticipantCount, stateModel);
+    _gSetupTool.rebalanceResource(clusterName, testResource, 3);
+    Thread.sleep(1000);
+    Assert.assertTrue(processor.getHandledExternalViewChangeCount() > 0);
+    Assert.assertEquals(dataProvider.getExternalViewNames().size(), 1);
+
+    // refresh data provider will have correct data loaded
+    dataProvider.refreshCache();
+    Assert.assertEquals(dataProvider.getLiveInstances().size(), numPaticipantCount);
+    Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), numPaticipantCount);
+    Assert.assertEquals(dataProvider.getExternalViews().size(), 1);
+    processor.resetHandledEventCount();
+
+    // Add additional participant will have corresponding change
+    String testParticipantName = "testParticipant";
+    _gSetupTool.addInstanceToCluster(clusterName, testParticipantName);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, clusterName, testParticipantName);
+    participant.syncStart();
+
+    Thread.sleep(500);
+    Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
+    Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
+
+    // shutdown can be re-called
+    dataProvider.shutdown();
+    dataProvider.shutdown();
+
+    // Verify cache is cleaned up
+    Assert.assertEquals(dataProvider.getLiveInstances().size(), 0);
+    Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), 0);
+    Assert.assertEquals(dataProvider.getExternalViews().size(), 0);
+  }
+
+  @Test
+  public void testSourceClusterDataProviderPropertyFilter() throws Exception {
+    String clusterName = _allSourceClusters.get(0);
+    List<PropertyType> properties = Arrays.asList(
+        new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW });
+
+    ViewClusterSourceConfig sourceClusterConfig =
+        new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
+
+    MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
+    processor.start();
+
+    SourceClusterDataProvider dataProvider =
+        new SourceClusterDataProvider(sourceClusterConfig, processor);
+    dataProvider.setup();
+
+    Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
+        new HashSet<>(properties));
+
+    // When first connected, data provider will have some initial events, but InstanceConfig
+    // will be filtered out since its not in properties
+    Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
+    Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 0);
+    Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
+
+    dataProvider.shutdown();
+  }
+
+  @Override
+  protected int getNumSourceCluster() {
+    return numSourceCluster;
+  }
+
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
new file mode 100644
index 0000000..272ef7a
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
@@ -0,0 +1,87 @@
+package org.apache.helix.view;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+public class ViewAggregatorIntegrationTestBase extends ZkIntegrationTestBase {
+  protected static final int numSourceCluster = 2;
+  protected static final int numPaticipantCount = 3;
+  protected static final String testSourceClusterNamePrefix = "testSourceCluster";
+  protected static final String testParticipantNamePrefix = "testParticipant";
+  protected static final String testControllerNamePrefix = "testController";
+
+  protected List<String> _allSourceClusters = new ArrayList<>();
+  protected List<MockParticipantManager> _allParticipants = new ArrayList<>();
+  protected List<ClusterControllerManager> _allControllers = new ArrayList<>();
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    for (int i = 0; i < getNumSourceCluster(); i++) {
+      // Setup cluster
+      String clusterName =
+          String.format("%s-%s-%s", testSourceClusterNamePrefix, this.hashCode(), i);
+      _gSetupTool.addCluster(clusterName, false);
+      // Setup participants
+      for (int j = 0; j < numPaticipantCount; j++) {
+        String instanceName =
+            String.format("%s-%s-%s", testParticipantNamePrefix, clusterName, j);
+        _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+        MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+        participant.syncStart();
+        _allParticipants.add(participant);
+      }
+
+      // Setup controller
+      ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName,
+          String.format("%s-%s", testControllerNamePrefix, clusterName));
+      controller.syncStart();
+      _allControllers.add(controller);
+      _allSourceClusters.add(clusterName);
+    }
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    // Stop all controllers
+    for (ClusterControllerManager controller : _allControllers) {
+      if (controller.isConnected()) {
+        controller.syncStop();
+      }
+    }
+
+    // Stop all participants
+    for (MockParticipantManager participant : _allParticipants) {
+      if (participant.isConnected()) {
+        participant.syncStop();
+      }
+    }
+  }
+
+  protected int getNumSourceCluster() {
+    return numSourceCluster;
+  }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
new file mode 100644
index 0000000..36f27a2
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
@@ -0,0 +1,78 @@
+package org.apache.helix.view.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.ClusterEvent;
+
+public class MockClusterEventProcessor extends ClusterEventProcessor {
+  private int _handledClusterConfigChange;
+  private int _handledExternalViewChange;
+  private int _handledInstanceConfigChange;
+  private int _handledLiveInstancesChange;
+
+  public MockClusterEventProcessor(String clusterName) {
+    super(clusterName);
+    resetHandledEventCount();
+  }
+
+  public int getHandledClusterConfigChangeCount() {
+    return _handledClusterConfigChange;
+  }
+
+  public int getHandledExternalViewChangeCount() {
+    return _handledExternalViewChange;
+  }
+
+  public int getHandledInstanceConfigChangeCount() {
+    return _handledInstanceConfigChange;
+  }
+
+  public int getHandledLiveInstancesChangeCount() {
+    return _handledLiveInstancesChange;
+  }
+
+  public void resetHandledEventCount() {
+    _handledClusterConfigChange = 0;
+    _handledExternalViewChange = 0;
+    _handledInstanceConfigChange = 0;
+    _handledLiveInstancesChange = 0;
+  }
+
+  @Override
+  public void handleEvent(ClusterEvent event) {
+    switch (event.getEventType()) {
+    case ClusterConfigChange:
+      _handledClusterConfigChange += 1;
+      break;
+    case LiveInstanceChange:
+      _handledLiveInstancesChange += 1;
+      break;
+    case InstanceConfigChange:
+      _handledInstanceConfigChange += 1;
+      break;
+    case ExternalViewChange:
+      _handledExternalViewChange += 1;
+      break;
+    default:
+      break;
+    }
+  }
+}