PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1850723 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index b214b43..dc92076 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -88,6 +88,8 @@
  
 BUG FIXES
 
+PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)
+
 PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
 
 PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
diff --git a/src/org/apache/pig/impl/io/InterRecordReader.java b/src/org/apache/pig/impl/io/InterRecordReader.java
index 19ac141..2d0acd0 100644
--- a/src/org/apache/pig/impl/io/InterRecordReader.java
+++ b/src/org/apache/pig/impl/io/InterRecordReader.java
@@ -20,7 +20,7 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -92,7 +92,7 @@
      */
   public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
       int b = Integer.MIN_VALUE;
-      CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length);
+      CircularFifoBuffer queue = new CircularFifoBuffer(syncMarker.length);
       outer:while (b != -1) {
           //There may be a case where we read through a whole split without a marker, then we shouldn't proceed
           // because the records are from the next split which another reader would pick up too
@@ -107,13 +107,13 @@
           if (b == -1) return false;
 
           queue.add(b);
-          if (queue.size() != queue.maxSize()) {
+          if (!queue.isFull()) {
               //Not enough bytes read yet
               continue outer;
           }
           int i = 0;
-          for (Integer seenByte : queue){
-              if (syncMarker[i++] != seenByte.byteValue()) {
+          for (Object seenByte : queue){
+              if (syncMarker[i++] != ((Integer)seenByte).byteValue()) {
                   continue outer;
               }
           }