blob: 6a757f3ecfe6b00afe95283012b8ec7b81b9a0f4 [file] [log] [blame]
package org.apache.helix.tools;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.store.PropertyJsonComparator;
import org.apache.helix.store.PropertyJsonSerializer;
import org.apache.helix.store.PropertyStoreException;
import org.apache.helix.tools.TestCommand.CommandType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* a test is structured logically as a list of commands a command has three parts: COMMAND
* | TRIGGER | ARG'S COMMAND could be: modify, verify, start, stop
* TRIGGER is optional and consists of start-time, timeout, and expect-value which means
* the COMMAND is triggered between [start-time, start-time + timeout] and is triggered
* when the value in concern equals to expect-value
* ARG's format depends on COMMAND if COMMAND is modify/verify, arg is in form of:
* <znode-path, property-type (SIMPLE, LIST, or MAP), operation(+, -, ==, !=), key,
* update-value> in which key is k1 for SIMPLE, k1|index for LIST, and k1|k2 for MAP field
* if COMMAND is start/stop, arg is a thread handler
*/
public class TestExecutor {
/**
* SIMPLE: simple field change LIST: list field change MAP: map field change ZNODE:
* entire znode change
*/
public enum ZnodePropertyType {
SIMPLE,
LIST,
MAP,
ZNODE
}
private enum ZnodeModValueType {
INVALID,
SINGLE_VALUE,
LIST_VALUE,
MAP_VALUE,
ZNODE_VALUE
}
private static Logger logger = LoggerFactory.getLogger(TestExecutor.class);
private static final long SLEEP_TIME = 500; // in
// ms
private final static PropertyJsonComparator<String> STRING_COMPARATOR =
new PropertyJsonComparator<String>(String.class);
private final static PropertyJsonSerializer<ZNRecord> ZNRECORD_SERIALIZER =
new PropertyJsonSerializer<ZNRecord>(ZNRecord.class);
private static ZnodeModValueType getValueType(ZnodePropertyType type, String key) {
ZnodeModValueType valueType = ZnodeModValueType.INVALID;
switch (type) {
case SIMPLE:
if (key == null) {
logger.warn("invalid key for simple field: key is null");
} else {
String keyParts[] = key.split("/");
if (keyParts.length != 1) {
logger.warn("invalid key for simple field: " + key + ", expect 1 part: key1 (no slash)");
} else {
valueType = ZnodeModValueType.SINGLE_VALUE;
}
}
break;
case LIST:
if (key == null) {
logger.warn("invalid key for simple field: key is null");
} else {
String keyParts[] = key.split("/");
if (keyParts.length < 1 || keyParts.length > 2) {
logger.warn("invalid key for list field: " + key
+ ", expect 1 or 2 parts: key1 or key1/index)");
} else if (keyParts.length == 1) {
valueType = ZnodeModValueType.LIST_VALUE;
} else {
try {
int index = Integer.parseInt(keyParts[1]);
if (index < 0) {
logger.warn("invalid key for list field: " + key + ", index < 0");
} else {
valueType = ZnodeModValueType.SINGLE_VALUE;
}
} catch (NumberFormatException e) {
logger.warn("invalid key for list field: " + key + ", part-2 is NOT an integer");
}
}
}
break;
case MAP:
if (key == null) {
logger.warn("invalid key for simple field: key is null");
} else {
String keyParts[] = key.split("/");
if (keyParts.length < 1 || keyParts.length > 2) {
logger.warn("invalid key for map field: " + key
+ ", expect 1 or 2 parts: key1 or key1/key2)");
} else if (keyParts.length == 1) {
valueType = ZnodeModValueType.MAP_VALUE;
} else {
valueType = ZnodeModValueType.SINGLE_VALUE;
}
}
break;
case ZNODE:
valueType = ZnodeModValueType.ZNODE_VALUE;
default:
break;
}
return valueType;
}
private static String getSingleValue(ZNRecord record, ZnodePropertyType type, String key) {
if (record == null || key == null) {
return null;
}
String value = null;
String keyParts[] = key.split("/");
switch (type) {
case SIMPLE:
value = record.getSimpleField(key);
break;
case LIST:
List<String> list = record.getListField(keyParts[0]);
if (list == null) {
logger.warn("invalid key for list field: " + key + ", map for key part-1 doesn't exist");
return null;
}
int idx = Integer.parseInt(keyParts[1]);
value = list.get(idx);
break;
case MAP:
Map<String, String> map = record.getMapField(keyParts[0]);
if (map == null) {
logger.warn("invalid key for map field: " + key + ", map for key part-1 doesn't exist");
return null;
}
value = map.get(keyParts[1]);
break;
default:
break;
}
return value;
}
private static List<String> getListValue(ZNRecord record, String key) {
if (record == null) {
return null;
}
return record.getListField(key);
}
private static Map<String, String> getMapValue(ZNRecord record, String key) {
return record.getMapField(key);
}
// comparator's for single/list/map-value
private static boolean compareSingleValue(String actual, String expect, String key, ZNRecord diff) {
boolean ret = (STRING_COMPARATOR.compare(actual, expect) == 0);
if (diff != null) {
diff.setSimpleField(key + "/expect", expect);
diff.setSimpleField(key + "/actual", actual);
}
return ret;
}
private static boolean compareListValue(List<String> actualList, List<String> expectList,
String key, ZNRecord diff) {
boolean ret = true;
if (actualList == null && expectList == null) {
ret = true;
} else if (actualList == null && expectList != null) {
ret = false;
if (diff != null) {
diff.setListField(key + "/expect", expectList);
}
} else if (actualList != null && expectList == null) {
ret = false;
if (diff != null) {
diff.setListField(key + "/actual", actualList);
}
} else {
Iterator<String> itrActual = actualList.iterator();
Iterator<String> itrExpect = expectList.iterator();
if (diff != null && diff.getListField(key + "/expect") == null) {
diff.setListField(key + "/expect", new ArrayList<String>());
}
if (diff != null && diff.getListField(key + "/actual") == null) {
diff.setListField(key + "/actual", new ArrayList<String>());
}
while (itrActual.hasNext() && itrExpect.hasNext()) {
String actual = itrActual.next();
String expect = itrExpect.next();
if (STRING_COMPARATOR.compare(actual, expect) != 0) {
ret = false;
if (diff != null) {
diff.getListField(key + "/expect").add(expect);
diff.getListField(key + "/actual").add(actual);
}
}
}
while (itrActual.hasNext()) {
String actual = itrActual.next();
if (diff != null) {
diff.getListField(key + "/actual").add(actual);
}
}
while (itrExpect.hasNext()) {
String expect = itrExpect.next();
if (diff != null) {
diff.getListField(key + "/expect").add(expect);
}
}
}
return ret;
}
private static void setMapField(ZNRecord record, String key1, String key2, String value) {
if (record.getMapField(key1) == null) {
record.setMapField(key1, new TreeMap<String, String>());
}
record.getMapField(key1).put(key2, value);
}
private static boolean compareMapValue(Map<String, String> actualMap,
Map<String, String> expectMap, String mapKey, ZNRecord diff) {
boolean ret = true;
if (actualMap == null && expectMap == null) {
ret = true;
} else if (actualMap == null && expectMap != null) {
ret = false;
if (diff != null) {
diff.setMapField(mapKey + "/expect", expectMap);
}
} else if (actualMap != null && expectMap == null) {
ret = false;
if (diff != null) {
diff.setMapField(mapKey + "/actual", actualMap);
}
} else {
for (String key : actualMap.keySet()) {
String actual = actualMap.get(key);
if (!expectMap.containsKey(key)) {
ret = false;
if (diff != null) {
setMapField(diff, mapKey + "/actual", key, actual);
}
} else {
String expect = expectMap.get(key);
if (STRING_COMPARATOR.compare(actual, expect) != 0) {
ret = false;
if (diff != null) {
setMapField(diff, mapKey + "/actual", key, actual);
setMapField(diff, mapKey + "/expect", key, expect);
}
}
}
}
for (String key : expectMap.keySet()) {
String expect = expectMap.get(key);
if (!actualMap.containsKey(key)) {
ret = false;
if (diff != null) {
setMapField(diff, mapKey + "/expect", key, expect);
}
} else {
String actual = actualMap.get(key);
if (STRING_COMPARATOR.compare(actual, expect) != 0) {
ret = false;
if (diff != null) {
setMapField(diff, mapKey + "/actual", key, actual);
setMapField(diff, mapKey + "/expect", key, expect);
}
}
}
}
}
return ret;
}
private static void setZNRecord(ZNRecord diff, ZNRecord record, String keySuffix) {
if (diff == null || record == null) {
return;
}
for (String key : record.getSimpleFields().keySet()) {
diff.setSimpleField(key + "/" + keySuffix, record.getSimpleField(key));
}
for (String key : record.getListFields().keySet()) {
diff.setListField(key + "/" + keySuffix, record.getListField(key));
}
for (String key : record.getMapFields().keySet()) {
diff.setMapField(key + "/" + keySuffix, record.getMapField(key));
}
}
private static boolean compareZnodeValue(ZNRecord actual, ZNRecord expect, ZNRecord diff) {
boolean ret = true;
if (actual == null && expect == null) {
ret = true;
} else if (actual == null && expect != null) {
ret = false;
if (diff != null) {
setZNRecord(diff, expect, "expect");
}
} else if (actual != null && expect == null) {
ret = false;
if (diff != null) {
setZNRecord(diff, actual, "actual");
}
} else {
for (String key : actual.getSimpleFields().keySet()) {
if (compareSingleValue(actual.getSimpleField(key), expect.getSimpleField(key), key, diff) == false) {
ret = false;
}
}
for (String key : expect.getMapFields().keySet()) {
if (!actual.getMapFields().containsKey(key)) {
if (diff != null) {
ret = false;
diff.setMapField(key + "/expect", expect.getMapField(key));
}
} else {
if (compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff) == false) {
ret = false;
}
}
}
for (String key : actual.getMapFields().keySet()) {
if (!expect.getMapFields().containsKey(key)) {
if (diff != null) {
ret = false;
diff.setMapField(key + "/actual", actual.getMapField(key));
}
} else {
if (compareMapValue(actual.getMapField(key), expect.getMapField(key), key, diff) == false) {
ret = false;
}
}
}
}
return ret;
}
private static void resetZNRecord(ZNRecord record) {
if (record != null) {
record.getSimpleFields().clear();
record.getListFields().clear();
record.getMapFields().clear();
}
}
private static boolean isValueExpected(ZNRecord current, ZnodePropertyType type, String key,
ZnodeValue expect, ZNRecord diff) {
// expect value = null means not expect any value
if (expect == null) {
return true;
}
boolean result = false;
resetZNRecord(diff);
ZnodeModValueType valueType = getValueType(type, key);
switch (valueType) {
case SINGLE_VALUE:
String singleValue = getSingleValue(current, type, key);
result = compareSingleValue(singleValue, expect._singleValue, key, diff);
break;
case LIST_VALUE:
List<String> listValue = getListValue(current, key);
result = compareListValue(listValue, expect._listValue, key, diff);
break;
case MAP_VALUE:
Map<String, String> mapValue = getMapValue(current, key);
result = compareMapValue(mapValue, expect._mapValue, key, diff);
break;
case ZNODE_VALUE:
result = compareZnodeValue(current, expect._znodeValue, diff);
break;
case INVALID:
break;
default:
break;
}
return result;
}
private static void setSingleValue(ZNRecord record, ZnodePropertyType type, String key,
String value) {
String keyParts[] = key.split("/");
switch (type) {
case SIMPLE:
record.setSimpleField(key, value);
break;
case LIST:
List<String> list = record.getListField(keyParts[0]);
if (list == null) {
logger.warn("invalid key for list field: " + key + ", value for key part-1 doesn't exist");
return;
}
int idx = Integer.parseInt(keyParts[1]);
list.remove(idx);
list.add(idx, value);
break;
case MAP:
Map<String, String> map = record.getMapField(keyParts[0]);
if (map == null) {
logger.warn("invalid key for map field: " + key + ", value for key part-1 doesn't exist");
return;
}
map.put(keyParts[1], value);
break;
default:
break;
}
}
private static void setListValue(ZNRecord record, String key, List<String> value) {
record.setListField(key, value);
}
private static void setMapValue(ZNRecord record, String key, Map<String, String> value) {
record.setMapField(key, value);
}
private static void removeSingleValue(ZNRecord record, ZnodePropertyType type, String key) {
if (record == null) {
return;
}
String keyParts[] = key.split("/");
switch (type) {
case SIMPLE:
record.getSimpleFields().remove(key);
break;
case LIST:
List<String> list = record.getListField(keyParts[0]);
if (list == null) {
logger.warn("invalid key for list field: " + key + ", value for key part-1 doesn't exist");
return;
}
int idx = Integer.parseInt(keyParts[1]);
list.remove(idx);
break;
case MAP:
Map<String, String> map = record.getMapField(keyParts[0]);
if (map == null) {
logger.warn("invalid key for map field: " + key + ", value for key part-1 doesn't exist");
return;
}
map.remove(keyParts[1]);
break;
default:
break;
}
}
private static void removeListValue(ZNRecord record, String key) {
if (record == null || record.getListFields() == null) {
record.getListFields().remove(key);
}
}
private static void removeMapValue(ZNRecord record, String key) {
record.getMapFields().remove(key);
}
private static boolean executeVerifier(ZNRecord actual, TestCommand command, ZNRecord diff) {
final ZnodeOpArg arg = command._znodeOpArg;
final ZnodeValue expectValue = command._trigger._expectValue;
boolean result = isValueExpected(actual, arg._propertyType, arg._key, expectValue, diff);
String operation = arg._operation;
if (operation.equals("!=")) {
result = !result;
} else if (!operation.equals("==")) {
logger.warn("fail to execute (unsupport operation=" + operation + "):" + operation);
result = false;
}
return result;
}
private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, HelixZkClient zkClient,
ZNRecord diff) {
String path = arg._znodePath;
ZnodePropertyType type = arg._propertyType;
String key = arg._key;
boolean success = true;
// retry 3 times in case there are write conflicts
long backoffTime = 20; // ms
for (int i = 0; i < 3; i++) {
try {
Stat stat = new Stat();
ZNRecord record = zkClient.<ZNRecord> readDataAndStat(path, stat, true);
if (isValueExpected(record, type, key, expect, diff)) {
if (arg._operation.compareTo("+") == 0) {
if (record == null) {
record = new ZNRecord("default");
}
ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
switch (valueType) {
case SINGLE_VALUE:
setSingleValue(record, arg._propertyType, arg._key, arg._updateValue._singleValue);
break;
case LIST_VALUE:
setListValue(record, arg._key, arg._updateValue._listValue);
break;
case MAP_VALUE:
setMapValue(record, arg._key, arg._updateValue._mapValue);
break;
case ZNODE_VALUE:
// deep copy
record =
ZNRECORD_SERIALIZER.deserialize(ZNRECORD_SERIALIZER
.serialize(arg._updateValue._znodeValue));
break;
case INVALID:
break;
default:
break;
}
} else if (arg._operation.compareTo("-") == 0) {
ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
switch (valueType) {
case SINGLE_VALUE:
removeSingleValue(record, arg._propertyType, arg._key);
break;
case LIST_VALUE:
removeListValue(record, arg._key);
break;
case MAP_VALUE:
removeMapValue(record, arg._key);
break;
case ZNODE_VALUE:
record = null;
break;
case INVALID:
break;
default:
break;
}
} else {
logger.warn("fail to execute (unsupport operation): " + arg._operation);
success = false;
}
if (success == true) {
if (record == null) {
zkClient.delete(path);
} else {
try {
zkClient.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
// OK
}
zkClient.writeData(path, record, stat.getVersion());
}
return true;
} else {
return false;
}
}
} catch (ZkBadVersionException e) {
// e.printStackTrace();
} catch (PropertyStoreException e) {
// e.printStackTrace();
}
try {
Thread.sleep(backoffTime);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
backoffTime *= 2;
}
return false;
}
private static class ExecuteCommand implements Runnable {
private final TestCommand _command;
private final long _startTime;
private final HelixZkClient _zkClient;
private final CountDownLatch _countDown;
private final Map<TestCommand, Boolean> _testResults;
public ExecuteCommand(long startTime, TestCommand command, CountDownLatch countDown,
HelixZkClient zkClient, Map<TestCommand, Boolean> testResults) {
_startTime = startTime;
_command = command;
_countDown = countDown;
_zkClient = zkClient;
_testResults = testResults;
}
@Override
public void run() {
boolean result = false;
long now = System.currentTimeMillis();
final long timeout = now + _command._trigger._timeout;
ZNRecord diff = new ZNRecord("diff");
try {
if (now < _startTime) {
Thread.sleep(_startTime - now);
}
do {
if (_command._commandType == CommandType.MODIFY) {
ZnodeOpArg arg = _command._znodeOpArg;
final ZnodeValue expectValue = _command._trigger._expectValue;
result = compareAndSetZnode(expectValue, arg, _zkClient, diff);
// logger.error("result:" + result + ", " + _command);
if (result == true) {
_command._finishTimestamp = System.currentTimeMillis();
_testResults.put(_command, true);
break;
} else {
// logger.error("result:" + result + ", diff:" + diff);
}
} else if (_command._commandType == CommandType.VERIFY) {
ZnodeOpArg arg = _command._znodeOpArg;
final String znodePath = arg._znodePath;
ZNRecord record = _zkClient.<ZNRecord> readData(znodePath, true);
result = executeVerifier(record, _command, diff);
// logger.error("result:" + result + ", " + _command.toString());
if (result == true) {
_command._finishTimestamp = System.currentTimeMillis();
_testResults.put(_command, true);
break;
} else {
// logger.error("result:" + result + ", diff:" + diff);
}
} else if (_command._commandType == CommandType.START) {
// TODO add data trigger for START command
Thread thread = _command._nodeOpArg._thread;
thread.start();
result = true;
_command._finishTimestamp = System.currentTimeMillis();
logger.info("result:" + result + ", " + _command.toString());
_testResults.put(_command, true);
break;
} else if (_command._commandType == CommandType.STOP) {
// TODO add data trigger for STOP command
HelixManager manager = _command._nodeOpArg._manager;
manager.disconnect();
Thread thread = _command._nodeOpArg._thread;
thread.interrupt();
// System.err.println("stop " +
// _command._nodeOpArg._manager.getInstanceName());
result = true;
_command._finishTimestamp = System.currentTimeMillis();
logger.info("result:" + result + ", " + _command.toString());
_testResults.put(_command, true);
break;
} else {
throw new IllegalArgumentException("Unsupport command type (was "
+ _command._commandType + ")");
}
Thread.sleep(SLEEP_TIME);
now = System.currentTimeMillis();
} while (now <= timeout);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
if (result == false) {
_command._finishTimestamp = System.currentTimeMillis();
logger.error("result:" + result + ", diff: " + diff);
}
_countDown.countDown();
if (_countDown.getCount() == 0) {
if (_zkClient != null && !_zkClient.isClosed()) {
_zkClient.close();
}
}
}
}
}
private static Map<TestCommand, Boolean> executeTestHelper(List<TestCommand> commandList,
String zkAddr, CountDownLatch countDown) {
final Map<TestCommand, Boolean> testResults = new ConcurrentHashMap<TestCommand, Boolean>();
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(new ZNRecordSerializer());
HelixZkClient zkClient = SharedZkClientFactory
.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
// sort on trigger's start time, stable sort
Collections.sort(commandList, new Comparator<TestCommand>() {
@Override
public int compare(TestCommand o1, TestCommand o2) {
return (int) (o1._trigger._startTime - o2._trigger._startTime);
}
});
for (TestCommand command : commandList) {
testResults.put(command, new Boolean(false));
TestTrigger trigger = command._trigger;
command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, zkClient,
testResults)).start();
}
return testResults;
}
public static void executeTestAsync(List<TestCommand> commandList, String zkAddr)
throws InterruptedException {
CountDownLatch countDown = new CountDownLatch(commandList.size());
executeTestHelper(commandList, zkAddr, countDown);
}
public static Map<TestCommand, Boolean> executeTest(List<TestCommand> commandList, String zkAddr)
throws InterruptedException {
final CountDownLatch countDown = new CountDownLatch(commandList.size());
Map<TestCommand, Boolean> testResults = executeTestHelper(commandList, zkAddr, countDown);
// TODO add timeout
countDown.await();
return testResults;
}
}