blob: 6654098f8bbdb7f86b85c018c190ac306b3dc884 [file] [log] [blame]
package org.apache.helix.integration.controller;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.WorkflowGenerator;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
public class TestClusterMaintenanceMode extends TaskTestBase {
private static final long TIMEOUT = 180 * 1000L;
private MockParticipantManager _newInstance;
private String newResourceAddedDuringMaintenanceMode =
String.format("%s_%s", WorkflowGenerator.DEFAULT_TGT_DB, 1);
private HelixDataAccessor _dataAccessor;
private PropertyKey.Builder _keyBuilder;
@BeforeClass
public void beforeClass() throws Exception {
_numDbs = 1;
_numNodes = 3;
_numReplicas = 3;
_numPartitions = 5;
super.beforeClass();
_dataAccessor = _manager.getHelixDataAccessor();
_keyBuilder = _dataAccessor.keyBuilder();
}
@AfterClass
public void afterClass() throws Exception {
if (_newInstance != null && _newInstance.isConnected()) {
_newInstance.syncStop();
}
super.afterClass();
}
@Test
public void testNotInMaintenanceMode() {
boolean isInMaintenanceMode =
_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME);
Assert.assertFalse(isInMaintenanceMode);
}
@Test(dependsOnMethods = "testNotInMaintenanceMode")
public void testInMaintenanceMode() {
_gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true, TestHelper.getTestMethodName());
boolean isInMaintenanceMode = _gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME);
Assert.assertTrue(isInMaintenanceMode);
}
@Test(dependsOnMethods = "testInMaintenanceMode")
public void testMaintenanceModeAddNewInstance() {
_gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, true, TestHelper.getTestMethodName());
ExternalView prevExternalView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + 10);
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
_newInstance = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
_newInstance.syncStart();
_gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB,
3);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
ExternalView newExternalView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
Assert.assertEquals(prevExternalView.getRecord().getMapFields(),
newExternalView.getRecord().getMapFields());
}
@Test(dependsOnMethods = "testMaintenanceModeAddNewInstance")
public void testMaintenanceModeAddNewResource() {
_gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME,
newResourceAddedDuringMaintenanceMode, 7, "MasterSlave",
IdealState.RebalanceMode.FULL_AUTO.name(), CrushEdRebalanceStrategy.class.getName());
_gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME,
newResourceAddedDuringMaintenanceMode, 3);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
ExternalView externalView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, newResourceAddedDuringMaintenanceMode);
Assert.assertNull(externalView);
}
@Test(dependsOnMethods = "testMaintenanceModeAddNewResource")
public void testMaintenanceModeInstanceDown() {
_participants[0].syncStop();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
ExternalView externalView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) {
Assert.assertTrue(stateMap.values().contains("MASTER"));
}
}
@Test(dependsOnMethods = "testMaintenanceModeInstanceDown")
public void testMaintenanceModeInstanceBack() {
_participants[0] =
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[0].getInstanceName());
_participants[0].syncStart();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
ExternalView externalView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) {
if (stateMap.containsKey(_participants[0].getInstanceName())) {
Assert.assertEquals(stateMap.get(_participants[0].getInstanceName()), "SLAVE");
}
}
}
@Test(dependsOnMethods = "testMaintenanceModeInstanceBack")
public void testExitMaintenanceModeNewResourceRecovery() {
_gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
ExternalView externalView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME, newResourceAddedDuringMaintenanceMode);
Assert.assertEquals(externalView.getRecord().getMapFields().size(), 7);
for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) {
Assert.assertTrue(stateMap.values().contains("MASTER"));
}
}
/**
* Test that the auto-exit functionality works.
*/
@Test(dependsOnMethods = "testExitMaintenanceModeNewResourceRecovery")
public void testAutoExitMaintenanceMode() throws Exception {
// Set the config for auto-exiting maintenance mode
ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
clusterConfig.setMaxOfflineInstancesAllowed(2);
clusterConfig.setNumOfflineInstancesForAutoExit(1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
// Kill 3 instances
for (int i = 0; i < 3; i++) {
_participants[i].syncStop();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Check that the cluster is in maintenance
MaintenanceSignal maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNotNull(maintenanceSignal);
// Now bring up 2 instances
for (int i = 0; i < 2; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
_participants[i].syncStart();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// Check that the cluster is no longer in maintenance (auto-recovered)
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNull(maintenanceSignal);
}
@Test(dependsOnMethods = "testAutoExitMaintenanceMode")
public void testNoAutoExitWhenManuallyPutInMaintenance() throws Exception {
// Manually put the cluster in maintenance
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null,
null);
// Kill 2 instances, which makes it a total of 3 down instances
for (int i = 0; i < 2; i++) {
_participants[i].syncStop();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Now bring up all instances
for (int i = 0; i < 3; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
_participants[i].syncStart();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// The cluster should still be in maintenance because it was enabled manually
MaintenanceSignal maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNotNull(maintenanceSignal);
}
/**
* Test that manual triggering of maintenance mode overrides auto-enabled maintenance.
* @throws InterruptedException
*/
@Test(dependsOnMethods = "testNoAutoExitWhenManuallyPutInMaintenance")
public void testManualEnablingOverridesAutoEnabling() throws Exception {
// Exit maintenance mode manually
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null,
null);
// Kill 3 instances, which would put cluster in maintenance automatically
for (int i = 0; i < 3; i++) {
_participants[i].syncStop();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Check that maintenance signal was triggered by Controller
MaintenanceSignal maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNotNull(maintenanceSignal);
Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
MaintenanceSignal.TriggeringEntity.CONTROLLER);
// Manually enable maintenance mode with customFields
Map<String, String> customFields = ImmutableMap.of("LDAP", "hulee", "JIRA", "HELIX-999",
"TRIGGERED_BY", "SHOULD NOT BE RECORDED");
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null,
customFields);
TestHelper.verify(() -> _dataAccessor.getProperty(_keyBuilder.maintenance()) != null, 2000L);
// Check that maintenance mode has successfully overwritten with the right TRIGGERED_BY field
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
MaintenanceSignal.TriggeringEntity.USER);
for (Map.Entry<String, String> entry : customFields.entrySet()) {
if (entry.getKey().equals("TRIGGERED_BY")) {
Assert.assertEquals(maintenanceSignal.getRecord().getSimpleField(entry.getKey()), "USER");
} else {
Assert.assertEquals(maintenanceSignal.getRecord().getSimpleField(entry.getKey()),
entry.getValue());
}
}
}
/**
* Test that maxNumPartitionPerInstance still applies (if any Participant has more replicas than
* the threshold, the cluster should not auto-exit maintenance mode).
* @throws InterruptedException
*/
@Test(dependsOnMethods = "testManualEnablingOverridesAutoEnabling")
public void testMaxPartitionLimit() throws Exception {
// Manually exit maintenance mode
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null,
null);
TestHelper.verify(() -> _dataAccessor.getProperty(_keyBuilder.maintenance()) != null, 2000L);
// Since 3 instances are missing, the cluster should have gone back under maintenance
// automatically
MaintenanceSignal maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNotNull(maintenanceSignal);
Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
MaintenanceSignal.TriggeringEntity.CONTROLLER);
Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
// Bring up all instances
for (int i = 0; i < 3; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
_participants[i].syncStart();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// Check that the cluster exited maintenance
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNull(maintenanceSignal);
// Kill 3 instances, which would put cluster in maintenance automatically
for (int i = 0; i < 3; i++) {
_participants[i].syncStop();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Check that cluster is back under maintenance
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNotNull(maintenanceSignal);
Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
MaintenanceSignal.TriggeringEntity.CONTROLLER);
Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
// Set the cluster config for auto-exiting maintenance mode
ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
// Setting MaxPartitionsPerInstance to 1 will prevent the cluster from exiting maintenance mode
// automatically because the instances currently have more than 1
clusterConfig.setMaxPartitionsPerInstance(1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
TestHelper.verify(
() -> ((ClusterConfig) _dataAccessor.getProperty(_keyBuilder.clusterConfig())).getMaxPartitionsPerInstance() == 1,
2000L);
// Now bring up all instances
for (int i = 0; i < 3; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
_participants[i].syncStart();
}
TestHelper.verify(() -> _dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// Check that the cluster is still in maintenance (should not have auto-exited because it would
// fail the MaxPartitionsPerInstance check)
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
Assert.assertNotNull(maintenanceSignal);
Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
MaintenanceSignal.TriggeringEntity.CONTROLLER);
Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
MaintenanceSignal.AutoTriggerReason.MAX_PARTITION_PER_INSTANCE_EXCEEDED);
// Check if failed rebalance counter is updated
boolean result = TestHelper.verify(() -> {
try {
Long value =
(Long) _server.getAttribute(getMbeanName(CLUSTER_NAME), "RebalanceFailureCounter");
return value != null && (value > 0);
} catch (Exception e) {
return false;
}
}, TIMEOUT);
Assert.assertTrue(result);
// Check failed continuous task rebalance counter is not updated
result = TestHelper.verify(() -> {
try {
Long value = (Long) _server
.getAttribute(getMbeanName(CLUSTER_NAME), "ContinuousTaskRebalanceFailureCount");
return value != null && (value == 0);
} catch (Exception e) {
return false;
}
}, TIMEOUT);
Assert.assertTrue(result);
// Check if failed continuous resource rebalance counter is updated
result = TestHelper.verify(() -> {
try {
Long value = (Long) _server
.getAttribute(getMbeanName(CLUSTER_NAME), "ContinuousResourceRebalanceFailureCount");
return value != null && (value > 0);
} catch (Exception e) {
return false;
}
}, TIMEOUT);
Assert.assertTrue(result);
}
private ObjectName getMbeanName(String clusterName) throws MalformedObjectNameException {
String clusterBeanName = String.format("%s=%s", CLUSTER_DN_KEY, clusterName);
return new ObjectName(
String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), clusterBeanName));
}
/**
* Test that the Controller correctly records maintenance history in various situations.
* @throws InterruptedException
*/
@Test(dependsOnMethods = "testMaxPartitionLimit")
public void testMaintenanceHistory() throws Exception {
// In maintenance mode, by controller, for MAX_PARTITION_PER_INSTANCE_EXCEEDED
ControllerHistory history = _dataAccessor.getProperty(_keyBuilder.controllerLeaderHistory());
Map<String, String> lastHistoryEntry = convertStringToMap(
history.getMaintenanceHistoryList().get(history.getMaintenanceHistoryList().size() - 1));
// **The KV pairs are hard-coded in here for the ease of reading!**
Assert.assertEquals(lastHistoryEntry.get("OPERATION_TYPE"), "ENTER");
Assert.assertEquals(lastHistoryEntry.get("TRIGGERED_BY"), "CONTROLLER");
Assert.assertEquals(lastHistoryEntry.get("AUTO_TRIGGER_REASON"),
"MAX_PARTITION_PER_INSTANCE_EXCEEDED");
// Remove the maxPartitionPerInstance config
ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
clusterConfig.setMaxPartitionsPerInstance(-1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
TestHelper.verify(() -> _dataAccessor.getProperty(_keyBuilder.maintenance()) == null, 2000L);
// Now check that the cluster exited maintenance
// EXIT, CONTROLLER, for MAX_PARTITION_PER_INSTANCE_EXCEEDED
history = _dataAccessor.getProperty(_keyBuilder.controllerLeaderHistory());
lastHistoryEntry = convertStringToMap(
history.getMaintenanceHistoryList().get(history.getMaintenanceHistoryList().size() - 1));
Assert.assertEquals(lastHistoryEntry.get("OPERATION_TYPE"), "EXIT");
Assert.assertEquals(lastHistoryEntry.get("TRIGGERED_BY"), "CONTROLLER");
Assert.assertEquals(lastHistoryEntry.get("AUTO_TRIGGER_REASON"),
"MAX_PARTITION_PER_INSTANCE_EXCEEDED");
// Manually put the cluster in maintenance with a custom field
Map<String, String> customFieldMap = ImmutableMap.of("k1", "v1", "k2", "v2");
_gSetupTool.getClusterManagementTool()
.manuallyEnableMaintenanceMode(CLUSTER_NAME, true, TestHelper.getTestMethodName(), customFieldMap);
TestHelper.verify(() -> _dataAccessor.getProperty(_keyBuilder.maintenance()) != null, 2000L);
// ENTER, USER, for reason TEST, no internalReason
history = _dataAccessor.getProperty(_keyBuilder.controllerLeaderHistory());
lastHistoryEntry =
convertStringToMap(history.getMaintenanceHistoryList().get(history.getMaintenanceHistoryList().size() - 1));
Assert.assertEquals(lastHistoryEntry.get("OPERATION_TYPE"), "ENTER");
Assert.assertEquals(lastHistoryEntry.get("TRIGGERED_BY"), "USER");
Assert.assertEquals(lastHistoryEntry.get("REASON"), TestHelper.getTestMethodName());
Assert.assertNull(lastHistoryEntry.get("AUTO_TRIGGER_REASON"));
}
/**
* Convert a String representation of a Map into a Map object for verification purposes.
* @param value
* @return
*/
private static Map<String, String> convertStringToMap(String value) throws IOException {
return new ObjectMapper().readValue(value,
TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, String.class));
}
}