Replace the HELIX_ENABLED config with InstanceOperation while maintaining backwards compatibility with old APIs (#2772)
Replace the HELIX_ENABLED config with InstanceOperation while maintaining backwards compatibility with old APIs.
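To unify the HELIX_ENABLED functionality with InstanceOperation, InstanceOperation now has the following options: ENABLE, DISABLE, EVACUATE, SWAP_IN, and UNKNOWN. SWAP_OUT is removed; a swap-out node is now identified as the non-SWAP_IN instance that shares its logical ID with a SWAP_IN instance. The HelixAdmin enableInstance APIs are deprecated in favor of setInstanceOperation.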
diff --git a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
index e2cc2de..07eb498 100644
--- a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
+++ b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java
@@ -1,8 +1,32 @@
package org.apache.helix.constants;
+import java.util.Set;
+
public class InstanceConstants {
public static final String INSTANCE_NOT_DISABLED = "INSTANCE_NOT_DISABLED";
+ /**
+ * The set contains the InstanceOperations that are allowed to be assigned replicas by the rebalancer.
+ */
+ public static final Set<InstanceOperation> ASSIGNABLE_INSTANCE_OPERATIONS =
+ Set.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE);
+
+
+ /**
+ * The set contains the InstanceOperations that are overridden when the deprecated HELIX_ENABLED
+ * field is set to false. This will maintain backwards compatibility with the deprecated field.
+ * TODO: Remove this when the deprecated HELIX_ENABLED is removed.
+ */
+ public static final Set<InstanceOperation> INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS =
+ Set.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE, InstanceOperation.EVACUATE);
+
+
+ /**
+ * The set of InstanceOperations that are not allowed to be populated in the RoutingTableProvider.
+ */
+ public static final Set<InstanceOperation> UNSERVABLE_INSTANCE_OPERATIONS =
+ Set.of(InstanceOperation.SWAP_IN, InstanceOperation.UNKNOWN);
+
public enum InstanceDisabledType {
CLOUD_EVENT,
USER_OPERATION,
@@ -10,8 +34,35 @@
}
public enum InstanceOperation {
- EVACUATE, // Node will be removed after a period of time
- SWAP_IN, // New node joining for swap operation
- SWAP_OUT // Existing Node to be removed for swap operation
+ /**
+ * Behavior: Replicas will be assigned to the node and will receive upward state transitions
+ * for new assignments and downward state transitions if replicas are being moved elsewhere.
+ * Final State: The node will have replicas assigned to it and will be considered for future assignment.
+ */
+ ENABLE,
+ /**
+ * Behavior: All replicas on the node will be set to OFFLINE.
+ * Final State: The node will have all replicas in the OFFLINE state and can't take new assignment.
+ */
+ DISABLE,
+ /**
+ * Behavior: All replicas will be moved off the node, after a replacement has been bootstrapped
+ * in another node in the cluster.
+ * Final State: The node will not contain any replicas and will not be considered for *NEW* assignment.
+ */
+ EVACUATE,
+ /**
+ * Behavior: Node will have all replicas on its corresponding (same logicalId) swap-out node bootstrapped
+ * (ERROR and OFFLINE replicas on swap-out node will not be bootstrapped) to the same states if the StateModelDef allows.
+ * This node will be excluded from the RoutingTableProvider.
+ * Final State: This node will be a mirror of the swap-out node, will not be considered for assignment, and will not be populated
+ * in the RoutingTableProvider.
+ */
+ SWAP_IN,
+ /**
+ * Behavior: Node will have all of its replicas dropped immediately and will be removed from the RoutingTableProvider.
+ * Final State: Node will not hold replicas, be considered for assignment, or be populated in the RoutingTableProvider.
+ */
+ UNKNOWN
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 021332b..d2e0c26 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -281,9 +281,11 @@
* @param instanceName
* @param enabled
*/
+ @Deprecated
void enableInstance(String clusterName, String instanceName, boolean enabled);
/**
+ * @deprecated use {@link #setInstanceOperation(String, String, InstanceConstants.InstanceOperation)}
* @param clusterName
* @param instanceName
* @param enabled
@@ -292,20 +294,24 @@
* @param reason set additional string description on why the instance is disabled when
* <code>enabled</code> is false. Existing disabled reason will be over write if instance is in disabled state.
*/
+ @Deprecated
void enableInstance(String clusterName, String instanceName, boolean enabled,
InstanceConstants.InstanceDisabledType disabledType, String reason);
/**
* Batch enable/disable instances in a cluster
* By default, all the instances are enabled
+ * @deprecated use {@link #setInstanceOperation(String, String, InstanceConstants.InstanceOperation)}
* @param clusterName
* @param instances
* @param enabled
*/
+ @Deprecated
void enableInstance(String clusterName, List<String> instances, boolean enabled);
/**
- * Set the instanceOperation field.
+ * Set the instanceOperation field. Setting it to null is equivalent to
+ * ENABLE.
*
* @param clusterName The cluster name
* @param instanceName The instance name
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 1e40bbb..a91ae12 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -64,7 +64,6 @@
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.util.HelixUtil;
-import org.apache.helix.util.InstanceValidationUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
@@ -120,30 +119,40 @@
private final Set<String> _disabledInstanceSet = new HashSet<>();
private static final class DerivedInstanceCache {
- // Assignable instances are instances will contain at most one instance with a given logicalId.
- // This is used for SWAP related operations where there can be two instances with the same logicalId.
+ private final Map<InstanceConstants.InstanceOperation, Map<String, InstanceConfig>>
+ _instanceConfigMapByInstanceOperation;
private final Map<String, InstanceConfig> _assignableInstanceConfigMap;
private final Map<String, LiveInstance> _assignableLiveInstancesMap;
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName;
+ private final Map<String, String> _swapInInstanceNameToSwapOutInstanceName;
private final Set<String> _liveSwapInInstanceNames;
- private final Set<String> _enabledSwapInInstanceNames;
- DerivedInstanceCache(Map<String, InstanceConfig> assignableInstanceConfigMap,
+ DerivedInstanceCache(
+ Map<InstanceConstants.InstanceOperation, Map<String, InstanceConfig>> instanceConfigMapByInstanceOperation,
+ Map<String, InstanceConfig> assignableInstanceConfigMap,
Map<String, LiveInstance> assignableLiveInstancesMap,
Map<String, String> swapOutInstanceNameToSwapInInstanceName,
- Set<String> liveSwapInInstanceNames, Set<String> enabledSwapInInstanceNames) {
+ Set<String> liveSwapInInstanceNames) {
+ _instanceConfigMapByInstanceOperation = instanceConfigMapByInstanceOperation;
_assignableInstanceConfigMap = assignableInstanceConfigMap;
_assignableLiveInstancesMap = assignableLiveInstancesMap;
_swapOutInstanceNameToSwapInInstanceName = swapOutInstanceNameToSwapInInstanceName;
+ _swapInInstanceNameToSwapOutInstanceName = swapOutInstanceNameToSwapInInstanceName.entrySet()
+ .stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
_liveSwapInInstanceNames = liveSwapInInstanceNames;
- _enabledSwapInInstanceNames = enabledSwapInInstanceNames;
+ }
+
+ private Map<String, InstanceConfig> getInstanceConfigMapByInstanceOperation(
+ InstanceConstants.InstanceOperation instanceOperation) {
+ return _instanceConfigMapByInstanceOperation.getOrDefault(instanceOperation,
+ Collections.emptyMap());
}
}
// All maps and sets are encapsulated in DerivedInstanceCache to ensure that they are updated together
// as a snapshot.
private DerivedInstanceCache _derivedInstanceCache =
- new DerivedInstanceCache(new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashSet<>(),
+ new DerivedInstanceCache(new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashMap<>(),
new HashSet<>());
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
@@ -383,15 +392,14 @@
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
// Create new caches to be populated.
+ Map<InstanceConstants.InstanceOperation, Map<String, InstanceConfig>>
+ newInstanceConfigMapByInstanceOperation = new HashMap<>();
Map<String, InstanceConfig> newAssignableInstanceConfigMap = new HashMap<>();
Map<String, LiveInstance> newAssignableLiveInstancesMap = new HashMap<>();
- Map<String, String> newSwapOutInstanceNameToSwapInInstanceName = new HashMap<>();
+ Map<String, String> newSwapOutInstanceNameToSwapOutInstanceName = new HashMap<>();
Set<String> newLiveSwapInInstanceNames = new HashSet<>();
- Set<String> newEnabledSwapInInstanceNames = new HashSet<>();
-
- Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
- Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
- Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
+ Map<String, String> swapInLogicalIdsByInstanceName = new HashMap<>();
+ Map<String, String> nonSwapInInstancesByLogicalId = new HashMap<>();
for (Map.Entry<String, InstanceConfig> entry : instanceConfigMap.entrySet()) {
String node = entry.getKey();
@@ -404,44 +412,20 @@
String currentInstanceLogicalId =
currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());
- // Filter out instances with duplicate logical IDs. If there are duplicates, the instance with
- // InstanceOperation SWAP_OUT will be chosen over the instance with SWAP_IN. SWAP_IN is not
- // assignable. If there are duplicates with one node having no InstanceOperation and the other
- // having SWAP_OUT, the node with no InstanceOperation will be chosen. This signifies SWAP
- // completion, therefore making the node assignable.
- if (filteredInstancesByLogicalId.containsKey(currentInstanceLogicalId)) {
- String filteredNode = filteredInstancesByLogicalId.get(currentInstanceLogicalId);
- InstanceConfig filteredDuplicateInstanceConfig = instanceConfigMap.get(filteredNode);
+ newInstanceConfigMapByInstanceOperation.computeIfAbsent(
+ currentInstanceConfig.getInstanceOperation(), k -> new HashMap<>())
+ .put(node, currentInstanceConfig);
- if ((filteredDuplicateInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
- && currentInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()))
- || currentInstanceConfig.getInstanceOperation().isEmpty()) {
- // If the already filtered instance is SWAP_IN and this instance is in SWAP_OUT, then replace the filtered
- // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance
- // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and
- // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set.
- newAssignableInstanceConfigMap.remove(filteredNode);
- newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
- filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
- }
- } else if (!currentInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
- // EVACUATE instances are not considered to be assignable.
+ if (currentInstanceConfig.isAssignable()) {
newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
- filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
}
if (currentInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
- swapOutLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
+ swapInLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
currentInstanceLogicalId);
- }
-
- if (currentInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
- swapInInstancesByLogicalId.put(
+ } else {
+ nonSwapInInstancesByLogicalId.put(
currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
currentInstanceConfig.getInstanceName());
}
@@ -453,25 +437,20 @@
}
});
- swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
- String swapInInstanceName = swapInInstancesByLogicalId.get(value);
- if (swapInInstanceName != null) {
- newSwapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
+ swapInLogicalIdsByInstanceName.forEach((swapInInstanceName, swapInLogicalId) -> {
+ String swapOutInstanceName = nonSwapInInstancesByLogicalId.get(swapInLogicalId);
+ if (swapOutInstanceName != null) {
+ newSwapOutInstanceNameToSwapOutInstanceName.put(swapOutInstanceName, swapInInstanceName);
if (liveInstancesMap.containsKey(swapInInstanceName)) {
newLiveSwapInInstanceNames.add(swapInInstanceName);
}
- if (InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
- clusterConfig)) {
- newEnabledSwapInInstanceNames.add(swapInInstanceName);
- }
}
});
// Replace caches with up-to-date instance sets.
- _derivedInstanceCache =
- new DerivedInstanceCache(newAssignableInstanceConfigMap, newAssignableLiveInstancesMap,
- newSwapOutInstanceNameToSwapInInstanceName, newLiveSwapInInstanceNames,
- newEnabledSwapInInstanceNames);
+ _derivedInstanceCache = new DerivedInstanceCache(newInstanceConfigMapByInstanceOperation,
+ newAssignableInstanceConfigMap, newAssignableLiveInstancesMap,
+ newSwapOutInstanceNameToSwapOutInstanceName, newLiveSwapInInstanceNames);
}
private void refreshResourceConfig(final HelixDataAccessor accessor,
@@ -722,78 +701,50 @@
}
/**
- * Return all the live nodes that are enabled and assignable
+ * Return a set of all instances that have the UNKNOWN InstanceOperation.
+ * These instances are not assignable and should have all replicas dropped
+ * immediately.
*
- * @return A new set contains live instance name and that are marked enabled
+ * @return An unmodifiable set containing the names of all instances with the UNKNOWN InstanceOperation
*/
- public Set<String> getAssignableEnabledLiveInstances() {
- Set<String> enabledLiveInstances = new HashSet<>(getAssignableLiveInstances().keySet());
- enabledLiveInstances.removeAll(getDisabledInstances());
-
- return enabledLiveInstances;
+ public Set<String> getUnknownInstances() {
+ return Collections.unmodifiableSet(
+ _derivedInstanceCache.getInstanceConfigMapByInstanceOperation(
+ InstanceConstants.InstanceOperation.UNKNOWN).keySet());
}
/**
- * Return all the live nodes that are enabled
+ * Return all the live nodes that are enabled. If a node is enabled, it is assignable.
* @return A new set contains live instance name and that are marked enabled
*/
public Set<String> getEnabledLiveInstances() {
Set<String> enabledLiveInstances = new HashSet<>(getLiveInstances().keySet());
- enabledLiveInstances.removeAll(getDisabledInstances());
+ enabledLiveInstances.retainAll(getEnabledInstances());
return enabledLiveInstances;
}
/**
- * Return all nodes that are enabled and assignable.
- *
- * @return A new set contains instance name and that are marked enabled
- */
- public Set<String> getAssignableEnabledInstances() {
- Set<String> enabledNodes = new HashSet<>(getAssignableInstances());
- enabledNodes.removeAll(getDisabledInstances());
-
- return enabledNodes;
- }
-
- /**
- * Return all nodes that are enabled.
+ * Return all nodes that are enabled. If a node is enabled, it is assignable.
* @return A new set contains instance name and that are marked enabled
*/
public Set<String> getEnabledInstances() {
- Set<String> enabledNodes = new HashSet<>(getAllInstances());
- enabledNodes.removeAll(getDisabledInstances());
-
- return enabledNodes;
+ return new HashSet<>(_derivedInstanceCache.getInstanceConfigMapByInstanceOperation(
+ InstanceConstants.InstanceOperation.ENABLE).keySet());
}
/**
- * Return all the live nodes that are enabled and assignable and tagged with given instanceTag.
+ * Return all the live nodes that are enabled and tagged with given instanceTag. If a node is
+ * enabled, it is assignable.
*
* @param instanceTag The instance group tag.
* @return A new set contains live instance name and that are marked enabled and have the
* specified tag.
*/
- public Set<String> getAssignableEnabledLiveInstancesWithTag(String instanceTag) {
- Set<String> enabledLiveInstancesWithTag = new HashSet<>(getAssignableLiveInstances().keySet());
- Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
- enabledLiveInstancesWithTag.retainAll(instancesWithTag);
- enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
-
- return enabledLiveInstancesWithTag;
- }
-
- /**
- * Return all the live nodes that are enabled and tagged with given instanceTag.
- * @param instanceTag The instance group tag.
- * @return A new set contains live instance name and that are marked enabled and have the
- * specified tag.
- */
public Set<String> getEnabledLiveInstancesWithTag(String instanceTag) {
- Set<String> enabledLiveInstancesWithTag = new HashSet<>(getLiveInstances().keySet());
+ Set<String> enabledLiveInstancesWithTag = new HashSet<>(getEnabledLiveInstances());
Set<String> instancesWithTag = getAssignableInstancesWithTag(instanceTag);
enabledLiveInstancesWithTag.retainAll(instancesWithTag);
- enabledLiveInstancesWithTag.removeAll(getDisabledInstances());
return enabledLiveInstancesWithTag;
}
@@ -858,9 +809,9 @@
}
/**
- * Get all swapping instance pairs.
+ * Get all swapping instance pairs keyed by swap-out instanceNames.
*
- * @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN instanceNames.
+ * @return a map of swap out instanceNames and their corresponding SWAP_IN instanceNames.
*/
public Map<String, String> getSwapOutToSwapInInstancePairs() {
return Collections.unmodifiableMap(
@@ -868,21 +819,22 @@
}
/**
- * Get all the live SWAP_IN instances.
+ * Get all swapping instance pairs keyed by swap-in instanceNames.
*
- * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
+ * @return a map of swap in instanceNames and their corresponding swap out instanceNames.
*/
- public Set<String> getLiveSwapInInstanceNames() {
- return Collections.unmodifiableSet(_derivedInstanceCache._liveSwapInInstanceNames);
+ public Map<String, String> getSwapInToSwapOutInstancePairs() {
+ return Collections.unmodifiableMap(
+ _derivedInstanceCache._swapInInstanceNameToSwapOutInstanceName);
}
/**
- * Get all the enabled SWAP_IN instances.
+ * Get all the live SWAP_IN instances.
*
- * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
+ * @return a set of SWAP_IN instanceNames that have a corresponding swap out instance.
*/
- public Set<String> getEnabledSwapInInstanceNames() {
- return Collections.unmodifiableSet(_derivedInstanceCache._enabledSwapInInstanceNames);
+ public Set<String> getLiveSwapInInstanceNames() {
+ return Collections.unmodifiableSet(_derivedInstanceCache._liveSwapInInstanceNames);
}
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
@@ -1127,7 +1079,7 @@
_disabledInstanceSet.clear();
for (InstanceConfig config : allInstanceConfigs) {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
- if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
+ if (config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
_disabledInstanceSet.add(config.getInstanceName());
}
for (String resource : disabledPartitionMap.keySet()) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 160e4ed..477ef20 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -362,12 +362,19 @@
}
}
+ // TODO: Consider moving this logic to assignStatesToInstances since we are already passing
+ // disabledInstancesForPartition to that method.
// (2) Set initial-state to certain instances that are disabled and in preference list.
// Be careful with the conditions.
for (String instance : preferenceList) {
if (disabledInstancesForPartition.contains(instance)) {
if (currentStateMap.containsKey(instance)) {
- if (!currentStateMap.get(instance).equals(HelixDefinedState.ERROR.name())) {
+ if (currentStateMap.get(instance).equals(HelixDefinedState.ERROR.name())) {
+ // Must set to ERROR state here because assignStatesToInstances will not assign
+ // any state for disabledInstancesForPartition. This prevents the ERROR partition
+ // from being DROPPED.
+ bestPossibleStateMap.put(instance, HelixDefinedState.ERROR.name());
+ } else {
bestPossibleStateMap.put(instance, stateModelDef.getInitialState());
}
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 252b632..d55a6ea 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -95,7 +95,7 @@
String instanceTag = currentIdealState.getInstanceGroupTag();
if (instanceTag != null) {
- assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstancesWithTag(instanceTag);
+ assignableLiveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
assignableNodes = clusterData.getAssignableInstancesWithTag(instanceTag);
if (LOG.isInfoEnabled()) {
@@ -105,7 +105,7 @@
currentIdealState.getInstanceGroupTag(), resourceName, assignableNodes, assignableLiveEnabledNodes));
}
} else {
- assignableLiveEnabledNodes = clusterData.getAssignableEnabledLiveInstances();
+ assignableLiveEnabledNodes = clusterData.getEnabledLiveInstances();
assignableNodes = clusterData.getAssignableInstances();
}
@@ -246,7 +246,7 @@
LOG.debug("Processing resource:" + resource.getResourceName());
}
- Set<String> allNodes = cache.getAssignableEnabledInstances();
+ Set<String> allNodes = cache.getEnabledInstances();
Set<String> liveNodes = cache.getAssignableLiveInstances().keySet();
ClusterConfig clusterConfig = cache.getClusterConfig();
@@ -268,7 +268,8 @@
Map<String, String> bestStateForPartition =
// We use cache.getLiveInstances().keySet() to make sure we gracefully handle n -> n + 1 replicas if possible
// when the one of the current nodes holding the replica is no longer considered assignable. (ex: EVACUATE)
- computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef, preferenceList,
+ computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(),
+ stateModelDef, preferenceList,
currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
partition, cache.getAbnormalStateResolver(stateModelDefName), cache);
@@ -328,6 +329,7 @@
while (it.hasNext()) {
String instance = it.next();
String state = currentStateMap.get(instance);
+ // TODO: This case may be impossible; determine whether it can be safely removed.
if (state == null) {
it.remove();
instancesToDrop.add(instance); // These instances should be set to DROPPED after we get bestPossibleStateMap;
@@ -405,6 +407,8 @@
}
}
+ // TODO: This may not be necessary; every instance in bestPossibleStateMap should already be set
+ // to ERROR, where needed, by the call to computeBestPossibleMap.
// Adding ERROR replica mapping to best possible
// ERROR assignment should be mutual excluded from DROPPED assignment because
// once there is an ERROR replica in the mapping, bestPossibleStateMap.size() > numReplicas prevents
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 335c30f..2618275 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -205,7 +205,7 @@
}
addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
} catch (IllegalArgumentException e) {
- if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) {
+ if (insConfig.getInstanceEnabled()) {
throw e;
} else {
logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
index 2796064..90da408 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java
@@ -29,19 +29,16 @@
import java.util.stream.Collectors;
import org.apache.helix.HelixManager;
-import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.util.InstanceValidationUtil;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -157,7 +154,7 @@
}
// check the time instance got disabled.
- if (!InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig)) {
+ if (!instanceConfig.getInstanceEnabled()) {
long disabledTime = instanceConfig.getInstanceEnabledTime();
String batchedDisabledTime = clusterConfig.getInstanceHelixDisabledTimeStamp(instance);
if (batchedDisabledTime != null && !batchedDisabledTime.isEmpty()) {
@@ -409,7 +406,7 @@
ResourceAssignment resourceAssignment) {
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
@@ -430,7 +427,7 @@
private static int getMinActiveReplica(ResourceControllerDataProvider clusterData, String resourceName) {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
return DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index ae7e49a..39a197b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -304,7 +304,7 @@
Set<String> activeNodes =
DelayedRebalanceUtil.getActiveNodes(clusterData.getAssignableInstances(),
- clusterData.getAssignableEnabledLiveInstances(),
+ clusterData.getEnabledLiveInstances(),
clusterData.getInstanceOfflineTimeMap(),
clusterData.getAssignableLiveInstances().keySet(),
clusterData.getAssignableInstanceConfigMap(), clusterData.getClusterConfig());
@@ -401,7 +401,7 @@
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
// the "real" live nodes at the time
- final Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+ final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
@@ -602,7 +602,7 @@
ClusterConfig clusterConfig = clusterData.getClusterConfig();
boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig);
Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
- offlineOrDisabledInstances.removeAll(clusterData.getAssignableEnabledLiveInstances());
+ offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
for (String resource : resourceSet) {
DelayedRebalanceUtil
.setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
@@ -623,7 +623,7 @@
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
- Set<String> enabledLiveInstances = clusterData.getAssignableEnabledLiveInstances();
+ Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index ddd9880..75151d3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -164,7 +164,7 @@
ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
Map<String, ResourceAssignment> currentStateAssignment) {
return generateClusterModel(dataProvider, resourceMap,
- dataProvider.getAssignableEnabledLiveInstances(), Collections.emptyMap(),
+ dataProvider.getEnabledLiveInstances(), Collections.emptyMap(),
Collections.emptyMap(), currentStateAssignment,
RebalanceScopeType.GLOBAL_BASELINE);
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 8771bff..0db5252 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -24,6 +24,7 @@
RESOURCES_TO_REBALANCE,
BEST_POSSIBLE_STATE,
CURRENT_STATE,
+ CURRENT_STATE_EXCLUDING_UNKNOWN,
CUSTOMIZED_STATE,
INTERMEDIATE_STATE,
MESSAGES_ALL,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 2a6f964..1db0ecc 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -30,10 +30,13 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
+import org.apache.helix.controller.common.ResourcesStateMap; // NOTE(review): no use visible in this diff; remove if unused
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
@@ -74,7 +77,8 @@
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
- CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name());
+ CurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name()); // replicas on UNKNOWN instances are not considered
final Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
final ClusterStatusMonitor clusterStatusMonitor =
@@ -83,8 +87,8 @@
event.getAttribute(AttributeName.ControllerDataProvider.name());
if (currentStateOutput == null || resourceMap == null || cache == null) {
- throw new StageException(
- "Missing attributes in event:" + event + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires CURRENT_STATE_EXCLUDING_UNKNOWN|RESOURCES|DataCache");
}
final BestPossibleStateOutput bestPossibleStateOutput =
@@ -131,82 +135,104 @@
});
}
+ private String selectSwapInState(StateModelDefinition stateModelDef, Map<String, String> stateMap,
+ String swapOutInstance) {
+ // If the swap-in node is live, select state with the following logic:
+ // 1. If the swap-out instance's replica is in the stateMap:
+ // - if the swap-out instance's replica is the topState or ERROR, select the topState for the swap-in
+ // instance's replica if another topState replica is allowed, otherwise select a secondTopState.
+ // - otherwise, select the same state as the swap-out instance's replica.
+ // NOTE(review): ERROR is handled like the topState; confirm it should not map to initialState.
+ // 2. If the swap-out instance's replica is not in the stateMap, return null so that no state is
+ // assigned to the swap-in instance. This happens when the swap-out node is offline.
+ if (stateMap.containsKey(swapOutInstance)) {
+ if (stateMap.get(swapOutInstance).equals(stateModelDef.getTopState()) || stateMap.get(
+ swapOutInstance).equals(HelixDefinedState.ERROR.name())) {
+ // If the swap-out instance's replica is the topState or ERROR, select the swap-in instance's
+ // replica to be the topState if the StateModel allows another to be added. If not, select the
+ // swap-in to be the secondTopState.
+ String topStateCount = stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
+ if (topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
+ || topStateCount.equals(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
+ // If the StateModel allows for another replica with the topState to be added,
+ // select the swap-in instance's replica to the topState.
+ return stateModelDef.getTopState();
+ }
+ // If StateModel does not allow another topState replica to be
+ // added, select the swap-in instance's replica to be the secondTopState.
+ return stateModelDef.getSecondTopStates().iterator().next();
+ }
+ // If the swap-out instance's replica is not a topState or ERROR, select the swap-in instance's replica
+ // to be the same state
+ return stateMap.get(swapOutInstance);
+ }
+ // If the swap-out instance's replica is not in the stateMap, return null
+ return null;
+ }
+
private void addSwapInInstancesToBestPossibleState(Map<String, Resource> resourceMap,
BestPossibleStateOutput bestPossibleStateOutput, ResourceControllerDataProvider cache) {
- // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
+ // 1. Get all swap out instances and corresponding SWAP_IN instance pairs in the cluster.
Map<String, String> swapOutToSwapInInstancePairs = cache.getSwapOutToSwapInInstancePairs();
- // 2. Get all enabled and live SWAP_IN instances in the cluster.
+ Map<String, String> swapInToSwapOutInstancePairs = cache.getSwapInToSwapOutInstancePairs();
+
+ // 2. Get all live SWAP_IN instances in the cluster.
Set<String> liveSwapInInstances = cache.getLiveSwapInInstanceNames();
- Set<String> enabledSwapInInstances = cache.getEnabledSwapInInstanceNames();
- // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to
- // the stateMap with the correct state.
- // Skipping this when there are no SWAP_IN instances that are alive will reduce computation time.
- if (!liveSwapInInstances.isEmpty() && !cache.isMaintenanceModeEnabled()) {
- resourceMap.forEach((resourceName, resource) -> {
- StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
- bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
- .forEach((partition, stateMap) -> {
- // We use the preferenceList for the case where the swapOutInstance goes offline.
- // We do not want to drop the replicas that may have been bootstrapped on the swapInInstance
- // in the case that the swapOutInstance goes offline and no longer has an entry in the stateMap.
- Set<String> commonInstances = new HashSet<>(
- bestPossibleStateOutput.getPreferenceList(resourceName,
- partition.getPartitionName()));
- commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
+ if (liveSwapInInstances.isEmpty() || cache.isMaintenanceModeEnabled()) {
+ return;
+ }
- commonInstances.forEach(swapOutInstance -> {
- // If the corresponding swap-in instance is not live, skip assigning to it.
- if (!liveSwapInInstances.contains(
- swapOutToSwapInInstancePairs.get(swapOutInstance))) {
- return;
- }
+ // 3. Find the assignment for each swap-in instance
+ // <instanceName> : <resourceName> : <partitionName>
+ Map<String, Map<String, Set<String>>> swapInInstanceAssignment = new HashMap<>();
+ resourceMap.forEach((resourceName, resource) -> {
+ bestPossibleStateOutput.getResourceStatesMap().get(resourceName).getStateMap()
+ .forEach((partition, stateMap) -> {
+ // Only swap-out instances present in the partition's best possible stateMap get a paired
+ // swap-in assignment. If the swap-out instance is absent from the stateMap (e.g. it went
+ // offline), nothing is assigned to its swap-in instance for that partition.
+ Set<String> commonInstances =
+ bestPossibleStateOutput.getInstanceStateMap(resourceName, partition) != null
+ ? new HashSet<>(
+ bestPossibleStateOutput.getInstanceStateMap(resourceName, partition).keySet())
+ : Collections.emptySet();
+ if (commonInstances.isEmpty()) {
+ return;
+ }
+ commonInstances.retainAll(swapOutToSwapInInstancePairs.keySet());
- // If the corresponding swap-in instance is not enabled, assign replicas with
- // initial state.
- if (!enabledSwapInInstances.contains(
- swapOutToSwapInInstancePairs.get(swapOutInstance))) {
- stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
- stateModelDef.getInitialState());
- return;
- }
-
- // If the swap-in node is live and enabled, do assignment with the following logic:
- // 1. If the swap-out instance's replica is a secondTopState, set the swap-in instance's replica
- // to the same secondTopState.
- // 2. If the swap-out instance's replica is any other state and is in the preferenceList,
- // set the swap-in instance's replica to the topState if the StateModel allows another to be added.
- // If not, set the swap-in instance's replica to the secondTopState.
- // We can make this assumption because if there is assignment to the swapOutInstance, it must be either
- // a topState or a secondTopState.
- if (stateMap.containsKey(swapOutInstance) && stateModelDef.getSecondTopStates()
- .contains(stateMap.get(swapOutInstance))) {
- // If the swap-out instance's replica is a secondTopState, set the swap-in instance's replica
- // to the same secondTopState.
- stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
- stateMap.get(swapOutInstance));
- } else {
- // If the swap-out instance's replica is any other state in the stateMap or not present in the
- // stateMap, set the swap-in instance's replica to the topState if the StateModel allows another
- // to be added. If not, set the swap-in to the secondTopState.
- String topStateCount =
- stateModelDef.getNumInstancesPerState(stateModelDef.getTopState());
- if (topStateCount.equals(
- StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
- || topStateCount.equals(
- StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS)) {
- // If the StateModel allows for another replica with the topState to be added,
- // set the swap-in instance's replica to the topState.
- stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
- stateModelDef.getTopState());
- } else {
- // If StateModel does not allow another topState replica to be
- // added, set the swap-in instance's replica to the secondTopState.
- stateMap.put(swapOutToSwapInInstancePairs.get(swapOutInstance),
- stateModelDef.getSecondTopStates().iterator().next());
- }
- }
- });
+ commonInstances.forEach(swapOutInstance -> {
+ swapInInstanceAssignment.computeIfAbsent(
+ swapOutToSwapInInstancePairs.get(swapOutInstance), k -> new HashMap<>())
+ .computeIfAbsent(resourceName, k -> new HashSet<>())
+ .add(partition.getPartitionName());
});
+ });
+ });
+
+ // 4. Add the correct states for the swap-in instances to the bestPossibleStateOutput.
+ if (!swapInInstanceAssignment.isEmpty()) {
+ swapInInstanceAssignment.forEach((swapInInstance, resourceMapForInstance) -> {
+ // If the corresponding swap-in instance is not live, skip assigning to it.
+ if (!liveSwapInInstances.contains(swapInInstance)) {
+ return;
+ }
+
+ resourceMapForInstance.forEach((resourceName, partitions) -> {
+ partitions.forEach(partitionName -> {
+ Partition partition = new Partition(partitionName);
+ Map<String, String> stateMap =
+ bestPossibleStateOutput.getInstanceStateMap(resourceName, partition);
+ // Guard before selectSwapInState, which dereferences stateMap; skip null selections.
+ String selectedState = stateMap == null ? null : selectSwapInState(
+ cache.getStateModelDef(resourceMap.get(resourceName).getStateModelDefRef()),
+ stateMap, swapInToSwapOutInstancePairs.get(swapInInstance));
+ if (selectedState != null) {
+ bestPossibleStateOutput.setState(resourceName, partition, swapInInstance,
+ selectedState);
+ }
+ });
+ });
});
}
}
@@ -250,7 +276,7 @@
// Check whether the offline/disabled instance count in the cluster exceeds the set limit,
// if yes, put the cluster into maintenance mode.
boolean isValid =
- validateOfflineInstancesLimit(cache, event.getAttribute(AttributeName.helixmanager.name()));
+ validateInstancesUnableToAcceptOnlineReplicasLimit(cache, event.getAttribute(AttributeName.helixmanager.name()));
final List<String> failureResources = new ArrayList<>();
@@ -323,25 +349,32 @@
});
}
- // Check whether the offline/disabled instance count in the cluster reaches the set limit,
+ // Check whether the count of instances that are offline or unable to accept ONLINE replicas exceeds the set limit,
// if yes, auto enable maintenance mode, and use the maintenance rebalancer for this pipeline.
- private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvider cache,
+ private boolean validateInstancesUnableToAcceptOnlineReplicasLimit(final ResourceControllerDataProvider cache,
final HelixManager manager) {
- int maxOfflineInstancesAllowed = cache.getClusterConfig().getMaxOfflineInstancesAllowed();
- if (maxOfflineInstancesAllowed >= 0) {
- int offlineCount =
- cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
- if (offlineCount > maxOfflineInstancesAllowed) {
+ int maxInstancesUnableToAcceptOnlineReplicas =
+ cache.getClusterConfig().getMaxOfflineInstancesAllowed();
+ if (maxInstancesUnableToAcceptOnlineReplicas >= 0) {
+ // Instead of only checking the offline instances, we consider how many instances in the cluster
+ // are not assignable and live. This is because some instances may be online but have an unassignable
+ // InstanceOperation such as EVACUATE or DISABLE. SWAP_IN and UNKNOWN instances are excluded from the
+ // total because they should not count against the capacity of the cluster.
+ int instancesUnableToAcceptOnlineReplicas = (int) cache.getInstanceConfigMap().values().stream()
+ .filter(instanceConfig -> !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
+ instanceConfig.getInstanceOperation())).count()
+ - cache.getEnabledLiveInstances().size();
+ if (instancesUnableToAcceptOnlineReplicas > maxInstancesUnableToAcceptOnlineReplicas) {
String errMsg = String.format(
- "Offline Instances count %d greater than allowed count %d. Put cluster %s into "
- + "maintenance mode.",
- offlineCount, maxOfflineInstancesAllowed, cache.getClusterName());
+ "Instances unable to take ONLINE replicas count %d greater than allowed count %d. Put cluster %s into "
+ + "maintenance mode.", instancesUnableToAcceptOnlineReplicas,
+ maxInstancesUnableToAcceptOnlineReplicas, cache.getClusterName());
if (manager != null) {
if (manager.getHelixDataAccessor()
.getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) {
manager.getClusterManagmentTool()
.autoEnableMaintenanceMode(manager.getClusterName(), true, errMsg,
- MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+ MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
LogUtil.logWarn(logger, _eventId, errMsg);
}
} else {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 6fbb2b6..3bf23d2 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -28,6 +28,7 @@
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -43,6 +44,7 @@
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
@@ -86,24 +88,39 @@
Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
final CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+ final CurrentStateOutput currentStateExcludingUnknown = new CurrentStateOutput(); // same inputs as currentStateOutput, minus live instances whose InstanceOperation is UNKNOWN
for (LiveInstance instance : liveInstances.values()) {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
+ InstanceConfig instanceConfig = cache.getInstanceConfigMap().get(instanceName); // may be null; a missing config is treated as not UNKNOWN below
+
+ Set<Message> existingStaleMessages = cache.getStaleMessagesByInstance(instanceName);
+ Map<String, Message> messages = cache.getMessages(instanceName);
+ Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
// update current states.
updateCurrentStates(instance,
cache.getCurrentState(instanceName, instanceSessionId, _isTaskFrameworkPipeline).values(),
currentStateOutput, resourceMap);
-
- Set<Message> existingStaleMessages = cache.getStaleMessagesByInstance(instanceName);
// update pending messages
- Map<String, Message> messages = cache.getMessages(instanceName);
- Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
updatePendingMessages(instance, cache, messages.values(), relayMessages.values(),
existingStaleMessages, currentStateOutput, resourceMap);
+
+ // Only update the currentStateExcludingUnknown if the instance is not in UNKNOWN InstanceOperation.
+ if (instanceConfig == null || !instanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
+ // update current states.
+ updateCurrentStates(instance,
+ cache.getCurrentState(instanceName, instanceSessionId, _isTaskFrameworkPipeline)
+ .values(), currentStateExcludingUnknown, resourceMap);
+ // update pending messages
+ updatePendingMessages(instance, cache, messages.values(), relayMessages.values(),
+ existingStaleMessages, currentStateExcludingUnknown, resourceMap);
+ }
}
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateExcludingUnknown);
final ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b399004..ba2e160 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -661,11 +661,11 @@
// preference list
if (preferenceList != null) {
return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(
- i -> resourceControllerDataProvider.getAssignableEnabledLiveInstances().contains(i))
+ i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i)) // count only preference-list instances that are enabled and live
.count(), requiredNumReplica); // StateModelDefinition's counts
}
return stateModelDefinition.getStateCountMap(
- resourceControllerDataProvider.getAssignableEnabledLiveInstances().size(),
+ resourceControllerDataProvider.getEnabledLiveInstances().size(), // no preference list: fall back to all enabled live instances
requiredNumReplica); // StateModelDefinition's counts
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
index d262d14..1a5185a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MaintenanceRecoveryStage.java
@@ -82,6 +82,7 @@
String reason;
switch (internalReason) {
case MAX_OFFLINE_INSTANCES_EXCEEDED:
+ case MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS: // shares the auto-exit threshold with MAX_OFFLINE_INSTANCES_EXCEEDED
// Check on the number of offline/disabled instances
int numOfflineInstancesForAutoExit =
cache.getClusterConfig().getNumOfflineInstancesForAutoExit();
@@ -90,7 +91,7 @@
}
// Get the count of all instances that are either offline or disabled
int offlineDisabledCount =
- cache.getAssignableInstances().size() - cache.getAssignableEnabledLiveInstances().size();
+ cache.getAssignableInstances().size() - cache.getEnabledLiveInstances().size(); // NOTE(review): auto-enter counts all non-SWAP_IN/UNKNOWN instance configs, this counts assignable instances only; confirm both sides agree to avoid flapping
shouldExitMaintenance = offlineDisabledCount <= numOfflineInstancesForAutoExit;
reason = String.format(
"Auto-exiting maintenance mode for cluster %s; Num. of offline/disabled instances is %d, less than or equal to the exit threshold %d",
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 5c22c11..859c667 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -135,6 +135,8 @@
new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, partition));
Map<String, String> pendingStateMap =
currentStateOutput.getPendingStateMap(resourceName, partition);
+ Map<String, String> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceName, partition); // assumes a non-null (possibly empty) map; keySet() is iterated below
// The operation is combing pending state with best possible state. Since some replicas have
// been moved from one instance to another, the instance will exist in pending state but not
@@ -146,6 +148,16 @@
}
}
+ // Look through the current state map and mark an instance as DROPPED in instanceStateMap if the
+ // instance has a current state but no best possible state. Such an instance may not have been
+ // dropped by the rebalance strategy. This check is required to ensure that the instances removed
+ // from the ideal state stateMap are properly dropped.
+ for (String instance : currentStateMap.keySet()) {
+ if (!instanceStateMap.containsKey(instance)) {
+ instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
+ }
+ }
+
// we should generate message based on the desired-state priority
// so keep generated messages in a temp map keyed by state
// desired-state->list of generated-messages
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 7e8bde9..6a0ae76 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -90,7 +90,7 @@
instanceMessageMap.put(instanceName,
Sets.newHashSet(dataProvider.getMessages(instanceName).values()));
}
- if (!InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
+ if (!config.getInstanceEnabled()) { // NOTE(review): the replaced check also took clusterConfig; confirm cluster-level disabled instances are still reflected here
disabledInstanceSet.add(instanceName);
}
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
index fd91588..036a548 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
@@ -19,6 +19,7 @@
* under the License.
*/
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -73,7 +74,7 @@
InstanceConfig config = new InstanceConfig("localhost_" + port);
config.setHostName("localhost");
config.setPort(Integer.toString(port));
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); // InstanceOperation replaces the deprecated HELIX_ENABLED flag
admin.addInstance(clusterName, config);
}
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
index 1f3939c..fa5b7cd 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
@@ -19,6 +19,7 @@
* under the License.
*/
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
@@ -113,7 +114,7 @@
InstanceConfig config = new InstanceConfig("localhost_" + port);
config.setHostName("localhost");
config.setPort(Integer.toString(port));
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); // InstanceOperation replaces the deprecated HELIX_ENABLED flag
admin.addInstance(clusterName, config);
}
diff --git a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
index 9cc14b6..37fd1ac 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/Quickstart.java
@@ -29,6 +29,7 @@
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
@@ -69,7 +70,7 @@
InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
instanceConfig.setHostName("localhost");
instanceConfig.setPort("" + port);
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); // InstanceOperation replaces the deprecated HELIX_ENABLED flag
INSTANCE_CONFIG_LIST.add(instanceConfig);
}
@@ -190,7 +191,7 @@
InstanceConfig instanceConfig = new InstanceConfig("localhost_" + port);
instanceConfig.setHostName("localhost");
instanceConfig.setPort("" + port);
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); // new node joins ENABLEd so partitions can move to it
echo("ADDING NEW NODE :" + instanceConfig.getInstanceName()
+ ". Partitions will move from old nodes to the new node.");
admin.addInstance(CLUSTER_NAME, instanceConfig);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d52967f..c7fe086 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -61,7 +61,6 @@
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.constants.InstanceConstants;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -119,10 +118,10 @@
public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
private static final String MAINTENANCE_ZNODE_ID = "maintenance";
private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
- private static final ImmutableSet<String> ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE =
- ImmutableSet.of("", InstanceConstants.InstanceOperation.SWAP_IN.name());
- private static final ImmutableSet<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
- ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());
+ private static final ImmutableSet<InstanceConstants.InstanceOperation> // now typed by enum instead of operation-name strings
+ INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
+ ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE,
+ InstanceConstants.InstanceOperation.UNKNOWN); // presumably operations whose instances get no new replicas — confirm at the use site
private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
@@ -206,113 +205,29 @@
throw new HelixException("Node " + nodeId + " already exists in cluster " + clusterName);
}
- if (!ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE.contains(
- instanceConfig.getInstanceOperation())) {
+ List<InstanceConfig> matchingLogicalIdInstances =
+ findInstancesMatchingLogicalId(clusterName, instanceConfig); // existing instances that share this instance's logicalId
+ if (matchingLogicalIdInstances.size() > 1) {
throw new HelixException(
- "Instance can only be added if InstanceOperation is set to one of" + "the following: "
- + ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE + " This instance: " + nodeId
- + " has InstanceOperation set to " + instanceConfig.getInstanceOperation());
+ "There is already more than one instance with the same logicalId in the cluster: "
+ + matchingLogicalIdInstances.stream().map(InstanceConfig::getInstanceName)
+ .collect(Collectors.joining(", "))
+ + ". Please make sure there are at most 2 instances with the same logicalId in the cluster.");
}
- // Get the topology key used to determine the logicalId of a node.
- ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName);
- ClusterTopologyConfig clusterTopologyConfig =
- ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
- String logicalIdKey = clusterTopologyConfig.getEndNodeType();
- String faultZoneKey = clusterTopologyConfig.getFaultZoneType();
- String toAddInstanceLogicalId = instanceConfig.getLogicalId(logicalIdKey);
-
- HelixConfigScope instanceConfigScope =
- new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
- clusterName).build();
- List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
- List<InstanceConfig> foundInstanceConfigsWithMatchingLogicalId =
- existingInstanceIds.parallelStream()
- .map(existingInstanceId -> getInstanceConfig(clusterName, existingInstanceId)).filter(
- existingInstanceConfig -> existingInstanceConfig.getLogicalId(logicalIdKey)
- .equals(toAddInstanceLogicalId)).collect(Collectors.toList());
-
- if (foundInstanceConfigsWithMatchingLogicalId.size() >= 2) {
- // If the length is 2, we cannot add an instance with the same logicalId as an existing instance
- // regardless of InstanceOperation.
- throw new HelixException(
- "There can only be 2 instances with the same logicalId in a cluster. "
- + "Existing instances: " + foundInstanceConfigsWithMatchingLogicalId.get(0)
- .getInstanceName() + " and " + foundInstanceConfigsWithMatchingLogicalId.get(1)
- .getInstanceName() + " already have the same logicalId: " + toAddInstanceLogicalId
- + "; therefore, " + nodeId + " cannot be added to the cluster.");
- } else if (foundInstanceConfigsWithMatchingLogicalId.size() == 1) {
- // If there is only one instance with the same logicalId,
- // we can infer that the intended behaviour is to SWAP_IN or EVACUATE + ADD.
- if (foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
- // If the existing instance with the same logicalId has SWAP_OUT InstanceOperation
-
- // If the InstanceOperation is unset, we will set it to SWAP_IN.
- if (!instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
- instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN);
- }
-
- // If the existing instance with the same logicalId is not in the same FAULT_ZONE as this instance, we cannot
- // add this instance.
- if (!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap()
- .containsKey(faultZoneKey) || !instanceConfig.getDomainAsMap().containsKey(faultZoneKey)
- || !foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap().get(faultZoneKey)
- .equals(instanceConfig.getDomainAsMap().get(faultZoneKey))) {
- throw new HelixException(
- "Instance can only be added if the SWAP_OUT instance sharing the same logicalId is in the same FAULT_ZONE"
- + " as this instance. " + "Existing instance: "
- + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
- + " has FAULT_ZONE_TYPE: " + foundInstanceConfigsWithMatchingLogicalId.get(0)
- .getDomainAsMap().get(faultZoneKey) + " and this instance: " + nodeId
- + " has FAULT_ZONE_TYPE: " + instanceConfig.getDomainAsMap().get(faultZoneKey));
- }
-
- Map<String, Integer> foundInstanceCapacityMap =
- foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap().isEmpty()
- ? clusterConfig.getDefaultInstanceCapacityMap()
- : foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap();
- Map<String, Integer> instanceCapacityMap = instanceConfig.getInstanceCapacityMap().isEmpty()
- ? clusterConfig.getDefaultInstanceCapacityMap()
- : instanceConfig.getInstanceCapacityMap();
- // If the instance does not have the same capacity, we cannot add this instance.
- if (!new EqualsBuilder().append(foundInstanceCapacityMap, instanceCapacityMap).isEquals()) {
- throw new HelixException(
- "Instance can only be added if the SWAP_OUT instance sharing the same logicalId has the same capacity"
- + " as this instance. " + "Existing instance: "
- + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
- + " has capacity: " + foundInstanceCapacityMap + " and this instance: " + nodeId
- + " has capacity: " + instanceCapacityMap);
- }
- } else if (foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
- // No need to check anything on the new node, the old node will be evacuated and the new node
- // will be added.
- } else {
- // If the instanceConfig.getInstanceEnabled() is true and the existing instance with the same logicalId
- // does not have InstanceOperation set to one of the above, we cannot add this instance.
- throw new HelixException(
- "Instance can only be added if the exising instance sharing the same logicalId"
- + " has InstanceOperation set to "
- + InstanceConstants.InstanceOperation.SWAP_OUT.name()
- + " and this instance has InstanceOperation set to "
- + InstanceConstants.InstanceOperation.SWAP_IN.name()
- + " or the existing instance sharing the same logicalId has Instance Operation set to "
- + InstanceConstants.InstanceOperation.EVACUATE.name()
- + " and this instance has InstanceOperation unset. Existing instance: "
- + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName()
- + " has InstanceOperation: " + foundInstanceConfigsWithMatchingLogicalId.get(0)
- .getInstanceOperation());
- }
- } else if (!instanceConfig.getInstanceOperation().isEmpty()) {
- // If there are no instances with the same logicalId, we can only add this instance if InstanceOperation
- // is unset because it is a new instance.
- throw new HelixException(
- "There is no instance with logicalId: " + toAddInstanceLogicalId + " in cluster: "
- + clusterName + "; therefore, " + nodeId
- + " cannot join cluster with InstanceOperation set to "
- + instanceConfig.getInstanceOperation() + ".");
+ InstanceConstants.InstanceOperation attemptedInstanceOperation =
+ instanceConfig.getInstanceOperation();
+ try {
+ validateInstanceOperationTransition(instanceConfig,
+ !matchingLogicalIdInstances.isEmpty() ? matchingLogicalIdInstances.get(0) : null,
+ InstanceConstants.InstanceOperation.UNKNOWN,
+ attemptedInstanceOperation, clusterName);
+ } catch (HelixException e) {
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN); // invalid transition: the instance is still created below, but with UNKNOWN
+ logger.error("Failed to add instance " + instanceConfig.getInstanceName() + " to cluster "
+ + clusterName + " with instance operation " + attemptedInstanceOperation
+ + ". Setting INSTANCE_OPERATION to " + instanceConfig.getInstanceOperation()
+ + " instead.", e);
}
ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
@@ -464,12 +379,14 @@
return accessor.setProperty(instanceConfigPropertyKey, newInstanceConfig);
}
+ @Deprecated
@Override
public void enableInstance(final String clusterName, final String instanceName,
final boolean enabled) {
enableInstance(clusterName, instanceName, enabled, null, null);
}
+ @Deprecated
@Override
public void enableInstance(final String clusterName, final String instanceName,
final boolean enabled, InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -477,20 +394,6 @@
clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
- // If enabled is set to true and InstanceOperation is SWAP_IN, we should fail if there is not a
- // matching SWAP_OUT instance.
- InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
- if (enabled && instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
- InstanceConfig matchingSwapInstance = findMatchingSwapInstance(clusterName, instanceConfig);
- if (matchingSwapInstance == null || !matchingSwapInstance.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
- throw new HelixException("Instance cannot be enabled if InstanceOperation is set to "
- + instanceConfig.getInstanceOperation() + " when there is no matching "
- + InstanceConstants.InstanceOperation.SWAP_OUT.name() + " instance.");
- }
- }
-
// Eventually we will have all instances' enable/disable information in clusterConfig. Now we
// update both instanceConfig and clusterConfig in transition period.
enableSingleInstance(clusterName, instanceName, enabled, baseAccessor, disabledType, reason);
@@ -499,6 +402,7 @@
}
+ @Deprecated
@Override
public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
// TODO: batch enable/disable is breaking backward compatibility on instance enable with older library
@@ -509,62 +413,88 @@
//enableInstance(clusterName, instances, enabled, null, null);
}
+ private void validateInstanceOperationTransition(InstanceConfig instanceConfig,
+ InstanceConfig matchingLogicalIdInstance,
+ InstanceConstants.InstanceOperation currentOperation,
+ InstanceConstants.InstanceOperation targetOperation,
+ String clusterName) {
+ boolean targetStateEnableOrDisable =
+ targetOperation.equals(InstanceConstants.InstanceOperation.ENABLE)
+ || targetOperation.equals(InstanceConstants.InstanceOperation.DISABLE);
+ switch (currentOperation) {
+ case ENABLE:
+ case DISABLE:
+ // ENABLE or DISABLE can be set to ENABLE, DISABLE, or EVACUATE at any time.
+ if (ImmutableSet.of(InstanceConstants.InstanceOperation.ENABLE,
+ InstanceConstants.InstanceOperation.DISABLE,
+ InstanceConstants.InstanceOperation.EVACUATE).contains(targetOperation)) {
+ return;
+ }
+ case SWAP_IN:
+ // We can only ENABLE or DISABLE a SWAP_IN instance if there is an instance with matching logicalId
+ // with an InstanceOperation set to UNKNOWN.
+ if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+ || matchingLogicalIdInstance.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.UNKNOWN))) || targetOperation.equals(
+ InstanceConstants.InstanceOperation.UNKNOWN)) {
+ return;
+ }
+ case EVACUATE:
+ // EVACUATE can only be set to ENABLE or DISABLE when there is no instance with the same
+ // logicalId in the cluster.
+ if ((targetStateEnableOrDisable && matchingLogicalIdInstance == null)
+ || targetOperation.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
+ return;
+ }
+ case UNKNOWN:
+ // UNKNOWN can be set to ENABLE or DISABLE when there is no instance with the same logicalId in the cluster
+ // or the instance with the same logicalId in the cluster has InstanceOperation set to EVACUATE.
+ // UNKNOWN can be set to SWAP_IN when there is an instance with the same logicalId in the cluster set to ENABLE,
+ // or DISABLE.
+ if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null
+ || matchingLogicalIdInstance.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.EVACUATE)))) {
+ return;
+ } else if (targetOperation.equals(InstanceConstants.InstanceOperation.SWAP_IN)
+ && matchingLogicalIdInstance != null && !ImmutableSet.of(
+ InstanceConstants.InstanceOperation.UNKNOWN,
+ InstanceConstants.InstanceOperation.EVACUATE)
+ .contains(matchingLogicalIdInstance.getInstanceOperation())) {
+ return;
+ }
+ default:
+ throw new HelixException(
+ "InstanceOperation cannot be set to " + targetOperation + " when the instance is in "
+ + currentOperation + " state");
+ }
+ }
+
+ /**
+ * Set the InstanceOperation of an instance in the cluster.
+ *
+ * @param clusterName The cluster name
+ * @param instanceName The instance name
+ * @param instanceOperation The instance operation
+ */
@Override
- // TODO: Name may change in future
public void setInstanceOperation(String clusterName, String instanceName,
@Nullable InstanceConstants.InstanceOperation instanceOperation) {
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
- // InstanceOperation can only be set to SWAP_IN when the instance is added to the cluster
- // or if it is disabled.
- if (instanceOperation != null && instanceOperation.equals(
- InstanceConstants.InstanceOperation.SWAP_IN) && getInstanceConfig(clusterName,
- instanceName).getInstanceEnabled()) {
- throw new HelixException("InstanceOperation should only be set to "
- + InstanceConstants.InstanceOperation.SWAP_IN.name()
- + " when an instance joins the cluster for the first time(when "
- + "creating the InstanceConfig) or is disabled.");
+ InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
+ if (instanceConfig == null) {
+ throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName
+ + ", instance config does not exist");
}
-
- // InstanceOperation cannot be set to null if there is an instance with the same logicalId in
- // the cluster which does not have InstanceOperation set to SWAP_IN or SWAP_OUT.
- if (instanceOperation == null) {
- InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName);
- String logicalIdKey = ClusterTopologyConfig.createFromClusterConfig(
- _configAccessor.getClusterConfig(clusterName)).getEndNodeType();
-
- HelixConfigScope instanceConfigScope =
- new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
- clusterName).build();
- List<String> existingInstanceIds = getConfigKeys(instanceConfigScope);
- List<InstanceConfig> matchingInstancesWithNonSwappingInstanceOperation =
- existingInstanceIds.parallelStream()
- .map(existingInstanceId -> getInstanceConfig(clusterName, existingInstanceId)).filter(
- existingInstanceConfig ->
- !existingInstanceConfig.getInstanceName().equals(instanceName)
- && existingInstanceConfig.getLogicalId(logicalIdKey)
- .equals(instanceConfig.getLogicalId(logicalIdKey))
- && !existingInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
- && !existingInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())
- && !existingInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.EVACUATE.name()))
- .collect(Collectors.toList());
-
- if (!matchingInstancesWithNonSwappingInstanceOperation.isEmpty()) {
- throw new HelixException("InstanceOperation cannot be set to null for " + instanceName
- + " if there are other instances with the same logicalId in the cluster that do not have"
- + " InstanceOperation set to SWAP_IN, SWAP_OUT, or EVACUATE.");
- }
- }
-
- if (!baseAccessor.exists(path, 0)) {
- throw new HelixException(
- "Cluster " + clusterName + ", instance: " + instanceName + ", instance config does not exist");
- }
+ List<InstanceConfig> matchingLogicalIdInstances =
+ findInstancesMatchingLogicalId(clusterName, instanceConfig);
+ validateInstanceOperationTransition(instanceConfig,
+ !matchingLogicalIdInstances.isEmpty() ? matchingLogicalIdInstances.get(0) : null,
+ instanceConfig.getInstanceOperation(),
+ instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE : instanceOperation,
+ clusterName);
boolean succeeded = baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
@@ -589,50 +519,33 @@
public boolean isEvacuateFinished(String clusterName, String instanceName) {
if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) {
InstanceConfig config = getInstanceConfig(clusterName, instanceName);
- return config != null && config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name());
+ return config != null && config.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.EVACUATE);
}
return false;
}
/**
- * Find the instance that the passed instance is swapping with. If the passed instance has
- * SWAP_OUT instanceOperation, then find the corresponding instance that has SWAP_IN
- * instanceOperation. If the passed instance has SWAP_IN instanceOperation, then find the
- * corresponding instance that has SWAP_OUT instanceOperation.
+ * Find the instance that the passed instance has a matching logicalId with.
*
* @param clusterName The cluster name
- * @param instanceConfig The instance to find the swap instance for
- * @return The swap instance if found, null otherwise.
+ * @param instanceConfig The instance to find the matching instance for
+ * @return The matching instance if found, null otherwise.
*/
- @Nullable
- private InstanceConfig findMatchingSwapInstance(String clusterName,
+ private List<InstanceConfig> findInstancesMatchingLogicalId(String clusterName,
InstanceConfig instanceConfig) {
String logicalIdKey =
ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName))
.getEndNodeType();
-
- for (String potentialSwappingInstance : getConfigKeys(
+ return getConfigKeys(
new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT,
- clusterName).build())) {
- InstanceConfig potentialSwappingInstanceConfig =
- getInstanceConfig(clusterName, potentialSwappingInstance);
-
- // Return if there is a matching Instance with the same logicalId and opposite InstanceOperation swap operation.
- if (potentialSwappingInstanceConfig.getLogicalId(logicalIdKey)
- .equals(instanceConfig.getLogicalId(logicalIdKey)) && (
- instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())
- && potentialSwappingInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) || (
- instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())
- && potentialSwappingInstanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()))) {
- return potentialSwappingInstanceConfig;
- }
- }
-
- return null;
+ clusterName).build()).stream()
+ .map(instanceName -> getInstanceConfig(clusterName, instanceName)).filter(
+ potentialInstanceConfig ->
+ !potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName())
+ && potentialInstanceConfig.getLogicalId(logicalIdKey)
+ .equals(instanceConfig.getLogicalId(logicalIdKey)))
+ .collect(Collectors.toList());
}
/**
@@ -661,14 +574,13 @@
accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
InstanceConfig swapOutInstanceConfig = getInstanceConfig(clusterName, swapOutInstanceName);
InstanceConfig swapInInstanceConfig = getInstanceConfig(clusterName, swapInInstanceName);
- if (swapInLiveInstance == null || !swapInInstanceConfig.getInstanceEnabled()) {
+ if (swapInLiveInstance == null) {
logger.warn(
- "SwapOutInstance {} is {} + {} and SwapInInstance {} is {} + {} for cluster {}. Swap will"
- + " not complete unless SwapInInstance instance is ENABLED and ONLINE.",
+ "SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} for cluster {}. Swap will"
+ + " not complete unless SwapInInstance instance is ONLINE.",
swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE",
- swapOutInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", swapInInstanceName,
- swapInLiveInstance != null ? "ONLINE" : "OFFLINE",
- swapInInstanceConfig.getInstanceEnabled() ? "ENABLED" : "DISABLED", clusterName);
+ swapOutInstanceConfig.getInstanceOperation(), swapInInstanceName,
+ swapInInstanceConfig.getInstanceOperation(), clusterName);
return false;
}
@@ -705,21 +617,15 @@
return false;
}
- // 4. Collect a list of all partitions that have a current state on swapOutInstance
- String swapOutLastActiveSession;
- if (swapOutLiveInstance == null) {
- // SwapOutInstance is down, try to find the last active session
- if (swapOutSessions.size() != 1) {
- logger.warn(
- "SwapOutInstance {} is offline and has {} sessions for cluster {}. Swap can't be "
- + "verified if last active session can't be determined. There should only be one session.",
- swapOutInstanceName, swapOutSessions.size(), clusterName);
- return false;
- }
- swapOutLastActiveSession = swapOutSessions.get(0);
- } else {
- swapOutLastActiveSession = swapOutLiveInstance.getEphemeralOwner();
+ // 4. If the swap-out instance is not alive or is disabled, we return true without checking
+ // the current states on the swap-in instance.
+ if (swapOutLiveInstance == null || swapOutInstanceConfig.getInstanceOperation()
+ .equals(InstanceConstants.InstanceOperation.DISABLE)) {
+ return true;
}
+
+ // 5. Collect a list of all partitions that have a current state on swapOutInstance
+ String swapOutLastActiveSession = swapOutLiveInstance.getEphemeralOwner();
String swapInActiveSession = swapInLiveInstance.getEphemeralOwner();
// Iterate over all resources with current states on the swapOutInstance
@@ -754,24 +660,22 @@
String swapOutPartitionState = swapOutResourceCurrentState.getState(partitionName);
String swapInPartitionState = swapInResourceCurrentState.getState(partitionName);
- // SwapInInstance should not have any partitions in ERROR state.
- if (swapInPartitionState.equals(HelixDefinedState.ERROR.name())) {
- logger.warn(
- "SwapOutInstance {} has partition {} in state {} and SwapInInstance {} has partition {} in state {} for cluster {}."
- + " Swap will not complete unless both instances have no partitions in ERROR state.",
- swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName,
- partitionName, swapInPartitionState, clusterName);
- return false;
- }
-
- // The state of the partition on the swapInInstance be in the topState or a secondTopState.
- // It should be in a topState only if the state model allows multiple replicas in the topState.
- // In all other cases it should be a secondTopState.
- if (!(swapInPartitionState.equals(topState) || secondTopStates.contains(
+ // SwapInInstance should have the correct state for the partition.
+ // All states should match except for the case where the topState is not ALL_REPLICAS or ALL_CANDIDATE_NODES
+ // or the swap-out partition is ERROR state.
+ // When the topState is not ALL_REPLICAS or ALL_CANDIDATE_NODES, the swap-in partition should be in a secondTopStates.
+ if (!(swapOutPartitionState.equals(HelixDefinedState.ERROR.name()) || (
+ topState.equals(swapOutPartitionState) && (
+ swapOutPartitionState.equals(swapInPartitionState) ||
+ !ImmutableSet.of(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS,
+ StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES).contains(
+ stateModelDefinition.getNumInstancesPerState(
+ stateModelDefinition.getTopState())) && secondTopStates.contains(
+ swapInPartitionState))) || swapOutPartitionState.equals(
swapInPartitionState))) {
logger.warn(
"SwapOutInstance {} has partition {} in {} but SwapInInstance {} has partition {} in state {} for cluster {}."
- + " Swap will not complete unless SwapInInstance has partition in topState or secondState.",
+ + " Swap will not complete unless SwapInInstance has partition in correct states.",
swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName,
partitionName, swapInPartitionState, clusterName);
return false;
@@ -792,12 +696,21 @@
return false;
}
- InstanceConfig swapOutInstanceConfig = instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? instanceConfig
- : findMatchingSwapInstance(clusterName, instanceConfig);
+ List<InstanceConfig> swappingInstances =
+ findInstancesMatchingLogicalId(clusterName, instanceConfig);
+ if (swappingInstances.size() != 1) {
+ logger.warn(
+ "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
+ instanceName, clusterName);
+ return false;
+ }
+
+ InstanceConfig swapOutInstanceConfig =
+ !instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+ ? instanceConfig : swappingInstances.get(0);
InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? instanceConfig
- : findMatchingSwapInstance(clusterName, instanceConfig);
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
+ : swappingInstances.get(0);
if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
logger.warn(
"Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
@@ -821,12 +734,21 @@
return false;
}
- InstanceConfig swapOutInstanceConfig = instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? instanceConfig
- : findMatchingSwapInstance(clusterName, instanceConfig);
+ List<InstanceConfig> swappingInstances =
+ findInstancesMatchingLogicalId(clusterName, instanceConfig);
+ if (swappingInstances.size() != 1) {
+ logger.warn(
+ "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
+ instanceName, clusterName);
+ return false;
+ }
+
+ InstanceConfig swapOutInstanceConfig =
+ !instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN)
+ ? instanceConfig : swappingInstances.get(0);
InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? instanceConfig
- : findMatchingSwapInstance(clusterName, instanceConfig);
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig
+ : swappingInstances.get(0);
if (swapOutInstanceConfig == null || swapInInstanceConfig == null) {
logger.warn(
"Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.",
@@ -840,11 +762,39 @@
return false;
}
- // Complete the swap by removing the InstanceOperation for the SWAP_IN node and disabling the SWAP_OUT node.
- setInstanceOperation(clusterName, swapInInstanceConfig.getInstanceName(), null);
- enableInstance(clusterName, swapOutInstanceConfig.getInstanceName(), false);
+ BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
+ String swapInInstanceConfigPath =
+ PropertyPathBuilder.instanceConfig(clusterName, swapInInstanceConfig.getInstanceName());
+ String swapOutInstanceConfigPath =
+ PropertyPathBuilder.instanceConfig(clusterName, swapOutInstanceConfig.getInstanceName());
- return true;
+ Map<String, DataUpdater<ZNRecord>> updaterMap = new HashMap<>();
+ updaterMap.put(swapInInstanceConfigPath, currentData -> {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
+ + ", SWAP_IN instance config is null");
+ }
+
+ InstanceConfig currentSwapOutInstanceConfig =
+ getInstanceConfig(clusterName, swapOutInstanceConfig.getInstanceName());
+ InstanceConfig config = new InstanceConfig(currentData);
+ config.overwriteInstanceConfig(currentSwapOutInstanceConfig);
+ // Special handling in case the swap-out instance does not have HELIX_ENABLED or InstanceOperation set.
+ return config.getRecord();
+ });
+
+ updaterMap.put(swapOutInstanceConfigPath, currentData -> {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName
+ + ", swap out instance config is null");
+ }
+
+ InstanceConfig config = new InstanceConfig(currentData);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN);
+ return config.getRecord();
+ });
+
+ return baseAccessor.multiSet(updaterMap);
}
@Override
@@ -2427,6 +2377,7 @@
setResourceIdealState(clusterName, resourceName, idealState);
}
+ @Deprecated
private void enableSingleInstance(final String clusterName, final String instanceName,
final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor,
InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -2448,7 +2399,7 @@
InstanceConfig config = new InstanceConfig(currentData);
config.setInstanceEnabled(enabled);
if (!enabled) {
- // new disabled type and reason will over write existing ones.
+ // new disabled type and reason will overwrite existing ones.
config.resetInstanceDisabledTypeAndReason();
if (reason != null) {
config.setInstanceDisabledReason(reason);
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index fdc05ac..edb7a76 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -75,6 +75,8 @@
// TODO: if we want to support this for other rebalancers, we need to implement that logic
GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE,
// The following two include offline AND disabled instances
+ // TODO: At some point we should rename this to something like MAX_INSTANCES_UNABLE_TO_TAKE_ACCEPT_REPLICAS
+ // to make it clear that it includes both offline and non-assignable instances
MAX_OFFLINE_INSTANCES_ALLOWED,
NUM_OFFLINE_INSTANCES_FOR_AUTO_EXIT, // For auto-exiting maintenance mode
@@ -88,7 +90,9 @@
// state transition if the number of
// partitons that need recovery or in
// error exceeds this limitation
+ @Deprecated // TODO: Remove in Helix 2.0
DISABLED_INSTANCES,
+ @Deprecated // TODO: Remove in Helix 2.0
DISABLED_INSTANCES_WITH_INFO,
// disabled instances and disabled instances with info are for storing batch disabled instances.
// disabled instances will write into both 2 fields for backward compatibility.
@@ -816,8 +820,11 @@
/**
* Get current disabled instance map of <instance, disabledTimeStamp>
+ * @deprecated We will no longer be using the clusterConfig to disable instances
+ * please use the InstanceConfig to disable instances
* @return a non-null map of disabled instances in cluster config
*/
+ @Deprecated
public Map<String, String> getDisabledInstances() {
Map<String, String> disabledInstances =
_record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES.name());
@@ -827,8 +834,10 @@
/**
* Get current disabled instance map of
* <instance, disabledReason = "res, disabledType = typ, disabledTimeStamp = time">
+ * @deprecated Please use InstanceConfig for enabling and disabling instances
* @return a non-null map of disabled instances in cluster config
*/
+ @Deprecated
public Map<String, String> getDisabledInstancesWithInfo() {
Map<String, String> disabledInstances =
_record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES_WITH_INFO.name());
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 2f3da14..de41646 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -29,6 +29,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.constants.InstanceConstants;
@@ -47,9 +48,7 @@
* Configurable characteristics of an instance
*/
public enum InstanceConfigProperty {
- HELIX_HOST,
- HELIX_PORT,
- HELIX_ZONE_ID,
+ HELIX_HOST, HELIX_PORT, HELIX_ZONE_ID, @Deprecated
HELIX_ENABLED,
HELIX_ENABLED_TIMESTAMP,
HELIX_DISABLED_REASON,
@@ -71,6 +70,13 @@
private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1;
private static final boolean HELIX_ENABLED_DEFAULT_VALUE = true;
+ // These fields are not allowed to be overwritten by the merge method because
+ // they are unique properties of an instance.
+ private static final ImmutableSet<InstanceConfigProperty> NON_OVERWRITABLE_PROPERTIES =
+ ImmutableSet.of(InstanceConfigProperty.HELIX_HOST, InstanceConfigProperty.HELIX_PORT,
+ InstanceConfigProperty.HELIX_ZONE_ID, InstanceConfigProperty.DOMAIN,
+ InstanceConfigProperty.INSTANCE_INFO_MAP);
+
private static final Logger _logger = LoggerFactory.getLogger(InstanceConfig.class.getName());
/**
@@ -252,20 +258,22 @@
}
/**
- * Check if this instance is enabled and able to serve replicas
- * @return true if enabled, false if disabled
+ * Get the timestamp (milliseconds from epoch) when this instance was enabled/disabled last time.
+ *
+ * @return the timestamp when the instance was enabled/disabled last time. If the instance is never
+ * enabled/disabled, return -1.
*/
- public boolean getInstanceEnabled() {
- return _record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
- HELIX_ENABLED_DEFAULT_VALUE);
+ public long getInstanceEnabledTime() {
+ return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
}
/**
- * Set the enabled state of the instance
- * If user enables the instance, HELIX_DISABLED_REASON filed will be removed.
- *
+ * Set the enabled state of the instance If user enables the instance, HELIX_DISABLED_REASON filed
+ * will be removed.
+ * @deprecated This method is deprecated. Please use setInstanceOperation instead.
* @param enabled true to enable, false to disable
*/
+ @Deprecated
public void setInstanceEnabled(boolean enabled) {
// set instance operation only when we need to change InstanceEnabled value.
setInstanceEnabledHelper(enabled);
@@ -292,7 +300,7 @@
* It will be a no-op when instance is enabled.
*/
public void setInstanceDisabledReason(String disabledReason) {
- if (!getInstanceEnabled()) {
+ if (getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), disabledReason);
}
}
@@ -302,13 +310,14 @@
* It will be a no-op when instance is enabled.
*/
public void setInstanceDisabledType(InstanceConstants.InstanceDisabledType disabledType) {
- if (!getInstanceEnabled()) {
+ if (getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
_record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
disabledType.name());
}
}
/**
+ * Get the instance disabled reason when instance is disabled.
* @return Return instance disabled reason. Default is am empty string.
*/
public String getInstanceDisabledReason() {
@@ -321,7 +330,7 @@
* Default is am empty string.
*/
public String getInstanceDisabledType() {
- if (getInstanceEnabled()) {
+ if (!getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
return InstanceConstants.INSTANCE_NOT_DISABLED;
}
return _record.getStringField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(),
@@ -329,21 +338,85 @@
}
/**
- * Get the timestamp (milliseconds from epoch) when this instance was enabled/disabled last time.
+ * Set the instance operation for this instance.
*
- * @return
+ * @param operation the instance operation
*/
- public long getInstanceEnabledTime() {
- return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1);
- }
-
public void setInstanceOperation(InstanceConstants.InstanceOperation operation) {
_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
operation == null ? "" : operation.name());
+ if (operation == null || operation == InstanceConstants.InstanceOperation.ENABLE
+ || operation == InstanceConstants.InstanceOperation.DISABLE) {
+ // We are still setting the HELIX_ENABLED field for backwards compatibility.
+ // It is possible that users will be using earlier version of HelixAdmin or helix-rest
+ // is on older version.
+ // TODO: Remove this when we are sure that all users are using the new field INSTANCE_OPERATION.
+ setInstanceEnabledHelper(!(operation == InstanceConstants.InstanceOperation.DISABLE));
+ }
}
- public String getInstanceOperation() {
- return _record.getStringField(InstanceConfigProperty.INSTANCE_OPERATION.name(), "");
+ private void setInstanceOperationInit(InstanceConstants.InstanceOperation operation) {
+ if (operation == null) {
+ return;
+ }
+ _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), operation.name());
+ }
+
+ /**
+ * Get the InstanceOperation of this instance, default is ENABLE if nothing is set. If
+ * HELIX_ENABLED is set to false, then the instance operation is DISABLE for backwards
+ * compatibility.
+ *
+ * @return the instance operation
+ */
+ public InstanceConstants.InstanceOperation getInstanceOperation() {
+ String instanceOperationString =
+ _record.getSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name());
+
+ InstanceConstants.InstanceOperation instanceOperation;
+ try {
+ // If INSTANCE_OPERATION is not set, then the instance is enabled.
+ instanceOperation = (instanceOperationString == null || instanceOperationString.isEmpty())
+ ? InstanceConstants.InstanceOperation.ENABLE
+ : InstanceConstants.InstanceOperation.valueOf(instanceOperationString);
+ } catch (IllegalArgumentException e) {
+ _logger.error("Invalid instance operation: " + instanceOperationString + " for instance: "
+ + _record.getId()
+ + ". You may need to update your version of Helix to get support for this "
+ + "type of InstanceOperation. Defaulting to UNKNOWN.");
+ return InstanceConstants.InstanceOperation.UNKNOWN;
+ }
+
+ // Always respect the HELIX_ENABLED being set to false when instance operation is unset
+ // for backwards compatibility.
+ if (!_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(),
+ HELIX_ENABLED_DEFAULT_VALUE)
+ && (InstanceConstants.INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS.contains(
+ instanceOperation))) {
+ return InstanceConstants.InstanceOperation.DISABLE;
+ }
+
+ return instanceOperation;
+ }
+
+ /**
+ * Check if this instance is enabled. This is used to determine if the instance can host online
+ * replicas and take new assignment.
+ *
+ * @return true if enabled, false otherwise
+ */
+ public boolean getInstanceEnabled() {
+ return getInstanceOperation().equals(InstanceConstants.InstanceOperation.ENABLE);
+ }
+
+ /**
+ * Check to see if the instance is assignable. This is used to determine if the instance can be
+ * selected by the rebalancer to take assignment of replicas.
+ *
+ * @return true if the instance is assignable, false otherwise
+ */
+ public boolean isAssignable() {
+ return InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains(getInstanceOperation());
}
/**
@@ -777,6 +850,34 @@
return true;
}
+ /**
+ * Overwrite the InstanceConfigProperties from the given InstanceConfig to this InstanceConfig.
+ * The merge is done by overwriting the properties in this InstanceConfig with the properties
+ * from the given InstanceConfig. {@link #NON_OVERWRITABLE_PROPERTIES} will not be overridden.
+ *
+ * @param overwritingInstanceConfig the InstanceConfig to override into this InstanceConfig
+ */
+ public void overwriteInstanceConfig(InstanceConfig overwritingInstanceConfig) {
+ // Remove all overwritable fields from the record
+ Set<String> overwritableProperties = Arrays.stream(InstanceConfigProperty.values())
+ .filter(property -> !NON_OVERWRITABLE_PROPERTIES.contains(property)).map(Enum::name)
+ .collect(Collectors.toSet());
+ _record.getSimpleFields().keySet().removeAll(overwritableProperties);
+ _record.getListFields().keySet().removeAll(overwritableProperties);
+ _record.getMapFields().keySet().removeAll(overwritableProperties);
+
+ // Get all overwritable fields from the overwritingInstanceConfig and set them in this record
+ overwritingInstanceConfig.getRecord().getSimpleFields().entrySet().stream()
+ .filter(entry -> overwritableProperties.contains(entry.getKey()))
+ .forEach((entry) -> _record.setSimpleField(entry.getKey(), entry.getValue()));
+ overwritingInstanceConfig.getRecord().getListFields().entrySet().stream()
+ .filter(entry -> overwritableProperties.contains(entry.getKey()))
+ .forEach((entry) -> _record.setListField(entry.getKey(), entry.getValue()));
+ overwritingInstanceConfig.getRecord().getMapFields().entrySet().stream()
+ .filter(entry -> overwritableProperties.contains(entry.getKey()))
+ .forEach((entry) -> _record.setMapField(entry.getKey(), entry.getValue()));
+ }
+
public static class Builder {
private String _hostName;
private String _port;
@@ -828,12 +929,15 @@
instanceConfig.addTag(tag);
}
- if (_instanceEnabled != HELIX_ENABLED_DEFAULT_VALUE) {
- instanceConfig.setInstanceEnabled(_instanceEnabled);
+ if (_instanceOperation == null && _instanceEnabled != HELIX_ENABLED_DEFAULT_VALUE) {
+ instanceConfig.setInstanceOperationInit(
+ _instanceEnabled ? InstanceConstants.InstanceOperation.ENABLE
+ : InstanceConstants.InstanceOperation.DISABLE);
}
- if (_instanceOperation != null) {
- instanceConfig.setInstanceOperation(_instanceOperation);
+ if (_instanceOperation != null && !_instanceOperation.equals(
+ InstanceConstants.InstanceOperation.ENABLE)) {
+ instanceConfig.setInstanceOperationInit(_instanceOperation);
}
if (_instanceInfoMap != null) {
@@ -899,9 +1003,11 @@
/**
* Set the enabled status for this instance
+ * @deprecated HELIX_ENABLED is no longer in use. Use setInstanceOperation instead.
* @param instanceEnabled true if enabled, false otherwise
* @return InstanceConfig.Builder
*/
+ @Deprecated
public Builder setInstanceEnabled(boolean instanceEnabled) {
_instanceEnabled = instanceEnabled;
return this;
diff --git a/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java b/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java
index f978130..83e0e1c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java
+++ b/helix-core/src/main/java/org/apache/helix/model/MaintenanceSignal.java
@@ -50,7 +50,9 @@
* maintenance mode. This field does not apply when triggered manually.
*/
public enum AutoTriggerReason {
+ @Deprecated // Replaced with MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS
MAX_OFFLINE_INSTANCES_EXCEEDED,
+ MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS,
MAX_PARTITION_PER_INSTANCE_EXCEEDED,
NOT_APPLICABLE // Not triggered automatically or automatically exiting maintenance mode
}
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 8872e9e..db9ada9 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -26,7 +26,6 @@
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -45,16 +44,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Cache the cluster data that are needed by RoutingTableProvider.
*/
class RoutingDataCache extends BasicClusterDataCache {
private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
- // When an instance has any of these instance operations, it should not be routable.
- private static final ImmutableSet<String> NON_ROUTABLE_INSTANCE_OPERATIONS =
- ImmutableSet.of(InstanceConstants.InstanceOperation.SWAP_IN.name());
-
private final Map<PropertyType, List<String>> _sourceDataTypeMap;
private CurrentStateCache _currentStateCache;
@@ -185,7 +181,7 @@
private void updateRoutableInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
_routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter(
- (instanceConfigEntry) -> !NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
+ (instanceConfigEntry) -> !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
instanceConfigEntry.getValue().getInstanceOperation()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@@ -194,7 +190,7 @@
Map<String, LiveInstance> liveInstanceMap) {
_routableLiveInstanceMap = liveInstanceMap.entrySet().stream().filter(
(liveInstanceEntry) -> instanceConfigMap.containsKey(liveInstanceEntry.getKey())
- && !NON_ROUTABLE_INSTANCE_OPERATIONS.contains(
+ && !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
index fd22a8e..6d4c687 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java
@@ -141,8 +141,8 @@
// Will contain the list of partitions that must be explicitly dropped from the ideal state that
// is stored in zk.
Set<String> liveInstances =
- jobCfg.getInstanceGroupTag() == null ? _dataProvider.getAssignableEnabledLiveInstances()
- : _dataProvider.getAssignableEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
+ jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
+ : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
LOG.error("No available instance found for job: {}", jobName);
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 0d9a4f1..633ce03 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -313,7 +313,7 @@
ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
// ensure node is disabled, otherwise fail
- if (InstanceValidationUtil.isInstanceEnabled(config, clusterConfig)) {
+ if (config.getInstanceEnabled()) {
String error = "Node " + instanceId + " is enabled, cannot drop";
_logger.warn(error);
throw new HelixException(error);
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index d0da9ba..7ed44f8 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -331,7 +331,7 @@
for (String partition : idealState.getPartitionSet()) {
List<String> preferenceList = AbstractRebalancer.getPreferenceList(new Partition(partition),
- idealState, cache.getAssignableEnabledLiveInstances());
+ idealState, cache.getEnabledLiveInstances());
Map<String, String> idealMapping;
if (_isDeactivatedNodeAware) {
idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
@@ -339,7 +339,7 @@
cache.getDisabledInstancesForPartition(idealState.getResourceName(), partition));
} else {
idealMapping = HelixUtil.computeIdealMapping(preferenceList, stateModelDef,
- cache.getAssignableEnabledLiveInstances(),
+ cache.getEnabledLiveInstances(),
Collections.emptySet());
}
idealPartitionState.put(partition, idealMapping);
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 4a3d49b..834b846 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -399,7 +399,7 @@
// Remove all disabled instances so that Helix will not consider them live.
List<String> disabledInstance = instanceConfigs.stream()
- .filter(instanceConfig -> !InstanceValidationUtil.isInstanceEnabled(instanceConfig, clusterConfig))
+ .filter(instanceConfig -> !instanceConfig.getInstanceEnabled())
.map(InstanceConfig::getInstanceName)
.collect(Collectors.toList());
liveInstances.removeAll(disabledInstance);
diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
index 2542ecf..5dea683 100644
--- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java
@@ -73,22 +73,22 @@
public static boolean isEnabled(HelixDataAccessor dataAccessor, String instanceName) {
PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
InstanceConfig instanceConfig = dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instanceName));
- ClusterConfig clusterConfig = dataAccessor.getProperty(propertyKeyBuilder.clusterConfig());
- // TODO deprecate instance level config checks once migrated the enable status to cluster config only
- if (instanceConfig == null || clusterConfig == null) {
- throw new HelixException("InstanceConfig or ClusterConfig is NULL");
+ if (instanceConfig == null) {
+ throw new HelixException("InstanceConfig is NULL");
}
- return isInstanceEnabled(instanceConfig, clusterConfig);
-
+ return instanceConfig.getInstanceEnabled();
}
/**
* Check if the instance is enabled by configuration
+ * @deprecated Use {@link InstanceConfig#getInstanceEnabled()} instead. We will no longer
+ * be using cluster config to enable/disable instances.
* @param instanceConfig
* @param clusterConfig
* @return
*/
+ @Deprecated
public static boolean isInstanceEnabled(InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
if (instanceConfig == null) {
throw new HelixException("InstanceConfig is NULL");
diff --git a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
index bc43954..18ddc02 100644
--- a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
@@ -29,6 +29,7 @@
import org.apache.helix.api.config.RebalanceConfig;
import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -77,7 +78,7 @@
List<InstanceConfig> instanceConfigs) {
for (InstanceConfig instanceConfig : instanceConfigs) {
// ensure the instance is enabled
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_instanceConfigMap.put(instanceConfig.getInstanceName(), instanceConfig);
}
// ensure no instance is disabled
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 0218c3f..a265605 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -45,6 +45,7 @@
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
import org.apache.helix.api.config.HelixConfigProperty;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.Stage;
@@ -633,7 +634,6 @@
for (int i = 0; i < liveInstances.length; i++) {
String instance = "localhost_" + liveInstances[i];
-
_liveInstanceOwners.putIfAbsent(clusterName, new HashMap<>());
Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(clusterName);
clientMap.putIfAbsent(instance, DedicatedZkClientFactory.getInstance()
@@ -687,7 +687,7 @@
InstanceConfig instanceConfig = new InstanceConfig(instance);
instanceConfig.setHostName("localhost");
instanceConfig.setPort("" + instances[i]);
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
admin.addInstance(clusterName, instanceConfig);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
index b4f405c..bdfa278 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
@@ -31,6 +31,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
@@ -399,7 +400,7 @@
_dataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
Assert.assertTrue(instanceConfig.getInstanceEnabled());
try {
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_dataAccessor.updateProperty(_keyBuilder.instanceConfig(instanceName), instanceConfig);
_dataProvider.notifyDataChange(ChangeType.INSTANCE_CONFIG);
_dataProvider.refresh(_dataAccessor);
@@ -410,7 +411,7 @@
} finally {
// remove newly added resource/ideastate
_gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, resourceName);
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_dataAccessor.updateProperty(_keyBuilder.instanceConfig(instanceName), instanceConfig);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 45c35f3..f8af3ee 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -56,6 +56,6 @@
new IdealState("test"), new ClusterConfig("TestCluster"), partition,
MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
- Assert.assertTrue(bestPossibleMap.equals(expectedBestPossibleMap));
+ Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 951e0e3..a554283 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -118,9 +118,7 @@
liveInstanceMap.put(instanceName, testLiveInstance);
when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
- when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getAssignableInstances()).thenReturn(_instances);
when(testCache.getAllInstances()).thenReturn(_instances);
@@ -375,7 +373,7 @@
Collectors.toMap(resourceName -> resourceName, Resource::new));
try {
rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
- clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
+ clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), _algorithm);
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -439,7 +437,7 @@
// Calculation will fail
try {
rebalancer.computeBestPossibleAssignment(clusterData, resourceMap,
- clusterData.getAssignableEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
+ clusterData.getEnabledLiveInstances(), new CurrentStateOutput(), badAlgorithm);
Assert.fail("Rebalance shall fail.");
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
@@ -749,8 +747,8 @@
Set<String> instances = new HashSet<>(_instances);
instances.add(offlineInstance);
when(clusterData.getAssignableInstances()).thenReturn(instances);
- when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
- when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(
+ when(clusterData.getEnabledInstances()).thenReturn(instances);
+ when(clusterData.getEnabledLiveInstances()).thenReturn(
new HashSet<>(Arrays.asList(instance0, instance1, instance2)));
Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
@@ -894,8 +892,8 @@
// force create a fake offlineInstance that's in delay window
Set<String> instances = new HashSet<>(_instances);
when(clusterData.getAssignableInstances()).thenReturn(instances);
- when(clusterData.getAssignableEnabledInstances()).thenReturn(instances);
- when(clusterData.getAssignableEnabledLiveInstances()).thenReturn(instances);
+ when(clusterData.getEnabledInstances()).thenReturn(instances);
+ when(clusterData.getEnabledLiveInstances()).thenReturn(instances);
Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
when(clusterData.getAssignableInstanceConfigMap()).thenReturn(instanceConfigMap);
when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index c5c7b56..3fb05e5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -266,9 +266,7 @@
liveInstanceMap.put(instanceName, testLiveInstance);
when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getAssignableEnabledInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
- when(testCache.getAssignableEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getAssignableInstances()).thenReturn(_instances);
when(testCache.getAllInstances()).thenReturn(_instances);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index c9deb79..b75a340 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,6 +29,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -84,7 +85,7 @@
InstanceConfig testInstanceConfig = new InstanceConfig(instanceId);
testInstanceConfig.setInstanceCapacityMap(_capacityDataMap);
testInstanceConfig.addTag(_testInstanceTags.get(0));
- testInstanceConfig.setInstanceEnabled(true);
+ testInstanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
testInstanceConfig.setZoneId(_testFaultZoneId);
return testInstanceConfig;
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 34582d6..2e41b6d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -106,7 +106,7 @@
activeInstances.add(instance1);
activeInstances.add(instance2);
when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
// test 0, empty input
Assert.assertEquals(
@@ -144,7 +144,7 @@
// test 2, no additional replica to be assigned
testCache = setupClusterDataCache();
when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
input = ImmutableMap.of(
_resourceNames.get(0),
ImmutableMap.of(
@@ -169,7 +169,7 @@
// test 3, minActiveReplica==2, two partitions falling short
testCache = setupClusterDataCache();
when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
input = ImmutableMap.of(
_resourceNames.get(0),
ImmutableMap.of(
@@ -207,7 +207,7 @@
activeInstances.add(instance1);
activeInstances.add(instance2);
when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
// test 1, one partition under minActiveReplica
Map<String, Map<String, Map<String, String>>> input = ImmutableMap.of(
@@ -247,7 +247,7 @@
// test 2, minActiveReplica==2, three partitions falling short
testCache = setupClusterDataCache();
when(testCache.getAssignableLiveInstances()).thenReturn(liveInstanceMap);
- when(testCache.getAssignableEnabledLiveInstances()).thenReturn(activeInstances);
+ when(testCache.getEnabledLiveInstances()).thenReturn(activeInstances);
input = ImmutableMap.of(
_resourceNames.get(0),
ImmutableMap.of(
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index 0027f8e..2a548ce 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -58,6 +58,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
@@ -94,6 +95,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index e33dc9f..518c610 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -58,6 +58,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
ReadClusterDataStage stage1 = new ReadClusterDataStage();
@@ -117,6 +118,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
index 7b89152..7d815c1 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCancellationMessageGeneration.java
@@ -68,6 +68,7 @@
when(message.getToState()).thenReturn("SLAVE");
when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(message);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
// Set helix manager to event
event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
@@ -157,6 +158,7 @@
when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE))
.thenReturn(pendingMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
// Set helix manager to event
event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 7da3d64..f15e6b8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -177,6 +177,7 @@
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());
@@ -261,6 +262,7 @@
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
@@ -379,6 +381,7 @@
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
@@ -553,6 +556,7 @@
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
index 7c20b02..f1a3da4 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementMessageGeneration.java
@@ -84,6 +84,7 @@
when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE))
.thenReturn(pendingMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
// Set helix manager to event
event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
index 1c61624..0c144d8 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestManagementModeStage.java
@@ -51,6 +51,7 @@
public void beforeClass() {
_clusterName = "CLUSTER_" + TestHelper.getTestClassName();
_accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+ _gSetupTool.setupTestCluster(_clusterName);
_manager = new DummyClusterManager(_clusterName, _accessor);
}
@@ -65,6 +66,8 @@
// ideal state: node0 is MASTER, node1 is SLAVE
// replica=2 means 1 master and 1 slave
setupIdealState(_clusterName, new int[]{0, 1}, new String[]{"TestDB"}, 1, 2);
+ _gSetupTool.addInstanceToCluster(_clusterName, "localhost_0");
+ _gSetupTool.addInstanceToCluster(_clusterName, "localhost_1");
List<LiveInstance> liveInstances = setupLiveInstances(_clusterName, new int[]{0, 1});
setupStateModel(_clusterName);
@@ -96,7 +99,7 @@
ControllerHistory history =
_accessor.getProperty(_accessor.keyBuilder().controllerLeaderHistory());
- Assert.assertNull(history);
+ Assert.assertTrue(history.getMaintenanceHistoryList().isEmpty());
// Mark both live instances to be frozen, then entering freeze mode is complete
for (int i = 0; i < 2; i++) {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
index e4aeed0..515340d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestReplicaLevelThrottling.java
@@ -71,7 +71,7 @@
when(mock.cache.getClusterConfig()).thenReturn((ClusterConfig) cacheMap.get(CacheKeys.clusterConfig.name()));
when(mock.cache.getStateModelDef((String) cacheMap.get(CacheKeys.stateModelName.name()))).thenReturn(
(StateModelDefinition) cacheMap.get(CacheKeys.stateModelDef.name()));
- when(mock.cache.getAssignableEnabledLiveInstances()).thenReturn(new HashSet<>(
+ when(mock.cache.getEnabledLiveInstances()).thenReturn(new HashSet<>(
((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()));
when(mock.cache.getLiveInstances()).thenReturn(new HashSet<>(
((Map<String, List<String>>) cacheMap.get(CacheKeys.preferenceList.name())).values().iterator().next()).stream()
@@ -189,6 +189,7 @@
}
ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.Unknown);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); // add current states
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput); // add current states
event.addAttribute(AttributeName.ControllerDataProvider.name(),
buildCache(mock, numReplica, minActiveReplica, stateModelDef, stateModelName, preferenceLists));
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageOutput);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
index 22c20f7..e457e31 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
@@ -85,6 +85,7 @@
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
runStage(event, new ReadClusterDataStage());
// Keep update the current state.
@@ -133,6 +134,7 @@
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
@@ -194,6 +196,7 @@
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
generateMessageMapForPartition(bestPossibleMap, currentStateMap, Collections.emptyList(), resourceName));
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
@@ -350,6 +353,7 @@
resourcePriority.add(resourceName);
currentStateOutput.setCurrentState(resourceName, partition, instanceName, state);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
}
private void updateCurrentStateForPartitionLevelPriority(List<String> partitionPriority,
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
index 2f1dee2..1b2e54b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestAlertingRebalancerFailure.java
@@ -33,6 +33,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
@@ -297,7 +298,8 @@
private void setInstanceEnable(String instanceName, boolean enabled,
ConfigAccessor configAccessor) {
InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, instanceName);
- instanceConfig.setInstanceEnabled(enabled);
+ instanceConfig.setInstanceOperation(enabled ? InstanceConstants.InstanceOperation.ENABLE
+ : InstanceConstants.InstanceOperation.DISABLE);
configAccessor.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
index a5416d4..ce3077d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestDisableCustomCodeRunner.java
@@ -32,6 +32,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -170,7 +171,7 @@
InstanceConfig instanceConfig = new InstanceConfig(fakeInstanceName);
instanceConfig.setHostName("localhost");
instanceConfig.setPort("10000");
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
admin.addInstance(clusterName, instanceConfig);
LiveInstance fakeInstance = new LiveInstance(fakeInstanceName);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
index 98dd281..9cb248f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -28,6 +28,7 @@
import org.apache.helix.TestHelper;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
@@ -74,7 +75,7 @@
// Disable instance 0 so that it will cause a partition to do a load balance
PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
InstanceConfig instanceConfig = _accessor.getProperty(key);
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_accessor.setProperty(key, instanceConfig);
// Resume the controller
@@ -234,7 +235,7 @@
// Disable an instance so that it will not be subject to throttling
PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
InstanceConfig instanceConfig = _accessor.getProperty(key);
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_accessor.setProperty(key, instanceConfig);
// Set the state transition delay so that transitions would be processed slowly
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
index ebfb03e..6654098 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
@@ -280,7 +280,7 @@
Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
MaintenanceSignal.TriggeringEntity.CONTROLLER);
Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
- MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+ MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
// Bring up all instances
for (int i = 0; i < 3; i++) {
@@ -306,7 +306,7 @@
Assert.assertEquals(maintenanceSignal.getTriggeringEntity(),
MaintenanceSignal.TriggeringEntity.CONTROLLER);
Assert.assertEquals(maintenanceSignal.getAutoTriggerReason(),
- MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED);
+ MaintenanceSignal.AutoTriggerReason.MAX_INSTANCES_UNABLE_TO_ACCEPT_ONLINE_REPLICAS);
// Set the cluster config for auto-exiting maintenance mode
ClusterConfig clusterConfig = _manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
index b11e635..a61ffea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle2.java
@@ -41,6 +41,7 @@
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.HelixControllerMain;
@@ -379,7 +380,7 @@
if (instanceConfig == null) {
InstanceConfig config = new InstanceConfig(instanceName);
config.setHostName("localhost");
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
echo("Adding InstanceConfig:" + config);
admin.addInstance(_clusterName, config);
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java
index 3b13868..fef7ea0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestNodeSwap.java
@@ -30,6 +30,7 @@
import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
@@ -170,7 +171,7 @@
final InstanceConfig instanceConfig =
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, oldParticipantName);
// disable the node first
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME, oldParticipantName,
instanceConfig);
Assert.assertTrue(_clusterVerifier.verify(10000));
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
index 15b66af..cd5338e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestExpandCluster.java
@@ -21,6 +21,7 @@
import java.util.Map;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 1fc3a3e..85600c0 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -99,7 +99,7 @@
List<String> _participantNames = new ArrayList<>();
private Set<String> _allDBs = new HashSet<>();
private ZkHelixClusterVerifier _clusterVerifier;
- private ZkHelixClusterVerifier _bestPossibleClusterVerifier;
+ private BestPossibleExternalViewVerifier _bestPossibleClusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;
@@ -204,7 +204,7 @@
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
- private void removeOfflineOrDisabledOrSwapInInstances() {
+ private void removeOfflineOrInactiveInstances() {
// Remove all instances that are not live, disabled, or in SWAP_IN state.
for (int i = 0; i < _participants.size(); i++) {
String participantName = _participantNames.get(i);
@@ -212,7 +212,7 @@
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName);
if (!_participants.get(i).isConnected() || !instanceConfig.getInstanceEnabled()
|| instanceConfig.getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
if (_participants.get(i).isConnected()) {
_participants.get(i).syncStop();
}
@@ -268,11 +268,6 @@
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, semiAutoDB);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- // Disable, stop, and drop the instance from the cluster.
- _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToEvacuate, false);
- _participants.get(0).syncStop();
- removeOfflineOrDisabledOrSwapInInstances();
-
// Compare the current ev with the previous one, it should be exactly the same since the baseline should not change
// after the instance is dropped.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -284,9 +279,12 @@
System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis()));
// revert an evacuate instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
- _gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
+ InstanceConstants.InstanceOperation.ENABLE);
+ Assert.assertTrue(
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToEvacuate)
+ .getInstanceEnabled());
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// EV should contain all participants, check resources one by one
@@ -302,10 +300,12 @@
System.out.println("START TestInstanceOperation.testAddingNodeWithEvacuationTag() at " + new Date(System.currentTimeMillis()));
// first disable and instance, and wait for all replicas to be moved out
String mockNewInstance = _participants.get(0).getInstanceName();
+ // Use the deprecated enableInstance API to verify that disabling via HELIX_ENABLED=false still takes precedence
+ // over the InstanceOperation.
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- //ev should contain all instances but the disabled one
+ // ev should contain all instances but the disabled one
Map<String, ExternalView> assignment = getEVs();
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
@@ -317,10 +317,13 @@
}
// add evacuate tag and enable instance
+ // Because HELIX_ENABLED is set to false, getInstanceOperation still returns DISABLE
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, mockNewInstance, InstanceConstants.InstanceOperation.EVACUATE);
- _gSetupTool.getClusterManagementTool()
- .enableInstance(CLUSTER_NAME, mockNewInstance, true);
+
+ // Enable the instance so that its InstanceOperation is no longer overridden with DISABLE.
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, mockNewInstance, true);
+
//ev should be the same
assignment = getEVs();
currentActiveInstances =
@@ -347,84 +350,73 @@
}
}
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithEvacuationTag")
+ @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag")
public void testNodeSwapNoTopologySetup() throws Exception {
System.out.println("START TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
- // Add instance with InstanceOperation set to SWAP_IN
- // There should be an error that the logicalId does not have SWAP_OUT instance because,
- // helix can't determine what topology key to use to get the logicalId if TOPOLOGY is not set.
+ // Add an instance whose initial InstanceOperation is explicitly set to SWAP_IN.
+ // The instance will be added with UNKNOWN because the logicalId will not match the
+ // swap out instance since the topology configs are not set.
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
+
+ Assert.assertEquals(
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+ .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
}
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapNoTopologySetup")
- public void testAddingNodeWithSwapOutInstanceOperation() throws Exception {
+ @Test(dependsOnMethods = "testNodeSwapNoTopologySetup")
+ public void testAddingNodeWithEnableInstanceOperation() throws Exception {
System.out.println(
- "START TestInstanceOperation.testAddingNodeWithSwapOutInstanceOperation() at " + new Date(
+ "START TestInstanceOperation.testAddingNodeWithEnableInstanceOperation() at " + new Date(
System.currentTimeMillis()));
enabledTopologyAwareRebalance();
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
- // Add instance with InstanceOperation set to SWAP_IN
+ // Add instance with InstanceOperation set to ENABLE
+ // The instance should be added with UNKNOWN since there is already an instance with
+ // the same logicalId in the cluster and this instance is not being set to SWAP_IN when
+ // added.
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_OUT, true, -1);
+ InstanceConstants.InstanceOperation.ENABLE, -1);
+
+ Assert.assertEquals(
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+ .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
}
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithSwapOutInstanceOperation")
- public void testAddingNodeWithSwapOutNodeInstanceOperationUnset() throws Exception {
- System.out.println(
- "START TestInstanceOperation.testAddingNodeWithSwapOutNodeInstanceOperationUnset() at "
- + new Date(System.currentTimeMillis()));
-
- removeOfflineOrDisabledOrSwapInInstances();
-
- // Set instance's InstanceOperation to null
- String instanceToSwapOutName = _participants.get(0).getInstanceName();
- InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
-
- // Add instance with InstanceOperation set to SWAP_IN
- String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
- addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
- }
-
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithSwapOutNodeInstanceOperationUnset")
+ @Test(dependsOnMethods = "testAddingNodeWithEnableInstanceOperation")
public void testNodeSwapWithNoSwapOutNode() throws Exception {
System.out.println("START TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Add new instance with InstanceOperation set to SWAP_IN
+ // The instance should be added with UNKNOWN since there is not an instance with a matching
+ // logicalId in the cluster to swap with.
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
addParticipant(instanceToSwapInName, "1000", "zone_1000",
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
+
+ Assert.assertEquals(
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+ .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
}
@Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode")
@@ -433,21 +425,27 @@
"START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at "
+ new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
- // Add instance with same logicalId with InstanceOperation unset
- // This should work because adding instance with InstanceOperation unset will automatically
- // set the InstanceOperation to SWAP_IN.
+ // Add an instance with the same logicalId and the InstanceOperation unset; this is equivalent to
+ // the default, which is ENABLE.
+ // The instance should be set to UNKNOWN since there is already a matching logicalId in the cluster.
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, -1);
+
+ Assert.assertEquals(
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName)
+ .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN);
+
+ // Setting the InstanceOperation to SWAP_IN should work because there is a matching logicalId in
+ // the cluster and the InstanceCapacityWeights and FaultZone match.
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName,
+ InstanceConstants.InstanceOperation.SWAP_IN);
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
@@ -461,20 +459,17 @@
"START TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at "
+ new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
// Add instance with InstanceOperation set to SWAP_IN
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
// Add another instance with InstanceOperation set to SWAP_IN with same logicalId as previously
// added SWAP_IN instance.
@@ -482,88 +477,70 @@
addParticipant(secondInstanceToSwapInName,
instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
+
+ // Instance should be UNKNOWN since there was already a swapping pair.
+ Assert.assertEquals(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, secondInstanceToSwapInName).getInstanceOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
+
+ // Try to set the InstanceOperation to SWAP_IN; this should throw an exception because there is
+ // already a swapping pair.
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, secondInstanceToSwapInName,
+ InstanceConstants.InstanceOperation.SWAP_IN);
}
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapSwapInNodeWithAlreadySwappingPair")
- public void testNodeSwapWrongFaultZone() throws Exception {
- System.out.println("START TestInstanceOperation.testNodeSwapWrongFaultZone() at " + new Date(
- System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
-
- // Set instance's InstanceOperation to SWAP_OUT
- String instanceToSwapOutName = _participants.get(0).getInstanceName();
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
-
- // Add instance with InstanceOperation set to SWAP_IN
- // There should be an error because SWAP_IN instance must be in the same FAULT_ZONE as the SWAP_OUT instance.
- String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
- InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE) + "1",
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
- }
-
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWrongFaultZone")
- public void testNodeSwapWrongCapacity() throws Exception {
- System.out.println("START TestInstanceOperation.testNodeSwapWrongCapacity() at " + new Date(
- System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
-
- // Set instance's InstanceOperation to SWAP_OUT
- String instanceToSwapOutName = _participants.get(0).getInstanceName();
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
-
- // Add instance with InstanceOperation set to SWAP_IN
- // There should be an error because SWAP_IN instance must have same capacity as the SWAP_OUT node.
- String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
- InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, TEST_CAPACITY_VALUE - 10);
- }
-
- @Test(dependsOnMethods = "testNodeSwapWrongCapacity")
+ @Test(dependsOnMethods = "testNodeSwapSwapInNodeWithAlreadySwappingPair")
public void testNodeSwap() throws Exception {
System.out.println(
"START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+
+ removeOfflineOrInactiveInstances();
+
+ Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
+
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ String resourceToDisablePartition = _allDBs.iterator().next();
+ // Disable 1 partition that is assigned to the instance that will be swapped out.
+ getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(resourceToDisablePartition)).findFirst()
+ .ifPresent(entry -> {
+ String partition = entry.getKey();
+ instanceToSwapOutInstanceConfig.setInstanceEnabledForPartition(resourceToDisablePartition,
+ partition, false);
+ });
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceConfig(CLUSTER_NAME, instanceToSwapOutName, instanceToSwapOutInstanceConfig);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
// Store original EV
Map<String, ExternalView> originalEVs = getEVs();
- Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
-
- // Set instance's InstanceOperation to SWAP_OUT
- String instanceToSwapOutName = _participants.get(0).getInstanceName();
- InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
-
- // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
+ // Create a custom change listener to check if the throttles are enabled after the swap is completed.
CustomIndividualInstanceConfigChangeListener instanceToSwapInInstanceConfigListener =
new CustomIndividualInstanceConfigChangeListener();
+
// Add instance with InstanceOperation set to SWAP_IN
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1, instanceToSwapInInstanceConfigListener);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1, instanceToSwapInInstanceConfigListener);
+ // Validate that the throttles are off since the InstanceOperation is set to SWAP_IN
Assert.assertFalse(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
- // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
- // and adding the SWAP_IN instance to the cluster.
- // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+ // Check that the SWAP_IN instance has the same partitions as the swap out instance
// but none of them are in a top state.
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
@@ -573,7 +550,7 @@
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
- // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+ // Validate that the swap out instance is in routing tables and SWAP_IN is not.
validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
@@ -581,20 +558,31 @@
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false));
+ // Verify the swap in instance inherited the swap out instance's disabled partitions.
+ InstanceConfig instanceToSwapInInstanceConfig = _gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapInName);
+
+ Assert.assertEquals(instanceToSwapInInstanceConfig.getRecord()
+ .getMapField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()),
+ instanceToSwapOutInstanceConfig.getRecord()
+ .getMapField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()));
+
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// Validate that the SWAP_IN instance is now in the routing tables.
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
-
- // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+ // Assert that swap out instance is not active and has no partitions assigned to it.
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
+ Assert.assertEquals(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
// Check to make sure the throttle was enabled again after the swap was completed.
Assert.assertTrue(instanceToSwapInInstanceConfigListener.isThrottlesEnabled());
- // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+ // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
// swap was completed.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
@@ -604,21 +592,18 @@
public void testNodeSwapDisableAndReenable() throws Exception {
System.out.println(
"START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EV
Map<String, ExternalView> originalEVs = getEVs();
Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
- // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
+ // Validate that the assignment has not changed.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -628,11 +613,9 @@
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
- // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
- // and adding the SWAP_IN instance to the cluster.
- // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+ // Check that the SWAP_IN instance has the same partitions as the swap out instance
// but none of them are in a top state.
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
@@ -642,12 +625,12 @@
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
- // Disable the SWAP_IN instance
+ // Disable the swap out instance; this should not cancel the in-progress swap.
_gSetupTool.getClusterManagementTool()
- .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+ .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
// Check that the SWAP_IN instance's replicas match the SWAP_OUT instance's replicas
- // but all of them are OFFLINE
+ // and all of them are OFFLINE.
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
Map<String, Map<String, String>> resourcePartitionStateOnSwapOutInstance =
getResourcePartitionStateOnInstance(getEVs(), instanceToSwapOutName);
@@ -658,14 +641,27 @@
.collect(Collectors.toSet()),
resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(p -> p.keySet().stream())
.collect(Collectors.toSet()));
+ Set<String> swapOutInstancePartitionStates =
+ resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(e -> e.values().stream())
+ .collect(Collectors.toSet());
+ Assert.assertEquals(swapOutInstancePartitionStates.size(), 1);
+ Assert.assertTrue(swapOutInstancePartitionStates.contains("OFFLINE"));
Set<String> swapInInstancePartitionStates =
resourcePartitionStateOnSwapInInstance.values().stream().flatMap(e -> e.values().stream())
.collect(Collectors.toSet());
Assert.assertEquals(swapInInstancePartitionStates.size(), 1);
Assert.assertTrue(swapInInstancePartitionStates.contains("OFFLINE"));
- // Re-enable the SWAP_IN instance
- _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
+ // Validate that the swap out instance is in routing tables and SWAP_IN is not.
+ validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
+ validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
+
+ // Assert canSwapBeCompleted is true
+ Assert.assertTrue(_gSetupTool.getClusterManagementTool()
+ .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
+
+ // Re-enable the swap out instance
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapOutName, true);
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
// Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
@@ -681,63 +677,61 @@
// Validate that the SWAP_IN instance is now in the routing tables.
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
- // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+ // Assert that swap out instance is not active and has no partitions assigned to it.
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
+ Assert.assertEquals(_gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(),
+ InstanceConstants.InstanceOperation.UNKNOWN);
- // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+ // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
// swap was completed.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
}
@Test(dependsOnMethods = "testNodeSwapDisableAndReenable")
- public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception {
- System.out.println(
- "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at "
+ public void testNodeSwapSwapInNodeNoInstanceOperation() throws Exception {
+ System.out.println("START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperation() at "
+ new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EVs
Map<String, ExternalView> originalEVs = getEVs();
Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
- // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
Assert.assertTrue(_clusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
- // Add instance with InstanceOperation unset, should automatically be set to SWAP_IN
+ // Add instance with InstanceOperation unset; it should default to UNKNOWN.
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, -1);
+ // Validate that the newly added instance does not have any partitions on it.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
- // Enable the SWAP_IN instance, so it can start being assigned replicas
- _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
+ // Set InstanceOperation to SWAP_IN
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName,
+ InstanceConstants.InstanceOperation.SWAP_IN);
- // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
- // and adding the SWAP_IN instance to the cluster.
- // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+ // Check that the SWAP_IN instance has the same partitions as the swap out instance
// but none of them are in a top state.
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
ImmutableSet.of(instanceToSwapInName), Collections.emptySet());
- // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+ // Validate that the swap out instance is in routing tables and SWAP_IN is not.
validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
@@ -750,37 +744,34 @@
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+ // Assert that swap out instance is inactive and has no partitions assigned to it.
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
- // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+ // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
// swap was completed.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
}
- @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationDisabled")
+ @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperation")
public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception {
System.out.println(
"START TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EVs
Map<String, ExternalView> originalEVs = getEVs();
Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
- // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
+ // Validate that the assignment has not changed.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -790,17 +781,15 @@
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
- // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT
- // and adding the SWAP_IN instance to the cluster.
- // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+ // Check that the SWAP_IN instance has the same partitions as the swap out instance
// but none of them are in a top state.
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
ImmutableSet.of(instanceToSwapInName), Collections.emptySet());
- // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+ // Validate that the swap out instance is in routing tables and SWAP_IN is not.
validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
@@ -808,23 +797,29 @@
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
- // Cancel SWAP by disabling the SWAP_IN instance and remove SWAP_OUT InstanceOperation from SWAP_OUT instance.
- _gSetupTool.getClusterManagementTool()
- .enableInstance(CLUSTER_NAME, instanceToSwapInName, false);
+ // Cancel the swap by setting the InstanceOperation to UNKNOWN on the SWAP_IN instance.
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName,
+ InstanceConstants.InstanceOperation.UNKNOWN);
+
+ // Validate there are no partitions on the SWAP_IN instance.
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
+ validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
+ Collections.emptySet(), Collections.emptySet());
+
// Stop the participant
_participants.get(_participants.size() - 1).syncStop();
// Wait for cluster to converge.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+ // Validate that the swap out instance is in routing tables and SWAP_IN is not.
validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
// Validate there are no partitions on the SWAP_IN instance.
Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName).size(), 0);
- // Validate that the SWAP_OUT instance has the same partitions as it had before.
+ // Validate that the swap out instance has the same partitions as it had before.
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -833,7 +828,7 @@
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- // Validate that the SWAP_OUT instance has the same partitions as it had before.
+ // Validate that the swap out instance has the same partitions as it had before.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet())), TIMEOUT);
}
@@ -843,7 +838,7 @@
System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EVs
Map<String, ExternalView> originalEVs = getEVs();
@@ -854,14 +849,11 @@
_gSetupTool.getClusterManagementTool()
.manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
- // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT
+ // Validate that the assignment has not changed.
Assert.assertTrue(_clusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -871,10 +863,10 @@
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
// Validate that the assignment has not changed since adding the SWAP_IN node.
- // During MM, the cluster should not compute new assignment.
+ // During MM, the cluster should not compute new assignment on SWAP_IN node.
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
@@ -884,14 +876,14 @@
_gSetupTool.getClusterManagementTool()
.manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null);
- // Validate that partitions on SWAP_OUT instance does not change after exiting MM
- // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance
+ // Validate that partitions on swap out instance does not change after exiting MM
+ // Check that the SWAP_IN instance has the same partitions as the swap out instance
// but none of them are in a top state.
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
ImmutableSet.of(instanceToSwapInName), Collections.emptySet());
- // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+ // Validate that the swap out instance is in routing tables and SWAP_IN is not.
validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true);
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
@@ -907,11 +899,11 @@
// Validate that the SWAP_IN instance is now in the routing tables.
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
- // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+ // Assert that swap out instance is disabled and has no partitions assigned to it.
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
- // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+ // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
// swap was completed.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
@@ -923,28 +915,25 @@
"START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EVs
Map<String, ExternalView> originalEVs = getEVs();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
Set<String> swapOutInstanceOriginalPartitions =
getPartitionsAndStatesOnInstance(originalEVs, instanceToSwapOutName).keySet();
- // Disable the SWAP_OUT instance.
+ // Disable the swap out instance.
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
- // Validate that the SWAP_OUT instance has all partitions in OFFLINE state
+ // Validate that the swap out instance has all partitions in OFFLINE state
Set<String> swapOutInstanceOfflineStates =
new HashSet<>(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).values());
Assert.assertEquals(swapOutInstanceOfflineStates.size(), 1);
@@ -954,21 +943,16 @@
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
- // Validate that the SWAP_IN instance has the same partitions in secondTopState as the SWAP_OUT instance
- // did before being disabled.
+ // Validate that the SWAP_IN instance has no partitions because the swap started while the swap out instance was disabled.
Map<String, String> swapInInstancePartitionsAndStates =
getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName);
- Assert.assertTrue(
- swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions));
- Set<String> swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values());
- swapInInstanceStates.removeAll(SECONDARY_STATE_SET);
- Assert.assertEquals(swapInInstanceStates.size(), 0);
+ Assert.assertEquals(swapInInstancePartitionsAndStates.size(), 0);
- // Assert canSwapBeCompleted is false because SWAP_OUT instance is disabled.
+ // Assert canSwapBeCompleted is true even though the swap out instance is disabled.
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
@@ -976,9 +960,9 @@
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false));
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
- // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+ // Assert that swap out instance is disabled and has no partitions assigned to it.
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
@@ -993,28 +977,23 @@
"START TestInstanceOperation.testNodeSwapWithSwapOutInstanceOffline() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EV
Map<String, ExternalView> originalEVs = getEVs();
Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>();
- // Set instance's InstanceOperation to SWAP_OUT
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.SWAP_OUT);
-
- Assert.assertTrue(_clusterVerifier.verifyByPolling());
// Add instance with InstanceOperation set to SWAP_IN
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
- InstanceConstants.InstanceOperation.SWAP_IN, true, -1);
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
// Kill the participant
_participants.get(0).syncStop();
@@ -1025,7 +1004,7 @@
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName));
- // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not.
+ // Validate that the swap out instance is in routing tables and SWAP_IN is not.
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false);
// Assert completeSwapIfPossible is true
@@ -1037,11 +1016,11 @@
// Validate that the SWAP_IN instance is now in the routing tables.
validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true);
- // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it.
+ // Assert that swap out instance is inactive and has no partitions assigned to it.
Assert.assertFalse(_gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled());
- // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before
+ // Validate that the SWAP_IN instance has the same partitions the swap out instance had before
// swap was completed.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
@@ -1051,7 +1030,7 @@
public void testSwapEvacuateAdd() throws Exception {
System.out.println("START TestInstanceOperation.testSwapEvacuateAdd() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EV
Map<String, ExternalView> originalEVs = getEVs();
@@ -1062,6 +1041,8 @@
_gSetupTool.getClusterManagementTool()
.manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null);
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
// Set instance's InstanceOperation to EVACUATE
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
@@ -1073,11 +1054,12 @@
validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), Collections.emptySet());
- // Add instance with InstanceOperation set to SWAP_IN
+ // Add instance with InstanceOperation set to ENABLE
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName);
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.ENABLE, -1);
// Exit maintenance mode
_gSetupTool.getClusterManagementTool()
@@ -1085,7 +1067,7 @@
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
- // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had.
+ // Validate that the SWAP_IN instance has the same partitions the swap out instance had.
verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances,
Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT);
@@ -1093,9 +1075,9 @@
Assert.assertTrue(_gSetupTool.getClusterManagementTool()
.isEvacuateFinished(CLUSTER_NAME, instanceToSwapOutName));
- // Disable the EVACUATE instance
- _gSetupTool.getClusterManagementTool()
- .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false);
+ // Set the EVACUATE instance to UNKNOWN
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.UNKNOWN);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -1105,31 +1087,13 @@
}
@Test(expectedExceptions = HelixException.class, dependsOnMethods = "testSwapEvacuateAdd")
- public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() throws Exception {
+ public void testUnsetInstanceOperationOnSwapInWhenSwapping() throws Exception {
System.out.println(
- "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at "
+ "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenSwapping() at "
+ new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
- // Get the SWAP_OUT instance.
- String instanceToSwapOutName = _participants.get(0).getInstanceName();
- InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ removeOfflineOrInactiveInstances();
- // Add instance with InstanceOperation set to SWAP_IN enabled before setting SWAP_OUT instance.
- String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
- addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
- }
-
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet")
- public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() throws Exception {
- System.out.println(
- "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at "
- + new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
-
- // Get the SWAP_OUT instance.
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
@@ -1137,49 +1101,27 @@
// Add instance with InstanceOperation set to SWAP_IN
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
- // Enable the SWAP_IN instance before we have set the SWAP_OUT instance.
- _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true);
- }
-
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet")
- public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() throws Exception {
- System.out.println(
- "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() at "
- + new Date(System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
-
- // Get the SWAP_OUT instance.
- String instanceToSwapOutName = _participants.get(0).getInstanceName();
- InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
-
- // Add instance with InstanceOperation set to SWAP_IN
- String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
- addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
-
- Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling());
-
- // Try to remove the InstanceOperation from the SWAP_IN instance before the SWAP_OUT instance is set.
+ // Try to remove the InstanceOperation from the SWAP_IN instance before swap in instance is set to unknown.
// This should throw exception because we cannot ever have two instances with the same logicalId and both have InstanceOperation
// unset.
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null);
}
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut")
+ @Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenSwapping")
public void testNodeSwapAddSwapInFirst() throws Exception {
System.out.println("START TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
// Store original EV
Map<String, ExternalView> originalEVs = getEVs();
- // Get the SWAP_OUT instance.
+ // Get the swap out instance.
String instanceToSwapOutName = _participants.get(0).getInstanceName();
InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
@@ -1187,7 +1129,8 @@
// Add instance with InstanceOperation set to SWAP_IN
String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1);
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.SWAP_IN, -1);
}
@Test(dependsOnMethods = "testNodeSwapAddSwapInFirst")
@@ -1195,7 +1138,8 @@
System.out.println(
"START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date(
System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
+
+ removeOfflineOrInactiveInstances();
// add a resource where downward state transition is slow
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave",
@@ -1346,7 +1290,31 @@
_stateModelDelay = 3L;
}
- @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
+ @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testMarkEvacuationAfterEMM")
+ public void testSwapEvacuateAddRemoveEvacuate() throws Exception {
+ System.out.println("START TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date(
+ System.currentTimeMillis()));
+ removeOfflineOrInactiveInstances();
+
+ // Set instance's InstanceOperation to EVACUATE
+ String instanceToSwapOutName = _participants.get(0).getInstanceName();
+ InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
+ _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
+ InstanceConstants.InstanceOperation.EVACUATE);
+
+ // Add instance with InstanceOperation set to ENABLE
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
+ addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
+ instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
+ InstanceConstants.InstanceOperation.ENABLE, -1);
+
+ // Remove EVACUATE instance's InstanceOperation
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
+ }
+
+ @Test(dependsOnMethods = "testSwapEvacuateAddRemoveEvacuate")
public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
System.out.println(
"START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date(
@@ -1358,11 +1326,10 @@
_gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, evacuateInstanceName,
InstanceConstants.InstanceOperation.EVACUATE);
- Map<String, ExternalView> assignment;
// EV should contain all participants, check resources one by one
- assignment = getEVs();
- for (String resource : _allDBs) {
- verifier(() -> {
+ verifier(() -> {
+ Map<String, ExternalView> assignment = getEVs();
+ for (String resource : _allDBs) {
ExternalView ev = assignment.get(resource);
for (String partition : ev.getPartitionSet()) {
AtomicInteger activeReplicaCount = new AtomicInteger();
@@ -1372,44 +1339,21 @@
.forEach(v -> activeReplicaCount.getAndIncrement());
if (activeReplicaCount.get() < REPLICA - 1 || (
ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(
- partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap(partition)
- .get(evacuateInstanceName).equals("LEADER"))) {
+ partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap(
+ partition).get(evacuateInstanceName).equals("LEADER"))) {
return false;
}
}
- return true;
- }, 30000);
- }
+ }
+ return true;
+ }, 30000);
- removeOfflineOrDisabledOrSwapInInstances();
+ removeOfflineOrInactiveInstances();
addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort);
addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort);
dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", "TEST_DB4_DELAYED_WAGED"));
}
- @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testEvacuationWithOfflineInstancesInCluster")
- public void testSwapEvacuateAddRemoveEvacuate() throws Exception {
- System.out.println("START TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date(
- System.currentTimeMillis()));
- removeOfflineOrDisabledOrSwapInInstances();
-
- // Set instance's InstanceOperation to EVACUATE
- String instanceToSwapOutName = _participants.get(0).getInstanceName();
- InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool()
- .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName);
- _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName,
- InstanceConstants.InstanceOperation.EVACUATE);
-
- // Add instance with InstanceOperation set to SWAP_IN
- String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort;
- addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
- instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1);
-
- // Remove EVACUATE instance's InstanceOperation
- _gSetupTool.getClusterManagementTool()
- .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null);
- }
-
/**
* Verifies that the given verifier returns true within the given timeout. Handles AssertionError
* by returning false, which TestHelper.verify will not do. Asserts that return value from
@@ -1449,9 +1393,9 @@
public void onInstanceConfigChange(List<InstanceConfig> instanceConfig,
NotificationContext context) {
if (instanceConfig.get(0).getInstanceOperation()
- .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
+ .equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
throttlesEnabled = false;
- } else if (instanceConfig.get(0).getInstanceOperation().isEmpty()) {
+ } else {
throttlesEnabled = true;
}
}
@@ -1470,21 +1414,21 @@
private void addParticipant(String participantName) throws Exception {
addParticipant(participantName, UUID.randomUUID().toString(),
- "zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
+ "zone_" + _participants.size() % ZONE_COUNT, null, -1);
}
private void addParticipant(String participantName, String logicalId, String zone,
- InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity)
+ InstanceConstants.InstanceOperation instanceOperation, int capacity)
throws Exception {
- addParticipant(participantName, logicalId, zone, instanceOperation, enabled, capacity, null);
+ addParticipant(participantName, logicalId, zone, instanceOperation, capacity, null);
}
private void addParticipant(String participantName, String logicalId, String zone,
- InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity,
+ InstanceConstants.InstanceOperation instanceOperation, int capacity,
InstanceConfigChangeListener listener) throws Exception {
InstanceConfig config = new InstanceConfig.Builder().setDomain(
String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
- logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
+ logicalId)).setInstanceOperation(instanceOperation)
.build(participantName);
if (capacity >= 0) {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
index 697847c..9cc4eea 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedExpandCluster.java
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.rebalancer.PartitionMigration.TestPartitionMigrationBase;
import org.apache.helix.model.ClusterConfig;
@@ -103,7 +104,7 @@
for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
InstanceConfig config = InstanceConfig.toInstanceConfig(storageNodeName);
- config.setInstanceEnabled(false);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
config.getRecord().getSimpleFields()
.remove(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name());
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
index f6ef827..fbb7304 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedNodeSwap.java
@@ -32,6 +32,7 @@
import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -156,7 +157,7 @@
String oldParticipantName = oldParticipant.getInstanceName();
final InstanceConfig instanceConfig =
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, oldParticipantName);
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_gSetupTool.getClusterManagementTool()
.setInstanceConfig(CLUSTER_NAME, oldParticipantName, instanceConfig);
Assert.assertTrue(_clusterVerifier.verify(10000));
@@ -231,7 +232,7 @@
InstanceConfig instanceConfig =
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
if (instanceConfig.getDomainAsMap().get("zone").equals(randZoneStr)) {
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_gSetupTool.getClusterManagementTool()
.setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
removedInstanceConfigMap.put(instanceName, instanceConfig);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index a7250d4..26eb13d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -34,6 +34,7 @@
import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
@@ -387,7 +388,7 @@
disableParticipants.add(p.getInstanceName());
InstanceConfig config = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(CLUSTER_NAME, p.getInstanceName());
- config.setInstanceEnabled(false);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_gSetupTool.getClusterManagementTool()
.setInstanceConfig(CLUSTER_NAME, p.getInstanceName(), config);
}
@@ -408,7 +409,7 @@
for (String instanceName : disableParticipants) {
InstanceConfig config =
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_gSetupTool.getClusterManagementTool()
.setInstanceConfig(CLUSTER_NAME, instanceName, config);
}
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 40d5c97..77f4432 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -82,6 +82,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), new CurrentStateOutput());
event.addAttribute(AttributeName.helixmanager.name(), manager);
_fullPipeline = new Pipeline("FullPipeline");
@@ -124,6 +125,7 @@
CurrentStateOutput currentStateOutput =
populateCurrentStateFromBestPossible(_bestpossibleState);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
_fullPipeline.handle(event);
@@ -161,6 +163,7 @@
currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
_fullPipeline.handle(event);
@@ -179,6 +182,7 @@
currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
_messagePipeline.handle(event);
@@ -218,6 +222,7 @@
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState);
_messagePipeline.handle(event);
@@ -244,6 +249,7 @@
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE");
currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
_fullPipeline.handle(event);
@@ -264,6 +270,7 @@
// Validate: controller should not send S->M to thirdMaster.
currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE");
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
thirdMaster =
getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition),
@@ -290,6 +297,7 @@
// Validate: Controller should not send S->M to thirdMaster.
currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState);
@@ -310,6 +318,7 @@
currentStateOutput.setCurrentState(_db, _partition, thirdMaster, "SLAVE");
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
_messagePipeline.handle(event);
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index 307022f..9a38656 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -95,6 +95,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), new CurrentStateOutput());
event.addAttribute(AttributeName.helixmanager.name(), manager);
Pipeline pipeline = createPipeline();
@@ -106,6 +107,7 @@
CurrentStateOutput currentStateOutput =
populateCurrentStateFromBestPossible(bestPossibleStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
Partition p = new Partition(db + "_0");
@@ -153,6 +155,7 @@
currentStateOutput.setPendingRelayMessage(db, p, masterInstance, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
pipeline.handle(event);
@@ -167,6 +170,7 @@
currentStateOutput.setCurrentState(db, p, masterInstance, "SLAVE");
currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
pipeline.handle(event);
@@ -186,6 +190,7 @@
// but controller should not send S->M to newly calculated master.
currentStateOutput.setCurrentState(db, p, masterInstance, "OFFLINE");
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
String slaveInstance =
getTopStateInstance(bestPossibleStateOutput.getInstanceStateMap(db, p),
@@ -217,6 +222,7 @@
// Controller will not send S->M to new master.
currentStateOutput.setPendingMessage(db, p, newMasterInstance, relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), bestPossibleStateOutput);
@@ -244,6 +250,7 @@
currentStateOutput.setCurrentState(db, p, slaveInstance, "SLAVE");
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
pipeline = new Pipeline("test");
pipeline.addStage(new MessageGenerationPhase());
@@ -271,6 +278,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput());
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), new CurrentStateOutput());
event.addAttribute(AttributeName.helixmanager.name(), manager);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
@@ -284,6 +292,7 @@
CurrentStateOutput currentStateOutput =
populateCurrentStateFromBestPossible(bestPossibleStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
Partition p = new Partition(db + "_0");
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index ea2a4aa..9f60a4a 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -165,6 +165,7 @@
currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("0"), "localhost_2", "SLAVE");
currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("1"), "localhost_2", "MASTER");
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("0"), "localhost_1", "SLAVE");
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 60ee9b6..9a1311b 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -266,11 +266,13 @@
}
+ @Deprecated
@Override
public void enableInstance(String clusterName, String instanceName, boolean enabled) {
enableInstance(clusterName, instanceName, enabled, null, null);
}
+ @Deprecated
@Override
public void enableInstance(String clusterName, String instanceName, boolean enabled,
InstanceConstants.InstanceDisabledType disabledType, String reason) {
@@ -283,7 +285,8 @@
ZNRecord record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath, null, 0);
InstanceConfig instanceConfig = new InstanceConfig(record);
- instanceConfig.setInstanceEnabled(enabled);
+ instanceConfig.setInstanceOperation(enabled ? InstanceConstants.InstanceOperation.ENABLE
+ : InstanceConstants.InstanceOperation.DISABLE);
if (!enabled) {
instanceConfig.resetInstanceDisabledTypeAndReason();
if (reason != null) {
@@ -296,6 +299,7 @@
_baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0);
}
+ @Deprecated
@Override
public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
index b8e6569..7da983b 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java
@@ -51,7 +51,7 @@
@Test
public void testSetInstanceEnableWithReason() {
InstanceConfig instanceConfig = new InstanceConfig(new ZNRecord("id"));
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
instanceConfig.setInstanceDisabledReason("NoShowReason");
instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
@@ -63,7 +63,7 @@
.get(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_TYPE.toString()), null);
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
String reasonCode = "ReasonCode";
instanceConfig.setInstanceDisabledReason(reasonCode);
instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION);
@@ -197,4 +197,45 @@
Assert.assertEquals(instanceConfig.getInstanceInfoMap().get("CABINET"), "30");
Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight1"), Integer.valueOf(1));
}
+
+ @Test
+ public void testOverwriteInstanceConfig() {
+ InstanceConfig instanceConfig = new InstanceConfig("instance2");
+ instanceConfig.setHostName("host1");
+ instanceConfig.setPort("1234");
+ instanceConfig.setDomain("foo=bar");
+ instanceConfig.setWeight(100);
+ instanceConfig.setInstanceEnabled(false);
+ instanceConfig.addTag("tag1");
+ instanceConfig.addTag("tag2");
+ instanceConfig.setInstanceCapacityMap(ImmutableMap.of("weight1", 1));
+
+ InstanceConfig overrideConfig = new InstanceConfig("instance1");
+ overrideConfig.setHostName("host2");
+ overrideConfig.setPort("5678");
+ overrideConfig.setDomain("foo=bar2");
+ overrideConfig.setWeight(200);
+ overrideConfig.addTag("tag3");
+ overrideConfig.addTag("tag4");
+ overrideConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
+ overrideConfig.setInstanceCapacityMap(ImmutableMap.of("weight2", 2));
+
+ instanceConfig.overwriteInstanceConfig(overrideConfig);
+
+ Assert.assertEquals(instanceConfig.getId(), "instance2");
+ Assert.assertEquals(instanceConfig.getHostName(), "host1");
+ Assert.assertEquals(instanceConfig.getPort(), "1234");
+ Assert.assertEquals(instanceConfig.getDomainAsString(), "foo=bar");
+ Assert.assertEquals(instanceConfig.getWeight(), 200);
+ Assert.assertFalse(instanceConfig.getTags().contains("tag1"));
+ Assert.assertFalse(instanceConfig.getTags().contains("tag2"));
+ Assert.assertTrue(instanceConfig.getTags().contains("tag3"));
+ Assert.assertTrue(instanceConfig.getTags().contains("tag4"));
+ Assert.assertFalse(instanceConfig.getRecord().getSimpleFields()
+ .containsKey(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString()));
+ Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ InstanceConstants.InstanceOperation.EVACUATE);
+ Assert.assertFalse(instanceConfig.getInstanceCapacityMap().containsKey("weight1"));
+ Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight2"), Integer.valueOf(2));
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index 57a3ad0..b02a0f4 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -69,6 +69,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
monitor.active();
@@ -119,6 +120,7 @@
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
ClusterStatusMonitor monitor = new ClusterStatusMonitor(_clusterName);
monitor.active();
@@ -131,6 +133,7 @@
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
currentStateOutput = copyCurrentStateFromBestPossible(bestPossibleStateOutput, resource);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+ event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), currentStateOutput);
setupLiveInstances(4);
diff --git a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
index d8810b1..24113c9 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TestTargetedTaskStateChange.java
@@ -86,7 +86,7 @@
when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
- when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+ when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
@@ -123,7 +123,7 @@
when(mock._cache.getTaskDataCache()).thenReturn(mock._taskDataCache);
when(mock._cache.getJobContext(JOB_NAME)).thenReturn(mock._jobContext);
when(mock._cache.getIdealStates()).thenReturn(mock._idealStates);
- when(mock._cache.getAssignableEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
+ when(mock._cache.getEnabledLiveInstances()).thenReturn(_liveInstances.keySet());
when(mock._cache.getAssignableInstanceConfigMap()).thenReturn(_instanceConfigs);
when(mock._cache.getClusterConfig()).thenReturn(_clusterConfig);
when(mock._taskDataCache.getRuntimeJobDag(WORKFLOW_NAME)).thenReturn(mock._runtimeJobDag);
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java b/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
index 6a7c8ba..7d2a1b3 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestIdealStateAssignment.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -43,7 +44,8 @@
for (String instance : instances) {
instanceConfigs.add(new InstanceConfig(instance));
if (disabledInstances.contains(instance)) {
- instanceConfigs.get(instanceConfigs.size() - 1).setInstanceEnabled(false);
+ instanceConfigs.get(instanceConfigs.size() - 1)
+ .setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
index 79b0fdc..88dd053 100644
--- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java
@@ -33,6 +33,7 @@
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
@@ -77,7 +78,9 @@
boolean expected) {
Mock mock = new Mock();
InstanceConfig instanceConfig = new InstanceConfig(TEST_INSTANCE);
- instanceConfig.setInstanceEnabled(instanceConfigEnabled);
+ instanceConfig.setInstanceOperation(
+ instanceConfigEnabled ? InstanceConstants.InstanceOperation.ENABLE
+ : InstanceConstants.InstanceOperation.DISABLE);
doReturn(instanceConfig).when(mock.dataAccessor)
.getProperty(BUILDER.instanceConfig(TEST_INSTANCE));
ClusterConfig clusterConfig = new ClusterConfig(TEST_CLUSTER);
@@ -101,17 +104,6 @@
InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE);
}
- @Test(expectedExceptions = HelixException.class)
- public void TestIsInstanceEnabled_whenClusterConfigNull() {
- Mock mock = new Mock();
- doReturn(new InstanceConfig(TEST_INSTANCE)).when(mock.dataAccessor)
- .getProperty(argThat(new PropertyKeyArgument(PropertyType.CONFIGS)));
- doReturn(null).when(mock.dataAccessor)
- .getProperty(BUILDER.clusterConfig());
-
- InstanceValidationUtil.isEnabled(mock.dataAccessor, TEST_INSTANCE);
- }
-
@Test
public void TestIsInstanceAlive() {
Mock mock = new Mock();
diff --git a/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json b/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
index 3659942..30553e0 100644
--- a/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
+++ b/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
@@ -94,7 +94,8 @@
"node_3"
],
"expectedBestPossibleStateMap": {
- "node_1": "OFFLINE"
+ "node_1": "OFFLINE",
+ "node_3": "ERROR"
}
},
{
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
index 877aaa9..8a4bbf0 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java
@@ -268,7 +268,7 @@
PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
InstanceConfig instanceConfig =
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
- if (InstanceConstants.InstanceOperation.EVACUATE.name()
+ if (InstanceConstants.InstanceOperation.EVACUATE
.equals(instanceConfig.getInstanceOperation())) {
toBeStoppedInstances.add(instance);
}
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
index 03465a9..714b53f 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ResourceAssignmentOptimizerAccessor.java
@@ -44,6 +44,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -225,7 +226,8 @@
// Throw exception if there is no instanceConfig for activatedInstances instance.
for (String instance : inputFields.activatedInstances) {
if (instanceConfigMap.containsKey(instance)) {
- instanceConfigMap.get(instance).setInstanceEnabled(true);
+ instanceConfigMap.get(instance)
+ .setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
} else {
throw new InvalidParameterException(
"instance: " + instance + "does not have instanceConfig");
@@ -234,7 +236,8 @@
for (String instance : inputFields.deactivatedInstances) {
if (instanceConfigMap.containsKey(instance)) {
- instanceConfigMap.get(instance).setInstanceEnabled(false);
+ instanceConfigMap.get(instance)
+ .setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
}
}
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
index d0f0c57..c6ff0d6 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java
@@ -45,6 +45,7 @@
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.MockTask;
@@ -574,7 +575,7 @@
instanceConfigs.add(new InstanceConfig(instances.get(instances.size() - 1)));
instanceConfigs.get(instanceConfigs.size() - 1).setDomain("helixZoneId=zone2,host=instance5");
- instanceConfigs.get(1).setInstanceEnabled(false);
+ instanceConfigs.get(1).setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
instanceConfigs.get(3).setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
for (InstanceConfig instanceConfig : instanceConfigs) {
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
index 93722f0..0403083 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java
@@ -359,7 +359,7 @@
// Disable one selected instance0, it should failed to check
String instance = "instance0";
InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER, instance);
- instanceConfig.setInstanceEnabled(false);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", false);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
@@ -377,7 +377,7 @@
ImmutableSet.of("HELIX:HAS_DISABLED_PARTITION","HELIX:INSTANCE_NOT_ENABLED","HELIX:INSTANCE_NOT_STABLE","HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
// Reenable instance0, it should passed the check
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
instanceConfig.setInstanceEnabledForPartition("FakeResource", "FakePartition", true);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER, instance, instanceConfig);
Assert.assertTrue(verifier.verifyByPolling());
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
index 2c7a46b..e00c392 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
@@ -36,6 +36,7 @@
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
@@ -107,7 +108,7 @@
for (int i = 0; i < DEFAULT_INSTANCE_COUNT; i++) {
String instanceName = INSTANCE_NAME_PREFIX + (INSTANCE_START_PORT + i);
InstanceConfig instanceConfig = new InstanceConfig(instanceName);
- instanceConfig.setInstanceEnabled(true);
+ instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
instanceConfig.setInstanceCapacityMap(
Collections.singletonMap(INSTANCE_CAPACITY_KEY, DEFAULT_INSTANCE_CAPACITY));
_gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, instanceConfig);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index f9e48ca..943444c 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -36,7 +36,6 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
@@ -50,9 +49,6 @@
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
import org.apache.helix.rest.server.resources.helix.PerInstanceAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -500,15 +496,15 @@
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
- Assert.assertEquals(
- instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
+ Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ InstanceConstants.InstanceOperation.EVACUATE);
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP")
.expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
- Assert.assertEquals(
- instanceConfig.getInstanceOperation(), "");
+ Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ InstanceConstants.InstanceOperation.ENABLE);
// test canCompleteSwap
Response canCompleteSwapResponse =
@@ -548,8 +544,8 @@
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
- Assert.assertEquals(
- instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
+ Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ InstanceConstants.InstanceOperation.EVACUATE);
Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
@@ -591,8 +587,8 @@
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.format(CLUSTER_NAME, test_instance_name).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, test_instance_name);
- Assert.assertEquals(
- instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
+ Assert.assertEquals(instanceConfig.getInstanceOperation(),
+ InstanceConstants.InstanceOperation.EVACUATE);
response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")
.format(CLUSTER_NAME, test_instance_name).post(this, entity);
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
index 8992566..7d49318 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestResourceAssignmentOptimizerAccessor.java
@@ -34,6 +34,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
@@ -68,7 +69,7 @@
toEnabledInstance = liveInstances.get(2);
InstanceConfig config = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(cluster, toEnabledInstance);
- config.setInstanceEnabled(false);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
_gSetupTool.getClusterManagementTool()
.setInstanceConfig(cluster, toEnabledInstance, config);
@@ -94,7 +95,7 @@
}
InstanceConfig config = _gSetupTool.getClusterManagementTool()
.getInstanceConfig(cluster, toEnabledInstance);
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_gSetupTool.getClusterManagementTool().setInstanceConfig(cluster, toEnabledInstance, config);
_gSetupTool.getClusterManagementTool()
.enableMaintenanceMode(cluster, false, TestHelper.getTestMethodName());
@@ -245,8 +246,8 @@
InstanceConfig toEnabledInstanceConfig =
_gSetupTool.getClusterManagementTool().getInstanceConfig(cluster, toEnabledInstance);
// Another way to mark the node as inactive or active.
- toDeactivatedInstanceConfig.setInstanceEnabled(false);
- toEnabledInstanceConfig.setInstanceEnabled(true);
+ toDeactivatedInstanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE);
+ toEnabledInstanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
// Write the current InstanceConfigs record to json string
StringWriter sw = new StringWriter();
OBJECT_MAPPER.writeValue(sw, toDeactivatedInstanceConfig.getRecord());
diff --git a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
index 998f117..eddd68b 100644
--- a/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
+++ b/recipes/rabbitmq-consumer-group/src/main/java/org/apache/helix/recipes/rabbitmq/Consumer.java
@@ -24,6 +24,7 @@
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -98,7 +99,7 @@
if (!nodes.contains("consumer_" + consumerId)) {
InstanceConfig config = new InstanceConfig("consumer_" + consumerId);
config.setHostName("localhost");
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
admin.addInstance(clusterName, config);
}
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
index 9dbcbfe..5b8e736 100644
--- a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
@@ -19,6 +19,7 @@
* under the License.
*/
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -65,7 +66,7 @@
InstanceConfig config = new InstanceConfig(serverId);
config.setHostName("localhost");
config.setPort(port);
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
admin.addInstance(clusterName, config);
}
// add resource "repository" which has 1 partition
diff --git a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
index 11d4395..3704c54 100644
--- a/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
+++ b/recipes/task-execution/src/main/java/org/apache/helix/taskexecution/Worker.java
@@ -24,6 +24,7 @@
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -98,7 +99,7 @@
if (!nodes.contains(_instanceName)) {
InstanceConfig config = new InstanceConfig(_instanceName);
config.setHostName("localhost");
- config.setInstanceEnabled(true);
+ config.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
admin.addInstance(_clusterName, config);
}