blob: 2fcfc3e59c6ec2b219f5bc9ca8f9bbaf35092d43 [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestCancellationMessageGeneration extends MessageGenerationPhase {
private static final String TEST_CLUSTER = "testCluster";
private static final String TEST_RESOURCE = "resource0";
private static final String TEST_INSTANCE = "instance0";
private static final String TEST_PARTITION = "partition0";
/*
* This test checks the cancellation message generation when currentState=null and desiredState=DROPPED
*/
@Test
public void TestOFFLINEToDROPPED() throws Exception {
ClusterEvent event = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown);
// Set current state to event
CurrentStateOutput currentStateOutput = mock(CurrentStateOutput.class);
Partition partition = mock(Partition.class);
when(partition.getPartitionName()).thenReturn(TEST_PARTITION);
when(currentStateOutput.getCurrentState(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(null);
Message message = mock(Message.class);
when(message.getFromState()).thenReturn("OFFLINE");
when(message.getToState()).thenReturn("SLAVE");
when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE)).thenReturn(message);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
// Set helix manager to event
event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
// Set controller data provider to event
BaseControllerDataProvider cache = mock(BaseControllerDataProvider.class);
StateModelDefinition stateModelDefinition = new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
when(cache.getStateModelDef(TaskConstants.STATE_MODEL_NAME)).thenReturn(stateModelDefinition);
Map<String, LiveInstance> liveInstances= mock(Map.class);
LiveInstance mockLiveInstance = mock(LiveInstance.class);
when(mockLiveInstance.getInstanceName()).thenReturn(TEST_INSTANCE);
when(mockLiveInstance.getEphemeralOwner()).thenReturn("TEST");
when(liveInstances.values()).thenReturn(Arrays.asList(mockLiveInstance));
when(cache.getLiveInstances()).thenReturn(liveInstances);
ClusterConfig clusterConfig = mock(ClusterConfig.class);
when(cache.getClusterConfig()).thenReturn(clusterConfig);
when(clusterConfig.isStateTransitionCancelEnabled()).thenReturn(true);
event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
// Set resources to rebalance to event
Map<String, Resource> resourceMap = new HashMap<>();
Resource resource = mock(Resource.class);
when(resource.getResourceName()).thenReturn(TEST_RESOURCE);
List<Partition> partitions = Arrays.asList(partition);
when(resource.getPartitions()).thenReturn(partitions);
when(resource.getStateModelDefRef()).thenReturn(TaskConstants.STATE_MODEL_NAME);
resourceMap.put(TEST_RESOURCE, resource);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
// set up resource state map
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
Map<String, String> instanceStateMap = new HashMap<>();
instanceStateMap.put(TEST_INSTANCE, HelixDefinedState.DROPPED.name());
stateMap.put(partition, instanceStateMap);
bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap);
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
process(event);
MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
Assert.assertEquals(output.getMessages(TEST_RESOURCE, partition).size(), 1);
}
/*
* Tests that no cancellation message is created for
* pending ST message of error partition reset.
*/
@Test
public void testNoCancellationForErrorReset() throws Exception {
List<Message> messages = generateMessages("ERROR", "ERROR", "OFFLINE");
Assert.assertTrue(messages.isEmpty(), "Should not create cancellation message");
}
/*
* Tests that controller should be able to cancel ST: ONLINE -> OFFLINE
*/
@Test
public void testCancelOnlineToOffline() throws Exception {
List<Message> messages = generateMessages("ONLINE", "ONLINE", "OFFLINE");
Assert.assertEquals(messages.size(), 1, "Should create cancellation message");
Message msg = messages.get(0);
Assert.assertEquals(msg.getMsgType(), Message.MessageType.STATE_TRANSITION_CANCELLATION.name());
Assert.assertEquals(msg.getFromState(), "ONLINE");
Assert.assertEquals(msg.getToState(), "OFFLINE");
}
private List<Message> generateMessages(String currentState, String fromState, String toState)
throws Exception {
ClusterEvent event = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown);
// Set current state to event
CurrentStateOutput currentStateOutput = mock(CurrentStateOutput.class);
Partition partition = mock(Partition.class);
when(partition.getPartitionName()).thenReturn(TEST_PARTITION);
when(currentStateOutput.getCurrentState(TEST_RESOURCE, partition, TEST_INSTANCE))
.thenReturn(currentState);
// Pending message for error partition reset
Message pendingMessage = mock(Message.class);
when(pendingMessage.getFromState()).thenReturn(fromState);
when(pendingMessage.getToState()).thenReturn(toState);
when(currentStateOutput.getPendingMessage(TEST_RESOURCE, partition, TEST_INSTANCE))
.thenReturn(pendingMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
// Set helix manager to event
event.addAttribute(AttributeName.helixmanager.name(), mock(HelixManager.class));
StateModelDefinition stateModelDefinition = new StateModelDefinition.Builder("TestStateModel")
.addState("ONLINE", 1).addState("OFFLINE")
.addState("DROPPED").addState("ERROR")
.initialState("OFFLINE")
.addTransition("ERROR", "OFFLINE", 1).addTransition("ONLINE", "OFFLINE", 2)
.addTransition("OFFLINE", "DROPPED", 3).addTransition("OFFLINE", "ONLINE", 4)
.build();
// Set controller data provider to event
BaseControllerDataProvider cache = mock(BaseControllerDataProvider.class);
when(cache.getStateModelDef(TaskConstants.STATE_MODEL_NAME)).thenReturn(stateModelDefinition);
Map<String, LiveInstance> liveInstances = mock(Map.class);
LiveInstance mockLiveInstance = mock(LiveInstance.class);
when(mockLiveInstance.getInstanceName()).thenReturn(TEST_INSTANCE);
when(mockLiveInstance.getEphemeralOwner()).thenReturn("TEST");
when(liveInstances.values()).thenReturn(Collections.singletonList(mockLiveInstance));
when(cache.getLiveInstances()).thenReturn(liveInstances);
ClusterConfig clusterConfig = mock(ClusterConfig.class);
when(cache.getClusterConfig()).thenReturn(clusterConfig);
when(clusterConfig.isStateTransitionCancelEnabled()).thenReturn(true);
event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
// Set event attribute: resources to rebalance
Map<String, Resource> resourceMap = new HashMap<>();
Resource resource = mock(Resource.class);
when(resource.getResourceName()).thenReturn(TEST_RESOURCE);
List<Partition> partitions = Collections.singletonList(partition);
when(resource.getPartitions()).thenReturn(partitions);
when(resource.getStateModelDefRef()).thenReturn(TaskConstants.STATE_MODEL_NAME);
resourceMap.put(TEST_RESOURCE, resource);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
// set up resource state map
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
PartitionStateMap partitionStateMap = new PartitionStateMap(TEST_RESOURCE);
Map<Partition, Map<String, String>> stateMap = partitionStateMap.getStateMap();
Map<String, String> instanceStateMap = new HashMap<>();
instanceStateMap.put(TEST_INSTANCE, currentState);
stateMap.put(partition, instanceStateMap);
bestPossibleStateOutput.setState(TEST_RESOURCE, partition, instanceStateMap);
// Process the event
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
process(event);
MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
return output.getMessages(TEST_RESOURCE, partition);
}
}