MAPREDUCE-577. Fixes duplicate records in StreamXmlRecordReader. Contributed by Ravi Gummadi
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@960534 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index ababe2a..b76c72b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -140,6 +140,9 @@
MAPREDUCE-1888. Fixes Streaming to override output key and value types,
only if mapper/reducer is a command. (Ravi Gummadi via amareshwari)
+ MAPREDUCE-577. Fixes duplicate records in StreamXmlRecordReader.
+ (Ravi Gummadi via amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
diff --git a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
index 7544151..b5327da 100644
--- a/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
+++ b/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
@@ -88,7 +88,7 @@
if (!readUntilMatchBegin()) {
return false;
}
- if (!readUntilMatchEnd(buf)) {
+ if (pos_ >= end_ || !readUntilMatchEnd(buf)) {
return false;
}
@@ -258,8 +258,8 @@
if (outBufOrNull != null) {
outBufOrNull.write(cpat, 0, m);
outBufOrNull.write(c);
- pos_ += m;
}
+ pos_ += m + 1; // skip m chars, +1 for 'c'
m = 0;
}
}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java
new file mode 100644
index 0000000..cd64c82
--- /dev/null
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests if StreamXmlRecordReader will read the next record, _after_ the
+ * end of a split if the split falls before the end of end-tag of a record.
+ * Also tests if StreamXmlRecordReader will read a record twice if end of a
+ * split is after few characters after the end-tag of a record but before the
+ * begin-tag of next record.
+ */
+public class TestStreamXmlMultipleRecords extends TestStreaming
+{
+ private static final Log LOG = LogFactory.getLog(
+ TestStreamXmlMultipleRecords.class);
+
+ private boolean hasPerl = false;
+ private long blockSize;
+ private String isSlowMatch;
+
+ // Our own configuration used for creating FileSystem object where
+ // fs.local.block.size is set to 60 OR 80.
+ // See 60th char in input. It is before the end of end-tag of a record.
+ // See 80th char in input. It is in between the end-tag of a record and
+ // the begin-tag of next record.
+ private Configuration conf = null;
+
+ private String myPerlMapper =
+ "perl -n -a -e 'print join(\"\\n\", map { \"$_\\t1\" } @F), \"\\n\";'";
+ private String myPerlReducer =
+ "perl -n -a -e '$freq{$F[0]}++; END { print \"is\\t$freq{is}\\n\"; }'";
+
+ public TestStreamXmlMultipleRecords() throws IOException {
+ super();
+
+ input = "<line>This is a single line,\nand it is containing multiple" +
+ " words.</line> <line>Only is appears more than" +
+ " once.</line>\n";
+ outputExpect = "is\t3\n";
+
+ map = myPerlMapper;
+ reduce = myPerlReducer;
+
+ hasPerl = UtilTest.hasPerlSupport();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws IOException {
+ super.setUp();
+ // Without this closeAll() call, setting of FileSystem block size is
+ // not effective and will be old block size set in earlier test.
+ FileSystem.closeAll();
+ }
+
+ // Set file system block size such that split falls
+ // (a) before the end of end-tag of a record (testStreamXmlMultiInner...) OR
+ // (b) between records(testStreamXmlMultiOuter...)
+ @Override
+ protected Configuration getConf() {
+ conf = new Configuration();
+ conf.setLong("fs.local.block.size", blockSize);
+ return conf;
+ }
+
+ @Override
+ protected String[] genArgs() {
+ args.add("-inputreader");
+ args.add("StreamXmlRecordReader,begin=<line>,end=</line>,slowmatch=" +
+ isSlowMatch);
+ return super.genArgs();
+ }
+
+ /**
+ * Tests if StreamXmlRecordReader will read the next record, _after_ the
+ * end of a split if the split falls before the end of end-tag of a record.
+ * Tests with slowmatch=false.
+ * @throws Exception
+ */
+ @Test
+ public void testStreamXmlMultiInnerFast() throws Exception {
+ if (hasPerl) {
+ blockSize = 60;
+
+ isSlowMatch = "false";
+ super.testCommandLine();
+ }
+ else {
+ LOG.warn("No perl; skipping test.");
+ }
+ }
+
+ /**
+ * Tests if StreamXmlRecordReader will read a record twice if end of a
+ * split is after few characters after the end-tag of a record but before the
+ * begin-tag of next record.
+ * Tests with slowmatch=false.
+ * @throws Exception
+ */
+ @Test
+ public void testStreamXmlMultiOuterFast() throws Exception {
+ if (hasPerl) {
+ blockSize = 80;
+
+ isSlowMatch = "false";
+ super.testCommandLine();
+ }
+ else {
+ LOG.warn("No perl; skipping test.");
+ }
+ }
+
+ /**
+ * Tests if StreamXmlRecordReader will read the next record, _after_ the
+ * end of a split if the split falls before the end of end-tag of a record.
+ * Tests with slowmatch=true.
+ * @throws Exception
+ */
+ @Test
+ public void testStreamXmlMultiInnerSlow() throws Exception {
+ if (hasPerl) {
+ blockSize = 60;
+
+ isSlowMatch = "true";
+ super.testCommandLine();
+ }
+ else {
+ LOG.warn("No perl; skipping test.");
+ }
+ }
+
+ /**
+ * Tests if StreamXmlRecordReader will read a record twice if end of a
+ * split is after few characters after the end-tag of a record but before the
+ * begin-tag of next record.
+ * Tests with slowmatch=true.
+ * @throws Exception
+ */
+ @Test
+ public void testStreamXmlMultiOuterSlow() throws Exception {
+ if (hasPerl) {
+ blockSize = 80;
+
+ isSlowMatch = "true";
+ super.testCommandLine();
+ }
+ else {
+ LOG.warn("No perl; skipping test.");
+ }
+ }
+
+ @Override
+ @Test
+ public void testCommandLine() {
+ // Do nothing
+ }
+}
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
index 7d8cce4..e465a86 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
@@ -69,6 +69,7 @@
public void setUp() throws IOException {
UtilTest.recursiveDelete(TEST_DIR);
assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
+ args.clear();
}
@After
diff --git a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java
index 03a9022..1fb3274 100644
--- a/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java
+++ b/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java
@@ -23,8 +23,14 @@
import java.io.IOException;
import java.io.PrintStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
class UtilTest {
+ private static final Log LOG = LogFactory.getLog(UtilTest.class);
+
/**
* Utility routine to recurisvely delete a directory.
* On normal return, the file does not exist.
@@ -76,6 +82,28 @@
}
}
+ /**
+ * Is perl supported on this machine ?
+ * @return true if perl is available and is working as expected
+ */
+ public static boolean hasPerlSupport() {
+ boolean hasPerl = false;
+ ShellCommandExecutor shexec = new ShellCommandExecutor(
+ new String[] { "perl", "-e", "print 42" });
+ try {
+ shexec.execute();
+ if (shexec.getOutput().equals("42")) {
+ hasPerl = true;
+ }
+ else {
+ LOG.warn("Perl is installed, but isn't behaving as expected.");
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not run perl: " + e);
+ }
+ return hasPerl;
+ }
+
private String userDir_;
private String antTestDir_;
private String testName_;