CRUNCH-698: Inclusion of local patch for AVRO-2944
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
index ab2f30e..45e2130 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
@@ -17,10 +17,15 @@
*/
package org.apache.crunch.types.avro;
+import static org.apache.avro.file.DataFileConstants.MAGIC;
+
+import java.io.EOFException;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileReader12;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
@@ -55,7 +60,7 @@
DatumReader<T> datumReader = AvroMode
.fromConfiguration(context.getConfiguration())
.getReader(schema);
- this.reader = DataFileReader.openReader(in, datumReader);
+ this.reader = openAvroDataFileReader(in, datumReader);
reader.sync(split.getStart()); // sync to start
this.start = reader.tell();
this.end = split.getStart() + split.getLength();
@@ -108,4 +113,34 @@
reader = null;
}
}
+
+ /**
+ * Local patch for AVRO-2944.
+ */
+ private static <D> FileReader<D> openAvroDataFileReader(SeekableInput in, DatumReader<D> reader) throws IOException {
+ if (in.length() < MAGIC.length)
+ throw new IOException("Not an Avro data file");
+
+ // read magic header
+ byte[] magic = new byte[MAGIC.length];
+ in.seek(0);
+ int offset = 0;
+ int length = magic.length;
+ while (length > 0) {
+ int bytesRead = in.read(magic, offset, length);
+ if (bytesRead < 0)
+ throw new EOFException("Unexpected EOF with " + length + " bytes remaining to read");
+
+ length -= bytesRead;
+ offset += bytesRead;
+ }
+ in.seek(0);
+
+ if (Arrays.equals(MAGIC, magic)) // current format
+ return new DataFileReader<>(in, reader);
+ if (Arrays.equals(new byte[] { (byte) 'O', (byte) 'b', (byte) 'j', (byte) 0 }, magic)) // 1.2 format
+ return new DataFileReader12<>(in, reader);
+
+ throw new IOException("Not an Avro data file");
+ }
}