blob: 2f1dee269fa9fb73698e4caad33ddc1e613e5642 [file] [log] [blame]
package org.apache.helix.integration;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.ResourceMonitor;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.RESOURCE_DN_KEY;
import static org.apache.helix.util.StatusUpdateUtil.ErrorType.RebalanceResourceFailure;
public class TestAlertingRebalancerFailure extends ZkStandAloneCMTestBase {
/** Upper bound, in milliseconds, handed to every {@code TestHelper.verify} poll in this class. */
private static final long TIMEOUT = 180 * 1000L;
/** Names of the participants started by {@link #beforeClass()}. */
private static final Set<String> _instanceNames = new HashSet<>();
/** Connection to the platform MBean server; used to clear and to read the monitoring beans. */
private static final MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
/** Number of participants this test adds to the cluster and starts. */
protected static final int NODE_NR = 3;
/** Name of the resource every test case creates and drops; never reassigned, hence final. */
private static final String testDb = "TestDB_AlertingRebalancerFailure";
/** Data accessor for the test cluster; created in {@link #beforeClass()}. */
private ZKHelixDataAccessor accessor;
/** Key of the controller task-error node for rebalance failures; built in {@link #beforeClass()}. */
private PropertyKey errorNodeKey;
/**
 * Prepares the cluster used by all test cases: clears the MBean server, creates the cluster with
 * {@code NODE_NR} instances, starts one controller and {@code NODE_NR} mock participants, turns on
 * persisting of the best possible assignment, and builds the shared accessor, error key and
 * cluster verifier.
 *
 * @throws Exception if any setup step fails
 */
@Override
@BeforeClass
public void beforeClass() throws Exception {
  System.out.println("START " + CLASS_NAME + " at " + new Date());

  // Start from a clean JMX state: try to unregister every MBean currently known to the server.
  Set<ObjectName> registeredBeans = _server.queryNames(null, null);
  for (ObjectName beanName : registeredBeans) {
    try {
      _server.unregisterMBean(beanName);
    } catch (Exception ignored) {
      // Best effort only; a bean that cannot be unregistered is left in place.
    }
  }

  // Create the storage cluster and register its instances.
  _gSetupTool.addCluster(CLUSTER_NAME, true);
  for (int idx = 0; idx < NODE_NR; idx++) {
    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, PARTICIPANT_PREFIX + "_" + (START_PORT + idx));
  }

  // Bring up the controller before any participant connects.
  _controller =
      new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX + "_0");
  _controller.syncStart();

  // Bring up one mock participant per registered instance.
  for (int idx = 0; idx < NODE_NR; idx++) {
    final String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + idx);
    _instanceNames.add(participantName);
    MockParticipantManager participant =
        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
    _participants[idx] = participant;
    participant.syncStart();
  }

  enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);

  accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
  errorNodeKey = accessor.keyBuilder().controllerTaskError(RebalanceResourceFailure.name());

  BestPossibleExternalViewVerifier.Builder verifierBuilder =
      new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME);
  _clusterVerifier = verifierBuilder
      .setZkClient(_gZkClient)
      .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
      .build();
}
/**
 * Removes the rebalance-failure error node before each test method so that every test case starts
 * without a previously recorded error.
 *
 * @throws IOException declared for callers; not thrown by the visible body
 */
@BeforeMethod
public void beforeMethod() throws IOException {
  // Drop whatever error an earlier test case may have left behind.
  this.accessor.removeProperty(this.errorNodeKey);
}
/**
 * Checks that the rebalance-failure gauge and the per-resource rebalance status switch to the
 * failure state once every participant has been stopped, and that they report no failure while
 * the resource is created, dropped and re-created with all participants running.
 *
 * @throws Exception if a cluster operation or a polling check fails
 */
@Test
public void testParticipantUnavailable() throws Exception {
  final IdealState resourceIdealState =
      new FullAutoModeISBuilder(testDb)
          .setStateModel(BuiltInStateModelDefinitions.MasterSlave.name())
          .setStateModelFactoryName("DEFAULT")
          .setNumPartitions(5)
          .setNumReplica(3)
          .setRebalancerMode(RebalanceMode.FULL_AUTO)
          .setRebalancerClass("org.apache.helix.controller.rebalancer.DelayedAutoRebalancer")
          .setRebalanceStrategy(
              "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy")
          .build();

  _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, resourceIdealState);
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);

  // Verifier restricted to the resource under test.
  final Set<String> watchedResources = new HashSet<>(Collections.singleton(testDb));
  final ZkHelixClusterVerifier resourceVerifier =
      new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
          .setZkClient(_gZkClient)
          .setResources(watchedResources)
          .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
          .build();
  Assert.assertTrue(resourceVerifier.verifyByPolling());

  // Drop and re-create the resource; this cycle must not produce a rebalance error.
  _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, testDb);
  _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, resourceIdealState);
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);
  Assert.assertTrue(resourceVerifier.verifyByPolling());

  // Neither the error node nor the monitoring beans may report a failure at this point.
  Assert.assertNull(accessor.getProperty(errorNodeKey));
  checkRebalanceFailureGauge(false);
  checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);

  // Stop every participant so that the rebalance can no longer succeed.
  for (int idx = 0; idx < NODE_NR; idx++) {
    _participants[idx].syncStop();
  }

  // The missing participants must surface as a rebalance failure.
  pollForError(accessor, errorNodeKey);
  checkRebalanceFailureGauge(true);
  checkResourceBestPossibleCalFailureState(
      ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);

  // Clean up: drop the resource and bring up a fresh participant for each stopped one.
  _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
  for (int idx = 0; idx < NODE_NR; idx++) {
    final String stoppedName = _participants[idx].getInstanceName();
    MockParticipantManager replacement =
        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, stoppedName);
    _participants[idx] = replacement;
    replacement.syncStart();
  }
}
/**
 * Checks that assigning an instance group tag ("RandomTag") to the resource's ideal state makes
 * the rebalance-failure gauge and the per-resource rebalance status report a failure, after first
 * confirming that the untagged resource rebalances without error.
 *
 * @throws Exception if a cluster operation or a polling check fails
 */
@Test(dependsOnMethods = "testParticipantUnavailable")
public void testTagSetIncorrect() throws Exception {
  _gSetupTool.addResourceToCluster(
      CLUSTER_NAME,
      testDb,
      5,
      BuiltInStateModelDefinitions.MasterSlave.name(),
      RebalanceMode.FULL_AUTO.name(),
      CrushEdRebalanceStrategy.class.getName());

  // Verifier restricted to the resource under test.
  final Set<String> watchedResources = new HashSet<>(Collections.singleton(testDb));
  final ZkHelixClusterVerifier resourceVerifier =
      new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
          .setZkClient(_gZkClient)
          .setResources(watchedResources)
          .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
          .build();

  _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, testDb, 3);
  Assert.assertTrue(resourceVerifier.verifyByPolling());

  // Baseline: no error node, cluster converged, monitoring beans report no failure.
  Assert.assertNull(accessor.getProperty(errorNodeKey));
  Assert.assertTrue(_clusterVerifier.verifyByPolling());
  checkRebalanceFailureGauge(false);
  checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);

  // Tag the resource with an instance group tag and trigger a rebalance.
  final IdealState taggedIdealState =
      _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb);
  taggedIdealState.setInstanceGroupTag("RandomTag");
  _gSetupTool.getClusterManagementTool()
      .setResourceIdealState(CLUSTER_NAME, testDb, taggedIdealState);
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3);

  // The tag change must surface as a rebalance failure.
  pollForError(accessor, errorNodeKey);
  checkRebalanceFailureGauge(true);
  checkResourceBestPossibleCalFailureState(
      ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);

  // Clean up.
  _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
}
/**
 * Checks rebalance-failure reporting when topology-aware rebalancing is enabled and some enabled
 * instances have no domain id:
 * <ol>
 *   <li>only instances with a domain id are enabled: no failure is expected;</li>
 *   <li>the remaining instances are enabled without a domain id: a failure is expected;</li>
 *   <li>those instances receive a domain id: the failure state is expected to clear.</li>
 * </ol>
 * On completion the resource is dropped and topology-aware rebalancing is switched off again in
 * the stored cluster config.
 *
 * @throws Exception if a cluster operation or a polling check fails
 */
@Test(dependsOnMethods = "testTagSetIncorrect")
public void testWithDomainId() throws Exception {
  int replicas = 2;
  ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
  // 1. set the domain Id on the first `replicas` nodes and disable all the other nodes
  for (int i = NODE_NR - 1; i >= 0; i--) {
    if (i < replicas) {
      setDomainId(_participants[i].getInstanceName(), configAccessor);
    } else {
      setInstanceEnable(_participants[i].getInstanceName(), false, configAccessor);
    }
  }
  // enable topology aware
  ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
  clusterConfig.setTopologyAwareEnabled(true);
  clusterConfig.setTopology("/Rack/Instance");
  clusterConfig.setFaultZoneType("Rack");
  configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
  // Ensure error caused by node config changes has been removed.
  // Error may be recorded unexpectedly when a resource from other tests is not cleaned up.
  accessor.removeProperty(errorNodeKey);
  _gSetupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5,
      BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.name(),
      CrushRebalanceStrategy.class.getName());
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
  ZkHelixClusterVerifier verifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME)
      .setZkClient(_gZkClient).setResources(new HashSet<>(Collections.singleton(testDb)))
      .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
      .build();
  Assert.assertTrue(verifier.verifyByPolling());
  // Verify there is no rebalance error logged
  Assert.assertNull(accessor.getProperty(errorNodeKey));
  checkRebalanceFailureGauge(false);
  checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
  // 2. enable the rest nodes with no domain Id
  for (int i = replicas; i < NODE_NR; i++) {
    setInstanceEnable(_participants[i].getInstanceName(), true, configAccessor);
  }
  // Verify there is rebalance error logged
  pollForError(accessor, errorNodeKey);
  checkRebalanceFailureGauge(true);
  checkResourceBestPossibleCalFailureState(
      ResourceMonitor.RebalanceStatus.BEST_POSSIBLE_STATE_CAL_FAILED, testDb);
  // 3. reset all nodes domain Id to be correct setting
  for (int i = replicas; i < NODE_NR; i++) {
    setDomainId(_participants[i].getInstanceName(), configAccessor);
  }
  _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, replicas);
  // Same polling-based verification as the rest of this class.
  Assert.assertTrue(_clusterVerifier.verifyByPolling());
  // Verify that rebalance error state is removed
  checkRebalanceFailureGauge(false);
  checkResourceBestPossibleCalFailureState(ResourceMonitor.RebalanceStatus.NORMAL, testDb);
  // clean up
  _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb);
  // Changing the flag on a local ClusterConfig object has no effect on the cluster; re-read the
  // stored config and write it back so topology-aware rebalancing is really switched off.
  ClusterConfig restoredConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
  restoredConfig.setTopologyAwareEnabled(false);
  configAccessor.setClusterConfig(CLUSTER_NAME, restoredConfig);
}
/**
 * Builds the JMX object name of the cluster-level status bean.
 *
 * @param clusterName cluster whose bean name is requested
 * @return object name of the form {@code <ClusterStatus domain>:<cluster key>=<clusterName>}
 * @throws MalformedObjectNameException if the assembled string is not a valid object name
 */
private ObjectName getMbeanName(String clusterName) throws MalformedObjectNameException {
  final String domain = MonitorDomainNames.ClusterStatus.name();
  final String clusterProperty = CLUSTER_DN_KEY + "=" + clusterName;
  return new ObjectName(domain + ":" + clusterProperty);
}
/**
 * Builds the JMX object name of the status bean of one resource.
 *
 * @param clusterName cluster that owns the resource
 * @param resourceName resource whose bean name is requested
 * @return object name made of the ClusterStatus domain followed by the cluster and resource key
 *     properties, in that order
 * @throws MalformedObjectNameException if the assembled string is not a valid object name
 */
private ObjectName getResourceMbeanName(String clusterName, String resourceName)
    throws MalformedObjectNameException {
  final String domain = MonitorDomainNames.ClusterStatus.name();
  final String clusterProperty = CLUSTER_DN_KEY + "=" + clusterName;
  final String resourceProperty = RESOURCE_DN_KEY + "=" + resourceName;
  return new ObjectName(domain + ":" + clusterProperty + "," + resourceProperty);
}
/**
 * Stores a domain of the form {@code Rack=<instanceName>, Instance=<instanceName>} in the config
 * of the given instance.
 *
 * @param instanceName instance to update; also used as both the rack and the instance value
 * @param configAccessor accessor used to read and write the instance config
 */
private void setDomainId(String instanceName, ConfigAccessor configAccessor) {
  // Read-modify-write of the stored instance config.
  final InstanceConfig updatedConfig =
      configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
  updatedConfig.setDomain(
      String.format("Rack=%s, Instance=%s", instanceName, instanceName));
  configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, updatedConfig);
}
/**
 * Stores the enabled flag in the config of the given instance.
 *
 * @param instanceName instance to update
 * @param enabled value passed to {@code setInstanceEnabled}
 * @param configAccessor accessor used to read and write the instance config
 */
private void setInstanceEnable(String instanceName, boolean enabled,
    ConfigAccessor configAccessor) {
  // Read-modify-write of the stored instance config.
  final InstanceConfig updatedConfig =
      configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
  updatedConfig.setInstanceEnabled(enabled);
  configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, updatedConfig);
}
/**
 * Polls the cluster bean's {@code RebalanceFailureGauge} attribute for up to {@code TIMEOUT}
 * milliseconds and asserts that it reaches the expected state.
 *
 * @param expectFailure {@code true} if the gauge is expected to be 1, {@code false} if it is
 *     expected to hold any other non-null value
 * @throws Exception if the polling helper fails
 */
private void checkRebalanceFailureGauge(final boolean expectFailure) throws Exception {
  Assert.assertTrue(TestHelper.verify(() -> {
    try {
      Object attribute =
          _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureGauge");
      Long gauge = (Long) attribute;
      if (gauge == null) {
        return false;
      }
      boolean failureReported = gauge.longValue() == 1L;
      return failureReported == expectFailure;
    } catch (Exception e) {
      // Any failure to read the attribute counts as "not there yet"; keep polling.
      return false;
    }
  }, TIMEOUT));
}
/**
 * Polls the resource bean's {@code RebalanceStatus} attribute for up to {@code TIMEOUT}
 * milliseconds and asserts that it becomes equal to the name of the expected status.
 *
 * @param expectedState status the resource bean is expected to report
 * @param resourceName resource whose bean is inspected
 * @throws Exception if the polling helper fails
 */
private void checkResourceBestPossibleCalFailureState(
    final ResourceMonitor.RebalanceStatus expectedState, final String resourceName)
    throws Exception {
  Assert.assertTrue(TestHelper.verify(() -> {
    try {
      Object attribute = _server
          .getAttribute(getResourceMbeanName(CLUSTER_NAME, resourceName), "RebalanceStatus");
      String reportedStatus = (String) attribute;
      if (reportedStatus == null) {
        return false;
      }
      return reportedStatus.equals(expectedState.name());
    } catch (Exception e) {
      // Any failure to read the attribute counts as "not there yet"; keep polling.
      return false;
    }
  }, TIMEOUT));
}
/**
 * Placeholder for waiting until a rebalance error is recorded. The real check is disabled (see the
 * TODO in the body), so the condition handed to the polling helper is always satisfied and
 * neither parameter is currently read.
 *
 * @param accessor accessor the disabled check would read from
 * @param key property key the disabled check would look up
 * @throws Exception if the polling helper fails
 */
private void pollForError(final HelixDataAccessor accessor, final PropertyKey key)
    throws Exception {
  /*
   * TODO re-enable this check when we start recording rebalance error again
   * return accessor.getProperty(key) != null;
   */
  final boolean satisfied = TestHelper.verify(() -> true, TIMEOUT);
  Assert.assertTrue(satisfied);
}
}