PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1850245 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fc51596..b214b43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -88,6 +88,8 @@
BUG FIXES
+PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
+
PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini)
diff --git a/src/org/apache/pig/impl/io/InterRecordReader.java b/src/org/apache/pig/impl/io/InterRecordReader.java
index 6e2d3a6..19ac141 100644
--- a/src/org/apache/pig/impl/io/InterRecordReader.java
+++ b/src/org/apache/pig/impl/io/InterRecordReader.java
@@ -20,6 +20,7 @@
import java.io.DataInputStream;
import java.io.IOException;
+import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -89,35 +90,34 @@
* @return true if marker was observed, false if EOF or EndOfSplit was reached
* @throws IOException
*/
- private boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
+ public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
int b = Integer.MIN_VALUE;
-outer:while (b != -1) {
- if (b != syncMarker[0]) {
-
- //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
- // because the records are from the next split which another reader would pick up too
- if (in.getPosition() >= end) {
- return false;
- }
- b = in.read();
- if ((byte) b != syncMarker[0] && b != -1) {
- continue;
- }
- if (b == -1) return false;
+ CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length);
+ outer:while (b != -1) {
+ //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
+ // because the records are from the next split which another reader would pick up too
+ //The one exception that allows reading past the split end is when at least the first byte of the marker
+ // was seen before the split end.
+ if (in.getPosition() >= (end+syncMarker.length-1)) {
+ return false;
}
- int i = 1;
- while (i < syncMarker.length) {
- b = in.read();
- if (b == -1) return false;
- if ((byte) b != syncMarker[i]) {
- if (in.getPosition() > end) {
- //Again we should not read past the split end, only if at least the first byte of marker was seen before it
- return false;
- }
+ b = in.read();
+
+ //EOF reached
+ if (b == -1) return false;
+
+ queue.add(b);
+ if (queue.size() != queue.maxSize()) {
+ //Not enough bytes read yet
+ continue outer;
+ }
+ int i = 0;
+ for (Integer seenByte : queue){
+ if (syncMarker[i++] != seenByte.byteValue()) {
continue outer;
}
- ++i;
}
+ //Found marker: queue content equals sync marker
lastSyncPos = in.getPosition();
return true;
}
diff --git a/test/org/apache/pig/test/TestBinInterSedes.java b/test/org/apache/pig/test/TestBinInterSedes.java
index 65eb15e..b2419f5 100644
--- a/test/org/apache/pig/test/TestBinInterSedes.java
+++ b/test/org/apache/pig/test/TestBinInterSedes.java
@@ -18,6 +18,7 @@
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -26,6 +27,7 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -42,8 +44,11 @@
import org.apache.pig.data.InterSedesFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.InterRecordReader;
import org.apache.pig.impl.util.TupleFormat;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
public class TestBinInterSedes {
private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -437,6 +442,45 @@
}
+ /**
+ * Tests all combinations where:
+ * sync marker is {x, y, 4}
+ * data is {127, -1, 2, z, x, y, 4, 1, 2, 3}
+ * x,y,z in [-128,127]
+ * This means that a sync marker has to be found in all iterations (total=16,777,216)
+ * @throws Exception
+ */
+ @Test
+ public void testPrefixSyncMarkers() throws Exception {
+ long defaultInterval = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT;
+
+ for (int b0 = -128; b0 <= 127; b0++) {
+ for (int b1 = -128; b1 <= 127; b1++) {
+ for (int b2 = -128; b2 <= 127; b2++) {
+ byte[] syncMarker = new byte[]{(byte) b0, (byte) b1, (byte)4};
+ byte[] data = new byte[]{127, -1, 2, (byte) b2, (byte) b0, (byte) b1, 4, 1, 2, 3};
+
+ ByteArrayInputStream bi = new ByteArrayInputStream(data);
+ BufferedPositionedInputStream bpi = new BufferedPositionedInputStream(bi);
+
+ InterRecordReader reader = new InterRecordReader(syncMarker.length, defaultInterval);
+ Whitebox.setInternalState(reader, "syncMarker", syncMarker);
+ Whitebox.setInternalState(reader, "end", data.length);
+ Whitebox.setInternalState(reader, "in", bpi);
+
+ try {
+ boolean ret = reader.skipUntilMarkerOrSplitEndOrEOF();
+ assertTrue("Marker should have been found: " + "marker: " +
+ Arrays.toString(syncMarker) + " , data: " + Arrays.toString(data),ret);
+ } finally {
+ bpi.close();
+ }
+
+ }
+ }
+ }
+ }
+
private void testSerTuple(Tuple t, byte[] expected) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(baos);