blob: 9a5e08595219104d41dbf48c282788f122f9d22c [file] [log] [blame]
package org.apache.helix.controller.rebalancer;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.stages.BaseStageTest;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.StateModelDefinition;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TestZeroReplicaAvoidance extends BaseStageTest {
@Test(dataProvider = "zeroReplicaInput")
public void testZeroReplicaAvoidanceDuringRebalance(StateModelDefinition stateModelDef,
List<String> instancePreferenceList, Map<String, String> currentStateMap,
Map<String, List<Message>> pendingMessages, Map<String, String> expectedBestPossibleMap) {
System.out
.println("START TestDelayedAutoRebalancer at " + new Date(System.currentTimeMillis()));
System.err.println("Test input: " + instancePreferenceList + ":" + currentStateMap + ":");
int numNode = 6;
Set<String> liveInstances = new HashSet<>();
for (int i = 0; i < numNode; i++) {
liveInstances.add("localhost_" + i);
}
IdealState is = new IdealState("test");
is.setReplicas("3");
Partition partition = new Partition("testPartition");
DelayedAutoRebalancer rebalancer = new DelayedAutoRebalancer();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
for (String instance : currentStateMap.keySet()) {
currentStateOutput.setCurrentState("test", partition, instance,
currentStateMap.get(instance));
}
Set<String> allInstances = new HashSet<>(instancePreferenceList);
allInstances.addAll(currentStateMap.keySet());
if (pendingMessages != null) {
for (String instance : allInstances) {
List<Message> messages = pendingMessages.get(instance);
if (messages != null) {
for (Message message : messages) {
currentStateOutput.setPendingMessage("test", partition, instance, message);
}
}
}
}
Map<String, String> bestPossibleMap = rebalancer.computeBestPossibleStateForPartition(
liveInstances, stateModelDef, instancePreferenceList, currentStateOutput,
Collections.emptySet(), is, new ClusterConfig("TestCluster"), partition);
Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
"Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap
+ "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList);
System.out
.println("END TestBestPossibleStateCalcStage at " + new Date(System.currentTimeMillis()));
}
@DataProvider(name = "zeroReplicaInput")
public Object[][] rebalanceStrategies() {
List<Object[]> data = new ArrayList<>();
data.addAll(loadTestInputs("TestDelayedAutoRebalancer.MasterSlave.json"));
data.addAll(loadTestInputs("TestDelayedAutoRebalancer.OnlineOffline.json"));
Object[][] ret = new Object[data.size()][];
for (int i = 0; i < data.size(); i++) {
ret[i] = data.get(i);
}
return ret;
}
private final String INPUT = "inputs";
private final String CURRENT_STATE = "currentStates";
private final String PENDING_MESSAGES = "pendingMessages";
private final String BEST_POSSIBLE_STATE = "bestPossibleStates";
private final String PREFERENCE_LIST = "preferenceList";
private final String STATE_MODEL = "statemodel";
public List<Object[]> loadTestInputs(String fileName) {
List<Object[]> ret = null;
InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName);
try {
ObjectReader mapReader = new ObjectMapper().reader(Map.class);
Map<String, Object> inputMaps = mapReader.readValue(inputStream);
String stateModelName = (String) inputMaps.get(STATE_MODEL);
StateModelDefinition stateModelDef =
BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition();
List<Map<String, Object>> inputs = (List<Map<String, Object>>) inputMaps.get(INPUT);
ret = new ArrayList<>();
for (Map<String, Object> inMap : inputs) {
Map<String, String> currentStates = (Map<String, String>) inMap.get(CURRENT_STATE);
Map<String, String> bestPossibleStates =
(Map<String, String>) inMap.get(BEST_POSSIBLE_STATE);
List<String> preferenceList = (List<String>) inMap.get(PREFERENCE_LIST);
Map<String, String> pendingStates = (Map<String, String>) inMap.get(PENDING_MESSAGES);
Map<String, List<Message>> pendingMessages = null;
if (pendingStates != null) {
Random r = new Random();
pendingMessages = new HashMap<>();
for (String instance : pendingStates.keySet()) {
pendingMessages.put(instance, new ArrayList<>());
Message m = new Message(new ZNRecord(UUID.randomUUID().toString()));
m.setFromState(pendingStates.get(instance).split(":")[0]);
m.setToState(pendingStates.get(instance).split(":")[1]);
pendingMessages.get(instance).add(m);
}
}
ret.add(new Object[] {
stateModelDef, preferenceList, currentStates, pendingMessages, bestPossibleStates
});
}
} catch (IOException e) {
e.printStackTrace();
}
return ret;
}
}