blob: d8bbcb4bf17c8e0d8473bb8bee6ce5cdc911de82 [file] [log] [blame]
package org.apache.helix.integration;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.model.IdealState.IdealStateProperty;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.store.PropertyJsonSerializer;
import org.apache.helix.store.PropertyStoreException;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.tools.TestCommand;
import org.apache.helix.tools.TestCommand.CommandType;
import org.apache.helix.tools.TestExecutor;
import org.apache.helix.tools.TestExecutor.ZnodePropertyType;
import org.apache.helix.tools.TestTrigger;
import org.apache.helix.tools.ZnodeOpArg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
public class TestDriver {
/** Logger shared by all static helpers of this driver; final so it cannot be reassigned. */
private static final Logger LOG = LoggerFactory.getLogger(TestDriver.class);
/** ZooKeeper address the driver itself connects to, taken from the shared test base class. */
private static final String ZK_ADDR = ZkTestBase.ZK_ADDR;
// private static final String CLUSTER_PREFIX = "TestDriver";
/** State model definition name applied to every resource this driver creates. */
private static final String STATE_MODEL = "MasterSlave";
/** Resource names are this prefix followed by the resource index. */
private static final String TEST_DB_PREFIX = "TestDB";
/** Participant names are PARTICIPANT_PREFIX + "_" + (START_PORT + instance id). */
private static final int START_PORT = 12918;
/** Controller names are this prefix + "_" + node id. */
private static final String CONTROLLER_PREFIX = "controller";
/** Host part of every participant name. */
private static final String PARTICIPANT_PREFIX = "localhost";
/** Source of randomness used when picking the next state transition to apply. */
private static final Random RANDOM = new Random();
/** Serializer used to obtain deep copies of ZNRecord instances. */
private static final PropertyJsonSerializer<ZNRecord> SERIALIZER =
    new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
/** Registry of the clusters set up through this driver, keyed by unique cluster name. */
private static final Map<String, TestInfo> _testInfoMap =
    new ConcurrentHashMap<String, TestInfo>();
/**
 * Bookkeeping record for one test cluster: the sizing parameters it was created with, the
 * ZooKeeper client handed in at construction time, and the managers started for it so far.
 */
public static class TestInfo {
  public final HelixZkClient _zkClient;
  public final String _clusterName;
  public final int _numDb;
  public final int _numPartitionsPerDb;
  public final int _numNode;
  public final int _replica;
  // Keyed by instance/controller name; filled in as participants and controllers are started.
  public final Map<String, HelixManager> _managers =
      new ConcurrentHashMap<String, HelixManager>();

  /**
   * @param clusterName name of the cluster this record describes
   * @param zkClient ZooKeeper client associated with the cluster
   * @param numDb number of resources in the cluster
   * @param numPartitionsPerDb number of partitions per resource
   * @param numNode number of participant instances
   * @param replica replica count per partition
   */
  public TestInfo(String clusterName, HelixZkClient zkClient, int numDb, int numPartitionsPerDb,
      int numNode, int replica) {
    _zkClient = zkClient;
    _clusterName = clusterName;
    _replica = replica;
    _numNode = numNode;
    _numPartitionsPerDb = numPartitionsPerDb;
    _numDb = numDb;
  }
}
/**
 * Returns the bookkeeping record registered for a test cluster.
 * @param uniqClusterName name the cluster was registered under
 * @return the registered record; never null
 * @throws IllegalArgumentException if no cluster is registered under that name
 */
public static TestInfo getTestInfo(String uniqClusterName) {
  // Single lookup: a containsKey()/get() pair could observe a concurrent removal between the
  // two calls and hand back null instead of failing.
  TestInfo testInfo = _testInfoMap.get(uniqClusterName);
  if (testInfo == null) {
    String errMsg = "Cluster hasn't been setup for " + uniqClusterName;
    throw new IllegalArgumentException(errMsg);
  }
  return testInfo;
}
/**
 * Sets up a test cluster exactly like the seven-argument setupCluster, with rebalancing of the
 * created resources switched off.
 */
public static void setupClusterWithoutRebalance(String uniqClusterName, String zkAddr,
    int numResources, int numPartitionsPerResource, int numInstances, int replica)
    throws Exception {
  final boolean doRebalance = false;
  setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
      replica, doRebalance);
}
/**
 * Sets up a test cluster exactly like the seven-argument setupCluster, with rebalancing of the
 * created resources switched on.
 */
public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
    int numPartitionsPerResource, int numInstances, int replica) throws Exception {
  final boolean doRebalance = true;
  setupCluster(uniqClusterName, zkAddr, numResources, numPartitionsPerResource, numInstances,
      replica, doRebalance);
}
/**
 * Creates a test cluster with the requested instances and resources and registers it with this
 * driver. Any znode tree or registry entry left over under the same name is removed first.
 *
 * NOTE(review): the leftover-cluster check connects to ZK_ADDR while ClusterSetup is given
 * zkAddr - confirm callers always pass the same address.
 * NOTE(review): the ZooKeeper client stored in the registered TestInfo is closed in the
 * finally block before this method returns.
 *
 * @param uniqClusterName cluster name, also used as the registry key
 * @param zkAddr ZooKeeper address handed to ClusterSetup
 * @param numResources number of resources to add, named TEST_DB_PREFIX + index
 * @param numPartitionsPerResource partitions per resource
 * @param numInstances number of instances to add, named PARTICIPANT_PREFIX_port
 * @param replica replica count passed to the rebalance call
 * @param doRebalance whether each resource is rebalanced right after it is added
 */
public static void setupCluster(String uniqClusterName, String zkAddr, int numResources,
    int numPartitionsPerResource, int numInstances, int replica, boolean doRebalance)
    throws Exception {
  HelixZkClient client = SharedZkClientFactory.getInstance()
      .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
  try {
    client.setZkSerializer(new ZNRecordSerializer());

    final String clusterName = uniqClusterName;
    final String clusterRoot = "/" + clusterName;

    // Wipe whatever a previous run left behind in ZooKeeper.
    if (client.exists(clusterRoot)) {
      LOG.warn("test cluster already exists:" + clusterName + ", test name:" + uniqClusterName + " is not unique or test has been run without cleaning up zk; deleting it");
      client.deleteRecursively(clusterRoot);
    }

    // Likewise drop a stale registry entry before registering the new one.
    if (_testInfoMap.containsKey(uniqClusterName)) {
      LOG.warn("test info already exists:" + uniqClusterName + " is not unique or test has been run without cleaning up test info map; removing it");
      _testInfoMap.remove(uniqClusterName);
    }
    _testInfoMap.put(uniqClusterName, new TestInfo(clusterName, client, numResources,
        numPartitionsPerResource, numInstances, replica));

    ClusterSetup clusterSetup = new ClusterSetup(zkAddr);
    clusterSetup.addCluster(clusterName, true);

    for (int instanceIdx = 0; instanceIdx < numInstances; instanceIdx++) {
      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceIdx);
      clusterSetup.addInstanceToCluster(clusterName, instanceName);
    }

    for (int resourceIdx = 0; resourceIdx < numResources; resourceIdx++) {
      String resourceName = TEST_DB_PREFIX + resourceIdx;
      clusterSetup.addResourceToCluster(clusterName, resourceName, numPartitionsPerResource,
          STATE_MODEL);
      if (!doRebalance) {
        continue;
      }
      clusterSetup.rebalanceStorageCluster(clusterName, resourceName, replica);
    }
  } finally {
    client.close();
  }
}
/**
 * Starts one dummy participant; convenience form of startDummyParticipants for a single id.
 * @param uniqClusterName name the cluster was registered under
 * @param instanceId id added to the base port to form the participant name
 */
public static void startDummyParticipant(String uniqClusterName, int instanceId) throws Exception {
  int[] singleInstance = { instanceId };
  startDummyParticipants(uniqClusterName, singleInstance);
}
/**
 * Starts a mock participant for each given instance id and records it in the cluster's
 * manager registry. Instances that are already recorded are skipped with a warning.
 * @param uniqClusterName name the cluster was registered under
 * @param instanceIds ids added to the base port to form the participant names
 * @throws IllegalArgumentException if no cluster is registered under that name
 */
public static void startDummyParticipants(String uniqClusterName, int[] instanceIds)
    throws Exception {
  // Single lookup: a containsKey()/get() pair could see the entry removed in between and
  // fail later with a NullPointerException instead of the intended error.
  TestInfo testInfo = _testInfoMap.get(uniqClusterName);
  if (testInfo == null) {
    String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
    throw new IllegalArgumentException(errMsg);
  }
  String clusterName = testInfo._clusterName;
  for (int id : instanceIds) {
    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + id);
    if (testInfo._managers.containsKey(instanceName)) {
      LOG.warn("Dummy participant:" + instanceName + " has already started; skip starting it");
      continue;
    }
    MockParticipantManager participant =
        new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
    participant.syncStart();
    // Registered only after a successful start so stopCluster never sees a half-started one.
    testInfo._managers.put(instanceName, participant);
  }
}
/**
 * Starts the controller with node id 0; convenience form of the array-based startController.
 * @param uniqClusterName name the cluster was registered under
 */
public static void startController(String uniqClusterName) throws Exception {
  int[] defaultNodeIds = { 0 };
  startController(uniqClusterName, defaultNodeIds);
}
/**
 * Starts a controller for each given node id and records it in the cluster's manager
 * registry. Controllers that are already recorded are skipped with a warning.
 * @param uniqClusterName name the cluster was registered under
 * @param nodeIds ids appended to the controller prefix to form the controller names
 * @throws IllegalArgumentException if no cluster is registered under that name
 */
public static void startController(String uniqClusterName, int[] nodeIds) throws Exception {
  // Single lookup: a containsKey()/get() pair could see the entry removed in between and
  // fail later with a NullPointerException instead of the intended error.
  TestInfo testInfo = _testInfoMap.get(uniqClusterName);
  if (testInfo == null) {
    String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
    throw new IllegalArgumentException(errMsg);
  }
  String clusterName = testInfo._clusterName;
  for (int id : nodeIds) {
    String controllerName = CONTROLLER_PREFIX + "_" + id;
    if (testInfo._managers.containsKey(controllerName)) {
      LOG.warn("Controller:" + controllerName + " has already started; skip starting it");
      continue;
    }
    ClusterControllerManager controller =
        new ClusterControllerManager(ZK_ADDR, clusterName, controllerName);
    controller.syncStart();
    testInfo._managers.put(controllerName, controller);
  }
}
/**
 * Sleeps for beginTime milliseconds, then asserts that the cluster passes a
 * BestPossibleExternalViewVerifier poll.
 *
 * NOTE(review): the timeout parameter is currently not used; the verifier polls with its own
 * default timeout. Confirm with callers before wiring it through.
 *
 * @param uniqClusterName name the cluster was registered under
 * @param beginTime delay in milliseconds before verification starts
 * @param timeout unused
 * @throws IllegalArgumentException if no cluster is registered under that name
 */
public static void verifyCluster(String uniqClusterName, long beginTime, long timeout)
    throws Exception {
  Thread.sleep(beginTime);
  // Single lookup: a containsKey()/get() pair could see the entry removed in between and
  // fail later with a NullPointerException instead of the intended error.
  TestInfo testInfo = _testInfoMap.get(uniqClusterName);
  if (testInfo == null) {
    String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
    throw new IllegalArgumentException(errMsg);
  }
  String clusterName = testInfo._clusterName;
  ZkHelixClusterVerifier verifier =
      new BestPossibleExternalViewVerifier.Builder(clusterName)
          .setZkAddr(ZK_ADDR)
          .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
          .build();
  try {
    Assert.assertTrue(verifier.verifyByPolling());
  } finally {
    // Always release the verifier, including when the assertion fails.
    verifier.close();
  }
}
/**
 * Unregisters a test cluster and stops everything started for it: controllers first, then,
 * after a one second pause, the participants. The cluster's ZooKeeper client is closed last,
 * also when stopping a manager fails.
 * @param uniqClusterName name the cluster was registered under
 * @throws IllegalArgumentException if no cluster is registered under that name
 */
public static void stopCluster(String uniqClusterName) throws Exception {
  // remove() doubles as the existence check, so two concurrent stops cannot both pass a
  // containsKey() test and leave one of them with a null record.
  TestInfo testInfo = _testInfoMap.remove(uniqClusterName);
  if (testInfo == null) {
    String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
    throw new IllegalArgumentException(errMsg);
  }
  try {
    // stop controller first
    // Iterating entries (not keys + get) avoids a null manager if one is removed meanwhile.
    for (Map.Entry<String, HelixManager> entry : testInfo._managers.entrySet()) {
      if (entry.getKey().startsWith(CONTROLLER_PREFIX)) {
        ClusterControllerManager controller = (ClusterControllerManager) entry.getValue();
        controller.syncStop();
      }
    }
    Thread.sleep(1000);
    for (Map.Entry<String, HelixManager> entry : testInfo._managers.entrySet()) {
      if (!entry.getKey().startsWith(CONTROLLER_PREFIX)) {
        MockParticipantManager participant = (MockParticipantManager) entry.getValue();
        participant.syncStop();
      }
    }
  } finally {
    testInfo._zkClient.close();
  }
}
/**
 * Stops one dummy participant and removes it from the cluster's manager registry. If the
 * participant is not recorded as running, an error is logged and nothing else happens.
 * @param uniqClusterName name the cluster was registered under
 * @param beginTime not used by the current implementation
 * @param instanceId id added to the base port to form the participant name
 * @throws IllegalArgumentException if no cluster is registered under that name
 */
public static void stopDummyParticipant(String uniqClusterName, long beginTime, int instanceId)
    throws Exception {
  // Single lookup: a containsKey()/get() pair could see the entry removed in between.
  TestInfo testInfo = _testInfoMap.get(uniqClusterName);
  if (testInfo == null) {
    String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
    // Same exception type as every other lookup in this class; it is still an Exception,
    // so existing catch blocks keep working.
    throw new IllegalArgumentException(errMsg);
  }
  String failHost = PARTICIPANT_PREFIX + "_" + (START_PORT + instanceId);
  // remove() both checks for and claims the participant, so it is stopped at most once.
  MockParticipantManager participant =
      (MockParticipantManager) testInfo._managers.remove(failHost);
  // TODO need sync
  if (participant == null) {
    String errMsg = "Dummy participant:" + failHost + " seems not running";
    LOG.error(errMsg);
  } else {
    participant.syncStop();
  }
}
/**
 * For every resource of the cluster, computes an ideal state that lies the given percentage of
 * the way from an empty assignment to the calculated target assignment, and submits one
 * MODIFY command per resource to write it to the resource's ideal-state znode.
 * @param uniqClusterName name the cluster was registered under
 * @param beginTime value handed to the TestTrigger of every command
 * @param percentage share of the total transitions to apply, in percent
 * @throws IllegalArgumentException if no cluster is registered under that name
 */
public static void setIdealState(String uniqClusterName, long beginTime, int percentage)
    throws Exception {
  // Single lookup: a containsKey()/get() pair could see the entry removed in between and
  // fail later with a NullPointerException instead of the intended error.
  TestInfo testInfo = _testInfoMap.get(uniqClusterName);
  if (testInfo == null) {
    String errMsg = "test cluster hasn't been setup:" + uniqClusterName;
    throw new IllegalArgumentException(errMsg);
  }
  String clusterName = testInfo._clusterName;
  List<String> instanceNames = new ArrayList<String>();
  for (int i = 0; i < testInfo._numNode; i++) {
    int port = START_PORT + i;
    instanceNames.add(PARTICIPANT_PREFIX + "_" + port);
  }
  List<TestCommand> commandList = new ArrayList<TestCommand>();
  for (int i = 0; i < testInfo._numDb; i++) {
    String dbName = TEST_DB_PREFIX + i;
    // Target assignment for this resource.
    ZNRecord destIS =
        DefaultIdealStateCalculator.calculateIdealState(instanceNames,
            testInfo._numPartitionsPerDb, testInfo._replica - 1, dbName, "MASTER", "SLAVE");
    applyCustomizedIdealStateFields(destIS, testInfo);
    // Starting point: same simple fields, but no partition assigned yet.
    ZNRecord initIS = new ZNRecord(dbName);
    applyCustomizedIdealStateFields(initIS, testInfo);
    int totalStep = calcuateNumTransitions(initIS, destIS);
    int step = totalStep * percentage / 100;
    System.out.println("Resource:" + dbName + ", totalSteps from initIS to destIS:" + totalStep
        + ", walk " + step + " steps(" + percentage + "%)");
    ZNRecord nextIS = nextIdealState(initIS, destIS, step);
    String idealStatePath = PropertyPathBuilder.idealState(clusterName, dbName);
    ZnodeOpArg arg = new ZnodeOpArg(idealStatePath, ZnodePropertyType.ZNODE, "+", nextIS);
    TestCommand command = new TestCommand(CommandType.MODIFY, new TestTrigger(beginTime), arg);
    commandList.add(command);
  }
  TestExecutor.executeTestAsync(commandList, ZK_ADDR);
}

/** Sets the four simple fields shared by the initial and the target ideal state. */
private static void applyCustomizedIdealStateFields(ZNRecord idealState, TestInfo testInfo) {
  idealState.setSimpleField(IdealStateProperty.REBALANCE_MODE.toString(),
      RebalanceMode.CUSTOMIZED.toString());
  idealState.setSimpleField(IdealStateProperty.NUM_PARTITIONS.toString(),
      Integer.toString(testInfo._numPartitionsPerDb));
  idealState.setSimpleField(IdealStateProperty.STATE_MODEL_DEF_REF.toString(), STATE_MODEL);
  idealState.setSimpleField(IdealStateProperty.REPLICAS.toString(), "" + testInfo._replica);
}
/**
 * Finds every (partition, host) assignment in dest that cur has not reached yet.
 * @param cur current assignment
 * @param dest target assignment
 * @return one {partition, host, transitions} triple per unfinished assignment, where
 *         transitions is "1" or "2": the number of state transitions still required
 */
private static List<String[]> findAllUnfinishPairs(ZNRecord cur, ZNRecord dest) {
  List<String[]> list = new ArrayList<String[]>();
  for (Map.Entry<String, Map<String, String>> entry : dest.getMapFields().entrySet()) {
    String partitionName = entry.getKey();
    // The current host map depends only on the partition, so look it up once per partition.
    Map<String, String> curHostMap = cur.getMapField(partitionName);
    for (Map.Entry<String, String> hostEntry : entry.getValue().entrySet()) {
      String host = hostEntry.getKey();
      String destState = hostEntry.getValue();
      String curState = (curHostMap == null) ? null : curHostMap.get(host);

      String transitions = null; // stays null when nothing is left to do for this pair
      if (curState == null) {
        if ("SLAVE".equalsIgnoreCase(destState)) {
          transitions = "1"; // unassigned -> SLAVE
        } else if ("MASTER".equalsIgnoreCase(destState)) {
          transitions = "2"; // unassigned -> SLAVE -> MASTER
        }
      } else if (curState.equalsIgnoreCase("SLAVE") && "MASTER".equalsIgnoreCase(destState)) {
        transitions = "1"; // SLAVE -> MASTER
      }

      if (transitions != null) {
        // Strings are immutable, so the triple can share them instead of copying each one.
        list.add(new String[] {
            partitionName, host, transitions
        });
      }
    }
  }
  return list;
}
/**
 * Adds up the transition counts of all assignments that still separate start from end.
 * @param start assignment to start from
 * @param end assignment to reach
 * @return total number of state transitions required
 */
private static int calcuateNumTransitions(ZNRecord start, ZNRecord end) {
  List<String[]> pending = findAllUnfinishPairs(start, end);
  int sum = 0;
  for (int idx = 0; idx < pending.size(); idx++) {
    // Third slot of each triple holds the transition count as a decimal string.
    String count = pending.get(idx)[2];
    sum = sum + Integer.parseInt(count);
  }
  return sum;
}
/**
 * Returns a copy of cur advanced by up to the given number of single state transitions
 * towards dest. Each step picks a random unfinished (partition, host) pair and moves it one
 * state forward (unassigned to SLAVE, or SLAVE to MASTER). Stepping stops early once every
 * pair has reached its target state.
 * @param cur assignment to start from; not modified
 * @param dest target assignment
 * @param steps maximum number of transitions to apply
 * @return the advanced copy
 * @throws PropertyStoreException if copying cur through the serializer fails
 */
private static ZNRecord nextIdealState(final ZNRecord cur, final ZNRecord dest, final int steps)
    throws PropertyStoreException {
  // get a deep copy
  ZNRecord next = SERIALIZER.deserialize(SERIALIZER.serialize(cur));
  List<String[]> list = findAllUnfinishPairs(cur, dest);
  // randomly pick up pairs that haven't reached destination state and
  // progress
  // Random.nextInt(0) throws, so stop as soon as nothing is left to progress (this happens
  // when more steps are requested than transitions exist).
  for (int i = 0; i < steps && !list.isEmpty(); i++) {
    int randomInt = RANDOM.nextInt(list.size());
    String[] pair = list.get(randomInt);
    String partitionName = pair[0];
    String host = pair[1];

    Map<String, String> hostMap = next.getMapField(partitionName);
    String curState = (hostMap == null) ? null : hostMap.get(host);
    final String destState = dest.getMapField(partitionName).get(host);
    // TODO generalize it using state-model
    if (curState == null && destState != null) {
      if (hostMap == null) {
        hostMap = new HashMap<String, String>();
      }
      hostMap.put(host, "SLAVE");
      next.setMapField(partitionName, hostMap);
    } else if ("SLAVE".equalsIgnoreCase(curState) && "MASTER".equalsIgnoreCase(destState)) {
      // Constant-first comparisons stay safe when either state is null.
      hostMap.put(host, "MASTER");
    } else {
      LOG.error("fail to calculate the next ideal state");
    }

    // Re-read after the update; the partition map may still be absent on the error path.
    Map<String, String> updatedHostMap = next.getMapField(partitionName);
    curState = (updatedHostMap == null) ? null : updatedHostMap.get(host);
    if (curState != null && curState.equalsIgnoreCase(destState)) {
      // Pair has arrived; take it out of the candidates (removal by index).
      list.remove(randomInt);
    }
  }
  LOG.info("nextIS:" + next);
  return next;
}
}