// blob: bac98421dd7a39f423c12560f0cee30f8eca6fbc [file] [log] [blame]
package org.apache.helix.controller.changedetector;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * A series of dependent tests for {@link ResourceChangeDetector}.
 *
 * <p>The tests share one cluster, one data provider and one detector, and are chained with
 * {@code dependsOnMethods}: every test assumes the cluster state and the detector snapshot left
 * behind by the previous one. Each test mutates the cluster, refreshes the data provider, updates
 * the detector's snapshots and then checks the reported additions, changes and removals.
 */
public class TestResourceChangeDetector extends ZkTestBase {
  // Every change type the detector is checked against in these tests. CLUSTER_CONFIG is part of
  // the list; the tests below expect it to be reported as a single item when it is added/changed.
  private static final ChangeType[] RESOURCE_CHANGE_TYPES = {
      ChangeType.IDEAL_STATE, ChangeType.INSTANCE_CONFIG, ChangeType.LIVE_INSTANCE,
      ChangeType.RESOURCE_CONFIG, ChangeType.CLUSTER_CONFIG
  };
  private static final String CLUSTER_NAME = TestHelper.getTestClassName();
  private static final String RESOURCE_NAME = "TestDB";
  private static final String NEW_RESOURCE_NAME = "TestDB2";
  private static final String STATE_MODEL = "MasterSlave";
  // Participants are named "<prefix>_<START_PORT + index>"
  private static final String PARTICIPANT_PREFIX = "localhost";
  private static final int START_PORT = 12918;
  // Derived from the list above so the two can never drift apart
  private static final int NUM_CHANGE_TYPES = RESOURCE_CHANGE_TYPES.length;
  private static final int NUM_RESOURCES = 1;
  private static final int NUM_PARTITIONS = 10;
  private static final int NUM_REPLICAS = 3;
  private static final int NUM_NODES = 5;

  // A real data provider (not a mock); the tests notify and refresh it manually so that they
  // control exactly when the detector sees new data
  private ResourceControllerDataProvider _dataProvider;
  private ResourceChangeDetector _resourceChangeDetector;
  private ClusterControllerManager _controller;
  private final MockParticipantManager[] _participants = new MockParticipantManager[NUM_NODES];
  private HelixDataAccessor _dataAccessor;
  private PropertyKey.Builder _keyBuilder;

  /**
   * Sets up the cluster, starts one controller and {@code NUM_NODES} participants, and creates the
   * data accessor, data provider and change detector shared by all tests.
   * @throws Exception if the parent set-up or the cluster set-up fails
   */
  @BeforeClass
  public void beforeClass() throws Exception {
    super.beforeClass();

    // Set up a test cluster
    TestHelper.setupCluster(CLUSTER_NAME, ZK_ADDR, START_PORT, // participant port
        PARTICIPANT_PREFIX, // participant name prefix
        RESOURCE_NAME, // resource name prefix
        NUM_RESOURCES, // resources
        NUM_PARTITIONS, // partitions per resource
        NUM_NODES, // nodes
        NUM_REPLICAS, // replicas
        STATE_MODEL, true); // do rebalance

    // Start a controller
    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "controller_0");
    _controller.syncStart();

    // Start Participants
    for (int i = 0; i < NUM_NODES; i++) {
      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName(i));
      _participants[i].syncStart();
    }

    _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
    _keyBuilder = _dataAccessor.keyBuilder();
    _resourceChangeDetector = new ResourceChangeDetector();

    // Create a custom data provider
    _dataProvider = new ResourceControllerDataProvider(CLUSTER_NAME);
  }

  /**
   * Stops every still-connected participant and the controller, deletes the cluster and asserts
   * that the cluster's root path is gone.
   * @throws Exception if the existence check fails
   */
  @AfterClass
  public void afterClass() throws Exception {
    for (MockParticipantManager participant : _participants) {
      // Some participants have already been stopped by the tests themselves
      if (participant != null && participant.isConnected()) {
        participant.syncStop();
      }
    }
    _controller.syncStop();
    deleteCluster(CLUSTER_NAME);
    // NOTE(review): if TestHelper.verify polls until the condition becomes true, this assertion
    // waits for the whole timeout whenever the deletion succeeded - confirm that is intended.
    Assert.assertFalse(TestHelper.verify(() -> _dataAccessor.getBaseDataAccessor()
        .exists("/" + CLUSTER_NAME, AccessOption.PERSISTENT), 20000L));
  }

  /**
   * Tests the initialization of the change detector. It should tell us that there's been changes
   * for every change type and for all items per type.
   */
  @Test
  public void testResourceChangeDetectorInit() {
    refreshAndDetect(_resourceChangeDetector);

    Collection<ChangeType> changeTypes = _resourceChangeDetector.getChangeTypes();
    Assert.assertEquals(changeTypes.size(), NUM_CHANGE_TYPES,
        "Not all change types have been detected for ResourceChangeDetector!");

    // Check that the right amount of resources show up as added
    checkDetectionCounts(ChangeType.IDEAL_STATE, NUM_RESOURCES, 0, 0);
    // Check that the right amount of instances show up as added
    checkDetectionCounts(ChangeType.LIVE_INSTANCE, NUM_NODES, 0, 0);
    checkDetectionCounts(ChangeType.INSTANCE_CONFIG, NUM_NODES, 0, 0);
    // Check that the right amount of cluster config item show up
    checkDetectionCounts(ChangeType.CLUSTER_CONFIG, 1, 0, 0);
  }

  /**
   * Add a resource (IS and ResourceConfig) and see if the detector detects it.
   */
  @Test(dependsOnMethods = "testResourceChangeDetectorInit")
  public void testAddResource() {
    // Create an IS and ResourceConfig
    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, NEW_RESOURCE_NAME,
        NUM_PARTITIONS, STATE_MODEL);
    ResourceConfig resourceConfig = new ResourceConfig(NEW_RESOURCE_NAME);
    _dataAccessor.setProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig);

    // Manually notify the data provider, refresh it and update the detector
    refreshAndDetect(_resourceChangeDetector, ChangeType.IDEAL_STATE,
        ChangeType.RESOURCE_CONFIG);

    checkChangeTypes(ChangeType.IDEAL_STATE, ChangeType.RESOURCE_CONFIG);
    // One addition for each of the two affected types, nothing for the rest
    checkCountsForAllTypes(1, 0, 0, ChangeType.IDEAL_STATE, ChangeType.RESOURCE_CONFIG);
    // Check that detector gives the right item
    Assert.assertTrue(_resourceChangeDetector.getAdditionsByType(ChangeType.RESOURCE_CONFIG)
        .contains(NEW_RESOURCE_NAME));
  }

  /**
   * Modify a resource config for the new resource and test that detector detects it.
   */
  @Test(dependsOnMethods = "testAddResource")
  public void testModifyResource() {
    // Modify resource config
    ResourceConfig resourceConfig =
        _dataAccessor.getProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME));
    resourceConfig.getRecord().setSimpleField("Did I change?", "Yes!");
    _dataAccessor.updateProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig);

    // Notify data provider and check
    refreshAndDetect(_resourceChangeDetector, ChangeType.RESOURCE_CONFIG);

    checkChangeTypes(ChangeType.RESOURCE_CONFIG);
    // One change for the resource config, nothing for the rest
    checkCountsForAllTypes(0, 1, 0, ChangeType.RESOURCE_CONFIG);
    Assert.assertTrue(_resourceChangeDetector.getChangesByType(ChangeType.RESOURCE_CONFIG)
        .contains(NEW_RESOURCE_NAME));
  }

  /**
   * Delete the new resource and test that detector detects it.
   */
  @Test(dependsOnMethods = "testModifyResource")
  public void testDeleteResource() {
    // Delete the newly added resource
    _dataAccessor.removeProperty(_keyBuilder.idealStates(NEW_RESOURCE_NAME));
    _dataAccessor.removeProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME));

    // Notify data provider and check
    refreshAndDetect(_resourceChangeDetector, ChangeType.IDEAL_STATE,
        ChangeType.RESOURCE_CONFIG);

    checkChangeTypes(ChangeType.RESOURCE_CONFIG, ChangeType.IDEAL_STATE);
    // One removal for each of the two affected types, nothing for the rest
    checkCountsForAllTypes(0, 0, 1, ChangeType.IDEAL_STATE, ChangeType.RESOURCE_CONFIG);
  }

  /**
   * Disconnect and reconnect a Participant and see if detector detects.
   */
  @Test(dependsOnMethods = "testDeleteResource")
  public void testDisconnectReconnectInstance() {
    // Disconnect a Participant
    _participants[0].syncStop();
    refreshAndDetect(_resourceChangeDetector, ChangeType.LIVE_INSTANCE);

    checkChangeTypes(ChangeType.LIVE_INSTANCE);
    // The stopped participant shows up as one removed live instance
    checkCountsForAllTypes(0, 0, 1, ChangeType.LIVE_INSTANCE);

    // Reconnect the Participant
    _participants[0] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName(0));
    _participants[0].syncStart();
    refreshAndDetect(_resourceChangeDetector, ChangeType.LIVE_INSTANCE);

    checkChangeTypes(ChangeType.LIVE_INSTANCE);
    // The restarted participant shows up as one added live instance
    checkCountsForAllTypes(1, 0, 0, ChangeType.LIVE_INSTANCE);
  }

  /**
   * Remove an instance completely and see if detector detects.
   */
  @Test(dependsOnMethods = "testDisconnectReconnectInstance")
  public void testRemoveInstance() {
    _participants[0].syncStop();
    InstanceConfig instanceConfig =
        _dataAccessor.getProperty(_keyBuilder.instanceConfig(_participants[0].getInstanceName()));
    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig);

    refreshAndDetect(_resourceChangeDetector, ChangeType.LIVE_INSTANCE,
        ChangeType.INSTANCE_CONFIG);

    checkChangeTypes(ChangeType.LIVE_INSTANCE, ChangeType.INSTANCE_CONFIG);
    // Both the live instance and its config are reported as removed
    checkCountsForAllTypes(0, 0, 1, ChangeType.LIVE_INSTANCE, ChangeType.INSTANCE_CONFIG);
  }

  /**
   * Modify cluster config and see if detector detects.
   */
  @Test(dependsOnMethods = "testRemoveInstance")
  public void testModifyClusterConfig() {
    // Modify cluster config
    ClusterConfig clusterConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
    clusterConfig.setTopology("Change");
    _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig);

    refreshAndDetect(_resourceChangeDetector, ChangeType.CLUSTER_CONFIG);

    checkChangeTypes(ChangeType.CLUSTER_CONFIG);
    // One change for the cluster config, nothing for the other types
    checkCountsForAllTypes(0, 1, 0, ChangeType.CLUSTER_CONFIG);
  }

  /**
   * Test that change detector gives correct results when there are no changes after updating
   * snapshots.
   */
  @Test(dependsOnMethods = "testModifyClusterConfig")
  public void testNoChange() {
    // Test twice to make sure that no change is stable across different runs
    for (int i = 0; i < 2; i++) {
      refreshAndDetect(_resourceChangeDetector);
      Assert.assertEquals(_resourceChangeDetector.getChangeTypes().size(), 0);
      // No type is expected to report anything
      checkCountsForAllTypes(0, 0, 0);
    }
  }

  /**
   * Modify IdealState mapping fields for a FULL_AUTO resource using a detector created with the
   * ignore option enabled. The detector is expected to report IDEAL_STATE as a change type, but
   * to list no added, changed or removed IdealState.
   */
  @Test(dependsOnMethods = "testNoChange")
  public void testIgnoreControllerGeneratedFields() {
    // Modify cluster config and IdealState to ensure the mapping field of the IdealState will be
    // considered as the fields that are modified by Helix logic.
    ClusterConfig clusterConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
    clusterConfig.setPersistBestPossibleAssignment(true);
    _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig);

    // Create a new IS
    String resourceName = "Resource" + TestHelper.getTestMethodName();
    _gSetupTool.getClusterManagementTool()
        .addResource(CLUSTER_NAME, resourceName, NUM_PARTITIONS, STATE_MODEL);
    IdealState idealState = _dataAccessor.getProperty(_keyBuilder.idealStates(resourceName));
    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
    idealState.getRecord().getMapFields().put("Partition1", new HashMap<>());
    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), idealState);

    // Test with ignore option to be true; the first update only establishes the baseline snapshot
    ResourceChangeDetector changeDetector = new ResourceChangeDetector(true);
    refreshAndDetect(changeDetector, ChangeType.CLUSTER_CONFIG, ChangeType.IDEAL_STATE);

    // Now, modify the field
    idealState.getRecord().getMapFields().put("Partition1", Collections.singletonMap("foo", "bar"));
    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), idealState);
    refreshAndDetect(changeDetector, ChangeType.IDEAL_STATE);

    Assert.assertEquals(changeDetector.getChangeTypes(),
        Collections.singleton(ChangeType.IDEAL_STATE));
    Assert.assertEquals(countDetections(changeDetector, ChangeType.IDEAL_STATE), 0);
  }

  /**
   * Test that resetting the snapshots makes the detector treat all existing data as new again.
   * A fresh detector first reports the existing IdealStates, then nothing on an unchanged
   * refresh, and after {@code resetSnapshots()} reports the same number of items once more.
   */
  @Test(dependsOnMethods = "testIgnoreControllerGeneratedFields")
  public void testResetSnapshots() {
    // Initialize a new detector with the existing data. Two IdealStates are expected at this
    // point (presumably the set-up resource plus the one created by the previous test).
    ResourceChangeDetector changeDetector = new ResourceChangeDetector();
    refreshAndDetect(changeDetector, ChangeType.IDEAL_STATE);
    Assert.assertEquals(countDetections(changeDetector, ChangeType.IDEAL_STATE), 2);

    // Update the detector with old data, since nothing changed, the result will be empty.
    refreshAndDetect(changeDetector, ChangeType.IDEAL_STATE);
    Assert.assertEquals(countDetections(changeDetector, ChangeType.IDEAL_STATE), 0);

    // Reset the snapshots
    changeDetector.resetSnapshots();

    // After reset, all the data in the data provider will be treated as new changes
    refreshAndDetect(changeDetector, ChangeType.IDEAL_STATE);
    Assert.assertEquals(countDetections(changeDetector, ChangeType.IDEAL_STATE), 2);
  }

  /**
   * Builds the name of the participant at the given index.
   * @param index zero-based participant index
   * @return the instance name, e.g. "localhost_12918" for index 0
   */
  private static String instanceName(int index) {
    return PARTICIPANT_PREFIX + "_" + (START_PORT + index);
  }

  /**
   * Notifies the shared data provider of the given change types (in the given order), refreshes
   * it from the data accessor and lets the given detector update its snapshots.
   * @param detector the detector whose snapshots are updated
   * @param notifiedTypes change types to notify the data provider about; may be empty
   */
  private void refreshAndDetect(ResourceChangeDetector detector, ChangeType... notifiedTypes) {
    for (ChangeType type : notifiedTypes) {
      _dataProvider.notifyDataChange(type);
    }
    _dataProvider.refresh(_dataAccessor);
    detector.updateSnapshots(_dataProvider);
  }

  /**
   * Sums the additions, changes and removals a detector reports for one change type.
   * @param detector the detector to query
   * @param type the change type to query
   * @return total number of reported items for the type
   */
  private static int countDetections(ResourceChangeDetector detector, ChangeType type) {
    return detector.getAdditionsByType(type).size() + detector.getChangesByType(type).size()
        + detector.getRemovalsByType(type).size();
  }

  /**
   * Walks over all entries of RESOURCE_CHANGE_TYPES and checks the shared detector's counts:
   * the affected types must report exactly the given counts, every other type must report zero
   * additions, changes and removals.
   * @param additions expected additions for each affected type
   * @param changes expected changes for each affected type
   * @param deletions expected removals for each affected type
   * @param affectedTypes the types expected to report the given counts; may be empty
   */
  private void checkCountsForAllTypes(int additions, int changes, int deletions,
      ChangeType... affectedTypes) {
    for (ChangeType type : RESOURCE_CHANGE_TYPES) {
      if (isOneOf(type, affectedTypes)) {
        checkDetectionCounts(type, additions, changes, deletions);
      } else {
        checkDetectionCounts(type, 0, 0, 0);
      }
    }
  }

  /**
   * Tells whether a change type is contained in the given array.
   * @param type the type to look for
   * @param candidates the array to search
   * @return true if the type is one of the candidates
   */
  private static boolean isOneOf(ChangeType type, ChangeType[] candidates) {
    for (ChangeType candidate : candidates) {
      if (candidate == type) {
        return true;
      }
    }
    return false;
  }

  /**
   * Check that the given change types appear in detector's change types.
   * @param types the change types that must be reported by the shared detector
   */
  private void checkChangeTypes(ChangeType... types) {
    for (ChangeType type : types) {
      Assert.assertTrue(_resourceChangeDetector.getChangeTypes().contains(type),
          "Change type " + type + " was not reported by the detector");
    }
  }

  /**
   * Convenience method for checking three types of detections on the shared detector.
   * @param changeType the change type to query
   * @param additions expected number of added items
   * @param changes expected number of changed items
   * @param deletions expected number of removed items
   */
  private void checkDetectionCounts(ChangeType changeType, int additions, int changes,
      int deletions) {
    Assert.assertEquals(_resourceChangeDetector.getAdditionsByType(changeType).size(), additions,
        "Unexpected number of additions for " + changeType);
    Assert.assertEquals(_resourceChangeDetector.getChangesByType(changeType).size(), changes,
        "Unexpected number of changes for " + changeType);
    Assert.assertEquals(_resourceChangeDetector.getRemovalsByType(changeType).size(), deletions,
        "Unexpected number of removals for " + changeType);
  }
}