blob: 79f238da7731897f95201518efe120e7a65a1329 [file] [log] [blame]
package org.apache.helix;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.integration.manager.ZkTestManager;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.util.ZKClientPool;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
public class TestHelper {
private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds
public static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1500;
/**
* Returns a unused random port.
*/
public static int getRandomPort() throws IOException {
ServerSocket sock = new ServerSocket();
sock.bind(null);
int port = sock.getLocalPort();
sock.close();
return port;
}
static public ZkServer startZkServer(final String zkAddress) throws Exception {
List<String> empty = Collections.emptyList();
return TestHelper.startZkServer(zkAddress, empty, true);
}
static public ZkServer startZkServer(final String zkAddress, final String rootNamespace)
throws Exception {
List<String> rootNamespaces = new ArrayList<String>();
rootNamespaces.add(rootNamespace);
return TestHelper.startZkServer(zkAddress, rootNamespaces, true);
}
static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces)
throws Exception {
return startZkServer(zkAddress, rootNamespaces, true);
}
static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces,
boolean overwrite) throws Exception {
System.out.println(
"Start zookeeper at " + zkAddress + " in thread " + Thread.currentThread().getName());
String zkDir = zkAddress.replace(':', '_');
final String logDir = "/tmp/" + zkDir + "/logs";
final String dataDir = "/tmp/" + zkDir + "/dataDir";
if (overwrite) {
FileUtils.deleteDirectory(new File(dataDir));
FileUtils.deleteDirectory(new File(logDir));
}
ZKClientPool.reset();
IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
@Override
public void createDefaultNameSpace(ZkClient zkClient) {
if (rootNamespaces == null) {
return;
}
for (String rootNamespace : rootNamespaces) {
try {
zkClient.deleteRecursive(rootNamespace);
} catch (Exception e) {
LOG.error("fail to deleteRecursive path:" + rootNamespace, e);
}
}
}
};
int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
zkServer.start();
return zkServer;
}
static public void stopZkServer(ZkServer zkServer) {
if (zkServer != null) {
zkServer.shutdown();
System.out.println(
"Shut down zookeeper at port " + zkServer.getPort() + " in thread " + Thread
.currentThread().getName());
}
}
public static void setupEmptyCluster(HelixZkClient zkClient, String clusterName) {
ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
admin.addCluster(clusterName, true);
}
/**
* convert T[] to set<T>
* @param s
* @return
*/
public static <T> Set<T> setOf(T... s) {
Set<T> set = new HashSet<T>(Arrays.asList(s));
return set;
}
/**
* generic method for verification with a timeout
* @param verifierName
* @param args
*/
public static void verifyWithTimeout(String verifierName, long timeout, Object... args) {
final long sleepInterval = 1000; // in ms
final int loop = (int) (timeout / sleepInterval) + 1;
try {
boolean result = false;
int i = 0;
for (; i < loop; i++) {
Thread.sleep(sleepInterval);
// verifier should be static method
result = (Boolean) TestHelper.getMethod(verifierName).invoke(null, args);
if (result == true) {
break;
}
}
// debug
// LOG.info(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify ("
// + result + ")");
System.err.println(
verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify " + " (" + result + ")");
LOG.debug("args:" + Arrays.toString(args));
// System.err.println("args:" + Arrays.toString(args));
if (result == false) {
LOG.error(verifierName + " fails");
LOG.error("args:" + Arrays.toString(args));
}
Assert.assertTrue(result);
} catch (Exception e) {
LOG.error("Exception in verify: " + verifierName, e);
}
}
private static Method getMethod(String name) {
Method[] methods = TestHelper.class.getMethods();
for (Method method : methods) {
if (name.equals(method.getName())) {
return method;
}
}
return null;
}
public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
Set<String> instanceNames, String zkAddr) {
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
zkClient.setZkSerializer(new ZNRecordSerializer());
try {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
Builder keyBuilder = accessor.keyBuilder();
for (String instanceName : instanceNames) {
List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(instanceName));
for (String sessionId : sessionIds) {
CurrentState curState =
accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
if (curState != null && curState.getRecord().getMapFields().size() != 0) {
return false;
}
CurrentState taskCurState =
accessor.getProperty(keyBuilder.taskCurrentState(instanceName, sessionId, resourceName));
if (taskCurState != null && taskCurState.getRecord().getMapFields().size() != 0) {
return false;
}
}
ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
if (extView != null && extView.getRecord().getMapFields().size() != 0) {
return false;
}
}
return true;
} finally {
zkClient.close();
}
}
public static boolean verifyNotConnected(HelixManager manager) {
return !manager.isConnected();
}
public static void setupCluster(String clusterName, String zkAddr, int startPort,
String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
TestHelper
.setupCluster(clusterName, zkAddr, startPort, participantNamePrefix, resourceNamePrefix,
resourceNb, partitionNb, nodesNb, replica, stateModelDef, RebalanceMode.SEMI_AUTO,
doRebalance);
}
public static void setupCluster(String clusterName, String zkAddr, int startPort,
String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance) {
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
try {
if (zkClient.exists("/" + clusterName)) {
LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
zkClient.deleteRecursively("/" + clusterName);
}
ClusterSetup setupTool = new ClusterSetup(zkAddr);
setupTool.addCluster(clusterName, true);
for (int i = 0; i < nodesNb; i++) {
int port = startPort + i;
setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" + port);
}
for (int i = 0; i < resourceNb; i++) {
String resourceName = resourceNamePrefix + i;
setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef,
mode.name(),
mode == RebalanceMode.FULL_AUTO ? CrushEdRebalanceStrategy.class.getName()
: RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY);
if (doRebalance) {
setupTool.rebalanceStorageCluster(clusterName, resourceName, replica);
}
}
} finally {
zkClient.close();
}
}
public static void dropCluster(String clusterName, RealmAwareZkClient zkClient) {
ClusterSetup setupTool = new ClusterSetup(zkClient);
dropCluster(clusterName, zkClient, setupTool);
}
public static void dropCluster(String clusterName, RealmAwareZkClient zkClient, ClusterSetup setup) {
String namespace = "/" + clusterName;
if (zkClient.exists(namespace)) {
try {
setup.deleteCluster(clusterName);
} catch (Exception ex) {
// Failed to delete, give some more time for connections to drop
try {
Thread.sleep(3000L);
setup.deleteCluster(clusterName);
} catch (Exception ignored) {
// OK - just ignore
}
}
}
}
/**
* @param stateMap
* : "ResourceName/partitionKey" -> setOf(instances)
* @param state
* : MASTER|SLAVE|ERROR...
*/
public static void verifyState(String clusterName, String zkAddr,
Map<String, Set<String>> stateMap, String state) {
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
zkClient.setZkSerializer(new ZNRecordSerializer());
try {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
Builder keyBuilder = accessor.keyBuilder();
for (String resGroupPartitionKey : stateMap.keySet()) {
Map<String, String> retMap = getResourceAndPartitionKey(resGroupPartitionKey);
String resGroup = retMap.get("RESOURCE");
String partitionKey = retMap.get("PARTITION");
ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
for (String instance : stateMap.get(resGroupPartitionKey)) {
String actualState = extView.getStateMap(partitionKey).get(instance);
Assert.assertNotNull(actualState,
"externalView doesn't contain state for " + resGroup + "/" + partitionKey + " on "
+ instance + " (expect " + state + ")");
Assert.assertEquals(actualState, state,
"externalView for " + resGroup + "/" + partitionKey + " on " + instance + " is "
+ actualState + " (expect " + state + ")");
}
}
} finally {
zkClient.close();
}
}
/**
* @param resourcePartition
* : key is in form of "resource/partitionKey" or "resource_x"
* @return
*/
private static Map<String, String> getResourceAndPartitionKey(String resourcePartition) {
String resourceName;
String partitionName;
int idx = resourcePartition.indexOf('/');
if (idx > -1) {
resourceName = resourcePartition.substring(0, idx);
partitionName = resourcePartition.substring(idx + 1);
} else {
idx = resourcePartition.lastIndexOf('_');
resourceName = resourcePartition.substring(0, idx);
partitionName = resourcePartition;
}
Map<String, String> retMap = new HashMap<String, String>();
retMap.put("RESOURCE", resourceName);
retMap.put("PARTITION", partitionName);
return retMap;
}
public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads,
final Callable<T> method, final long timeout) {
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
final List<Thread> threadList = new ArrayList<Thread>();
for (int i = 0; i < nrThreads; i++) {
Thread thread = new Thread() {
@Override
public void run() {
try {
boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
if (isTimeout) {
LOG.error("Timeout while waiting for start latch");
}
} catch (InterruptedException ex) {
LOG.error("Interrupted while waiting for start latch");
}
try {
T result = method.call();
if (result != null) {
resultsMap.put("thread_" + this.getId(), result);
}
LOG.debug("result=" + result);
} catch (Exception e) {
LOG.error("Exeption in executing " + method.getClass().getName(), e);
}
finishCounter.countDown();
}
};
threadList.add(thread);
thread.start();
}
startLatch.countDown();
// wait for all thread to complete
try {
boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
if (isTimeout) {
LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
for (Thread thread : threadList) {
thread.interrupt();
}
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for finish latch", e);
}
return resultsMap;
}
public static Message createMessage(String msgId, String fromState, String toState,
String tgtName, String resourceName, String partitionName) {
Message msg = new Message(MessageType.STATE_TRANSITION, msgId);
msg.setFromState(fromState);
msg.setToState(toState);
msg.setTgtName(tgtName);
msg.setResourceName(resourceName);
msg.setPartitionName(partitionName);
msg.setStateModelDef("MasterSlave");
return msg;
}
public static String getTestMethodName() {
StackTraceElement[] calls = Thread.currentThread().getStackTrace();
return calls[2].getMethodName();
}
public static String getTestClassName() {
StackTraceElement[] calls = Thread.currentThread().getStackTrace();
String fullClassName = calls[2].getClassName();
return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
}
public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods,
final long timeout) {
final int nrThreads = methods.size();
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
final List<Thread> threadList = new ArrayList<Thread>();
for (int i = 0; i < nrThreads; i++) {
final Callable<T> method = methods.get(i);
Thread thread = new Thread() {
@Override
public void run() {
try {
boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
if (isTimeout) {
LOG.error("Timeout while waiting for start latch");
}
} catch (InterruptedException ex) {
LOG.error("Interrupted while waiting for start latch");
}
try {
T result = method.call();
if (result != null) {
resultsMap.put("thread_" + this.getId(), result);
}
LOG.debug("result=" + result);
} catch (Exception e) {
LOG.error("Exeption in executing " + method.getClass().getName(), e);
}
finishCounter.countDown();
}
};
threadList.add(thread);
thread.start();
}
startLatch.countDown();
// wait for all thread to complete
try {
boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
if (isTimeout) {
LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
for (Thread thread : threadList) {
thread.interrupt();
}
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for finish latch", e);
}
return resultsMap;
}
public static void printCache(Map<String, ZNode> cache) {
System.out.println("START:Print cache");
TreeMap<String, ZNode> map = new TreeMap<String, ZNode>();
map.putAll(cache);
for (String key : map.keySet()) {
ZNode node = map.get(key);
TreeSet<String> childSet = new TreeSet<String>();
childSet.addAll(node.getChildSet());
System.out.print(
key + "=" + node.getData() + ", " + childSet + ", " + (node.getStat() == null ? "null\n"
: node.getStat()));
}
System.out.println("END:Print cache");
}
public static void readZkRecursive(String path, Map<String, ZNode> map, HelixZkClient zkclient) {
try {
Stat stat = new Stat();
ZNRecord record = zkclient.readData(path, stat);
List<String> childNames = zkclient.getChildren(path);
ZNode node = new ZNode(path, record, stat);
node.addChildren(childNames);
map.put(path, node);
for (String childName : childNames) {
String childPath = path + "/" + childName;
readZkRecursive(childPath, map, zkclient);
}
} catch (ZkNoNodeException e) {
// OK
}
}
public static void readZkRecursive(String path, Map<String, ZNode> map,
BaseDataAccessor<ZNRecord> zkAccessor) {
try {
Stat stat = new Stat();
ZNRecord record = zkAccessor.get(path, stat, 0);
List<String> childNames = zkAccessor.getChildNames(path, 0);
// System.out.println("childNames: " + childNames);
ZNode node = new ZNode(path, record, stat);
node.addChildren(childNames);
map.put(path, node);
if (childNames != null && !childNames.isEmpty()) {
for (String childName : childNames) {
String childPath = path + "/" + childName;
readZkRecursive(childPath, map, zkAccessor);
}
}
} catch (ZkNoNodeException e) {
// OK
}
}
public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor,
HelixZkClient zkclient, boolean needVerifyStat) {
// read everything
Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
Map<String, ZNode> cache = new HashMap<String, ZNode>();
for (String path : paths) {
readZkRecursive(path, zkMap, zkclient);
readZkRecursive(path, cache, zkAccessor);
}
// printCache(map);
return verifyZkCache(paths, null, cache, zkMap, needVerifyStat);
}
public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache,
HelixZkClient zkclient, boolean needVerifyStat) {
return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
}
public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) {
// read everything on zk under paths
Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
for (String path : paths) {
readZkRecursive(path, zkMap, zkclient);
}
// printCache(map);
return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, needVerifyStat);
}
public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
Map<String, ZNode> cache, Map<String, ZNode> zkMap, boolean needVerifyStat) {
// equal size
if (zkMap.size() != cache.size()) {
System.err
.println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: " + zkMap.size());
System.out.println("cache: (" + cache.size() + ")");
TestHelper.printCache(cache);
System.out.println("zkMap: (" + zkMap.size() + ")");
TestHelper.printCache(zkMap);
return false;
}
// everything in cache is also in map
for (String path : cache.keySet()) {
ZNode cacheNode = cache.get(path);
ZNode zkNode = zkMap.get(path);
if (zkNode == null) {
// in cache but not on zk
System.err.println("path: " + path + " in cache but not on zk: inCacheNode: " + cacheNode);
return false;
}
if ((zkNode.getData() == null && cacheNode.getData() != null) || (zkNode.getData() != null
&& cacheNode.getData() == null) || (zkNode.getData() != null
&& cacheNode.getData() != null && !zkNode.getData().equals(cacheNode.getData()))) {
// data not equal
System.err.println(
"data mismatch on path: " + path + ", inCache: " + cacheNode.getData() + ", onZk: "
+ zkNode.getData());
return false;
}
if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null) || (
zkNode.getChildSet() != null && cacheNode.getChildSet() == null) || (
zkNode.getChildSet() != null && cacheNode.getChildSet() != null && !zkNode.getChildSet()
.equals(cacheNode.getChildSet()))) {
// childSet not equal
System.err.println(
"childSet mismatch on path: " + path + ", inCache: " + cacheNode.getChildSet()
+ ", onZk: " + zkNode.getChildSet());
return false;
}
if (needVerifyStat && pathsExcludeForStat != null && !pathsExcludeForStat.contains(path)) {
if (cacheNode.getStat() == null || !zkNode.getStat().equals(cacheNode.getStat())) {
// stat not equal
System.err.println(
"Stat mismatch on path: " + path + ", inCache: " + cacheNode.getStat() + ", onZk: "
+ zkNode.getStat());
return false;
}
}
}
return true;
}
public static StateModelDefinition generateStateModelDefForBootstrap() {
ZNRecord record = new ZNRecord("Bootstrap");
record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "IDLE");
List<String> statePriorityList = new ArrayList<String>();
statePriorityList.add("ONLINE");
statePriorityList.add("BOOTSTRAP");
statePriorityList.add("OFFLINE");
statePriorityList.add("IDLE");
statePriorityList.add("DROPPED");
statePriorityList.add("ERROR");
record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
statePriorityList);
for (String state : statePriorityList) {
String key = state + ".meta";
Map<String, String> metadata = new HashMap<String, String>();
if (state.equals("ONLINE")) {
metadata.put("count", "R");
record.setMapField(key, metadata);
} else if (state.equals("BOOTSTRAP")) {
metadata.put("count", "-1");
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
metadata.put("count", "-1");
record.setMapField(key, metadata);
} else if (state.equals("IDLE")) {
metadata.put("count", "-1");
record.setMapField(key, metadata);
} else if (state.equals("DROPPED")) {
metadata.put("count", "-1");
record.setMapField(key, metadata);
} else if (state.equals("ERROR")) {
metadata.put("count", "-1");
record.setMapField(key, metadata);
}
}
for (String state : statePriorityList) {
String key = state + ".next";
if (state.equals("ONLINE")) {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("BOOTSTRAP", "OFFLINE");
metadata.put("OFFLINE", "OFFLINE");
metadata.put("DROPPED", "OFFLINE");
metadata.put("IDLE", "OFFLINE");
record.setMapField(key, metadata);
} else if (state.equals("BOOTSTRAP")) {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("ONLINE", "ONLINE");
metadata.put("OFFLINE", "OFFLINE");
metadata.put("DROPPED", "OFFLINE");
metadata.put("IDLE", "OFFLINE");
record.setMapField(key, metadata);
} else if (state.equals("OFFLINE")) {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("ONLINE", "BOOTSTRAP");
metadata.put("BOOTSTRAP", "BOOTSTRAP");
metadata.put("DROPPED", "IDLE");
metadata.put("IDLE", "IDLE");
record.setMapField(key, metadata);
} else if (state.equals("IDLE")) {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("ONLINE", "OFFLINE");
metadata.put("BOOTSTRAP", "OFFLINE");
metadata.put("OFFLINE", "OFFLINE");
metadata.put("DROPPED", "DROPPED");
record.setMapField(key, metadata);
} else if (state.equals("ERROR")) {
Map<String, String> metadata = new HashMap<String, String>();
metadata.put("IDLE", "IDLE");
record.setMapField(key, metadata);
}
}
List<String> stateTransitionPriorityList = new ArrayList<String>();
stateTransitionPriorityList.add("ONLINE-OFFLINE");
stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
stateTransitionPriorityList.add("OFFLINE-IDLE");
stateTransitionPriorityList.add("IDLE-OFFLINE");
stateTransitionPriorityList.add("IDLE-DROPPED");
stateTransitionPriorityList.add("ERROR-IDLE");
record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
stateTransitionPriorityList);
return new StateModelDefinition(record);
}
public static String znrecordToString(ZNRecord record) {
StringBuffer sb = new StringBuffer();
sb.append(record.getId() + "\n");
Map<String, String> simpleFields = record.getSimpleFields();
if (simpleFields != null) {
sb.append("simpleFields\n");
for (String key : simpleFields.keySet()) {
sb.append(" " + key + "\t: " + simpleFields.get(key) + "\n");
}
}
Map<String, List<String>> listFields = record.getListFields();
sb.append("listFields\n");
for (String key : listFields.keySet()) {
List<String> list = listFields.get(key);
sb.append(" " + key + "\t: ");
for (String listValue : list) {
sb.append(listValue + ", ");
}
sb.append("\n");
}
Map<String, Map<String, String>> mapFields = record.getMapFields();
sb.append("mapFields\n");
for (String key : mapFields.keySet()) {
Map<String, String> map = mapFields.get(key);
sb.append(" " + key + "\t: \n");
for (String mapKey : map.keySet()) {
sb.append(" " + mapKey + "\t: " + map.get(mapKey) + "\n");
}
}
return sb.toString();
}
public interface Verifier {
boolean verify() throws Exception;
}
public static boolean verify(Verifier verifier, long timeout) throws Exception {
long start = System.currentTimeMillis();
do {
boolean result = verifier.verify();
boolean isTimedout = (System.currentTimeMillis() - start) > timeout;
if (result || isTimedout) {
if (isTimedout && !result) {
LOG.error("verifier time out, consider try longer timeout, stack trace{}",
Arrays.asList(Thread.currentThread().getStackTrace()));
}
return result;
}
Thread.sleep(50);
} while (true);
}
// debug code
public static String printHandlers(ZkTestManager manager) {
StringBuilder sb = new StringBuilder();
List<CallbackHandler> handlers = manager.getHandlers();
sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. [");
for (int i = 0; i < handlers.size(); i++) {
CallbackHandler handler = handlers.get(i);
String path = handler.getPath();
sb.append(
path.substring(manager.getClusterName().length() + 1) + ": " + handler.getListener());
if (i < (handlers.size() - 1)) {
sb.append(", ");
}
}
sb.append("]");
return sb.toString();
}
public static void printZkListeners(HelixZkClient client) throws Exception {
Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
System.out.println("dataListeners {");
for (String path : datalisteners.keySet()) {
System.out.println("\t" + path + ": ");
Set<IZkDataListener> set = datalisteners.get(path);
for (IZkDataListener listener : set) {
CallbackHandler handler = (CallbackHandler) listener;
System.out.println("\t\t" + handler.getListener());
}
}
System.out.println("}");
System.out.println("childListeners {");
for (String path : childListeners.keySet()) {
System.out.println("\t" + path + ": ");
Set<IZkChildListener> set = childListeners.get(path);
for (IZkChildListener listener : set) {
CallbackHandler handler = (CallbackHandler) listener;
System.out.println("\t\t" + handler.getListener());
}
}
System.out.println("}");
}
}