Implement ViewClusterRefresher caching synchronization (#2199)
Implement ViewClusterRefresher caching synchronization
Implement a refresh mechanism to update view cluster local cache from remote source of truth data in case of external data change.
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
index b1d34b9..4a44579 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -75,7 +75,7 @@
_viewClusterManager = HelixManagerFactory
.getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
InstanceType.SPECTATOR, zkAddr);
- _refreshViewCluster = new AtomicBoolean(false);
+ _refreshViewCluster = new AtomicBoolean(true);
_monitor = new ViewAggregatorMonitor(viewClusterName);
_aggregator = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(_viewClusterName,
"Aggregator") {
@@ -191,7 +191,11 @@
_refreshViewCluster.set(true);
break;
case PeriodicViewRefresh:
- if (!_refreshViewCluster.get()) {
+ // refresh local view cluster data cache
+ boolean cacheChanged = _viewClusterRefresher.refreshViewClusterDataCache();
+ if (cacheChanged) {
+ logger.info("Detected change in view cluster data, trigger a refresh");
+ } else if (!_refreshViewCluster.get()) {
logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
return;
}
@@ -199,6 +203,7 @@
// least some of the elements in view cluster
logger.info("Refreshing cluster based on event " + event.getEventType().name());
refreshViewCluster();
+ _viewClusterRefresher.refreshViewClusterDataCache();
break;
default:
logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
index 1885a73..c6df908 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -34,6 +34,7 @@
import org.apache.helix.PropertyType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.dataprovider.ViewClusterDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,25 +45,18 @@
*/
public class ViewClusterRefresher {
private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
- private String _viewClusterName;
- private HelixDataAccessor _viewClusterDataAccessor;
+ private final String _viewClusterName;
+ private final HelixDataAccessor _viewClusterDataAccessor;
+ private final ViewClusterDataCache _viewClusterDataCache;
private Set<SourceClusterDataProvider> _dataProviderView;
- // These 3 caches stores objects that are pushed to view cluster (write-through) cache,
- // thus we don't need to read from view cluster everytime we refresh it.
- private Map<String, HelixProperty> _viewClusterLiveInstanceCache;
- private Map<String, HelixProperty> _viewClusterInstanceConfigCache;
- private Map<String, HelixProperty> _viewClusterExternalViewCache;
-
public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor) {
_viewClusterName = viewClusterName;
_viewClusterDataAccessor = viewClusterDataAccessor;
- _viewClusterLiveInstanceCache = new HashMap<>();
- _viewClusterInstanceConfigCache = new HashMap<>();
- _viewClusterExternalViewCache = new HashMap<>();
+ _viewClusterDataCache = new ViewClusterDataCache(viewClusterName);
}
- private class ClusterPropertyDiff {
+ private static class ClusterPropertyDiff {
/**
* List of names of objects to set (create or modify)
*/
@@ -126,7 +120,7 @@
Set<String> listedNamesInSource = new HashSet<>();
Map<String, HelixProperty> sourceProperties = new HashMap<>();
Map<String, HelixProperty> viewClusterPropertyCache =
- getViewClusterPropertyCache(propertyType);
+ (Map<String, HelixProperty>) getViewClusterPropertyCache(propertyType);
if (viewClusterPropertyCache == null) {
throw new IllegalArgumentException(
"Cannot find view cluster property cache. Property: " + propertyType.name());
@@ -170,8 +164,7 @@
}
// Perform refresh
- ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource, sourceProperties,
- viewClusterPropertyCache);
+ ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource, sourceProperties, viewClusterPropertyCache);
} catch (Exception e) {
logger.warn(String
.format("Caught exception during refreshing %s for view cluster %s", propertyType.name(),
@@ -215,12 +208,11 @@
* @param sourcePropertyNames names of all properties (i.e. liveInstances) in all source clusters
* @param cachedSourceProperties all cached properties from source clusters
* @param viewClusterPropertyCache all properties that are previously set successfully to view cluster
- * @param <T> extends HelixProperty
* @return ClusterPropertyDiff object contains diff information
*/
- private <T extends HelixProperty> ClusterPropertyDiff calculatePropertyDiff(
+ private ClusterPropertyDiff calculatePropertyDiff(
Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
- Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+ Map<String, HelixProperty> cachedSourceProperties, Map<String, HelixProperty> viewClusterPropertyCache) {
ClusterPropertyDiff diff = new ClusterPropertyDiff();
// items whose names are in view cluster but not in source should be removed for sure
@@ -228,7 +220,7 @@
toDelete.removeAll(sourcePropertyNames);
diff.addPropertiesToDelete(toDelete);
- for (Map.Entry<String, T> sourceProperty : cachedSourceProperties.entrySet()) {
+ for (Map.Entry<String, HelixProperty> sourceProperty : cachedSourceProperties.entrySet()) {
String name = sourceProperty.getKey();
HelixProperty property = sourceProperty.getValue();
@@ -263,17 +255,15 @@
* @param sourcePropertyNames all names of the target properties in source clusters
* @param cachedSourceProperties all up-to-date cached properties in source cluster
* @param viewClusterPropertyCache view cluster cache
- * @param <T> extends HelixProperty
* @return true if all required refreshes are successful, else false
*/
- private <T extends HelixProperty> boolean doRefresh(PropertyType propertyType,
+ private boolean doRefresh(PropertyType propertyType,
Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
- Map<String, T> cachedSourceProperties, Map<String, T> viewClusterPropertyCache) {
+ Map<String, HelixProperty> cachedSourceProperties, Map<String, HelixProperty> viewClusterPropertyCache) {
boolean ok = true;
// Calculate diff
ClusterPropertyDiff diff =
- calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties,
- viewClusterPropertyCache);
+ calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties, viewClusterPropertyCache);
// Generate property keys
List<PropertyKey> keysToSet = new ArrayList<>();
@@ -293,12 +283,12 @@
}
// Delete outdated properties
- if (!deleteProperties(keysToDelete, viewClusterPropertyCache)) {
+ if (!deleteProperties(keysToDelete)) {
ok = false;
}
// Add or update changed properties
- if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet(), viewClusterPropertyCache)) {
+ if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet())) {
ok = false;
}
return ok;
@@ -330,14 +320,22 @@
}
}
- private Map<String, HelixProperty> getViewClusterPropertyCache(PropertyType propertyType) {
+ /**
+ * Refresh view cluster data cache and return true if there is data update.
+ * @return true if new change is fetched from remote
+ */
+ boolean refreshViewClusterDataCache() {
+ return _viewClusterDataCache.updateCache(_viewClusterDataAccessor);
+ }
+
+ private Map<String, ? extends HelixProperty> getViewClusterPropertyCache(PropertyType propertyType) {
switch (propertyType) {
case INSTANCES:
- return _viewClusterInstanceConfigCache;
+ return _viewClusterDataCache.getInstanceConfigMap();
case LIVEINSTANCES:
- return _viewClusterLiveInstanceCache;
+ return _viewClusterDataCache.getLiveInstances();
case EXTERNALVIEW:
- return _viewClusterExternalViewCache;
+ return _viewClusterDataCache.getExternalViews();
default:
return null;
}
@@ -348,28 +346,21 @@
* for the objects that got successfully created or updated in ZK
* @param keysToAddOrUpdate
* @param objects
- * @param cache
* @param <T> HelixProperty
*
* @return true if all objects are successfully created or updated, else false
*/
private <T extends HelixProperty> boolean addOrUpdateProperties(
- List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects, Map<String, T> cache) {
+ List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects) {
boolean ok = true;
logger.info(
String.format("AddOrUpdate %s objects: %s", keysToAddOrUpdate.size(), keysToAddOrUpdate));
boolean[] addOrUpdateResults = _viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
for (int i = 0; i < addOrUpdateResults.length; i++) {
if (!addOrUpdateResults[i]) {
- // Don't add item to cache yet - will retry during next refresh
logger.warn(String.format("Failed to create or update live instance %s, will retry later",
keysToAddOrUpdate.get(i).getPath()));
ok = false;
- } else {
- // Successfully updated ViewClusterZK, proceed to update cache
- @SuppressWarnings("unchecked")
- T property = (T) objects.get(i);
- cache.put(getBaseObjectNameFromPropertyKey(keysToAddOrUpdate.get(i)), property);
}
}
return ok;
@@ -379,38 +370,22 @@
* Delete properties in ZK specified by a list of property keys. Update the given cache
* for the objects that got successfully deleted in ZK
* @param keysToDelete
- * @param cache
* @param <T> HelixProperty
* @return true if all objects got successfully deleted else false
*/
- private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete,
- Map<String, T> cache) {
+ private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete) {
boolean ok = true;
logger.info(String.format("Deleting %s objects: %s", keysToDelete.size(), keysToDelete));
for (PropertyKey key : keysToDelete) {
if (!_viewClusterDataAccessor.removeProperty(key)) {
- // Don't remove item from cache yet - will retry during next refresh
ok = false;
logger.warn(String.format("Failed to create or update live instance %s, will retry later",
key.getPath()));
- } else {
- // Successfully updated ViewClusterZK, proceed to update cache
- cache.remove(getBaseObjectNameFromPropertyKey(key));
}
}
return ok;
}
- /**
- * Parse property key and get the id of the ZNode that property key represents
- * @param key
- * @return
- */
- private static String getBaseObjectNameFromPropertyKey(PropertyKey key) {
- String[] params = key.getParams();
- return params[params.length - 1];
- }
-
private void logRefreshResult(PropertyType type, boolean ok) {
if (!ok) {
logger.warn(String
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java
new file mode 100644
index 0000000..f9b6a72
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java
@@ -0,0 +1,53 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.common.caches.BasicClusterDataCache;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+
+public class ViewClusterDataCache extends BasicClusterDataCache {
+
+ public ViewClusterDataCache(String viewClusterName) {
+ super(viewClusterName);
+ }
+
+ /**
+ * Update cache and return true if any local data is changed compared to before refresh.
+ * @param dataAccessor the data accessor used to fetch data.
+ * @return true if there is a change to local cache.
+ */
+ public boolean updateCache(HelixDataAccessor dataAccessor) {
+ Map<String, LiveInstance> liveInstanceSnapshot = new HashMap<>(getLiveInstances());
+ Map<String, ExternalView> externalViewSnapshot = new HashMap<>(getExternalViews());
+ Map<String, InstanceConfig> instanceConfigSnapshot = new HashMap<>(getInstanceConfigMap());
+ requireFullRefresh();
+ refresh(dataAccessor);
+
+ return !(liveInstanceSnapshot.equals(getLiveInstances())
+ && externalViewSnapshot.equals(getExternalViews())
+ && instanceConfigSnapshot.equals(getInstanceConfigMap()));
+ }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
index 2a069ee..e642a30 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
@@ -103,6 +103,7 @@
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
viewClusterDataAccessor.resetCounters();
+ refresher.refreshViewClusterDataCache();
// Refresh again without change
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
@@ -147,6 +148,7 @@
new ArrayList<>(sampleProvider.getLiveInstances().values());
liveInstances.add(new LiveInstance("newLiveInstance"));
sampleProvider.setLiveInstances(liveInstances);
+ refresher.refreshViewClusterDataCache();
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 1);
@@ -219,6 +221,7 @@
// remove InstanceConfig and ExternalView requirement from sample provider
sampleProvider.getConfig()
.setProperties(Collections.singletonList(PropertyType.LIVEINSTANCES));
+ refresher.refreshViewClusterDataCache();
// Refresh again
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java
new file mode 100644
index 0000000..006cd02
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java
@@ -0,0 +1,68 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestViewClusterDataCache extends ZkTestBase {
+ private static final int NUM_INSTANCE = 5;
+
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private final Set<String> _instances = new HashSet<>();
+
+ @BeforeClass
+ public void beforeClass() {
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < NUM_INSTANCE; i++) {
+ String instanceName = String.format("%s-%s-%s", CLUSTER_NAME, PARTICIPANT_PREFIX, i);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+ _instances.add(instanceName);
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ participant.syncStart();
+ }
+ }
+
+ @Test
+ public void testCacheUpdate() {
+ ViewClusterDataCache cache = new ViewClusterDataCache(CLUSTER_NAME);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+ Assert.assertTrue(cache.updateCache(accessor));
+ Assert.assertEquals(cache.getLiveInstances().size(), NUM_INSTANCE);
+ Assert.assertEquals(cache.getInstanceConfigMap().size(), NUM_INSTANCE);
+ for (String instance : _instances) {
+ Assert.assertTrue(cache.getLiveInstances().containsKey(instance));
+ }
+ // update again, expect no cache change
+ Assert.assertFalse(cache.updateCache(accessor));
+ // clear cache and update again
+ cache.clearCache(HelixConstants.ChangeType.LIVE_INSTANCE);
+ cache.clearCache(HelixConstants.ChangeType.INSTANCE_CONFIG);
+ Assert.assertTrue(cache.updateCache(accessor));
+ }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
index adf16ec..b98b684 100644
--- a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
@@ -26,12 +26,14 @@
import java.util.Set;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModelParser;
@@ -214,6 +216,25 @@
triggerViewAggregatorStateTransition("LEADER", "STANDBY");
}
+ @Test(dependsOnMethods = "testHelixViewAggregator")
+ public void testRemoteDataRemovalAndRefresh() throws Exception {
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(viewClusterName, _baseAccessor);
+ // Start view aggregator
+ triggerViewAggregatorStateTransition("STANDBY", "LEADER");
+ // Wait for refresh and verify
+ Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+ // remove live instances from view cluster zk data, wait for next refresh trigger
+ Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().liveInstances()));
+ Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+ Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().liveInstances()).size() > 0);
+
+ Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().externalViews()));
+ Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+ Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().externalViews()).size() > 0);
+ // Stop view aggregator
+ triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+ }
+
private void resetViewClusterConfig(int refreshPeriod, List<PropertyType> properties) {
List<ViewClusterSourceConfig> sourceConfigs = new ArrayList<>();
for (String sourceCluster : _allSourceClusters) {