Add integration tests for cluster freeze mode (#1816)

Integration tests are needed to cover the scenarios for the cluster freeze mode.
This commit adds integration tests for cluster freeze mode:
a. freeze cluster when there are pending state transition messages
b. handle new session when frozen - it should be reset and then frozen
c. restart participants when frozen - it should be reset and then frozen
d. reset partition when frozen
f. unfreeze cluster
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
index 4479d50..c795deb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementMessageGenerationPhase.java
@@ -96,7 +96,9 @@
       LiveInstance liveInstance = liveInstanceMap.get(instanceName);
       Collection<Message> pendingMessages = allInstanceMessages.get(instanceName);
       String sessionId = liveInstance.getEphemeralOwner();
-      LiveInstanceStatus currentStatus = liveInstance.getStatus();
+      LiveInstanceStatus liveInstanceStatus = liveInstance.getStatus();
+      LiveInstanceStatus currentStatus = (liveInstanceStatus == null
+          ? LiveInstanceStatus.NORMAL : liveInstanceStatus);
 
       if (needStatusChangeMessage(pendingMessages, currentStatus, desiredStatus)) {
         Message statusChangeMessage = MessageUtil.createStatusChangeMessage(currentStatus,
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterFreezeMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterFreezeMode.java
new file mode 100644
index 0000000..cbc8535
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterFreezeMode.java
@@ -0,0 +1,372 @@
+package org.apache.helix.integration.controller;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.api.status.ClusterManagementMode;
+import org.apache.helix.api.status.ClusterManagementModeRequest;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.ClusterManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.mock.participant.ErrTransition;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.ClusterStatus;
+import org.apache.helix.model.ControllerHistory;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.util.MessageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestClusterFreezeMode extends ZkTestBase {
+  private HelixManager _manager;
+  private HelixDataAccessor _accessor;
+  private String _clusterName;
+  private int _numNodes;
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 3;
+    _clusterName = "CLUSTER_" + TestHelper.getTestClassName();
+    _participants = new MockParticipantManager[_numNodes];
+    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        2, // partitions per resource
+        _numNodes, // number of nodes
+        3, // replicas
+        "MasterSlave", true);
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(_clusterName, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+    _accessor = _manager.getHelixDataAccessor();
+
+    // start controller
+    _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, "controller_0");
+    _controller.syncStart();
+
+    Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>() {
+      {
+        put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_0"));
+      }
+    };
+
+    // start participants
+    for (int i = 0; i < _numNodes; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, instanceName);
+      if (i == 0) {
+        // Make TestDB0_0 be error state on participant_0
+        _participants[i].setTransition(new ErrTransition(errPartitions));
+      }
+      _participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName));
+    Assert.assertTrue(result);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _manager.disconnect();
+    _controller.syncStop();
+    Arrays.stream(_participants).forEach(ClusterManager::syncStop);
+    deleteCluster(_clusterName);
+  }
+
+  /*
+   * Tests below scenarios:
+   * 1. cluster is in progress to freeze mode if there is a pending state transition message;
+   * 2. after state transition is completed, cluster freeze mode is completed
+   *
+   * Also tests cluster status and management mode history recording.
+   */
+  @Test
+  public void testEnableFreezeMode() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    // Not in freeze mode
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+    PauseSignal pauseSignal = _accessor.getProperty(keyBuilder.pause());
+    Assert.assertNull(pauseSignal);
+
+    // Block state transition for participants[1]
+    CountDownLatch latch = new CountDownLatch(1);
+    _participants[1].setTransition(new BlockingTransition(latch));
+
+    // Send a state transition message to participants[1]
+    Resource resource = new Resource("TestDB0");
+    resource.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY);
+    Message message = MessageUtil
+        .createStateTransitionMessage(_manager.getInstanceName(), _manager.getSessionId(), resource,
+            "TestDB0_1", _participants[1].getInstanceName(), "SLAVE", "OFFLINE",
+            _participants[1].getSessionId(), "MasterSlave");
+    Assert.assertTrue(_accessor
+        .updateProperty(keyBuilder.message(message.getTgtName(), message.getMsgId()), message));
+
+    // Freeze cluster
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.CLUSTER_PAUSE)
+        .withReason(methodName)
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+
+    // Pending ST message exists
+    Assert.assertTrue(
+        _gZkClient.exists(keyBuilder.message(message.getTgtName(), message.getMsgId()).getPath()));
+
+    // Cluster is in progress to cluster pause because there is a pending state transition message
+    ClusterStatus expectedClusterStatus = new ClusterStatus();
+    expectedClusterStatus.setManagementMode(ClusterManagementMode.Type.CLUSTER_PAUSE);
+    expectedClusterStatus.setManagementModeStatus(ClusterManagementMode.Status.IN_PROGRESS);
+    verifyClusterStatus(expectedClusterStatus);
+
+    // Unblock to finish state transition and delete the ST message
+    latch.countDown();
+
+    // Verify live instance status and cluster status
+    verifyLiveInstanceStatus(_participants, LiveInstance.LiveInstanceStatus.PAUSED);
+
+    expectedClusterStatus = new ClusterStatus();
+    expectedClusterStatus.setManagementMode(ClusterManagementMode.Type.CLUSTER_PAUSE);
+    expectedClusterStatus.setManagementModeStatus(ClusterManagementMode.Status.COMPLETED);
+    verifyClusterStatus(expectedClusterStatus);
+
+    // Verify management mode history
+    Assert.assertTrue(TestHelper.verify(() -> {
+      ControllerHistory history = _accessor.getProperty(keyBuilder.controllerLeaderHistory());
+      List<String> managementHistory = history.getManagementModeHistory();
+      if (managementHistory == null || managementHistory.isEmpty()) {
+        return false;
+      }
+      String lastHistory = managementHistory.get(managementHistory.size() - 1);
+      return lastHistory.contains("MODE=" + ClusterManagementMode.Type.CLUSTER_PAUSE)
+          && lastHistory.contains("STATUS=" + ClusterManagementMode.Status.COMPLETED)
+          && lastHistory.contains("REASON=" + methodName);
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  @Test(dependsOnMethods = "testEnableFreezeMode")
+  public void testNewLiveInstanceAddedWhenFrozen() throws Exception {
+    // Add a new live instance. Simulate an instance is rebooted and back to online
+    String newInstanceName = "localhost_" + (12918 + _numNodes + 1);
+    _gSetupTool.addInstancesToCluster(_clusterName, new String[]{newInstanceName});
+    MockParticipantManager newParticipant =
+        new MockParticipantManager(ZK_ADDR, _clusterName, newInstanceName);
+    newParticipant.syncStart();
+
+    // The new participant/live instance should be frozen by controller
+    verifyLiveInstanceStatus(new MockParticipantManager[]{newParticipant},
+        LiveInstance.LiveInstanceStatus.PAUSED);
+
+    newParticipant.syncStop();
+  }
+
+  // Simulates instance is restarted and the in-memory status is gone.
+  // When instance comes back alive, it'll reset state model, carry over
+  // and set current state to init state.
+  @Test(dependsOnMethods = "testNewLiveInstanceAddedWhenFrozen")
+  public void testRestartParticipantWhenFrozen() throws Exception {
+    String instanceName = _participants[1].getInstanceName();
+    PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+    List<CurrentState> originCurStates = _accessor
+        .getChildValues(keyBuilder.currentStates(instanceName, _participants[1].getSessionId()),
+            false);
+    String oldSession = _participants[1].getSessionId();
+
+    // Restart participants[1]
+    _participants[1].syncStop();
+    _participants[1] = new MockParticipantManager(ZK_ADDR, _participants[1].getClusterName(),
+        instanceName);
+    _participants[1].syncStart();
+
+    Assert.assertTrue(TestHelper.verify(() ->
+            _gZkClient.exists(keyBuilder.liveInstance(instanceName).getPath()),
+        TestHelper.WAIT_DURATION));
+    LiveInstance liveInstance = _accessor.getProperty(keyBuilder.liveInstance(instanceName));
+
+    // New live instance ephemeral node
+    Assert.assertEquals(liveInstance.getEphemeralOwner(), _participants[1].getSessionId());
+    // Status is frozen because controller sends a freeze message.
+    verifyLiveInstanceStatus(new MockParticipantManager[]{_participants[1]},
+        LiveInstance.LiveInstanceStatus.PAUSED);
+
+    // Old session current state is deleted because of current state carry-over
+    Assert.assertTrue(TestHelper.verify(
+        () -> !_gZkClient.exists(keyBuilder.currentStates(instanceName, oldSession).getPath()),
+        TestHelper.WAIT_DURATION));
+
+    // Current states are set to init states (OFFLINE)
+    List<CurrentState> curStates = _accessor
+        .getChildValues(keyBuilder.currentStates(instanceName, _participants[1].getSessionId()),
+            false);
+    Assert.assertEquals(curStates.size(), 1);
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (CurrentState cs : originCurStates) {
+        String stateModelDefRef = cs.getStateModelDefRef();
+        for (String partition : cs.getPartitionStateMap().keySet()) {
+          StateModelDefinition stateModelDef =
+              _accessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
+          String initState = stateModelDef.getInitialState();
+          if (!initState.equals(curStates.get(0).getPartitionStateMap().get(partition))) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  // Partition reset is allowed when cluster is frozen
+  @Test(dependsOnMethods = "testRestartParticipantWhenFrozen")
+  public void testResetPartitionWhenFrozen() throws Exception {
+    String instanceName = _participants[0].getInstanceName();
+    // Remove errTransition
+    _participants[0].setTransition(null);
+    _gSetupTool.getClusterManagementTool().resetPartition(_clusterName, instanceName, "TestDB0",
+        Collections.singletonList("TestDB0_0"));
+
+    // Error partition is reset: ERROR -> OFFLINE
+    Assert.assertTrue(TestHelper.verify(() -> {
+      CurrentState currentState = _accessor.getProperty(_accessor.keyBuilder()
+          .currentState(instanceName, _participants[0].getSessionId(), "TestDB0"));
+      return "OFFLINE".equals(currentState.getPartitionStateMap().get("TestDB0_0"));
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  @Test(dependsOnMethods = "testResetPartitionWhenFrozen")
+  public void testCreateResourceWhenFrozen() {
+    // Add a new resource
+    _gSetupTool.addResourceToCluster(_clusterName, "TestDB1", 2, "MasterSlave");
+    _gSetupTool.rebalanceStorageCluster(_clusterName, "TestDB1", 3);
+
+    // TestDB1 external view is empty
+    TestHelper.verifyWithTimeout("verifyEmptyCurStateAndExtView", 1000, _clusterName, "TestDB1",
+        TestHelper.setOf("localhost_12918", "localhost_12919", "localhost_12920"), ZK_ADDR);
+  }
+
+  @Test(dependsOnMethods = "testCreateResourceWhenFrozen")
+  public void testUnfreezeCluster() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    // Unfreeze cluster
+    ClusterManagementModeRequest request = ClusterManagementModeRequest.newBuilder()
+        .withClusterName(_clusterName)
+        .withMode(ClusterManagementMode.Type.NORMAL)
+        .withReason(methodName)
+        .build();
+    _gSetupTool.getClusterManagementTool().setClusterManagementMode(request);
+
+    verifyLiveInstanceStatus(_participants, null);
+
+    ClusterStatus expectedClusterStatus = new ClusterStatus();
+    expectedClusterStatus.setManagementMode(ClusterManagementMode.Type.NORMAL);
+    expectedClusterStatus.setManagementModeStatus(ClusterManagementMode.Status.COMPLETED);
+    verifyClusterStatus(expectedClusterStatus);
+
+    // Verify management mode history: NORMAL + COMPLETED
+    Assert.assertTrue(TestHelper.verify(() -> {
+      ControllerHistory history =
+          _accessor.getProperty(_accessor.keyBuilder().controllerLeaderHistory());
+      List<String> managementHistory = history.getManagementModeHistory();
+      if (managementHistory == null || managementHistory.isEmpty()) {
+        return false;
+      }
+      String lastHistory = managementHistory.get(managementHistory.size() - 1);
+      return lastHistory.contains("MODE=" + ClusterManagementMode.Type.NORMAL)
+          && lastHistory.contains("STATUS=" + ClusterManagementMode.Status.COMPLETED);
+    }, TestHelper.WAIT_DURATION));
+
+    // Verify cluster's normal rebalance ability after unfrozen.
+    Assert.assertTrue(ClusterStateVerifier.verifyByZkCallback(
+        new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, _clusterName)));
+  }
+
+  private void verifyLiveInstanceStatus(MockParticipantManager[] participants,
+      LiveInstance.LiveInstanceStatus status) throws Exception {
+    final PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (MockParticipantManager participant : participants) {
+        String instanceName = participant.getInstanceName();
+        LiveInstance liveInstance = _accessor.getProperty(keyBuilder.liveInstance(instanceName));
+        if (status != liveInstance.getStatus()) {
+          return false;
+        }
+      }
+      return true;
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  private void verifyClusterStatus(ClusterStatus expectedMode) throws Exception {
+    final PropertyKey statusPropertyKey = _accessor.keyBuilder().clusterStatus();
+    TestHelper.verify(() -> {
+      ClusterStatus clusterStatus = _accessor.getProperty(statusPropertyKey);
+      return clusterStatus != null
+          && expectedMode.getManagementMode().equals(clusterStatus.getManagementMode())
+          && expectedMode.getManagementModeStatus().equals(clusterStatus.getManagementModeStatus());
+    }, TestHelper.WAIT_DURATION);
+  }
+
+  private static class BlockingTransition extends MockTransition {
+    private static final Logger LOG = LoggerFactory.getLogger(BlockingTransition.class);
+    private final CountDownLatch _countDownLatch;
+
+    private BlockingTransition(CountDownLatch countDownLatch) {
+      _countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public void doTransition(Message message, NotificationContext context)
+        throws InterruptedException {
+      LOG.info("Transition is blocked");
+      _countDownLatch.await();
+      LOG.info("Transition is completed");
+    }
+  }
+}