Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Conflicts:
	test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
	test/system/auto/TestUtils.py
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
index 0da0f61..0f4dd8e 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/RunTests.java
@@ -16,11 +16,14 @@
  */
 package org.apache.accumulo.test.functional;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.Help;
@@ -83,9 +86,26 @@
   
   static public class TestMapper extends Mapper<LongWritable,Text,Text,Text> {
     
+    private static final String REDUCER_RESULT_START = "::::: ";
+    private static final int RRS_LEN = REDUCER_RESULT_START.length();
+    private Text result = new Text();
+
+    private static enum Outcome {
+      SUCCESS, FAILURE, ERROR, UNEXPECTED_SUCCESS, EXPECTED_FAILURE
+    }
+    private static final Map<Character, Outcome> OUTCOME_COUNTERS;
+    static {
+      OUTCOME_COUNTERS = new java.util.HashMap<Character, Outcome>();
+      OUTCOME_COUNTERS.put('S', Outcome.SUCCESS);
+      OUTCOME_COUNTERS.put('F', Outcome.FAILURE);
+      OUTCOME_COUNTERS.put('E', Outcome.ERROR);
+      OUTCOME_COUNTERS.put('T', Outcome.UNEXPECTED_SUCCESS);
+      OUTCOME_COUNTERS.put('G', Outcome.EXPECTED_FAILURE);
+    }
+
     @Override
     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-      List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-t", value.toString());
+      List<String> cmd = Arrays.asList("/usr/bin/python", "test/system/auto/run.py", "-m", "-t", value.toString());
       log.info("Running test " + cmd);
       ProcessBuilder pb = new ProcessBuilder(cmd);
       pb.directory(new File(context.getConfiguration().get("accumulo.home")));
@@ -93,19 +113,31 @@
       Process p = pb.start();
       p.getOutputStream().close();
       InputStream out = p.getInputStream();
-      byte[] buffer = new byte[1024];
-      int len = 0;
-      Text result = new Text();
+      InputStreamReader outr = new InputStreamReader(out, Constants.UTF8);
+      BufferedReader br = new BufferedReader(outr);
+      String line;
       try {
-        while ((len = out.read(buffer)) > 0) {
-          log.info("More: " + new String(buffer, 0, len, Constants.UTF8));
-          result.append(buffer, 0, len);
+        while ((line = br.readLine()) != null) {
+          log.info("More: " + line);
+          if (line.startsWith(REDUCER_RESULT_START)) {
+            String resultLine = line.substring(RRS_LEN);
+            if (resultLine.length() > 0) {
+              Outcome outcome = OUTCOME_COUNTERS.get(resultLine.charAt(0));
+              if (outcome != null) {
+                context.getCounter(outcome).increment(1);
+              }
+            }
+            String taskAttemptId = context.getTaskAttemptID().toString();
+            result.set(taskAttemptId + " " + resultLine);
+            context.write(value, result);
+          }
         }
       } catch (Exception ex) {
-        log.error(ex, ex);
+        log.error(ex);
+        context.progress();
       }
+
       p.waitFor();
-      context.write(value, result);
     }
     
   }
diff --git a/test/system/auto/run.py b/test/system/auto/run.py
index 74704e4..2509e0b 100755
--- a/test/system/auto/run.py
+++ b/test/system/auto/run.py
@@ -168,6 +168,7 @@
         unittest.TestResult.__init__(self)
         self.stream = stream
         self.descriptions = descriptions
+        self.successes = []
 
     def getDescription(self, test):
         if self.descriptions:
@@ -184,6 +185,7 @@
 
     def addSuccess(self, test):
         unittest.TestResult.addSuccess(self, test)
+        self.successes.append(test)
         self.stream.writeln("ok")
 
     def addError(self, test, err):
@@ -230,6 +232,9 @@
     except:
         compile()
     
+def emitMapReduceResult(code, test):
+    print '::::: %s %s' % (code, str(test))
+
 def main():
     makeDiskFailureLibrary()
     
@@ -256,7 +261,9 @@
     parser.add_option('-s', '--start', dest='start', default=None, 
                       help='Start the test list at the given test name')
     parser.add_option('-x', '--xml', dest='xmlreport', default=False, action='store_true',
-                      help='Output tests results to xml (jenkins conpatible)')
+                      help='Output test results to xml (jenkins compatible)')
+    parser.add_option('-m', '--mapreduce', dest='mapreduce', default=False, action='store_true',
+                      help='Output test results suitable for mapreduce')
     parser.add_option('-f', '--timeout-factor', dest='timeout_factor',
                       default=1, type=int,
                       help="Multiplier for some timeouts (use on slower hardware) (%default)")
@@ -316,8 +323,9 @@
     else:
         removeInstrumentedAccumuloJars()
 
+    results = []
     for i in range(options.repeat):
-        runner.run(suite)
+        results.append(runner.run(suite))
 
     if options.coverage:
         mergeCoverage()
@@ -326,6 +334,28 @@
              os.path.join(ACCUMULO_HOME,'src','server','src','main','java')]
             )
 
+    numFailures = 0
+    doEmitMR = options.mapreduce and not options.xmlreport
+    for result in results:
+        if doEmitMR:
+            for test in result.successes:
+                emitMapReduceResult('S', test)
+            if hasattr(result, 'expectedFailures'):
+                for test, err in result.expectedFailures:
+                    emitMapReduceResult('G', test)
+            for test, err in result.failures:
+                emitMapReduceResult('F', test)
+            for test, err in result.errors:
+                emitMapReduceResult('E', test)
+        numFailures += len(result.failures)
+        numFailures += len(result.errors)
+        if hasattr(result, 'unexpectedSuccesses'):
+            if doEmitMR:
+                for test in result.unexpectedSuccesses:
+                    emitMapReduceResult('T', test)
+            numFailures += len(result.unexpectedSuccesses)
+    if numFailures > 0:
+        sys.exit(1)
 
 if __name__ == '__main__':
     main()