Merge pull request #34 from noslowerdna/CRUNCH-698

CRUNCH-698: Inclusion of local patch for AVRO-2944
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
index ab2f30e..45e2130 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroRecordReader.java
@@ -17,10 +17,15 @@
  */
 package org.apache.crunch.types.avro;
 
+import static org.apache.avro.file.DataFileConstants.MAGIC;
+
+import java.io.EOFException;
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileReader12;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.io.DatumReader;
@@ -55,7 +60,7 @@
     DatumReader<T> datumReader = AvroMode
         .fromConfiguration(context.getConfiguration())
         .getReader(schema);
-    this.reader = DataFileReader.openReader(in, datumReader);
+    this.reader = openAvroDataFileReader(in, datumReader);
     reader.sync(split.getStart()); // sync to start
     this.start = reader.tell();
     this.end = split.getStart() + split.getLength();
@@ -108,4 +113,34 @@
       reader = null;
     }
   }
+
+  /**
+   * Local patch for AVRO-2944: returns a reader chosen by the magic bytes at offset 0 of {@code in}, or throws if neither format matches.
+   */
+  private static <D> FileReader<D> openAvroDataFileReader(SeekableInput in, DatumReader<D> reader) throws IOException {
+    if (in.length() < MAGIC.length) // input is too short to hold the magic header at all
+      throw new IOException("Not an Avro data file");
+
+    // read the first MAGIC.length bytes, looping in case a read call returns fewer bytes than requested
+    byte[] magic = new byte[MAGIC.length];
+    in.seek(0);
+    int offset = 0;
+    int length = magic.length;
+    while (length > 0) {
+      int bytesRead = in.read(magic, offset, length);
+      if (bytesRead < 0)
+        throw new EOFException("Unexpected EOF with " + length + " bytes remaining to read");
+
+      length -= bytesRead;
+      offset += bytesRead;
+    }
+    in.seek(0); // rewind to offset 0 before handing the input to a reader
+
+    if (Arrays.equals(MAGIC, magic)) // current format (MAGIC is statically imported from DataFileConstants)
+      return new DataFileReader<>(in, reader);
+    if (Arrays.equals(new byte[] { (byte) 'O', (byte) 'b', (byte) 'j', (byte) 0 }, magic)) // 1.2 format: "Obj" followed by a zero byte
+      return new DataFileReader12<>(in, reader);
+
+    throw new IOException("Not an Avro data file"); // magic matched neither supported format
+  }
 }