blob: e642a304477f2dfd578b9fdcc1776baabafe11e9 [file] [log] [blame]
package org.apache.helix.view.aggregator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.MockAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.mock.MockSourceClusterDataProvider;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestViewClusterRefresher extends ZkTestBase {
private static final String viewClusterName = "viewCluster";
private static final int numSourceCluster = 3;
private static final int numInstancePerSourceCluster = 2;
private static final int numExternalViewPerSourceCluster = 3;
private static final int numPartition = 3;
private static final List<PropertyType> defaultProperties =
Arrays.asList(PropertyType.LIVEINSTANCES, PropertyType.INSTANCES, PropertyType.EXTERNALVIEW);
private static class CounterBasedMockAccessor extends MockAccessor {
private AtomicInteger _setCount = new AtomicInteger(0);
private AtomicInteger _removeCount = new AtomicInteger(0);
public CounterBasedMockAccessor(String clusterName) {
super(clusterName);
resetCounters();
}
public void resetCounters() {
_setCount.set(0);
_removeCount.set(0);
}
@Override
public boolean setProperty(PropertyKey key, HelixProperty value) {
_setCount.incrementAndGet();
return super.setProperty(key, value);
}
@Override
public boolean removeProperty(PropertyKey key) {
_removeCount.incrementAndGet();
return super.removeProperty(key);
}
public int getSetCount() {
return _setCount.get();
}
public int getRemoveCount() {
return _removeCount.get();
}
}
@Test
public void testRefreshWithNoChange() {
CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
// Refresh an empty view cluster
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
viewClusterDataAccessor.resetCounters();
refresher.refreshViewClusterDataCache();
// Refresh again without change
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 0);
}
@Test
public void testRefreshWithInstanceChange() throws InterruptedException {
CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
MockSourceClusterDataProvider sampleProvider =
(MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
// Refresh an empty view cluster
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
// multiply by 2 for both instance config and live instances
Assert.assertEquals(viewClusterDataAccessor.getSetCount(),
numSourceCluster * numInstancePerSourceCluster * 2);
verifyInstances(viewClusterDataAccessor, dataProviderMap);
// Source cluster has instances deleted
viewClusterDataAccessor.resetCounters();
sampleProvider.clearCache(HelixConstants.ChangeType.LIVE_INSTANCE);
sampleProvider.clearCache(HelixConstants.ChangeType.INSTANCE_CONFIG);
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numInstancePerSourceCluster * 2);
verifyInstances(viewClusterDataAccessor, dataProviderMap);
// Source cluster has live instances added
viewClusterDataAccessor.resetCounters();
List<LiveInstance> liveInstances =
new ArrayList<>(sampleProvider.getLiveInstances().values());
liveInstances.add(new LiveInstance("newLiveInstance"));
sampleProvider.setLiveInstances(liveInstances);
refresher.refreshViewClusterDataCache();
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 1);
verifyInstances(viewClusterDataAccessor, dataProviderMap);
}
@Test
public void testRefreshWithExternalViewChange() {
CounterBasedMockAccessor accessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
new ViewClusterRefresher(viewClusterName, accessor);
refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
MockSourceClusterDataProvider sampleProvider =
(MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
// Refresh an empty view cluster
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
// We only have 2 resource names so should be setting 2 evs
Assert.assertEquals(accessor.getSetCount(), numExternalViewPerSourceCluster);
Assert.assertEquals(accessor.getRemoveCount(), 0);
// Partition count should be maintained
// Since we are creating 1 replica per source cluster so replica should be aggregated.
verifyExternalView(accessor, numExternalViewPerSourceCluster, numPartition, numSourceCluster);
// One cluster deletes all its EVs, number of partition should reflect in changes
accessor.resetCounters();
sampleProvider.clearCache(HelixConstants.ChangeType.EXTERNAL_VIEW);
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
Assert.assertEquals(accessor.getSetCount(), numExternalViewPerSourceCluster);
Assert.assertEquals(accessor.getRemoveCount(), 0);
verifyExternalView(accessor, numExternalViewPerSourceCluster, numPartition,
numSourceCluster - 1);
// All EVs are deleted, and all EVs should be removed from view cluster
for (SourceClusterDataProvider provider : dataProviderMap.values()) {
provider.clearCache(HelixConstants.ChangeType.EXTERNAL_VIEW);
}
accessor.resetCounters();
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
Assert.assertEquals(accessor.getSetCount(), 0);
Assert.assertEquals(accessor.getRemoveCount(), numExternalViewPerSourceCluster);
verifyExternalView(accessor, 0, 0, 0);
}
@Test
public void testRefreshWithProviderChange() {
CounterBasedMockAccessor viewClusterDataAccessor = new CounterBasedMockAccessor(viewClusterName);
Map<String, SourceClusterDataProvider> dataProviderMap = new HashMap<>();
createMockDataProviders(dataProviderMap);
ViewClusterRefresher refresher =
new ViewClusterRefresher(viewClusterName, viewClusterDataAccessor);
refresher.updateProviderView(new HashSet<>(dataProviderMap.values()));
MockSourceClusterDataProvider sampleProvider =
(MockSourceClusterDataProvider) dataProviderMap.get("cluster0");
// Refresh an empty view cluster
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
verifyInstances(viewClusterDataAccessor, dataProviderMap);
verifyExternalView(viewClusterDataAccessor, numExternalViewPerSourceCluster, numPartition,
numSourceCluster);
viewClusterDataAccessor.resetCounters();
// remove InstanceConfig and ExternalView requirement from sample provider
sampleProvider.getConfig()
.setProperties(Collections.singletonList(PropertyType.LIVEINSTANCES));
refresher.refreshViewClusterDataCache();
// Refresh again
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
// Removing external view from config of 1 cluster will not result in removing any external view
// but to update external views
Assert.assertEquals(viewClusterDataAccessor.getSetCount(), numExternalViewPerSourceCluster);
Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numInstancePerSourceCluster);
verifyExternalView(viewClusterDataAccessor, numExternalViewPerSourceCluster, numPartition,
numSourceCluster - 1);
verifyInstances(viewClusterDataAccessor, dataProviderMap);
// Remove external view from all source clusters, will resulting in removing all external views
viewClusterDataAccessor.resetCounters();
for (SourceClusterDataProvider provider : dataProviderMap.values()) {
MockSourceClusterDataProvider mockProvider = (MockSourceClusterDataProvider) provider;
if (mockProvider != sampleProvider) {
mockProvider.getConfig().setProperties(Arrays
.asList(PropertyType.LIVEINSTANCES, PropertyType.INSTANCES));
}
}
// Refresh again
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
// No change in instances, but external views are all removed
Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 0);
Assert.assertEquals(viewClusterDataAccessor.getRemoveCount(), numExternalViewPerSourceCluster);
verifyExternalView(viewClusterDataAccessor, 0, 0, 0);
verifyInstances(viewClusterDataAccessor, dataProviderMap);
}
private void verifyExternalView(HelixDataAccessor accessor, int expectedResourceCnt,
int expectedPartitionPerResource, int expectedReplicaPerPartition) {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
Assert.assertEquals(accessor.getChildNames(keyBuilder.externalViews()).size(),
expectedResourceCnt);
for (String resourceName : accessor.getChildNames(keyBuilder.externalViews())) {
ExternalView ev = accessor.getProperty(keyBuilder.externalView(resourceName));
Assert.assertEquals(ev.getPartitionSet().size(), expectedPartitionPerResource);
for (String partitionName : ev.getPartitionSet()) {
Assert.assertEquals(ev.getStateMap(partitionName).size(), expectedReplicaPerPartition);
}
}
}
private void verifyInstances(HelixDataAccessor accessor,
Map<String, SourceClusterDataProvider> dataProviderMap) {
// Verify live instances - since we don't modify ZNode's internal content,
// we just verify names
Set<String> fetchedNames = new HashSet<>(accessor
.getChildNames(accessor.keyBuilder().liveInstances()));
Set<String> expectedNames = new HashSet<>();
for (SourceClusterDataProvider provider : dataProviderMap.values()) {
if (provider.getPropertiesToAggregate().contains(PropertyType.LIVEINSTANCES)) {
expectedNames.addAll(provider.getLiveInstanceNames());
}
}
Assert.assertEquals(fetchedNames, expectedNames);
// Verify instance configs - since we don't modify ZNode's internal content,
// we just verify names
fetchedNames = new HashSet<>(accessor
.getChildNames(accessor.keyBuilder().instanceConfigs()));
expectedNames = new HashSet<>();
for (SourceClusterDataProvider provider : dataProviderMap.values()) {
if (provider.getPropertiesToAggregate().contains(PropertyType.INSTANCES)) {
expectedNames.addAll(provider.getInstanceConfigNames());
}
}
Assert.assertEquals(fetchedNames, expectedNames);
}
private void createMockDataProviders(Map<String, SourceClusterDataProvider> dataProviderMap) {
for (int i = 0; i < numSourceCluster; i++) {
String sourceClusterName = "cluster" + i;
ViewClusterSourceConfig sourceConfig =
new ViewClusterSourceConfig(sourceClusterName, ZK_ADDR, defaultProperties);
MockSourceClusterDataProvider provider =
new MockSourceClusterDataProvider(sourceConfig, null);
List<LiveInstance> liveInstanceList = new ArrayList<>();
List<InstanceConfig> instanceConfigList = new ArrayList<>();
List<ExternalView> externalViewList = new ArrayList<>();
// InstanceConfig and LiveInstance
for (int j = 0; j < numInstancePerSourceCluster; j++) {
String instanceName = String.format("%s-instance%s", sourceClusterName, j);
liveInstanceList.add(new LiveInstance(instanceName));
instanceConfigList.add(new InstanceConfig(instanceName));
}
provider.setLiveInstances(liveInstanceList);
provider.setInstanceConfigs(instanceConfigList);
// ExternalView
for (int j = 0; j < numExternalViewPerSourceCluster; j++) {
String resourceName = String.format("Resource%s", j);
ExternalView ev = new ExternalView(resourceName);
for (int k = 0; k < numPartition; k++) {
String partitionName = String.format("Partition%s", k);
Map<String, String> stateMap = new HashMap<>();
stateMap.put(String.format("%s-instance", sourceClusterName), "MASTER");
ev.setStateMap(partitionName, stateMap);
}
externalViewList.add(ev);
}
provider.setExternalViews(externalViewList);
dataProviderMap.put(sourceClusterName, provider);
}
}
}