blob: 7f17dea77d185918523f1203947a9b90992d2ad7 [file] [log] [blame]
package org.apache.helix.controller.stages;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestIntermediateStateCalcStage extends BaseStageTest {
private ClusterConfig _clusterConfig;
public void testNoStateMissing() {
String resourcePrefix = "resource";
int nResource = 4;
int nPartition = 2;
int nReplica = 3;
String[] resources = new String[nResource];
for (int i = 0; i < nResource; i++) {
resources[i] = resourcePrefix + "_" + i;
preSetup(resources, nReplica, nReplica);
event.addAttribute(, getResourceMap(resources, nPartition, "OnlineOffline"));
getResourceMap(resources, nPartition, "OnlineOffline"));
// Initialize bestpossible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
MessageOutput messageSelectOutput = new MessageOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();
for (String resource : resources) {
IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
Map<String, List<String>> partitionMap = new HashMap<>();
for (int p = 0; p < nPartition; p++) {
Partition partition = new Partition(resource + "_" + p);
for (int r = 0; r < nReplica; r++) {
String instanceName = HOSTNAME_PREFIX + r;
partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName));
if (resource.endsWith("0")) {
// Regular recovery balance
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
// add blocked state transition messages
Message pendingMessage = generateMessage("OFFLINE", "ONLINE", instanceName);
currentStateOutput.setPendingMessage(resource, partition, instanceName, pendingMessage);
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
// should be recovered:
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else if (resource.endsWith("1")) {
// Regular load balance
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
// should be recovered:
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else if (resource.endsWith("2")) {
// Recovery balance with transient states, should keep the current states in the output.
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "OFFLINE");
// should be kept unchanged:
expectedResult.setState(resource, partition, instanceName, "OFFLINE");
} else if (resource.endsWith("3")) {
// One unresolved error should not prevent recovery balance
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
if (p == 0) {
if (r == 0) {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR");
// This partition is still ERROR
expectedResult.setState(resource, partition, instanceName, "ERROR");
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
// Recovery balance
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
// load balance is throttled, so keep all current states
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName, "ONLINE");
// The following must be removed because now downward state transitions are allowed
// expectedResult.setState(resource, partition, instanceName + "-1", "OFFLINE");
} else if (resource.endsWith("4")) {
// Test that partitions with replicas to drop are dropping them when recovery is
// happening for other partitions
if (p == 0) {
// This partition requires recovery
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else {
// Other partitions require dropping of replicas
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
// BestPossibleState dictates that we only need one ONLINE replica
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
// So instanceName-1 will NOT be expected to show up in expectedResult
expectedResult.setState(resource, partition, instanceName, "ONLINE");
expectedResult.setState(resource, partition, instanceName + "-1", "DROPPED");
} else if (resource.endsWith("5")) {
// Test that load balance bringing up a new replica does NOT happen with a recovery
// partition
if (p == 0) {
// Set up a partition requiring recovery
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
// Check that load balance (bringing up a new node) did not take place
bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
event.addAttribute(, bestPossibleStateOutput);
event.addAttribute(, messageSelectOutput);
event.addAttribute(, currentStateOutput);
event.addAttribute(, new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());
IntermediateStateOutput output = event.getAttribute(;
for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
// anything.
public void testWithClusterConfigChange() {
String resourcePrefix = "resource";
int nResource = 1;
int nPartition = 2;
int nReplica = 3;
String[] resources = new String[nResource];
for (int i = 0; i < nResource; i++) {
resources[i] = resourcePrefix + "_" + i;
preSetup(resources, nReplica, nReplica);
event.addAttribute(, getResourceMap(resources, nPartition, "OnlineOffline"));
getResourceMap(resources, nPartition, "OnlineOffline"));
// Initialize best possible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
MessageOutput messageSelectOutput = new MessageOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();
for (String resource : resources) {
IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
Map<String, List<String>> partitionMap = new HashMap<>();
for (int p = 0; p < nPartition; p++) {
Partition partition = new Partition(resource + "_" + p);
for (int r = 0; r < nReplica; r++) {
String instanceName = HOSTNAME_PREFIX + r;
partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName));
if (resource.endsWith("0")) {
// Test that when the threshold is set at a number greater than the number of error and
// recovery partitions, load balance DOES take place
if (p == 0) {
// Set up a partition requiring recovery
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else {
// Ensure we have at least one ONLINE replica so that this partition does not need
// recovery
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
expectedResult.setState(resource, partition, instanceName, "ONLINE");
// This partition to bring up a replica (load balance will happen)
bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName + "-1", "ONLINE");
bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
event.addAttribute(, bestPossibleStateOutput);
event.addAttribute(, currentStateOutput);
event.addAttribute(, messageSelectOutput);
event.addAttribute(, new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());
IntermediateStateOutput output = event.getAttribute(;
for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
// anything.
private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) {
setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
// Set up cluster configs
_clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3),
new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3)));