Replace the HELIX_ENABLED config with InstanceOperation while maintaining backwards compatibility with old APIs (#2772)

Replace the HELIX_ENABLED config with InstanceOperation while maintaining backwards compatibility with old APIs.

In order to unify HELIX_ENABLED functionality with InstanceOperation, InstanceOperation will now have the following options: ENABLE, DISABLE, EVACUATE, SWAP_IN, UNKNOWN
diff --git a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
index e2cc2de..07eb498 100644
--- a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
+++ b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
@@ -1,8 +1,32 @@
 package org.apache.helix.constants;
 
+import java.util.Set;
+
 public class InstanceConstants {
   public static final String INSTANCE_NOT_DISABLED = "INSTANCE_NOT_DISABLED";
 
+  /**
+   * The set contains the InstanceOperations that are allowed to be assigned replicas by the rebalancer.
+   */
+  public static final Set<InstanceOperation> ASSIGNABLE_INSTANCE_OPERATIONS =
+      Set.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE);
+
+
+  /**
+   * The set contains the InstanceOperations that are overridden when the deprecated HELIX_ENABLED
+   * field is set to false. This will maintain backwards compatibility with the deprecated field.
+   * TODO: Remove this when the deprecated HELIX_ENABLED is removed.
+   */
+  public static final Set<InstanceOperation> INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS =
+      Set.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE, InstanceOperation.EVACUATE);
+
+
+  /**
+   * The set of InstanceOperations that are not allowed to be populated in the RoutingTableProvider.
+   */
+  public static final Set<InstanceOperation> UNSERVABLE_INSTANCE_OPERATIONS =
+      Set.of(InstanceOperation.SWAP_IN, InstanceOperation.UNKNOWN);
+
   public enum InstanceDisabledType {
     CLOUD_EVENT,
     USER_OPERATION,
@@ -10,8 +34,35 @@
   }
 
   public enum InstanceOperation {
-    EVACUATE, // Node will be removed after a period of time
-    SWAP_IN,  // New node joining for swap operation
-    SWAP_OUT // Existing Node to be removed for swap operation
+    /**
+     * Behavior: Replicas will be assigned to the node and will receive upward state transitions
+     * for new assignments and downward state transitions if replicas are being moved elsewhere.
+     * Final State: The node will have replicas assigned to it and will be considered for future assignment.
+     */
+    ENABLE,
+    /**
+     * Behavior: All replicas on the node will be set to OFFLINE.
+     * Final State: The node will have all replicas in the OFFLINE state and can't take new assignment.
+     */
+    DISABLE,
+    /**
+     * Behavior: All replicas will be moved off the node, after a replacement has been bootstrapped
+     * in another node in the cluster.
+     * Final State: The node will not contain any replicas and will not be considered for *NEW* assignment.
+     */
+    EVACUATE,
+    /**
+     * Behavior: Node will have all replicas on its corresponding (same logicalId) swap-out node bootstrapped
+     * (ERROR and OFFLINE replicas on swap-out node will not be bootstrapped) to the same states if the StateModelDef allows.
+     * This node will be excluded from the RoutingTableProvider.
+     * Final State: This node will be a mirror of the swap-out node, will not be considered for assignment, and will not be populated
+     * in the RoutingTableProvider.
+     */
+    SWAP_IN,
+    /**
+     * Behavior: Node will have all of its replicas dropped immediately and will be removed from the RoutingTableProvider.
+     * Final State: Node will not hold replicas, be considered for assignment, or be populated in the RoutingTableProvider.
+     */
+    UNKNOWN
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 021332b..d2e0c26 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -281,9 +281,11 @@
    * @param instanceName
    * @param enabled
    */
+  @Deprecated
   void enableInstance(String clusterName, String instanceName, boolean enabled);
 
   /**
+   * @deprecated use {@link #setInstanceOperation(String, String, InstanceConstants.InstanceOperation)}
    * @param clusterName
    * @param instanceName
    * @param enabled
@@ -292,20 +294,24 @@
    * @param reason set additional string description on why the instance is disabled when
    *          <code>enabled</code> is false. Existing disabled reason will be over write if instance is in disabled state.
    */
+  @Deprecated
   void enableInstance(String clusterName, String instanceName, boolean enabled,
       InstanceConstants.InstanceDisabledType disabledType, String reason);
 
   /**
    * Batch enable/disable instances in a cluster
    * By default, all the instances are enabled
+   * @deprecated use {@link #setInstanceOperation(String, String, InstanceConstants.InstanceOperation)}
    * @param clusterName
    * @param instances
    * @param enabled
    */
+  @Deprecated
   void enableInstance(String clusterName, List<String> instances, boolean enabled);
 
   /**
-   * Set the instanceOperation field.
+   * Set the instanceOperation field. Setting it to null is equivalent to
+   * ENABLE.
    *
    * @param clusterName       The cluster name
    * @param instanceName      The instance name
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 1e40bbb..a91ae12 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -64,7 +64,6 @@
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.task.TaskConstants;
 import org.apache.helix.util.HelixUtil;
-import org.apache.helix.util.InstanceValidationUtil;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
@@ -120,30 +119,40 @@
   private final Set<String> _disabledInstanceSet = new HashSet<>();
 
   private static final class DerivedInstanceCache {
-    // Assignable instances are instances will contain at most one instance with a given logicalId.
-    // This is used for SWAP related operations where there can be two instances with the same logicalId.
+    private final Map<InstanceConstants.InstanceOperation, Map<String, InstanceConfig>>
+        _instanceConfigMapByInstanceOperation;
     private final Map<String, InstanceConfig> _assignableInstanceConfigMap;
     private final Map<String, LiveInstance> _assignableLiveInstancesMap;
     private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName;
+    private final Map<String, String> _swapInInstanceNameToSwapOutInstanceName;
     private final Set<String> _liveSwapInInstanceNames;
-    private final Set<String> _enabledSwapInInstanceNames;
 
-    DerivedInstanceCache(Map<String, InstanceConfig> assignableInstanceConfigMap,
+    DerivedInstanceCache(
+        Map<InstanceConstants.InstanceOperation, Map<String, InstanceConfig>> instanceConfigMapByInstanceOperation,
+        Map<String, InstanceConfig> assignableInstanceConfigMap,
         Map<String, LiveInstance> assignableLiveInstancesMap,
         Map<String, String> swapOutInstanceNameToSwapInInstanceName,
-        Set<String> liveSwapInInstanceNames, Set<String> enabledSwapInInstanceNames) {
+        Set<String> liveSwapInInstanceNames) {
+      _instanceConfigMapByInstanceOperation = instanceConfigMapByInstanceOperation;
       _assignableInstanceConfigMap = assignableInstanceConfigMap;
       _assignableLiveInstancesMap = assignableLiveInstancesMap;
       _swapOutInstanceNameToSwapInInstanceName = swapOutInstanceNameToSwapInInstanceName;
+      _swapInInstanceNameToSwapOutInstanceName = swapOutInstanceNameToSwapInInstanceName.entrySet()
+          .stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
       _liveSwapInInstanceNames = liveSwapInInstanceNames;
-      _enabledSwapInInstanceNames = enabledSwapInInstanceNames;
+    }
+
+    private Map<String, InstanceConfig> getInstanceConfigMapByInstanceOperation(
+        InstanceConstants.InstanceOperation instanceOperation) {
+      return _instanceConfigMapByInstanceOperation.getOrDefault(instanceOperation,
+          Collections.emptyMap());
     }
   }
 
   // All maps and sets are encapsulated in DerivedInstanceCache to ensure that they are updated together
   // as a snapshot.
   private DerivedInstanceCache _derivedInstanceCache =
-      new DerivedInstanceCache(new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashSet<>(),
+      new DerivedInstanceCache(new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
           new HashSet<>());
   private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
   private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
@@ -383,15 +392,14 @@
         ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
 
     // Create new caches to be populated.
+    Map<InstanceConstants.InstanceOperation, Map<String, InstanceConfig>>
+        newInstanceConfigMapByInstanceOperation = new HashMap<>();
     Map<String, InstanceConfig> newAssignableInstanceConfigMap = new HashMap<>();
     Map<String, LiveInstance> newAssignableLiveInstancesMap = new HashMap<>();
-    Map<String, String> newSwapOutInstanceNameToSwapInInstanceName = new HashMap<>();
+    Map<String, String> newSwapOutInstanceNameToSwapOutInstanceName = new HashMap<>();
     Set<String> newLiveSwapInInstanceNames = new HashSet<>();
-    Set<String> newEnabledSwapInInstanceNames = new HashSet<>();
-
-    Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
-    Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
-    Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
+    Map<String, String> swapInLogicalIdsByInstanceName = new HashMap<>();
+    Map<String, String> nonSwapInInstancesByLogicalId = new HashMap<>();
 
     for (Map.Entry<String, InstanceConfig> entry : instanceConfigMap.entrySet()) {
       String node = entry.getKey();
@@ -404,44 +412,20 @@
       String currentInstanceLogicalId =
           currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
 
-      // Filter out instances with duplicate logical IDs. If there are duplicates, the instance with
-      // InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN. SWAP_IN is not
-      // assignable. If there are duplicates with one node having no InstanceOperation and the other
-      // having SWAP_OUT, the node with no InstanceOperation will be chosen. This signifies SWAP
-      // completion, therefore making the node assignable.
-      if (filteredInstancesByLogicalId.containsKey(currentInstanceLogicalId)) {
-        String filteredNode = filteredInstancesByLogicalId.get(currentInstanceLogicalId);
-        InstanceConfig filteredDuplicateInstanceConfig = instanceConfigMap.get(filteredNode);
+      newInstanceConfigMapByInstanceOperation.computeIfAbsent(
+              currentInstanceConfig.getInstanceOperation(), k -> new HashMap<>())
+          .put(node, currentInstanceConfig);
 
-        if ((filteredDuplicateInstanceConfig.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
-            && currentInstanceConfig.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
-            || currentInstanceConfig.getInstanceOperation().isEmpty()) {
-          // If the already filtered instance is SWAP_IN and this instance is in SWAP_OUT, then replace the filtered
-          // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance
-          // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and
-          // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set.
-          newAssignableInstanceConfigMap.remove(filteredNode);
-          newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
-          filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
-        }
-      } else if (!currentInstanceConfig.getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
-        // EVACUATE instances are not considered to be assignable.
+      if (currentInstanceConfig.isAssignable()) {
         newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
-        filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
       }
 
       if (currentInstanceConfig.getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
-        swapOutLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
+        swapInLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
             currentInstanceLogicalId);
-      }
-
-      if (currentInstanceConfig.getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
-        swapInInstancesByLogicalId.put(
+      } else {
+        nonSwapInInstancesByLogicalId.put(
             currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
             currentInstanceConfig.getInstanceName());
       }
@@ -453,25 +437,20 @@
       }
     });
 
-    swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
-      String swapInInstanceName = swapInInstancesByLogicalId.get(value);
-      if (swapInInstanceName != null) {
-        newSwapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
+    swapInLogicalIdsByInstanceName.forEach((swapInInstanceName, swapInLogicalId) -> {
+      String swapOutInstanceName = nonSwapInInstancesByLogicalId.get(swapInLogicalId);
+      if (swapOutInstanceName != null) {
+        newSwapOutInstanceNameToSwapOutInstanceName.put(swapOutInstanceName, swapInInstanceName);
         if (liveInstancesMap.containsKey(swapInInstanceName)) {
           newLiveSwapInInstanceNames.add(swapInInstanceName);
         }
-        if (InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
-            clusterConfig)) {
-          newEnabledSwapInInstanceNames.add(swapInInstanceName);
-        }
       }
     });
 
     // Replace caches with up-to-date instance sets.
-    _derivedInstanceCache =
-        new DerivedInstanceCache(newAssignableInstanceConfigMap, newAssignableLiveInstancesMap,
-            newSwapOutInstanceNameToSwapInInstanceName, newLiveSwapInInstanceNames,
-            newEnabledSwapInInstanceNames);
+    _derivedInstanceCache = new DerivedInstanceCache(newInstanceConfigMapByInstanceOperation,
+        newAssignableInstanceConfigMap, newAssignableLiveInstancesMap,
+        newSwapOutInstanceNameToSwapOutInstanceName, newLiveSwapInInstanceNames);
   }
 
   private void refreshResourceConfig(final HelixDataAccessor accessor,
@@ -722,78 +701,50 @@
   }
 
   /**
-   * Return all the live nodes that are enabled and assignable
+   * Return a set of all instances that have the UNKNOWN InstanceOperation.
+   * These instances are not assignable and should have all replicas dropped
+   * immediately.
    *
-   * @return A new set contains live instance name and that are marked enabled
+   * @return An unmodifiable set of the instance names that have the UNKNOWN InstanceOperation
    */
-  public Set<String> getAssignableEnabledLiveInstances() {
-    Set<String> enabledLiveInstances = new HashSet<>(getAssignableLiveInstances().keySet());
-    enabledLiveInstances.removeAll(getDisabledInstances());
-
-    return enabledLiveInstances;
+  public Set<String> getUnknownInstances() {
+    return Collections.unmodifiableSet(
+        _derivedInstanceCache.getInstanceConfigMapByInstanceOperation(
+            InstanceConstants.InstanceOperation.UNKNOWN).keySet());
   }
 
   /**
-   * Return all the live nodes that are enabled
+   * Return all the live nodes that are enabled. If a node is enabled, it is assignable.
    * @return A new set contains live instance name and that are marked enabled
    */
   public Set<String> getEnabledLiveInstances() {
     Set<String> enabledLiveInstances = new HashSet<>(getLiveInstances().keySet());
-    enabledLiveInstances.removeAll(getDisabledInstances());
+    enabledLiveInstances.retainAll(getEnabledInstances());
 
     return enabledLiveInstances;
   }
 
   /**
-   * Return all nodes that are enabled and assignable.
-   *
-   * @return A new set contains instance name and that are marked enabled
-   */
-  public Set<String> getAssignableEnabledInstances() {
-    Set<String> enabledNodes = new HashSet<>(getAssignableInstances());
-    enabledNodes.removeAll(getDisabledInstances());
-
-    return enabledNodes;
-  }
-
-  /**
-   * Return all nodes that are enabled.
+   * Return all nodes that are enabled. If a node is enabled, it is assignable.
    * @return A new set contains instance name and that are marked enabled
    */
   public Set<String> getEnabledInstances() {
-    Set<String> enabledNodes = new HashSet<>(getAllInstances());
-    enabledNodes.removeAll(getDisabledInstances());
-
-    return enabledNodes;
+    return new HashSet<>(_derivedInstanceCache.getInstanceConfigMapByInstanceOperation(
+        InstanceConstants.InstanceOperation.ENABLE).keySet());
   }
 
   /**
-   * Return all the live nodes that are enabled and assignable and tagged with given instanceTag.
+   * Return all the live nodes that are enabled and tagged with given instanceTag. If a node is
+   * enabled, it is assignable.
    *
    * @param instanceTag The instance group tag.
    * @return A new set contains live instance name and that are marked enabled and have the
    * specified tag.
    */
-  public Set<String> getAssignableEnabledLiveInstancesWithTag(String instanceTag) {
-    Set<String> enabledLiveInstancesWithTag = new HashSet<>(getAssignableLiveInstances().keySet());
-    Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
-    enabledLiveInstancesWithTag.retainAll(instancesWithTag);
-    enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
-
-    return enabledLiveInstancesWithTag;
-  }
-
-  /**
-   * Return all the live nodes that are enabled and tagged with given instanceTag.
-   * @param instanceTag The instance group tag.
-   * @return A new set contains live instance name and that are marked enabled and have the
-   *         specified tag.
-   */
   public Set<String> getEnabledLiveInstancesWithTag(String instanceTag) {
-    Set<String> enabledLiveInstancesWithTag = new HashSet<>(getLiveInstances().keySet());
+    Set<String> enabledLiveInstancesWithTag = new HashSet<>(getEnabledLiveInstances());
     Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
     enabledLiveInstancesWithTag.retainAll(instancesWithTag);
-    enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
 
     return enabledLiveInstancesWithTag;
   }
@@ -858,9 +809,9 @@
   }
 
   /**
-   * Get all swapping instance pairs.
+   * Get all swapping instance pairs keyed by swap-out instanceNames.
    *
-   * @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN instanceNames.
+   * @return a map of swap out instanceNames and their corresponding SWAP_IN instanceNames.
    */
   public Map<String, String> getSwapOutToSwapInInstancePairs() {
     return Collections.unmodifiableMap(
@@ -868,21 +819,22 @@
   }
 
   /**
-   * Get all the live SWAP_IN instances.
+   * Get all swapping instance pairs keyed by swap-in instanceNames.
    *
-   * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
+   * @return a map of swap in instanceNames and their corresponding swap out instanceNames.
    */
-  public Set<String> getLiveSwapInInstanceNames() {
-    return Collections.unmodifiableSet(_derivedInstanceCache._liveSwapInInstanceNames);
+  public Map<String, String> getSwapInToSwapOutInstancePairs() {
+    return Collections.unmodifiableMap(
+        _derivedInstanceCache._swapInInstanceNameToSwapOutInstanceName);
   }
 
   /**
-   * Get all the enabled SWAP_IN instances.
+   * Get all the live SWAP_IN instances.
    *
-   * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
+   * @return a set of SWAP_IN instanceNames that have a corresponding swap out instance.
    */
-  public Set<String> getEnabledSwapInInstanceNames() {
-    return Collections.unmodifiableSet(_derivedInstanceCache._enabledSwapInInstanceNames);
+  public Set<String> getLiveSwapInInstanceNames() {
+    return Collections.unmodifiableSet(_derivedInstanceCache._liveSwapInInstanceNames);
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
@@ -1127,7 +1079,7 @@
     _disabledInstanceSet.clear();
     for (InstanceConfig config : allInstanceConfigs) {
       Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
-      if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
+      if (config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
         _disabledInstanceSet.add(config.getInstanceName());
       }
       for (String resource : disabledPartitionMap.keySet()) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 160e4ed..477ef20 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -362,12 +362,19 @@
       }
     }
 
+    // TODO: Consider moving this logic to assignStatesToInstances since we are already passing
+    //  disabledInstancesForPartition to that method.
     // (2) Set initial-state to certain instances that are disabled and in preference list.
     // Be careful with the conditions.
     for (String instance : preferenceList) {
       if (disabledInstancesForPartition.contains(instance)) {
         if (currentStateMap.containsKey(instance)) {
-          if (!currentStateMap.get(instance).equals(HelixDefinedState.ERROR.name())) {
+          if (currentStateMap.get(instance).equals(HelixDefinedState.ERROR.name())) {
+            // Must set to ERROR state here because assignStatesToInstances will not assign
+            // any state for disabledInstancesForPartition. This prevents the ERROR partition
+            // from being DROPPED.
+            bestPossibleStateMap.put(instance, HelixDefinedState.ERROR.name());
+          } else {
             bestPossibleStateMap.put(instance, stateModelDef.getInitialState());
           }
         } else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 252b632..d55a6ea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -95,7 +95,7 @@
 
     String instanceTag = currentIdealState.getInstanceGroupTag();
     if (instanceTag != null) {
-      assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstancesWithTag(instanceTag);
+      assignableLiveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
       assignableNodes = clusterData.getAssignableInstancesWithTag(instanceTag);
 
       if (LOG.isInfoEnabled()) {
@@ -105,7 +105,7 @@
             currentIdealState.getInstanceGroupTag(), resourceName, assignableNodes, assignableLiveEnabledNodes));
       }
     } else {
-      assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstances();
+      assignableLiveEnabledNodes = clusterData.getEnabledLiveInstances();
       assignableNodes = clusterData.getAssignableInstances();
     }
 
@@ -246,7 +246,7 @@
       LOG.debug("Processing resource:" + resource.getResourceName());
     }
 
-    Set<String> allNodes = cache.getAssignableEnabledInstances();
+    Set<String> allNodes = cache.getEnabledInstances();
     Set<String> liveNodes = cache.getAssignableLiveInstances().keySet();
 
     ClusterConfig clusterConfig = cache.getClusterConfig();
@@ -268,7 +268,8 @@
       Map<String, String> bestStateForPartition =
           // We use cache.getLiveInstances().keySet() to make sure we gracefully handle n -> n + 1 replicas if possible
           // when the one of the current nodes holding the replica is no longer considered assignable. (ex: EVACUATE)
-          computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef, preferenceList,
+          computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(),
+              stateModelDef, preferenceList,
               currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
               partition, cache.getAbnormalStateResolver(stateModelDefName), cache);
 
@@ -328,6 +329,7 @@
     while (it.hasNext()) {
       String instance = it.next();
       String state = currentStateMap.get(instance);
+      // TODO: This may never be a possible case, figure out if we can safely remove this.
       if (state == null) {
         it.remove();
         instancesToDrop.add(instance); // These instances should be set to DROPPED after we get bestPossibleStateMap;
@@ -405,6 +407,8 @@
       }
     }
 
+    // TODO: This may not be necessary; every instance in bestPossibleStateMap should already be set
+    //    to ERROR, where needed, by the call to computeBestPossibleMap.
     // Adding ERROR replica mapping to best possible
     // ERROR assignment should be mutual excluded from DROPPED assignment because
     // once there is an ERROR replica in the mapping, bestPossibleStateMap.size() > numReplicas prevents
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 335c30f..2618275 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -205,7 +205,7 @@
         }
         addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
       } catch (IllegalArgumentException e) {
-        if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) {
+        if (insConfig.getInstanceEnabled()) {
           throw e;
         } else {
           logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 2796064..90da408 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -29,19 +29,16 @@
 import java.util.stream.Collectors;
 
 import org.apache.helix.HelixManager;
-import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ClusterTopologyConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.util.InstanceValidationUtil;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -157,7 +154,7 @@
     }
 
     // check the time instance got disabled.
-    if (!InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig)) {
+    if (!instanceConfig.getInstanceEnabled()) {
       long disabledTime = instanceConfig.getInstanceEnabledTime();
       String batchedDisabledTime = clusterConfig.getInstanceHelixDisabledTimeStamp(instance);
       if (batchedDisabledTime != null && !batchedDisabledTime.isEmpty()) {
@@ -409,7 +406,7 @@
       ResourceAssignment resourceAssignment) {
     String resourceName = resourceAssignment.getResourceName();
     IdealState currentIdealState = clusterData.getIdealState(resourceName);
-    Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
     int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
     int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
         .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
@@ -430,7 +427,7 @@
 
   private static int getMinActiveReplica(ResourceControllerDataProvider clusterData, String resourceName) {
     IdealState currentIdealState = clusterData.getIdealState(resourceName);
-    Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+    Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
     int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
     return DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
         .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index ae7e49a..39a197b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -304,7 +304,7 @@
 
     Set<String> activeNodes =
         DelayedRebalanceUtil.getActiveNodes(clusterData.getAssignableInstances(),
-            clusterData.getAssignableEnabledLiveInstances(),
+            clusterData.getEnabledLiveInstances(),
             clusterData.getInstanceOfflineTimeMap(),
             clusterData.getAssignableLiveInstances().keySet(),
             clusterData.getAssignableInstanceConfigMap(), clusterData.getClusterConfig());
@@ -401,7 +401,7 @@
       RebalanceAlgorithm algorithm) throws HelixRebalanceException {
 
     // the "real" live nodes at the time
-    final Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
 
     if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
       // no need for additional process, return the current resource assignment
@@ -602,7 +602,7 @@
       ClusterConfig clusterConfig = clusterData.getClusterConfig();
       boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig);
       Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
-      offlineOrDisabledInstances.removeAll(clusterData.getAssignableEnabledLiveInstances());
+      offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
       for (String resource : resourceSet) {
         DelayedRebalanceUtil
             .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
@@ -623,7 +623,7 @@
       String resourceName = resourceAssignment.getResourceName();
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
 
-      Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
 
       int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
       int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index ddd9880..75151d3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -164,7 +164,7 @@
       ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
       Map<String, ResourceAssignment> currentStateAssignment) {
     return generateClusterModel(dataProvider, resourceMap,
-        dataProvider.getAssignableEnabledLiveInstances(), Collections.emptyMap(),
+        dataProvider.getEnabledLiveInstances(), Collections.emptyMap(),
         Collections.emptyMap(), currentStateAssignment,
         RebalanceScopeType.GLOBAL_BASELINE);
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 8771bff..0db5252 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -24,6 +24,7 @@
   RESOURCES_TO_REBALANCE,
   BEST_POSSIBLE_STATE,
   CURRENT_STATE,
+  CURRENT_STATE_EXCLUDING_UNKNOWN,
   CUSTOMIZED_STATE,
   INTERMEDIATE_STATE,
   MESSAGES_ALL,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 2a6f964..1db0ecc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -30,10 +30,13 @@
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.common.ResourcesStateMap;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
@@ -74,7 +77,8 @@
   @Override
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
-    CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+    CurrentStateOutput currentStateOutput =
+        event.getAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name());
     final Map<String, Resource> resourceMap =
         event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
     final ClusterStatusMonitor clusterStatusMonitor =
@@ -83,8 +87,8 @@
         event.getAttribute(AttributeName.ControllerDataProvider.name());
 
     if (currentStateOutput == null || resourceMap == null || cache == null) {
-      throw new StageException(
-          "Missing attributes in event:" + event + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+      throw new StageException("Missing attributes in event:" + event
+          + ". Requires CURRENT_STATE_EXCLUDING_UNKNOWN|RESOURCES|DataCache");
     }
 
     final BestPossibleStateOutput bestPossibleStateOutput =
@@ -131,82 +135,104 @@
     });
   }
 
+  private String selectSwapInState(StateModelDefinition stateModelDef, Map<String, String> stateMap,
+      String swapOutInstance) {
+    // Select the state for the swap-in instance's replica with the following logic:
+    // 1. If the swap-out instance's replica is in the stateMap:
+    // - if the swap-out instance's replica is the topState or ERROR, select the topState when the
+    //   StateModel allows another topState replica to be added; otherwise select a secondTopState.
+    // - if the swap-out instance's replica is in any other state, select the swap-in instance's
+    //   replica to be the same state.
+    // 2. If the swap-out instance's replica is not in the stateMap, return null (no state selected).
+    // This presumably happens when the swap-out node is offline.
+    if (stateMap.containsKey(swapOutInstance)) {
+      if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState()) || stateMap.get(
+          swapOutInstance).equals(HelixDefinedState.ERROR.name())) {
+        // If the swap-out instance's replica is the topState or ERROR, select the swap-in instance's
+        // replica to be the topState if the StateModel allows another to be added. If not, select
+        // the swap-in to be the secondTopState.
+        String topStateCount = stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
+        if (topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+            || topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
+          // If the StateModel allows for another replica with the topState to be added,
+          // select the swap-in instance's replica to the topState.
+          return stateModelDef.getTopState();
+        }
+        // Otherwise fall back to the first secondTopState. NOTE(review): iterator().next() throws
+        // NoSuchElementException if the StateModel has no secondTopState; confirm this cannot occur.
+        return stateModelDef.getSecondTopStates().iterator().next();
+      }
+      // If the swap-out instance's replica is not a topState or ERROR, select the swap-in instance's replica
+      // to be the same state
+      return stateMap.get(swapOutInstance);
+    }
+    // If the swap-out instance's replica is not in the stateMap, return null
+    return null;
+  }
+
   private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourceMap,
       BestPossibleStateOutput bestPossibleStateOutput, ResourceControllerDataProvider cache) {
-    // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
+    // 1. Get all swap out instances and corresponding SWAP_IN instance pairs in the cluster.
     Map<String, String> swapOutToSwapInInstancePairs = cache.getSwapOutToSwapInInstancePairs();
-    // 2. Get all enabled and live SWAP_IN instances in the cluster.
+    Map<String, String> swapInToSwapOutInstancePairs = cache.getSwapInToSwapOutInstancePairs();
+
+    // 2. Get all live SWAP_IN instances in the cluster.
     Set<String> liveSwapInInstances = cache.getLiveSwapInInstanceNames();
-    Set<String> enabledSwapInInstances = cache.getEnabledSwapInInstanceNames();
-    // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to
-    // the stateMap with the correct state.
-    // Skipping this when there are no SWAP_IN instances that are alive will reduce computation time.
-    if (!liveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
-      resourceMap.forEach((resourceName, resource) -> {
-        StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
-        bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
-            .forEach((partition, stateMap) -> {
-              // We use the preferenceList for the case where the swapOutInstance goes offline.
-              // We do not want to drop the replicas that may have been bootstrapped on the swapInInstance
-              // in the case that the swapOutInstance goes offline and no longer has an entry in the stateMap.
-              Set<String> commonInstances = new HashSet<>(
-                  bestPossibleStateOutput.getPreferenceList(resourceName,
-                      partition.getPartitionName()));
-              commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
+    if (liveSwapInInstances.isEmpty() || cache.isMaintenanceModeEnabled()) {
+      return;
+    }
 
-              commonInstances.forEach(swapOutInstance -> {
-                // If the corresponding swap-in instance is not live, skip assigning to it.
-                if (!liveSwapInInstances.contains(
-                    swapOutToSwapInInstancePairs.get(swapOutInstance))) {
-                  return;
-                }
+    // 3. Find the assignment for each swap-in instance
+    // <instanceName> : <resourceName> : <partitionName>
+    Map<String, Map<String, Set<String>>> swapInInstanceAssignment = new HashMap<>();
+    resourceMap.forEach((resourceName, resource) -> {
+      bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
+          .forEach((partition, stateMap) -> {
+            // Use the instances that have an entry in this partition's best possible stateMap (note:
+            // not the preferenceList) and keep only the swap-out instances that have a paired swap-in
+            // instance; each such partition is recorded below for the corresponding swap-in instance.
+            Set<String> commonInstances =
+                bestPossibleStateOutput.getInstanceStateMap(resourceName, partition) != null
+                    ? new HashSet<>(
+                    bestPossibleStateOutput.getInstanceStateMap(resourceName, partition).keySet())
+                    : Collections.emptySet();
+            if (commonInstances.isEmpty()) {
+              return;
+            }
+            commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
 
-                // If the corresponding swap-in instance is not enabled, assign replicas with
-                // initial state.
-                if (!enabledSwapInInstances.contains(
-                    swapOutToSwapInInstancePairs.get(swapOutInstance))) {
-                  stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
-                      stateModelDef.getInitialState());
-                  return;
-                }
-
-                // If the swap-in node is live and enabled, do assignment with the following logic:
-                // 1. If the swap-out instance's replica is a secondTopState, set the swap-in instance's replica
-                // to the same secondTopState.
-                // 2. If the swap-out instance's replica is any other state and is in the preferenceList,
-                // set the swap-in instance's replica to the topState if the StateModel allows another to be added.
-                // If not, set the swap-in instance's replica to the secondTopState.
-                // We can make this assumption because if there is assignment to the swapOutInstance, it must be either
-                // a topState or a secondTopState.
-                if (stateMap.containsKey(swapOutInstance) && stateModelDef.getSecondTopStates()
-                    .contains(stateMap.get(swapOutInstance))) {
-                  // If the swap-out instance's replica is a secondTopState, set the swap-in instance's replica
-                  // to the same secondTopState.
-                  stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
-                      stateMap.get(swapOutInstance));
-                } else {
-                  // If the swap-out instance's replica is any other state in the stateMap or not present in the
-                  // stateMap, set the swap-in instance's replica to the topState if the StateModel allows another
-                  // to be added. If not, set the swap-in to the secondTopState.
-                  String topStateCount =
-                      stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
-                  if (topStateCount.equals(
-                      StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
-                      || topStateCount.equals(
-                      StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
-                    // If the StateModel allows for another replica with the topState to be added,
-                    // set the swap-in instance's replica to the topState.
-                    stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
-                        stateModelDef.getTopState());
-                  } else {
-                    // If StateModel does not allow another topState replica to be
-                    // added, set the swap-in instance's replica to the secondTopState.
-                    stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
-                        stateModelDef.getSecondTopStates().iterator().next());
-                  }
-                }
-              });
+            commonInstances.forEach(swapOutInstance -> {
+              swapInInstanceAssignment.computeIfAbsent(
+                      swapOutToSwapInInstancePairs.get(swapOutInstance), k -> new HashMap<>())
+                  .computeIfAbsent(resourceName, k -> new HashSet<>())
+                  .add(partition.getPartitionName());
             });
+          });
+    });
+
+    // 4. Add the correct states for the swap-in instances to the bestPossibleStateOutput.
+    if (!swapInInstanceAssignment.isEmpty()) {
+      swapInInstanceAssignment.forEach((swapInInstance, resourceMapForInstance) -> {
+        // If the corresponding swap-in instance is not live, skip assigning to it.
+        if (!liveSwapInInstances.contains(swapInInstance)) {
+          return;
+        }
+
+        resourceMapForInstance.forEach((resourceName, partitions) -> {
+          partitions.forEach(partitionName -> {
+            Partition partition = new Partition(partitionName);
+            Map<String, String> stateMap =
+                bestPossibleStateOutput.getInstanceStateMap(resourceName, partition);
+
+            String selectedState = selectSwapInState(
+                cache.getStateModelDef(resourceMap.get(resourceName).getStateModelDefRef()),
+                stateMap, swapInToSwapOutInstancePairs.get(swapInInstance));
+            if (stateMap != null) {
+              bestPossibleStateOutput.setState(resourceName, partition, swapInInstance,
+                  selectedState);
+            }
+          });
+        });
       });
     }
   }
@@ -250,7 +276,7 @@
     // Check whether the offline/disabled instance count in the cluster exceeds the set limit,
     // if yes, put the cluster into maintenance mode.
     boolean isValid =
-        validateOfflineInstancesLimit(cache, event.getAttribute(AttributeName.helixmanager.name()));
+        validateInstancesUnableToAcceptOnlineReplicasLimit(cache, event.getAttribute(AttributeName.helixmanager.name()));
 
     final List<String> failureResources = new ArrayList<>();
 
@@ -323,25 +349,32 @@
     });
   }
 
-  // Check whether the offline/disabled instance count in the cluster reaches the set limit,
+  // Check whether the number of instances that are offline or unable to accept online replicas exceeds the set limit,
   // if yes, auto enable maintenance mode, and use the maintenance rebalancer for this pipeline.
-  private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvider cache,
+  private boolean validateInstancesUnableToAcceptOnlineReplicasLimit(final ResourceControllerDataProvider cache,
       final HelixManager manager) {
-    int maxOfflineInstancesAllowed = cache.getClusterConfig().getMaxOfflineInstancesAllowed();
-    if (maxOfflineInstancesAllowed >= 0) {
-      int offlineCount =
-          cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
-      if (offlineCount > maxOfflineInstancesAllowed) {
+    int maxInstancesUnableToAcceptOnlineReplicas =
+        cache.getClusterConfig().getMaxOfflineInstancesAllowed();
+    if (maxInstancesUnableToAcceptOnlineReplicas >= 0) {
+      // Instead of only checking the offline instances, we consider how many instances in the cluster
+      // are not enabled and live. This is because some instances may be online but have an
+      // InstanceOperation, such as EVACUATE or DISABLE, that keeps them from accepting online replicas.
+      // SWAP_IN and UNKNOWN instances are excluded; they should not count against the cluster's capacity.
+      int instancesUnableToAcceptOnlineReplicas = cache.getInstanceConfigMap().entrySet().stream()
+          .filter(instanceEntry -> !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
+              instanceEntry.getValue().getInstanceOperation())).collect(Collectors.toSet())
+          .size() - cache.getEnabledLiveInstances().size();
+      if (instancesUnableToAcceptOnlineReplicas > maxInstancesUnableToAcceptOnlineReplicas) {
         String errMsg = String.format(
-            "Offline Instances count %d greater than allowed count %d. Put cluster %s into "
-                + "maintenance mode.",
-            offlineCount, maxOfflineInstancesAllowed, cache.getClusterName());
+            "Instances unable to take ONLINE replicas count %d greater than allowed count %d. Put cluster %s into "
+                + "maintenance mode.", instancesUnableToAcceptOnlineReplicas,
+            maxInstancesUnableToAcceptOnlineReplicas, cache.getClusterName());
         if (manager != null) {
           if (manager.getHelixDataAccessor()
               .getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) {
             manager.getClusterManagmentTool()
                 .autoEnableMaintenanceMode(manager.getClusterName(), true, errMsg,
-                    MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+                    MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
             LogUtil.logWarn(logger, _eventId, errMsg);
           }
         } else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6fbb2b6..3bf23d2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -43,6 +44,7 @@
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
@@ -86,24 +88,39 @@
 
     Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
     final CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    final CurrentStateOutput currentStateExcludingUnknown = new CurrentStateOutput();
 
     for (LiveInstance instance : liveInstances.values()) {
       String instanceName = instance.getInstanceName();
       String instanceSessionId = instance.getEphemeralOwner();
+      InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName);
+
+      Set<Message> existingStaleMessages = cache.getStaleMessagesByInstance(instanceName);
+      Map<String, Message> messages = cache.getMessages(instanceName);
+      Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
 
       // update current states.
       updateCurrentStates(instance,
           cache.getCurrentState(instanceName, instanceSessionId, _isTaskFrameworkPipeline).values(),
           currentStateOutput, resourceMap);
-
-      Set<Message> existingStaleMessages = cache.getStaleMessagesByInstance(instanceName);
       // update pending messages
-      Map<String, Message> messages = cache.getMessages(instanceName);
-      Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
       updatePendingMessages(instance, cache, messages.values(), relayMessages.values(),
           existingStaleMessages, currentStateOutput, resourceMap);
+
+      // Only update the currentStateExcludingUnknown if the instance is not in UNKNOWN InstanceOperation.
+      if (instanceConfig == null || !instanceConfig.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
+        // update current states.
+        updateCurrentStates(instance,
+            cache.getCurrentState(instanceName, instanceSessionId, _isTaskFrameworkPipeline)
+                .values(), currentStateExcludingUnknown, resourceMap);
+        // update pending messages
+        updatePendingMessages(instance, cache, messages.values(), relayMessages.values(),
+            existingStaleMessages, currentStateExcludingUnknown, resourceMap);
+      }
     }
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateExcludingUnknown);
 
     final ClusterStatusMonitor clusterStatusMonitor =
         event.getAttribute(AttributeName.clusterStatusMonitor.name());
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b399004..ba2e160 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -661,11 +661,11 @@
     // preference list
     if (preferenceList != null) {
       return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(
-              i -> resourceControllerDataProvider.getAssignableEnabledLiveInstances().contains(i))
+              i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
           .count(), requiredNumReplica); // StateModelDefinition's counts
     }
     return stateModelDefinition.getStateCountMap(
-        resourceControllerDataProvider.getAssignableEnabledLiveInstances().size(),
+        resourceControllerDataProvider.getEnabledLiveInstances().size(),
         requiredNumReplica); // StateModelDefinition's counts
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
index d262d14..1a5185a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
@@ -82,6 +82,7 @@
     String reason;
     switch (internalReason) {
     case MAX_OFFLINE_INSTANCES_EXCEEDED:
+    case MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS:
       // Check on the number of offline/disabled instances
       int numOfflineInstancesForAutoExit =
           cache.getClusterConfig().getNumOfflineInstancesForAutoExit();
@@ -90,7 +91,7 @@
       }
       // Get the count of all instances that are either offline or disabled
       int offlineDisabledCount =
-          cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
+          cache.getAssignableInstances().size() - cache.getEnabledLiveInstances().size();
       shouldExitMaintenance = offlineDisabledCount <= numOfflineInstancesForAutoExit;
       reason = String.format(
           "Auto-exiting maintenance mode for cluster %s; Num. of offline/disabled instances is %d, less than or equal to the exit threshold %d",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 5c22c11..859c667 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -135,6 +135,8 @@
           new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, partition));
       Map<String, String> pendingStateMap =
           currentStateOutput.getPendingStateMap(resourceName, partition);
+      Map<String, String> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName, partition);
 
       // The operation is combing pending state with best possible state. Since some replicas have
       // been moved from one instance to another, the instance will exist in pending state but not
@@ -146,6 +148,16 @@
         }
       }
 
+      // Look through the current state map and set the desired state to DROPPED for any instance
+      // that is not in the resourcesStateMap. Such an instance may not have been dropped by the
+      // rebalance strategy. This check is required to ensure that the instances removed from the
+      // ideal state stateMap are properly dropped.
+      for (String instance : currentStateMap.keySet()) {
+        if (!instanceStateMap.containsKey(instance)) {
+          instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
+        }
+      }
+
       // we should generate message based on the desired-state priority
       // so keep generated messages in a temp map keyed by state
       // desired-state->list of generated-messages
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 7e8bde9..6a0ae76 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -90,7 +90,7 @@
                 instanceMessageMap.put(instanceName,
                     Sets.newHashSet(dataProvider.getMessages(instanceName).values()));
               }
-              if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
+              if (!config.getInstanceEnabled()) {
                 disabledInstanceSet.add(instanceName);
               }
 
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
index fd91588..036a548 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -73,7 +74,7 @@
       InstanceConfig config = new InstanceConfig("localhost_" + port);
       config.setHostName("localhost");
       config.setPort(Integer.toString(port));
-      config.setInstanceEnabled(true);
+      config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       admin.addInstance(clusterName, config);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
index 1f3939c..fa5b7cd 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -113,7 +114,7 @@
       InstanceConfig config = new InstanceConfig("localhost_" + port);
       config.setHostName("localhost");
       config.setPort(Integer.toString(port));
-      config.setInstanceEnabled(true);
+      config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       admin.addInstance(clusterName, config);
     }
 
diff --git a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
index 9cc14b6..37fd1ac 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
@@ -29,6 +29,7 @@
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.ExternalView;
@@ -69,7 +70,7 @@
       InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
       instanceConfig.setHostName("localhost");
       instanceConfig.setPort("" + port);
-      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       INSTANCE_CONFIG_LIST.add(instanceConfig);
     }
 
@@ -190,7 +191,7 @@
     InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
     instanceConfig.setHostName("localhost");
     instanceConfig.setPort("" + port);
-    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
     echo("ADDING NEW NODE :" + instanceConfig.getInstanceName()
         + ". Partitions will move from old nodes to the new node.");
     admin.addInstance(CLUSTER_NAME, instanceConfig);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d52967f..c7fe086 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -61,7 +61,6 @@
 import org.apache.helix.api.status.ClusterManagementModeRequest;
 import org.apache.helix.api.topology.ClusterTopology;
 import org.apache.helix.constants.InstanceConstants;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -119,10 +118,10 @@
   public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
   private static final String MAINTENANCE_ZNODE_ID = "maintenance";
   private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
-  private static final ImmutableSet<String> ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE =
-      ImmutableSet.of("", InstanceConstants.InstanceOperation.SWAP_IN.name());
-  private static final ImmutableSet<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
-      ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());
+  private static final ImmutableSet<InstanceConstants.InstanceOperation>
+      INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
+      ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE,
+          InstanceConstants.InstanceOperation.UNKNOWN);
 
   private final RealmAwareZkClient _zkClient;
   private final ConfigAccessor _configAccessor;
@@ -206,113 +205,29 @@
       throw new HelixException("Node " + nodeId + " already exists in cluster " + clusterName);
     }
 
-    if (!ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE.contains(
-        instanceConfig.getInstanceOperation())) {
+    List<InstanceConfig> matchingLogicalIdInstances =
+        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+    if (matchingLogicalIdInstances.size() > 1) {
       throw new HelixException(
-          "Instance can only be added if InstanceOperation is set to one of" + "the following: "
-              + ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE + " This instance: " + nodeId
-              + " has InstanceOperation set to " + instanceConfig.getInstanceOperation());
+          "There are already more than one instance with the same logicalId in the cluster: "
+              + matchingLogicalIdInstances.stream().map(InstanceConfig::getInstanceName)
+              .collect(Collectors.joining(", "))
+              + " Please make sure there is at most 2 instance with the same logicalId in the cluster.");
     }
 
-    // Get the topology key used to determine the logicalId of a node.
-    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
-    ClusterTopologyConfig clusterTopologyConfig =
-        ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
-    String logicalIdKey = clusterTopologyConfig.getEndNodeType();
-    String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
-    String toAddInstanceLogicalId = instanceConfig.getLogicalId(logicalIdKey);
-
-    HelixConfigScope instanceConfigScope =
-        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
-            clusterName).build();
-    List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
-    List<InstanceConfig> foundInstanceConfigsWithMatchingLogicalId =
-        existingInstanceIds.parallelStream()
-            .map(existingInstanceId -> getInstanceConfig(clusterName, existingInstanceId)).filter(
-                existingInstanceConfig -> existingInstanceConfig.getLogicalId(logicalIdKey)
-                    .equals(toAddInstanceLogicalId)).collect(Collectors.toList());
-
-    if (foundInstanceConfigsWithMatchingLogicalId.size() >= 2) {
-      // If the length is 2, we cannot add an instance with the same logicalId as an existing instance
-      // regardless of InstanceOperation.
-      throw new HelixException(
-          "There can only be 2 instances with the same logicalId in a cluster. "
-              + "Existing instances: " + foundInstanceConfigsWithMatchingLogicalId.get(0)
-              .getInstanceName() + " and " + foundInstanceConfigsWithMatchingLogicalId.get(1)
-              .getInstanceName() + " already have the same logicalId: " + toAddInstanceLogicalId
-              + "; therefore, " + nodeId + " cannot be added to the cluster.");
-    } else if (foundInstanceConfigsWithMatchingLogicalId.size() == 1) {
-      // If there is only one instance with the same logicalId,
-      // we can infer that the intended behaviour is to SWAP_IN or EVACUATE + ADD.
-      if (foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
-        // If the existing instance with the same logicalId has SWAP_OUT InstanceOperation
-
-        // If the InstanceOperation is unset, we will set it to SWAP_IN.
-        if (!instanceConfig.getInstanceOperation()
-            .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
-          instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
-        }
-
-        // If the existing instance with the same logicalId is not in the same FAULT_ZONE as this instance, we cannot
-        // add this instance.
-        if (!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap()
-            .containsKey(faultZoneKey) || !instanceConfig.getDomainAsMap().containsKey(faultZoneKey)
-            || !foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap().get(faultZoneKey)
-            .equals(instanceConfig.getDomainAsMap().get(faultZoneKey))) {
-          throw new HelixException(
-              "Instance can only be added if the SWAP_OUT instance sharing the same logicalId is in the same FAULT_ZONE"
-                  + " as this instance. " + "Existing instance: "
-                  + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
-                  + " has FAULT_ZONE_TYPE: " + foundInstanceConfigsWithMatchingLogicalId.get(0)
-                  .getDomainAsMap().get(faultZoneKey) + " and this instance: " + nodeId
-                  + " has FAULT_ZONE_TYPE: " + instanceConfig.getDomainAsMap().get(faultZoneKey));
-        }
-
-        Map<String, Integer> foundInstanceCapacityMap =
-            foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap().isEmpty()
-                ? clusterConfig.getDefaultInstanceCapacityMap()
-                : foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap();
-        Map<String, Integer> instanceCapacityMap = instanceConfig.getInstanceCapacityMap().isEmpty()
-            ? clusterConfig.getDefaultInstanceCapacityMap()
-            : instanceConfig.getInstanceCapacityMap();
-        // If the instance does not have the same capacity, we cannot add this instance.
-        if (!new EqualsBuilder().append(foundInstanceCapacityMap, instanceCapacityMap).isEquals()) {
-          throw new HelixException(
-              "Instance can only be added if the SWAP_OUT instance sharing the same logicalId has the same capacity"
-                  + " as this instance. " + "Existing instance: "
-                  + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
-                  + " has capacity: " + foundInstanceCapacityMap + " and this instance: " + nodeId
-                  + " has capacity: " + instanceCapacityMap);
-        }
-      } else if (foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
-        // No need to check anything on the new node, the old node will be evacuated and the new node
-        // will be added.
-      } else {
-        // If the instanceConfig.getInstanceEnabled() is true and the existing instance with the same logicalId
-        // does not have InstanceOperation set to one of the above, we cannot add this instance.
-        throw new HelixException(
-            "Instance can only be added if the exising instance sharing the same logicalId"
-                + " has InstanceOperation set to "
-                + InstanceConstants.InstanceOperation.SWAP_OUT.name()
-                + " and this instance has InstanceOperation set to "
-                + InstanceConstants.InstanceOperation.SWAP_IN.name()
-                + " or the existing instance sharing the same logicalId has Instance Operation set to "
-                + InstanceConstants.InstanceOperation.EVACUATE.name()
-                + " and this instance has InstanceOperation unset. Existing instance: "
-                + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
-                + " has InstanceOperation: " + foundInstanceConfigsWithMatchingLogicalId.get(0)
-                .getInstanceOperation());
-      }
-    } else if (!instanceConfig.getInstanceOperation().isEmpty()) {
-      // If there are no instances with the same logicalId, we can only add this instance if InstanceOperation
-      // is unset because it is a new instance.
-      throw new HelixException(
-          "There is no instance with logicalId: " + toAddInstanceLogicalId + " in cluster: "
-              + clusterName + "; therefore, " + nodeId
-              + " cannot join cluster with InstanceOperation set to "
-              + instanceConfig.getInstanceOperation() + ".");
+    InstanceConstants.InstanceOperation attemptedInstanceOperation =
+        instanceConfig.getInstanceOperation();
+    try {
+      validateInstanceOperationTransition(instanceConfig,
+          !matchingLogicalIdInstances.isEmpty() ? matchingLogicalIdInstances.get(0) : null,
+          InstanceConstants.InstanceOperation.UNKNOWN,
+          attemptedInstanceOperation, clusterName);
+    } catch (HelixException e) {
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
+      logger.error("Failed to add instance " + instanceConfig.getInstanceName() + " to cluster "
+          + clusterName + " with instance operation " + attemptedInstanceOperation
+          + ". Setting INSTANCE_OPERATION to " + instanceConfig.getInstanceOperation()
+          + " instead.", e);
     }
 
     ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
@@ -464,12 +379,14 @@
     return accessor.setProperty(instanceConfigPropertyKey, newInstanceConfig);
   }
 
+  @Deprecated
   @Override
   public void enableInstance(final String clusterName, final String instanceName,
       final boolean enabled) {
     enableInstance(clusterName, instanceName, enabled, null, null);
   }
 
+  @Deprecated
   @Override
   public void enableInstance(final String clusterName, final String instanceName,
       final boolean enabled, InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -477,20 +394,6 @@
         clusterName);
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
 
-    // If enabled is set to true and InstanceOperation is SWAP_IN, we should fail if there is not a
-    // matching SWAP_OUT instance.
-    InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
-    if (enabled && instanceConfig.getInstanceOperation()
-        .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
-      InstanceConfig matchingSwapInstance = findMatchingSwapInstance(clusterName, instanceConfig);
-      if (matchingSwapInstance == null || !matchingSwapInstance.getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
-        throw new HelixException("Instance cannot be enabled if InstanceOperation is set to "
-            + instanceConfig.getInstanceOperation() + " when there is no matching "
-            + InstanceConstants.InstanceOperation.SWAP_OUT.name() + " instance.");
-      }
-    }
-
     // Eventually we will have all instances' enable/disable information in clusterConfig. Now we
     // update both instanceConfig and clusterConfig in transition period.
     enableSingleInstance(clusterName, instanceName, enabled, baseAccessor, disabledType, reason);
@@ -499,6 +402,7 @@
 
   }
 
+  @Deprecated
   @Override
   public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
     // TODO: batch enable/disable is breaking backward compatibility on instance enable with older library
@@ -509,62 +413,88 @@
     //enableInstance(clusterName, instances, enabled, null, null);
   }
 
+  private void validateInstanceOperationTransition(InstanceConfig instanceConfig,
+      InstanceConfig matchingLogicalIdInstance,
+      InstanceConstants.InstanceOperation currentOperation,
+      InstanceConstants.InstanceOperation targetOperation,
+      String clusterName) {
+    boolean targetStateEnableOrDisable =
+        targetOperation.equals(InstanceConstants.InstanceOperation.ENABLE)
+            || targetOperation.equals(InstanceConstants.InstanceOperation.DISABLE);
+    switch (currentOperation) {
+      case ENABLE:
+      case DISABLE:
+        // ENABLE or DISABLE can be set to ENABLE, DISABLE, or EVACUATE at any time. Otherwise falls through.
+        if (ImmutableSet.of(InstanceConstants.InstanceOperation.ENABLE,
+            InstanceConstants.InstanceOperation.DISABLE,
+            InstanceConstants.InstanceOperation.EVACUATE).contains(targetOperation)) {
+          return;
+        }
+      case SWAP_IN:
+        // SWAP_IN can be set to ENABLE or DISABLE when no other instance shares its logicalId or that instance
+        // has InstanceOperation set to UNKNOWN, and to UNKNOWN at any time. Otherwise falls through (no break).
+        if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+            || matchingLogicalIdInstance.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.UNKNOWN))) || targetOperation.equals(
+            InstanceConstants.InstanceOperation.UNKNOWN)) {
+          return;
+        }
+      case EVACUATE:
+        // EVACUATE can be set to ENABLE or DISABLE when there is no instance with the same logicalId in the
+        // cluster, and to UNKNOWN at any time. Otherwise falls through to the UNKNOWN checks (no break).
+        if ((targetStateEnableOrDisable && matchingLogicalIdInstance == null)
+            || targetOperation.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
+          return;
+        }
+      case UNKNOWN:
+        // UNKNOWN can be set to ENABLE or DISABLE when there is no instance with the same logicalId in the cluster
+        // or the instance with the same logicalId in the cluster has InstanceOperation set to EVACUATE.
+        // UNKNOWN can be set to SWAP_IN when there is an instance with the same logicalId in the cluster whose
+        // InstanceOperation is neither UNKNOWN nor EVACUATE.
+        if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+            || matchingLogicalIdInstance.getInstanceOperation()
+            .equals(InstanceConstants.InstanceOperation.EVACUATE)))) {
+          return;
+        } else if (targetOperation.equals(InstanceConstants.InstanceOperation.SWAP_IN)
+            && matchingLogicalIdInstance != null && !ImmutableSet.of(
+                InstanceConstants.InstanceOperation.UNKNOWN,
+                InstanceConstants.InstanceOperation.EVACUATE)
+            .contains(matchingLogicalIdInstance.getInstanceOperation())) {
+          return;
+        }
+      default:
+        throw new HelixException(
+            "InstanceOperation cannot be set to " + targetOperation + " when the instance is in "
+                + currentOperation + " state");
+    }
+  }
+
+  /**
+   * Set the InstanceOperation of an instance in the cluster.
+   *
+   * @param clusterName       The cluster name
+   * @param instanceName      The instance name
+   * @param instanceOperation The instance operation to set; null is validated as a transition to ENABLE
+   */
   @Override
-  // TODO: Name may change in future
   public void setInstanceOperation(String clusterName, String instanceName,
       @Nullable InstanceConstants.InstanceOperation instanceOperation) {
 
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
     String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
 
-    // InstanceOperation can only be set to SWAP_IN when the instance is added to the cluster
-    // or if it is disabled.
-    if (instanceOperation != null && instanceOperation.equals(
-        InstanceConstants.InstanceOperation.SWAP_IN) && getInstanceConfig(clusterName,
-        instanceName).getInstanceEnabled()) {
-      throw new HelixException("InstanceOperation should only be set to "
-          + InstanceConstants.InstanceOperation.SWAP_IN.name()
-          + " when an instance joins the cluster for the first time(when "
-          + "creating the InstanceConfig) or is disabled.");
+    InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
+    if (instanceConfig == null) {
+      throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
+          + ", instance config does not exist");
     }
-
-    // InstanceOperation cannot be set to null if there is an instance with the same logicalId in
-    // the cluster which does not have InstanceOperation set to SWAP_IN or SWAP_OUT.
-    if (instanceOperation == null) {
-      InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
-      String logicalIdKey = ClusterTopologyConfig.createFromClusterConfig(
-          _configAccessor.getClusterConfig(clusterName)).getEndNodeType();
-
-      HelixConfigScope instanceConfigScope =
-          new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
-              clusterName).build();
-      List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
-      List<InstanceConfig> matchingInstancesWithNonSwappingInstanceOperation =
-          existingInstanceIds.parallelStream()
-              .map(existingInstanceId -> getInstanceConfig(clusterName, existingInstanceId)).filter(
-                  existingInstanceConfig ->
-                      !existingInstanceConfig.getInstanceName().equals(instanceName)
-                          && existingInstanceConfig.getLogicalId(logicalIdKey)
-                          .equals(instanceConfig.getLogicalId(logicalIdKey))
-                          && !existingInstanceConfig.getInstanceOperation()
-                          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
-                          && !existingInstanceConfig.getInstanceOperation()
-                          .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())
-                          && !existingInstanceConfig.getInstanceOperation()
-                          .equals(InstanceConstants.InstanceOperation.EVACUATE.name()))
-              .collect(Collectors.toList());
-
-      if (!matchingInstancesWithNonSwappingInstanceOperation.isEmpty()) {
-        throw new HelixException("InstanceOperation cannot be set to null for " + instanceName
-            + " if there are other instances with the same logicalId in the cluster that do not have"
-            + " InstanceOperation set to SWAP_IN, SWAP_OUT, or EVACUATE.");
-      }
-    }
-
-    if (!baseAccessor.exists(path, 0)) {
-      throw new HelixException(
-          "Cluster " + clusterName + ", instance: " + instanceName + ", instance config does not exist");
-    }
+    List<InstanceConfig> matchingLogicalIdInstances =
+        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+    validateInstanceOperationTransition(instanceConfig,
+        !matchingLogicalIdInstances.isEmpty() ? matchingLogicalIdInstances.get(0) : null,
+        instanceConfig.getInstanceOperation(),
+        instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE : instanceOperation,
+        clusterName);
 
    boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
       @Override
@@ -589,50 +519,33 @@
   public boolean isEvacuateFinished(String clusterName, String instanceName) {
     if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
       InstanceConfig config = getInstanceConfig(clusterName, instanceName);
-      return config != null && config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name());
+      return config != null && config.getInstanceOperation()
+          .equals(InstanceConstants.InstanceOperation.EVACUATE);
     }
     return false;
   }
 
   /**
-   * Find the instance that the passed instance is swapping with. If the passed instance has
-   * SWAP_OUT instanceOperation, then find the corresponding instance that has SWAP_IN
-   * instanceOperation. If the passed instance has SWAP_IN instanceOperation, then find the
-   * corresponding instance that has SWAP_OUT instanceOperation.
+   * Find all other instances in the cluster whose logicalId matches that of the passed instance.
    *
    * @param clusterName    The cluster name
-   * @param instanceConfig The instance to find the swap instance for
-   * @return The swap instance if found, null otherwise.
+   * @param instanceConfig The instance to find the matching instances for
+   * @return The list of matching instances, excluding the passed instance; empty if none are found.
    */
-  @Nullable
-  private InstanceConfig findMatchingSwapInstance(String clusterName,
+  private List<InstanceConfig> findInstancesMatchingLogicalId(String clusterName,
       InstanceConfig instanceConfig) {
     String logicalIdKey =
         ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName))
             .getEndNodeType();
-
-    for (String potentialSwappingInstance : getConfigKeys(
+    return getConfigKeys(
         new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
-            clusterName).build())) {
-      InstanceConfig potentialSwappingInstanceConfig =
-          getInstanceConfig(clusterName, potentialSwappingInstance);
-
-      // Return if there is a matching Instance with the same logicalId and opposite InstanceOperation swap operation.
-      if (potentialSwappingInstanceConfig.getLogicalId(logicalIdKey)
-          .equals(instanceConfig.getLogicalId(logicalIdKey)) && (
-          instanceConfig.getInstanceOperation()
-              .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
-              && potentialSwappingInstanceConfig.getInstanceOperation()
-              .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) || (
-          instanceConfig.getInstanceOperation()
-              .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())
-              && potentialSwappingInstanceConfig.getInstanceOperation()
-              .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()))) {
-        return potentialSwappingInstanceConfig;
-      }
-    }
-
-    return null;
+            clusterName).build()).stream()
+        .map(instanceName -> getInstanceConfig(clusterName, instanceName)).filter(
+            potentialInstanceConfig ->
+                !potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
+                    && potentialInstanceConfig.getLogicalId(logicalIdKey)
+                    .equals(instanceConfig.getLogicalId(logicalIdKey)))
+        .collect(Collectors.toList());
   }
 
   /**
@@ -661,14 +574,13 @@
         accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
     InstanceConfig swapOutInstanceConfig = getInstanceConfig(clusterName, swapOutInstanceName);
     InstanceConfig swapInInstanceConfig = getInstanceConfig(clusterName, swapInInstanceName);
-    if (swapInLiveInstance == null || !swapInInstanceConfig.getInstanceEnabled()) {
+    if (swapInLiveInstance == null) {
       logger.warn(
-          "SwapOutInstance {} is {} + {} and SwapInInstance {} is {} + {} for cluster {}. Swap will"
-              + " not complete unless SwapInInstance instance is ENABLED and ONLINE.",
+          "SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} for cluster {}. Swap will"
+              + " not complete unless SwapInInstance instance is ONLINE.",
           swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE",
-          swapOutInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", swapInInstanceName,
-          swapInLiveInstance != null ? "ONLINE" : "OFFLINE",
-          swapInInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", clusterName);
+          swapOutInstanceConfig.getInstanceOperation(), swapInInstanceName,
+          swapInInstanceConfig.getInstanceOperation(), clusterName);
       return false;
     }
 
@@ -705,21 +617,15 @@
       return false;
     }
 
-    // 4. Collect a list of all partitions that have a current state on swapOutInstance
-    String swapOutLastActiveSession;
-    if (swapOutLiveInstance == null) {
-      // SwapOutInstance is down, try to find the last active session
-      if (swapOutSessions.size() != 1) {
-        logger.warn(
-            "SwapOutInstance {} is offline and has {} sessions for cluster {}. Swap can't be "
-                + "verified if last active session can't be determined. There should only be one session.",
-            swapOutInstanceName, swapOutSessions.size(), clusterName);
-        return false;
-      }
-      swapOutLastActiveSession = swapOutSessions.get(0);
-    } else {
-      swapOutLastActiveSession = swapOutLiveInstance.getEphemeralOwner();
+    // 4. If the swap-out instance is not alive or is disabled, we return true without checking
+    // the current states on the swap-in instance.
+    if (swapOutLiveInstance == null || swapOutInstanceConfig.getInstanceOperation()
+        .equals(InstanceConstants.InstanceOperation.DISABLE)) {
+      return true;
     }
+
+    // 5. Collect a list of all partitions that have a current state on swapOutInstance
+    String swapOutLastActiveSession = swapOutLiveInstance.getEphemeralOwner();
     String swapInActiveSession = swapInLiveInstance.getEphemeralOwner();
 
     // Iterate over all resources with current states on the swapOutInstance
@@ -754,24 +660,22 @@
         String swapOutPartitionState = swapOutResourceCurrentState.getState(partitionName);
         String swapInPartitionState = swapInResourceCurrentState.getState(partitionName);
 
-        // SwapInInstance should not have any partitions in ERROR state.
-        if (swapInPartitionState.equals(HelixDefinedState.ERROR.name())) {
-          logger.warn(
-              "SwapOutInstance {} has partition {} in state {} and SwapInInstance {} has partition {} in state {} for cluster {}."
-                  + " Swap will not complete unless both instances have no partitions in ERROR state.",
-              swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName,
-              partitionName, swapInPartitionState, clusterName);
-          return false;
-        }
-
-        // The state of the partition on the swapInInstance be in the topState or a secondTopState.
-        // It should be in a topState only if the state model allows multiple replicas in the topState.
-        // In all other cases it should be a secondTopState.
-        if (!(swapInPartitionState.equals(topState) || secondTopStates.contains(
+        // SwapInInstance should have the correct state for the partition.
+        // All states should match, except when the swap-out partition is in ERROR state or the swap-out partition
+        // is in the topState and the topState replica count is not ALL_REPLICAS or ALL_CANDIDATE_NODES.
+        // In the latter case, the swap-in partition may instead be in one of the secondTopStates.
+        if (!(swapOutPartitionState.equals(HelixDefinedState.ERROR.name()) || (
+            topState.equals(swapOutPartitionState) && (
+                swapOutPartitionState.equals(swapInPartitionState) ||
+                    !ImmutableSet.of(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS,
+                        StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES).contains(
+                        stateModelDefinition.getNumInstancesPerState(
+                            stateModelDefinition.getTopState())) && secondTopStates.contains(
+                        swapInPartitionState))) || swapOutPartitionState.equals(
             swapInPartitionState))) {
           logger.warn(
               "SwapOutInstance {} has partition {} in {} but SwapInInstance {} has partition {} in state {} for cluster {}."
-                  + " Swap will not complete unless SwapInInstance has partition in topState or secondState.",
+                  + " Swap will not complete unless SwapInInstance has partition in correct states.",
               swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName,
               partitionName, swapInPartitionState, clusterName);
           return false;
@@ -792,12 +696,21 @@
       return false;
     }
 
-    InstanceConfig swapOutInstanceConfig = instanceConfig.getInstanceOperation()
-        .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? instanceConfig
-        : findMatchingSwapInstance(clusterName, instanceConfig);
+    List<InstanceConfig> swappingInstances =
+        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+    if (swappingInstances.size() != 1) {
+      logger.warn(
+          "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    InstanceConfig swapOutInstanceConfig =
+        !instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+            ? instanceConfig : swappingInstances.get(0);
     InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
-        .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? instanceConfig
-        : findMatchingSwapInstance(clusterName, instanceConfig);
+        .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
+        : swappingInstances.get(0);
     if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
       logger.warn(
           "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
@@ -821,12 +734,21 @@
       return false;
     }
 
-    InstanceConfig swapOutInstanceConfig = instanceConfig.getInstanceOperation()
-        .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? instanceConfig
-        : findMatchingSwapInstance(clusterName, instanceConfig);
+    List<InstanceConfig> swappingInstances =
+        findInstancesMatchingLogicalId(clusterName, instanceConfig);
+    if (swappingInstances.size() != 1) {
+      logger.warn(
+          "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
+          instanceName, clusterName);
+      return false;
+    }
+
+    InstanceConfig swapOutInstanceConfig =
+        !instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+            ? instanceConfig : swappingInstances.get(0);
     InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
-        .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? instanceConfig
-        : findMatchingSwapInstance(clusterName, instanceConfig);
+        .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
+        : swappingInstances.get(0);
     if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
       logger.warn(
           "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
@@ -840,11 +762,39 @@
       return false;
     }
 
-    // Complete the swap by removing the InstanceOperation for the SWAP_IN node and disabling the SWAP_OUT node.
-    setInstanceOperation(clusterName, swapInInstanceConfig.getInstanceName(), null);
-    enableInstance(clusterName, swapOutInstanceConfig.getInstanceName(), false);
+    BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
+    String swapInInstanceConfigPath =
+        PropertyPathBuilder.instanceConfig(clusterName, swapInInstanceConfig.getInstanceName());
+    String swapOutInstanceConfigPath =
+        PropertyPathBuilder.instanceConfig(clusterName, swapOutInstanceConfig.getInstanceName());
 
-    return true;
+    Map<String, DataUpdater<ZNRecord>> updaterMap = new HashMap<>();
+    updaterMap.put(swapInInstanceConfigPath, currentData -> {
+      if (currentData == null) {
+        throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
+            + ", SWAP_IN instance config is null");
+      }
+
+      InstanceConfig currentSwapOutInstanceConfig =
+          getInstanceConfig(clusterName, swapOutInstanceConfig.getInstanceName());
+      InstanceConfig config = new InstanceConfig(currentData);
+      config.overwriteInstanceConfig(currentSwapOutInstanceConfig);
+      // If the swap-out config lacks HELIX_ENABLED/INSTANCE_OPERATION, the overwrite leaves them unset here too.
+      return config.getRecord();
+    });
+
+    updaterMap.put(swapOutInstanceConfigPath, currentData -> {
+      if (currentData == null) {
+        throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
+            + ", swap out instance config is null");
+      }
+
+      InstanceConfig config = new InstanceConfig(currentData);
+      config.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
+      return config.getRecord();
+    });
+
+    return baseAccessor.multiSet(updaterMap);
   }
 
   @Override
@@ -2427,6 +2377,7 @@
     setResourceIdealState(clusterName, resourceName, idealState);
   }
 
+  @Deprecated
   private void enableSingleInstance(final String clusterName, final String instanceName,
       final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor,
       InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -2448,7 +2399,7 @@
         InstanceConfig config = new InstanceConfig(currentData);
         config.setInstanceEnabled(enabled);
         if (!enabled) {
-          // new disabled type and reason will over write existing ones.
+          // new disabled type and reason will overwrite existing ones.
           config.resetInstanceDisabledTypeAndReason();
           if (reason != null) {
             config.setInstanceDisabledReason(reason);
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index fdc05ac..edb7a76 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -75,6 +75,8 @@
     // TODO: if we want to support this for other rebalancers, we need to implement that logic
     GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE,
     // The following two include offline AND disabled instances
+    // TODO: At some point we should rename this to something like MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS
+    //     to make it clear that it includes both offline and non-assignable instances.
     MAX_OFFLINE_INSTANCES_ALLOWED,
     NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT, // For auto-exiting maintenance mode
 
@@ -88,7 +90,9 @@
     // state transition if the number of
     // partitons that need recovery or in
     // error exceeds this limitation
+    @Deprecated // TODO: Remove in Helix 2.0
     DISABLED_INSTANCES,
+    @Deprecated // TODO: Remove in Helix 2.0
     DISABLED_INSTANCES_WITH_INFO,
     // disabled instances and disabled instances with info are for storing batch disabled instances.
     // disabled instances will write into both 2 fields for backward compatibility.
@@ -816,8 +820,11 @@
 
   /**
    * Get current disabled instance map of <instance, disabledTimeStamp>
+   * @deprecated We will no longer be using the ClusterConfig to disable instances;
+   * please use the InstanceConfig to disable instances.
    * @return a non-null map of disabled instances in cluster config
    */
+  @Deprecated
   public Map<String, String> getDisabledInstances() {
     Map<String, String> disabledInstances =
         _record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES.name());
@@ -827,8 +834,10 @@
   /**
    * Get current disabled instance map of
    * <instance, disabledReason = "res, disabledType = typ, disabledTimeStamp = time">
+   * @deprecated Please use InstanceConfig for enabling and disabling instances
    * @return a non-null map of disabled instances in cluster config
    */
+  @Deprecated
   public Map<String, String> getDisabledInstancesWithInfo() {
     Map<String, String> disabledInstances =
         _record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES_WITH_INFO.name());
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 2f3da14..de41646 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.constants.InstanceConstants;
@@ -47,9 +48,7 @@
    * Configurable characteristics of an instance
    */
   public enum InstanceConfigProperty {
-    HELIX_HOST,
-    HELIX_PORT,
-    HELIX_ZONE_ID,
+    HELIX_HOST, HELIX_PORT, HELIX_ZONE_ID, @Deprecated
     HELIX_ENABLED,
     HELIX_ENABLED_TIMESTAMP,
     HELIX_DISABLED_REASON,
@@ -71,6 +70,13 @@
   private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
   private static final boolean HELIX_ENABLED_DEFAULT_VALUE = true;
 
+  // These fields are never overwritten by overwriteInstanceConfig because
+  // they are unique properties of an instance.
+  private static final ImmutableSet<InstanceConfigProperty> NON_OVERWRITABLE_PROPERTIES =
+      ImmutableSet.of(InstanceConfigProperty.HELIX_HOST, InstanceConfigProperty.HELIX_PORT,
+          InstanceConfigProperty.HELIX_ZONE_ID, InstanceConfigProperty.DOMAIN,
+          InstanceConfigProperty.INSTANCE_INFO_MAP);
+
   private static final Logger _logger = LoggerFactory.getLogger(InstanceConfig.class.getName());
 
   /**
@@ -252,20 +258,22 @@
   }
 
   /**
-   * Check if this instance is enabled and able to serve replicas
-   * @return true if enabled, false if disabled
+   * Get the timestamp (milliseconds from epoch) when this instance was enabled/disabled last time.
+   *
+   * @return the timestamp when the instance was enabled/disabled last time. If the instance is never
+   *        enabled/disabled, return -1.
    */
-  public boolean getInstanceEnabled() {
-    return _record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
-        HELIX_ENABLED_DEFAULT_VALUE);
+  public long getInstanceEnabledTime() {
+    return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
   }
 
   /**
-   * Set the enabled state of the instance
-   * If user enables the instance, HELIX_DISABLED_REASON filed will be removed.
-   *
+   * Set the enabled state of the instance. If the user enables the instance, the
+   * HELIX_DISABLED_REASON field will be removed.
+   * @deprecated This method is deprecated. Please use setInstanceOperation instead.
    * @param enabled true to enable, false to disable
    */
+  @Deprecated
   public void setInstanceEnabled(boolean enabled) {
     // set instance operation only when we need to change InstanceEnabled value.
     setInstanceEnabledHelper(enabled);
@@ -292,7 +300,7 @@
    * It will be a no-op when instance is enabled.
    */
   public void setInstanceDisabledReason(String disabledReason) {
-     if (!getInstanceEnabled()) {
+    if (getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
        _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), disabledReason);
      }
   }
@@ -302,13 +310,14 @@
    * It will be a no-op when instance is enabled.
    */
   public void setInstanceDisabledType(InstanceConstants.InstanceDisabledType disabledType) {
-    if (!getInstanceEnabled()) {
+    if (getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
       _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
           disabledType.name());
     }
   }
 
   /**
+   * Get the instance disabled reason when instance is disabled.
    * @return Return instance disabled reason. Default is am empty string.
    */
   public String getInstanceDisabledReason() {
@@ -321,7 +330,7 @@
    *         Default is am empty string.
    */
   public String getInstanceDisabledType() {
-    if (getInstanceEnabled()) {
+    if (!getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
       return InstanceConstants.INSTANCE_NOT_DISABLED;
     }
     return _record.getStringField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
@@ -329,21 +338,85 @@
   }
 
   /**
-   * Get the timestamp (milliseconds from epoch) when this instance was enabled/disabled last time.
+   * Set the instance operation for this instance.
    *
-   * @return
+   * @param operation the instance operation
    */
-  public long getInstanceEnabledTime() {
-    return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
-  }
-
   public void setInstanceOperation(InstanceConstants.InstanceOperation operation) {
     _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
         operation == null ? "" : operation.name());
+    if (operation == null || operation == InstanceConstants.InstanceOperation.ENABLE
+        || operation == InstanceConstants.InstanceOperation.DISABLE) {
+      // We are still setting the HELIX_ENABLED field for backwards compatibility.
+      // Users may still be running an earlier version of HelixAdmin, or helix-rest may be
+      // on an older version.
+      // TODO: Remove this when we are sure that all users are using the new field INSTANCE_OPERATION.
+      setInstanceEnabledHelper(!(operation == InstanceConstants.InstanceOperation.DISABLE));
+    }
   }
 
-  public String getInstanceOperation() {
-    return _record.getStringField(InstanceConfigProperty.INSTANCE_OPERATION.name(), "");
+  private void setInstanceOperationInit(InstanceConstants.InstanceOperation operation) {
+    if (operation == null) {
+      return;
+    }
+    _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), operation.name());
+  }
+
+  /**
+   * Resolve the effective InstanceOperation of this instance. An unset or empty value means
+   * ENABLE; an unrecognized value yields UNKNOWN. For backwards compatibility, HELIX_ENABLED
+   * set to false turns ENABLE, DISABLE and EVACUATE into DISABLE.
+   *
+   * @return the effective instance operation, never null
+   */
+  public InstanceConstants.InstanceOperation getInstanceOperation() {
+    String instanceOperationString =
+        _record.getSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name());
+
+    InstanceConstants.InstanceOperation instanceOperation;
+    try {
+      // A missing or empty INSTANCE_OPERATION is treated as ENABLE.
+      instanceOperation = (instanceOperationString == null || instanceOperationString.isEmpty())
+          ? InstanceConstants.InstanceOperation.ENABLE
+          : InstanceConstants.InstanceOperation.valueOf(instanceOperationString);
+    } catch (IllegalArgumentException e) {
+      _logger.error("Invalid instance operation: " + instanceOperationString + " for instance: "
+          + _record.getId()
+          + ". You may need to update your version of Helix to get support for this "
+          + "type of InstanceOperation. Defaulting to UNKNOWN.");
+      return InstanceConstants.InstanceOperation.UNKNOWN;
+    }
+
+    // For backwards compatibility, HELIX_ENABLED=false overrides any operation contained in
+    // INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS (which covers the unset case) with DISABLE.
+    if (!_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
+        HELIX_ENABLED_DEFAULT_VALUE)
+        && (InstanceConstants.INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS.contains(
+        instanceOperation))) {
+      return InstanceConstants.InstanceOperation.DISABLE;
+    }
+
+    return instanceOperation;
+  }
+
+  /**
+   * Check if this instance is enabled, i.e. its effective InstanceOperation is ENABLE. This is
+   * used to determine if the instance can host online replicas and take new assignment.
+   *
+   * @return true if {@link #getInstanceOperation()} is ENABLE, false for any other operation
+   */
+  public boolean getInstanceEnabled() {
+    return getInstanceOperation().equals(InstanceConstants.InstanceOperation.ENABLE);
+  }
+
+  /**
+   * Check to see if the instance is assignable, i.e. its effective InstanceOperation is in
+   * {@link InstanceConstants#ASSIGNABLE_INSTANCE_OPERATIONS}. This is used to determine if the
+   * instance can be selected by the rebalancer to take assignment of replicas.
+   * @return true if the instance is assignable, false otherwise
+   */
+  public boolean isAssignable() {
+    return InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains(getInstanceOperation());
   }
 
   /**
@@ -777,6 +850,34 @@
     return true;
   }
 
+  /**
+   * Replace every overwritable InstanceConfigProperty of this InstanceConfig with the value from
+   * the given InstanceConfig. Overwritable properties absent from the given config are removed
+   * from this one; {@link #NON_OVERWRITABLE_PROPERTIES} and non-property fields are left as is.
+   *
+   * @param overwritingInstanceConfig the InstanceConfig whose properties are copied into this one
+   */
+  public void overwriteInstanceConfig(InstanceConfig overwritingInstanceConfig) {
+    // Collect the overwritable property names, then drop those fields from this record.
+    Set<String> overwritableProperties = Arrays.stream(InstanceConfigProperty.values())
+        .filter(property -> !NON_OVERWRITABLE_PROPERTIES.contains(property)).map(Enum::name)
+        .collect(Collectors.toSet());
+    _record.getSimpleFields().keySet().removeAll(overwritableProperties);
+    _record.getListFields().keySet().removeAll(overwritableProperties);
+    _record.getMapFields().keySet().removeAll(overwritableProperties);
+
+    // Copy over only the overwritable fields that the overwriting config actually has.
+    overwritingInstanceConfig.getRecord().getSimpleFields().entrySet().stream()
+        .filter(entry -> overwritableProperties.contains(entry.getKey()))
+        .forEach((entry) -> _record.setSimpleField(entry.getKey(), entry.getValue()));
+    overwritingInstanceConfig.getRecord().getListFields().entrySet().stream()
+        .filter(entry -> overwritableProperties.contains(entry.getKey()))
+        .forEach((entry) -> _record.setListField(entry.getKey(), entry.getValue()));
+    overwritingInstanceConfig.getRecord().getMapFields().entrySet().stream()
+        .filter(entry -> overwritableProperties.contains(entry.getKey()))
+        .forEach((entry) -> _record.setMapField(entry.getKey(), entry.getValue()));
+  }
+
   public static class Builder {
     private String _hostName;
     private String _port;
@@ -828,12 +929,15 @@
         instanceConfig.addTag(tag);
       }
 
-      if (_instanceEnabled != HELIX_ENABLED_DEFAULT_VALUE) {
-        instanceConfig.setInstanceEnabled(_instanceEnabled);
+      if (_instanceOperation == null && _instanceEnabled != HELIX_ENABLED_DEFAULT_VALUE) {
+        instanceConfig.setInstanceOperationInit(
+            _instanceEnabled ? InstanceConstants.InstanceOperation.ENABLE
+                : InstanceConstants.InstanceOperation.DISABLE);
       }
 
-      if (_instanceOperation != null) {
-        instanceConfig.setInstanceOperation(_instanceOperation);
+      if (_instanceOperation != null && !_instanceOperation.equals(
+          InstanceConstants.InstanceOperation.ENABLE)) {
+        instanceConfig.setInstanceOperationInit(_instanceOperation);
       }
 
       if (_instanceInfoMap != null) {
@@ -899,9 +1003,11 @@
 
     /**
      * Set the enabled status for this instance
+     * @deprecated HELIX_ENABLED is no longer in use. Use setInstanceOperation instead.
      * @param instanceEnabled true if enabled, false otherwise
      * @return InstanceConfig.Builder
      */
+    @Deprecated
     public Builder setInstanceEnabled(boolean instanceEnabled) {
       _instanceEnabled = instanceEnabled;
       return this;
diff --git a/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java b/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java
index f978130..83e0e1c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java
+++ b/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java
@@ -50,7 +50,9 @@
    * maintenance mode. This field does not apply when triggered manually.
    */
   public enum AutoTriggerReason {
+    @Deprecated // Replaced with MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS
     MAX_OFFLINE_INSTANCES_EXCEEDED,
+    MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS,
     MAX_PARTITION_PER_INSTANCE_EXCEEDED,
     NOT_APPLICABLE // Not triggered automatically or automatically exiting maintenance mode
   }
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 8872e9e..db9ada9 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -26,7 +26,6 @@
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
@@ -45,16 +44,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Cache the cluster data that are needed by RoutingTableProvider.
  */
 class RoutingDataCache extends BasicClusterDataCache {
   private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
 
-  // When an instance has any of these instance operations, it should not be routable.
-  private static final ImmutableSet<String> NON_ROUTABLE_INSTANCE_OPERATIONS =
-      ImmutableSet.of(InstanceConstants.InstanceOperation.SWAP_IN.name());
-
   private final Map<PropertyType, List<String>> _sourceDataTypeMap;
 
   private CurrentStateCache _currentStateCache;
@@ -185,7 +181,7 @@
 
   private void updateRoutableInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
     _routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter(
-            (instanceConfigEntry) -> !NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
+            (instanceConfigEntry) -> !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
                 instanceConfigEntry.getValue().getInstanceOperation()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
@@ -194,7 +190,7 @@
       Map<String, LiveInstance> liveInstanceMap) {
     _routableLiveInstanceMap = liveInstanceMap.entrySet().stream().filter(
             (liveInstanceEntry) -> instanceConfigMap.containsKey(liveInstanceEntry.getKey())
-                && !NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
+                && !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
                 instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index fd22a8e..6d4c687 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -141,8 +141,8 @@
     // Will contain the list of partitions that must be explicitly dropped from the ideal state that
     // is stored in zk.
     Set<String> liveInstances =
-        jobCfg.getInstanceGroupTag() == null ? _dataProvider.getAssignableEnabledLiveInstances()
-            : _dataProvider.getAssignableEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+        jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
+            : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
 
     if (liveInstances.isEmpty()) {
       LOG.error("No available instance found for job: {}", jobName);
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 0d9a4f1..633ce03 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -313,7 +313,7 @@
 
     ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
     // ensure node is disabled, otherwise fail
-    if (InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
+    if (config.getInstanceEnabled()) {
       String error = "Node " + instanceId + " is enabled, cannot drop";
       _logger.warn(error);
       throw new HelixException(error);
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index d0da9ba..7ed44f8 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -331,7 +331,7 @@
 
     for (String partition : idealState.getPartitionSet()) {
       List<String> preferenceList = AbstractRebalancer.getPreferenceList(new Partition(partition),
-          idealState, cache.getAssignableEnabledLiveInstances());
+          idealState, cache.getEnabledLiveInstances());
       Map<String, String> idealMapping;
       if (_isDeactivatedNodeAware) {
         idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
@@ -339,7 +339,7 @@
                 cache.getDisabledInstancesForPartition(idealState.getResourceName(), partition));
       } else {
         idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
-            cache.getAssignableEnabledLiveInstances(),
+            cache.getEnabledLiveInstances(),
                 Collections.emptySet());
       }
       idealPartitionState.put(partition, idealMapping);
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 4a3d49b..834b846 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -399,7 +399,7 @@
 
     // Remove all disabled instances so that Helix will not consider them live.
     List<String> disabledInstance = instanceConfigs.stream()
-        .filter(instanceConfig -> !InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig))
+        .filter(instanceConfig -> !instanceConfig.getInstanceEnabled())
         .map(InstanceConfig::getInstanceName)
         .collect(Collectors.toList());
     liveInstances.removeAll(disabledInstance);
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index 2542ecf..5dea683 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -73,22 +73,22 @@
   public static boolean isEnabled(HelixDataAccessor dataAccessor, String instanceName) {
     PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
     InstanceConfig instanceConfig = dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instanceName));
-    ClusterConfig clusterConfig = dataAccessor.getProperty(propertyKeyBuilder.clusterConfig());
-    // TODO deprecate instance level config checks once migrated the enable status to cluster config only
-    if (instanceConfig == null || clusterConfig == null) {
-      throw new HelixException("InstanceConfig or ClusterConfig is NULL");
+    if (instanceConfig == null) {
+      throw new HelixException("InstanceConfig is NULL");
     }
 
-    return isInstanceEnabled(instanceConfig, clusterConfig);
-
+    return instanceConfig.getInstanceEnabled();
   }
 
   /**
    * Check if the instance is enabled by configuration
+   * @deprecated Use {@link InstanceConfig#getInstanceEnabled()} instead. We will no longer
+   * be using cluster config to enable/disable instances.
    * @param instanceConfig
    * @param clusterConfig
    * @return
    */
+  @Deprecated
   public static boolean isInstanceEnabled(InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
     if (instanceConfig == null) {
       throw new HelixException("InstanceConfig is NULL");
diff --git a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
index bc43954..18ddc02 100644
--- a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
@@ -29,6 +29,7 @@
 import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
 import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.common.PartitionStateMap;
 import org.apache.helix.controller.common.ResourcesStateMap;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -77,7 +78,7 @@
       List<InstanceConfig> instanceConfigs) {
     for (InstanceConfig instanceConfig : instanceConfigs) {
       // ensure the instance is enabled
-      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       _instanceConfigMap.put(instanceConfig.getInstanceName(), instanceConfig);
     }
     // ensure no instance is disabled
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 0218c3f..a265605 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -45,6 +45,7 @@
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
 import org.apache.helix.api.config.HelixConfigProperty;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.Stage;
@@ -633,7 +634,6 @@
 
     for (int i = 0; i < liveInstances.length; i++) {
       String instance = "localhost_" + liveInstances[i];
-
       _liveInstanceOwners.putIfAbsent(clusterName, new HashMap<>());
       Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(clusterName);
       clientMap.putIfAbsent(instance, DedicatedZkClientFactory.getInstance()
@@ -687,7 +687,7 @@
       InstanceConfig instanceConfig = new InstanceConfig(instance);
       instanceConfig.setHostName("localhost");
       instanceConfig.setPort("" + instances[i]);
-      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       admin.addInstance(clusterName, instanceConfig);
     }
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
index b4f405c..bdfa278 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
@@ -31,6 +31,7 @@
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -399,7 +400,7 @@
         _dataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
     Assert.assertTrue(instanceConfig.getInstanceEnabled());
     try {
-      instanceConfig.setInstanceEnabled(false);
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
       _dataAccessor.updateProperty(_keyBuilder.instanceConfig(instanceName), instanceConfig);
       _dataProvider.notifyDataChange(ChangeType.INSTANCE_CONFIG);
       _dataProvider.refresh(_dataAccessor);
@@ -410,7 +411,7 @@
     } finally {
       // remove newly added resource/ideastate
       _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, resourceName);
-      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       _dataAccessor.updateProperty(_keyBuilder.instanceConfig(instanceName), instanceConfig);
     }
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 45c35f3..f8af3ee 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -56,6 +56,8 @@
             new IdealState("test"), new ClusterConfig("TestCluster"), partition,
             MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
 
+    System.out.println("Expected best possible state map: " + expectedBestPossibleMap);
+    System.out.println("Actual best possible state map: " + bestPossibleMap);
     Assert.assertTrue(bestPossibleMap.equals(expectedBestPossibleMap));
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 951e0e3..a554283 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -118,9 +118,7 @@
       liveInstanceMap.put(instanceName, testLiveInstance);
       when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
       when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
-      when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
-      when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getAssignableInstances()).thenReturn(_instances);
       when(testCache.getAllInstances()).thenReturn(_instances);
@@ -375,7 +373,7 @@
         Collectors.toMap(resourceName -> resourceName, Resource::new));
     try {
       rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
-          clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
+          clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -439,7 +437,7 @@
     // Calculation will fail
     try {
       rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
-          clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
+          clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
       Assert.fail("Rebalance shall fail.");
     } catch (HelixRebalanceException ex) {
       Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -749,8 +747,8 @@
     Set<String> instances = new HashSet<>(_instances);
     instances.add(offlineInstance);
     when(clusterData.getAssignableInstances()).thenReturn(instances);
-    when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
-    when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(
+    when(clusterData.getEnabledInstances()).thenReturn(instances);
+    when(clusterData.getEnabledLiveInstances()).thenReturn(
         new HashSet<>(Arrays.asList(instance0, instance1, instance2)));
     Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
     instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
@@ -894,8 +892,8 @@
     // force create a fake offlineInstance that's in delay window
     Set<String> instances = new HashSet<>(_instances);
     when(clusterData.getAssignableInstances()).thenReturn(instances);
-    when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
-    when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(instances);
+    when(clusterData.getEnabledInstances()).thenReturn(instances);
+    when(clusterData.getEnabledLiveInstances()).thenReturn(instances);
     Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
     when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
     when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index c5c7b56..3fb05e5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -266,9 +266,7 @@
       liveInstanceMap.put(instanceName, testLiveInstance);
       when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
       when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
-      when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
-      when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
       when(testCache.getAssignableInstances()).thenReturn(_instances);
       when(testCache.getAllInstances()).thenReturn(_instances);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index c9deb79..b75a340 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,6 +29,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -84,7 +85,7 @@
     InstanceConfig testInstanceConfig = new InstanceConfig(instanceId);
     testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
     testInstanceConfig.addTag(_testInstanceTags.get(0));
-    testInstanceConfig.setInstanceEnabled(true);
+    testInstanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
     testInstanceConfig.setZoneId(_testFaultZoneId);
     return testInstanceConfig;
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 34582d6..2e41b6d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -106,7 +106,7 @@
     activeInstances.add(instance1);
     activeInstances.add(instance2);
     when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
 
     // test 0, empty input
     Assert.assertEquals(
@@ -144,7 +144,7 @@
     // test 2, no additional replica to be assigned
     testCache = setupClusterDataCache();
     when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
     input = ImmutableMap.of(
         _resourceNames.get(0),
         ImmutableMap.of(
@@ -169,7 +169,7 @@
     // test 3, minActiveReplica==2, two partitions falling short
     testCache = setupClusterDataCache();
     when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
     input = ImmutableMap.of(
         _resourceNames.get(0),
         ImmutableMap.of(
@@ -207,7 +207,7 @@
     activeInstances.add(instance1);
     activeInstances.add(instance2);
     when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
 
     // test 1, one partition under minActiveReplica
     Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
@@ -247,7 +247,7 @@
     // test 2, minActiveReplica==2, three partitions falling short
     testCache = setupClusterDataCache();
     when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
-    when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+    when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
     input = ImmutableMap.of(
         _resourceNames.get(0),
         ImmutableMap.of(
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 0027f8e..2a548ce 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -58,6 +58,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
 
@@ -94,6 +95,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index e33dc9f..518c610 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -58,6 +58,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
 
     ReadClusterDataStage stage1 = new ReadClusterDataStage();
@@ -117,6 +118,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index 7b89152..7d815c1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -68,6 +68,7 @@
     when(message.getToState()).thenReturn("SLAVE");
     when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(message);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     // Set helix manager to event
     event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
@@ -157,6 +158,7 @@
     when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE))
         .thenReturn(pendingMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     // Set helix manager to event
     event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 7da3d64..f15e6b8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -177,6 +177,7 @@
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
     runStage(event, new ReadClusterDataStage());
     runStage(event, new IntermediateStateCalcStage());
@@ -261,6 +262,7 @@
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
     runStage(event, new ReadClusterDataStage());
@@ -379,6 +381,7 @@
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
@@ -553,6 +556,7 @@
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
     runStage(event, new ReadClusterDataStage());
     runStage(event, new IntermediateStateCalcStage());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
index 7c20b02..f1a3da4 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
@@ -84,6 +84,7 @@
     when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE))
         .thenReturn(pendingMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     // Set helix manager to event
     event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
index 1c61624..0c144d8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
@@ -51,6 +51,7 @@
   public void beforeClass() {
     _clusterName = "CLUSTER_" + TestHelper.getTestClassName();
     _accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    _gSetupTool.setupTestCluster(_clusterName);
     _manager = new DummyClusterManager(_clusterName, _accessor);
   }
 
@@ -65,6 +66,8 @@
     // ideal state: node0 is MASTER, node1 is SLAVE
     // replica=2 means 1 master and 1 slave
     setupIdealState(_clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+    _gSetupTool.addInstanceToCluster(_clusterName, "localhost_0");
+    _gSetupTool.addInstanceToCluster(_clusterName, "localhost_1");
     List<LiveInstance> liveInstances = setupLiveInstances(_clusterName, new int[]{0, 1});
     setupStateModel(_clusterName);
 
@@ -96,7 +99,7 @@
 
     ControllerHistory history =
         _accessor.getProperty(_accessor.keyBuilder().controllerLeaderHistory());
-    Assert.assertNull(history);
+    Assert.assertTrue(history.getMaintenanceHistoryList().isEmpty());
 
     // Mark both live instances to be frozen, then entering freeze mode is complete
     for (int i = 0; i < 2; i++) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
index e4aeed0..515340d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -71,7 +71,7 @@
     when(mock.cache.getClusterConfig()).thenReturn((ClusterConfig) cacheMap.get(CacheKeys.clusterConfig.name()));
     when(mock.cache.getStateModelDef((String) cacheMap.get(CacheKeys.stateModelName.name()))).thenReturn(
         (StateModelDefinition) cacheMap.get(CacheKeys.stateModelDef.name()));
-    when(mock.cache.getAssignableEnabledLiveInstances()).thenReturn(new HashSet<>(
+    when(mock.cache.getEnabledLiveInstances()).thenReturn(new HashSet<>(
         ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()));
     when(mock.cache.getLiveInstances()).thenReturn(new HashSet<>(
         ((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()).stream()
@@ -189,6 +189,7 @@
         }
         ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.Unknown);
         event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); // add current states
+        event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput); // reuse the same output for the EXCLUDING_UNKNOWN attribute
         event.addAttribute(AttributeName.ControllerDataProvider.name(),
             buildCache(mock, numReplica, minActiveReplica, stateModelDef, stateModelName, preferenceLists));
         event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageOutput);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
index 22c20f7..e457e31 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
@@ -85,6 +85,7 @@
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     runStage(event, new ReadClusterDataStage());
 
     // Keep update the current state.
@@ -133,6 +134,7 @@
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
     runStage(event, new ReadClusterDataStage());
@@ -194,6 +196,7 @@
     event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
         generateMessageMapForPartition(bestPossibleMap, currentStateMap, Collections.emptyList(), resourceName));
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
     runStage(event, new ReadClusterDataStage());
@@ -350,6 +353,7 @@
     resourcePriority.add(resourceName);
     currentStateOutput.setCurrentState(resourceName, partition, instanceName, state);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
   }
 
   private void updateCurrentStateForPartitionLevelPriority(List<String> partitionPriority,
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
index 2f1dee2..1b2e54b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
@@ -33,6 +33,7 @@
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
@@ -297,7 +298,8 @@
   private void setInstanceEnable(String instanceName, boolean enabled,
       ConfigAccessor configAccessor) {
     InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
-    instanceConfig.setInstanceEnabled(enabled);
+    instanceConfig.setInstanceOperation(enabled ? InstanceConstants.InstanceOperation.ENABLE
+        : InstanceConstants.InstanceOperation.DISABLE);
     configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
index a5416d4..ce3077d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -32,6 +32,7 @@
 import org.apache.helix.PropertyKey;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -170,7 +171,7 @@
     InstanceConfig instanceConfig = new InstanceConfig(fakeInstanceName);
     instanceConfig.setHostName("localhost");
     instanceConfig.setPort("10000");
-    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
     admin.addInstance(clusterName, instanceConfig);
 
     LiveInstance fakeInstance = new LiveInstance(fakeInstanceName);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
index 98dd281..9cb248f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -28,6 +28,7 @@
 import org.apache.helix.TestHelper;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -74,7 +75,7 @@
     // Disable instance 0 so that it will cause a partition to do a load balance
     PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
     InstanceConfig instanceConfig = _accessor.getProperty(key);
-    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     _accessor.setProperty(key, instanceConfig);
 
     // Resume the controller
@@ -234,7 +235,7 @@
     // Disable an instance so that it will not be subject to throttling
     PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
     InstanceConfig instanceConfig = _accessor.getProperty(key);
-    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     _accessor.setProperty(key, instanceConfig);
 
     // Set the state transition delay so that transitions would be processed slowly
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
index ebfb03e..6654098 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
@@ -280,7 +280,7 @@
     Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
         MaintenanceSignal.TriggeringEntity.CONTROLLER);
     Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
-        MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+        MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
 
     // Bring up all instances
     for (int i = 0; i < 3; i++) {
@@ -306,7 +306,7 @@
     Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
         MaintenanceSignal.TriggeringEntity.CONTROLLER);
     Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
-        MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+        MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
 
     // Set the cluster config for auto-exiting maintenance mode
     ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
index b11e635..a61ffea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
@@ -41,6 +41,7 @@
 import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.HelixControllerMain;
@@ -379,7 +380,7 @@
       if (instanceConfig == null) {
         InstanceConfig config = new InstanceConfig(instanceName);
         config.setHostName("localhost");
-        config.setInstanceEnabled(true);
+        config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
         echo("Adding InstanceConfig:" + config);
         admin.addInstance(_clusterName, config);
       }
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java
index 3b13868..fef7ea0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java
@@ -30,6 +30,7 @@
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
@@ -170,7 +171,7 @@
     final InstanceConfig instanceConfig =
         _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, oldParticipantName);
     // disable the node first
-    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, oldParticipantName,
         instanceConfig);
     Assert.assertTrue(_clusterVerifier.verify(10000));
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 15b66af..cd5338e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -21,6 +21,7 @@
 
 import java.util.Map;
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 1fc3a3e..85600c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -99,7 +99,7 @@
   List<String> _participantNames = new ArrayList<>();
   private Set<String> _allDBs = new HashSet<>();
   private ZkHelixClusterVerifier _clusterVerifier;
-  private ZkHelixClusterVerifier _bestPossibleClusterVerifier;
+  private BestPossibleExternalViewVerifier _bestPossibleClusterVerifier;
   private ConfigAccessor _configAccessor;
   private long _stateModelDelay = 3L;
 
@@ -204,7 +204,7 @@
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
-  private void removeOfflineOrDisabledOrSwapInInstances() {
+  private void removeOfflineOrInactiveInstances() {
     // Remove all instances that are not live, disabled, or in SWAP_IN state.
     for (int i = 0; i < _participants.size(); i++) {
       String participantName = _participantNames.get(i);
@@ -212,7 +212,7 @@
           _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName);
       if (!_participants.get(i).isConnected() || !instanceConfig.getInstanceEnabled()
           || instanceConfig.getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
         if (_participants.get(i).isConnected()) {
           _participants.get(i).syncStop();
         }
@@ -268,11 +268,6 @@
     _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, semiAutoDB);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    // Disable, stop, and drop the instance from the cluster.
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToEvacuate, false);
-    _participants.get(0).syncStop();
-    removeOfflineOrDisabledOrSwapInInstances();
-
     // Compare the current ev with the previous one, it should be exactly the same since the baseline should not change
     // after the instance is dropped.
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -284,9 +279,12 @@
     System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis()));
     // revert an evacuate instance
     String instanceToEvacuate = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
+        InstanceConstants.InstanceOperation.ENABLE);
 
+    Assert.assertTrue(
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToEvacuate)
+            .getInstanceEnabled());
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // EV should contain all participants, check resources one by one
@@ -302,10 +300,12 @@
     System.out.println("START TestInstanceOperation.testAddingNodeWithEvacuationTag() at " + new Date(System.currentTimeMillis()));
     // first disable and instance, and wait for all replicas to be moved out
     String mockNewInstance = _participants.get(0).getInstanceName();
+    // This is using a deprecated method to ensure that the disabling still takes precedence over the InstanceOperation when being set
+    // to false.
     _gSetupTool.getClusterManagementTool()
         .enableInstance(CLUSTER_NAME, mockNewInstance, false);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
-    //ev should contain all instances but the disabled one
+    // ev should contain all instances but the disabled one
     Map<String, ExternalView> assignment = getEVs();
     List<String> currentActiveInstances =
         _participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
@@ -317,10 +317,13 @@
     }
 
     // add evacuate tag and enable instance
+    // Because HELIX_ENABLED is set to false, getInstanceOperation still returns DISABLE
     _gSetupTool.getClusterManagementTool()
         .setInstanceOperation(CLUSTER_NAME, mockNewInstance, InstanceConstants.InstanceOperation.EVACUATE);
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, mockNewInstance, true);
+
+    // enable instance so InstanceOperation is no longer overridden with DISABLE
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, mockNewInstance, true);
+
     //ev should be the same
     assignment = getEVs();
     currentActiveInstances =
@@ -347,84 +350,73 @@
     }
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithEvacuationTag")
+  @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag")
   public void testNodeSwapNoTopologySetup() throws Exception {
     System.out.println("START TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date(
         System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    // Add instance with InstanceOperation set to SWAP_IN
-    // There should be an error that the logicalId does not have SWAP_OUT instance because,
-    // helix can't determine what topology key to use to get the logicalId if TOPOLOGY is not set.
+    // Add instance with InstanceOperation set to SWAP_IN as default
+    // The instance will be added with UNKNOWN because the logicalId will not match the
+    // swap out instance since the topology configs are not set.
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
+
+    Assert.assertEquals(
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+            .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapNoTopologySetup")
-  public void testAddingNodeWithSwapOutInstanceOperation() throws Exception {
+  @Test(dependsOnMethods = "testNodeSwapNoTopologySetup")
+  public void testAddingNodeWithEnableInstanceOperation() throws Exception {
     System.out.println(
-        "START TestInstanceOperation.testAddingNodeWithSwapOutInstanceOperation() at " + new Date(
+        "START TestInstanceOperation.testAddingNodeWithEnableInstanceOperation() at " + new Date(
             System.currentTimeMillis()));
 
     enabledTopologyAwareRebalance();
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    // Add instance with InstanceOperation set to SWAP_IN
+    // Add instance with InstanceOperation set to ENABLE
+    // The instance should be added with UNKNOWN since there is already an instance with
+    // the same logicalId in the cluster and this instance is not being set to SWAP_IN when
+    // added.
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_OUT, true, -1);
+        InstanceConstants.InstanceOperation.ENABLE, -1);
+
+    Assert.assertEquals(
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+            .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithSwapOutInstanceOperation")
-  public void testAddingNodeWithSwapOutNodeInstanceOperationUnset() throws Exception {
-    System.out.println(
-        "START TestInstanceOperation.testAddingNodeWithSwapOutNodeInstanceOperationUnset() at "
-            + new Date(System.currentTimeMillis()));
-
-    removeOfflineOrDisabledOrSwapInInstances();
-
-    // Set instance's InstanceOperation to null
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
-
-    // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
-    addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
-  }
-
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithSwapOutNodeInstanceOperationUnset")
+  @Test(dependsOnMethods = "testAddingNodeWithEnableInstanceOperation")
   public void testNodeSwapWithNoSwapOutNode() throws Exception {
     System.out.println("START TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date(
         System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Add new instance with InstanceOperation set to SWAP_IN
+    // The instance should be added with UNKNOWN since there is not an instance with a matching
+    // logicalId in the cluster to swap with.
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, "1000", "zone_1000",
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
+
+    Assert.assertEquals(
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+            .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
   }
 
   @Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode")
@@ -433,21 +425,27 @@
         "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at "
             + new Date(System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    // Add instance with same logicalId with InstanceOperation unset
-    // This should work because adding instance with InstanceOperation unset will automatically
-    // set the InstanceOperation to SWAP_IN.
+    // Add instance with same logicalId with InstanceOperation unset, this is the same as default
+    // which is ENABLE.
+    // The instance should be set to UNKNOWN since there is already a matching logicalId in the cluster.
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, -1);
+
+    Assert.assertEquals(
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+            .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
+
+    // Setting the InstanceOperation to SWAP_IN should work because there is a matching logicalId in
+    // the cluster and the InstanceCapacityWeights and FaultZone match.
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName,
+        InstanceConstants.InstanceOperation.SWAP_IN);
 
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
@@ -461,20 +459,17 @@
         "START TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at "
             + new Date(System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
     // Add instance with InstanceOperation set to SWAP_IN
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
 
     // Add another instance with InstanceOperation set to SWAP_IN with same logicalId as previously
     // added SWAP_IN instance.
@@ -482,88 +477,70 @@
     addParticipant(secondInstanceToSwapInName,
         instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
+
+    // Instance should be UNKNOWN since there was already a swapping pair.
+    Assert.assertEquals(_gSetupTool.getClusterManagementTool()
+            .getInstanceConfig(CLUSTER_NAME, secondInstanceToSwapInName).getInstanceOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
+
+    // Try to set the InstanceOperation to SWAP_IN, it should throw an exception since there is already
+    // a swapping pair.
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, secondInstanceToSwapInName,
+            InstanceConstants.InstanceOperation.SWAP_IN);
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapSwapInNodeWithAlreadySwappingPair")
-  public void testNodeSwapWrongFaultZone() throws Exception {
-    System.out.println("START TestInstanceOperation.testNodeSwapWrongFaultZone() at " + new Date(
-        System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
-
-    // Set instance's InstanceOperation to SWAP_OUT
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
-
-    // Add instance with InstanceOperation set to SWAP_IN
-    // There should be an error because SWAP_IN instance must be in the same FAULT_ZONE as the SWAP_OUT instance.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
-    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE) + "1",
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
-  }
-
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWrongFaultZone")
-  public void testNodeSwapWrongCapacity() throws Exception {
-    System.out.println("START TestInstanceOperation.testNodeSwapWrongCapacity() at " + new Date(
-        System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
-
-    // Set instance's InstanceOperation to SWAP_OUT
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
-
-    // Add instance with InstanceOperation set to SWAP_IN
-    // There should be an error because SWAP_IN instance must have same capacity as the SWAP_OUT node.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
-    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, TEST_CAPACITY_VALUE - 10);
-  }
-
-  @Test(dependsOnMethods = "testNodeSwapWrongCapacity")
+  @Test(dependsOnMethods = "testNodeSwapSwapInNodeWithAlreadySwappingPair")
   public void testNodeSwap() throws Exception {
     System.out.println(
         "START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
+
+    removeOfflineOrInactiveInstances();
+
+    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+    String resourceToDisablePartition = _allDBs.iterator().next();
+    // Disable 1 partition that is assigned to the instance that will be swapped out.
+    getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).entrySet().stream()
+        .filter(entry -> entry.getKey().startsWith(resourceToDisablePartition)).findFirst()
+        .ifPresent(entry -> {
+          String partition = entry.getKey();
+          instanceToSwapOutInstanceConfig.setInstanceEnabledForPartition(resourceToDisablePartition,
+              partition, false);
+        });
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceConfig(CLUSTER_NAME, instanceToSwapOutName, instanceToSwapOutInstanceConfig);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // Store original EV
     Map<String, ExternalView> originalEVs = getEVs();
 
-    Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
-
-    // Set instance's InstanceOperation to SWAP_OUT
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
-
-    // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
+    // Create a custom change listener to check if the throttles are enabled after the swap is completed.
     CustomIndividualInstanceConfigChangeListener instanceToSwapInInstanceConfigListener =
         new CustomIndividualInstanceConfigChangeListener();
+
     // Add instance with InstanceOperation set to SWAP_IN
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1, instanceToSwapInInstanceConfigListener);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1, instanceToSwapInInstanceConfigListener);
 
+    // Validate that the throttles are off since the InstanceOperation is set to SWAP_IN
     Assert.assertFalse(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
 
-    // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
-    // and adding the SWAP_IN instance to the cluster.
-    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // Check that the SWAP_IN instance has the same partitions as the swap out instance
     // but none of them are in a top state.
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
@@ -573,7 +550,7 @@
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
 
-    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+    // Validate that the swap out instance is in routing tables and SWAP_IN is not.
     validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
 
@@ -581,20 +558,31 @@
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false));
 
+    // After the swap completes, the SWAP_IN instance must carry the disabled partitions that were
+    // set on the swap out instance, so compare against the swap out config captured before the swap.
+    InstanceConfig instanceToSwapInInstanceConfig = _gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapInName);
+    Assert.assertEquals(instanceToSwapInInstanceConfig.getRecord()
+            .getMapField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()),
+        instanceToSwapOutInstanceConfig.getRecord()
+            .getMapField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()));
+
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // Validate that the SWAP_IN instance is now in the routing tables.
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
 
-
-    // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+    // Assert that swap out instance is not active and has no partitions assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
+    Assert.assertEquals(_gSetupTool.getClusterManagementTool()
+            .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
 
     // Check to make sure the throttle was enabled again after the swap was completed.
     Assert.assertTrue(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
 
-    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+    // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
     // swap was completed.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
@@ -604,21 +592,18 @@
   public void testNodeSwapDisableAndReenable() throws Exception {
     System.out.println(
         "START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EV
     Map<String, ExternalView> originalEVs = getEVs();
 
     Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
+    // Validate that the assignment has not changed before the SWAP_IN instance is added
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
@@ -628,11 +613,9 @@
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
 
-    // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
-    // and adding the SWAP_IN instance to the cluster.
-    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // Check that the SWAP_IN instance has the same partitions as the swap out instance
     // but none of them are in a top state.
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
@@ -642,12 +625,12 @@
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
 
-    // Disable the SWAP_IN instance
+    // Disable the swap out instance; the swap stays completable and both instances go OFFLINE.
     _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+        .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
 
     // Check that the SWAP_IN instance's replicas match the SWAP_OUT instance's replicas
-    // but all of them are OFFLINE
+    // and all of them are OFFLINE.
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     Map<String, Map<String, String>> resourcePartitionStateOnSwapOutInstance =
         getResourcePartitionStateOnInstance(getEVs(), instanceToSwapOutName);
@@ -658,14 +641,27 @@
             .collect(Collectors.toSet()),
         resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(p -> p.keySet().stream())
             .collect(Collectors.toSet()));
+    Set<String> swapOutInstancePartitionStates =
+        resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(e -> e.values().stream())
+            .collect(Collectors.toSet());
+    Assert.assertEquals(swapOutInstancePartitionStates.size(), 1);
+    Assert.assertTrue(swapOutInstancePartitionStates.contains("OFFLINE"));
     Set<String> swapInInstancePartitionStates =
         resourcePartitionStateOnSwapInInstance.values().stream().flatMap(e -> e.values().stream())
             .collect(Collectors.toSet());
     Assert.assertEquals(swapInInstancePartitionStates.size(), 1);
     Assert.assertTrue(swapInInstancePartitionStates.contains("OFFLINE"));
 
-    // Re-enable the SWAP_IN instance
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
+    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+    validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+    validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
+    // Assert canSwapBeCompleted is true
+    Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+        .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+    // Re-enable the swap out instance
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapOutName, true);
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
     // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
@@ -681,63 +677,61 @@
     // Validate that the SWAP_IN instance is now in the routing tables.
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
 
-    // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+    // Assert that swap out instance is not active and has no partitions assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
+    Assert.assertEquals(_gSetupTool.getClusterManagementTool()
+            .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(),
+        InstanceConstants.InstanceOperation.UNKNOWN);
 
-    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+    // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
     // swap was completed.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
   }
 
   @Test(dependsOnMethods = "testNodeSwapDisableAndReenable")
-  public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception {
-    System.out.println(
-        "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
+  public void testNodeSwapSwapInNodeNoInstanceOperation() throws Exception {
+    System.out.println("START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperation() at "
             + new Date(System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
 
     Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
-    // Add instance with InstanceOperation unset, should automatically be set to SWAP_IN
+    // Add instance with InstanceOperation unset; it should be set to UNKNOWN.
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, -1);
 
+    // Validate that the SWAP_IN instance does not have any partitions on it.
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
-    // Enable the SWAP_IN instance, so it can start being assigned replicas
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
+    // Set InstanceOperation to SWAP_IN
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName,
+        InstanceConstants.InstanceOperation.SWAP_IN);
 
-    // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
-    // and adding the SWAP_IN instance to the cluster.
-    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // Check that the SWAP_IN instance has the same partitions as the swap out instance
     // but none of them are in a top state.
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         ImmutableSet.of(instanceToSwapInName), Collections.emptySet());
 
-    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+    // Validate that the swap out instance is in routing tables and SWAP_IN is not.
     validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
 
@@ -750,37 +744,34 @@
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+    // Assert that swap out instance is inactive and has no partitions assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
 
-    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+    // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
     // swap was completed.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
   }
 
-  @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationDisabled")
+  @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperation")
   public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception {
     System.out.println(
         "START TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new Date(
             System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
 
     Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
+    // Validate that the assignment has not changed.
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
@@ -790,17 +781,15 @@
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
 
-    // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
-    // and adding the SWAP_IN instance to the cluster.
-    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // Check that the SWAP_IN instance has the same partitions as the swap out instance
     // but none of them are in a top state.
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         ImmutableSet.of(instanceToSwapInName), Collections.emptySet());
 
-    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+    // Validate that the swap out instance is in routing tables and SWAP_IN is not.
     validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
 
@@ -808,23 +797,29 @@
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
 
-    // Cancel SWAP by disabling the SWAP_IN instance and remove SWAP_OUT InstanceOperation from SWAP_OUT instance.
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+    // Cancel the swap by setting the InstanceOperation to UNKNOWN on the SWAP_IN instance.
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName,
+        InstanceConstants.InstanceOperation.UNKNOWN);
+
+    // Validate there are no partitions on the SWAP_IN instance.
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+    validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
+        Collections.emptySet(), Collections.emptySet());
+
     // Stop the participant
     _participants.get(_participants.size() - 1).syncStop();
 
     // Wait for cluster to converge.
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+    // Validate that the swap out instance is in routing tables and SWAP_IN is not.
     validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
 
     // Validate there are no partitions on the SWAP_IN instance.
     Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName).size(), 0);
 
-    // Validate that the SWAP_OUT instance has the same partitions as it had before.
+    // Validate that the swap out instance has the same partitions as it had before.
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
@@ -833,7 +828,7 @@
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    // Validate that the SWAP_OUT instance has the same partitions as it had before.
+    // Validate that the swap out instance has the same partitions as it had before.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet())), TIMEOUT);
   }
@@ -843,7 +838,7 @@
     System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at " + new Date(
         System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
@@ -854,14 +849,11 @@
     _gSetupTool.getClusterManagementTool()
         .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
-    // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
+    // Validate that the assignment has not changed.
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
@@ -871,10 +863,10 @@
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
 
     // Validate that the assignment has not changed since adding the SWAP_IN node.
-    // During MM, the cluster should not compute new assignment.
+    // During MM, the cluster should not compute new assignment on SWAP_IN node.
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
@@ -884,14 +876,14 @@
     _gSetupTool.getClusterManagementTool()
         .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);
 
-    // Validate that partitions on SWAP_OUT instance does not change after exiting MM
-    // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+    // Validate that partitions on the swap out instance do not change after exiting MM
+    // Check that the SWAP_IN instance has the same partitions as the swap out instance
     // but none of them are in a top state.
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         ImmutableSet.of(instanceToSwapInName), Collections.emptySet());
 
-    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+    // Validate that the swap out instance is in routing tables and SWAP_IN is not.
     validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
 
@@ -907,11 +899,11 @@
     // Validate that the SWAP_IN instance is now in the routing tables.
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
 
-    // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+    // Assert that swap out instance is disabled and has no partitions assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
 
-    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+    // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
     // swap was completed.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
@@ -923,28 +915,25 @@
         "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled() at " + new Date(
             System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EVs
     Map<String, ExternalView> originalEVs = getEVs();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
 
     Set<String> swapOutInstanceOriginalPartitions =
         getPartitionsAndStatesOnInstance(originalEVs, instanceToSwapOutName).keySet();
 
-    // Disable the SWAP_OUT instance.
+    // Disable the swap out instance.
     _gSetupTool.getClusterManagementTool()
         .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    // Validate that the SWAP_OUT instance has all partitions in OFFLINE state
+    // Validate that the swap out instance has all partitions in OFFLINE state
     Set<String> swapOutInstanceOfflineStates =
         new HashSet<>(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).values());
     Assert.assertEquals(swapOutInstanceOfflineStates.size(), 1);
@@ -954,21 +943,16 @@
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
 
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
-    // Validate that the SWAP_IN instance has the same partitions in secondTopState as the SWAP_OUT instance
-    // did before being disabled.
+    // Validate that the SWAP_IN instance has no partitions because the swap started while the swap out instance was disabled
     Map<String, String> swapInInstancePartitionsAndStates =
         getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
-    Assert.assertTrue(
-        swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
-    Set<String> swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
-    swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
-    Assert.assertEquals(swapInInstanceStates.size(), 0);
+    Assert.assertEquals(swapInInstancePartitionsAndStates.size(), 0);
 
-    // Assert canSwapBeCompleted is false because SWAP_OUT instance is disabled.
+    // Assert canCompleteSwap is true even though the swap out instance is disabled.
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
 
@@ -976,9 +960,9 @@
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false));
 
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
-    // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+    // Assert that swap out instance is disabled and has no partitions assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
 
@@ -993,28 +977,23 @@
         "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceOffline() at " + new Date(
             System.currentTimeMillis()));
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EV
     Map<String, ExternalView> originalEVs = getEVs();
 
     Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
 
-    // Set instance's InstanceOperation to SWAP_OUT
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.SWAP_OUT);
-
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     // Add instance with InstanceOperation set to SWAP_IN
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
         instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
-        InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
 
     // Kill the participant
     _participants.get(0).syncStop();
@@ -1025,7 +1004,7 @@
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
 
-    // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+    // Validate that the SWAP_IN instance is not in the routing tables.
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
 
     // Assert completeSwapIfPossible is true
@@ -1037,11 +1016,11 @@
     // Validate that the SWAP_IN instance is now in the routing tables.
     validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
 
-    // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+    // Assert that swap out instance is inactive and has no partitions assigned to it.
     Assert.assertFalse(_gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
 
-    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+    // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
     // swap was completed.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
@@ -1051,7 +1030,7 @@
   public void testSwapEvacuateAdd() throws Exception {
     System.out.println("START TestInstanceOperation.testSwapEvacuateAdd() at " + new Date(
         System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EV
     Map<String, ExternalView> originalEVs = getEVs();
@@ -1062,6 +1041,8 @@
     _gSetupTool.getClusterManagementTool()
         .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
 
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
     // Set instance's InstanceOperation to EVACUATE
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
@@ -1073,11 +1054,12 @@
     validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), Collections.emptySet());
 
-    // Add instance with InstanceOperation set to SWAP_IN
+    // Add instance with InstanceOperation set to ENABLE
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.ENABLE, -1);
 
     // Exit maintenance mode
     _gSetupTool.getClusterManagementTool()
@@ -1085,7 +1067,7 @@
 
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
-    // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had.
+    // Validate that the SWAP_IN instance has the same partitions the swap out instance had.
     verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
         Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
 
@@ -1093,9 +1075,9 @@
     Assert.assertTrue(_gSetupTool.getClusterManagementTool()
         .isEvacuateFinished(CLUSTER_NAME, instanceToSwapOutName));
 
-    // Disable the EVACUATE instance
-    _gSetupTool.getClusterManagementTool()
-        .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
+    // Set the EVACUATE instance to UNKNOWN
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.UNKNOWN);
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
@@ -1105,31 +1087,13 @@
   }
 
   @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testSwapEvacuateAdd")
-  public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws Exception {
+  public void testUnsetInstanceOperationOnSwapInWhenSwapping() throws Exception {
     System.out.println(
-        "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
+        "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenSwapping() at "
             + new Date(System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
 
-    // Get the SWAP_OUT instance.
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    removeOfflineOrInactiveInstances();
 
-    // Add instance with InstanceOperation set to SWAP_IN enabled before setting SWAP_OUT instance.
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
-    addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
-  }
-
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet")
-  public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() throws Exception {
-    System.out.println(
-        "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
-            + new Date(System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
-
-    // Get the SWAP_OUT instance.
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
@@ -1137,49 +1101,27 @@
     // Add instance with InstanceOperation set to SWAP_IN
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
 
     Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
 
-    // Enable the SWAP_IN instance before we have set the SWAP_OUT instance.
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
-  }
-
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet")
-  public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() throws Exception {
-    System.out.println(
-        "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() at "
-            + new Date(System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
-
-    // Get the SWAP_OUT instance.
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-
-    // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
-    addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
-
-    Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
-
-    // Try to remove the InstanceOperation from the SWAP_IN instance before the SWAP_OUT instance is set.
+    // Try to remove the InstanceOperation from the SWAP_IN instance while the swap is still in progress.
     // This should throw exception because we cannot ever have two instances with the same logicalId and both have InstanceOperation
     // unset.
     _gSetupTool.getClusterManagementTool()
         .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null);
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut")
+  @Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenSwapping")
   public void testNodeSwapAddSwapInFirst() throws Exception {
     System.out.println("START TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date(
         System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
 
     // Store original EV
     Map<String, ExternalView> originalEVs = getEVs();
-    // Get the SWAP_OUT instance.
+    // Get the swap out instance.
     String instanceToSwapOutName = _participants.get(0).getInstanceName();
     InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
@@ -1187,7 +1129,8 @@
     // Add instance with InstanceOperation set to SWAP_IN
     String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
     addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.SWAP_IN, -1);
   }
 
   @Test(dependsOnMethods = "testNodeSwapAddSwapInFirst")
@@ -1195,7 +1138,8 @@
     System.out.println(
         "START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date(
             System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
+
+    removeOfflineOrInactiveInstances();
 
     // add a resource where downward state transition is slow
     createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave",
@@ -1346,7 +1290,31 @@
     _stateModelDelay = 3L;
   }
 
-  @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
+  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testMarkEvacuationAfterEMM")
+  public void testSwapEvacuateAddRemoveEvacuate() throws Exception {
+    System.out.println("START TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date(
+        System.currentTimeMillis()));
+    removeOfflineOrInactiveInstances();
+
+    // Set instance's InstanceOperation to EVACUATE
+    String instanceToSwapOutName = _participants.get(0).getInstanceName();
+    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
+        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
+        InstanceConstants.InstanceOperation.EVACUATE);
+
+    // Add instance with InstanceOperation set to ENABLE
+    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
+    addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+        InstanceConstants.InstanceOperation.ENABLE, -1);
+
+    // Remove EVACUATE instance's InstanceOperation
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+  }
+
+  @Test(dependsOnMethods = "testSwapEvacuateAddRemoveEvacuate")
   public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
     System.out.println(
         "START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date(
@@ -1358,11 +1326,10 @@
     _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, evacuateInstanceName,
         InstanceConstants.InstanceOperation.EVACUATE);
 
-    Map<String, ExternalView> assignment;
     // EV should contain all participants, check resources one by one
-    assignment = getEVs();
-    for (String resource : _allDBs) {
-      verifier(() -> {
+    verifier(() -> {
+      Map<String, ExternalView> assignment = getEVs();
+      for (String resource : _allDBs) {
         ExternalView ev = assignment.get(resource);
         for (String partition : ev.getPartitionSet()) {
           AtomicInteger activeReplicaCount = new AtomicInteger();
@@ -1372,44 +1339,21 @@
               .forEach(v -> activeReplicaCount.getAndIncrement());
           if (activeReplicaCount.get() < REPLICA - 1 || (
               ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(
-                  partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap(partition)
-                  .get(evacuateInstanceName).equals("LEADER"))) {
+                  partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap(
+                  partition).get(evacuateInstanceName).equals("LEADER"))) {
             return false;
           }
         }
-        return true;
-      }, 30000);
-    }
+      }
+      return true;
+    }, 30000);
 
-    removeOfflineOrDisabledOrSwapInInstances();
+    removeOfflineOrInactiveInstances();
     addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort);
     addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort);
     dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", "TEST_DB4_DELAYED_WAGED"));
   }
 
-  @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testEvacuationWithOfflineInstancesInCluster")
-  public void testSwapEvacuateAddRemoveEvacuate() throws Exception {
-    System.out.println("START TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date(
-        System.currentTimeMillis()));
-    removeOfflineOrDisabledOrSwapInInstances();
-
-    // Set instance's InstanceOperation to EVACUATE
-    String instanceToSwapOutName = _participants.get(0).getInstanceName();
-    InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
-        .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-    _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
-        InstanceConstants.InstanceOperation.EVACUATE);
-
-    // Add instance with InstanceOperation set to SWAP_IN
-    String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
-    addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
-        instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
-
-    // Remove EVACUATE instance's InstanceOperation
-    _gSetupTool.getClusterManagementTool()
-        .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
-  }
-
   /**
    * Verifies that the given verifier returns true within the given timeout. Handles AssertionError
    * by returning false, which TestHelper.verify will not do. Asserts that return value from
@@ -1449,9 +1393,9 @@
     public void onInstanceConfigChange(List<InstanceConfig> instanceConfig,
         NotificationContext context) {
       if (instanceConfig.get(0).getInstanceOperation()
-          .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+          .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
         throttlesEnabled = false;
-      } else if (instanceConfig.get(0).getInstanceOperation().isEmpty()) {
+      } else {
         throttlesEnabled = true;
       }
     }
@@ -1470,21 +1414,21 @@
 
   private void addParticipant(String participantName) throws Exception {
     addParticipant(participantName, UUID.randomUUID().toString(),
-        "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
+        "zone_" + _participants.size() % ZONE_COUNT, null, -1);
   }
 
   private void addParticipant(String participantName, String logicalId, String zone,
-      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity)
+      InstanceConstants.InstanceOperation instanceOperation, int capacity)
       throws Exception {
-    addParticipant(participantName, logicalId, zone, instanceOperation, enabled, capacity, null);
+    addParticipant(participantName, logicalId, zone, instanceOperation, capacity, null);
   }
 
   private void addParticipant(String participantName, String logicalId, String zone,
-      InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity,
+      InstanceConstants.InstanceOperation instanceOperation, int capacity,
       InstanceConfigChangeListener listener) throws Exception {
     InstanceConfig config = new InstanceConfig.Builder().setDomain(
             String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
-                logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
+                logicalId)).setInstanceOperation(instanceOperation)
         .build(participantName);
 
     if (capacity >= 0) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
index 697847c..9cc4eea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.rebalancer.PartitionMigration.TestPartitionMigrationBase;
 import org.apache.helix.model.ClusterConfig;
@@ -103,7 +104,7 @@
     for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
       InstanceConfig config = InstanceConfig.toInstanceConfig(storageNodeName);
-      config.setInstanceEnabled(false);
+      config.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
       config.getRecord().getSimpleFields()
           .remove(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name());
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
index f6ef827..fbb7304 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
@@ -32,6 +32,7 @@
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -156,7 +157,7 @@
     String oldParticipantName = oldParticipant.getInstanceName();
     final InstanceConfig instanceConfig =
         _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, oldParticipantName);
-    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     _gSetupTool.getClusterManagementTool()
         .setInstanceConfig(CLUSTER_NAME, oldParticipantName, instanceConfig);
     Assert.assertTrue(_clusterVerifier.verify(10000));
@@ -231,7 +232,7 @@
       InstanceConfig instanceConfig =
           _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
       if (instanceConfig.getDomainAsMap().get("zone").equals(randZoneStr)) {
-        instanceConfig.setInstanceEnabled(false);
+        instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
         _gSetupTool.getClusterManagementTool()
             .setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
         removedInstanceConfigMap.put(instanceName, instanceConfig);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index a7250d4..26eb13d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -34,6 +34,7 @@
 import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -387,7 +388,7 @@
         disableParticipants.add(p.getInstanceName());
         InstanceConfig config = _gSetupTool.getClusterManagementTool()
             .getInstanceConfig(CLUSTER_NAME, p.getInstanceName());
-        config.setInstanceEnabled(false);
+        config.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
         _gSetupTool.getClusterManagementTool()
             .setInstanceConfig(CLUSTER_NAME, p.getInstanceName(), config);
       }
@@ -408,7 +409,7 @@
       for (String instanceName : disableParticipants) {
         InstanceConfig config =
             _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
-        config.setInstanceEnabled(true);
+        config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
         _gSetupTool.getClusterManagementTool()
             .setInstanceConfig(CLUSTER_NAME, instanceName, config);
       }
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 40d5c97..77f4432 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -82,6 +82,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), new CurrentStateOutput());
     event.addAttribute(AttributeName.helixmanager.name(), manager);
 
     _fullPipeline = new Pipeline("FullPipeline");
@@ -124,6 +125,7 @@
     CurrentStateOutput currentStateOutput =
         populateCurrentStateFromBestPossible(_bestpossibleState);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     _fullPipeline.handle(event);
 
@@ -161,6 +163,7 @@
     currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     _fullPipeline.handle(event);
 
@@ -179,6 +182,7 @@
     currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     _messagePipeline.handle(event);
 
@@ -218,6 +222,7 @@
 
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState);
 
     _messagePipeline.handle(event);
@@ -244,6 +249,7 @@
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
     currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     _fullPipeline.handle(event);
 
@@ -264,6 +270,7 @@
     // Validate: controller should not send S->M to thirdMaster.
     currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     thirdMaster =
         getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
@@ -290,6 +297,7 @@
     // Validate: Controller should not send S->M to thirdMaster.
     currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState);
 
@@ -310,6 +318,7 @@
     currentStateOutput.setCurrentState(_db, _partition, thirdMaster, "SLAVE");
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     _messagePipeline.handle(event);
 
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index 307022f..9a38656 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -95,6 +95,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), new CurrentStateOutput());
     event.addAttribute(AttributeName.helixmanager.name(), manager);
 
     Pipeline pipeline = createPipeline();
@@ -106,6 +107,7 @@
     CurrentStateOutput currentStateOutput =
         populateCurrentStateFromBestPossible(bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     Partition p = new Partition(db + "_0");
 
@@ -153,6 +155,7 @@
     currentStateOutput.setPendingRelayMessage(db, p, masterInstance, relayMessage);
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     pipeline.handle(event);
 
@@ -167,6 +170,7 @@
     currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
     currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     pipeline.handle(event);
 
@@ -186,6 +190,7 @@
     // but controller should not send S->M to newly calculated master.
     currentStateOutput.setCurrentState(db, p, masterInstance, "OFFLINE");
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     String slaveInstance =
         getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
@@ -217,6 +222,7 @@
     // Controller will not send S->M to new master.
     currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
     event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), bestPossibleStateOutput);
@@ -244,6 +250,7 @@
     currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
 
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     pipeline = new Pipeline("test");
     pipeline.addStage(new MessageGenerationPhase());
@@ -271,6 +278,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), new CurrentStateOutput());
     event.addAttribute(AttributeName.helixmanager.name(), manager);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
         new ResourceControllerDataProvider());
@@ -284,6 +292,7 @@
     CurrentStateOutput currentStateOutput =
         populateCurrentStateFromBestPossible(bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     Partition p = new Partition(db + "_0");
 
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index ea2a4aa..9f60a4a 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -165,6 +165,7 @@
     currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("0"), "localhost_2", "SLAVE");
     currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("1"), "localhost_2", "MASTER");
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
 
     BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
     bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("0"), "localhost_1", "SLAVE");
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 60ee9b6..9a1311b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -266,11 +266,13 @@
 
   }
 
+  @Deprecated
   @Override
   public void enableInstance(String clusterName, String instanceName, boolean enabled) {
     enableInstance(clusterName, instanceName, enabled, null, null);
   }
 
+  @Deprecated
   @Override
   public void enableInstance(String clusterName, String instanceName, boolean enabled,
       InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -283,7 +285,8 @@
 
     ZNRecord record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath, null, 0);
     InstanceConfig instanceConfig = new InstanceConfig(record);
-    instanceConfig.setInstanceEnabled(enabled);
+    instanceConfig.setInstanceOperation(enabled ? InstanceConstants.InstanceOperation.ENABLE
+        : InstanceConstants.InstanceOperation.DISABLE);
     if (!enabled) {
       instanceConfig.resetInstanceDisabledTypeAndReason();
       if (reason != null) {
@@ -296,6 +299,7 @@
     _baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0);
   }
 
+  @Deprecated
   @Override
   public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
 
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index b8e6569..7da983b 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -51,7 +51,7 @@
   @Test
   public void testSetInstanceEnableWithReason() {
     InstanceConfig instanceConfig = new InstanceConfig(new ZNRecord("id"));
-    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
     instanceConfig.setInstanceDisabledReason("NoShowReason");
     instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
 
@@ -63,7 +63,7 @@
         .get(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_TYPE.toString()), null);
 
 
-    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     String reasonCode = "ReasonCode";
     instanceConfig.setInstanceDisabledReason(reasonCode);
     instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
@@ -197,4 +197,45 @@
     Assert.assertEquals(instanceConfig.getInstanceInfoMap().get("CABINET"), "30");
     Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight1"), Integer.valueOf(1));
   }
+
+  @Test
+  public void testOverwriteInstanceConfig() {
+    InstanceConfig instanceConfig = new InstanceConfig("instance2");
+    instanceConfig.setHostName("host1");
+    instanceConfig.setPort("1234");
+    instanceConfig.setDomain("foo=bar");
+    instanceConfig.setWeight(100);
+    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.addTag("tag1");
+    instanceConfig.addTag("tag2");
+    instanceConfig.setInstanceCapacityMap(ImmutableMap.of("weight1", 1));
+
+    InstanceConfig overrideConfig = new InstanceConfig("instance1");
+    overrideConfig.setHostName("host2");
+    overrideConfig.setPort("5678");
+    overrideConfig.setDomain("foo=bar2");
+    overrideConfig.setWeight(200);
+    overrideConfig.addTag("tag3");
+    overrideConfig.addTag("tag4");
+    overrideConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+    overrideConfig.setInstanceCapacityMap(ImmutableMap.of("weight2", 2));
+
+    instanceConfig.overwriteInstanceConfig(overrideConfig);
+
+    Assert.assertEquals(instanceConfig.getId(), "instance2");
+    Assert.assertEquals(instanceConfig.getHostName(), "host1");
+    Assert.assertEquals(instanceConfig.getPort(), "1234");
+    Assert.assertEquals(instanceConfig.getDomainAsString(), "foo=bar");
+    Assert.assertEquals(instanceConfig.getWeight(), 200);
+    Assert.assertFalse(instanceConfig.getTags().contains("tag1"));
+    Assert.assertFalse(instanceConfig.getTags().contains("tag2"));
+    Assert.assertTrue(instanceConfig.getTags().contains("tag3"));
+    Assert.assertTrue(instanceConfig.getTags().contains("tag4"));
+    Assert.assertFalse(instanceConfig.getRecord().getSimpleFields()
+        .containsKey(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString()));
+    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+        InstanceConstants.InstanceOperation.EVACUATE);
+    Assert.assertFalse(instanceConfig.getInstanceCapacityMap().containsKey("weight1"));
+    Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight2"), Integer.valueOf(2));
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index 57a3ad0..b02a0f4 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -69,6 +69,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
     ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
     monitor.active();
@@ -119,6 +120,7 @@
     event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
     event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
     ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
     monitor.active();
@@ -131,6 +133,7 @@
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     currentStateOutput = copyCurrentStateFromBestPossible(bestPossibleStateOutput, resource);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
     setupLiveInstances(4);
 
 
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index d8810b1..24113c9 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -86,7 +86,7 @@
     when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
     when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
-    when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
     when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
@@ -123,7 +123,7 @@
     when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
     when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
     when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
-    when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+    when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
     when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
     when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
     when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java b/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
index 6a7c8ba..7d2a1b3 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -43,7 +44,8 @@
     for (String instance : instances) {
       instanceConfigs.add(new InstanceConfig(instance));
       if (disabledInstances.contains(instance)) {
-        instanceConfigs.get(instanceConfigs.size() - 1).setInstanceEnabled(false);
+        instanceConfigs.get(instanceConfigs.size() - 1)
+            .setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
       }
     }
 
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
index 79b0fdc..88dd053 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
@@ -33,6 +33,7 @@
 import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
@@ -77,7 +78,9 @@
       boolean expected) {
     Mock mock = new Mock();
     InstanceConfig instanceConfig = new InstanceConfig(TEST_INSTANCE);
-    instanceConfig.setInstanceEnabled(instanceConfigEnabled);
+    instanceConfig.setInstanceOperation(
+        instanceConfigEnabled ? InstanceConstants.InstanceOperation.ENABLE
+            : InstanceConstants.InstanceOperation.DISABLE);
     doReturn(instanceConfig).when(mock.dataAccessor)
         .getProperty(BUILDER.instanceConfig(TEST_INSTANCE));
     ClusterConfig clusterConfig = new ClusterConfig(TEST_CLUSTER);
@@ -101,17 +104,6 @@
     InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE);
   }
 
-  @Test(expectedExceptions = HelixException.class)
-  public void TestIsInstanceEnabled_whenClusterConfigNull() {
-    Mock mock = new Mock();
-    doReturn(new InstanceConfig(TEST_INSTANCE)).when(mock.dataAccessor)
-        .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS)));
-    doReturn(null).when(mock.dataAccessor)
-        .getProperty(BUILDER.clusterConfig());
-
-    InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE);
-  }
-
   @Test
   public void TestIsInstanceAlive() {
     Mock mock = new Mock();
diff --git a/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json b/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
index 3659942..30553e0 100644
--- a/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
+++ b/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
@@ -94,7 +94,8 @@
       "node_3"
     ],
     "expectedBestPossibleStateMap": {
-      "node_1": "OFFLINE"
+      "node_1": "OFFLINE",
+      "node_3": "ERROR"
     }
   },
   {
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index 877aaa9..8a4bbf0 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -268,7 +268,7 @@
       PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
       InstanceConfig instanceConfig =
           _dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
-      if (InstanceConstants.InstanceOperation.EVACUATE.name()
+      if (InstanceConstants.InstanceOperation.EVACUATE
           .equals(instanceConfig.getInstanceOperation())) {
         toBeStoppedInstances.add(instance);
       }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
index 03465a9..714b53f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -44,6 +44,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -225,7 +226,8 @@
     // Throw exception if there is no instanceConfig for activatedInstances instance.
     for (String instance : inputFields.activatedInstances) {
       if (instanceConfigMap.containsKey(instance)) {
-        instanceConfigMap.get(instance).setInstanceEnabled(true);
+        instanceConfigMap.get(instance)
+            .setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       } else {
         throw new InvalidParameterException(
             "instance: " + instance + "does not have instanceConfig");
@@ -234,7 +236,8 @@
 
     for (String instance : inputFields.deactivatedInstances) {
       if (instanceConfigMap.containsKey(instance)) {
-        instanceConfigMap.get(instance).setInstanceEnabled(false);
+        instanceConfigMap.get(instance)
+            .setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
       }
     }
 
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index d0f0c57..c6ff0d6 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -45,6 +45,7 @@
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.MockTask;
@@ -574,7 +575,7 @@
     instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1)));
     instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5");
 
-    instanceConfigs.get(1).setInstanceEnabled(false);
+    instanceConfigs.get(1).setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
 
     for (InstanceConfig instanceConfig : instanceConfigs) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 93722f0..0403083 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -359,7 +359,7 @@
     // Disable one selected instance0, it should failed to check
     String instance = "instance0";
     InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER, instance);
-    instanceConfig.setInstanceEnabled(false);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
     _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
 
@@ -377,7 +377,7 @@
             ImmutableSet.of("HELIX:HAS_DISABLED_PARTITION","HELIX:INSTANCE_NOT_ENABLED","HELIX:INSTANCE_NOT_STABLE","HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
 
     // Reenable instance0, it should passed the check
-    instanceConfig.setInstanceEnabled(true);
+    instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
     instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", true);
     _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
     Assert.assertTrue(verifier.verifyByPolling());
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
index 2c7a46b..e00c392 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
@@ -36,6 +36,7 @@
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -107,7 +108,7 @@
     for (int i = 0; i < DEFAULT_INSTANCE_COUNT; i++) {
       String instanceName = INSTANCE_NAME_PREFIX + (INSTANCE_START_PORT + i);
       InstanceConfig instanceConfig = new InstanceConfig(instanceName);
-      instanceConfig.setInstanceEnabled(true);
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
       instanceConfig.setInstanceCapacityMap(
           Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
       _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, instanceConfig);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index f9e48ca..943444c 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -36,7 +36,6 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.TestHelper;
@@ -50,9 +49,6 @@
 import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
 import org.apache.helix.rest.server.resources.helix.PerInstanceAccessor;
 import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -500,15 +496,15 @@
     new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
-    Assert.assertEquals(
-        instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
+    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+        InstanceConstants.InstanceOperation.EVACUATE);
     new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP")
         .expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
     new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
-    Assert.assertEquals(
-        instanceConfig.getInstanceOperation(), "");
+    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+        InstanceConstants.InstanceOperation.ENABLE);
 
     // test canCompleteSwap
     Response canCompleteSwapResponse =
@@ -548,8 +544,8 @@
     new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
-    Assert.assertEquals(
-        instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
+    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+        InstanceConstants.InstanceOperation.EVACUATE);
 
     Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")
         .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
@@ -591,8 +587,8 @@
     new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
         .format(CLUSTER_NAME, test_instance_name).post(this, entity);
     instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, test_instance_name);
-    Assert.assertEquals(
-        instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
+    Assert.assertEquals(instanceConfig.getInstanceOperation(),
+        InstanceConstants.InstanceOperation.EVACUATE);
 
     response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")
         .format(CLUSTER_NAME, test_instance_name).post(this, entity);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
index 8992566..7d49318 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -34,6 +34,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
@@ -68,7 +69,7 @@
     toEnabledInstance = liveInstances.get(2);
     InstanceConfig config = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(cluster, toEnabledInstance);
-    config.setInstanceEnabled(false);
+    config.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
     _gSetupTool.getClusterManagementTool()
         .setInstanceConfig(cluster, toEnabledInstance, config);
 
@@ -94,7 +95,7 @@
     }
     InstanceConfig config = _gSetupTool.getClusterManagementTool()
         .getInstanceConfig(cluster, toEnabledInstance);
-    config.setInstanceEnabled(true);
+    config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
     _gSetupTool.getClusterManagementTool().setInstanceConfig(cluster, toEnabledInstance, config);
     _gSetupTool.getClusterManagementTool()
         .enableMaintenanceMode(cluster, false, TestHelper.getTestMethodName());
@@ -245,8 +246,8 @@
     InstanceConfig toEnabledInstanceConfig =
         _gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, toEnabledInstance);
     // Another way to mark the node as inactive or active.
-    toDeactivatedInstanceConfig.setInstanceEnabled(false);
-    toEnabledInstanceConfig.setInstanceEnabled(true);
+    toDeactivatedInstanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
+    toEnabledInstanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
     // Write the current InstanceConfigs record to json string
     StringWriter sw = new StringWriter();
     OBJECT_MAPPER.writeValue(sw, toDeactivatedInstanceConfig.getRecord());
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
index 998f117..eddd68b 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
@@ -24,6 +24,7 @@
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -98,7 +99,7 @@
       if (!nodes.contains("consumer_" + consumerId)) {
         InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
         config.setHostName("localhost");
-        config.setInstanceEnabled(true);
+        config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
         admin.addInstance(clusterName, config);
       }
 
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
index 9dbcbfe..5b8e736 100644
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -65,7 +66,7 @@
         InstanceConfig config = new InstanceConfig(serverId);
         config.setHostName("localhost");
         config.setPort(port);
-        config.setInstanceEnabled(true);
+        config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
         admin.addInstance(clusterName, config);
       }
       // add resource "repository" which has 1 partition
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
index 11d4395..3704c54 100644
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
@@ -24,6 +24,7 @@
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -98,7 +99,7 @@
       if (!nodes.contains(_instanceName)) {
         InstanceConfig config = new InstanceConfig(_instanceName);
         config.setHostName("localhost");
-        config.setInstanceEnabled(true);
+        config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
         admin.addInstance(_clusterName, config);
       }