blob: 0072b3917aa7c745d6b539ff9828079585fb8f9c [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
public class TestCurrentStateComputationStage extends BaseStageTest {
@Test
public void testEmptyCS() {
Map<String, Resource> resourceMap = getResourceMap();
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider();
event.addAttribute(AttributeName.ControllerDataProvider.name(), dataCache);
event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor(_clusterName));
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
ClusterConfig clsCfg = dataCache.getClusterConfig();
clsCfg.setInstanceCapacityKeys(ImmutableList.of("s1", "s2", "s3"));
dataCache.setClusterConfig(clsCfg);
dataCache.setInstanceConfigMap(ImmutableMap.of(
"a", new InstanceConfig("a")
));
runStage(event, stage);
CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.name());
AssertJUnit.assertEquals(
output.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
0);
}
@Test
public void testSimpleCS() {
// setup resource
Map<String, Resource> resourceMap = getResourceMap();
setupLiveInstances(5);
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider();
event.addAttribute(AttributeName.ControllerDataProvider.name(), dataCache);
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
CurrentStateOutput output1 = event.getAttribute(AttributeName.CURRENT_STATE.name());
AssertJUnit.assertEquals(
output1.getCurrentStateMap("testResourceName", new Partition("testResourceName_0")).size(),
0);
// Add a state transition messages
Message message = new Message(Message.MessageType.STATE_TRANSITION, "msg1");
message.setFromState("OFFLINE");
message.setToState("SLAVE");
message.setResourceName("testResourceName");
message.setPartitionName("testResourceName_1");
message.setTgtName("localhost_3");
message.setTgtSessionId("session_3");
Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
CurrentStateOutput output2 = event.getAttribute(AttributeName.CURRENT_STATE.name());
String pendingState =
output2.getPendingMessage("testResourceName", new Partition("testResourceName_1"),
"localhost_3").getToState();
AssertJUnit.assertEquals(pendingState, "SLAVE");
ZNRecord record1 = new ZNRecord("testResourceName");
// Add a current state that matches sessionId and one that does not match
CurrentState stateWithLiveSession = new CurrentState(record1);
stateWithLiveSession.setSessionId("session_3");
stateWithLiveSession.setStateModelDefRef("MasterSlave");
stateWithLiveSession.setState("testResourceName_1", "OFFLINE");
ZNRecord record2 = new ZNRecord("testResourceName");
CurrentState stateWithDeadSession = new CurrentState(record2);
stateWithDeadSession.setSessionId("session_dead");
stateWithDeadSession.setStateModelDefRef("MasterSlave");
stateWithDeadSession.setState("testResourceName_1", "MASTER");
ZNRecord record3 = new ZNRecord("testTaskResourceName");
CurrentState taskStateWithLiveSession = new CurrentState(record3);
taskStateWithLiveSession.setSessionId("session_3");
taskStateWithLiveSession.setStateModelDefRef("Task");
taskStateWithLiveSession.setState("testTaskResourceName_1", "INIT");
ZNRecord record4 = new ZNRecord("testTaskResourceName");
CurrentState taskStateWithDeadSession = new CurrentState(record4);
taskStateWithDeadSession.setSessionId("session_dead");
taskStateWithDeadSession.setStateModelDefRef("Task");
taskStateWithDeadSession.setState("testTaskResourceName_1", "INIT");
accessor.setProperty(keyBuilder.currentState("localhost_3", "session_3", "testResourceName"),
stateWithLiveSession);
accessor.setProperty(keyBuilder.currentState("localhost_3", "session_dead", "testResourceName"),
stateWithDeadSession);
accessor.setProperty(
keyBuilder.taskCurrentState("localhost_3", "session_3", "testTaskResourceName"),
taskStateWithLiveSession);
accessor.setProperty(
keyBuilder.taskCurrentState("localhost_3", "session_dead", "testTaskResourceName"),
taskStateWithDeadSession);
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
CurrentStateOutput output3 = event.getAttribute(AttributeName.CURRENT_STATE.name());
String currentState =
output3.getCurrentState("testResourceName", new Partition("testResourceName_1"),
"localhost_3");
AssertJUnit.assertEquals(currentState, "OFFLINE");
// Non Task Framework event will cause task current states to be ignored
String taskCurrentState = output3
.getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"),
"localhost_3");
AssertJUnit.assertNull(taskCurrentState);
// Add another state transition message which is stale
message = new Message(Message.MessageType.STATE_TRANSITION, "msg2");
message.setFromState("SLAVE");
message.setToState("OFFLINE");
message.setResourceName("testResourceName");
message.setPartitionName("testResourceName_1");
message.setTgtName("localhost_3");
message.setTgtSessionId("session_3");
accessor.setProperty(keyBuilder.message("localhost_" + 3, message.getId()), message);
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
CurrentStateOutput output4 = event.getAttribute(AttributeName.CURRENT_STATE.name());
AssertJUnit.assertEquals(dataCache.getStaleMessages().size(), 1);
AssertJUnit.assertTrue(dataCache.getStaleMessages().containsKey("localhost_3"));
AssertJUnit.assertTrue(dataCache.getStaleMessages().get("localhost_3").containsKey("msg2"));
// Use a task event to check that task current states are included
resourceMap = new HashMap<String, Resource>();
Resource testTaskResource = new Resource("testTaskResourceName");
testTaskResource.setStateModelDefRef("Task");
testTaskResource.addPartition("testTaskResourceName_1");
resourceMap.put("testTaskResourceName", testTaskResource);
ClusterEvent taskEvent = new ClusterEvent(ClusterEventType.Unknown);
taskEvent.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
taskEvent.addAttribute(AttributeName.ControllerDataProvider.name(),
new WorkflowControllerDataProvider());
runStage(taskEvent, new ReadClusterDataStage());
runStage(taskEvent, stage);
CurrentStateOutput output5 = taskEvent.getAttribute(AttributeName.CURRENT_STATE.name());
taskCurrentState = output5
.getCurrentState("testTaskResourceName", new Partition("testTaskResourceName_1"),
"localhost_3");
AssertJUnit.assertEquals(taskCurrentState, "INIT");
}
}