| package org.apache.helix.integration.rebalancer; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixConstants; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.HelixManagerFactory; |
| import org.apache.helix.HelixRollbackException; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.NotificationContext; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyType; |
| import org.apache.helix.TestHelper; |
| import org.apache.helix.api.listeners.InstanceConfigChangeListener; |
| import org.apache.helix.common.ZkTestBase; |
| import org.apache.helix.constants.InstanceConstants; |
| import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; |
| import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; |
| import org.apache.helix.integration.manager.ClusterControllerManager; |
| import org.apache.helix.integration.manager.MockParticipantManager; |
| import org.apache.helix.manager.zk.ZKHelixAdmin; |
| import org.apache.helix.manager.zk.ZKHelixDataAccessor; |
| import org.apache.helix.manager.zk.ZKHelixManager; |
| import org.apache.helix.manager.zk.ZkBucketDataAccessor; |
| import org.apache.helix.model.BuiltInStateModelDefinitions; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.ExternalView; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.ResourceAssignment; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.participant.StateMachineEngine; |
| import org.apache.helix.participant.statemachine.StateModel; |
| import org.apache.helix.participant.statemachine.StateModelFactory; |
| import org.apache.helix.participant.statemachine.StateModelInfo; |
| import org.apache.helix.participant.statemachine.Transition; |
| import org.apache.helix.spectator.RoutingTableProvider; |
| import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; |
| import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; |
| import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; |
| import org.apache.zookeeper.Watcher; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| |
| |
| public class TestInstanceOperation extends ZkTestBase { |
| private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class); |
| public static final int TIMEOUT = 10000; |
| private final int ZONE_COUNT = 4; |
| protected final int START_NUM_NODE = 10; |
| protected static final int START_PORT = 12918; |
| private static int _nextStartPort = START_PORT; |
| protected static final int PARTITIONS = 20; |
| |
| protected final String CLASS_NAME = getShortClassName(); |
| protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; |
| private final String TEST_CAPACITY_KEY = "TestCapacityKey"; |
| private final int TEST_CAPACITY_VALUE = 100; |
| protected static final String ZONE = "zone"; |
| protected static final String HOST = "host"; |
| protected static final String LOGICAL_ID = "logicalId"; |
| protected static final String TOPOLOGY = String.format("%s/%s/%s", ZONE, HOST, LOGICAL_ID); |
| protected static final ImmutableSet<String> TOP_STATE_SET = |
| ImmutableSet.of("MASTER"); |
| protected static final ImmutableSet<String> SECONDARY_STATE_SET = |
| ImmutableSet.of("SLAVE", "STANDBY"); |
| protected static final ImmutableSet<String> ACCEPTABLE_STATE_SET = |
| ImmutableSet.of("MASTER", "LEADER", "SLAVE", "STANDBY"); |
| private int REPLICA = 3; |
| protected ClusterControllerManager _controller; |
| private HelixManager _spectator; |
| private RoutingTableProvider _routingTableProviderDefault; |
| private RoutingTableProvider _routingTableProviderEV; |
| private RoutingTableProvider _routingTableProviderCS; |
| List<MockParticipantManager> _participants = new ArrayList<>(); |
| List<String> _participantNames = new ArrayList<>(); |
| private Set<String> _allDBs = new HashSet<>(); |
| private ZkHelixClusterVerifier _clusterVerifier; |
| private BestPossibleExternalViewVerifier _bestPossibleClusterVerifier; |
| private ConfigAccessor _configAccessor; |
| private long _stateModelDelay = 3L; |
| |
| private final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L; |
| private HelixAdmin _admin; |
| protected AssignmentMetadataStore _assignmentMetadataStore; |
| HelixDataAccessor _dataAccessor; |
| |
| @BeforeClass |
| public void beforeClass() throws Exception { |
| System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); |
| |
| _gSetupTool.addCluster(CLUSTER_NAME, true); |
| |
| for (int i = 0; i < START_NUM_NODE; i++) { |
| String participantName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(participantName); |
| } |
| |
| // start controller |
| String controllerName = CONTROLLER_PREFIX + "_0"; |
| _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); |
| _controller.syncStart(); |
| _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) |
| .setDeactivatedNodeAwareness(true) |
| .setResources(_allDBs) |
| .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) |
| .build(); |
| _bestPossibleClusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) |
| .setResources(_allDBs) |
| .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) |
| .build(); |
| enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); |
| _configAccessor = new ConfigAccessor(_gZkClient); |
| _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); |
| |
| // start spectator |
| _spectator = |
| HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, |
| ZK_ADDR); |
| _spectator.connect(); |
| _routingTableProviderDefault = new RoutingTableProvider(_spectator); |
| _routingTableProviderEV = new RoutingTableProvider(_spectator, PropertyType.EXTERNALVIEW); |
| _routingTableProviderCS = new RoutingTableProvider(_spectator, PropertyType.CURRENTSTATES); |
| |
| setupClusterConfig(); |
| |
| createTestDBs(DEFAULT_RESOURCE_DELAY_TIME); |
| |
| setUpWagedBaseline(); |
| |
| _admin = new ZKHelixAdmin(_gZkClient); |
| } |
| |
| @AfterClass |
| public void afterClass() { |
| // Drop all DBs |
| for (String db : _allDBs) { |
| _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); |
| } |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| for (MockParticipantManager p : _participants) { |
| p.syncStop(); |
| } |
| _controller.syncStop(); |
| _routingTableProviderDefault.shutdown(); |
| _routingTableProviderEV.shutdown(); |
| _routingTableProviderCS.shutdown(); |
| _spectator.disconnect(); |
| } |
| |
| private void setupClusterConfig() { |
| _stateModelDelay = 3L; |
| ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); |
| clusterConfig.stateTransitionCancelEnabled(true); |
| clusterConfig.setDelayRebalaceEnabled(true); |
| clusterConfig.setRebalanceDelayTime(1800000L); |
| _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| } |
| |
| private void enabledTopologyAwareRebalance() { |
| ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); |
| clusterConfig.setTopology(TOPOLOGY); |
| clusterConfig.setFaultZoneType(ZONE); |
| clusterConfig.setTopologyAwareEnabled(true); |
| _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| } |
| |
| private void disableTopologyAwareRebalance() { |
| ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); |
| clusterConfig.setTopologyAwareEnabled(false); |
| clusterConfig.setTopology(null); |
| clusterConfig.setFaultZoneType(null); |
| _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| } |
| |
| private void removeOfflineOrInactiveInstances() { |
| // Remove all instances that are not live, disabled, or in SWAP_IN state. |
| for (int i = 0; i < _participants.size(); i++) { |
| String participantName = _participantNames.get(i); |
| InstanceConfig instanceConfig = |
| _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName); |
| if (!_participants.get(i).isConnected() || !instanceConfig.getInstanceEnabled() |
| || instanceConfig.getInstanceOperation() |
| .equals(InstanceConstants.InstanceOperation.SWAP_IN)) { |
| if (_participants.get(i).isConnected()) { |
| _participants.get(i).syncStop(); |
| } |
| _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig); |
| _participantNames.remove(i); |
| _participants.remove(i); |
| i--; |
| } |
| } |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| } |
| |
| @Test |
| public void testEvacuate() throws Exception { |
| System.out.println("START TestInstanceOperation.testEvacuate() at " + new Date(System.currentTimeMillis())); |
| |
| // Add semi-auto DBs |
| String semiAutoDB = "SemiAutoTestDB_1"; |
| createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, semiAutoDB, |
| _participants.stream().map(ZKHelixManager::getInstanceName).collect(Collectors.toList()), |
| BuiltInStateModelDefinitions.OnlineOffline.name(), 1, _participants.size()); |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // EV should contain all participants, check resources one by one |
| Map<String, ExternalView> assignment = getEVs(); |
| for (String resource : _allDBs) { |
| Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); |
| } |
| |
| // evacuated instance |
| String instanceToEvacuate = _participants.get(0).getInstanceName(); |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // New ev should contain all instances but the evacuated one |
| assignment = getEVs(); |
| List<String> currentActiveInstances = |
| _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList()); |
| for (String resource : _allDBs) { |
| validateAssignmentInEv(assignment.get(resource)); |
| Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); |
| Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); |
| Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); |
| } |
| |
| Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); |
| Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); |
| |
| // Drop semi-auto DBs |
| _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, semiAutoDB); |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Compare the current ev with the previous one, it should be exactly the same since the baseline should not change |
| // after the instance is dropped. |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| Assert.assertEquals(getEVs(), assignment); |
| } |
| |
| @Test(dependsOnMethods = "testEvacuate") |
| public void testRevertEvacuation() throws Exception { |
| System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis())); |
| // revert an evacuate instance |
| String instanceToEvacuate = _participants.get(0).getInstanceName(); |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, |
| InstanceConstants.InstanceOperation.ENABLE); |
| |
| Assert.assertTrue( |
| _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToEvacuate) |
| .getInstanceEnabled()); |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // EV should contain all participants, check resources one by one |
| Map<String, ExternalView> assignment = getEVs(); |
| for (String resource : _allDBs) { |
| Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); |
| validateAssignmentInEv(assignment.get(resource)); |
| } |
| } |
| |
| @Test(dependsOnMethods = "testRevertEvacuation") |
| public void testAddingNodeWithEvacuationTag() throws Exception { |
| System.out.println("START TestInstanceOperation.testAddingNodeWithEvacuationTag() at " + new Date(System.currentTimeMillis())); |
| // first disable and instance, and wait for all replicas to be moved out |
| String mockNewInstance = _participants.get(0).getInstanceName(); |
| // This is using a deprecated method to ensure that the disabling still takes precedence over the InstanceOperation when being set |
| // to false. |
| _gSetupTool.getClusterManagementTool() |
| .enableInstance(CLUSTER_NAME, mockNewInstance, false); |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| // ev should contain all instances but the disabled one |
| Map<String, ExternalView> assignment = getEVs(); |
| List<String> currentActiveInstances = |
| _participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList()); |
| for (String resource : _allDBs) { |
| validateAssignmentInEv(assignment.get(resource), REPLICA-1); |
| Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); |
| Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance)); |
| Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); |
| } |
| |
| // add evacuate tag and enable instance |
| // Because HELIX_ENABLED is set to false, getInstanceOperation still returns DISABLE |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceOperation(CLUSTER_NAME, mockNewInstance, InstanceConstants.InstanceOperation.EVACUATE); |
| |
| // enable instance so InstanceOperation is no longer overriden with DISABLE |
| _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, mockNewInstance, true); |
| |
| //ev should be the same |
| assignment = getEVs(); |
| currentActiveInstances = |
| _participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList()); |
| for (String resource : _allDBs) { |
| validateAssignmentInEv(assignment.get(resource), REPLICA-1); |
| Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); |
| Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance)); |
| Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); |
| } |
| |
| // now remove operation tag |
| String instanceToEvacuate = _participants.get(0).getInstanceName(); |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // EV should contain all participants, check resources one by one |
| assignment = getEVs(); |
| for (String resource : _allDBs) { |
| Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); |
| validateAssignmentInEv(assignment.get(resource)); |
| } |
| } |
| |
| @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag") |
| public void testNodeSwapNoTopologySetup() throws Exception { |
| System.out.println("START TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date( |
| System.currentTimeMillis())); |
| removeOfflineOrInactiveInstances(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| |
| // Add instance with InstanceOperation set to SWAP_IN as default |
| // The instance will be added with UNKNOWN because the logicalId will not match the |
| // swap out instance since the topology configs are not set. |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| Assert.assertEquals( |
| _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) |
| .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapNoTopologySetup") |
| public void testAddingNodeWithEnableInstanceOperation() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testAddingNodeWithEnableInstanceOperation() at " + new Date( |
| System.currentTimeMillis())); |
| |
| enabledTopologyAwareRebalance(); |
| removeOfflineOrInactiveInstances(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Add instance with InstanceOperation set to ENABLE |
| // The instance should be added with UNKNOWN since there is already an instance with |
| // the same logicalId in the cluster and this instance is not being set to SWAP_IN when |
| // added. |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.ENABLE, -1); |
| |
| Assert.assertEquals( |
| _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) |
| .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); |
| } |
| |
| @Test(dependsOnMethods = "testAddingNodeWithEnableInstanceOperation") |
| public void testNodeSwapWithNoSwapOutNode() throws Exception { |
| System.out.println("START TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date( |
| System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| // Add new instance with InstanceOperation set to SWAP_IN |
| // The instance should be added with UNKNOWN since there is not an instance with a matching |
| // logicalId in the cluster to swap with. |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, "1000", "zone_1000", |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| Assert.assertEquals( |
| _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) |
| .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode") |
| public void testNodeSwapSwapInNodeNoInstanceOperationEnabled() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at " |
| + new Date(System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Add instance with same logicalId with InstanceOperation unset, this is the same as default |
| // which is ENABLE. |
| // The instance should be set to UNKNOWN since there is already a matching logicalId in the cluster. |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, -1); |
| |
| Assert.assertEquals( |
| _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) |
| .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); |
| |
| // Setting the InstanceOperation to SWAP_IN should work because there is a matching logicalId in |
| // the cluster and the InstanceCapacityWeights and FaultZone match. |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, |
| InstanceConstants.InstanceOperation.SWAP_IN); |
| |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false)); |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| } |
| |
| @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationEnabled") |
| public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at " |
| + new Date(System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| // Add another instance with InstanceOperation set to SWAP_IN with same logicalId as previously |
| // added SWAP_IN instance. |
| String secondInstanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(secondInstanceToSwapInName, |
| instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| // Instance should be UNKNOWN since there was already a swapping pair. |
| Assert.assertEquals(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, secondInstanceToSwapInName).getInstanceOperation(), |
| InstanceConstants.InstanceOperation.UNKNOWN); |
| |
| // Try to set the InstanceOperation to SWAP_IN, it should throw an exception since there is already |
| // a swapping pair. |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceOperation(CLUSTER_NAME, secondInstanceToSwapInName, |
| InstanceConstants.InstanceOperation.SWAP_IN); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapSwapInNodeWithAlreadySwappingPair") |
| public void testNodeSwap() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| String resourceToDisablePartition = _allDBs.iterator().next(); |
| // Disable 1 partition that is assigned to the instance that will be swapped out. |
| getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).entrySet().stream() |
| .filter(entry -> entry.getKey().startsWith(resourceToDisablePartition)).findFirst() |
| .ifPresent(entry -> { |
| String partition = entry.getKey(); |
| instanceToSwapOutInstanceConfig.setInstanceEnabledForPartition(resourceToDisablePartition, |
| partition, false); |
| }); |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceConfig(CLUSTER_NAME, instanceToSwapOutName, instanceToSwapOutInstanceConfig); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Store original EV |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Create a custom change listener to check if the throttles are enabled after the swap is completed. |
| CustomIndividualInstanceConfigChangeListener instanceToSwapInInstanceConfigListener = |
| new CustomIndividualInstanceConfigChangeListener(); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1, instanceToSwapInInstanceConfigListener); |
| |
| // Validate that the throttles are off since the InstanceOperation is set to SWAP_IN |
| Assert.assertFalse(instanceToSwapInInstanceConfigListener.isThrottlesEnabled()); |
| |
| // Check that the SWAP_IN instance has the same partitions as the swap out instance |
| // but none of them are in a top state. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| ImmutableSet.of(instanceToSwapInName), Collections.emptySet()); |
| |
| // Assert canSwapBeCompleted is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| |
| // Validate that the swap out instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Assert completeSwapIfPossible is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false)); |
| |
| // Get both instanceConfigs and make sure correct fields are copied over. |
| InstanceConfig instanceToSwapInInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapInName); |
| |
| Assert.assertEquals(instanceToSwapInInstanceConfig.getRecord() |
| .getMapField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()), |
| instanceToSwapInInstanceConfig.getRecord() |
| .getMapField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that the SWAP_IN instance is now in the routing tables. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true); |
| |
| // Assert that swap out instance is not active and has no partitions assigned to it. |
| Assert.assertFalse(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); |
| Assert.assertEquals(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(), |
| InstanceConstants.InstanceOperation.UNKNOWN); |
| |
| // Check to make sure the throttle was enabled again after the swap was completed. |
| Assert.assertTrue(instanceToSwapInInstanceConfigListener.isThrottlesEnabled()); |
| |
| // Validate that the SWAP_IN instance has the same partitions the swap out instance had before |
| // swap was completed. |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwap") |
| public void testNodeSwapDisableAndReenable() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis())); |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EV |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Validate that the assignment has not changed since setting the InstanceOperation to swap out |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| // Check that the SWAP_IN instance has the same partitions as the swap out instance |
| // but none of them are in a top state. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| ImmutableSet.of(instanceToSwapInName), Collections.emptySet()); |
| |
| // Assert canSwapBeCompleted is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| |
| // Try to disable the swap out instance, it should not do anything. |
| _gSetupTool.getClusterManagementTool() |
| .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false); |
| |
| // Check that the SWAP_IN instance's replicas match the SWAP_OUT instance's replicas |
| // and all of them are OFFLINE. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| Map<String, Map<String, String>> resourcePartitionStateOnSwapOutInstance = |
| getResourcePartitionStateOnInstance(getEVs(), instanceToSwapOutName); |
| Map<String, Map<String, String>> resourcePartitionStateOnSwapInInstance = |
| getResourcePartitionStateOnInstance(getEVs(), instanceToSwapInName); |
| Assert.assertEquals( |
| resourcePartitionStateOnSwapInInstance.values().stream().flatMap(p -> p.keySet().stream()) |
| .collect(Collectors.toSet()), |
| resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(p -> p.keySet().stream()) |
| .collect(Collectors.toSet())); |
| Set<String> swapOutInstancePartitionStates = |
| resourcePartitionStateOnSwapOutInstance.values().stream().flatMap(e -> e.values().stream()) |
| .collect(Collectors.toSet()); |
| Assert.assertEquals(swapOutInstancePartitionStates.size(), 1); |
| Assert.assertTrue(swapOutInstancePartitionStates.contains("OFFLINE")); |
| Set<String> swapInInstancePartitionStates = |
| resourcePartitionStateOnSwapInInstance.values().stream().flatMap(e -> e.values().stream()) |
| .collect(Collectors.toSet()); |
| Assert.assertEquals(swapInInstancePartitionStates.size(), 1); |
| Assert.assertTrue(swapInInstancePartitionStates.contains("OFFLINE")); |
| |
| // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Assert canSwapBeCompleted is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| |
| // Re-enable the swap out instance |
| _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapOutName, true); |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| |
| // Validate that the SWAP_OUT instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Assert completeSwapIfPossible is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false)); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that the SWAP_IN instance is now in the routing tables. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true); |
| |
| // Assert that swap out instance is not active and has no partitions assigned to it. |
| Assert.assertFalse(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); |
| Assert.assertEquals(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(), |
| InstanceConstants.InstanceOperation.UNKNOWN); |
| |
| // Validate that the SWAP_IN instance has the same partitions the swap out instance had before |
| // swap was completed. |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapDisableAndReenable") |
| public void testNodeSwapSwapInNodeNoInstanceOperation() throws Exception { |
| System.out.println("START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperation() at " |
| + new Date(System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EVs |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Add instance with InstanceOperation unset, should set to UNKNOWN. |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, -1); |
| |
| // Validate that the SWAP_IN instance does not have any partitions on it. |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Set InstanceOperation to SWAP_IN |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, |
| InstanceConstants.InstanceOperation.SWAP_IN); |
| |
| // Check that the SWAP_IN instance has the same partitions as the swap out instance |
| // but none of them are in a top state. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| ImmutableSet.of(instanceToSwapInName), Collections.emptySet()); |
| |
| // Validate that the swap out instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Assert canSwapBeCompleted is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| // Assert completeSwapIfPossible is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false)); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Assert that swap out instance is inactive and has no partitions assigned to it. |
| Assert.assertFalse(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); |
| |
| // Validate that the SWAP_IN instance has the same partitions the swap out instance had before |
| // swap was completed. |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperation") |
| public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new Date( |
| System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EVs |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Validate that the assignment has not changed since setting the InstanceOperation to swap out |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| // Check that the SWAP_IN instance has the same partitions as the swap out instance |
| // but none of them are in a top state. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| ImmutableSet.of(instanceToSwapInName), Collections.emptySet()); |
| |
| // Validate that the swap out instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Assert canSwapBeCompleted is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| |
| // Cancel the swap by setting the InstanceOperation to UNKNOWN on the SWAP_IN instance. |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, |
| InstanceConstants.InstanceOperation.UNKNOWN); |
| |
| // Validate there are no partitions on the SWAP_IN instance. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Stop the participant |
| _participants.get(_participants.size() - 1).syncStop(); |
| |
| // Wait for cluster to converge. |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that the swap out instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Validate there are no partitions on the SWAP_IN instance. |
| Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName).size(), 0); |
| |
| // Validate that the swap out instance has the same partitions as it had before. |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that the swap out instance has the same partitions as it had before. |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet())), TIMEOUT); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapCancelSwapWhenReadyToComplete") |
| public void testNodeSwapAfterEMM() throws Exception { |
| System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at " + new Date( |
| System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EVs |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>(); |
| |
| // Put the cluster in maintenance mode. |
| _gSetupTool.getClusterManagementTool() |
| .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Validate that the assignment has not changed. |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| // Validate that the assignment has not changed since adding the SWAP_IN node. |
| // During MM, the cluster should not compute new assignment on SWAP_IN node. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Remove the cluster from maintenance mode. |
| // Now swapping will begin |
| _gSetupTool.getClusterManagementTool() |
| .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null); |
| |
| // Validate that partitions on swap out instance does not change after exiting MM |
| // Check that the SWAP_IN instance has the same partitions as the swap out instance |
| // but none of them are in a top state. |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| ImmutableSet.of(instanceToSwapInName), Collections.emptySet()); |
| |
| // Validate that the swap out instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapOutName, true); |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Assert canSwapBeCompleted is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| // Assert completeSwapIfPossible is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false)); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that the SWAP_IN instance is now in the routing tables. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true); |
| |
| // Assert that swap out instance is disabled and has no partitions assigned to it. |
| Assert.assertFalse(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); |
| |
| // Validate that the SWAP_IN instance has the same partitions the swap out instance had before |
| // swap was completed. |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapAfterEMM") |
| public void testNodeSwapWithSwapOutInstanceDisabled() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled() at " + new Date( |
| System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EVs |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| Set<String> swapOutInstanceOriginalPartitions = |
| getPartitionsAndStatesOnInstance(originalEVs, instanceToSwapOutName).keySet(); |
| |
| // Disable the swap out instance. |
| _gSetupTool.getClusterManagementTool() |
| .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that the swap out instance has all partitions in OFFLINE state |
| Set<String> swapOutInstanceOfflineStates = |
| new HashSet<>(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).values()); |
| Assert.assertEquals(swapOutInstanceOfflineStates.size(), 1); |
| Assert.assertTrue(swapOutInstanceOfflineStates.contains("OFFLINE")); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| |
| // Validate that the SWAP_IN instance has no partitions because the swap started when the swap out node was offline |
| Map<String, String> swapInInstancePartitionsAndStates = |
| getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName); |
| Assert.assertEquals(swapInInstancePartitionsAndStates.size(), 0); |
| |
| // Assert canSwapBeCompleted is false because swap out instance is disabled. |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| |
| // Assert completeSwapIfPossible is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false)); |
| |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| |
| // Assert that swap out instance is disabled and has no partitions assigned to it. |
| Assert.assertFalse(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); |
| |
| verifier( |
| () -> (getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).isEmpty()), |
| TIMEOUT); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapWithSwapOutInstanceDisabled") |
| public void testNodeSwapWithSwapOutInstanceOffline() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceOffline() at " + new Date( |
| System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EV |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| // Kill the participant |
| _participants.get(0).syncStop(); |
| |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| |
| // Assert canSwapBeCompleted is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); |
| |
| // Validate that the swap out instance is in routing tables and SWAP_IN is not. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, false); |
| |
| // Assert completeSwapIfPossible is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName, false)); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that the SWAP_IN instance is now in the routing tables. |
| validateRoutingTablesInstance(getEVs(), instanceToSwapInName, true); |
| |
| // Assert that swap out instance is inactive and has no partitions assigned to it. |
| Assert.assertFalse(_gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); |
| |
| // Validate that the SWAP_IN instance has the same partitions the swap out instance had before |
| // swap was completed. |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); |
| } |
| |
| @Test(dependsOnMethods = "testNodeSwapWithSwapOutInstanceOffline") |
| public void testSwapEvacuateAdd() throws Exception { |
| System.out.println("START TestInstanceOperation.testSwapEvacuateAdd() at " + new Date( |
| System.currentTimeMillis())); |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EV |
| Map<String, ExternalView> originalEVs = getEVs(); |
| |
| Map<String, String> swapOutInstancesToSwapInInstances = new HashMap<>(); |
| |
| // Enter maintenance mode |
| _gSetupTool.getClusterManagementTool() |
| .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Set instance's InstanceOperation to EVACUATE |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, |
| InstanceConstants.InstanceOperation.EVACUATE); |
| |
| // Validate that the assignment has not changed since setting the InstanceOperation to EVACUATE |
| validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), Collections.emptySet()); |
| |
| // Add instance with InstanceOperation set to ENABLE |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.ENABLE, -1); |
| |
| // Exit maintenance mode |
| _gSetupTool.getClusterManagementTool() |
| .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null); |
| |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| |
| // Validate that the SWAP_IN instance has the same partitions the swap out instance had. |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); |
| |
| // Assert isEvacuateFinished is true |
| Assert.assertTrue(_gSetupTool.getClusterManagementTool() |
| .isEvacuateFinished(CLUSTER_NAME, instanceToSwapOutName)); |
| |
| // Set the EVACUATE instance to UNKNOWN |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, |
| InstanceConstants.InstanceOperation.UNKNOWN); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| // Validate that dropping the instance has not changed the assignment |
| verifier(() -> (validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, |
| Collections.emptySet(), ImmutableSet.of(instanceToSwapInName))), TIMEOUT); |
| } |
| |
| @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testSwapEvacuateAdd") |
| public void testUnsetInstanceOperationOnSwapInWhenSwapping() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenSwapping() at " |
| + new Date(System.currentTimeMillis())); |
| |
| removeOfflineOrInactiveInstances(); |
| |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| |
| Assert.assertTrue(_bestPossibleClusterVerifier.verifyByPolling()); |
| |
| // Try to remove the InstanceOperation from the SWAP_IN instance before swap in instance is set to unknown. |
| // This should throw exception because we cannot ever have two instances with the same logicalId and both have InstanceOperation |
| // unset. |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null); |
| } |
| |
| @Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenSwapping") |
| public void testNodeSwapAddSwapInFirst() throws Exception { |
| System.out.println("START TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date( |
| System.currentTimeMillis())); |
| removeOfflineOrInactiveInstances(); |
| |
| // Store original EV |
| Map<String, ExternalView> originalEVs = getEVs(); |
| // Get the swap out instance. |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| |
| // Add instance with InstanceOperation set to SWAP_IN |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.SWAP_IN, -1); |
| } |
| |
/**
 * Starts evacuating an instance while bootstrap state transitions are delayed, then cancels
 * the evacuation before they finish. Each resource's external view is checked with
 * validateAssignmentInEv three times:
 * - before the cancel;
 * - immediately after the cancel;
 * - after the cluster converges.
 * After convergence, every participant must also appear in each resource's external view.
 */
@Test(dependsOnMethods = "testNodeSwapAddSwapInFirst")
public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
  System.out.println(
      "START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date(
          System.currentTimeMillis()));

  removeOfflineOrInactiveInstances();

  // Add a CRUSHED resource via createResourceWithDelayedRebalance.
  // NOTE(review): 200000 is presumably the rebalance delay in ms; confirm against the helper.
  createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave",
      PARTITIONS, REPLICA, REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
  _allDBs.add("TEST_DB3_DELAYED_CRUSHED");
  // Add a WAGED-rebalanced resource with the same partition/replica settings.
  createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave",
      PARTITIONS, REPLICA, REPLICA - 1);
  _allDBs.add("TEST_DB4_DELAYED_WAGED");
  // wait for assignment to finish
  Assert.assertTrue(_clusterVerifier.verifyByPolling());

  // Set the bootstrap state transition delay to a large value.
  // NOTE(review): the value is negative. Presumably the mock state model treats a negative
  // _stateModelDelay as "delay bootstrap transitions by |value| ms"; confirm in the state model.
  _stateModelDelay = -10000L;
  // evacuate an instance
  String instanceToEvacuate = _participants.get(0).getInstanceName();
  _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
      InstanceConstants.InstanceOperation.EVACUATE);
  // Messages should be pending at all instances besides the evacuate one.
  // NOTE(review): the verifier below waits for each participant's message folder to become
  // EMPTY. That does not match the "pending" wording; confirm which is intended, and whether
  // verifier() fails the test on timeout.
  for (String participant : _participantNames) {
    if (participant.equals(instanceToEvacuate)) {
      continue;
    }
    verifier(() -> ((_dataAccessor.getChildNames(
        _dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
  }
  Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
  Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));

  // Sleep |_stateModelDelay / 100| ms (100 ms here) so ST messages can start executing.
  Thread.sleep(Math.abs(_stateModelDelay / 100));
  // before we cancel, check current EV
  Map<String, ExternalView> assignment = getEVs();
  for (String resource : _allDBs) {
    // Check every partition still has >= 3 active replicas and a top-state replica (the intent
    // stated elsewhere in this test; TODO confirm against validateAssignmentInEv).
    validateAssignmentInEv(assignment.get(resource));
  }

  // cancel the evacuation
  _gSetupTool.getClusterManagementTool()
      .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

  assignment = getEVs();
  for (String resource : _allDBs) {
    // check every replica has >= 3 active replicas, even before cluster converge
    validateAssignmentInEv(assignment.get(resource));
  }

  // Check that the cluster converges. The ST delay is longer than the verifier timeout, so the
  // cluster only converges if the pending STs are canceled.
  Assert.assertTrue(_clusterVerifier.verifyByPolling());

  // EV should contain all participants, check resources one by one
  assignment = getEVs();
  for (String resource : _allDBs) {
    Assert.assertTrue(
        getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
    // check every replica has >= 3 active replicas again
    validateAssignmentInEv(assignment.get(resource));
  }
}
| |
/**
 * Starts evacuating an instance while DROP state transitions are delayed, then cancels the
 * evacuation before they finish. Each resource's external view is checked with
 * validateAssignmentInEv immediately after the cancel and again after the cluster converges.
 * After convergence, every participant must also appear in each resource's external view.
 */
@Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish")
public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
  System.out.println(
      "START TestInstanceOperation.testEvacuateAndCancelBeforeDropFinish() at " + new Date(
          System.currentTimeMillis()));

  // Set the DROP ST delay to a large value.
  // NOTE(review): presumably a positive _stateModelDelay delays downward/DROP transitions in the
  // mock state model, mirroring the negative value used for bootstrap in the previous test;
  // confirm.
  _stateModelDelay = 10000L;

  // evacuate an instance
  String instanceToEvacuate = _participants.get(0).getInstanceName();
  _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate,
      InstanceConstants.InstanceOperation.EVACUATE);

  // Messages should be pending at the participant being evacuated.
  // NOTE(review): the verifier below waits for that participant's message folder to become
  // EMPTY, which does not match the "pending" wording; confirm which is intended.
  verifier(() -> ((_dataAccessor.getChildNames(
      _dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
  Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));

  // cancel evacuation
  _gSetupTool.getClusterManagementTool()
      .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
  // Check every partition has >= 3 active replicas, even before the cluster converges.
  Map<String, ExternalView> assignment = getEVs();
  for (String resource : _allDBs) {
    validateAssignmentInEv(assignment.get(resource));
  }

  Assert.assertTrue(_clusterVerifier.verifyByPolling());

  // EV should contain all participants, check resources one by one
  assignment = getEVs();
  for (String resource : _allDBs) {
    Assert.assertTrue(
        getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
    // check every replica has >= 3 active replicas
    validateAssignmentInEv(assignment.get(resource));
  }
}
| |
| @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish") |
| public void testMarkEvacuationAfterEMM() throws Exception { |
| System.out.println("START TestInstanceOperation.testMarkEvacuationAfterEMM() at " + new Date( |
| System.currentTimeMillis())); |
| _stateModelDelay = 1000L; |
| Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME)); |
| _gSetupTool.getClusterManagementTool() |
| .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null); |
| String newParticipantName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(newParticipantName); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| Map<String, ExternalView> assignment = getEVs(); |
| for (String resource : _allDBs) { |
| Assert.assertFalse( |
| getParticipantsInEv(assignment.get(resource)).contains(newParticipantName)); |
| } |
| |
| // set evacuate operation |
| String instanceToEvacuate = _participants.get(0).getInstanceName(); |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, |
| InstanceConstants.InstanceOperation.EVACUATE); |
| |
| // there should be no evacuation happening |
| for (String resource : _allDBs) { |
| Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate)); |
| } |
| |
| // exit MM |
| _gSetupTool.getClusterManagementTool() |
| .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| |
| assignment = getEVs(); |
| List<String> currentActiveInstances = |
| _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)) |
| .collect(Collectors.toList()); |
| for (String resource : _allDBs) { |
| validateAssignmentInEv(assignment.get(resource)); |
| Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); |
| Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); |
| Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); |
| } |
| Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); |
| |
| _stateModelDelay = 3L; |
| } |
| |
| @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testMarkEvacuationAfterEMM") |
| public void testSwapEvacuateAddRemoveEvacuate() throws Exception { |
| System.out.println("START TestInstanceOperation.testSwapEvacuateAddRemoveEvacuate() at " + new Date( |
| System.currentTimeMillis())); |
| removeOfflineOrInactiveInstances(); |
| |
| // Set instance's InstanceOperation to EVACUATE |
| String instanceToSwapOutName = _participants.get(0).getInstanceName(); |
| InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() |
| .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, |
| InstanceConstants.InstanceOperation.EVACUATE); |
| |
| // Add instance with InstanceOperation set to ENABLE |
| String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + _nextStartPort; |
| addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), |
| instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), |
| InstanceConstants.InstanceOperation.ENABLE, -1); |
| |
| // Remove EVACUATE instance's InstanceOperation |
| _gSetupTool.getClusterManagementTool() |
| .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null); |
| } |
| |
| @Test(dependsOnMethods = "testSwapEvacuateAddRemoveEvacuate") |
| public void testEvacuationWithOfflineInstancesInCluster() throws Exception { |
| System.out.println( |
| "START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date( |
| System.currentTimeMillis())); |
| _participants.get(1).syncStop(); |
| _participants.get(2).syncStop(); |
| |
| String evacuateInstanceName = _participants.get(_participants.size() - 2).getInstanceName(); |
| _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, |
| InstanceConstants.InstanceOperation.EVACUATE); |
| |
| // EV should contain all participants, check resources one by one |
| verifier(() -> { |
| Map<String, ExternalView> assignment = getEVs(); |
| for (String resource : _allDBs) { |
| ExternalView ev = assignment.get(resource); |
| for (String partition : ev.getPartitionSet()) { |
| AtomicInteger activeReplicaCount = new AtomicInteger(); |
| ev.getStateMap(partition).values().stream().filter( |
| v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals( |
| "FOLLOWER") || v.equals("STANDBY")) |
| .forEach(v -> activeReplicaCount.getAndIncrement()); |
| if (activeReplicaCount.get() < REPLICA - 1 || ( |
| ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap( |
| partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap( |
| partition).get(evacuateInstanceName).equals("LEADER"))) { |
| return false; |
| } |
| } |
| } |
| return true; |
| }, 30000); |
| |
| removeOfflineOrInactiveInstances(); |
| addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort); |
| addParticipant(PARTICIPANT_PREFIX + "_" + _nextStartPort); |
| dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", "TEST_DB4_DELAYED_WAGED")); |
| } |
| |
| /** |
| * Verifies that the given verifier returns true within the given timeout. Handles AssertionError |
| * by returning false, which TestHelper.verify will not do. Asserts that return value from |
| * TestHelper.verify is true. |
| * |
| * @param verifier the verifier to run |
| * @param timeout the timeout to wait for the verifier to return true |
| * @throws Exception if TestHelper.verify throws an exception |
| */ |
| private static void verifier(TestHelper.Verifier verifier, long timeout) throws Exception { |
| Assert.assertTrue(TestHelper.verify(() -> { |
| try { |
| boolean result = verifier.verify(); |
| if (!result) { |
| LOG.error("Verifier returned false, retrying..."); |
| } |
| return result; |
| } catch (AssertionError e) { |
| LOG.error("Caught AssertionError on verifier attempt: ", e); |
| return false; |
| } |
| }, timeout)); |
| } |
| |
| private static class CustomIndividualInstanceConfigChangeListener implements InstanceConfigChangeListener { |
| private boolean throttlesEnabled; |
| |
| public CustomIndividualInstanceConfigChangeListener() { |
| throttlesEnabled = true; |
| } |
| |
| public boolean isThrottlesEnabled() { |
| return throttlesEnabled; |
| } |
| |
| @Override |
| public void onInstanceConfigChange(List<InstanceConfig> instanceConfig, |
| NotificationContext context) { |
| if (instanceConfig.get(0).getInstanceOperation() |
| .equals(InstanceConstants.InstanceOperation.SWAP_IN)) { |
| throttlesEnabled = false; |
| } else { |
| throttlesEnabled = true; |
| } |
| } |
| } |
| |
| private MockParticipantManager createParticipant(String participantName) throws Exception { |
| // start dummy participants |
| MockParticipantManager participant = |
| new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName, 10, null); |
| StateMachineEngine stateMachine = participant.getStateMachineEngine(); |
| // Using a delayed state model |
| StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory(); |
| stateMachine.registerStateModelFactory("MasterSlave", delayFactory); |
| return participant; |
| } |
| |
| private void addParticipant(String participantName) throws Exception { |
| addParticipant(participantName, UUID.randomUUID().toString(), |
| "zone_" + _participants.size() % ZONE_COUNT, null, -1); |
| } |
| |
| private void addParticipant(String participantName, String logicalId, String zone, |
| InstanceConstants.InstanceOperation instanceOperation, int capacity) |
| throws Exception { |
| addParticipant(participantName, logicalId, zone, instanceOperation, capacity, null); |
| } |
| |
| private void addParticipant(String participantName, String logicalId, String zone, |
| InstanceConstants.InstanceOperation instanceOperation, int capacity, |
| InstanceConfigChangeListener listener) throws Exception { |
| InstanceConfig config = new InstanceConfig.Builder().setDomain( |
| String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID, |
| logicalId)).setInstanceOperation(instanceOperation) |
| .build(participantName); |
| |
| if (capacity >= 0) { |
| config.setInstanceCapacityMap(ImmutableMap.of(TEST_CAPACITY_KEY, capacity)); |
| } |
| _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config); |
| |
| MockParticipantManager participant = createParticipant(participantName); |
| |
| participant.syncStart(); |
| if (listener != null) { |
| participant.addListener(listener, |
| new PropertyKey.Builder(CLUSTER_NAME).instanceConfig(participantName), |
| HelixConstants.ChangeType.INSTANCE_CONFIG, |
| new Watcher.Event.EventType[]{Watcher.Event.EventType.NodeDataChanged}); |
| } |
| _participants.add(participant); |
| _participantNames.add(participantName); |
| _nextStartPort++; |
| } |
| |
| private void createTestDBs(long delayTime) throws InterruptedException { |
| createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED", |
| BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1, |
| CrushEdRebalanceStrategy.class.getName()); |
| _allDBs.add("TEST_DB0_CRUSHED"); |
| createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED", |
| BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 2000000, |
| CrushEdRebalanceStrategy.class.getName()); |
| _allDBs.add("TEST_DB1_CRUSHED"); |
| createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB2_WAGED", BuiltInStateModelDefinitions.LeaderStandby.name(), |
| PARTITIONS, REPLICA, REPLICA - 1); |
| _allDBs.add("TEST_DB2_WAGED"); |
| |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| } |
| |
| private void dropTestDBs(Set<String> dbs) throws Exception { |
| for (String db : dbs) { |
| _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, db); |
| _allDBs.remove(db); |
| } |
| Assert.assertTrue(_clusterVerifier.verifyByPolling()); |
| } |
| |
| private Map<String, ExternalView> getEVs() { |
| Map<String, ExternalView> externalViews = new HashMap<String, ExternalView>(); |
| for (String db : _allDBs) { |
| ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); |
| externalViews.put(db, ev); |
| } |
| return externalViews; |
| } |
| |
| private boolean verifyIS(String evacuateInstanceName) { |
| for (String db : _allDBs) { |
| IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); |
| for (String partition : is.getPartitionSet()) { |
| List<String> newPAssignedParticipants = is.getPreferenceList(partition); |
| if (newPAssignedParticipants.contains(evacuateInstanceName)) { |
| System.out.println("partition " + partition + " assignment " + newPAssignedParticipants + " ev " + evacuateInstanceName); |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| private Set<String> getParticipantsInEv(ExternalView ev) { |
| Set<String> assignedParticipants = new HashSet<>(); |
| for (String partition : ev.getPartitionSet()) { |
| ev.getStateMap(partition) |
| .keySet() |
| .stream() |
| .filter(k -> !ev.getStateMap(partition).get(k).equals("OFFLINE")) |
| .forEach(assignedParticipants::add); |
| } |
| return assignedParticipants; |
| } |
| |
| private Map<String, String> getPartitionsAndStatesOnInstance(Map<String, ExternalView> evs, |
| String instanceName) { |
| Map<String, String> instancePartitions = new HashMap<>(); |
| for (String resourceEV : evs.keySet()) { |
| for (String partition : evs.get(resourceEV).getPartitionSet()) { |
| if (evs.get(resourceEV).getStateMap(partition).containsKey(instanceName)) { |
| instancePartitions.put(partition, |
| evs.get(resourceEV).getStateMap(partition).get(instanceName)); |
| } |
| } |
| } |
| |
| return instancePartitions; |
| } |
| |
| private Map<String, Map<String, String>> getResourcePartitionStateOnInstance( |
| Map<String, ExternalView> evs, String instanceName) { |
| Map<String, Map<String, String>> stateByPartitionByResource = new HashMap<>(); |
| for (String resourceEV : evs.keySet()) { |
| for (String partition : evs.get(resourceEV).getPartitionSet()) { |
| if (evs.get(resourceEV).getStateMap(partition).containsKey(instanceName)) { |
| if (!stateByPartitionByResource.containsKey(resourceEV)) { |
| stateByPartitionByResource.put(resourceEV, new HashMap<>()); |
| } |
| stateByPartitionByResource.get(resourceEV) |
| .put(partition, evs.get(resourceEV).getStateMap(partition).get(instanceName)); |
| } |
| } |
| } |
| |
| return stateByPartitionByResource; |
| } |
| |
| private Set<String> getInstanceNames(Collection<InstanceConfig> instanceConfigs) { |
| return instanceConfigs.stream().map(InstanceConfig::getInstanceName) |
| .collect(Collectors.toSet()); |
| } |
| |
| private void validateRoutingTablesInstance(Map<String, ExternalView> evs, String instanceName, |
| boolean shouldContain) { |
| RoutingTableProvider[] routingTableProviders = |
| new RoutingTableProvider[]{_routingTableProviderDefault, _routingTableProviderEV, _routingTableProviderCS}; |
| getResourcePartitionStateOnInstance(evs, instanceName).forEach((resource, partitions) -> { |
| partitions.forEach((partition, state) -> { |
| Arrays.stream(routingTableProviders).forEach(rtp -> Assert.assertEquals( |
| getInstanceNames(rtp.getInstancesForResource(resource, partition, state)).contains( |
| instanceName), shouldContain)); |
| }); |
| }); |
| |
| Arrays.stream(routingTableProviders).forEach(rtp -> { |
| Assert.assertEquals(getInstanceNames(rtp.getInstanceConfigs()).contains(instanceName), |
| shouldContain); |
| }); |
| } |
| |
| private void validateEVCorrect(ExternalView actual, ExternalView original, |
| Map<String, String> swapOutInstancesToSwapInInstances, Set<String> inFlightSwapInInstances, |
| Set<String> completedSwapInInstanceNames) { |
| Assert.assertEquals(actual.getPartitionSet(), original.getPartitionSet()); |
| IdealState is = _gSetupTool.getClusterManagementTool() |
| .getResourceIdealState(CLUSTER_NAME, original.getResourceName()); |
| StateModelDefinition stateModelDef = _gSetupTool.getClusterManagementTool() |
| .getStateModelDef(CLUSTER_NAME, is.getStateModelDefRef()); |
| for (String partition : actual.getPartitionSet()) { |
| Map<String, String> expectedStateMap = new HashMap<>(original.getStateMap(partition)); |
| for (String swapOutInstance : swapOutInstancesToSwapInInstances.keySet()) { |
| if (expectedStateMap.containsKey(swapOutInstance) && inFlightSwapInInstances.contains( |
| swapOutInstancesToSwapInInstances.get(swapOutInstance))) { |
| // If the corresponding swapInInstance is in-flight, add it to the expectedStateMap |
| // with the same state as the swapOutInstance or secondState if the swapOutInstance |
| // has a topState. |
| expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance), |
| expectedStateMap.get(swapOutInstance).equals(stateModelDef.getTopState()) |
| ? (String) stateModelDef.getSecondTopStates().toArray()[0] |
| : expectedStateMap.get(swapOutInstance)); |
| } else if (expectedStateMap.containsKey(swapOutInstance) |
| && completedSwapInInstanceNames.contains( |
| swapOutInstancesToSwapInInstances.get(swapOutInstance))) { |
| // If the corresponding swapInInstance is completed, add it to the expectedStateMap |
| // with the same state as the swapOutInstance. |
| expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance), |
| expectedStateMap.get(swapOutInstance)); |
| expectedStateMap.remove(swapOutInstance); |
| } |
| } |
| Assert.assertEquals(actual.getStateMap(partition), expectedStateMap, "Error for partition " + partition |
| + " in resource " + actual.getResourceName()); |
| } |
| } |
| |
| private boolean validateEVsCorrect(Map<String, ExternalView> actuals, |
| Map<String, ExternalView> originals, Map<String, String> swapOutInstancesToSwapInInstances, |
| Set<String> inFlightSwapInInstances, Set<String> completedSwapInInstanceNames) { |
| Assert.assertEquals(actuals.keySet(), originals.keySet()); |
| for (String resource : actuals.keySet()) { |
| validateEVCorrect(actuals.get(resource), originals.get(resource), |
| swapOutInstancesToSwapInInstances, inFlightSwapInInstances, completedSwapInInstanceNames); |
| } |
| return true; |
| } |
| |
| private void validateAssignmentInEv(ExternalView ev) { |
| validateAssignmentInEv(ev, REPLICA); |
| } |
| |
| private void validateAssignmentInEv(ExternalView ev, int expectedNumber) { |
| Set<String> partitionSet = ev.getPartitionSet(); |
| for (String partition : partitionSet) { |
| AtomicInteger activeReplicaCount = new AtomicInteger(); |
| ev.getStateMap(partition).values().stream().filter(ACCEPTABLE_STATE_SET::contains) |
| .forEach(v -> activeReplicaCount.getAndIncrement()); |
| Assert.assertTrue(activeReplicaCount.get() >=expectedNumber); |
| } |
| } |
| |
| private void setUpWagedBaseline() { |
| _assignmentMetadataStore = new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) { |
| public Map<String, ResourceAssignment> getBaseline() { |
| // Ensure this metadata store always read from the ZK without using cache. |
| super.reset(); |
| return super.getBaseline(); |
| } |
| |
| public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() { |
| // Ensure this metadata store always read from the ZK without using cache. |
| super.reset(); |
| return super.getBestPossibleAssignment(); |
| } |
| }; |
| |
| // Set test instance capacity and partition weights |
| ClusterConfig clusterConfig = _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig()); |
| clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY)); |
| clusterConfig.setDefaultInstanceCapacityMap( |
| Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE)); |
| clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY, 1)); |
| _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), clusterConfig); |
| } |
| |
| // A state transition model where either downward ST are slow (_stateModelDelay >0) or upward ST are slow (_stateModelDelay <0) |
| public class StDelayMSStateModelFactory extends StateModelFactory<StDelayMSStateModel> { |
| |
| @Override |
| public StDelayMSStateModel createNewStateModel(String resourceName, String partitionKey) { |
| StDelayMSStateModel model = new StDelayMSStateModel(); |
| return model; |
| } |
| } |
| |
| @StateModelInfo(initialState = "OFFLINE", states = {"MASTER", "SLAVE", "ERROR"}) |
| public class StDelayMSStateModel extends StateModel { |
| |
| public StDelayMSStateModel() { |
| _cancelled = false; |
| } |
| |
| private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException{ |
| while(sleepTime >0 && !isCancelled()) { |
| Thread.sleep(TIMEOUT); |
| sleepTime = sleepTime - TIMEOUT; |
| } |
| if (isCancelled()) { |
| _cancelled = false; |
| throw new HelixRollbackException("EX"); |
| } |
| } |
| |
| @Transition(to = "SLAVE", from = "OFFLINE") |
| public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException { |
| if (_stateModelDelay < 0) { |
| sleepWhileNotCanceled(Math.abs(_stateModelDelay)); |
| } |
| } |
| |
| @Transition(to = "MASTER", from = "SLAVE") |
| public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException { |
| if (_stateModelDelay < 0) { |
| sleepWhileNotCanceled(Math.abs(_stateModelDelay)); |
| } |
| } |
| |
| @Transition(to = "SLAVE", from = "MASTER") |
| public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException { |
| if (_stateModelDelay > 0) { |
| sleepWhileNotCanceled(_stateModelDelay); |
| } |
| } |
| |
| @Transition(to = "OFFLINE", from = "SLAVE") |
| public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException { |
| if (_stateModelDelay > 0) { |
| sleepWhileNotCanceled(_stateModelDelay); |
| } |
| } |
| |
| @Transition(to = "DROPPED", from = "OFFLINE") |
| public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException { |
| if (_stateModelDelay > 0) { |
| sleepWhileNotCanceled(_stateModelDelay); |
| } |
| } |
| } |
| } |