Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT
Conflicts:
test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
test/system/auto/TestUtils.py
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
index 0da0f61..0f4dd8e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
@@ -16,11 +16,14 @@
*/
package org.apache.accumulo.test.functional;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.Help;
@@ -83,9 +86,26 @@
static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
+ private static final String REDUCER_RESULT_START = "::::: ";
+ private static final int RRS_LEN = REDUCER_RESULT_START.length();
+ private Text result = new Text();
+
+ private static enum Outcome {
+ SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE
+ }
+ private static final Map<Character, Outcome> OUTCOME_COUNTERS;
+ static {
+ OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>();
+ OUTCOME_COUNTERS.put('S', Outcome.SUCCESS);
+ OUTCOME_COUNTERS.put('F', Outcome.FAILURE);
+ OUTCOME_COUNTERS.put('E', Outcome.ERROR);
+ OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS);
+ OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE);
+ }
+
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-t", value.toString());
+ List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-t", value.toString());
log.info("Running test " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.directory(new File(context.getConfiguration().get("accumulo.home")));
@@ -93,19 +113,31 @@
Process p = pb.start();
p.getOutputStream().close();
InputStream out = p.getInputStream();
- byte[] buffer = new byte[1024];
- int len = 0;
- Text result = new Text();
+ InputStreamReader outr = new InputStreamReader(out, Constants.UTF8);
+ BufferedReader br = new BufferedReader(outr);
+ String line;
try {
- while ((len = out.read(buffer)) > 0) {
- log.info("More: " + new String(buffer, 0, len, Constants.UTF8));
- result.append(buffer, 0, len);
+ while ((line = br.readLine()) != null) {
+ log.info("More: " + line);
+ if (line.startsWith(REDUCER_RESULT_START)) {
+ String resultLine = line.substring(RRS_LEN);
+ if (resultLine.length() > 0) {
+ Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0));
+ if (outcome != null) {
+ context.getCounter(outcome).increment(1);
+ }
+ }
+ String taskAttemptId = context.getTaskAttemptID().toString();
+ result.set(taskAttemptId + " " + resultLine);
+ context.write(value, result);
+ }
}
} catch (Exception ex) {
- log.error(ex, ex);
+ log.error(ex);
+ context.progress();
}
+
p.waitFor();
- context.write(value, result);
}
}
diff --git a/test/system/auto/run.py b/test/system/auto/run.py
index 74704e4..2509e0b 100755
--- a/test/system/auto/run.py
+++ b/test/system/auto/run.py
@@ -168,6 +168,7 @@
unittest.TestResult.__init__(self)
self.stream = stream
self.descriptions = descriptions
+ self.successes = []
def getDescription(self, test):
if self.descriptions:
@@ -184,6 +185,7 @@
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
+ self.successes.append(test)
self.stream.writeln("ok")
def addError(self, test, err):
@@ -230,6 +232,9 @@
except:
compile()
+def emitMapReduceResult(code, test):
+ print '::::: %s %s' % (code, str(test))
+
def main():
makeDiskFailureLibrary()
@@ -256,7 +261,9 @@
parser.add_option('-s', '--start', dest='start', default=None,
help='Start the test list at the given test name')
parser.add_option('-x', '--xml', dest='xmlreport', default=False, action='store_true',
- help='Output tests results to xml (jenkins conpatible)')
+ help='Output test results to xml (jenkins compatible)')
+ parser.add_option('-m', '--mapreduce', dest='mapreduce', default=False, action='store_true',
+ help='Output test results suitable for mapreduce')
parser.add_option('-f', '--timeout-factor', dest='timeout_factor',
default=1, type=int,
help="Multiplier for some timeouts (use on slower hardware) (%default)")
@@ -316,8 +323,9 @@
else:
removeInstrumentedAccumuloJars()
+ results = []
for i in range(options.repeat):
- runner.run(suite)
+ results.append(runner.run(suite))
if options.coverage:
mergeCoverage()
@@ -326,6 +334,28 @@
os.path.join(ACCUMULO_HOME,'src','server','src','main','java')]
)
+ numFailures = 0
+ doEmitMR = options.mapreduce and not options.xmlreport
+ for result in results:
+ if doEmitMR:
+ for test in result.successes:
+ emitMapReduceResult('S', test)
+ if hasattr(result, 'expectedFailures'):
+ for test, err in result.expectedFailures:
+ emitMapReduceResult('G', test)
+ for test, err in result.failures:
+ emitMapReduceResult('F', test)
+ for test, err in result.errors:
+ emitMapReduceResult('E', test)
+ numFailures += len(result.failures)
+ numFailures += len(result.errors)
+ if hasattr(result, 'unexpectedSuccesses'):
+ if doEmitMR:
+ for test in result.unexpectedSuccesses:
+ emitMapReduceResult('T', test)
+ numFailures += len(result.unexpectedSuccesses)
+ if numFailures > 0:
+ sys.exit(1)
if __name__ == '__main__':
main()