blob: e15b9f5c46f070f20e09fb6fca68bb7abd35accf [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.testing.randomwalk;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
/**
 * A module is a directed graph of tests.
 */
public class Module extends Node {
// SLF4J logger for this class.
private static final Logger log = LoggerFactory.getLogger(Module.class);
private class Dummy extends Node {
String name;
Dummy(String name) { = name;
public void visit(State state, RandWalkEnv env, Properties props) {
String print;
if ((print = props.getProperty("print")) != null) {
switch (print) {
case "TRACE":
case "DEBUG":
case "INFO":;
case "WARN":
case "ERROR":
public String toString() {
return name;
private class Alias extends Node {
Node target;
String targetId;
String id;
Alias(String id) {
target = null; = id;
public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
throw new Exception("You don't visit aliases!");
public String toString() {
return id;
public void update(String node) throws Exception {
targetId = node;
target = getNode(node);
public Node get() {
return target;
public String getTargetId() {
return targetId;
// Nodes known to this module, keyed by the id used in the module definition.
private HashMap<String,Node> nodes = new HashMap<>();
// Per-node properties keyed by node id; the special key "_init" holds the settings read from the init element.
private HashMap<String,Properties> localProps = new HashMap<>();
private class Edge {
String nodeId;
int weight;
private class AdjList {
private List<Edge> edges = new ArrayList<>();
private int totalWeight = 0;
private Random rand = new Random();
* Adds a neighbor node and weight of edge
private void addEdge(String nodeId, int weight) {
totalWeight += weight;
Edge e = new Edge();
e.nodeId = nodeId;
e.weight = weight;
* Chooses a random neighbor node
* @return Node or null if no edges
private String randomNeighbor() throws Exception {
String nodeId = null;
rand = new Random();
int randNum = rand.nextInt(totalWeight) + 1;
int sum = 0;
for (Edge e : edges) {
nodeId = e.nodeId;
sum += e.weight;
if (randNum <= sum) {
return nodeId;
// Package prefixes declared by the module's "package" elements: prefix -> package name ending in ".".
private HashMap<String,String> prefixes = new HashMap<>();
// Outgoing weighted edges of each "node" element, keyed by node id.
private HashMap<String,AdjList> adjMap = new HashMap<>();
// Alias node ids registered per node id; visit() re-points these aliases as the walk progresses.
private HashMap<String,Set<String>> aliasMap = new HashMap<>();
// Identifier of this module; also the resource name loaded from /randomwalk/modules/.
private final String id;
// Id of the node where the walk starts (the "id" attribute of the "init" element).
private String initNodeId;
// Optional fixture built from the "fixture" element; null when the module declares none.
private Fixture fixture = null;
/**
 * Creates a module and loads its graph definition.
 *
 * @param id identifier of the module; {@code loadFromXml()} uses it as the resource name under
 *        {@code /randomwalk/modules/}
 * @throws Exception if the module definition cannot be loaded
 */
public Module(String id) throws Exception {
  this.id = id;
  // id has to be assigned first: loadFromXml() reads it to locate the definition.
  loadFromXml();
}
public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
int maxHops, maxSec;
boolean teardown;
Properties initProps = getProps("_init");
String prop;
if ((prop = initProps.getProperty("maxHops")) == null || prop.equals("0") || prop.equals(""))
maxHops = Integer.MAX_VALUE;
maxHops = Integer.parseInt(initProps.getProperty("maxHops", "0"));
if ((prop = initProps.getProperty("maxSec")) == null || prop.equals("0") || prop.equals(""))
maxSec = Integer.MAX_VALUE;
maxSec = Integer.parseInt(initProps.getProperty("maxSec", "0"));
if ((prop = initProps.getProperty("teardown")) == null || prop.equals("true")
|| prop.equals(""))
teardown = true;
teardown = false;
if (fixture != null) {
fixture.setUp(state, env);
ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");
try {
Node initNode = getNode(initNodeId);
boolean test = false;
if (initNode instanceof Test) {
test = true;
initNode.visit(state, env, getProps(initNodeId));
if (test)
// update aliases
Set<String> aliases;
if ((aliases = aliasMap.get(initNodeId)) != null)
for (String alias : aliases) {
((Alias) nodes.get(alias)).update(initNodeId);
String curNodeId = initNodeId;
int numHops = 0;
long startTime = System.currentTimeMillis() / 1000;
while (true) {
// check if END state was reached
if (curNodeId.equalsIgnoreCase("END")) {
log.debug("reached END state");
// check if maxSec was reached
long curTime = System.currentTimeMillis() / 1000;
if ((curTime - startTime) > maxSec) {
log.debug("reached maxSec(" + maxSec + ")");
// The number of seconds before the test should exit
long secondsRemaining = maxSec - (curTime - startTime);
// check if maxHops was reached
if (numHops > maxHops) {
log.debug("reached maxHops(" + maxHops + ")");
if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module("
+ this + ")");
AdjList adj = adjMap.get(curNodeId);
String nextNodeId = adj.randomNeighbor();
final Node nextNode;
Node nextNodeOrAlias = getNode(nextNodeId);
if (nextNodeOrAlias instanceof Alias) {
nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
nextNode = ((Alias) nextNodeOrAlias).get();
} else {
nextNode = nextNodeOrAlias;
final Properties nodeProps = getProps(nextNodeId);
try {
test = false;
if (nextNode instanceof Test) {
test = true;
// Wrap the visit of the next node in the module in a
// callable that returns a thrown exception
FutureTask<Exception> task = new FutureTask<>(new Callable<Exception>() {
public Exception call() throws Exception {
try {
nextNode.visit(state, env, nodeProps);
return null;
} catch (Exception e) {
return e;
// Run the task (should execute immediately)
Exception nodeException;
try {
// Bound the time we'll wait for the node to complete
nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName()
+ " to complete. Exiting.", e);
} catch (ExecutionException e) {
log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
throw e;
} catch (TimeoutException e) {"Timed out waiting for " + nextNode.getClass().getSimpleName()
+ " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
// The RandomWalk node throw an Exception that that Callable
// handed back
// Throw it and let the Module perform cleanup
if (null != nodeException) {
throw nodeException;
if (test)
} catch (Exception e) {
log.debug("AccumuloClient belongs to user: " + env.getAccumuloClient().whoami());
log.debug("Exception occured at: " + System.currentTimeMillis());
log.debug("Properties for node: " + nextNodeId);
for (Entry<Object,Object> entry : nodeProps.entrySet()) {
log.debug(" " + entry.getKey() + ": " + entry.getValue());
log.debug("Overall Configuration Properties");
for (Entry<Object,Object> entry : env.getTestProperties().entrySet()) {
log.debug(" " + entry.getKey() + ": " + entry.getValue());
log.debug("State information");
for (String key : new TreeSet<>(state.getMap().keySet())) {
Object value = state.getMap().get(key);
String logMsg = " " + key + ": ";
if (value == null)
logMsg += "null";
else if (value instanceof String || value instanceof Map || value instanceof Collection
|| value instanceof Number)
logMsg += value;
else if (value instanceof byte[])
logMsg += new String((byte[]) value, UTF_8);
else if (value instanceof PasswordToken)
logMsg += new String(((PasswordToken) value).getPassword(), UTF_8);
logMsg += value.getClass() + " - " + value;
throw new Exception("Error running node " + nextNodeId, e);
// update aliases
if ((aliases = aliasMap.get(curNodeId)) != null)
for (String alias : aliases) {
((Alias) nodes.get(alias)).update(curNodeId);
curNodeId = nextNodeId;
} finally {
if (null != service) {
if (teardown && (fixture != null)) {
log.debug("tearing down module");
fixture.tearDown(state, env);
// Watchdog thread created by startTimer() and used by stopTimer().
Thread timer = null;
// Watchdog threshold in milliseconds: 5 * 1000 * 60, i.e. five minutes.
final int time = 5 * 1000 * 60;
// Read by stopTimer() to decide whether to report that a long-running node has finished.
AtomicBoolean runningLong = new AtomicBoolean(false);
// Wall-clock time in milliseconds recorded when the watchdog thread begins running.
long systemTime;
/**
 * Creates the watchdog thread for a node. The thread records its start time in systemTime and
 * warns when the node's last reported progress is more than {@code time} milliseconds old.
 *
 * NOTE(review): this copy of the method appears to have lost statements. Nothing in the try
 * block can throw InterruptedException, runningLong is never updated, the thread is never
 * started and the blocks are not closed. Presumably the thread waits for {@code time}
 * milliseconds before checking progress; confirm against the original source.
 *
 * @param initNode node whose progress is watched
 */
private void startTimer(final Node initNode) {
timer = new Thread(new Runnable() {
public void run() {
try {
// Remember when the watchdog began, so stopTimer() can report the total duration.
systemTime = System.currentTimeMillis();
} catch (InterruptedException ie) {
// Milliseconds since the node last reported progress.
long timeSinceLastProgress = System.currentTimeMillis() - initNode.lastProgress();
if (timeSinceLastProgress > time) {
log.warn("Node " + initNode + " has been running for " + timeSinceLastProgress / 1000.0
+ " seconds. You may want to look into it.");
private void stopTimer(Node nextNode) {
synchronized (timer) {
try {
} catch (InterruptedException e) {
log.error("Failed to join timer '" + timer.getName() + "'.", e);
if (runningLong.get())
log.warn("Node " + nextNode + ", which was running long, has now completed after "
+ (System.currentTimeMillis() - systemTime) / 1000.0 + " seconds");
public String toString() {
return id;
private String getFullName(String name) {
int index = name.indexOf(".");
if (index == -1 || name.endsWith(".xml")) {
return name;
String id = name.substring(0, index);
if (!prefixes.containsKey(id)) {
log.warn("Id (" + id + ") was not found in prefixes");
return name;
return prefixes.get(id).concat(name.substring(index + 1));
private Node createNode(String id, String src) throws Exception {
// check if id indicates dummy node
if (id.equalsIgnoreCase("END") || id.startsWith("dummy")) {
if (nodes.containsKey(id) == false) {
nodes.put(id, new Dummy(id));
return nodes.get(id);
if (id.startsWith("alias")) {
if (nodes.containsKey(id) == false) {
nodes.put(id, new Alias(id));
return nodes.get(id);
// grab node from framework based on its id or src
Node node;
if (src == null || src.isEmpty()) {
node = Framework.getInstance().getNode(getFullName(id));
} else {
node = Framework.getInstance().getNode(getFullName(src));
// add to node to this module's map
nodes.put(id, node);
return node;
private Node getNode(String id) throws Exception {
if (nodes.containsKey(id)) {
return nodes.get(id);
if (id.equalsIgnoreCase("END")) {
nodes.put(id, new Dummy(id));
return nodes.get(id);
return Framework.getInstance().getNode(getFullName(id));
private Properties getProps(String nodeId) {
if (localProps.containsKey(nodeId)) {
return localProps.get(nodeId);
return new Properties();
/**
 * Loads this module's definition from the classpath resource "/randomwalk/modules/" + id and
 * fills prefixes, fixture, initNodeId, localProps, nodes, aliasMap and adjMap from it.
 *
 * NOTE(review): this copy of the method has lost lines (block closers and at least one statement
 * tail, see the notes below); confirm against the original source before changing logic.
 *
 * @throws Exception if the document cannot be parsed or the definition is inconsistent (duplicate
 *         node id, alias without a name, property without key or value, reserved property key,
 *         node without edges, edge without weight)
 */
private void loadFromXml() throws Exception {
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder docbuilder;
Document d = null;
// set the schema
SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
// NOTE(review): the next statement is cut off (unbalanced parentheses, no terminator), and
// moduleSchema is never handed to dbf in the visible code.
Schema moduleSchema = sf.newSchema(this.getClass().getClassLoader()
// parse the document
try {
docbuilder = dbf.newDocumentBuilder();
d = docbuilder.parse(this.getClass().getResourceAsStream("/randomwalk/modules/" + id));
} catch (Exception e) {
log.error("Failed to parse xml at randomwalk/modules/" + id, e);
// NOTE(review): the rethrown exception does not carry e as its cause; the detail is only in
// the log entry above.
throw new Exception("Failed to parse xml at randomwalk/modules/" + id);
// parse packages
NodeList nodelist = d.getDocumentElement().getElementsByTagName("package");
for (int i = 0; i < nodelist.getLength(); i++) {
Element el = (Element) nodelist.item(i);
String value = el.getAttribute("value");
// Normalize so that a prefix can be replaced by the package plus the trailing '.'.
if (!value.endsWith(".")) {
value = value.concat(".");
prefixes.put(el.getAttribute("prefix"), value);
// parse fixture node
nodelist = d.getDocumentElement().getElementsByTagName("fixture");
if (nodelist.getLength() > 0) {
// Only the first fixture element is used.
Element fixtureEl = (Element) nodelist.item(0);
// NOTE(review): Class.newInstance() is deprecated since Java 9.
fixture = (Fixture) Class.forName(getFullName(fixtureEl.getAttribute("id"))).newInstance();
// parse initial node
// NOTE(review): item(0) is null when the document has no "init" element; presumably the
// schema guarantees one. Confirm.
Element initEl = (Element) d.getDocumentElement().getElementsByTagName("init").item(0);
initNodeId = initEl.getAttribute("id");
Properties initProps = new Properties();
// Per the DOM API, getAttribute returns an empty string rather than null for a missing
// attribute, so the null checks below always pass; visit() treats "" as "not set".
String attr = initEl.getAttribute("maxHops");
if (attr != null)
initProps.setProperty("maxHops", attr);
attr = initEl.getAttribute("maxSec");
if (attr != null)
initProps.setProperty("maxSec", attr);
attr = initEl.getAttribute("teardown");
if (attr != null)
initProps.setProperty("teardown", attr);
// Module-level settings are stored under the reserved key "_init".
localProps.put("_init", initProps);
// parse all nodes
nodelist = d.getDocumentElement().getElementsByTagName("node");
for (int i = 0; i < nodelist.getLength(); i++) {
Element nodeEl = (Element) nodelist.item(i);
// get attributes
String id = nodeEl.getAttribute("id");
// adjMap gains an entry for every parsed node, so it doubles as the duplicate-id check.
if (adjMap.containsKey(id)) {
throw new Exception("Module already contains: " + id);
String src = nodeEl.getAttribute("src");
// create node
createNode(id, src);
// set some attributes in properties for later use
Properties props = new Properties();
props.setProperty("maxHops", nodeEl.getAttribute("maxHops"));
props.setProperty("maxSec", nodeEl.getAttribute("maxSec"));
props.setProperty("teardown", nodeEl.getAttribute("teardown"));
// parse aliases
NodeList aliaslist = nodeEl.getElementsByTagName("alias");
Set<String> aliases = new TreeSet<>();
for (int j = 0; j < aliaslist.getLength(); j++) {
Element propEl = (Element) aliaslist.item(j);
if (!propEl.hasAttribute("name")) {
throw new Exception("Node " + id + " has alias with no identifying name");
// The "alias." prefix makes createNode register an Alias node for this key.
String key = "alias." + propEl.getAttribute("name");
createNode(key, null);
// NOTE(review): nothing in the visible code adds key to aliases, so the set stays empty and
// the put below is never reached; a statement appears to be missing here.
if (aliases.size() > 0)
aliasMap.put(id, aliases);
// parse properties of nodes
NodeList proplist = nodeEl.getElementsByTagName("property");
for (int j = 0; j < proplist.getLength(); j++) {
Element propEl = (Element) proplist.item(j);
if (!propEl.hasAttribute("key") || !propEl.hasAttribute("value")) {
throw new Exception("Node " + id + " has property with no key or value");
String key = propEl.getAttribute("key");
// These three names are reserved for the element attributes copied into props above.
if (key.equals("maxHops") || key.equals("maxSec") || key.equals("teardown")) {
throw new Exception("The following property can only be set in attributes: " + key);
props.setProperty(key, propEl.getAttribute("value"));
localProps.put(id, props);
// parse edges of nodes
AdjList edges = new AdjList();
adjMap.put(id, edges);
NodeList edgelist = nodeEl.getElementsByTagName("edge");
// Every node must declare at least one outgoing edge.
if (edgelist.getLength() == 0) {
throw new Exception("Node " + id + " has no edges!");
for (int j = 0; j < edgelist.getLength(); j++) {
Element edgeEl = (Element) edgelist.item(j);
String edgeID = edgeEl.getAttribute("id");
if (!edgeEl.hasAttribute("weight")) {
throw new Exception("Edge with id=" + edgeID + " is missing weight");
// Integer.parseInt throws NumberFormatException when the weight is not a valid integer.
int weight = Integer.parseInt(edgeEl.getAttribute("weight"));
edges.addEdge(edgeID, weight);