Update dependencies and fix compile errors
Unit test fix and code style improvement.
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
index ff24211..174f6a8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ViewClusterSourceConfig.java
@@ -19,40 +19,37 @@
* under the License.
*/
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.helix.PropertyType;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
/**
* Represents source physical cluster information for view cluster
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
public class ViewClusterSourceConfig {
+ private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(
+ Arrays.asList(PropertyType.INSTANCES, PropertyType.EXTERNALVIEW, PropertyType.LIVEINSTANCES));
+ private static final ObjectMapper _objectMapper = new ObjectMapper();
- private static final List<PropertyType> _validPropertyTypes = Collections.unmodifiableList(Arrays
- .asList(new PropertyType[] { PropertyType.INSTANCES, PropertyType.EXTERNALVIEW,
- PropertyType.LIVEINSTANCES
- }));
-
- private static ObjectMapper _objectMapper = new ObjectMapper();
-
- @JsonProperty("name")
- private String _name;
-
- @JsonProperty("zkAddress")
- String _zkAddress;
-
- @JsonProperty("properties")
+ private final String _name;
+ private final String _zkAddress;
private List<PropertyType> _properties;
- private ViewClusterSourceConfig() {
- }
-
- public ViewClusterSourceConfig(String name, String zkAddress, List<PropertyType> properties) {
+ @JsonCreator
+ public ViewClusterSourceConfig(
+ @JsonProperty("name") String name,
+ @JsonProperty("zkAddress") String zkAddress,
+ @JsonProperty("properties") List<PropertyType> properties
+ ) {
_name = name;
_zkAddress = zkAddress;
_properties = properties;
@@ -62,14 +59,6 @@
this(config.getName(), config.getZkAddress(), new ArrayList<>(config.getProperties()));
}
- public void setName(String name) {
- _name = name;
- }
-
- public void setZkAddress(String zkAddress) {
- _zkAddress = zkAddress;
- }
-
public void setProperties(List<PropertyType> properties) {
for (PropertyType p : properties) {
if (!_validPropertyTypes.contains(p)) {
@@ -92,8 +81,13 @@
return _properties;
}
+ @JsonIgnore
+ public static List<PropertyType> getValidPropertyTypes() {
+ return _validPropertyTypes;
+ }
+
public String toJson() throws IOException {
- return new ObjectMapper().writeValueAsString(this);
+ return _objectMapper.writeValueAsString(this);
}
public String toString() {
@@ -121,4 +115,4 @@
String.format("Invalid Json: %s, Exception: %s", jsonString, e.toString()));
}
}
-}
+}
\ No newline at end of file
diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
index 690cdd2..c2a48b5 100644
--- a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
@@ -42,9 +42,4 @@
public void queueEvent(ClusterEvent event) {
_eventQueue.put(event.getEventType(), event);
}
-
- public void shutdown() {
- _eventQueue.clear();
- this.interrupt();
- }
}
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index 2870112..4d3ce72 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -19,11 +19,13 @@
* under the License.
*/
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.model.ExternalView;
@@ -46,10 +48,6 @@
protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
protected ExternalViewCache _externalViewCache;
- protected Map<String, LiveInstance> _liveInstanceMap;
- protected Map<String, InstanceConfig> _instanceConfigMap;
- protected Map<String, ExternalView> _externalViewMap;
- private final PropertyType _sourceDataType;
protected String _clusterName;
@@ -110,12 +108,12 @@
LOG.info("START: BasicClusterDataCache.refresh() for cluster " + _clusterName);
long startTime = System.currentTimeMillis();
- if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
+ if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.EXTERNAL_VIEW, false)) {
_propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, false);
_externalViewCache.refresh(accessor);
}
- if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
+ if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, false)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, false);
_propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, true);
@@ -124,7 +122,7 @@
+ ". Takes " + (System.currentTimeMillis() - start) + " ms");
}
- if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+ if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.INSTANCE_CONFIG, false)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, false);
_instanceConfigPropertyCache.refresh(accessor);
@@ -196,15 +194,17 @@
*/
public synchronized void clearCache(HelixConstants.ChangeType changeType) {
switch (changeType) {
- case LIVE_INSTANCE:
- case INSTANCE_CONFIG:
- LOG.warn("clearCache is deprecated for changeType: {}.", changeType);
- break;
- case EXTERNAL_VIEW:
- _externalViewCache.clear();
- break;
- default:
- break;
+ case LIVE_INSTANCE:
+ _liveInstancePropertyCache.setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
+ break;
+ case INSTANCE_CONFIG:
+ _instanceConfigPropertyCache.setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
+ break;
+ case EXTERNAL_VIEW:
+ _externalViewCache.clear();
+ break;
+ default:
+ break;
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index f35b637..98ef0c5 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -93,7 +93,6 @@
}
return Collections.emptyList();
}
-
/**
* Returns all leaf nodes that belong in the tree. Returns itself if this node is a leaf.
*
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 12dc758..65f6bb4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -39,7 +39,6 @@
OnDemandRebalance,
ControllerChange,
RetryRebalance,
- ViewClusterPeriodicRefresh,
StateVerifier,
Unknown
}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 0456458..e279856 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -19,6 +19,7 @@
* under the License.
*/
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,6 +38,7 @@
import org.apache.helix.util.ConfigStringUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
/**
* Cluster configurations
@@ -85,6 +87,10 @@
// error exceeds this limitation
DISABLED_INSTANCES,
+ VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster
+ VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is
+ // ViewClusterSourceConfig JSON string
+ VIEW_CLUSTER_REFRESH_PERIOD, // In second
// Specifies job types and used for quota allocation
QUOTA_TYPES,
@@ -178,6 +184,7 @@
public final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1;
+ private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
/**
* Instantiate for a specific cluster
@@ -195,6 +202,36 @@
super(record);
}
+ public void setViewCluster() {
+ _record.setBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), true);
+ }
+
+ /**
+ * Whether this cluster is a ViewCluster
+ * @return
+ */
+ public boolean isViewCluster() {
+ return _record
+ .getBooleanField(ClusterConfigProperty.VIEW_CLUSTER.name(), false);
+ }
+
+ /**
+ * Set a list of ViewClusterSourceConfig to ClusterConfig. Current source config will be
+ * overwritten
+ * @param sourceConfigList
+ */
+ public void setViewClusterSourceConfigs(List<ViewClusterSourceConfig> sourceConfigList) {
+ List<String> sourceConfigs = new ArrayList<>();
+ for (ViewClusterSourceConfig config : sourceConfigList) {
+ try {
+ sourceConfigs.add(config.toJson());
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Invalid source config. Error: " + e.toString());
+ }
+ }
+ _record.setListField(ClusterConfigProperty.VIEW_CLUSTER_SOURCES.name(), sourceConfigs);
+ }
+
/**
* Set task quota type with the ratio of this quota.
* @param quotaType String
@@ -263,6 +300,30 @@
}
/**
+ * Set view cluster max refresh period
+ * @param refreshPeriod refresh period in second
+ */
+ public void setViewClusterRefreshPeriod(int refreshPeriod) {
+ _record.setIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
+ refreshPeriod);
+ }
+
+ public List<ViewClusterSourceConfig> getViewClusterSourceConfigs() {
+ List<ViewClusterSourceConfig> sourceConfigList = new ArrayList<>();
+ for (String configJSON : _record
+ .getListField(ClusterConfigProperty.VIEW_CLUSTER_SOURCES.name())) {
+ ViewClusterSourceConfig config = ViewClusterSourceConfig.fromJson(configJSON);
+ sourceConfigList.add(config);
+ }
+ return sourceConfigList;
+ }
+
+ public int getViewClusterRefershPeriod() {
+ return _record.getIntField(ClusterConfigProperty.VIEW_CLUSTER_REFRESH_PERIOD.name(),
+ DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD);
+ }
+
+ /**
* Whether to persist best possible assignment in a resource's idealstate.
* @return
*/
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
index e60b7ce..7de552e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkHelixAdmin.java
@@ -31,7 +31,6 @@
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.IdealState;
-import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskPartitionState;
diff --git a/helix-view-aggregator/helix-view-aggregator-0.6.10-SNAPSHOT.ivy b/helix-view-aggregator/helix-view-aggregator-1.0.3-SNAPSHOT.ivy
similarity index 84%
rename from helix-view-aggregator/helix-view-aggregator-0.6.10-SNAPSHOT.ivy
rename to helix-view-aggregator/helix-view-aggregator-1.0.3-SNAPSHOT.ivy
index 7ba8579..cf817b8 100644
--- a/helix-view-aggregator/helix-view-aggregator-0.6.10-SNAPSHOT.ivy
+++ b/helix-view-aggregator/helix-view-aggregator-1.0.3-SNAPSHOT.ivy
@@ -7,9 +7,7 @@
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
-
http://www.apache.org/licenses/LICENSE-2.0
-
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,7 +18,7 @@
<ivy-module version="1.0">
<info organisation="org.apache.helix"
module="helix-view-aggregator"
- revision="0.6.10-SNAPSHOT"
+ revision="1.0.3-SNAPSHOT"
status="integration"
publication="20170128141623"
/>
@@ -37,16 +35,19 @@
<artifact name="helix-view-aggregator" type="jar" ext="jar" conf="master"/>
</publications>
<dependencies>
- <dependency org="org.slf4j" name="slf4j-api" rev="1.7.25" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
+ <dependency org="org.slf4j" name="slf4j-api" rev="1.7.32" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
<artifact name="slf4j-api" ext="jar"/>
</dependency>
- <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.14" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
- <artifact name="slf4j-log4j12" ext="jar"/>
+ <dependency org="org.apache.logging.log4j" name="log4j-slf4j-impl" rev="2.17.1" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
+ <artifact name="log4j-slf4j-impl" ext="jar"/>
</dependency>
- <dependency org="org.apache.helix" name="helix-core" rev="0.8.0-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
+ <--dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.14" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)">
+ <artifact name="slf4j-log4j12" ext="jar"/>
+ </dependency-->
+ <dependency org="org.apache.helix" name="helix-core" rev="1.0.3-SNAPSHOT" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="1.8.5" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="commons-cli" name="commons-cli" rev="1.2" force="true" conf="compile->compile(*),master(*);runtime->runtime(*)"/>
<dependency org="io.dropwizard.metrics" name="metrics-core" rev="3.2.3" conf="compile->compile(default);runtime->runtime(default);default->default"/>
</dependencies>
-</ivy-module>
+</ivy-module>
\ No newline at end of file
diff --git a/helix-view-aggregator/pom.xml b/helix-view-aggregator/pom.xml
index 2f05fcc..f914c61 100644
--- a/helix-view-aggregator/pom.xml
+++ b/helix-view-aggregator/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.helix</groupId>
<artifactId>helix</artifactId>
- <version>0.6.10-SNAPSHOT</version>
+ <version>1.0.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -32,9 +32,7 @@
<properties>
<osgi.import>
org.apache.helix*,
- org.codehaus.jackson*,
org.apache.commons.cli*,
- org.slf4j*;version="[1.6,2)",
*
</osgi.import>
<osgi.export>org.apache.helix.view*;version="${project.version};-noimport:=true</osgi.export>
@@ -49,26 +47,31 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.25</version>
+ <version>1.7.32</version>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.14</version>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>2.17.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.helix</groupId>
<artifactId>helix-core</artifactId>
</dependency>
<dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <version>1.8.5</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.11.0</version>
</dependency>
<dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>1.8.5</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>2.11.0</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
@@ -133,4 +136,4 @@
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
index ab122db..b1d34b9 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -19,28 +19,29 @@
* under the License.
*/
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.common.ClusterEventProcessor;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.view.common.ViewAggregatorEventAttributes;
+import org.apache.helix.view.common.ClusterViewEvent;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.monitoring.ViewAggregatorMonitor;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,56 +50,58 @@
*/
public class HelixViewAggregator implements ClusterConfigChangeListener {
private static final Logger logger = LoggerFactory.getLogger(HelixViewAggregator.class);
- private static final int PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS = 3 * 1000;
+ private static final long DEFAULT_INITIAL_EVENT_PROCESS_BACKOFF = 10;
+ private static final long DEFAULT_MAX_EVENT_PROCESS_BACKOFF = 5 * 1000;
private final String _viewClusterName;
private final HelixManager _viewClusterManager;
- private HelixDataAccessor _dataAccessor;
+ private final Map<String, SourceClusterDataProvider> _dataProviderMap;
// Worker that processes source cluster events and refresh view cluster
- private ClusterEventProcessor _aggregator;
- private boolean _refreshViewCluster;
+ private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _aggregator;
+ private final AtomicBoolean _refreshViewCluster;
// Worker that processes view cluster config change
- private ClusterEventProcessor _viewConfigProcessor;
+ private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _viewConfigProcessor;
+ private final ViewAggregatorMonitor _monitor;
- private Map<String, SourceClusterDataProvider> _dataProviderMap;
private ClusterConfig _curViewClusterConfig;
private Timer _viewClusterRefreshTimer;
private ViewClusterRefresher _viewClusterRefresher;
- private ViewAggregatorMonitor _monitor;
+ private HelixDataAccessor _dataAccessor;
public HelixViewAggregator(String viewClusterName, String zkAddr) {
_viewClusterName = viewClusterName;
- _dataProviderMap = new HashMap<>();
+ _dataProviderMap = new ConcurrentHashMap<>();
_viewClusterManager = HelixManagerFactory
.getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
InstanceType.SPECTATOR, zkAddr);
_refreshViewCluster = new AtomicBoolean(false);
_monitor = new ViewAggregatorMonitor(viewClusterName);
- _aggregator = new ClusterEventProcessor(_viewClusterName, "Aggregator") {
+ _aggregator = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(_viewClusterName,
+ "Aggregator") {
@Override
- public void handleEvent(ClusterEvent event) {
+ public void handleEvent(ClusterViewEvent event) {
handleSourceClusterEvent(event);
_monitor.recordProcessedSourceEvent();
}
};
- _viewConfigProcessor = new ClusterEventProcessor(_viewClusterName, "ViewConfigProcessor") {
+ _viewConfigProcessor = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(_viewClusterName, "ViewConfigProcessor") {
@Override
- public void handleEvent(ClusterEvent event) {
+ public void handleEvent(ClusterViewEvent event) {
handleViewClusterConfigChange(event);
}
};
}
public String getAggregatorInstanceName() {
- return String
- .format("%s::%s", _viewClusterManager.getInstanceName(), hashCode());
+ return String.format("%s::%s", _viewClusterManager.getInstanceName(), hashCode());
}
/**
* Start controller main logic
- * @throws Exception
+ * @throws Exception when HelixViewAggregator fails to start. Will try to shut it down before
+ * exception is thrown out
*/
public void start() throws Exception {
_monitor.register();
@@ -110,16 +113,15 @@
// Setup manager
try {
_viewClusterManager.connect();
- _viewClusterManager.addClusterfigChangeListener(this);
_dataAccessor = _viewClusterManager.getHelixDataAccessor();
+ _viewClusterManager.addClusterfigChangeListener(this);
} catch (Exception e) {
+ shutdown();
throw new HelixException("Failed to connect view cluster helix manager", e);
}
// Set up view cluster refresher
- _viewClusterRefresher =
- new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor(),
- _dataProviderMap);
+ _viewClusterRefresher = new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor());
}
public void shutdown() {
@@ -144,22 +146,19 @@
logger.warn("ZK interrupted when disconnecting helix manager", zkintr);
} catch (Exception e) {
success = false;
- logger.error(String
- .format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
+ logger.error(String.format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
}
}
// Clean up all data providers
for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
- logger
- .info(String.format("Shutting data provider for source cluster %s", provider.getName()));
+ logger.info(String.format("Shutting down data provider for source cluster %s", provider.getName()));
try {
provider.shutdown();
} catch (Exception e) {
success = false;
- logger.error(String
- .format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
- _viewClusterName), e);
+ logger.error(String.format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
+ _viewClusterName), e);
}
}
@@ -173,79 +172,100 @@
@PreFetch(enabled = false)
public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
- _viewConfigProcessor
- .queueEvent(new ClusterEvent(_viewClusterName, ClusterEventType.ClusterConfigChange));
+ _viewConfigProcessor.queueEvent(ClusterViewEvent.Type.ConfigChange,
+ new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.ConfigChange));
} else {
- logger.info(String
- .format("Skip processing view cluster config change with notification context type %s",
- context == null ? "NoContext" : context.getType().name()));
+ logger.info(String.format("Skip processing view cluster config change with notification context type %s",
+ context == null ? "NoContext" : context.getType().name()));
}
}
- private void handleSourceClusterEvent(ClusterEvent event) {
- logger.info("Processing event from source cluster " + event.getClusterName());
+ private void handleSourceClusterEvent(ClusterViewEvent event) {
+ logger.info(String
+ .format("Processing event %s from source cluster %s.", event.getEventType().name(),
+ event.getClusterName()));
switch (event.getEventType()) {
- case LiveInstanceChange:
- case InstanceConfigChange:
- case ExternalViewChange:
- _refreshViewCluster = true;
- break;
- case ViewClusterPeriodicRefresh:
- if (!_refreshViewCluster) {
- logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
- return;
- }
- // mark source cluster as changed to trigger next refresh as we failed to refresh at
- // least some of the elements in view cluster
- logger.info("Refreshing cluster based on event " + event.getEventType().name());
- _refreshViewCluster = refreshViewCluster();
- break;
- default:
- logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
- }
- }
-
- private synchronized void handleViewClusterConfigChange(ClusterEvent event) {
- logger.info("Processing view cluster event " + event.getEventType().name());
- switch (event.getEventType()) {
- case ClusterConfigChange:
- // TODO: when clusterEventProcessor supports delayed scheduling,
- // we should not have this head-of-line blocking but to have ClusterEventProcessor do the work.
- // Currently it's acceptable as we can endure delay in processing view cluster config change
- if (event.getAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name()) != null) {
- try {
- Thread.sleep(PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS);
- } catch (InterruptedException e) {
- logger.warn("Interrupted when backing off during process view config change retry", e);
+ case ExternalViewChange:
+ case InstanceConfigChange:
+ case LiveInstanceChange:
+ _refreshViewCluster.set(true);
+ break;
+ case PeriodicViewRefresh:
+ if (!_refreshViewCluster.get()) {
+ logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
+ return;
}
+ // mark source cluster as changed to trigger next refresh as we failed to refresh at
+ // least some of the elements in view cluster
+ logger.info("Refreshing cluster based on event " + event.getEventType().name());
+ refreshViewCluster();
+ break;
+ default:
+ logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
+ }
+ }
+
+ private void handleViewClusterConfigChange(ClusterViewEvent event) {
+ logger.info(String.format("Processing event %s for view cluster %s", event.getEventType().name(),
+ _viewClusterName));
+ if (event.getEventType() == ClusterViewEvent.Type.ConfigChange) {
+ // TODO: when DedupEventProcessor supports delayed scheduling,
+ // we should not have this head-of-line blocking but to have DedupEventProcessor do the work.
+ // Currently it's acceptable as we can endure delay in processing view cluster config change
+ try {
+ Thread.sleep(event.getEventProcessBackoff());
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted when backing off during process view config change retry", e);
+ Thread.currentThread().interrupt();
}
+
// We always compare current cluster config with most up-to-date cluster config
+ boolean success;
ClusterConfig newClusterConfig =
_dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
- SourceClusterConfigChangeAction action =
- new SourceClusterConfigChangeAction(_curViewClusterConfig, newClusterConfig);
- action.computeAction();
+
+ if (newClusterConfig == null) {
+ logger.warn("Failed to read view cluster config");
+ success = false;
+ } else {
+ SourceClusterConfigChangeAction action =
+ new SourceClusterConfigChangeAction(_curViewClusterConfig, newClusterConfig);
+ action.computeAction();
+ success = processViewClusterConfigUpdate(action);
+ }
// If we fail to process action and should retry, re-queue event to retry
- if (!processViewClusterConfigUpdate(action)) {
+ if (!success) {
_monitor.recordViewConfigProcessFailure();
- event.addAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name(), true);
- _viewConfigProcessor.queueEvent(event);
+ long backoff = computeNextEventProcessBackoff(event.getEventProcessBackoff());
+ logger.info("Failed to process view cluster config change. Will retry in {} ms", backoff);
+ event.setEventProcessBackoff(backoff);
+ _viewConfigProcessor.queueEvent(event.getEventType(), event);
} else {
_curViewClusterConfig = newClusterConfig;
}
- break;
- default:
+ } else {
logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
}
}
+ private long computeNextEventProcessBackoff(long currentBackoff) {
+ if (currentBackoff <= 0) {
+ return DEFAULT_INITIAL_EVENT_PROCESS_BACKOFF;
+ }
+
+ // Exponential backoff with ceiling
+ return currentBackoff * 2 > DEFAULT_MAX_EVENT_PROCESS_BACKOFF
+ ? DEFAULT_MAX_EVENT_PROCESS_BACKOFF
+ : currentBackoff * 2;
+ }
+
private class RefreshViewClusterTask extends TimerTask {
@Override
public void run() {
logger.info("Triggering view cluster refresh");
- _aggregator.queueEvent(
- new ClusterEvent(_viewClusterName, ClusterEventType.ViewClusterPeriodicRefresh));
+ _aggregator.queueEvent(ClusterViewEvent.Type.PeriodicViewRefresh,
+ new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.PeriodicViewRefresh));
}
}
@@ -275,7 +295,13 @@
if (_dataProviderMap.containsKey(key)) {
try {
_dataProviderMap.get(key).shutdown();
- _dataProviderMap.remove(key);
+ synchronized (_dataProviderMap) {
+ _dataProviderMap.remove(key);
+ // upon successful removal of data provider, set refresh view cluster to true
+ // or if no event from source cluster happened before next refresh cycle, this
+ // removal will be missed.
+ _refreshViewCluster.set(true);
+ }
} catch (Exception e) {
success = false;
logger.warn(String.format("Failed to shutdown data provider %s, will retry", key));
@@ -302,6 +328,7 @@
}
}
+
if (action.shouldResetTimer()) {
logger.info(
"Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
@@ -312,7 +339,6 @@
/**
* Use ViewClusterRefresher to refresh ViewCluster.
- * @return true if needs retry, else false
*/
private void refreshViewCluster() {
long startRefreshMs = System.currentTimeMillis();
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
index 7f28441..c5696a1 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregatorMain.java
@@ -19,16 +19,104 @@
* under the License.
*/
+import java.util.logging.Level;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelixViewAggregatorMain {
private static Logger logger = LoggerFactory.getLogger(HelixViewAggregatorMain.class);
+ private static final String HELP = "help";
+ private static final String ZK_ADDR = "zookeeper-address";
+ private static final String VIEW_CLUSTER_NAME = "view-cluster-name";
+
+ private static void printUsage(Options cliOptions) {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.printHelp("java " + HelixViewAggregatorMain.class.getName(), cliOptions);
+ }
+
+ private static Options constructCommandLineOptions() {
+ Option helpOption =
+ OptionBuilder.withLongOpt(HELP).withDescription("Prints command-line options info")
+ .create();
+ helpOption.setArgs(0);
+ helpOption.setRequired(false);
+ helpOption.setArgName("print help message");
+
+ Option zkServerOption =
+ OptionBuilder.withLongOpt(ZK_ADDR).withDescription("Provide zookeeper address")
+ .create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZooKeeper server connection string (Required)");
+
+ Option portOption =
+ OptionBuilder.withLongOpt(VIEW_CLUSTER_NAME).withDescription("Name of the view cluster")
+ .create();
+ portOption.setArgs(1);
+ portOption.setRequired(true);
+ portOption.setArgName("Name of the view cluster");
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ options.addOption(zkServerOption);
+ options.addOption(portOption);
+
+ return options;
+ }
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
- // TODO: implement this function as launching HelixViewAggregator using CLI
+ java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
+ topJavaLogger.setLevel(Level.INFO);
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ CommandLine cmd = null;
+
+ try {
+ cmd = cliParser.parse(cliOptions, args);
+ } catch (ParseException pe) {
+ logger.error("HelixViewAggregatorMain: failed to parse command-line options.", pe);
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+
+ String zkAddr, viewClusterName;
+ if (cmd.hasOption(HELP)) {
+ printUsage(cliOptions);
+ return;
+ } else {
+ zkAddr = String.valueOf(cmd.getOptionValue(ZK_ADDR));
+ viewClusterName = String.valueOf(cmd.getOptionValue(VIEW_CLUSTER_NAME));
+ }
+
+ final HelixViewAggregator aggregator = new HelixViewAggregator(viewClusterName, zkAddr);
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ aggregator.shutdown();
+ }
+ }));
+
+ try {
+ aggregator.start();
+ Thread.currentThread().join();
+ } catch (Exception e) {
+ logger.error("HelixViewAggregator caught exception.", e);
+ } finally {
+ aggregator.shutdown();
+ }
+
+ // Service should not exit successfully
+ System.exit(1);
}
}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java
index 7345769..e7f28fc 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/SourceClusterConfigChangeAction.java
@@ -35,12 +35,12 @@
* be reset or not
*/
public class SourceClusterConfigChangeAction {
- private ClusterConfig _oldConfig;
- private ClusterConfig _newConfig;
+ private final List<ViewClusterSourceConfig> _toAdd;
+ private final List<ViewClusterSourceConfig> _toDelete;
+ private final ClusterConfig _oldConfig;
+ private final ClusterConfig _newConfig;
private boolean _shouldResetTimer;
- private List<ViewClusterSourceConfig> _toAdd;
- private List<ViewClusterSourceConfig> _toDelete;
public SourceClusterConfigChangeAction(ClusterConfig oldConfig, ClusterConfig newConfig) {
_oldConfig = oldConfig;
@@ -79,22 +79,19 @@
if (_oldConfig != null) {
for (ViewClusterSourceConfig oldConfig : _oldConfig.getViewClusterSourceConfigs()) {
- oldConfigMap
- .put(generateConfigMapKey(oldConfig.getName(), oldConfig.getZkAddress()), oldConfig);
+ oldConfigMap.put(generateConfigMapKey(oldConfig.getName(), oldConfig.getZkAddress()), oldConfig);
}
}
for (ViewClusterSourceConfig currentConfig : _newConfig.getViewClusterSourceConfigs()) {
- currentConfigMap
- .put(generateConfigMapKey(currentConfig.getName(), currentConfig.getZkAddress()),
- currentConfig);
+ currentConfigMap.put(
+ generateConfigMapKey(currentConfig.getName(), currentConfig.getZkAddress()), currentConfig);
}
// Configs whose properties-to-aggregate got modified should have its data provider recreated
for (Map.Entry<String, ViewClusterSourceConfig> entry : currentConfigMap.entrySet()) {
if (oldConfigMap.containsKey(entry.getKey())) {
- Set<PropertyType> oldPropertySet =
- new HashSet<>(oldConfigMap.get(entry.getKey()).getProperties());
+ Set<PropertyType> oldPropertySet = new HashSet<>(oldConfigMap.get(entry.getKey()).getProperties());
Set<PropertyType> currentPropertySet = new HashSet<>(entry.getValue().getProperties());
if (!oldPropertySet.equals(currentPropertySet)) {
_toAdd.add(new ViewClusterSourceConfig(entry.getValue()));
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
index 58a35e6..1885a73 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
@@ -45,7 +46,7 @@
private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
private String _viewClusterName;
private HelixDataAccessor _viewClusterDataAccessor;
- private Map<String, SourceClusterDataProvider> _dataProviderMap;
+ private Set<SourceClusterDataProvider> _dataProviderView;
// These 3 caches stores objects that are pushed to view cluster (write-through) cache,
// thus we don't need to read from view cluster everytime we refresh it.
@@ -53,11 +54,9 @@
private Map<String, HelixProperty> _viewClusterInstanceConfigCache;
private Map<String, HelixProperty> _viewClusterExternalViewCache;
- public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor,
- Map<String, SourceClusterDataProvider> dataProviderMap) {
+ public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor) {
_viewClusterName = viewClusterName;
_viewClusterDataAccessor = viewClusterDataAccessor;
- _dataProviderMap = dataProviderMap;
_viewClusterLiveInstanceCache = new HashMap<>();
_viewClusterInstanceConfigCache = new HashMap<>();
_viewClusterExternalViewCache = new HashMap<>();
@@ -108,6 +107,10 @@
}
}
+ public void updateProviderView(Set<SourceClusterDataProvider> dataProviderView) {
+ _dataProviderView = dataProviderView;
+ }
+
/**
* Create / update / delete property of given type in view cluster, based on data change from
* source clusters.
@@ -133,7 +136,7 @@
listedNamesInView =
new HashSet<>(_viewClusterDataAccessor.getChildNames(getPropertyKey(propertyType, null)));
// Prepare data
- for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
+ for (SourceClusterDataProvider provider : _dataProviderView) {
if (!provider.getPropertiesToAggregate().contains(propertyType)) {
logger.info(String
.format("SourceCluster %s does not need to aggregate %s, skip.", provider.getName(),
@@ -153,16 +156,10 @@
listedNamesInSource.addAll(provider.getExternalViewNames());
for (Map.Entry<String, ExternalView> entry : provider.getExternalViews().entrySet()) {
String resourceName = entry.getKey();
- ExternalView resourceEV = entry.getValue();
- if (sourceProperties.containsKey(resourceName)) {
- // Merge external views if we already have a record
- mergeExternalViews((ExternalView) sourceProperties.get(resourceName), resourceEV);
- } else {
- // merging simple fields and list fields are meaningless and would cause confusion
- resourceEV.getRecord().getSimpleFields().clear();
- resourceEV.getRecord().getListFields().clear();
- sourceProperties.put(resourceName, resourceEV);
+ if (!sourceProperties.containsKey(resourceName)) {
+ sourceProperties.put(resourceName, new ExternalView(resourceName));
}
+ mergeExternalViews((ExternalView) sourceProperties.get(resourceName), entry.getValue());
}
break;
default:
@@ -180,6 +177,8 @@
.format("Caught exception during refreshing %s for view cluster %s", propertyType.name(),
_viewClusterName), e);
}
+ logRefreshResult(propertyType, ok);
+
return ok;
}
@@ -199,13 +198,11 @@
source.getId(), toMerge.getId()));
}
for (String partitionName : toMerge.getPartitionSet()) {
+ // Deep copying state map to avoid modifying source cache
if (!source.getPartitionSet().contains(partitionName)) {
- source.setStateMap(partitionName, toMerge.getStateMap(partitionName));
- } else {
- Map<String, String> mergedPartitionState = source.getStateMap(partitionName);
- mergedPartitionState.putAll(toMerge.getStateMap(partitionName));
- source.setStateMap(partitionName, mergedPartitionState);
+ source.setStateMap(partitionName, new TreeMap<String, String>());
}
+ source.getStateMap(partitionName).putAll(toMerge.getStateMap(partitionName));
}
}
@@ -273,7 +270,6 @@
Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
boolean ok = true;
-
// Calculate diff
ClusterPropertyDiff diff =
calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties,
@@ -360,7 +356,8 @@
private <T extends HelixProperty> boolean addOrUpdateProperties(
List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects, Map<String, T> cache) {
boolean ok = true;
- logger.info(String.format("AddOrUpdate objects: %s", keysToAddOrUpdate));
+ logger.info(
+ String.format("AddOrUpdate %s objects: %s", keysToAddOrUpdate.size(), keysToAddOrUpdate));
boolean[] addOrUpdateResults = _viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
for (int i = 0; i < addOrUpdateResults.length; i++) {
if (!addOrUpdateResults[i]) {
@@ -389,7 +386,7 @@
private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete,
Map<String, T> cache) {
boolean ok = true;
- logger.info(String.format("Deleting objects: %s", keysToDelete));
+ logger.info(String.format("Deleting %s objects: %s", keysToDelete.size(), keysToDelete));
for (PropertyKey key : keysToDelete) {
if (!_viewClusterDataAccessor.removeProperty(key)) {
// Don't remove item from cache yet - will retry during next refresh
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ClusterViewEvent.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ClusterViewEvent.java
new file mode 100644
index 0000000..42cedde
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ClusterViewEvent.java
@@ -0,0 +1,56 @@
+package org.apache.helix.view.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class ClusterViewEvent {
+ public enum Type {
+ LiveInstanceChange,
+ ExternalViewChange,
+ InstanceConfigChange,
+ ConfigChange,
+ PeriodicViewRefresh
+ }
+
+ private String _clusterName;
+ private Type _eventType;
+ private long _eventProcessBackoffMs;
+
+ public ClusterViewEvent(String clusterName, Type eventType) {
+ _clusterName = clusterName;
+ _eventType = eventType;
+ _eventProcessBackoffMs = 0;
+ }
+
+ public String getClusterName() {
+ return _clusterName;
+ }
+
+ public Type getEventType() {
+ return _eventType;
+ }
+
+ public void setEventProcessBackoff(long backoffMs) {
+ _eventProcessBackoffMs = backoffMs;
+ }
+
+ public long getEventProcessBackoff() {
+ return _eventProcessBackoffMs;
+ }
+}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
deleted file mode 100644
index f550f1c..0000000
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/common/ViewAggregatorEventAttributes.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.view.common;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum ViewAggregatorEventAttributes {
- EventProcessBackoff,
-}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
index 897b8ef..c581e68 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -20,10 +20,8 @@
*/
import java.util.List;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -35,14 +33,13 @@
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
-import org.apache.helix.common.BasicClusterDataCache;
-import org.apache.helix.common.ClusterEventProcessor;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.common.caches.BasicClusterDataCache;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.common.ClusterViewEvent;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +53,14 @@
private static final Logger LOG = LoggerFactory.getLogger(SourceClusterDataProvider.class);
private final HelixManager _helixManager;
- private final ClusterEventProcessor _eventProcessor;
+ private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _eventProcessor;
protected ViewClusterSourceConfig _sourceClusterConfig;
private HelixDataAccessor _dataAccessor;
private PropertyKey.Builder _propertyKeyBuilder;
public SourceClusterDataProvider(ViewClusterSourceConfig config,
- ClusterEventProcessor eventProcessor) {
+ DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> eventProcessor) {
super(config.getName());
_eventProcessor = eventProcessor;
_sourceClusterConfig = config;
@@ -112,8 +109,8 @@
}
_dataAccessor = _helixManager.getHelixDataAccessor();
_propertyKeyBuilder = _dataAccessor.keyBuilder();
- LOG.info(String
- .format("%s started. Source cluster detail: %s", _helixManager.getInstanceName(),
+ LOG.info(String.format("Data provider %s (%s) started. Source cluster detail: %s",
+ _helixManager.getInstanceName(), hashCode(),
_sourceClusterConfig.toString()));
} catch(Exception e) {
shutdown();
@@ -130,7 +127,7 @@
try {
_helixManager.disconnect();
LOG.info(
- String.format("Data provider %s shutdown cleanly.", _helixManager.getInstanceName()));
+ String.format("Data provider %s (%s) shutdown cleanly.", _helixManager.getInstanceName(), hashCode()));
} catch (ZkInterruptedException e) {
// OK
}
@@ -175,7 +172,7 @@
@PreFetch(enabled = false)
public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
NotificationContext context) {
- queueEvent(context, ClusterEventType.InstanceConfigChange,
+ queueEvent(context, ClusterViewEvent.Type.InstanceConfigChange,
HelixConstants.ChangeType.INSTANCE_CONFIG);
}
@@ -183,7 +180,7 @@
@PreFetch(enabled = false)
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
NotificationContext changeContext) {
- queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
+ queueEvent(changeContext, ClusterViewEvent.Type.LiveInstanceChange,
HelixConstants.ChangeType.LIVE_INSTANCE);
}
@@ -191,21 +188,21 @@
@PreFetch(enabled = false)
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
- queueEvent(changeContext, ClusterEventType.ExternalViewChange,
+ queueEvent(changeContext, ClusterViewEvent.Type.ExternalViewChange,
HelixConstants.ChangeType.EXTERNAL_VIEW);
}
- private void queueEvent(NotificationContext context,
- ClusterEventType clusterEventType, HelixConstants.ChangeType cacheChangeType)
+ private void queueEvent(NotificationContext context, ClusterViewEvent.Type changeType,
+ HelixConstants.ChangeType cacheChangeType)
throws IllegalStateException {
// TODO: in case of FINALIZE, if we are not shutdown, re-connect helix manager and report error
if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
notifyDataChange(cacheChangeType);
- _eventProcessor.queueEvent(new ClusterEvent(_clusterName, clusterEventType));
+ _eventProcessor.queueEvent(changeType, new ClusterViewEvent(_clusterName, changeType));
} else {
- LOG.info(String.format("Skip queuing event. EventType: %s, ChangeType: %s, ContextType: %s",
- clusterEventType.name(), cacheChangeType.name(),
- context == null ? "NoContext" : context.getType().name()));
+ LOG.info("Skip queuing event from source cluster {}. ChangeType: {}, ContextType: {}",
+ _clusterName, cacheChangeType.name(),
+ context == null ? "NoContext" : context.getType().name());
}
}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
index 0d0af22..2a069ee 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
@@ -21,11 +21,13 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
@@ -33,6 +35,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.common.ZkTestBase;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -41,20 +44,18 @@
import org.testng.Assert;
import org.testng.annotations.Test;
-public class TestViewClusterRefresher {
+public class TestViewClusterRefresher extends ZkTestBase {
private static final String viewClusterName = "viewCluster";
private static final int numSourceCluster = 3;
private static final int numInstancePerSourceCluster = 2;
private static final int numExternalViewPerSourceCluster = 3;
private static final int numPartition = 3;
- private static final List<PropertyType> defaultProperties = Arrays.asList(
- new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES,
- PropertyType.EXTERNALVIEW
- });
+ private static final List<PropertyType> defaultProperties =
+ Arrays.asList(PropertyType.LIVEINSTANCES, PropertyType.INSTANCES, PropertyType.EXTERNALVIEW);
- private class CounterBasedMockAccessor extends MockAccessor {
- private int _setCount;
- private int _removeCount;
+ private static class CounterBasedMockAccessor extends MockAccessor {
+ private AtomicInteger _setCount = new AtomicInteger(0);
+ private AtomicInteger _removeCount = new AtomicInteger(0);
public CounterBasedMockAccessor(String clusterName) {
super(clusterName);
@@ -62,40 +63,40 @@
}
public void resetCounters() {
- _setCount = 0;
- _removeCount = 0;
+ _setCount.set(0);
+ _removeCount.set(0);
}
@Override
public boolean setProperty(PropertyKey key, HelixProperty value) {
- _setCount += 1;
+ _setCount.incrementAndGet();
return super.setProperty(key, value);
}
@Override
public boolean removeProperty(PropertyKey key) {
- _removeCount += 1;
+ _removeCount.incrementAndGet();
return super.removeProperty(key);
}
public int getSetCount() {
- return _setCount;
+ return _setCount.get();
}
public int getRemoveCount() {
- return _removeCount;
+ return _removeCount.get();
}
}
@Test
public void testRefreshWithNoChange() {
- CounterBasedMockAccessor viewClusterDataAccessor =
- new CounterBasedMockAccessor(viewClusterName);
+ CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
- new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+ new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
+ refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
// Refresh an empty view cluster
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
@@ -111,14 +112,14 @@
}
@Test
- public void testRefreshWithInstanceChange() {
- CounterBasedMockAccessor viewClusterDataAccessor =
- new CounterBasedMockAccessor(viewClusterName);
+ public void testRefreshWithInstanceChange() throws InterruptedException {
+ CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
- new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+ new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
+ refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
MockSourceClusterDataProvider sampleProvider =
(MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
@@ -155,13 +156,13 @@
@Test
public void testRefreshWithExternalViewChange() {
- CounterBasedMockAccessor accessor =
- new CounterBasedMockAccessor(viewClusterName);
+ CounterBasedMockAccessor accessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
- new ViewClusterRefresher(viewClusterName, accessor, dataProviderMap);
+ new ViewClusterRefresher(viewClusterName, accessor);
+ refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
MockSourceClusterDataProvider sampleProvider =
(MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
@@ -196,13 +197,13 @@
@Test
public void testRefreshWithProviderChange() {
- CounterBasedMockAccessor viewClusterDataAccessor =
- new CounterBasedMockAccessor(viewClusterName);
+ CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
- new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+ new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
+ refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
MockSourceClusterDataProvider sampleProvider =
(MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
@@ -217,7 +218,7 @@
// remove InstanceConfig and ExternalView requirement from sample provider
sampleProvider.getConfig()
- .setProperties(Arrays.asList(new PropertyType[] { PropertyType.LIVEINSTANCES }));
+ .setProperties(Collections.singletonList(PropertyType.LIVEINSTANCES));
// Refresh again
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
@@ -238,7 +239,7 @@
MockSourceClusterDataProvider mockProvider = (MockSourceClusterDataProvider) provider;
if (mockProvider != sampleProvider) {
mockProvider.getConfig().setProperties(Arrays
- .asList(new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES }));
+ .asList(PropertyType.LIVEINSTANCES, PropertyType.INSTANCES));
}
}
@@ -300,7 +301,7 @@
for (int i = 0; i < numSourceCluster; i++) {
String sourceClusterName = "cluster" + i;
ViewClusterSourceConfig sourceConfig =
- new ViewClusterSourceConfig(sourceClusterName, "", defaultProperties);
+ new ViewClusterSourceConfig(sourceClusterName, ZK_ADDR, defaultProperties);
MockSourceClusterDataProvider provider =
new MockSourceClusterDataProvider(sourceConfig, null);
List<LiveInstance> liveInstanceList = new ArrayList<>();
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestDataProviderUtil.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/DataProviderTestUtil.java
similarity index 88%
rename from helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestDataProviderUtil.java
rename to helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/DataProviderTestUtil.java
index a9f8dba..3369420 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestDataProviderUtil.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/DataProviderTestUtil.java
@@ -26,13 +26,11 @@
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.model.ClusterConfig;
-public class TestDataProviderUtil {
+public class DataProviderTestUtil {
private static final int viewClusterRefreshPeriod = 10;
private static final String testZkAddr = "localhost:1010";
- private static final List<PropertyType> testProperties = Arrays.asList(
- new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW,
- PropertyType.INSTANCES
- });
+ private static final List<PropertyType> testProperties =
+ Arrays.asList(PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW, PropertyType.INSTANCES);
public static ClusterConfig createDefaultViewClusterConfig(String viewClusterName,
int numSourceCluster) {
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java
index db8ddc2..16a364f 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestSourceClusterConfigChangeAction.java
@@ -35,7 +35,7 @@
@Test
public void testActionComputationOnStartup() {
- ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+ ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
SourceClusterConfigChangeAction action = new SourceClusterConfigChangeAction(null, config);
action.computeAction();
Assert.assertEquals(action.getConfigsToAdd().size(),
@@ -50,7 +50,7 @@
@Test
public void testActionComputationOnConfigModified() {
- ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+ ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
ClusterConfig newConfig = new ClusterConfig(viewClusterName);
newConfig.setViewCluster();
@@ -91,7 +91,7 @@
@Test
public void testActionComputationNoChange() {
- ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+ ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
SourceClusterConfigChangeAction action = new SourceClusterConfigChangeAction(config, config);
Assert.assertEquals(action.getConfigsToAdd().size(), 0);
Assert.assertEquals(action.getConfigsToDelete().size(), 0);
@@ -100,7 +100,7 @@
@Test
public void testActionComputationInvalidInitialization() {
- ClusterConfig config = TestDataProviderUtil.createDefaultViewClusterConfig(viewClusterName, 2);
+ ClusterConfig config = DataProviderTestUtil.createDefaultViewClusterConfig(viewClusterName, 2);
ClusterConfig badConfig = new ClusterConfig(viewClusterName);
try {
SourceClusterConfigChangeAction action = new SourceClusterConfigChangeAction(config, null);
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
index eed25b9..9ffb906 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
@@ -35,7 +35,6 @@
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModelParser;
-import org.apache.helix.view.ViewAggregatorIntegrationTestBase;
import org.apache.helix.view.mock.MockViewClusterSpectator;
import org.apache.helix.view.statemodel.DistViewAggregatorStateModel;
import org.testng.Assert;
@@ -172,8 +171,7 @@
// Wait for refresh and verify
Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
verifyViewClusterEventChanges(false, false, true);
- Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(),
- 0);
+ Assert.assertEquals(_monitor.getPropertyNamesFromViewCluster(PropertyType.LIVEINSTANCES).size(), 0);
_monitor.reset();
// Simulate view aggregator service crashed and got reset
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
index 2f429e0..a8046bb 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestSourceClusterDataProvider.java
@@ -25,7 +25,8 @@
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.view.ViewAggregatorIntegrationTestBase;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.mock.MockClusterEventProcessor;
import org.testng.Assert;
@@ -34,31 +35,31 @@
public class TestSourceClusterDataProvider extends ViewAggregatorIntegrationTestBase {
private static final int numSourceCluster = 1;
private static final String stateModel = "MasterSlave";
- private static final String testResource = "restResource";
+ private static final String testResource = "testResource";
@Test
public void testSourceClusterDataProviderWatchAndRefresh() throws Exception {
String clusterName = _allSourceClusters.get(0);
- List<PropertyType> properties = Arrays.asList(
- new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW,
- PropertyType.INSTANCES
- });
+ ZkHelixClusterVerifier clusterVerifier = new BestPossibleExternalViewVerifier.Builder(clusterName)
+ .setZkAddress(ZK_ADDR)
+ .build();
- ViewClusterSourceConfig sourceClusterConfig =
- new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
+ List<PropertyType> properties = Arrays.asList(
+ PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW, PropertyType.INSTANCES);
+
+ ViewClusterSourceConfig sourceClusterConfig = new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
processor.start();
- SourceClusterDataProvider dataProvider =
- new SourceClusterDataProvider(sourceClusterConfig, processor);
+ SourceClusterDataProvider dataProvider = new SourceClusterDataProvider(sourceClusterConfig, processor);
// setup can be re-called
dataProvider.setup();
dataProvider.setup();
- Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
- new HashSet<>(properties));
+ Assert.assertTrue(clusterVerifier.verify(1000));
+ Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()), new HashSet<>(properties));
// When first connected, data provider will have some initial events
Assert.assertEquals(processor.getHandledExternalViewChangeCount(), 1);
@@ -67,34 +68,33 @@
processor.resetHandledEventCount();
// ListNames should work
- Assert.assertEquals(dataProvider.getInstanceConfigNames().size(), numPaticipantCount);
- Assert.assertEquals(dataProvider.getLiveInstanceNames().size(), numPaticipantCount);
+ Assert.assertEquals(dataProvider.getInstanceConfigNames().size(), numParticipant);
+ Assert.assertEquals(dataProvider.getLiveInstanceNames().size(), numParticipant);
Assert.assertEquals(dataProvider.getExternalViewNames().size(), 0);
processor.resetHandledEventCount();
// rebalance resource to check external view related events
- _gSetupTool.addResourceToCluster(clusterName, testResource, numPaticipantCount, stateModel);
+ _gSetupTool.addResourceToCluster(clusterName, testResource, numParticipant, stateModel);
_gSetupTool.rebalanceResource(clusterName, testResource, 3);
- Thread.sleep(1000);
+ Assert.assertTrue(clusterVerifier.verify(1000));
Assert.assertTrue(processor.getHandledExternalViewChangeCount() > 0);
Assert.assertEquals(dataProvider.getExternalViewNames().size(), 1);
// refresh data provider will have correct data loaded
dataProvider.refreshCache();
- Assert.assertEquals(dataProvider.getLiveInstances().size(), numPaticipantCount);
- Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), numPaticipantCount);
+ Assert.assertEquals(dataProvider.getLiveInstances().size(), numParticipant);
+ Assert.assertEquals(dataProvider.getInstanceConfigMap().size(), numParticipant);
Assert.assertEquals(dataProvider.getExternalViews().size(), 1);
processor.resetHandledEventCount();
// Add additional participant will have corresponding change
String testParticipantName = "testParticipant";
_gSetupTool.addInstanceToCluster(clusterName, testParticipantName);
- MockParticipantManager participant =
- new MockParticipantManager(ZK_ADDR, clusterName, testParticipantName);
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, testParticipantName);
participant.syncStart();
- Thread.sleep(500);
+ Assert.assertTrue(clusterVerifier.verify(500));
Assert.assertEquals(processor.getHandledInstanceConfigChangeCount(), 1);
Assert.assertEquals(processor.getHandledLiveInstancesChangeCount(), 1);
@@ -111,21 +111,20 @@
@Test
public void testSourceClusterDataProviderPropertyFilter() throws Exception {
String clusterName = _allSourceClusters.get(0);
- List<PropertyType> properties = Arrays.asList(
- new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW });
+ ZkHelixClusterVerifier clusterVerifier = new BestPossibleExternalViewVerifier.Builder(clusterName)
+ .setZkAddress(ZK_ADDR)
+ .build();
- ViewClusterSourceConfig sourceClusterConfig =
- new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
-
+ List<PropertyType> properties = Arrays.asList(PropertyType.LIVEINSTANCES, PropertyType.EXTERNALVIEW);
+ ViewClusterSourceConfig sourceClusterConfig = new ViewClusterSourceConfig(clusterName, ZK_ADDR, properties);
MockClusterEventProcessor processor = new MockClusterEventProcessor(clusterName);
processor.start();
- SourceClusterDataProvider dataProvider =
- new SourceClusterDataProvider(sourceClusterConfig, processor);
+ SourceClusterDataProvider dataProvider = new SourceClusterDataProvider(sourceClusterConfig, processor);
dataProvider.setup();
- Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()),
- new HashSet<>(properties));
+ Assert.assertTrue(clusterVerifier.verify(1000));
+ Assert.assertEquals(new HashSet<>(dataProvider.getPropertiesToAggregate()), new HashSet<>(properties));
// When first connected, data provider will have some initial events, but InstanceConfig
// will be filtered out since its not in properties
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
index 272ef7a..26afeef 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/ViewAggregatorIntegrationTestBase.java
@@ -1,4 +1,4 @@
-package org.apache.helix.view;
+package org.apache.helix.view.integration;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,15 +21,15 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-public class ViewAggregatorIntegrationTestBase extends ZkIntegrationTestBase {
+public class ViewAggregatorIntegrationTestBase extends ZkTestBase {
protected static final int numSourceCluster = 2;
- protected static final int numPaticipantCount = 3;
+ protected static final int numParticipant = 3;
protected static final String testSourceClusterNamePrefix = "testSourceCluster";
protected static final String testParticipantNamePrefix = "testParticipant";
protected static final String testControllerNamePrefix = "testController";
@@ -46,11 +46,11 @@
String.format("%s-%s-%s", testSourceClusterNamePrefix, this.hashCode(), i);
_gSetupTool.addCluster(clusterName, false);
// Setup participants
- for (int j = 0; j < numPaticipantCount; j++) {
- String instanceName =
- String.format("%s-%s-%s", testParticipantNamePrefix, clusterName, j);
+ for (int j = 0; j < numParticipant; j++) {
+ String instanceName = String.format("%s-%s-%s", clusterName, testParticipantNamePrefix, j);
_gSetupTool.addInstanceToCluster(clusterName, instanceName);
- MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
participant.syncStart();
_allParticipants.add(participant);
}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
index 36f27a2..cf51cbd 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockClusterEventProcessor.java
@@ -19,14 +19,15 @@
* under the License.
*/
-import org.apache.helix.common.ClusterEventProcessor;
-import org.apache.helix.controller.stages.ClusterEvent;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.view.common.ClusterViewEvent;
-public class MockClusterEventProcessor extends ClusterEventProcessor {
- private int _handledClusterConfigChange;
- private int _handledExternalViewChange;
- private int _handledInstanceConfigChange;
- private int _handledLiveInstancesChange;
+public class MockClusterEventProcessor extends DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> {
+ private final AtomicInteger _handledClusterConfigChange = new AtomicInteger(0);
+ private final AtomicInteger _handledExternalViewChange = new AtomicInteger(0);
+ private final AtomicInteger _handledInstanceConfigChange = new AtomicInteger(0);
+ private final AtomicInteger _handledLiveInstancesChange = new AtomicInteger(0);
public MockClusterEventProcessor(String clusterName) {
super(clusterName);
@@ -34,45 +35,45 @@
}
public int getHandledClusterConfigChangeCount() {
- return _handledClusterConfigChange;
+ return _handledClusterConfigChange.get();
}
public int getHandledExternalViewChangeCount() {
- return _handledExternalViewChange;
+ return _handledExternalViewChange.get();
}
public int getHandledInstanceConfigChangeCount() {
- return _handledInstanceConfigChange;
+ return _handledInstanceConfigChange.get();
}
public int getHandledLiveInstancesChangeCount() {
- return _handledLiveInstancesChange;
+ return _handledLiveInstancesChange.get();
}
public void resetHandledEventCount() {
- _handledClusterConfigChange = 0;
- _handledExternalViewChange = 0;
- _handledInstanceConfigChange = 0;
- _handledLiveInstancesChange = 0;
+ _handledClusterConfigChange.set(0);
+ _handledExternalViewChange.set(0);
+ _handledInstanceConfigChange.set(0);
+ _handledLiveInstancesChange.set(0);
}
@Override
- public void handleEvent(ClusterEvent event) {
+ public void handleEvent(ClusterViewEvent event) {
switch (event.getEventType()) {
- case ClusterConfigChange:
- _handledClusterConfigChange += 1;
- break;
- case LiveInstanceChange:
- _handledLiveInstancesChange += 1;
- break;
- case InstanceConfigChange:
- _handledInstanceConfigChange += 1;
- break;
- case ExternalViewChange:
- _handledExternalViewChange += 1;
- break;
- default:
- break;
+ case ConfigChange:
+ _handledClusterConfigChange.incrementAndGet();
+ break;
+ case LiveInstanceChange:
+ _handledLiveInstancesChange.incrementAndGet();
+ break;
+ case InstanceConfigChange:
+ _handledInstanceConfigChange.incrementAndGet();
+ break;
+ case ExternalViewChange:
+ _handledExternalViewChange.incrementAndGet();
+ break;
+ default:
+ break;
}
}
}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
index cc4afd6..57e9c38 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
@@ -20,12 +20,17 @@
*/
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixProperty;
import org.apache.helix.api.config.ViewClusterSourceConfig;
-import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.common.caches.ExternalViewCache;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.common.ClusterViewEvent;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
public class MockSourceClusterDataProvider extends SourceClusterDataProvider {
@@ -48,7 +53,7 @@
}
public MockSourceClusterDataProvider(ViewClusterSourceConfig config,
- ClusterEventProcessor processor) {
+ DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> processor) {
super(config, processor);
_externalViewCache = new MockExternalViewCache("Test");
}
@@ -83,21 +88,14 @@
}
public void setInstanceConfigs(List<InstanceConfig> instanceConfigList) {
- for (InstanceConfig config : instanceConfigList) {
- _instanceConfigMap.put(config.getInstanceName(), config);
- }
+ _instanceConfigPropertyCache.setPropertyMap(HelixProperty.convertListToMap(instanceConfigList));
}
public void setLiveInstances(List<LiveInstance> liveInstanceList) {
- for (LiveInstance instance : liveInstanceList) {
- _liveInstanceMap.put(instance.getInstanceName(), instance);
- }
+ _liveInstancePropertyCache.setPropertyMap(HelixProperty.convertListToMap(liveInstanceList));
}
public void setExternalViews(List<ExternalView> externalViewList) {
- for (ExternalView ev : externalViewList) {
- _externalViewMap.put(ev.getResourceName(), ev);
- }
((MockExternalViewCache) _externalViewCache).setExternalView(externalViewList);
}
-}
+}
\ No newline at end of file
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockViewClusterSpectator.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockViewClusterSpectator.java
new file mode 100644
index 0000000..c95ded5
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockViewClusterSpectator.java
@@ -0,0 +1,123 @@
+package org.apache.helix.view.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.common.ClusterViewEvent;
+
+/**
+ * MockViewClusterSpectator monitors change in view cluster. When event happens, it push event to
+ * MockClusterEventProcessor which records event change count in view cluster
+ */
+public class MockViewClusterSpectator implements ExternalViewChangeListener,
+ InstanceConfigChangeListener, LiveInstanceChangeListener{
+ private final HelixManager _manager;
+ private final MockClusterEventProcessor _eventProcessor;
+ private final String _viewClusterName;
+ private final HelixDataAccessor _dataAccessor;
+
+ public MockViewClusterSpectator(String viewClusterName, String zkAddr) throws Exception {
+ _viewClusterName = viewClusterName;
+ _eventProcessor = new MockClusterEventProcessor(viewClusterName);
+ _eventProcessor.start();
+ _manager = HelixManagerFactory
+ .getZKHelixManager(viewClusterName, "MockViewClusterSpectator-" + viewClusterName,
+ InstanceType.SPECTATOR, zkAddr);
+ _manager.connect();
+ _manager.addExternalViewChangeListener(this);
+ _manager.addLiveInstanceChangeListener(this);
+ _manager.addInstanceConfigChangeListener(this);
+ _dataAccessor = _manager.getHelixDataAccessor();
+ }
+
+ public void shutdown() throws Exception {
+ _eventProcessor.interrupt();
+ if (_manager != null && _manager.isConnected()) {
+ _manager.disconnect();
+ }
+ }
+
+ @Override
+ @PreFetch(enabled = false)
+ public void onExternalViewChange(List<ExternalView> externalViewList,
+ NotificationContext changeContext) {
+ _eventProcessor.queueEvent(ClusterViewEvent.Type.ExternalViewChange,
+ new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.ExternalViewChange));
+ }
+
+ @Override
+ @PreFetch(enabled = false)
+ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
+ NotificationContext context) {
+ _eventProcessor.queueEvent(ClusterViewEvent.Type.InstanceConfigChange,
+ new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.InstanceConfigChange));
+ }
+
+ @Override
+ @PreFetch(enabled = false)
+ public void onLiveInstanceChange(List<LiveInstance> liveInstances,
+ NotificationContext changeContext) {
+ _eventProcessor.queueEvent(ClusterViewEvent.Type.LiveInstanceChange,
+ new ClusterViewEvent(_viewClusterName, ClusterViewEvent.Type.LiveInstanceChange));
+ }
+
+ public int getLiveInstanceChangeCount() {
+ return _eventProcessor.getHandledLiveInstancesChangeCount();
+ }
+
+ public int getInstanceConfigChangeCount() {
+ return _eventProcessor.getHandledInstanceConfigChangeCount();
+ }
+
+ public int getExternalViewChangeCount() {
+ return _eventProcessor.getHandledExternalViewChangeCount();
+ }
+
+ public synchronized void reset() {
+ _eventProcessor.resetHandledEventCount();
+ }
+
+ public List<String> getPropertyNamesFromViewCluster(PropertyType propertyType) {
+ switch (propertyType) {
+ case INSTANCES:
+ return _dataAccessor.getChildNames(_dataAccessor.keyBuilder().instanceConfigs());
+ case LIVEINSTANCES:
+ return _dataAccessor.getChildNames(_dataAccessor.keyBuilder().liveInstances());
+ case EXTERNALVIEW:
+ return _dataAccessor.getChildNames(_dataAccessor.keyBuilder().externalViews());
+ default:
+ return null;
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 1876824..07c7abe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,10 +275,10 @@
<module>helix-admin-webapp</module>
<module>helix-rest</module>
<module>helix-lock</module>
- <module>helix-view-aggregator</module>
<module>helix-agent</module>
<!--<module>helix-front</module>-->
<module>recipes</module>
+ <module>helix-view-aggregator</module>
</modules>
<mailingLists>