Add new WAGED rebalancer config item "GLOBAL_REBALANCE_ASYNC_MODE". (#637)
This option will be used by the WAGED rebalancer to determine if the global rebalance should be performed asynchronously.
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index f9d12f4..f2b6090 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -85,6 +85,10 @@
// Specifies job types and used for quota allocation
QUOTA_TYPES,
+ /**
+ * Configurable characteristics of the WAGED rebalancer.
+ * TODO: Split the WAGED rebalancer configuration items to the other config file.
+ */
// The required instance capacity keys for resource partition assignment calculation.
INSTANCE_CAPACITY_KEYS,
// The default instance capacity if no capacity is configured in the Instance Config node.
@@ -94,7 +98,18 @@
// The preference of the rebalance result.
// EVENNESS - Evenness of the resource utilization, partition, and top state distribution.
// LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment.
- REBALANCE_PREFERENCE
+ REBALANCE_PREFERENCE,
+ // Specify if the WAGED rebalancer should asynchronously perform the global rebalance, which is
+ // in general slower than the partial rebalance.
+ // Note that asynchronous global rebalance calculation will reduce the controller rebalance
+ // delay. But it may cause more partition movements. This is because the partial rebalance will
+ // be performed with a stale baseline. The rebalance result would be an intermediate one and
+ // could be changed again when a new baseline is calculated.
+ // For more details, please refer to
+ // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
+ //
+ // Default to be true.
+ GLOBAL_REBALANCE_ASYNC_MODE
}
public enum GlobalRebalancePreferenceKey {
@@ -121,6 +136,7 @@
.put(GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1).build();
private final static int MAX_REBALANCE_PREFERENCE = 10;
private final static int MIN_REBALANCE_PREFERENCE = 0;
+ private final static boolean DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED = true;
/**
* Instantiate for a specific cluster
@@ -823,6 +839,19 @@
}
/**
+ * Set the asynchronous global rebalance mode.
+ * @param isAsync true if the global rebalance should be performed asynchronously
+ */
+ public void setGlobalRebalanceAsyncMode(boolean isAsync) {
+ _record.setBooleanField(ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(), isAsync);
+ }
+
+ public boolean isGlobalRebalanceAsyncModeEnabled() {
+ return _record.getBooleanField(ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
+ DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
+ }
+
+ /**
* Get IdealState rules defined in the cluster config.
* @return
*/
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index f6b269c..d688827 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -19,17 +19,17 @@
* under the License.
*/
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS;
import static org.apache.helix.model.ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT;
@@ -239,4 +239,21 @@
ClusterConfig testConfig = new ClusterConfig("testConfig");
testConfig.setDefaultPartitionWeightMap(weightDataMap);
}
+
+ @Test
+ public void testAsyncGlobalRebalanceOption() {
+ ClusterConfig testConfig = new ClusterConfig("testConfig");
+ // Default value is true.
+ Assert.assertEquals(testConfig.isGlobalRebalanceAsyncModeEnabled(), true);
+ // Test get the option
+ testConfig.getRecord()
+ .setBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
+ false);
+ Assert.assertEquals(testConfig.isGlobalRebalanceAsyncModeEnabled(), false);
+ // Test set the option
+ testConfig.setGlobalRebalanceAsyncMode(true);
+ Assert.assertEquals(testConfig.getRecord()
+ .getBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
+ false), true);
+ }
}