Decouple the event type and the scheduled rebalance cache refresh option. (#638)
The previous design is that both on-demand and periodic rebalance scheduling task will request for a cache refresh. This won't be always true moving forward.
For example, the WAGED rebalancer async baseline calculating requests for a scheduled rebalance. But cache refresh won't be necessary.
This PR does not change any business logic. It prepares for future feature change.
This PR ensures strict backward compatibility.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 39a5ad7..3ccea41 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -221,17 +222,29 @@
class RebalanceTask extends TimerTask {
final HelixManager _manager;
final ClusterEventType _clusterEventType;
+ private final Optional<Boolean> _shouldRefreshCacheOption;
private long _nextRebalanceTime;
public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
this(manager, clusterEventType, -1);
-
}
- public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType, long nextRebalanceTime) {
+ public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+ long nextRebalanceTime) {
+ this(manager, clusterEventType, nextRebalanceTime, Optional.empty());
+ }
+
+ public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+ long nextRebalanceTime, boolean shouldRefreshCache) {
+ this(manager, clusterEventType, nextRebalanceTime, Optional.of(shouldRefreshCache));
+ }
+
+ private RebalanceTask(HelixManager manager, ClusterEventType clusterEventType,
+ long nextRebalanceTime, Optional<Boolean> shouldRefreshCacheOption) {
_manager = manager;
_clusterEventType = clusterEventType;
_nextRebalanceTime = nextRebalanceTime;
+ _shouldRefreshCacheOption = shouldRefreshCacheOption;
}
public long getNextRebalanceTime() {
@@ -241,8 +254,9 @@
@Override
public void run() {
try {
- if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
- .equals(ClusterEventType.OnDemandRebalance)) {
+ if (_shouldRefreshCacheOption.orElse(
+ _clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType
+ .equals(ClusterEventType.OnDemandRebalance))) {
requestDataProvidersFullRefresh();
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
@@ -360,7 +374,17 @@
* Schedule an on demand rebalance pipeline.
* @param delay
*/
+ @Deprecated
public void scheduleOnDemandRebalance(long delay) {
+ scheduleOnDemandRebalance(delay, true);
+ }
+
+ /**
+ * Schedule an on demand rebalance pipeline.
+ * @param delay
+ * @param shouldRefreshCache true if refresh the cache before scheduling a rebalance.
+ */
+ public void scheduleOnDemandRebalance(long delay, boolean shouldRefreshCache) {
if (_helixManager == null) {
logger.error("Failed to schedule a future pipeline run for cluster {}. Helix manager is null!",
_clusterName);
@@ -378,7 +402,8 @@
}
RebalanceTask newTask =
- new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime);
+ new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime,
+ shouldRefreshCache);
_onDemandRebalanceTimer.schedule(newTask, delay);
logger.info("Scheduled instant pipeline run for cluster {}." , _helixManager.getClusterName());
@@ -1233,4 +1258,4 @@
eventThread.setDaemon(true);
eventThread.start();
}
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 18163bd..050762d 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -143,6 +143,11 @@
}
public static void scheduleOnDemandPipeline(String clusterName, long delay) {
+ scheduleOnDemandPipeline(clusterName, delay, true);
+ }
+
+ public static void scheduleOnDemandPipeline(String clusterName, long delay,
+ boolean shouldRefreshCache) {
if (clusterName == null) {
LOG.error("Failed to issue a pipeline run. ClusterName is null.");
return;
@@ -153,7 +158,7 @@
}
GenericHelixController controller = GenericHelixController.getController(clusterName);
if (controller != null) {
- controller.scheduleOnDemandRebalance(delay);
+ controller.scheduleOnDemandRebalance(delay, shouldRefreshCache);
} else {
LOG.error("Failed to issue a pipeline. Controller for cluster {} does not exist.",
clusterName);