MAPREDUCE-2000. Fix parsing of JobHistory lines in Rumen when quotes are
escaped. Contributed by Hong Tang


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@984695 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 07c2a12..1f52d4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -126,9 +126,6 @@
     MAPREDUCE-1829. JobInProgress.findSpeculativeTask should use min() to
     find the candidate instead of sort(). (Scott Chen via vinodkv)
 
-    MAPREDUCE-1961. Fix ConcurrentModificationException in Gridmix during
-    shutdown. (Hong Tang via cdouglas)
-
   BUG FIXES
 
     MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number.
@@ -248,6 +245,12 @@
     MAPREDUCE-1780. AccessControlList.toString() is used for serialization of
     ACL in JobStatus.java. (Ravi Gummadi via vinodkv)
 
+    MAPREDUCE-1961. Fix ConcurrentModificationException in Gridmix during
+    shutdown. (Hong Tang via cdouglas)
+
+    MAPREDUCE-2000. Fix parsing of JobHistory lines in Rumen when quotes are
+    escaped. (Hong Tang via cdouglas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/src/test/mapred/org/apache/hadoop/tools/rumen/TestParsedLine.java b/src/test/mapred/org/apache/hadoop/tools/rumen/TestParsedLine.java
new file mode 100644
index 0000000..4464848
--- /dev/null
+++ b/src/test/mapred/org/apache/hadoop/tools/rumen/TestParsedLine.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestParsedLine {
+  static final char[] CHARS_TO_ESCAPE = new char[]{'=', '"', '.'};
+  
+  String buildLine(String type, String[] kvseq) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(type);
+    for (int i=0; i<kvseq.length; ++i) {
+      sb.append(" ");
+      if (kvseq[i].equals(".") || kvseq[i].equals("\n")) {
+        sb.append(kvseq[i]);
+        continue;
+      }
+      if (i == kvseq.length-1) {
+        fail("Incorrect input, expecting value.");
+      }
+      sb.append(kvseq[i++]);
+      sb.append("=\"");
+      sb.append(StringUtils.escapeString(kvseq[i], StringUtils.ESCAPE_CHAR,
+          CHARS_TO_ESCAPE));
+      sb.append("\"");
+    }
+    return sb.toString();
+  }
+  
+  void testOneLine(String type, String... kvseq) {
+    String line = buildLine(type, kvseq);
+    ParsedLine pl = new ParsedLine(line, Hadoop20JHParser.internalVersion);
+    assertEquals("Mismatching type", type, pl.getType().toString());
+    for (int i = 0; i < kvseq.length; ++i) {
+      if (kvseq[i].equals(".") || kvseq[i].equals("\n")) {
+        continue;
+      }
+
+      assertEquals("Key mismatching for " + kvseq[i], kvseq[i + 1], StringUtils
+          .unEscapeString(pl.get(kvseq[i]), StringUtils.ESCAPE_CHAR,
+              CHARS_TO_ESCAPE));
+      ++i;
+    }
+  }
+  
+  @Test
+  public void testEscapedQuote() {
+    testOneLine("REC", "A", "x", "B", "abc\"de", "C", "f");
+    testOneLine("REC", "B", "abcde\"", "C", "f");
+    testOneLine("REC", "A", "x", "B", "\"abcde");
+  }
+
+  @Test
+  public void testEqualSign() {
+    testOneLine("REC1", "A", "x", "B", "abc=de", "C", "f");
+    testOneLine("REC2", "B", "=abcde", "C", "f");
+    testOneLine("REC3", "A", "x", "B", "abcde=");
+  }
+
+  @Test
+  public void testSpace() {
+    testOneLine("REC1", "A", "x", "B", "abc de", "C", "f");
+    testOneLine("REC2", "B", " ab c de", "C", "f");
+    testOneLine("REC3", "A", "x", "B", "abc\t  de  ");
+  }
+
+  @Test
+  public void testBackSlash() {
+    testOneLine("REC1", "A", "x", "B", "abc\\de", "C", "f");
+    testOneLine("REC2", "B", "\\ab\\c\\de", "C", "f");
+    testOneLine("REC3", "A", "x", "B", "abc\\\\de\\");
+    testOneLine("REC4", "A", "x", "B", "abc\\\"de\\\"", "C", "f");
+  }
+
+  @Test
+  public void testLineDelimiter() {
+    testOneLine("REC1", "A", "x", "B", "abc.de", "C", "f");
+    testOneLine("REC2", "B", ".ab.de");
+    testOneLine("REC3", "A", "x", "B", "abc.de.");
+    testOneLine("REC4", "A", "x", "B", "abc.de", ".");
+  }
+  
+  @Test
+  public void testMultipleLines() {
+    testOneLine("REC1", "A", "x", "\n", "B", "abc.de", "\n", "C", "f", "\n", ".");
+  }
+}
diff --git a/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java b/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
index 2842c42..df82052 100644
--- a/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
+++ b/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
@@ -18,14 +18,33 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.util.Properties;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 class ParsedLine {
   Properties content;
   LogRecordType type;
 
-  static final Pattern keyValPair =
-      Pattern.compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+  static final String KEY = "(\\w+)";
+  /**
+   * The value string is enclosed in double quotation marks ('"') and
+   * occurrences of '"' and '\' are escaped with a '\'. So the escaped value
+   * string is essentially a string of escaped sequence ('\' followed by any
+   * character) or any character other than '"' and '\'.
+   * 
+   * The straightforward REGEX to capture the above is "((?:[^\"\\\\]|\\\\.)*)".
+   * Unfortunately Java's REGEX implementation is "broken" that it does not
+   * perform the NFA-to-DFA conversion and such expressions would lead to
+   * backtracking and stack overflow when matching with long strings. The
+   * following is a manual "unfolding" of the REGEX to get rid of backtracking.
+   */
+  static final String VALUE = "([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)";
+  /**
+   * REGEX to match the Key-Value pairs in an input line. Capture group 1
+   * matches the key and capture group 2 matches the value (without quotation
+   * marks).
+   */
+  static final Pattern keyValPair = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
 
   @SuppressWarnings("unused")
   ParsedLine(String fullLine, int version) {
@@ -47,65 +66,12 @@
 
     String propValPairs = fullLine.substring(firstSpace + 1);
 
-    int pvPairsFirstNonBlank = 0;
-    int pvPairsLength = propValPairs.length();
+    Matcher matcher = keyValPair.matcher(propValPairs);
 
-    while (pvPairsLength > pvPairsFirstNonBlank
-        && propValPairs.charAt(pvPairsFirstNonBlank) == ' ') {
-      ++pvPairsFirstNonBlank;
-    }
-
-    if (pvPairsFirstNonBlank != 0) {
-      propValPairs = propValPairs.substring(pvPairsFirstNonBlank);
-    }
-
-    int cursor = 0;
-
-    while (cursor < propValPairs.length()) {
-      int equals = propValPairs.indexOf('=', cursor);
-
-      if (equals < 0) {
-        // maybe we do some error processing
-        return;
-      }
-
-      int nextCursor;
-
-      int endValue;
-
-      if (propValPairs.charAt(equals + 1) == '\"') {
-        int closeQuote = propValPairs.indexOf('\"', equals + 2);
-
-        nextCursor = closeQuote + 1;
-
-        endValue = closeQuote;
-
-        if (closeQuote < 0) {
-          endValue = propValPairs.length();
-
-          nextCursor = endValue;
-        }
-      } else {
-        int closeSpace = propValPairs.indexOf(' ', equals + 1);
-
-        if (closeSpace < 0) {
-          closeSpace = propValPairs.length();
-        }
-
-        endValue = closeSpace;
-
-        nextCursor = endValue;
-      }
-
-      content.setProperty(propValPairs.substring(cursor, equals), propValPairs
-          .substring(equals + 2, endValue));
-
-      cursor = nextCursor;
-
-      while (cursor < propValPairs.length()
-          && propValPairs.charAt(cursor) == ' ') {
-        ++cursor;
-      }
+    while(matcher.find()){
+      String key = matcher.group(1);
+      String value = matcher.group(2);
+      content.setProperty(key, value);
     }
   }