PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1850245 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fc51596..b214b43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -88,6 +88,8 @@
  
 BUG FIXES
 
+PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
+
 PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
 
 PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini)
diff --git a/src/org/apache/pig/impl/io/InterRecordReader.java b/src/org/apache/pig/impl/io/InterRecordReader.java
index 6e2d3a6..19ac141 100644
--- a/src/org/apache/pig/impl/io/InterRecordReader.java
+++ b/src/org/apache/pig/impl/io/InterRecordReader.java
@@ -20,6 +20,7 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 
+import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -89,35 +90,34 @@
      * @return true if marker was observed, false if EOF or EndOfSplit was reached
      * @throws IOException
      */
-  private boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
+  public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
       int b = Integer.MIN_VALUE;
-outer:while (b != -1) {
-          if (b != syncMarker[0]) {
-
-              //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
-              // because the records are from the next split which another reader would pick up too
-              if (in.getPosition() >= end) {
-                  return false;
-              }
-              b = in.read();
-              if ((byte) b != syncMarker[0] && b != -1) {
-                  continue;
-              }
-              if (b == -1) return false;
+      CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length);
+      outer:while (b != -1) {
+          //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
+          // because the records are from the next split which another reader would pick up too
+          //The one exception: reading past the split end is allowed if at least the first byte of the marker
+          // was seen before the split end.
+          if (in.getPosition() >= (end+syncMarker.length-1)) {
+              return false;
           }
-          int i = 1;
-          while (i < syncMarker.length) {
-              b = in.read();
-              if (b == -1) return false;
-              if ((byte) b != syncMarker[i]) {
-                  if (in.getPosition() > end) {
-                      //Again we should not read past the split end, only if at least the first byte of marker was seen before it
-                      return false;
-                  }
+          b = in.read();
+
+          //EOF reached
+          if (b == -1) return false;
+
+          queue.add(b);
+          if (queue.size() != queue.maxSize()) {
+              //Not enough bytes read yet
+              continue outer;
+          }
+          int i = 0;
+          for (Integer seenByte : queue){
+              if (syncMarker[i++] != seenByte.byteValue()) {
                   continue outer;
               }
-              ++i;
           }
+          //Found marker: queue content equals sync marker
           lastSyncPos = in.getPosition();
           return true;
       }
diff --git a/test/org/apache/pig/test/TestBinInterSedes.java b/test/org/apache/pig/test/TestBinInterSedes.java
index 65eb15e..b2419f5 100644
--- a/test/org/apache/pig/test/TestBinInterSedes.java
+++ b/test/org/apache/pig/test/TestBinInterSedes.java
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -26,6 +27,7 @@
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -42,8 +44,11 @@
 import org.apache.pig.data.InterSedesFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.InterRecordReader;
 import org.apache.pig.impl.util.TupleFormat;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestBinInterSedes {
     private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -437,6 +442,45 @@
 
     }
 
+    /**
+     * Tests all combinations where:
+     * sync marker is {x, y, 4}
+     * data is {127, -1, 2, z, x, y, 4, 1, 2, 3}
+     * x,y,z in [-128,127]
+     * This means that a sync marker has to be found in all iterations (total=16,777,216)
+     * @throws Exception
+     */
+    @Test
+    public void testPrefixSyncMarkers() throws Exception {
+        long defaultInterval = PigConfiguration.PIG_INTERSTORAGE_SYNCMARKER_INTERVAL_DEFAULT;
+
+        for (int b0 = -128; b0 <= 127; b0++) {
+            for (int b1 = -128; b1 <= 127; b1++) {
+                for (int b2 = -128; b2 <= 127; b2++) {
+                    byte[] syncMarker = new byte[]{(byte) b0, (byte) b1, (byte)4};
+                    byte[] data = new byte[]{127, -1, 2, (byte) b2, (byte) b0, (byte) b1, 4, 1, 2, 3};
+
+                    ByteArrayInputStream bi = new ByteArrayInputStream(data);
+                    BufferedPositionedInputStream bpi = new BufferedPositionedInputStream(bi);
+
+                    InterRecordReader reader = new InterRecordReader(syncMarker.length, defaultInterval);
+                    Whitebox.setInternalState(reader, "syncMarker", syncMarker);
+                    Whitebox.setInternalState(reader, "end", data.length);
+                    Whitebox.setInternalState(reader, "in", bpi);
+
+                    try {
+                        boolean ret = reader.skipUntilMarkerOrSplitEndOrEOF();
+                        assertTrue("Marker should have been found: " + "marker: " +
+                                Arrays.toString(syncMarker) + " , data: " + Arrays.toString(data),ret);
+                    } finally {
+                        bpi.close();
+                    }
+
+                }
+            }
+        }
+    }
+
     private void testSerTuple(Tuple t, byte[] expected) throws Exception {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutput out = new DataOutputStream(baos);