HELIX-705: implemented SourceClusterDataProvider's core logic and related tests
RB=1205694
BUG=HELIX-775
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
index e0595c4..7b148e0 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -51,46 +51,66 @@
public class SourceClusterDataProvider extends BasicClusterDataCache
implements InstanceConfigChangeListener, LiveInstanceChangeListener,
ExternalViewChangeListener {
- private HelixManager _helixManager;
+ private final HelixManager _helixManager;
+ private final ViewClusterSourceConfig _sourceClusterConfig;
+ private final ClusterEventProcessor _eventProcessor;
+
private HelixDataAccessor _dataAccessor;
private PropertyKey.Builder _propertyKeyBuilder;
- private ViewClusterSourceConfig _sourceClusterConfig;
- private ClusterEventProcessor _eventProcessor;
public SourceClusterDataProvider(ViewClusterSourceConfig config,
ClusterEventProcessor eventProcessor) {
super(config.getName());
_eventProcessor = eventProcessor;
_sourceClusterConfig = config;
- _helixManager = HelixManagerFactory
- .getZKHelixManager(config.getName(), generateHelixManagerInstanceName(config.getName()),
- InstanceType.SPECTATOR, config.getZkAddress());
- requireFullRefreshBasedOnConfig();
+ _helixManager = HelixManagerFactory.getZKHelixManager(config.getName(),
+ generateHelixManagerInstanceName(config.getName()),
+ InstanceType.SPECTATOR, config.getZkAddress());
}
public String getName() {
return _helixManager.getInstanceName();
}
- public ViewClusterSourceConfig getSourceClusterConfig() {
- return _sourceClusterConfig;
- }
-
/**
* Set up ClusterDataProvider. After setting up, the class should start listening
* on change events and perform corresponding reactions
* @throws Exception
*/
public void setup() throws Exception {
+ if (_helixManager != null && _helixManager.isConnected()) {
+ LOG.info(String.format("Data provider %s is already setup", _helixManager.getInstanceName()));
+ return;
+ }
try {
- LOG.info(String.format("%s setting up ...", _helixManager.getInstanceName()));
_helixManager.connect();
- _helixManager.addInstanceConfigChangeListener(this);
- _helixManager.addLiveInstanceChangeListener(this);
- _helixManager.addExternalViewChangeListener(this);
+ for (PropertyType property : _sourceClusterConfig.getProperties()) {
+ HelixConstants.ChangeType changeType;
+ switch (property) {
+ case INSTANCES:
+ _helixManager.addInstanceConfigChangeListener(this);
+ changeType = HelixConstants.ChangeType.INSTANCE_CONFIG;
+ break;
+ case LIVEINSTANCES:
+ _helixManager.addLiveInstanceChangeListener(this);
+ changeType = HelixConstants.ChangeType.LIVE_INSTANCE;
+ break;
+ case EXTERNALVIEW:
+ _helixManager.addExternalViewChangeListener(this);
+ changeType = HelixConstants.ChangeType.EXTERNAL_VIEW;
+ break;
+ default:
+ LOG.warn(String
+ .format("Unsupported property type: %s. Skip adding listener", property.name()));
+ continue;
+ }
+ notifyDataChange(changeType);
+ }
_dataAccessor = _helixManager.getHelixDataAccessor();
_propertyKeyBuilder = _dataAccessor.keyBuilder();
- LOG.info(String.format("%s started.", _helixManager.getInstanceName()));
+ LOG.info(String
+ .format("%s started. Source cluster detail: %s", _helixManager.getInstanceName(),
+ _sourceClusterConfig.toString()));
} catch(Exception e) {
shutdown();
throw e;
@@ -105,6 +125,8 @@
if (_helixManager != null && _helixManager.isConnected()) {
try {
_helixManager.disconnect();
+ LOG.info(
+ String.format("Data provider %s shutdown cleanly.", _helixManager.getInstanceName()));
} catch (ZkInterruptedException e) {
// OK
}
@@ -116,66 +138,40 @@
}
/**
- * Re-list current instance config names. ListName is a more reliable way to find
+ * Get current instance config names. ListName is a more reliable way to find
* current instance config names. This is needed for ViewClusterRefresher when
* it is generating diffs to push to ViewCluster
* @return
*/
- public List<String> listInstanceConfigNames() {
+ public List<String> getInstanceConfigNames() {
return _dataAccessor.getChildNames(_propertyKeyBuilder.instanceConfigs());
}
/**
- * Re-list current live instance names.
+ * Get current live instance names.
* @return
*/
- public List<String> listLiveInstanceNames() {
+ public List<String> getLiveInstanceNames() {
return _dataAccessor.getChildNames(_propertyKeyBuilder.liveInstances());
}
/**
- * re-list external view names
+ * Get external view names
* @return
*/
- public List<String> listExternalViewNames() {
+ public List<String> getExternalViewNames() {
return _dataAccessor.getChildNames(_propertyKeyBuilder.externalViews());
}
- /**
- * Based on current ViewClusterSourceConfig, decide whether caller should
- * read instance config. Used by ViewClusterRefresher
- * @return
- */
- public boolean shouldReadInstanceConfigs() {
- // TODO: implement logic
- return false;
- }
-
- /**
- * Based on current ViewClusterSourceConfig, decide whether caller should
- * read live instances. Used by ViewClusterRefresher
- * @return
- */
- public boolean shouldReadLiveInstances() {
- // TODO: implement logic
- return false;
- }
-
- /**
- * Based on current ViewClusterSourceConfig, decide whether caller should
- * read external view. Used by ViewClusterRefresher
- * @return
- */
- public boolean shouldReadExternalViews() {
- // TODO: implement logic
- return false;
+ public List<PropertyType> getPropertiesToAggregate() {
+ return _sourceClusterConfig.getProperties();
}
@Override
@PreFetch(enabled = false)
public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
NotificationContext context) {
- queueEventToProcessor(context, ClusterEventType.InstanceConfigChange,
+ queueEvent(context, ClusterEventType.InstanceConfigChange,
HelixConstants.ChangeType.INSTANCE_CONFIG);
}
@@ -183,7 +179,7 @@
@PreFetch(enabled = false)
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
- queueEventToProcessor(changeContext, ClusterEventType.LiveInstanceChange,
+ queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
HelixConstants.ChangeType.LIVE_INSTANCE);
}
@@ -191,55 +187,24 @@
@PreFetch(enabled = false)
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
- queueEventToProcessor(changeContext, ClusterEventType.ExternalViewChange,
+ queueEvent(changeContext, ClusterEventType.ExternalViewChange,
HelixConstants.ChangeType.EXTERNAL_VIEW);
}
- private void queueEventToProcessor(NotificationContext context,
+ private void queueEvent(NotificationContext context,
ClusterEventType clusterEventType, HelixConstants.ChangeType cacheChangeType)
throws IllegalStateException {
- if (!shouldProcessEvent(cacheChangeType)) {
- LOG.info(String.format(
- "Skip processing event based on ViewClusterSourceConfig: ClusterName=%s; ClusterEventType=%s, ChangeType=%s",
- _clusterName, clusterEventType, cacheChangeType));
- return;
- }
- ClusterEvent event = new ClusterEvent(_clusterName, clusterEventType);
+ // TODO: in case of FINALIZE, if we are not shutdown, re-connect helix manager and report error
if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
notifyDataChange(cacheChangeType);
- _eventProcessor.queueEvent(event);
+ _eventProcessor.queueEvent(new ClusterEvent(_clusterName, clusterEventType));
} else {
- LOG.info(String.format("SourceClusterDataProvider: skip queuing event %s", event));
+ LOG.info(String.format("Skip queuing event. EventType: %s, ChangeType: %s, ContextType: %s",
+ clusterEventType.name(), cacheChangeType.name(),
+ context == null ? "NoContext" : context.getType().name()));
}
}
- /**
- * Replace current source cluster config properties (A list of PropertyType we should aggregate)
- * with the given ones, and update corresponding provider mechanisms
- * @param properties
- */
- public synchronized void setSourceClusterConfigProperty(List<PropertyType> properties) {
- // TODO: implement logic
- }
-
- /**
- * Based on current ViewClusterSourceConfig, notify cache accordingly.
- */
- private void requireFullRefreshBasedOnConfig() {
- // TODO: implement logic
- }
-
- /**
- * Check source cluster config and decide whether this event should be processed
- * @param sourceClusterChangeType
- * @return
- */
- private synchronized boolean shouldProcessEvent(
- HelixConstants.ChangeType sourceClusterChangeType) {
- // TODO: implement
- return false;
- }
-
private static String generateHelixManagerInstanceName(String clusterName) {
return String.format("SourceClusterSpectatorHelixManager-%s", clusterName);
}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
new file mode 100644
index 0000000..2f429e0
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
@@ -0,0 +1,144 @@
+package org.apache.helix.view.integration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.view.ViewAggregatorIntegrationTestBase;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.mock.MockClusterEventProcessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestSourceClusterDataProvider extends ViewAggregatorIntegrationTestBase {
+ private static final int numSourceCluster = 1;
+ private static final String stateModel = "MasterSlave";
+ private static final String testResource = "restResource";
+
+ @Test
+ public void testSourceClusterDataProviderWatchAndRefresh() throws Exception {
+ String clusterName = _allSourceClusters.get(0);
+ List<PropertyType> properties = Arrays.asList(
+ new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW,
+ PropertyType.INSTANCES
+ });
+
+ ViewClusterSourceConfig sourceClusterConfig =
+ new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
+
+ MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
+ processor.start();
+
+ SourceClusterDataProvider dataProvider =
+ new SourceClusterDataProvider(sourceClusterConfig, processor);
+
+ // setup can be re-called
+ dataProvider.setup();
+ dataProvider.setup();
+
+ Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
+ new HashSet<>(properties));
+
+ // When first connected, data provider will have some initial events
+ Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
+ Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
+ Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
+ processor.resetHandledEventCount();
+
+ // ListNames should work
+ Assert.assertEquals(dataProvider.getInstanceConfigNames().size(), numPaticipantCount);
+ Assert.assertEquals(dataProvider.getLiveInstanceNames().size(), numPaticipantCount);
+ Assert.assertEquals(dataProvider.getExternalViewNames().size(), 0);
+
+ processor.resetHandledEventCount();
+
+ // rebalance resource to check external view related events
+ _gSetupTool.addResourceToCluster(clusterName, testResource, numPaticipantCount, stateModel);
+ _gSetupTool.rebalanceResource(clusterName, testResource, 3);
+ Thread.sleep(1000);
+ Assert.assertTrue(processor.getHandledExternalViewChangeCount() > 0);
+ Assert.assertEquals(dataProvider.getExternalViewNames().size(), 1);
+
+ // refresh data provider will have correct data loaded
+ dataProvider.refreshCache();
+ Assert.assertEquals(dataProvider.getLiveInstances().size(), numPaticipantCount);
+ Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), numPaticipantCount);
+ Assert.assertEquals(dataProvider.getExternalViews().size(), 1);
+ processor.resetHandledEventCount();
+
+ // Add additional participant will have corresponding change
+ String testParticipantName = "testParticipant";
+ _gSetupTool.addInstanceToCluster(clusterName, testParticipantName);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, testParticipantName);
+ participant.syncStart();
+
+ Thread.sleep(500);
+ Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
+ Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
+
+ // shutdown can be re-called
+ dataProvider.shutdown();
+ dataProvider.shutdown();
+
+ // Verify cache is cleaned up
+ Assert.assertEquals(dataProvider.getLiveInstances().size(), 0);
+ Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), 0);
+ Assert.assertEquals(dataProvider.getExternalViews().size(), 0);
+ }
+
+ @Test
+ public void testSourceClusterDataProviderPropertyFilter() throws Exception {
+ String clusterName = _allSourceClusters.get(0);
+ List<PropertyType> properties = Arrays.asList(
+ new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW });
+
+ ViewClusterSourceConfig sourceClusterConfig =
+ new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
+
+ MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
+ processor.start();
+
+ SourceClusterDataProvider dataProvider =
+ new SourceClusterDataProvider(sourceClusterConfig, processor);
+ dataProvider.setup();
+
+ Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
+ new HashSet<>(properties));
+
+ // When first connected, data provider will have some initial events, but InstanceConfig
+ // will be filtered out since its not in properties
+ Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
+ Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 0);
+ Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
+
+ dataProvider.shutdown();
+ }
+
+ @Override
+ protected int getNumSourceCluster() {
+ return numSourceCluster;
+ }
+
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
new file mode 100644
index 0000000..272ef7a
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
@@ -0,0 +1,87 @@
+package org.apache.helix.view;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+public class ViewAggregatorIntegrationTestBase extends ZkIntegrationTestBase {
+ protected static final int numSourceCluster = 2;
+ protected static final int numPaticipantCount = 3;
+ protected static final String testSourceClusterNamePrefix = "testSourceCluster";
+ protected static final String testParticipantNamePrefix = "testParticipant";
+ protected static final String testControllerNamePrefix = "testController";
+
+ protected List<String> _allSourceClusters = new ArrayList<>();
+ protected List<MockParticipantManager> _allParticipants = new ArrayList<>();
+ protected List<ClusterControllerManager> _allControllers = new ArrayList<>();
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ for (int i = 0; i < getNumSourceCluster(); i++) {
+ // Setup cluster
+ String clusterName =
+ String.format("%s-%s-%s", testSourceClusterNamePrefix, this.hashCode(), i);
+ _gSetupTool.addCluster(clusterName, false);
+ // Setup participants
+ for (int j = 0; j < numPaticipantCount; j++) {
+ String instanceName =
+ String.format("%s-%s-%s", testParticipantNamePrefix, clusterName, j);
+ _gSetupTool.addInstanceToCluster(clusterName, instanceName);
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participant.syncStart();
+ _allParticipants.add(participant);
+ }
+
+ // Setup controller
+ ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName,
+ String.format("%s-%s", testControllerNamePrefix, clusterName));
+ controller.syncStart();
+ _allControllers.add(controller);
+ _allSourceClusters.add(clusterName);
+ }
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ // Stop all controllers
+ for (ClusterControllerManager controller : _allControllers) {
+ if (controller.isConnected()) {
+ controller.syncStop();
+ }
+ }
+
+ // Stop all participants
+ for (MockParticipantManager participant : _allParticipants) {
+ if (participant.isConnected()) {
+ participant.syncStop();
+ }
+ }
+ }
+
+ protected int getNumSourceCluster() {
+ return numSourceCluster;
+ }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
new file mode 100644
index 0000000..36f27a2
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
@@ -0,0 +1,78 @@
+package org.apache.helix.view.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.controller.stages.ClusterEvent;
+
+public class MockClusterEventProcessor extends ClusterEventProcessor {
+ private int _handledClusterConfigChange;
+ private int _handledExternalViewChange;
+ private int _handledInstanceConfigChange;
+ private int _handledLiveInstancesChange;
+
+ public MockClusterEventProcessor(String clusterName) {
+ super(clusterName);
+ resetHandledEventCount();
+ }
+
+ public int getHandledClusterConfigChangeCount() {
+ return _handledClusterConfigChange;
+ }
+
+ public int getHandledExternalViewChangeCount() {
+ return _handledExternalViewChange;
+ }
+
+ public int getHandledInstanceConfigChangeCount() {
+ return _handledInstanceConfigChange;
+ }
+
+ public int getHandledLiveInstancesChangeCount() {
+ return _handledLiveInstancesChange;
+ }
+
+ public void resetHandledEventCount() {
+ _handledClusterConfigChange = 0;
+ _handledExternalViewChange = 0;
+ _handledInstanceConfigChange = 0;
+ _handledLiveInstancesChange = 0;
+ }
+
+ @Override
+ public void handleEvent(ClusterEvent event) {
+ switch (event.getEventType()) {
+ case ClusterConfigChange:
+ _handledClusterConfigChange += 1;
+ break;
+ case LiveInstanceChange:
+ _handledLiveInstancesChange += 1;
+ break;
+ case InstanceConfigChange:
+ _handledInstanceConfigChange += 1;
+ break;
+ case ExternalViewChange:
+ _handledExternalViewChange += 1;
+ break;
+ default:
+ break;
+ }
+ }
+}