HELIX-705: Implement ViewClusterRefresher logic and tests
RB=1213810
BUG=HELIX-776
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang,jxue
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index 8331f30..2870112 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -46,6 +46,10 @@
protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
protected ExternalViewCache _externalViewCache;
+ protected Map<String, LiveInstance> _liveInstanceMap;
+ protected Map<String, InstanceConfig> _instanceConfigMap;
+ protected Map<String, ExternalView> _externalViewMap;
+ private final PropertyType _sourceDataType;
protected String _clusterName;
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
index e770302..58a35e6 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -19,32 +19,39 @@
* under the License.
*/
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Contains logics to refresh view cluster based on information from source cluster data providers
+ * This class contains logics to refresh view cluster based on information from source cluster data
+ * providers.
+ * This class assumes SourceClusterDataProviders have its caches refreshed already.
*/
public class ViewClusterRefresher {
private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
private String _viewClusterName;
private HelixDataAccessor _viewClusterDataAccessor;
- private PropertyKey.Builder _viewClusterKeyBuilder;
private Map<String, SourceClusterDataProvider> _dataProviderMap;
// These 3 caches stores objects that are pushed to view cluster (write-through) cache,
// thus we don't need to read from view cluster everytime we refresh it.
- private Map<String, LiveInstance> _viewClusterLiveInstanceCache;
- private Map<String, InstanceConfig> _viewClusterInstanceConfigCache;
- private Map<String, ExternalView> _viewClusterExternalViewCache;
+ private Map<String, HelixProperty> _viewClusterLiveInstanceCache;
+ private Map<String, HelixProperty> _viewClusterInstanceConfigCache;
+ private Map<String, HelixProperty> _viewClusterExternalViewCache;
public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor,
Map<String, SourceClusterDataProvider> dataProviderMap) {
@@ -56,38 +63,135 @@
_viewClusterExternalViewCache = new HashMap<>();
}
- /**
- * Create / update current live instances; remove dead live instances.
- * This function assumes that all data providers have been refreshed.
- */
- public void refreshLiveInstancesInViewCluster() {
- // TODO: implement it
+ private class ClusterPropertyDiff {
+ /**
+ * List of names of objects to set (create or modify)
+ */
+ List<String> _keysToSet;
+
+ /**
+ * List of actual objects represented by _keysToSet
+ */
+ List<HelixProperty> _propertiesToSet;
+
+ /**
+ * List of names of objects to delete
+ */
+ List<String> _keysToDelete;
+
+ public ClusterPropertyDiff() {
+ _keysToSet = new ArrayList<>();
+ _propertiesToSet = new ArrayList<>();
+ _keysToDelete = new ArrayList<>();
+ }
+
+ public void addPropertyToSet(String key, HelixProperty obj) {
+ // batch setChildren API needs keys and objects to have corresponding order
+ _keysToSet.add(key);
+ _propertiesToSet.add(obj);
+ }
+
+ public void addPropertiesToDelete(Collection<? extends String> keys) {
+ _keysToDelete.addAll(keys);
+ }
+
+ public List<String> getKeysToSet() {
+ return Collections.unmodifiableList(_keysToSet);
+ }
+
+ public List<HelixProperty> getPropertiesToSet() {
+ return Collections.unmodifiableList(_propertiesToSet);
+ }
+
+ public List<String> getKeysToDelete() {
+ return Collections.unmodifiableList(_keysToDelete);
+ }
}
/**
- * Create / update current instance configs; remove dead instance configs
- * This function assumes that all data providers have been refreshed.
+ * Create / update / delete property of given type in view cluster, based on data change from
+ * source clusters.
+ *
+ * @param propertyType type of property to refresh in view cluster
+ * @return true if successfully refreshed all instances of the given property else false
+ * @throws IllegalArgumentException throws exception when give type is not supported
*/
- public void refreshInstanceConfigsInViewCluster() {
- // TODO: implement it
- }
+ public boolean refreshPropertiesInViewCluster(PropertyType propertyType)
+ throws IllegalArgumentException {
+ boolean ok = false;
+ Set<String> listedNamesInView;
+ Set<String> listedNamesInSource = new HashSet<>();
+ Map<String, HelixProperty> sourceProperties = new HashMap<>();
+ Map<String, HelixProperty> viewClusterPropertyCache =
+ getViewClusterPropertyCache(propertyType);
+ if (viewClusterPropertyCache == null) {
+ throw new IllegalArgumentException(
+ "Cannot find view cluster property cache. Property: " + propertyType.name());
+ }
- /**
- * Create / update currently existing external views; delete outdated external views.
- * This function assumes that all data providers have been refreshed.
- */
- public void refreshExtrenalViewsInViewCluster() {
- // TODO: implement it
+ try {
+ listedNamesInView =
+ new HashSet<>(_viewClusterDataAccessor.getChildNames(getPropertyKey(propertyType, null)));
+ // Prepare data
+ for (SourceClusterDataProvider provider : _dataProviderMap.values()) {
+ if (!provider.getPropertiesToAggregate().contains(propertyType)) {
+ logger.info(String
+ .format("SourceCluster %s does not need to aggregate %s, skip.", provider.getName(),
+ propertyType.name()));
+ continue;
+ }
+ switch (propertyType) {
+ case INSTANCES:
+ listedNamesInSource.addAll(provider.getInstanceConfigNames());
+ sourceProperties.putAll(provider.getInstanceConfigMap());
+ break;
+ case LIVEINSTANCES:
+ listedNamesInSource.addAll(provider.getLiveInstanceNames());
+ sourceProperties.putAll(provider.getLiveInstances());
+ break;
+ case EXTERNALVIEW:
+ listedNamesInSource.addAll(provider.getExternalViewNames());
+ for (Map.Entry<String, ExternalView> entry : provider.getExternalViews().entrySet()) {
+ String resourceName = entry.getKey();
+ ExternalView resourceEV = entry.getValue();
+ if (sourceProperties.containsKey(resourceName)) {
+ // Merge external views if we already have a record
+ mergeExternalViews((ExternalView) sourceProperties.get(resourceName), resourceEV);
+ } else {
+ // merging simple fields and list fields are meaningless and would cause confusion
+ resourceEV.getRecord().getSimpleFields().clear();
+ resourceEV.getRecord().getListFields().clear();
+ sourceProperties.put(resourceName, resourceEV);
+ }
+ }
+ break;
+ default:
+ // Will NOT come here as for unsupported property type, exception will be thrown out
+ // earlier
+ break;
+ }
+ }
+
+ // Perform refresh
+ ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource, sourceProperties,
+ viewClusterPropertyCache);
+ } catch (Exception e) {
+ logger.warn(String
+ .format("Caught exception during refreshing %s for view cluster %s", propertyType.name(),
+ _viewClusterName), e);
+ }
+ return ok;
}
/**
* Merge external view "toMerge" into external view "source":
* - if partition in toMerge does not exist in source, we add it into source
* - if partition exist in both external views, we add all map fields from toMerge to source
+ *
* @param source
* @param toMerge
*/
- public static void mergeExternalViews(ExternalView source, ExternalView toMerge)
+ private static void mergeExternalViews(ExternalView source, ExternalView toMerge)
throws IllegalArgumentException {
if (!source.getId().equals(toMerge.getId())) {
throw new IllegalArgumentException(String
@@ -104,4 +208,220 @@
}
}
}
+
+ /**
+ * Based on property names in view cluster, property names in source clusters, and all cached
+ * properties in source clusters, generate ClusterPropertyDiff that contains information about
+ * what to add / update or delete
+ *
+ * @param viewPropertyNames names of all properties (i.e. liveInstances) in view cluster
+ * @param sourcePropertyNames names of all properties (i.e. liveInstances) in all source clusters
+ * @param cachedSourceProperties all cached properties from source clusters
+ * @param viewClusterPropertyCache all properties that are previously set successfully to view cluster
+ * @param <T> extends HelixProperty
+ * @return ClusterPropertyDiff object contains diff information
+ */
+ private <T extends HelixProperty> ClusterPropertyDiff calculatePropertyDiff(
+ Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
+ Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+ ClusterPropertyDiff diff = new ClusterPropertyDiff();
+
+ // items whose names are in view cluster but not in source should be removed for sure
+ Set<String> toDelete = new HashSet<>(viewPropertyNames);
+ toDelete.removeAll(sourcePropertyNames);
+ diff.addPropertiesToDelete(toDelete);
+
+ for (Map.Entry<String, T> sourceProperty : cachedSourceProperties.entrySet()) {
+ String name = sourceProperty.getKey();
+ HelixProperty property = sourceProperty.getValue();
+
+ // cache refresh happens earlier than list curNames, so if cache is still in curNames,
+ // we confirm that this is a valid live instance. This is necessary because ZK
+ // can possibly not return all children content in a refresh, but list child names
+ // will reliably return all children names.
+ //
+ // Else, either this child is already deleted, or we fail to retrieve information
+ // from a cache refresh. either way, we will leave it to next ViewClusterRefresh cycle
+ // to confirm state
+ if (property != null && sourcePropertyNames.contains(name) && (
+ !viewClusterPropertyCache.containsKey(name) || !viewClusterPropertyCache.get(name)
+ .getRecord().equals(property.getRecord()))) {
+ diff.addPropertyToSet(name, property);
+ }
+ }
+ return diff;
+ }
+
+ /**
+ * Refresh view cluster regarding a particular property based given source of truths.
+ * Steps are:
+ * - Calculate diff based on propertyNamesInView, propertyNamesInSource and
+ * cachedSourceProperties
+ * - Generate property keys for properties to set / delete
+ * - Delete properties
+ * - Set properties
+ *
+ * @param propertyType type of property to refresh
+ * @param viewPropertyNames all names of the target properties in view cluster
+ * @param sourcePropertyNames all names of the target properties in source clusters
+ * @param cachedSourceProperties all up-to-date cached properties in source cluster
+ * @param viewClusterPropertyCache view cluster cache
+ * @param <T> extends HelixProperty
+ * @return true if all required refreshes are successful, else false
+ */
+ private <T extends HelixProperty> boolean doRefresh(PropertyType propertyType,
+ Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
+ Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+ boolean ok = true;
+
+ // Calculate diff
+ ClusterPropertyDiff diff =
+ calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties,
+ viewClusterPropertyCache);
+
+ // Generate property keys
+ List<PropertyKey> keysToSet = new ArrayList<>();
+ List<PropertyKey> keysToDelete = new ArrayList<>();
+ for (String name : diff.getKeysToSet()) {
+ PropertyKey key = getPropertyKey(propertyType, name);
+ if (key != null) {
+ keysToSet.add(key);
+ }
+ }
+
+ for (String name : diff.getKeysToDelete()) {
+ PropertyKey key = getPropertyKey(propertyType, name);
+ if (key != null) {
+ keysToDelete.add(key);
+ }
+ }
+
+ // Delete outdated properties
+ if (!deleteProperties(keysToDelete, viewClusterPropertyCache)) {
+ ok = false;
+ }
+
+ // Add or update changed properties
+ if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet(), viewClusterPropertyCache)) {
+ ok = false;
+ }
+ return ok;
+ }
+
+ /**
+ * Based on type and property name, generate property key
+ * @param propertyType type of the property
+ * @param propertyName name of the property. If null, return key of the parent of
+ * all properties of given type
+ * @return property key
+ */
+ private PropertyKey getPropertyKey(PropertyType propertyType, String propertyName) {
+ switch (propertyType) {
+ case INSTANCES:
+ return propertyName == null
+ ? _viewClusterDataAccessor.keyBuilder().instanceConfigs()
+ : _viewClusterDataAccessor.keyBuilder().instanceConfig(propertyName);
+ case LIVEINSTANCES:
+ return propertyName == null
+ ? _viewClusterDataAccessor.keyBuilder().liveInstances()
+ : _viewClusterDataAccessor.keyBuilder().liveInstance(propertyName);
+ case EXTERNALVIEW:
+ return propertyName == null
+ ? _viewClusterDataAccessor.keyBuilder().externalViews()
+ : _viewClusterDataAccessor.keyBuilder().externalView(propertyName);
+ default:
+ return null;
+ }
+ }
+
+ private Map<String, HelixProperty> getViewClusterPropertyCache(PropertyType propertyType) {
+ switch (propertyType) {
+ case INSTANCES:
+ return _viewClusterInstanceConfigCache;
+ case LIVEINSTANCES:
+ return _viewClusterLiveInstanceCache;
+ case EXTERNALVIEW:
+ return _viewClusterExternalViewCache;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Create or Update properties in ZK specified by a list of property keys. Update the given cache
+ * for the objects that got successfully created or updated in ZK
+ * @param keysToAddOrUpdate
+ * @param objects
+ * @param cache
+ * @param <T> HelixProperty
+ *
+ * @return true if all objects are successfully created or updated, else false
+ */
+ private <T extends HelixProperty> boolean addOrUpdateProperties(
+ List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects, Map<String, T> cache) {
+ boolean ok = true;
+ logger.info(String.format("AddOrUpdate objects: %s", keysToAddOrUpdate));
+ boolean[] addOrUpdateResults = _viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
+ for (int i = 0; i < addOrUpdateResults.length; i++) {
+ if (!addOrUpdateResults[i]) {
+ // Don't add item to cache yet - will retry during next refresh
+ logger.warn(String.format("Failed to create or update live instance %s, will retry later",
+ keysToAddOrUpdate.get(i).getPath()));
+ ok = false;
+ } else {
+ // Successfully updated ViewClusterZK, proceed to update cache
+ @SuppressWarnings("unchecked")
+ T property = (T) objects.get(i);
+ cache.put(getBaseObjectNameFromPropertyKey(keysToAddOrUpdate.get(i)), property);
+ }
+ }
+ return ok;
+ }
+
+ /**
+ * Delete properties in ZK specified by a list of property keys. Update the given cache
+ * for the objects that got successfully deleted in ZK
+ * @param keysToDelete
+ * @param cache
+ * @param <T> HelixProperty
+ * @return true if all objects got successfully deleted else false
+ */
+ private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete,
+ Map<String, T> cache) {
+ boolean ok = true;
+ logger.info(String.format("Deleting objects: %s", keysToDelete));
+ for (PropertyKey key : keysToDelete) {
+ if (!_viewClusterDataAccessor.removeProperty(key)) {
+ // Don't remove item from cache yet - will retry during next refresh
+ ok = false;
+ logger.warn(String.format("Failed to create or update live instance %s, will retry later",
+ key.getPath()));
+ } else {
+ // Successfully updated ViewClusterZK, proceed to update cache
+ cache.remove(getBaseObjectNameFromPropertyKey(key));
+ }
+ }
+ return ok;
+ }
+
+ /**
+ * Parse property key and get the id of the ZNode that property key represents
+ * @param key
+ * @return
+ */
+ private static String getBaseObjectNameFromPropertyKey(PropertyKey key) {
+ String[] params = key.getParams();
+ return params[params.length - 1];
+ }
+
+ private void logRefreshResult(PropertyType type, boolean ok) {
+ if (!ok) {
+ logger.warn(String
+ .format("Failed to refresh all %s for view cluster %s, will retry",
+ type.name(), _viewClusterName));
+ } else {
+ logger.info(String.format("Successfully refreshed all %s for view cluster %s",
+ type.name(), _viewClusterName));
+ }
+ }
}
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
index 7b148e0..3c590a6 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/SourceClusterDataProvider.java
@@ -52,9 +52,9 @@
implements InstanceConfigChangeListener, LiveInstanceChangeListener,
ExternalViewChangeListener {
private final HelixManager _helixManager;
- private final ViewClusterSourceConfig _sourceClusterConfig;
private final ClusterEventProcessor _eventProcessor;
+ protected ViewClusterSourceConfig _sourceClusterConfig;
private HelixDataAccessor _dataAccessor;
private PropertyKey.Builder _propertyKeyBuilder;
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
new file mode 100644
index 0000000..0d0af22
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
@@ -0,0 +1,335 @@
+package org.apache.helix.view.aggregator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.MockAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.mock.MockSourceClusterDataProvider;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestViewClusterRefresher {
+ private static final String viewClusterName = "viewCluster";
+ private static final int numSourceCluster = 3;
+ private static final int numInstancePerSourceCluster = 2;
+ private static final int numExternalViewPerSourceCluster = 3;
+ private static final int numPartition = 3;
+ private static final List<PropertyType> defaultProperties = Arrays.asList(
+ new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES,
+ PropertyType.EXTERNALVIEW
+ });
+
+ private class CounterBasedMockAccessor extends MockAccessor {
+ private int _setCount;
+ private int _removeCount;
+
+ public CounterBasedMockAccessor(String clusterName) {
+ super(clusterName);
+ resetCounters();
+ }
+
+ public void resetCounters() {
+ _setCount = 0;
+ _removeCount = 0;
+ }
+
+ @Override
+ public boolean setProperty(PropertyKey key, HelixProperty value) {
+ _setCount += 1;
+ return super.setProperty(key, value);
+ }
+
+ @Override
+ public boolean removeProperty(PropertyKey key) {
+ _removeCount += 1;
+ return super.removeProperty(key);
+ }
+
+ public int getSetCount() {
+ return _setCount;
+ }
+
+ public int getRemoveCount() {
+ return _removeCount;
+ }
+ }
+
+ @Test
+ public void testRefreshWithNoChange() {
+ CounterBasedMockAccessor viewClusterDataAccessor =
+ new CounterBasedMockAccessor(viewClusterName);
+ Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+ createMockDataProviders(dataProviderMap);
+
+ ViewClusterRefresher refresher =
+ new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+
+ // Refresh an empty view cluster
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+ viewClusterDataAccessor.resetCounters();
+
+ // Refresh again without change
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+ Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 0);
+ }
+
+ @Test
+ public void testRefreshWithInstanceChange() {
+ CounterBasedMockAccessor viewClusterDataAccessor =
+ new CounterBasedMockAccessor(viewClusterName);
+ Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+ createMockDataProviders(dataProviderMap);
+
+ ViewClusterRefresher refresher =
+ new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+ MockSourceClusterDataProvider sampleProvider =
+ (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
+
+ // Refresh an empty view cluster
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ // multiply by 2 for both instance config and live instances
+ Assert.assertEquals(viewClusterDataAccessor.getSetCount(),
+ numSourceCluster * numInstancePerSourceCluster * 2);
+ verifyInstances(viewClusterDataAccessor, dataProviderMap);
+
+ // Source cluster has instances deleted
+ viewClusterDataAccessor.resetCounters();
+ sampleProvider.clearCache(HelixConstants.ChangeType.LIVE_INSTANCE);
+ sampleProvider.clearCache(HelixConstants.ChangeType.INSTANCE_CONFIG);
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numInstancePerSourceCluster * 2);
+
+ verifyInstances(viewClusterDataAccessor, dataProviderMap);
+
+ // Source cluster has live instances added
+ viewClusterDataAccessor.resetCounters();
+ List<LiveInstance> liveInstances =
+ new ArrayList<>(sampleProvider.getLiveInstances().values());
+ liveInstances.add(new LiveInstance("newLiveInstance"));
+ sampleProvider.setLiveInstances(liveInstances);
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 1);
+
+ verifyInstances(viewClusterDataAccessor, dataProviderMap);
+ }
+
+ @Test
+ public void testRefreshWithExternalViewChange() {
+ CounterBasedMockAccessor accessor =
+ new CounterBasedMockAccessor(viewClusterName);
+ Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+ createMockDataProviders(dataProviderMap);
+
+ ViewClusterRefresher refresher =
+ new ViewClusterRefresher(viewClusterName, accessor, dataProviderMap);
+ MockSourceClusterDataProvider sampleProvider =
+ (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
+
+ // Refresh an empty view cluster
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+ // We only have 2 resource names so should be setting 2 evs
+ Assert.assertEquals(accessor.getSetCount(), numExternalViewPerSourceCluster);
+ Assert.assertEquals(accessor.getRemoveCount(), 0);
+ // Partition count should be maintained
+ // Since we are creating 1 replica per source cluster so replica should be aggregated.
+ verifyExternalView(accessor, numExternalViewPerSourceCluster, numPartition, numSourceCluster);
+
+ // One cluster deletes all its EVs, number of partition should reflect in changes
+ accessor.resetCounters();
+ sampleProvider.clearCache(HelixConstants.ChangeType.EXTERNAL_VIEW);
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+ Assert.assertEquals(accessor.getSetCount(), numExternalViewPerSourceCluster);
+ Assert.assertEquals(accessor.getRemoveCount(), 0);
+ verifyExternalView(accessor, numExternalViewPerSourceCluster, numPartition,
+ numSourceCluster - 1);
+
+ // All EVs are deleted, and all EVs should be removed from view cluster
+ for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+ provider.clearCache(HelixConstants.ChangeType.EXTERNAL_VIEW);
+ }
+ accessor.resetCounters();
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+ Assert.assertEquals(accessor.getSetCount(), 0);
+ Assert.assertEquals(accessor.getRemoveCount(), numExternalViewPerSourceCluster);
+ verifyExternalView(accessor, 0, 0, 0);
+ }
+
+ @Test
+ public void testRefreshWithProviderChange() {
+ CounterBasedMockAccessor viewClusterDataAccessor =
+ new CounterBasedMockAccessor(viewClusterName);
+ Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
+ createMockDataProviders(dataProviderMap);
+
+ ViewClusterRefresher refresher =
+ new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor, dataProviderMap);
+ MockSourceClusterDataProvider sampleProvider =
+ (MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
+
+ // Refresh an empty view cluster
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+ verifyInstances(viewClusterDataAccessor, dataProviderMap);
+ verifyExternalView(viewClusterDataAccessor, numExternalViewPerSourceCluster, numPartition,
+ numSourceCluster);
+ viewClusterDataAccessor.resetCounters();
+
+ // remove InstanceConfig and ExternalView requirement from sample provider
+ sampleProvider.getConfig()
+ .setProperties(Arrays.asList(new PropertyType[] { PropertyType.LIVEINSTANCES }));
+
+ // Refresh again
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+
+ // Removing external view from config of 1 cluster will not result in removing any external view
+ // but to update external views
+ Assert.assertEquals(viewClusterDataAccessor.getSetCount(), numExternalViewPerSourceCluster);
+ Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numInstancePerSourceCluster);
+ verifyExternalView(viewClusterDataAccessor, numExternalViewPerSourceCluster, numPartition,
+ numSourceCluster - 1);
+ verifyInstances(viewClusterDataAccessor, dataProviderMap);
+
+ // Remove external view from all source clusters, will resulting in removing all external views
+ viewClusterDataAccessor.resetCounters();
+ for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+ MockSourceClusterDataProvider mockProvider = (MockSourceClusterDataProvider) provider;
+ if (mockProvider != sampleProvider) {
+ mockProvider.getConfig().setProperties(Arrays
+ .asList(new PropertyType[] { PropertyType.LIVEINSTANCES, PropertyType.INSTANCES }));
+ }
+ }
+
+ // Refresh again
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
+ Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
+
+ // No change in instances, but external views are all removed
+ Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 0);
+ Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numExternalViewPerSourceCluster);
+ verifyExternalView(viewClusterDataAccessor, 0, 0, 0);
+ verifyInstances(viewClusterDataAccessor, dataProviderMap);
+ }
+
+ private void verifyExternalView(HelixDataAccessor accessor, int expectedResourceCnt,
+ int expectedPartitionPerResource, int expectedReplicaPerPartition) {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ Assert.assertEquals(accessor.getChildNames(keyBuilder.externalViews()).size(),
+ expectedResourceCnt);
+ for (String resourceName : accessor.getChildNames(keyBuilder.externalViews())) {
+ ExternalView ev = accessor.getProperty(keyBuilder.externalView(resourceName));
+
+ Assert.assertEquals(ev.getPartitionSet().size(), expectedPartitionPerResource);
+ for (String partitionName : ev.getPartitionSet()) {
+ Assert.assertEquals(ev.getStateMap(partitionName).size(), expectedReplicaPerPartition);
+ }
+ }
+ }
+
+ private void verifyInstances(HelixDataAccessor accessor,
+ Map<String, SourceClusterDataProvider> dataProviderMap) {
+ // Verify live instances - since we don't modify ZNode's internal content,
+ // we just verify names
+ Set<String> fetchedNames = new HashSet<>(accessor
+ .getChildNames(accessor.keyBuilder().liveInstances()));
+ Set<String> expectedNames = new HashSet<>();
+ for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+ if (provider.getPropertiesToAggregate().contains(PropertyType.LIVEINSTANCES)) {
+ expectedNames.addAll(provider.getLiveInstanceNames());
+ }
+ }
+ Assert.assertEquals(fetchedNames, expectedNames);
+
+ // Verify instance configs - since we don't modify ZNode's internal content,
+ // we just verify names
+ fetchedNames = new HashSet<>(accessor
+ .getChildNames(accessor.keyBuilder().instanceConfigs()));
+ expectedNames = new HashSet<>();
+ for (SourceClusterDataProvider provider : dataProviderMap.values()) {
+ if (provider.getPropertiesToAggregate().contains(PropertyType.INSTANCES)) {
+ expectedNames.addAll(provider.getInstanceConfigNames());
+ }
+ }
+ Assert.assertEquals(fetchedNames, expectedNames);
+ }
+
+ private void createMockDataProviders(Map<String, SourceClusterDataProvider> dataProviderMap) {
+ for (int i = 0; i < numSourceCluster; i++) {
+ String sourceClusterName = "cluster" + i;
+ ViewClusterSourceConfig sourceConfig =
+ new ViewClusterSourceConfig(sourceClusterName, "", defaultProperties);
+ MockSourceClusterDataProvider provider =
+ new MockSourceClusterDataProvider(sourceConfig, null);
+ List<LiveInstance> liveInstanceList = new ArrayList<>();
+ List<InstanceConfig> instanceConfigList = new ArrayList<>();
+ List<ExternalView> externalViewList = new ArrayList<>();
+
+ // InstanceConfig and LiveInstance
+ for (int j = 0; j < numInstancePerSourceCluster; j++) {
+ String instanceName = String.format("%s-instance%s", sourceClusterName, j);
+ liveInstanceList.add(new LiveInstance(instanceName));
+ instanceConfigList.add(new InstanceConfig(instanceName));
+ }
+ provider.setLiveInstances(liveInstanceList);
+ provider.setInstanceConfigs(instanceConfigList);
+
+ // ExternalView
+ for (int j = 0; j < numExternalViewPerSourceCluster; j++) {
+ String resourceName = String.format("Resource%s", j);
+ ExternalView ev = new ExternalView(resourceName);
+ for (int k = 0; k < numPartition; k++) {
+ String partitionName = String.format("Partition%s", k);
+ Map<String, String> stateMap = new HashMap<>();
+ stateMap.put(String.format("%s-instance", sourceClusterName), "MASTER");
+ ev.setStateMap(partitionName, stateMap);
+ }
+ externalViewList.add(ev);
+ }
+ provider.setExternalViews(externalViewList);
+ dataProviderMap.put(sourceClusterName, provider);
+ }
+ }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
new file mode 100644
index 0000000..d7d0599
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/mock/MockSourceClusterDataProvider.java
@@ -0,0 +1,84 @@
+package org.apache.helix.view.mock;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.helix.api.config.ViewClusterSourceConfig;
+import org.apache.helix.common.ClusterEventProcessor;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+
+public class MockSourceClusterDataProvider extends SourceClusterDataProvider {
+
+ public MockSourceClusterDataProvider(ViewClusterSourceConfig config,
+ ClusterEventProcessor processor) {
+ super(config, processor);
+ }
+
+ @Override
+ public void setup() {}
+
+ @Override
+ public void refreshCache() {}
+
+ @Override
+ public List<String> getInstanceConfigNames() {
+ return new ArrayList<>(getInstanceConfigMap().keySet());
+ }
+
+ @Override
+ public List<String> getLiveInstanceNames() {
+ return new ArrayList<>(getLiveInstances().keySet());
+ }
+
+ @Override
+ public List<String> getExternalViewNames() {
+ return new ArrayList<>(getExternalViews().keySet());
+ }
+
+ public void setConfig(ViewClusterSourceConfig config) {
+ _sourceClusterConfig = config;
+ }
+
+ public ViewClusterSourceConfig getConfig() {
+ return _sourceClusterConfig;
+ }
+
+ public void setInstanceConfigs(List<InstanceConfig> instanceConfigList) {
+ for (InstanceConfig config : instanceConfigList) {
+ _instanceConfigMap.put(config.getInstanceName(), config);
+ }
+ }
+
+ public void setLiveInstances(List<LiveInstance> liveInstanceList) {
+ for (LiveInstance instance : liveInstanceList) {
+ _liveInstanceMap.put(instance.getInstanceName(), instance);
+ }
+ }
+
+ public void setExternalViews(List<ExternalView> externalViewList) {
+ for (ExternalView ev : externalViewList) {
+ _externalViewMap.put(ev.getResourceName(), ev);
+ }
+ }
+}
diff --git a/helix-view-aggregator/src/test/resources/log4j.properties b/helix-view-aggregator/src/test/resources/log4j.properties
index 24b6d10..8e9c0c0 100644
--- a/helix-view-aggregator/src/test/resources/log4j.properties
+++ b/helix-view-aggregator/src/test/resources/log4j.properties
@@ -22,11 +22,11 @@
# A1 is set to be a ConsoleAppender.
log4j.appender.C=org.apache.log4j.ConsoleAppender
log4j.appender.C.layout=org.apache.log4j.PatternLayout
-log4j.appender.C.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.C.layout.ConversionPattern=[%d] [%-5p] [%t] [%c:%L] - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%5p [%C:%M] (%F:%L) - %m%n
+log4j.appender.R.layout.ConversionPattern=[%d] [%-5p] [%t] [%c:%L] - %m%n
log4j.appender.R.File=target/ClusterManagerLogs/log.txt
log4j.appender.STATUSDUMP=org.apache.log4j.RollingFileAppender
@@ -35,6 +35,7 @@
log4j.logger.org.I0Itec=ERROR
log4j.logger.org.apache=ERROR
+log4j.logger.org.apache.helix.view=INFO
log4j.logger.com.noelios=ERROR
log4j.logger.org.restlet=ERROR