Fix endless creation of best possible nodes triggered by unintended modification of cached best possible map. (#2970)
The baseline and bestPossible maps are type Map<String, ResourceAssignment>. When this map is stored in the cache by persistBaseline() and persistBestPossibleAssignment(), the ResourceAssignment objects can still be accessed and modified after calling the relevant persist method. This change creates a deep copy of the assignment map with new ResourceAssignment objects to prevent unintended modification of the cached assignment after the call to persist.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index 07056aa..157bb0a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
@@ -133,11 +134,15 @@
* @param globalBaseline
*/
public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+ // Create defensive copy so that the in-memory assignment is not modified after it is persisted
+ Map<String, ResourceAssignment> baselineCopy = globalBaseline.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey,
+ entry -> new ResourceAssignment(entry.getValue().getRecord())));
// write to metadata store
- persistAssignmentToMetadataStore(globalBaseline, _baselinePath, BASELINE_KEY);
+ persistAssignmentToMetadataStore(baselineCopy, _baselinePath, BASELINE_KEY);
// write to memory
getBaseline().clear();
- getBaseline().putAll(globalBaseline);
+ getBaseline().putAll(baselineCopy);
}
/**
@@ -146,11 +151,15 @@
* @param bestPossibleAssignment
*/
public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+ // Create defensive copy so that the in-memory assignment is not modified after it is persisted
+ Map<String, ResourceAssignment> bestPossibleAssignmentCopy = bestPossibleAssignment.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey,
+ entry -> new ResourceAssignment(entry.getValue().getRecord())));
// write to metadata store
- persistAssignmentToMetadataStore(bestPossibleAssignment, _bestPossiblePath, BEST_POSSIBLE_KEY);
+ persistAssignmentToMetadataStore(bestPossibleAssignmentCopy, _bestPossiblePath, BEST_POSSIBLE_KEY);
// write to memory
getBestPossibleAssignment().clear();
- getBestPossibleAssignment().putAll(bestPossibleAssignment);
+ getBestPossibleAssignment().putAll(bestPossibleAssignmentCopy);
_bestPossibleVersion++;
_lastPersistedBestPossibleVersion = _bestPossibleVersion;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEndlessBestPossibleNodes.java b/helix-core/src/test/java/org/apache/helix/integration/TestEndlessBestPossibleNodes.java
new file mode 100644
index 0000000..bec7a0f
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEndlessBestPossibleNodes.java
@@ -0,0 +1,96 @@
+package org.apache.helix.integration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestEndlessBestPossibleNodes extends ZkTestBase {
+
+ public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
+ public static int PARTICIPANT_COUNT = 12;
+ public static List<MockParticipantManager> _participants = new ArrayList<>();
+ public static ClusterControllerManager _controller;
+ public static ConfigAccessor _configAccessor;
+
+ @BeforeClass
+ public void beforeClass() {
+ System.out.println("Start test " + TestHelper.getTestClassName());
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < PARTICIPANT_COUNT; i++) {
+ addParticipant("localhost_" + i);
+ }
+
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setInstanceCapacityKeys(Collections.singletonList("partcount"));
+ clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap("partcount", 10000));
+ clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap("partcount", 1));
+ clusterConfig.setDelayRebalaceEnabled(true);
+ clusterConfig.setRebalanceDelayTime(57600000);
+ clusterConfig.setPersistBestPossibleAssignment(true);
+ _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ }
+
+ // This test was constructed to capture the bug described in issue 2971
+ // https://github.com/apache/helix/issues/2971
+ @Test
+ public void testEndlessBestPossibleNodes() throws Exception {
+ int numPartition = 10;
+ BestPossibleExternalViewVerifier verifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+
+ // Create 1 WAGED Resource
+ String firstDB = "InPlaceMigrationTestDB3";
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
+ IdealState.RebalanceMode.FULL_AUTO.name(), null);
+ IdealState idealStateOne =
+ _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
+ idealStateOne.setMinActiveReplicas(2);
+ idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
+ Assert.assertTrue(verifier.verifyByPolling());
+
+ // Disable instances so delay rebalance overwrite is required due to min active
+ for (int i = 0; i < PARTICIPANT_COUNT/2; i++) {
+ _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participants.get(i).getInstanceName(), false);
+ }
+
+ // Add new instance to cause partial rebalance to calculate a new best possible
+ addParticipant("newInstance_0");
+
+ // Sleep to let pipeline run and ZK writes occur
+ Thread.sleep(5000);
+
+ // There should only be 2 best possibles created (children will be 0, 1, LAST_WRITE, and LAST_SUCCESSFUL_WRITE)
+ int childCount = _gZkClient.getChildren("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/BEST_POSSIBLE").size();
+ Assert.assertTrue(childCount > 0);
+ Assert.assertTrue(childCount < 5, "Child count was " + childCount);
+ }
+
+ public MockParticipantManager addParticipant(String instanceName) {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ participant.syncStart();
+ _participants.add(participant);
+ return participant;
+ }
+}