MAPREDUCE-2529. Add support for regex-based shuffle metric counting
exceptions. Contributed by Thomas Graves


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1131736 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 173cd55..99cef35 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -272,6 +272,9 @@
     MAPREDUCE-2536. Update FsShell -mv command usage in TestMRCLI.  (Daryn
     Sharp via szetszwo)
 
+    MAPREDUCE-2529. Add support for regex-based shuffle metric counting
+    exceptions. (Thomas Graves via cdouglas)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/java/org/apache/hadoop/mapred/TaskTracker.java b/src/java/org/apache/hadoop/mapred/TaskTracker.java
index ba1fd2d..a07e626 100644
--- a/src/java/org/apache/hadoop/mapred/TaskTracker.java
+++ b/src/java/org/apache/hadoop/mapred/TaskTracker.java
@@ -327,12 +327,13 @@
    * the specific metrics for shuffle. The TaskTracker is actually a server for
    * the shuffle and hence the name ShuffleServerMetrics.
    */
-  private class ShuffleServerMetrics implements Updater {
+  class ShuffleServerMetrics implements Updater {
     private MetricsRecord shuffleMetricsRecord = null;
     private int serverHandlerBusy = 0;
     private long outputBytes = 0;
     private int failedOutputs = 0;
     private int successOutputs = 0;
+    private int exceptionsCaught = 0;
     ShuffleServerMetrics(JobConf conf) {
       MetricsContext context = MetricsUtil.getContext("mapred");
       shuffleMetricsRecord = 
@@ -355,6 +356,9 @@
     synchronized void successOutput() {
       ++successOutputs;
     }
+    synchronized void exceptionsCaught() {
+      ++exceptionsCaught;
+    }
     public void doUpdates(MetricsContext unused) {
       synchronized (this) {
         if (workerThreads != 0) {
@@ -369,9 +373,12 @@
                                         failedOutputs);
         shuffleMetricsRecord.incrMetric("shuffle_success_outputs", 
                                         successOutputs);
+        shuffleMetricsRecord.incrMetric("shuffle_exceptions_caught",
+                                        exceptionsCaught);
         outputBytes = 0;
         failedOutputs = 0;
         successOutputs = 0;
+        exceptionsCaught = 0;
       }
       shuffleMetricsRecord.update();
     }
@@ -1376,6 +1383,10 @@
     server.setAttribute("log", LOG);
     server.setAttribute("localDirAllocator", localDirAllocator);
     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+    String exceptionStackRegex = conf.get(JTConfig.SHUFFLE_EXCEPTION_STACK_REGEX);
+    String exceptionMsgRegex = conf.get(JTConfig.SHUFFLE_EXCEPTION_MSG_REGEX);
+    server.setAttribute("exceptionStackRegex", exceptionStackRegex);
+    server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
     server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
     server.start();
@@ -3660,6 +3671,10 @@
         (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
+      String exceptionStackRegex =
+        (String) context.getAttribute("exceptionStackRegex");
+      String exceptionMsgRegex =
+        (String) context.getAttribute("exceptionMsgRegex");
 
       verifyRequest(request, response, tracker, jobId);
       
@@ -3690,6 +3705,7 @@
         String errorMsg = ("getMapOutputs(" + mapIds + "," + reduceId + 
                            ") failed");
         log.warn(errorMsg, ie);
+        checkException(ie, exceptionMsgRegex, exceptionStackRegex, shuffleMetrics);
         response.sendError(HttpServletResponse.SC_GONE, errorMsg);
         shuffleMetrics.failedOutput();
         throw ie;
@@ -3712,6 +3728,37 @@
       }
     }
 
+    protected void checkException(IOException ie, String exceptionMsgRegex,
+        String exceptionStackRegex, ShuffleServerMetrics shuffleMetrics) {
+      // parse exception to see if it looks like a regular expression you
+      // configure. If both msgRegex and StackRegex set then make sure both
+      // match, otherwise only the one set has to match.
+      if (exceptionMsgRegex != null) {
+        String msg = ie.getMessage();
+        if (msg == null || !msg.matches(exceptionMsgRegex)) {
+          return;
+        }
+      }
+      if (exceptionStackRegex != null
+          && !checkStackException(ie, exceptionStackRegex)) {
+        return;
+      }
+      shuffleMetrics.exceptionsCaught();
+    }
+
+    private boolean checkStackException(IOException ie,
+        String exceptionStackRegex) {
+      StackTraceElement[] stack = ie.getStackTrace();
+
+      for (StackTraceElement elem : stack) {
+        String stacktrace = elem.toString();
+        if (stacktrace.matches(exceptionStackRegex)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     private void sendMapFile(String jobId, String mapId,
                              int reduce,
                              Configuration conf,
diff --git a/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java b/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
index d6a4c0f..4eb9188 100644
--- a/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
@@ -111,4 +111,9 @@
      "mapreduce.jobtracker.webinterface.trusted";
   public static final String JT_PLUGINS = 
     "mapreduce.jobtracker.plugins";
+  public static final String SHUFFLE_EXCEPTION_STACK_REGEX =
+    "mapreduce.reduce.shuffle.catch.exception.stack.regex";
+  public static final String SHUFFLE_EXCEPTION_MSG_REGEX =
+    "mapreduce.reduce.shuffle.catch.exception.message.regex";
+
 }
diff --git a/src/test/mapred/org/apache/hadoop/mapred/TestShuffleExceptionCount.java b/src/test/mapred/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
new file mode 100644
index 0000000..9de261b
--- /dev/null
+++ b/src/test/mapred/org/apache/hadoop/mapred/TestShuffleExceptionCount.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.TaskTracker.ShuffleServerMetrics;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.junit.Test;
+
+public class TestShuffleExceptionCount {
+
+  public static class TestMapOutputServlet extends TaskTracker.MapOutputServlet {
+
+    public void checkException(IOException ie, String exceptionMsgRegex,
+        String exceptionStackRegex, ShuffleServerMetrics shuffleMetrics) {
+      super.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+          shuffleMetrics);
+    }
+
+  }
+
+  @Test
+  public void testCheckException() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    TestMapOutputServlet testServlet = new TestMapOutputServlet();
+    JobConf conf = new JobConf();
+    conf.setUser("testuser");
+    conf.setJobName("testJob");
+    conf.setSessionId("testSession");
+
+    // setup metrics context factory
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("mapred.class",
+        "org.apache.hadoop.metrics.spi.NoEmitMetricsContext");
+
+    TaskTracker tt = new TaskTracker();
+    tt.setConf(conf);
+    ShuffleServerMetrics shuffleMetrics = tt.new ShuffleServerMetrics(conf);
+
+    // first test with only MsgRegex set but doesn't match
+    String exceptionMsgRegex = "Broken pipe";
+    String exceptionStackRegex = null;
+    IOException ie = new IOException("EOFException");
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+
+    MetricsContext context = factory.getContext("mapred");
+    shuffleMetrics.doUpdates(context);
+    Map<String, Collection<OutputRecord>> records = context.getAllRecords();
+    Collection<OutputRecord> col = records.get("shuffleOutput");
+    OutputRecord outputRecord = col.iterator().next();
+    assertEquals(0, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+
+    // test with only MsgRegex set that does match
+    ie = new IOException("Broken pipe");
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+
+    shuffleMetrics.doUpdates(context);
+    assertEquals(1, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+
+    // test with neither set, make sure incremented
+    exceptionStackRegex = null;
+    exceptionMsgRegex = null;
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    shuffleMetrics.doUpdates(context);
+    assertEquals(2, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+
+    // test with only StackRegex set doesn't match
+    exceptionStackRegex = ".*\\.doesnt\\$SelectSet\\.wakeup.*";
+    exceptionMsgRegex = null;
+    ie.setStackTrace(constructStackTrace());
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    shuffleMetrics.doUpdates(context);
+    assertEquals(2, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+
+    // test with only StackRegex set does match
+    exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    shuffleMetrics.doUpdates(context);
+    assertEquals(3, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+
+    // test with both regex set and matches
+    exceptionMsgRegex = "Broken pipe";
+    ie.setStackTrace(constructStackTraceTwo());
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    shuffleMetrics.doUpdates(context);
+    assertEquals(4, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+
+    // test with both regex set and only msg matches
+    exceptionStackRegex = ".*[1-9]+BOGUSREGEX";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    shuffleMetrics.doUpdates(context);
+    assertEquals(4, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+
+    // test with both regex set and only stack matches
+    exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    exceptionMsgRegex = "EOFException";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    shuffleMetrics.doUpdates(context);
+    assertEquals(4, outputRecord.getMetric("shuffle_exceptions_caught")
+        .intValue());
+  }
+
+  /*
+   * Construction exception like: java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   * org.mortbay
+   * .io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+   * org.mortbay
+   * .io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:709) at
+   * org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192) at
+   * org
+   * .mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java
+   * :124) at
+   * org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.
+   * java:708) at
+   * org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool
+   * .java:582)
+   */
+  private StackTraceElement[] constructStackTrace() {
+    StackTraceElement[] stack = new StackTraceElement[9];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper",
+        "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper",
+        "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup",
+        "EPollSelectorImpl.java", 175);
+    stack[3] = new StackTraceElement(
+        "org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup",
+        "SelectorManager.java", 831);
+    stack[4] = new StackTraceElement(
+        "org.mortbay.io.nio.SelectorManager$SelectSet", "doSelect",
+        "SelectorManager.java", 709);
+    stack[5] = new StackTraceElement("org.mortbay.io.nio.SelectorManager",
+        "doSelect", "SelectorManager.java", 192);
+    stack[6] = new StackTraceElement(
+        "org.mortbay.jetty.nio.SelectChannelConnector", "accept",
+        "SelectChannelConnector.java", 124);
+    stack[7] = new StackTraceElement(
+        "org.mortbay.jetty.AbstractConnector$Acceptor", "run",
+        "AbstractConnector.java", 708);
+    stack[8] = new StackTraceElement(
+        "org.mortbay.thread.QueuedThreadPool$PoolThread", "run",
+        "QueuedThreadPool.java", 582);
+
+    return stack;
+  }
+
+  /*
+   * java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   * org.mortbay
+   * .io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+   * org.mortbay
+   * .io.nio.SelectChannelEndPoint.updateKey(SelectChannelEndPoint.java:335) at
+   * org
+   * .mortbay.io.nio.SelectChannelEndPoint.blockWritable(SelectChannelEndPoint
+   * .java:278) at
+   * org.mortbay.jetty.AbstractGenerator$Output.blockForOutput(AbstractGenerator
+   * .java:545) at
+   * org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator
+   * .java:572) at
+   * org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1012) at
+   * org
+   * .mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:651)at
+   * org
+   * .mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:580)
+   * at
+   */
+  private StackTraceElement[] constructStackTraceTwo() {
+    StackTraceElement[] stack = new StackTraceElement[11];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper",
+        "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper",
+        "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup",
+        "EPollSelectorImpl.java", 175);
+    stack[3] = new StackTraceElement(
+        "org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup",
+        "SelectorManager.java", 831);
+    stack[4] = new StackTraceElement(
+        "org.mortbay.io.nio.SelectChannelEndPoint", "updateKey",
+        "SelectChannelEndPoint.java", 335);
+    stack[5] = new StackTraceElement(
+        "org.mortbay.io.nio.SelectChannelEndPoint", "blockWritable",
+        "SelectChannelEndPoint.java", 278);
+    stack[6] = new StackTraceElement(
+        "org.mortbay.jetty.AbstractGenerator$Output", "blockForOutput",
+        "AbstractGenerator.java", 545);
+    stack[7] = new StackTraceElement(
+        "org.mortbay.jetty.AbstractGenerator$Output", "flush",
+        "AbstractGenerator.java", 572);
+    stack[8] = new StackTraceElement("org.mortbay.jetty.HttpConnection$Output",
+        "flush", "HttpConnection.java", 1012);
+    stack[9] = new StackTraceElement(
+        "org.mortbay.jetty.AbstractGenerator$Output", "write",
+        "AbstractGenerator.java", 651);
+    stack[10] = new StackTraceElement(
+        "org.mortbay.jetty.AbstractGenerator$Output", "write",
+        "AbstractGenerator.java", 580);
+
+    return stack;
+  }
+
+}