PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1850723 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index b214b43..dc92076 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -88,6 +88,8 @@
BUG FIXES
+PIG-5374: Use CircularFifoBuffer in InterRecordReader (szita)
+
PIG-5373: InterRecordReader might skip records if certain sync markers are used (szita)
PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
diff --git a/src/org/apache/pig/impl/io/InterRecordReader.java b/src/org/apache/pig/impl/io/InterRecordReader.java
index 19ac141..2d0acd0 100644
--- a/src/org/apache/pig/impl/io/InterRecordReader.java
+++ b/src/org/apache/pig/impl/io/InterRecordReader.java
@@ -20,7 +20,7 @@
import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -92,7 +92,7 @@
*/
public boolean skipUntilMarkerOrSplitEndOrEOF() throws IOException {
int b = Integer.MIN_VALUE;
- CircularFifoQueue<Integer> queue = new CircularFifoQueue(syncMarker.length);
+ CircularFifoBuffer queue = new CircularFifoBuffer(syncMarker.length);
outer:while (b != -1) {
//There may be a case where we read through a whole split without a marker, then we shouldn't proceed
// because the records are from the next split which another reader would pick up too
@@ -107,13 +107,13 @@
if (b == -1) return false;
queue.add(b);
- if (queue.size() != queue.maxSize()) {
+ if (!queue.isFull()) {
//Not enough bytes read yet
continue outer;
}
int i = 0;
- for (Integer seenByte : queue){
- if (syncMarker[i++] != seenByte.byteValue()) {
+ for (Object seenByte : queue){
+ if (syncMarker[i++] != ((Integer)seenByte).byteValue()) {
continue outer;
}
}