Separate AssignableNode properties by Immutable and Mutable (#485)
Distinguish the immutable AssignableNode properties from the mutable ones.
- This helps detect incorrect usage of these properties early.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
index 5fc2faf..827d6ce 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeCapacityConstraint.java
@@ -30,7 +30,7 @@
@Override
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
- Map<String, Integer> nodeCapacity = node.getCurrentCapacity();
+ Map<String, Integer> nodeCapacity = node.getRemainingCapacity();
Map<String, Integer> replicaCapacity = replica.getCapacity();
for (String key : replicaCapacity.keySet()) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 20de6da..2a68e15 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -23,7 +23,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +34,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
/**
* This class represents a possible allocation of the replication.
* Note that any usage updates to the AssignableNode are not thread safe.
@@ -42,39 +45,25 @@
public class AssignableNode implements Comparable<AssignableNode> {
private static final Logger LOG = LoggerFactory.getLogger(AssignableNode.class.getName());
- // basic node information
+ // Immutable Instance Properties
private final String _instanceName;
- private Set<String> _instanceTags;
- private String _faultZone;
- private Map<String, List<String>> _disabledPartitionsMap;
- private Map<String, Integer> _maxCapacity;
- private int _maxPartition; // maximum number of the partitions that can be assigned to the node.
+ private final String _faultZone;
+ // maximum number of the partitions that can be assigned to the instance.
+ private final int _maxPartition;
+ private final ImmutableSet<String> _instanceTags;
+ private final ImmutableMap<String, List<String>> _disabledPartitionsMap;
+ private final ImmutableMap<String, Integer> _maxAllowedCapacity;
+ // Mutable (Dynamic) Instance Properties
// A map of <resource name, <partition name, replica>> that tracks the replicas assigned to the
// node.
private Map<String, Map<String, AssignableReplica>> _currentAssignedReplicaMap;
// A map of <capacity key, capacity value> that tracks the current available node capacity
- private Map<String, Integer> _currentCapacityMap;
+ private Map<String, Integer> _remainingCapacity;
// The maximum capacity utilization (0.0 - 1.0) across all the capacity categories.
private float _highestCapacityUtilization;
/**
- * @param clusterConfig
- * @param instanceConfig
- * @param instanceName
- */
- AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) {
- _instanceName = instanceName;
- refresh(clusterConfig, instanceConfig);
- }
-
- private void reset() {
- _currentAssignedReplicaMap = new HashMap<>();
- _currentCapacityMap = new HashMap<>();
- _highestCapacityUtilization = 0;
- }
-
- /**
* Update the node with a ClusterDataCache. This resets the current assignment and recalculates
* currentCapacity.
* NOTE: While this is required to be used in the constructor, this can also be used when the
@@ -82,29 +71,31 @@
* refreshed. This is under the assumption that the capacity mappings of InstanceConfig and
* ResourceConfig could
* subject to change. If the assumption is no longer true, this function should become private.
- * @param clusterConfig - the Cluster Config of the cluster where the node is located
- * @param instanceConfig - the Instance Config of the node
*/
- private void refresh(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
- reset();
-
+ AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) {
+ _instanceName = instanceName;
Map<String, Integer> instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig);
- _currentCapacityMap.putAll(instanceCapacity);
_faultZone = computeFaultZone(clusterConfig, instanceConfig);
- _instanceTags = new HashSet<>(instanceConfig.getTags());
- _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
- _maxCapacity = instanceCapacity;
+ _instanceTags = ImmutableSet.copyOf(instanceConfig.getTags());
+ _disabledPartitionsMap = ImmutableMap.copyOf(instanceConfig.getDisabledPartitionsMap());
+ // make a copy of max capacity
+ _maxAllowedCapacity = ImmutableMap.copyOf(instanceCapacity);
+ _remainingCapacity = new HashMap<>(instanceCapacity);
_maxPartition = clusterConfig.getMaxPartitionsPerInstance();
+ _currentAssignedReplicaMap = new HashMap<>();
+ _highestCapacityUtilization = 0f;
}
/**
* This function should only be used to assign a set of new partitions that are not allocated on
- * this node.
+ * this node. This is because an exception could occur in the middle of the batch assignment, and
+ * the assignments that have already finished cannot be reverted.
* Using this function avoids the overhead of updating capacity repeatedly.
*/
- void assignNewBatch(Collection<AssignableReplica> replicas) {
+ void assignInitBatch(Collection<AssignableReplica> replicas) {
Map<String, Integer> totalPartitionCapacity = new HashMap<>();
for (AssignableReplica replica : replicas) {
+ // TODO: an exception could occur in the middle of the for loop, and the previously added records cannot be reverted
addToAssignmentRecord(replica);
// increment the capacity requirement according to partition's capacity configuration.
for (Map.Entry<String, Integer> capacity : replica.getCapacity().entrySet()) {
@@ -115,8 +106,8 @@
}
// Update the global state after all single replications' calculation is done.
- for (String key : totalPartitionCapacity.keySet()) {
- updateCapacityAndUtilization(key, totalPartitionCapacity.get(key));
+ for (String capacityKey : totalPartitionCapacity.keySet()) {
+ updateCapacityAndUtilization(capacityKey, totalPartitionCapacity.get(capacityKey));
}
}
@@ -127,7 +118,7 @@
void assign(AssignableReplica assignableReplica) {
addToAssignmentRecord(assignableReplica);
assignableReplica.getCapacity().entrySet().stream()
- .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
+ .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), capacity.getValue()));
}
/**
@@ -218,8 +209,16 @@
/**
* @return The current available capacity.
*/
- public Map<String, Integer> getCurrentCapacity() {
- return _currentCapacityMap;
+ public Map<String, Integer> getRemainingCapacity() {
+ return _remainingCapacity;
+ }
+
+ /**
+ * @return A map of <capacity category, capacity number> that describes the max capacity of the
+ * node.
+ */
+ public Map<String, Integer> getMaxCapacity() {
+ return _maxAllowedCapacity;
}
/**
@@ -228,7 +227,6 @@
* categories.
* For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 0.6}. Then this call shall
* return 0.9.
- *
* @return The highest utilization number of the node among all the capacity category.
*/
public float getHighestCapacityUtilization() {
@@ -260,14 +258,6 @@
}
/**
- * @return A map of <capacity category, capacity number> that describes the max capacity of the
- * node.
- */
- public Map<String, Integer> getMaxCapacity() {
- return _maxCapacity;
- }
-
- /**
* @return The max partition count that are allowed to be allocated on the node.
*/
public int getMaxPartition() {
@@ -294,14 +284,15 @@
if (topologyStr == null || faultZoneType == null) {
LOG.debug("Topology configuration is not complete. Topology define: {}, Fault Zone Type: {}",
topologyStr, faultZoneType);
- // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault zone.
+ // Use the instance name, or the deprecated ZoneId field (if exists) as the default fault
+ // zone.
String zoneId = instanceConfig.getZoneId();
return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
} else {
// Get the fault zone information from the complete topology definition.
String[] topologyDef = topologyStr.trim().split("/");
- if (topologyDef.length == 0 ||
- Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
+ if (topologyDef.length == 0
+ || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
throw new HelixException(
"The configured topology definition is empty or does not contain the fault zone type.");
}
@@ -350,22 +341,22 @@
}
}
- private void updateCapacityAndUtilization(String capacityKey, int valueToSubtract) {
- if (_currentCapacityMap.containsKey(capacityKey)) {
- int newCapacity = _currentCapacityMap.get(capacityKey) - valueToSubtract;
- _currentCapacityMap.put(capacityKey, newCapacity);
- // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
- float utilization = Math.min(
- (float) (_maxCapacity.get(capacityKey) - newCapacity) / _maxCapacity.get(capacityKey), 1);
- _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization);
+ private void updateCapacityAndUtilization(String capacityKey, int usage) {
+ if (!_remainingCapacity.containsKey(capacityKey)) {
+ // If a capacityKey required by the replica does not exist in the instance's capacity,
+ // the instance is treated as if it has unlimited capacity for that capacityKey.
+ return;
}
- // else if the capacityKey does not exist in the capacity map, this method essentially becomes
- // a NOP; in other words, this node will be treated as if it has unlimited capacity.
+ int newCapacity = _remainingCapacity.get(capacityKey) - usage;
+ _remainingCapacity.put(capacityKey, newCapacity);
+ // For the purpose of constraint calculation, the max utilization cannot be larger than 100%.
+ float utilization = Math.min((float) (_maxAllowedCapacity.get(capacityKey) - newCapacity)
+ / _maxAllowedCapacity.get(capacityKey), 1);
+ _highestCapacityUtilization = Math.max(_highestCapacityUtilization, utilization);
}
/**
* Get and validate the instance capacity from instance config.
- *
* @throws HelixException if any required capacity key is not configured in the instance config.
*/
private Map<String, Integer> fetchInstanceCapacity(ClusterConfig clusterConfig,
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 2b53422..276b998 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -80,7 +80,7 @@
bestPossibleAssignment, allocatedReplicas);
// Update the allocated replicas to the assignable nodes.
- assignableNodes.stream().forEach(node -> node.assignNewBatch(
+ assignableNodes.stream().forEach(node -> node.assignInitBatch(
allocatedReplicas.getOrDefault(node.getInstanceName(), Collections.emptySet())));
// Construct and initialize cluster context.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
index 511f881..4365a42 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestNodeCapacityConstraint.java
@@ -39,7 +39,7 @@
@Test
public void testConstraintValidWhenNodeHasEnoughSpace() {
String key = "testKey";
- when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key, 10));
+ when(_testNode.getRemainingCapacity()).thenReturn(ImmutableMap.of(key, 10));
when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5));
Assert.assertTrue(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
}
@@ -47,7 +47,7 @@
@Test
public void testConstraintInValidWhenNodeHasInsufficientSpace() {
String key = "testKey";
- when(_testNode.getCurrentCapacity()).thenReturn(ImmutableMap.of(key, 1));
+ when(_testNode.getRemainingCapacity()).thenReturn(ImmutableMap.of(key, 1));
when(_testReplica.getCapacity()).thenReturn(ImmutableMap.of(key, 5));
Assert.assertFalse(_constraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index 6975901..b48587f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -19,9 +19,10 @@
* under the License.
*/
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -37,8 +38,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.when;
-
public class TestAssignableNode extends AbstractTestClusterModel {
@BeforeClass
public void initialize() {
@@ -65,7 +64,7 @@
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
- assignableNode.assignNewBatch(assignmentSet);
+ assignableNode.assignInitBatch(assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), expectedAssignment);
Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 20.0, 0.005);
@@ -74,7 +73,7 @@
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
- Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+ Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap);
Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
expectedAssignmentSet1);
@@ -114,7 +113,7 @@
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
- Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+ Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap);
Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
expectedAssignmentSet1);
@@ -147,7 +146,7 @@
Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), _disabledPartitionsMap);
- Assert.assertEquals(assignableNode.getCurrentCapacity(), expectedCapacityMap);
+ Assert.assertEquals(assignableNode.getRemainingCapacity(), expectedCapacityMap);
Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
expectedAssignmentSet1);
@@ -184,7 +183,7 @@
AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
- assignableNode.assignNewBatch(assignmentSet);
+ assignableNode.assignInitBatch(assignmentSet);
AssignableReplica duplicateReplica = new AssignableReplica(testCache.getClusterConfig(),
testCache.getResourceConfig(_resourceNames.get(0)), _partitionNames.get(0), "SLAVE", 2);
assignableNode.assign(duplicateReplica);