Add integration tests for cluster freeze mode (#1816)
Integration tests are needed to cover the scenarios for the cluster freeze mode.
This commit adds integration tests for cluster freeze mode:
a. freeze cluster when there are pending state transition messages
b. handle new session when frozen - it should be reset and then frozen
c. restart participants when frozen - it should be reset and then frozen
d. reset partition when frozen
f. unfreeze cluster
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
index 4479d50..c795deb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
@@ -96,7 +96,9 @@
LiveInstance liveInstance = liveInstanceMap.get(instanceName);
Collection<Message> pendingMessages = allInstanceMessages.get(instanceName);
String sessionId = liveInstance.getEphemeralOwner();
- LiveInstanceStatus currentStatus = liveInstance.getStatus();
+ LiveInstanceStatus liveInstanceStatus = liveInstance.getStatus();
+ LiveInstanceStatus currentStatus = (liveInstanceStatus == null
+ ? LiveInstanceStatus.NORMAL : liveInstanceStatus);
if (needStatusChangeMessage(pendingMessages, currentStatus, desiredStatus)) {
Message statusChangeMessage = MessageUtil.createStatusChangeMessage(currentStatus,
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterFreezeMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterFreezeMode.java
new file mode 100644
index 0000000..cbc8535
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterFreezeMode.java
@@ -0,0 +1,372 @@
+package org.apache.helix.integration.controller;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.mock.participant.ErrTransition;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.ControllerHistory;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.util.MessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestClusterFreezeMode extends ZkTestBase {
+ private HelixManager _manager;
+ private HelixDataAccessor _accessor;
+ private String _clusterName;
+ private int _numNodes;
+ private MockParticipantManager[] _participants;
+ private ClusterControllerManager _controller;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ _numNodes = 3;
+ _clusterName = "CLUSTER_" + TestHelper.getTestClassName();
+ _participants = new MockParticipantManager[_numNodes];
+ TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 2, // partitions per resource
+ _numNodes, // number of nodes
+ 3, // replicas
+ "MasterSlave", true);
+
+ _manager = HelixManagerFactory
+ .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+ _accessor = _manager.getHelixDataAccessor();
+
+ // start controller
+ _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+ _controller.syncStart();
+
+ Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
+ {
+ put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_0"));
+ }
+ };
+
+ // start participants
+ for (int i = 0; i < _numNodes; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+ if (i == 0) {
+ // Make TestDB0_0 be error state on participant_0
+ _participants[i].setTransition(new ErrTransition(errPartitions));
+ }
+ _participants[i].syncStart();
+ }
+
+ boolean result = ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
+ Assert.assertTrue(result);
+ }
+
+ @AfterClass
+ public void afterClass() {
+ _manager.disconnect();
+ _controller.syncStop();
+ Arrays.stream(_participants).forEach(ClusterManager::syncStop);
+ deleteCluster(_clusterName);
+ }
+
+ /*
+ * Tests below scenarios:
+ * 1. cluster is in progress to freeze mode if there is a pending state transition message;
+ * 2. after state transition is completed, cluster freeze mode is completed
+ *
+ * Also tests cluster status and management mode history recording.
+ */
+ @Test
+ public void testEnableFreezeMode() throws Exception {
+ String methodName = TestHelper.getTestMethodName();
+ // Not in freeze mode
+ PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+ PauseSignal pauseSignal = _accessor.getProperty(keyBuilder.pause());
+ Assert.assertNull(pauseSignal);
+
+ // Block state transition for participants[1]
+ CountDownLatch latch = new CountDownLatch(1);
+ _participants[1].setTransition(new BlockingTransition(latch));
+
+ // Send a state transition message to participants[1]
+ Resource resource = new Resource("TestDB0");
+ resource.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+ Message message = MessageUtil
+ .createStateTransitionMessage(_manager.getInstanceName(), _manager.getSessionId(), resource,
+ "TestDB0_1", _participants[1].getInstanceName(), "SLAVE", "OFFLINE",
+ _participants[1].getSessionId(), "MasterSlave");
+ Assert.assertTrue(_accessor
+ .updateProperty(keyBuilder.message(message.getTgtName(), message.getMsgId()), message));
+
+ // Freeze cluster
+ ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+ .withClusterName(_clusterName)
+ .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+ .withReason(methodName)
+ .build();
+ _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+
+ // Pending ST message exists
+ Assert.assertTrue(
+ _gZkClient.exists(keyBuilder.message(message.getTgtName(), message.getMsgId()).getPath()));
+
+ // Cluster is in progress to cluster pause because there is a pending state transition message
+ ClusterStatus expectedClusterStatus = new ClusterStatus();
+ expectedClusterStatus.setManagementMode(ClusterManagementMode.Type.CLUSTER_PAUSE);
+ expectedClusterStatus.setManagementModeStatus(ClusterManagementMode.Status.IN_PROGRESS);
+ verifyClusterStatus(expectedClusterStatus);
+
+ // Unblock to finish state transition and delete the ST message
+ latch.countDown();
+
+ // Verify live instance status and cluster status
+ verifyLiveInstanceStatus(_participants, LiveInstance.LiveInstanceStatus.PAUSED);
+
+ expectedClusterStatus = new ClusterStatus();
+ expectedClusterStatus.setManagementMode(ClusterManagementMode.Type.CLUSTER_PAUSE);
+ expectedClusterStatus.setManagementModeStatus(ClusterManagementMode.Status.COMPLETED);
+ verifyClusterStatus(expectedClusterStatus);
+
+ // Verify management mode history
+ Assert.assertTrue(TestHelper.verify(() -> {
+ ControllerHistory history = _accessor.getProperty(keyBuilder.controllerLeaderHistory());
+ List<String> managementHistory = history.getManagementModeHistory();
+ if (managementHistory == null || managementHistory.isEmpty()) {
+ return false;
+ }
+ String lastHistory = managementHistory.get(managementHistory.size() - 1);
+ return lastHistory.contains("MODE=" + ClusterManagementMode.Type.CLUSTER_PAUSE)
+ && lastHistory.contains("STATUS=" + ClusterManagementMode.Status.COMPLETED)
+ && lastHistory.contains("REASON=" + methodName);
+ }, TestHelper.WAIT_DURATION));
+ }
+
+ @Test(dependsOnMethods = "testEnableFreezeMode")
+ public void testNewLiveInstanceAddedWhenFrozen() throws Exception {
+ // Add a new live instance. Simulate an instance is rebooted and back to online
+ String newInstanceName = "localhost_" + (12918 + _numNodes + 1);
+ _gSetupTool.addInstancesToCluster(_clusterName, new String[]{newInstanceName});
+ MockParticipantManager newParticipant =
+ new MockParticipantManager(ZK_ADDR, _clusterName, newInstanceName);
+ newParticipant.syncStart();
+
+ // The new participant/live instance should be frozen by controller
+ verifyLiveInstanceStatus(new MockParticipantManager[]{newParticipant},
+ LiveInstance.LiveInstanceStatus.PAUSED);
+
+ newParticipant.syncStop();
+ }
+
+ // Simulates instance is restarted and the in-memory status is gone.
+ // When instance comes back alive, it'll reset state model, carry over
+ // and set current state to init state.
+ @Test(dependsOnMethods = "testNewLiveInstanceAddedWhenFrozen")
+ public void testRestartParticipantWhenFrozen() throws Exception {
+ String instanceName = _participants[1].getInstanceName();
+ PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+ List<CurrentState> originCurStates = _accessor
+ .getChildValues(keyBuilder.currentStates(instanceName, _participants[1].getSessionId()),
+ false);
+ String oldSession = _participants[1].getSessionId();
+
+ // Restart participants[1]
+ _participants[1].syncStop();
+ _participants[1] = new MockParticipantManager(ZK_ADDR, _participants[1].getClusterName(),
+ instanceName);
+ _participants[1].syncStart();
+
+ Assert.assertTrue(TestHelper.verify(() ->
+ _gZkClient.exists(keyBuilder.liveInstance(instanceName).getPath()),
+ TestHelper.WAIT_DURATION));
+ LiveInstance liveInstance = _accessor.getProperty(keyBuilder.liveInstance(instanceName));
+
+ // New live instance ephemeral node
+ Assert.assertEquals(liveInstance.getEphemeralOwner(), _participants[1].getSessionId());
+ // Status is frozen because controller sends a freeze message.
+ verifyLiveInstanceStatus(new MockParticipantManager[]{_participants[1]},
+ LiveInstance.LiveInstanceStatus.PAUSED);
+
+ // Old session current state is deleted because of current state carry-over
+ Assert.assertTrue(TestHelper.verify(
+ () -> !_gZkClient.exists(keyBuilder.currentStates(instanceName, oldSession).getPath()),
+ TestHelper.WAIT_DURATION));
+
+ // Current states are set to init states (OFFLINE)
+ List<CurrentState> curStates = _accessor
+ .getChildValues(keyBuilder.currentStates(instanceName, _participants[1].getSessionId()),
+ false);
+ Assert.assertEquals(curStates.size(), 1);
+ Assert.assertTrue(TestHelper.verify(() -> {
+ for (CurrentState cs : originCurStates) {
+ String stateModelDefRef = cs.getStateModelDefRef();
+ for (String partition : cs.getPartitionStateMap().keySet()) {
+ StateModelDefinition stateModelDef =
+ _accessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
+ String initState = stateModelDef.getInitialState();
+ if (!initState.equals(curStates.get(0).getPartitionStateMap().get(partition))) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION));
+ }
+
+ // Partition reset is allowed when cluster is frozen
+ @Test(dependsOnMethods = "testRestartParticipantWhenFrozen")
+ public void testResetPartitionWhenFrozen() throws Exception {
+ String instanceName = _participants[0].getInstanceName();
+ // Remove errTransition
+ _participants[0].setTransition(null);
+ _gSetupTool.getClusterManagementTool().resetPartition(_clusterName, instanceName, "TestDB0",
+ Collections.singletonList("TestDB0_0"));
+
+ // Error partition is reset: ERROR -> OFFLINE
+ Assert.assertTrue(TestHelper.verify(() -> {
+ CurrentState currentState = _accessor.getProperty(_accessor.keyBuilder()
+ .currentState(instanceName, _participants[0].getSessionId(), "TestDB0"));
+ return "OFFLINE".equals(currentState.getPartitionStateMap().get("TestDB0_0"));
+ }, TestHelper.WAIT_DURATION));
+ }
+
+ @Test(dependsOnMethods = "testResetPartitionWhenFrozen")
+ public void testCreateResourceWhenFrozen() {
+ // Add a new resource
+ _gSetupTool.addResourceToCluster(_clusterName, "TestDB1", 2, "MasterSlave");
+ _gSetupTool.rebalanceStorageCluster(_clusterName, "TestDB1", 3);
+
+ // TestDB1 external view is empty
+ TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 1000, _clusterName, "TestDB1",
+ TestHelper.setOf("localhost_12918", "localhost_12919", "localhost_12920"), ZK_ADDR);
+ }
+
+ @Test(dependsOnMethods = "testCreateResourceWhenFrozen")
+ public void testUnfreezeCluster() throws Exception {
+ String methodName = TestHelper.getTestMethodName();
+ // Unfreeze cluster
+ ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+ .withClusterName(_clusterName)
+ .withMode(ClusterManagementMode.Type.NORMAL)
+ .withReason(methodName)
+ .build();
+ _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+
+ verifyLiveInstanceStatus(_participants, null);
+
+ ClusterStatus expectedClusterStatus = new ClusterStatus();
+ expectedClusterStatus.setManagementMode(ClusterManagementMode.Type.NORMAL);
+ expectedClusterStatus.setManagementModeStatus(ClusterManagementMode.Status.COMPLETED);
+ verifyClusterStatus(expectedClusterStatus);
+
+ // Verify management mode history: NORMAL + COMPLETED
+ Assert.assertTrue(TestHelper.verify(() -> {
+ ControllerHistory history =
+ _accessor.getProperty(_accessor.keyBuilder().controllerLeaderHistory());
+ List<String> managementHistory = history.getManagementModeHistory();
+ if (managementHistory == null || managementHistory.isEmpty()) {
+ return false;
+ }
+ String lastHistory = managementHistory.get(managementHistory.size() - 1);
+ return lastHistory.contains("MODE=" + ClusterManagementMode.Type.NORMAL)
+ && lastHistory.contains("STATUS=" + ClusterManagementMode.Status.COMPLETED);
+ }, TestHelper.WAIT_DURATION));
+
+ // Verify cluster's normal rebalance ability after unfrozen.
+ Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(
+ new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName)));
+ }
+
+ private void verifyLiveInstanceStatus(MockParticipantManager[] participants,
+ LiveInstance.LiveInstanceStatus status) throws Exception {
+ final PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+ Assert.assertTrue(TestHelper.verify(() -> {
+ for (MockParticipantManager participant : participants) {
+ String instanceName = participant.getInstanceName();
+ LiveInstance liveInstance = _accessor.getProperty(keyBuilder.liveInstance(instanceName));
+ if (status != liveInstance.getStatus()) {
+ return false;
+ }
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION));
+ }
+
+ private void verifyClusterStatus(ClusterStatus expectedMode) throws Exception {
+ final PropertyKey statusPropertyKey = _accessor.keyBuilder().clusterStatus();
+ TestHelper.verify(() -> {
+ ClusterStatus clusterStatus = _accessor.getProperty(statusPropertyKey);
+ return clusterStatus != null
+ && expectedMode.getManagementMode().equals(clusterStatus.getManagementMode())
+ && expectedMode.getManagementModeStatus().equals(clusterStatus.getManagementModeStatus());
+ }, TestHelper.WAIT_DURATION);
+ }
+
+ private static class BlockingTransition extends MockTransition {
+ private static final Logger LOG = LoggerFactory.getLogger(BlockingTransition.class);
+ private final CountDownLatch _countDownLatch;
+
+ private BlockingTransition(CountDownLatch countDownLatch) {
+ _countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public void doTransition(Message message, NotificationContext context)
+ throws InterruptedException {
+ LOG.info("Transition is blocked");
+ _countDownLatch.await();
+ LOG.info("Transition is completed");
+ }
+ }
+}