diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7da6ef0..6cc1e5f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -103,9 +103,7 @@
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.ClusterEventMonitor;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
@@ -177,13 +175,6 @@
   private long _continuousTaskRebalanceFailureCount = 0;
 
   /**
-   * The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
-   * will be no-op. Other event handling logic keeps the same when the flag is set.
-   */
-  private boolean _paused;
-  private boolean _inMaintenanceMode;
-
-  /**
    * The executors that can periodically run the rebalancing pipeline. A
    * SingleThreadScheduledExecutor will start if there is resource group that has the config to do
    * periodically rebalance.
@@ -837,18 +828,16 @@
     // have this instanceof clauses
     List<Pipeline> pipelines;
     boolean isTaskFrameworkPipeline = false;
-    Pipeline.Type pipelineType;
+    boolean isManagementPipeline = false;
 
     if (dataProvider instanceof ResourceControllerDataProvider) {
       pipelines = _registry.getPipelinesForEvent(event.getEventType());
-      pipelineType = Pipeline.Type.DEFAULT;
     } else if (dataProvider instanceof WorkflowControllerDataProvider) {
       pipelines = _taskRegistry.getPipelinesForEvent(event.getEventType());
       isTaskFrameworkPipeline = true;
-      pipelineType = Pipeline.Type.TASK;
     } else if (dataProvider instanceof ManagementControllerDataProvider) {
       pipelines = _managementModeRegistry.getPipelinesForEvent(event.getEventType());
-      pipelineType = Pipeline.Type.MANAGEMENT_MODE;
+      isManagementPipeline = true;
     } else {
       logger.warn(String
           .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(),
@@ -857,11 +846,10 @@
     }
 
     // Should not run management mode and default/task pipelines at the same time.
-    if ((_inManagementMode && !Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))
-        || (!_inManagementMode && Pipeline.Type.MANAGEMENT_MODE.equals(pipelineType))) {
+    if (_inManagementMode != isManagementPipeline) {
       logger.info("Should not run management mode and default/task pipelines at the same time. "
-              + "cluster={}, inManagementMode={}, pipelineType={}. Ignoring the event: {}",
-          manager.getClusterName(), _inManagementMode, pipelineType, event.getEventType());
+              + "cluster={}, inManagementMode={}, isManagementPipeline={}. Ignoring the event: {}",
+          manager.getClusterName(), _inManagementMode, isManagementPipeline, event.getEventType());
       return;
     }
 
@@ -881,6 +869,8 @@
           checkRebalancingTimer(manager, Collections.<IdealState>emptyList(), dataProvider.getClusterConfig());
         }
         if (_isMonitoring) {
+          _clusterStatusMonitor.setEnabled(!_inManagementMode);
+          _clusterStatusMonitor.setPaused(_inManagementMode);
           event.addAttribute(AttributeName.clusterStatusMonitor.name(), _clusterStatusMonitor);
         }
       }
@@ -1335,25 +1325,8 @@
     }
 
     if (controllerIsLeader) {
-      HelixManager manager = changeContext.getManager();
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = accessor.keyBuilder();
-
-      PauseSignal pauseSignal = accessor.getProperty(keyBuilder.pause());
-      MaintenanceSignal maintenanceSignal = accessor.getProperty(keyBuilder.maintenance());
-      boolean prevPaused = _paused;
-      boolean prevInMaintenanceMode = _inMaintenanceMode;
-      _paused = updateControllerState(pauseSignal, _paused);
-      _inMaintenanceMode = updateControllerState(maintenanceSignal, _inMaintenanceMode);
-      // TODO: remove triggerResumeEvent when moving pause/maintenance to management pipeline
-      if (!triggerResumeEvent(changeContext, prevPaused, prevInMaintenanceMode)) {
-        pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
-      }
-
       enableClusterStatusMonitor(true);
-      _clusterStatusMonitor.setEnabled(!_paused);
-      _clusterStatusMonitor.setPaused(_paused);
-      _clusterStatusMonitor.setMaintenance(_inMaintenanceMode);
+      pushToEventQueues(ClusterEventType.ControllerChange, changeContext, Collections.emptyMap());
     } else {
       enableClusterStatusMonitor(false);
       // Note that onControllerChange is executed in parallel with the event processing thread. It
@@ -1543,43 +1516,6 @@
     }
   }
 
-  private boolean updateControllerState(PauseSignal signal, boolean statusFlag) {
-    if (signal != null) {
-      if (!statusFlag) {
-        statusFlag = true;
-        // This log is recorded for the first time entering PAUSE/MAINTENANCE mode
-        logger.info(String.format("controller is now %s",
-            (signal instanceof MaintenanceSignal) ? "in maintenance mode" : "paused"));
-      }
-    } else {
-      statusFlag = false;
-    }
-    return statusFlag;
-  }
-
-  /**
-   * Trigger a Resume Event if the cluster is back to activated.
-   * @param changeContext
-   * @param prevPaused the previous paused status.
-   * @param prevInMaintenanceMode the previous in maintenance mode status.
-   */
-  private boolean triggerResumeEvent(NotificationContext changeContext, boolean prevPaused,
-      boolean prevInMaintenanceMode) {
-    /**
-     * WARNING: the logic here is tricky.
-     * 1. Only resume if not paused. So if the Maintenance mode is removed but the cluster is still
-     * paused, the resume event should not be sent.
-     * 2. Only send resume event if the status is changed back to active. So we don't send multiple
-     * event unnecessarily.
-     */
-    if (!_paused && (prevPaused || (prevInMaintenanceMode && !_inMaintenanceMode))) {
-      pushToEventQueues(ClusterEventType.Resume, changeContext, Collections.EMPTY_MAP);
-      logger.info("controller is now resumed from paused/maintenance state");
-      return true;
-    }
-    return false;
-  }
-
   // TODO: refactor this to use common/ClusterEventProcessor.
   @Deprecated
   private class ClusterEventProcessor extends Thread {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 6ce25e2..3c705f4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -57,6 +57,7 @@
 import org.apache.helix.model.MaintenanceSignal;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
@@ -87,6 +88,7 @@
 
   private boolean _updateInstanceOfflineTime = true;
   private MaintenanceSignal _maintenanceSignal;
+  private PauseSignal _pauseSignal;
   private boolean _isMaintenanceModeEnabled;
   private boolean _hasMaintenanceSignalChanged;
   private ExecutorService _asyncTasksThreadPool;
@@ -300,8 +302,9 @@
     }
   }
 
-  private void updateMaintenanceInfo(final HelixDataAccessor accessor) {
+  private void refreshManagementSignals(final HelixDataAccessor accessor) {
     _maintenanceSignal = accessor.getProperty(accessor.keyBuilder().maintenance());
+    _pauseSignal = accessor.getProperty(accessor.keyBuilder().pause());
     _isMaintenanceModeEnabled = _maintenanceSignal != null;
     // The following flag is to guarantee that there's only one update per pineline run because we
     // check for whether maintenance recovery could happen twice every pipeline
@@ -373,7 +376,7 @@
     refreshResourceConfig(accessor, refreshedTypes);
     _stateModelDefinitionCache.refresh(accessor);
     _clusterConstraintsCache.refresh(accessor);
-    updateMaintenanceInfo(accessor);
+    refreshManagementSignals(accessor);
     timeoutNodesDuringMaintenance(accessor, _clusterConfig, _isMaintenanceModeEnabled);
 
     // TODO: once controller gets split, only one controller should update offline instance history
@@ -622,6 +625,16 @@
   }
 
   /**
+   * Gets all messages for each instance.
+   *
+   * @return Map of {instanceName -> Collection of Message}.
+   */
+  public Map<String, Collection<Message>> getAllInstancesMessages() {
+    return getAllInstances().stream().collect(
+        Collectors.toMap(instance -> instance, instance -> getMessages(instance).values()));
+  }
+
+  /**
    * This function is supposed to be only used by testing purpose for safety. For "get" usage,
    * please use getStaleMessagesByInstance.
    */
@@ -968,6 +981,10 @@
     return _maintenanceSignal;
   }
 
+  public PauseSignal getPauseSignal() {
+    return _pauseSignal;
+  }
+
   protected StringBuilder genCacheContentStringBuilder() {
     StringBuilder sb = new StringBuilder();
     sb.append(String.format("liveInstaceMap: %s", _liveInstanceCache.getPropertyMap())).append("\n");
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
index d178ca5..fe940ff 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ManagementControllerDataProvider.java
@@ -19,9 +19,27 @@
  * under the License.
  */
 
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.helix.HelixConstants;
+
+/**
+ * Data provider for controller management mode pipeline.
+ */
 public class ManagementControllerDataProvider extends BaseControllerDataProvider {
-  // TODO: implement this class to only refresh required event types
-  public ManagementControllerDataProvider(String clusterName, String name) {
-    super(clusterName, name);
+  // Only these types of properties are refreshed for the full refresh request.
+  private static final List<HelixConstants.ChangeType> FULL_REFRESH_PROPERTIES =
+      Arrays.asList(HelixConstants.ChangeType.LIVE_INSTANCE, HelixConstants.ChangeType.MESSAGE);
+
+  public ManagementControllerDataProvider(String clusterName, String pipelineName) {
+    super(clusterName, pipelineName);
+  }
+
+  @Override
+  public void requireFullRefresh() {
+    for (HelixConstants.ChangeType type : FULL_REFRESH_PROPERTIES) {
+      _propertyDataChangedMap.get(type).set(true);
+    }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
index 512224d..042aa14 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ManagementModeStage.java
@@ -19,9 +19,9 @@
  * under the License.
  */
 
+import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.ManagementControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
-import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
@@ -36,13 +36,21 @@
   @Override
   public void process(ClusterEvent event) throws Exception {
     // TODO: implement the stage
+    _eventId = event.getEventId();
     String clusterName = event.getClusterName();
     ManagementControllerDataProvider cache =
         event.getAttribute(AttributeName.ControllerDataProvider.name());
-    if (!HelixUtil.inManagementMode(cache)) {
-      LOG.info("Exiting management mode pipeline for cluster {}", clusterName);
+
+    // TODO: move to the last stage of management pipeline
+    checkInManagementMode(clusterName, cache);
+  }
+
+  private void checkInManagementMode(String clusterName, ManagementControllerDataProvider cache) {
+    // Should exit management mode
+    if (!HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+        cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+      LogUtil.logInfo(LOG, _eventId, "Exiting management mode pipeline for cluster " + clusterName);
       RebalanceUtil.enableManagementMode(clusterName, false);
-      throw new StageException("Exiting management mode pipeline for cluster " + clusterName);
     }
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
index a4d4783..613ce2e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceValidationStage.java
@@ -28,6 +28,7 @@
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
@@ -44,12 +45,7 @@
       throw new StageException("Missing attributes in event:" + event + ". Requires DataCache");
     }
 
-    // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
-    if (HelixUtil.inManagementMode(cache)) {
-      // Trigger an immediate management mode pipeline.
-      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
-      throw new StageException("Pipeline should not be run because cluster is in management mode");
-    }
+    processManagementMode(event, cache);
 
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
     if (resourceMap == null) {
@@ -91,6 +87,27 @@
     }
   }
 
+  private void processManagementMode(ClusterEvent event, BaseControllerDataProvider cache)
+      throws StageException {
+    // Set cluster status monitor for maintenance mode
+    ClusterStatusMonitor monitor = event.getAttribute(AttributeName.clusterStatusMonitor.name());
+    if (monitor != null) {
+      monitor.setMaintenance(cache.isMaintenanceModeEnabled());
+    }
+
+    // Check if cluster is still in management mode. Eg. there exists any frozen live instance.
+    if (HelixUtil.inManagementMode(cache.getPauseSignal(), cache.getLiveInstances(),
+        cache.getEnabledLiveInstances(), cache.getAllInstancesMessages())) {
+      // Trigger an immediate management mode pipeline.
+      LogUtil.logInfo(LOG, _eventId,
+          "Enabling management mode pipeline for cluster " + event.getClusterName());
+      RebalanceUtil.enableManagementMode(event.getClusterName(), true);
+      throw new StageException(
+          "Pipeline should not be run because cluster " + event.getClusterName()
+              + "is in management mode");
+    }
+  }
+
   /**
    * Check if the ideal state adheres to a rule
    * @param idealState the ideal state to check
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index b509506..6222df5 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -59,6 +59,7 @@
     NO_OP,
     PARTICIPANT_ERROR_REPORT,
     PARTICIPANT_SESSION_CHANGE,
+    PARTICIPANT_STATUS_CHANGE,
     CHAINED_MESSAGE, // this is a message subtype
     RELAYED_MESSAGE
   }
@@ -927,6 +928,10 @@
     return getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name());
   }
 
+  public boolean isParticipantStatusChangeType() {
+    return MessageType.PARTICIPANT_STATUS_CHANGE.name().equalsIgnoreCase(getMsgType());
+  }
+
   /**
    * Get the {@link PropertyKey} for this message
    * @param keyBuilder PropertyKey Builder
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index eaab1a2..3b133a1 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -416,7 +416,7 @@
    */
   private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider cache, Set<String> resources)
       throws Exception {
-    ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier);
+    ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.StateVerifier);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
 
     RebalanceUtil.runStage(event, new ResourceComputationStage());
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 3151716..c770583 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -21,6 +21,7 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,7 +39,6 @@
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyType;
 import org.apache.helix.controller.common.PartitionStateMap;
-import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -60,6 +60,7 @@
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
+import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -550,13 +551,33 @@
   }
 
   /**
-   * Checks whether or not the cluster is in management mode.
+   * Checks whether or not the cluster is in management mode. It checks:
+   * - pause signal
+   * - live instances: whether any live instance is not in normal status, eg. frozen.
+   * - messages: whether live instance has a participant status change message
    *
-   * @param cache
-   * @return
+   * @param pauseSignal pause signal
+   * @param liveInstanceMap map of live instances
+   * @param enabledLiveInstances set of enabled live instance names. They should be all included
+   *                             in the liveInstanceMap.
+   * @param instancesMessages a map of all instances' messages.
+   * @return true if cluster is in management mode; otherwise, false
    */
-  public static boolean inManagementMode(BaseControllerDataProvider cache) {
-    // TODO: implement the logic. Parameters can also change
-    return true;
+  public static boolean inManagementMode(PauseSignal pauseSignal,
+      Map<String, LiveInstance> liveInstanceMap, Set<String> enabledLiveInstances,
+      Map<String, Collection<Message>> instancesMessages) {
+    // Check pause signal and abnormal live instances (eg. in freeze mode)
+    // TODO: should check maintenance signal when moving maintenance to management pipeline
+    return pauseSignal != null || enabledLiveInstances.stream().anyMatch(
+        instance -> isInstanceInManagementMode(instance, liveInstanceMap, instancesMessages));
+  }
+
+  private static boolean isInstanceInManagementMode(String instance,
+      Map<String, LiveInstance> liveInstanceMap,
+      Map<String, Collection<Message>> instancesMessages) {
+    // Check live instance status and participant status change message
+    return LiveInstance.LiveInstanceStatus.PAUSED.equals(liveInstanceMap.get(instance).getStatus())
+        || (instancesMessages.getOrDefault(instance, Collections.emptyList()).stream()
+        .anyMatch(Message::isParticipantStatusChangeType));
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index db2b76f..f74b98f 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -159,8 +159,8 @@
           enabled);
       leaderController.setInManagementMode(enabled);
     } else {
-      LOG.error("Failed to switch management mode pipeline, enabled={}. "
-          + "Controller for cluster {} does not exist", clusterName, enabled);
+      throw new HelixException(String.format("Failed to switch management mode pipeline, "
+          + "enabled=%s. Controller for cluster %s does not exist", enabled, clusterName));
     }
 
     // Triggers an event to immediately run the pipeline
