// blob: 98dd281b5e5dafe774e84e072301c3d956358e19 [file] [log] [blame]
package org.apache.helix.integration;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestNoThrottleDisabledPartitions extends ZkTestBase {
private String _resourceName = "TestDB";
private String _clusterName = getShortClassName();
private HelixDataAccessor _accessor;
private MockParticipantManager[] _participants;
/**
* Given the following setup for a partition:
* instance 1 : M
* instance 2 : S
* instance 3 : S
* and no throttle config set for recovery balance
* and throttle config of 1 set for load balance,
* test that disabling instance 1 puts this partition in recovery balance, so that all transitions
* for a partition go through.
* * instance 1 : S (M->S->Offline)
* * instance 2 : M (S->M because it's in recovery)
* * instance 3 : S
* @throws Exception
*/
@Test
public void testDisablingTopStateReplicaByDisablingInstance() throws Exception {
int participantCount = 5;
setupEnvironment(participantCount);
// Set the throttling only for load balance
setThrottleConfigForLoadBalance(1);
// Disable instance 0 so that it will cause a partition to do a load balance
PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
InstanceConfig instanceConfig = _accessor.getProperty(key);
instanceConfig.setInstanceEnabled(false);
_accessor.setProperty(key, instanceConfig);
// Resume the controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
Thread.sleep(500L);
// The disabled instance should not hold any top state replicas (MASTER)
PropertyKey liveInstanceKey =
_accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
if (liveInstance != null) {
String sessionId = liveInstance.getEphemeralOwner();
List<CurrentState> currentStates = _accessor.getChildValues(
_accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId),
true);
for (CurrentState currentState : currentStates) {
for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
.entrySet()) {
Assert.assertFalse(partitionState.getValue().equals("MASTER"));
}
}
}
// clean up the cluster
controller.syncStop();
for (int i = 0; i < participantCount; i++) {
_participants[i].syncStop();
}
deleteCluster(_clusterName);
}
/**
* Given the following setup for a partition:
* instance 1 : M
* instance 2 : S
* instance 3 : S
* and no throttle config set for recovery balance
* and throttle config of 1 set for load balance,
* Instead of disabling the instance, we disable the partition in the instance config.
* * instance 1 : S (M->S->Offline)
* * instance 2 : M (S->M because it's in recovery)
* * instance 3 : S
* @throws Exception
*/
@Test
public void testDisablingPartitionOnInstance() throws Exception {
int participantCount = 5;
setupEnvironment(participantCount);
// Set the throttling only for load balance
setThrottleConfigForLoadBalance();
// In this setup, TestDB0_2 has a MASTER replica on localhost_12918
disablePartitionOnInstance(_participants[0], _resourceName + "0", "TestDB0_2");
// Resume the controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
Thread.sleep(500L);
// The disabled instance should not hold any top state replicas (MASTER)
PropertyKey liveInstanceKey =
_accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
if (liveInstance != null) {
String sessionId = liveInstance.getEphemeralOwner();
List<CurrentState> currentStates = _accessor.getChildValues(
_accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId),
true);
for (CurrentState currentState : currentStates) {
for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
.entrySet()) {
if (partitionState.getKey().equals("TestDB0_2")) {
Assert.assertFalse(partitionState.getValue().equals("MASTER"));
}
}
}
}
// clean up the cluster
controller.syncStop();
for (int i = 0; i < participantCount; i++) {
_participants[i].syncStop();
}
deleteCluster(_clusterName);
}
/**
* Given the following setup for a partition:
* instance 1 : M
* instance 2 : S
* instance 3 : S
* and no throttle config set for recovery balance
* and throttle config of 1 set for load balance,
* Instead of disabling the instance, we disable the partition in the instance config.
* Here, we set the recovery balance config to 0. But we should still see the downward transition
* regardless.
* * instance 1 : S (M->S->Offline)
* * instance 2 : M (S->M because it's in recovery)
* * instance 3 : S
* @throws Exception
*/
@Test
public void testDisablingPartitionOnInstanceWithRecoveryThrottle() throws Exception {
int participantCount = 5;
setupEnvironment(participantCount);
// Set the throttling
setThrottleConfigForLoadBalance();
setThrottleConfigForRecoveryBalance();
// In this setup, TestDB0_2 has a MASTER replica on localhost_12918
disablePartitionOnInstance(_participants[0], _resourceName + "0", "TestDB0_2");
// Resume the controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
Thread.sleep(500L);
// The disabled instance should not hold any top state replicas (MASTER)
PropertyKey liveInstanceKey =
_accessor.keyBuilder().liveInstance(_participants[0].getInstanceName());
LiveInstance liveInstance = _accessor.getProperty(liveInstanceKey);
if (liveInstance != null) {
String sessionId = liveInstance.getEphemeralOwner();
List<CurrentState> currentStates = _accessor.getChildValues(
_accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId),
true);
for (CurrentState currentState : currentStates) {
for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap()
.entrySet()) {
if (partitionState.getKey().equals("TestDB0_2")) {
Assert.assertFalse(partitionState.getValue().equals("MASTER"));
}
}
}
}
// clean up the cluster
controller.syncStop();
for (int i = 0; i < participantCount; i++) {
_participants[i].syncStop();
}
deleteCluster(_clusterName);
}
@Test
public void testNoThrottleOnDisabledInstance() throws Exception {
int participantCount = 5;
setupEnvironment(participantCount);
setThrottleConfig();
// Disable an instance so that it will not be subject to throttling
PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
InstanceConfig instanceConfig = _accessor.getProperty(key);
instanceConfig.setInstanceEnabled(false);
_accessor.setProperty(key, instanceConfig);
// Set the state transition delay so that transitions would be processed slowly
DelayedTransitionBase.setDelay(1000000L);
// Resume the controller
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
Thread.sleep(500L);
// Check that there are more messages on this Participant despite the throttle config set at 1
Assert.assertTrue(verifyMultipleMessages(_participants[0]));
// clean up the cluster
controller.syncStop();
for (int i = 0; i < participantCount; i++) {
_participants[i].syncStop();
}
deleteCluster(_clusterName);
}
@Test
public void testNoThrottleOnDisabledPartition() throws Exception {
int participantCount = 3;
setupEnvironment(participantCount);
setThrottleConfig(3); // Convert partition to replica mapping should be 1 -> 3
// Disable a partition so that it will not be subject to throttling
String partitionName = _resourceName + "0_0";
for (int i = 0; i < participantCount; i++) {
disablePartitionOnInstance(_participants[i], _resourceName + "0", partitionName);
}
String newResource = "abc";
IdealState idealState = new FullAutoModeISBuilder(newResource).setStateModel("MasterSlave")
.setStateModelFactoryName("DEFAULT").setNumPartitions(5).setNumReplica(3)
.setMinActiveReplica(2).setRebalancerMode(IdealState.RebalanceMode.FULL_AUTO)
.setRebalancerClass("org.apache.helix.controller.rebalancer.DelayedAutoRebalancer")
.setRebalanceStrategy(
"org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy")
.build();
_gSetupTool.addResourceToCluster(_clusterName, newResource, idealState);
_gSetupTool.rebalanceStorageCluster(_clusterName, newResource, 3);
// Set the state transition delay so that transitions would be processed slowly
DelayedTransitionBase.setDelay(1000000L);
// Now Helix will try to bring this up on all instances. But the disabled partition will go to
// offline. This should allow each instance to have 2 messages despite having the throttle set
// at 1
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
Thread.sleep(500L);
// The throttle quota will be consumed by first partition with all
for (MockParticipantManager participantManager : _participants) {
Assert.assertTrue(verifySingleMessage(participantManager));
}
// clean up the cluster
controller.syncStop();
for (int i = 0; i < participantCount; i++) {
_participants[i].syncStop();
}
deleteCluster(_clusterName);
}
/**
* Set up the cluster and pause the controller.
* @param participantCount
* @throws Exception
*/
private void setupEnvironment(int participantCount) throws Exception {
_participants = new MockParticipantManager[participantCount];
_accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
setupCluster(_clusterName, participantCount);
DelayedTransitionBase transition = new DelayedTransitionBase(10L);
// Start _participants
for (int i = 0; i < participantCount; i++) {
_participants[i] =
new MockParticipantManager(ZK_ADDR, _clusterName, "localhost_" + (12918 + i));
_participants[i].setTransition(transition);
_participants[i].syncStart();
}
// Start the controller and verify that it is in the best possible state
ClusterControllerManager controller =
new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
controller.syncStart();
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
.build();
Assert.assertTrue(verifier.verify(3000));
// Pause the controller
controller.syncStop();
}
private void setThrottleConfig() {
setThrottleConfig(1);
}
/**
* Set all throttle configs at 1 so that we could test by observing the number of ongoing
* transitions.
*/
private void setThrottleConfig(int maxReplicas) {
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
clusterConfig.setResourcePriorityField("Name");
List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
// Add throttling at cluster-level
throttleConfigs.add(new StateTransitionThrottleConfig(
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
throttleConfigs.add(
new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
throttleConfigs
.add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
// Add throttling at instance level
throttleConfigs
.add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
StateTransitionThrottleConfig.ThrottleScope.INSTANCE, maxReplicas));
clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
_accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
}
private void setThrottleConfigForLoadBalance() {
setThrottleConfigForLoadBalance(0);
}
/**
* Set throttle limits only for load balance so that none of them would happen.
*/
private void setThrottleConfigForLoadBalance(int maxReplicas) {
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
clusterConfig.setResourcePriorityField("Name");
List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
// Add throttling at cluster-level
throttleConfigs.add(
new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
// Add throttling at instance level
throttleConfigs.add(
new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.INSTANCE, maxReplicas));
clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
_accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
}
/**
* Set throttle limits only for recovery balance so that none of them would happen.
*/
private void setThrottleConfigForRecoveryBalance() {
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
clusterConfig.setResourcePriorityField("Name");
List<StateTransitionThrottleConfig> throttleConfigs = new ArrayList<>();
// Add throttling at cluster-level
throttleConfigs.add(new StateTransitionThrottleConfig(
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 0));
// Add throttling at instance level
throttleConfigs.add(new StateTransitionThrottleConfig(
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 0));
clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
_accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
}
/**
* Set up delayed rebalancer and minimum active replica settings to mimic user's use case.
* @param clusterName
* @param participantCount
* @throws Exception
*/
private void setupCluster(String clusterName, int participantCount) throws Exception {
TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant start port
"localhost", // participant name prefix
_resourceName, // resource name prefix
3, // resources
5, // partitions per resource
participantCount, // number of nodes
3, // replicas
"MasterSlave", IdealState.RebalanceMode.FULL_AUTO, true); // do rebalance
// Enable DelayedAutoRebalance
ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig());
clusterConfig.setDelayRebalaceEnabled(true);
clusterConfig.setRebalanceDelayTime(1800000L);
_accessor.setProperty(_accessor.keyBuilder().clusterConfig(), clusterConfig);
// Set minActiveReplicas at 2
List<String> idealStates = _accessor.getChildNames(_accessor.keyBuilder().idealStates());
for (String is : idealStates) {
IdealState idealState = _accessor.getProperty(_accessor.keyBuilder().idealStates(is));
idealState.setMinActiveReplicas(2);
idealState.setRebalanceStrategy(
"org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
idealState
.setRebalancerClassName("org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
_accessor.setProperty(_accessor.keyBuilder().idealStates(is), idealState);
}
}
/**
* Disable select partitions from the first instance to test that these partitions are not subject
* to throttling.
*/
private void disablePartitionOnInstance(MockParticipantManager participant, String resourceName,
String partitionName) {
String instanceName = participant.getInstanceName();
PropertyKey key = _accessor.keyBuilder().instanceConfig(instanceName);
InstanceConfig instanceConfig = _accessor.getProperty(key);
instanceConfig.setInstanceEnabledForPartition(resourceName, partitionName, false);
_accessor.setProperty(key, instanceConfig);
}
/**
* Ensure that there are more than 1 message for a given Participant.
* @param participant
* @return
*/
private boolean verifyMultipleMessages(final MockParticipantManager participant) {
PropertyKey key = _accessor.keyBuilder().messages(participant.getInstanceName());
List<String> messageNames = _accessor.getChildNames(key);
if (messageNames != null) {
return messageNames.size() > 1;
}
return false;
}
/**
* Ensure that there are 1 messages for a given Participant.
* @param participant
* @return
*/
private boolean verifySingleMessage(final MockParticipantManager participant) {
PropertyKey key = _accessor.keyBuilder().messages(participant.getInstanceName());
List<String> messageNames = _accessor.getChildNames(key);
if (messageNames != null) {
return messageNames.size() == 1;
}
return false;
}
}