| package org.apache.helix.messaging.p2pMessage; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executors; |
| |
| import org.apache.helix.HelixConstants; |
| import org.apache.helix.controller.common.PartitionStateMap; |
| import org.apache.helix.controller.common.ResourcesStateMap; |
| import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; |
| import org.apache.helix.controller.pipeline.Pipeline; |
| import org.apache.helix.controller.stages.AttributeName; |
| import org.apache.helix.controller.stages.BaseStageTest; |
| import org.apache.helix.controller.stages.BestPossibleStateCalcStage; |
| import org.apache.helix.controller.stages.CurrentStateOutput; |
| import org.apache.helix.controller.stages.IntermediateStateCalcStage; |
| import org.apache.helix.controller.stages.MessageGenerationPhase; |
| import org.apache.helix.controller.stages.MessageOutput; |
| import org.apache.helix.controller.stages.MessageSelectionStage; |
| import org.apache.helix.controller.stages.MessageThrottleStage; |
| import org.apache.helix.controller.stages.ReadClusterDataStage; |
| import org.apache.helix.model.BuiltInStateModelDefinitions; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.MasterSlaveSMD; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.Partition; |
| import org.apache.helix.model.Resource; |
| import org.testng.Assert; |
| import org.testng.annotations.Test; |
| |
| public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { |
| String _db = "testDB"; |
| int _numPartition = 1; |
| int _numReplica = 3; |
| |
| Partition _partition = new Partition(_db + "_0"); |
| |
| ResourceControllerDataProvider _dataCache; |
| Pipeline _fullPipeline; |
| Pipeline _messagePipeline; |
| |
| ResourcesStateMap _bestpossibleState; |
| |
| private void preSetup() throws Exception { |
| setupIdealState(3, new String[] { _db }, _numPartition, _numReplica, |
| IdealState.RebalanceMode.SEMI_AUTO, BuiltInStateModelDefinitions.MasterSlave.name()); |
| setupStateModel(); |
| setupInstances(3); |
| setupLiveInstances(3); |
| |
| ClusterConfig clusterConfig = new ClusterConfig(_clusterName); |
| clusterConfig.enableP2PMessage(true); |
| setClusterConfig(clusterConfig); |
| |
| Map<String, Resource> resourceMap = getResourceMap(new String[] { _db }, _numPartition, |
| BuiltInStateModelDefinitions.MasterSlave.name(), clusterConfig, null); |
| |
| _dataCache = new ResourceControllerDataProvider(); |
| _dataCache.setAsyncTasksThreadPool(Executors.newSingleThreadExecutor()); |
| |
| event.addAttribute(AttributeName.ControllerDataProvider.name(), _dataCache); |
| event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); |
| event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap); |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), new CurrentStateOutput()); |
| event.addAttribute(AttributeName.helixmanager.name(), manager); |
| |
| _fullPipeline = new Pipeline("FullPipeline"); |
| _fullPipeline.addStage(new ReadClusterDataStage()); |
| _fullPipeline.addStage(new BestPossibleStateCalcStage()); |
| _fullPipeline.addStage(new MessageGenerationPhase()); |
| _fullPipeline.addStage(new MessageSelectionStage()); |
| _fullPipeline.addStage(new IntermediateStateCalcStage()); |
| _fullPipeline.addStage(new MessageThrottleStage()); |
| |
| _messagePipeline = new Pipeline("MessagePipeline"); |
| _messagePipeline.addStage(new MessageGenerationPhase()); |
| _messagePipeline.addStage(new MessageSelectionStage()); |
| _messagePipeline.addStage(new IntermediateStateCalcStage()); |
| _messagePipeline.addStage(new MessageThrottleStage()); |
| |
| |
| _fullPipeline.handle(event); |
| _bestpossibleState = |
| event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); |
| } |
| |
| |
| @Test |
| public void testP2PAvoidDuplicatedMessage() throws Exception { |
| preSetup(); |
| |
| // Scenario 1: |
| // Disable old master ((initialMaster) instance, |
| // Validate: a M->S message should be sent to initialMaster with a P2P message attached for secondMaster. |
| String initialMaster = getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), |
| MasterSlaveSMD.States.MASTER.name()); |
| Assert.assertNotNull(initialMaster); |
| |
| // disable existing master instance |
| admin.enableInstance(_clusterName, initialMaster, false); |
| _dataCache = event.getAttribute(AttributeName.ControllerDataProvider.name()); |
| _dataCache.notifyDataChange(HelixConstants.ChangeType.INSTANCE_CONFIG); |
| |
| CurrentStateOutput currentStateOutput = |
| populateCurrentStateFromBestPossible(_bestpossibleState); |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| |
| _fullPipeline.handle(event); |
| |
| _bestpossibleState = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); |
| |
| MessageOutput messageOutput = |
| event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| List<Message> messages = messageOutput.getMessages(_db, _partition); |
| |
| Assert.assertEquals(messages.size(), 1); |
| Message toSlaveMessage = messages.get(0); |
| Assert.assertEquals(toSlaveMessage.getTgtName(), initialMaster); |
| Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name()); |
| Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name()); |
| |
| // verify p2p message are attached to the M->S message sent to the old master instance |
| Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1); |
| String secondMaster = |
| getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), MasterSlaveSMD.States.MASTER.name()); |
| |
| Message relayMessage = toSlaveMessage.getRelayMessage(secondMaster); |
| Assert.assertNotNull(relayMessage); |
| Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name()); |
| Assert.assertEquals(relayMessage.getTgtName(), secondMaster); |
| Assert.assertEquals(relayMessage.getRelaySrcHost(), initialMaster); |
| Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name()); |
| Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name()); |
| |
| // Scenario 2A: |
| // Old master (initialMaster) completes the M->S transition, |
| // but has not forward p2p message to new master (secondMaster) yet. |
| // Validate: Controller should not send S->M message to new master. |
| currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE"); |
| currentStateOutput.setPendingMessage(_db, _partition, initialMaster, toSlaveMessage); |
| currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage); |
| |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| |
| _fullPipeline.handle(event); |
| |
| messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| messages = messageOutput.getMessages(_db, _partition); |
| Assert.assertEquals(messages.size(), 0); |
| |
| |
| // Scenario 2B: |
| // Old master (initialMaster) completes the M->S transition, |
| // There is a pending p2p message to new master (secondMaster). |
| // Validate: Controller should send S->M message to new master at same time. |
| |
| currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE"); |
| currentStateOutput.getPendingMessageMap(_db, _partition).clear(); |
| currentStateOutput.setPendingRelayMessage(_db, _partition, initialMaster, relayMessage); |
| |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| |
| _messagePipeline.handle(event); |
| |
| messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| messages = messageOutput.getMessages(_db, _partition); |
| Assert.assertEquals(messages.size(), 2); |
| |
| boolean hasToOffline = false; |
| boolean hasToMaster = false; |
| for (Message msg : messages) { |
| if (msg.getToState().equals(MasterSlaveSMD.States.MASTER.name()) && msg.getTgtName() |
| .equals(secondMaster)) { |
| hasToMaster = true; |
| } |
| if (msg.getToState().equals(MasterSlaveSMD.States.OFFLINE.name()) && msg.getTgtName() |
| .equals(initialMaster)) { |
| hasToOffline = true; |
| } |
| } |
| Assert.assertTrue(hasToMaster); |
| Assert.assertTrue(hasToOffline); |
| |
| // Secenario 2C |
| // Old master (initialMaster) completes the M->S transition, |
| // There is a pending p2p message to new master (secondMaster). |
| // However, the new master has been changed in bestPossible |
| // Validate: Controller should not send S->M message to the third master at same time. |
| |
| String thirdMaster = |
| getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), |
| MasterSlaveSMD.States.SLAVE.name()); |
| |
| Map<String, String> instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition); |
| instanceStateMap.put(secondMaster, "SLAVE"); |
| instanceStateMap.put(thirdMaster, "MASTER"); |
| _bestpossibleState.setState(_db, _partition, instanceStateMap); |
| |
| |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState); |
| |
| _messagePipeline.handle(event); |
| |
| messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| messages = messageOutput.getMessages(_db, _partition); |
| Assert.assertEquals(messages.size(), 1); |
| Assert.assertTrue(messages.get(0).getToState().equals("OFFLINE")); |
| Assert.assertTrue(messages.get(0).getTgtName().equals(initialMaster)); |
| |
| |
| // Scenario 3: |
| // Old master (initialMaster) completes the M->S transition, |
| // and has already forwarded p2p message to new master (secondMaster) |
| // The original S->M message sent to old master has been removed. |
| // Validate: Controller should send S->O to old master, but not S->M message to new master. |
| instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition); |
| instanceStateMap.put(secondMaster, "MASTER"); |
| instanceStateMap.put(thirdMaster, "SLAVE"); |
| _bestpossibleState.setState(_db, _partition, instanceStateMap); |
| |
| currentStateOutput = |
| populateCurrentStateFromBestPossible(_bestpossibleState); |
| currentStateOutput.setCurrentState(_db, _partition, initialMaster, "SLAVE"); |
| currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage); |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| |
| _fullPipeline.handle(event); |
| |
| messageOutput = |
| event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| messages = messageOutput.getMessages(_db, _partition); |
| Assert.assertEquals(messages.size(), 1); |
| |
| Message toOfflineMessage = messages.get(0); |
| Assert.assertEquals(toOfflineMessage.getTgtName(), initialMaster); |
| Assert.assertEquals(toOfflineMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name()); |
| Assert.assertEquals(toOfflineMessage.getToState(), MasterSlaveSMD.States.OFFLINE.name()); |
| |
| |
| // Scenario 4: |
| // The old master (initialMaster) finish state transition, but has not forward p2p message yet. |
| // Then the preference list has changed, so now the new master (thirdMaster) is different from previously calculated new master (secondMaster) |
| // Validate: controller should not send S->M to thirdMaster. |
| currentStateOutput.setCurrentState(_db, _partition, initialMaster, "OFFLINE"); |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| |
| thirdMaster = |
| getTopStateInstance(_bestpossibleState.getInstanceStateMap(_db, _partition), |
| MasterSlaveSMD.States.SLAVE.name()); |
| |
| instanceStateMap = _bestpossibleState.getInstanceStateMap(_db, _partition); |
| instanceStateMap.put(secondMaster, "SLAVE"); |
| instanceStateMap.put(thirdMaster, "MASTER"); |
| _bestpossibleState.setState(_db, _partition, instanceStateMap); |
| |
| event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState); |
| |
| _messagePipeline.handle(event); |
| |
| messageOutput = |
| event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| messages = messageOutput.getMessages(_db, _partition); |
| Assert.assertEquals(messages.size(), 0); |
| |
| |
| // Scenario 5: |
| // The initial master has forwarded the p2p message to secondMaster and deleted original M->S message on initialMaster, |
| // But the S->M state-transition has not completed yet in secondMaster. |
| // Validate: Controller should not send S->M to thirdMaster. |
| currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage); |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| |
| event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState); |
| |
| _messagePipeline.handle(event); |
| |
| messageOutput = |
| event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| messages = messageOutput.getMessages(_db, _partition); |
| Assert.assertEquals(messages.size(), 0); |
| |
| |
| // Scenario 5: |
| // The thirdMaster completed the state transition and deleted the p2p message. |
| // Validate: Controller should M->S message to secondMaster. |
| currentStateOutput = |
| populateCurrentStateFromBestPossible(_bestpossibleState); |
| currentStateOutput.setCurrentState(_db, _partition, secondMaster, "MASTER"); |
| currentStateOutput.setCurrentState(_db, _partition, thirdMaster, "SLAVE"); |
| |
| event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); |
| |
| _messagePipeline.handle(event); |
| |
| messageOutput = |
| event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); |
| messages = messageOutput.getMessages(_db, _partition); |
| Assert.assertEquals(messages.size(), 1); |
| |
| toSlaveMessage = messages.get(0); |
| Assert.assertEquals(toSlaveMessage.getTgtName(), secondMaster); |
| Assert.assertEquals(toSlaveMessage.getFromState(), MasterSlaveSMD.States.MASTER.name()); |
| Assert.assertEquals(toSlaveMessage.getToState(), MasterSlaveSMD.States.SLAVE.name()); |
| |
| // verify p2p message are attached to the M->S message sent to the secondMaster |
| Assert.assertEquals(toSlaveMessage.getRelayMessages().entrySet().size(), 1); |
| |
| relayMessage = toSlaveMessage.getRelayMessage(thirdMaster); |
| Assert.assertNotNull(relayMessage); |
| Assert.assertEquals(relayMessage.getMsgSubType(), Message.MessageType.RELAYED_MESSAGE.name()); |
| Assert.assertEquals(relayMessage.getTgtName(), thirdMaster); |
| Assert.assertEquals(relayMessage.getRelaySrcHost(), secondMaster); |
| Assert.assertEquals(relayMessage.getFromState(), MasterSlaveSMD.States.SLAVE.name()); |
| Assert.assertEquals(relayMessage.getToState(), MasterSlaveSMD.States.MASTER.name()); |
| } |
| |
| |
| private String getTopStateInstance(Map<String, String> instanceStateMap, String topState) { |
| String masterInstance = null; |
| for (Map.Entry<String, String> e : instanceStateMap.entrySet()) { |
| if (topState.equals(e.getValue())) { |
| masterInstance = e.getKey(); |
| } |
| } |
| |
| return masterInstance; |
| } |
| |
| private CurrentStateOutput populateCurrentStateFromBestPossible( |
| ResourcesStateMap bestPossibleStateOutput) { |
| CurrentStateOutput currentStateOutput = new CurrentStateOutput(); |
| for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) { |
| PartitionStateMap partitionStateMap = bestPossibleStateOutput.getPartitionStateMap(resource); |
| for (Partition p : partitionStateMap.partitionSet()) { |
| Map<String, String> stateMap = partitionStateMap.getPartitionMap(p); |
| |
| for (Map.Entry<String, String> e : stateMap.entrySet()) { |
| currentStateOutput.setCurrentState(resource, p, e.getKey(), e.getValue()); |
| } |
| } |
| } |
| return currentStateOutput; |
| } |
| } |