PIG-5108: AvroStorage on Tez with exception on nested records
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1779402 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index dcef62e..5fb6142 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -187,6 +187,8 @@
BUG FIXES
+PIG-5108: AvroStorage on Tez with exception on nested records (daijy)
+
PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
PIG-4918: Pig on Tez cannot switch pig.temp.dir to another fs (daijy)
diff --git a/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java b/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
index 648e639..8adbe84 100644
--- a/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
+++ b/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
@@ -33,6 +33,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import java.io.DataInput;
import java.io.DataOutput;
@@ -49,6 +50,7 @@
public final class AvroTupleWrapper <T extends IndexedRecord>
implements Tuple {
private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
+ private TupleFactory mTupleFactory = TupleFactory.getInstance();
/**
* The Avro object wrapped in the pig Tuple.
@@ -64,9 +66,9 @@
}
@Override
- public void write(final DataOutput o) throws IOException {
- throw new IOException(
- this.getClass().toString() + ".write called, but not implemented yet");
+ public void write(DataOutput out) throws IOException {
+ Tuple t = mTupleFactory.newTupleNoCopy(getAll());
+ t.write(out);
}
@SuppressWarnings("rawtypes")
diff --git a/test/org/apache/pig/builtin/TestAvroStorage.java b/test/org/apache/pig/builtin/TestAvroStorage.java
index ee12ccb..531bc26 100644
--- a/test/org/apache/pig/builtin/TestAvroStorage.java
+++ b/test/org/apache/pig/builtin/TestAvroStorage.java
@@ -709,6 +709,19 @@
}
@Test
+ public void testGroupWithRepeatedSubRecords() throws Exception {
+ final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+ final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+ testAvroStorage(true, basedir + "code/pig/group_test.pig",
+ ImmutableMap.of(
+ "INFILE", input,
+ "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc",
+ "OUTFILE", createOutputName())
+ );
+ verifyResults(createOutputName(),check);
+ }
+
+ @Test
public void testLoadDirectory() throws Exception {
final String input = basedir + "data/avro/uncompressed/testdirectory";
final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";
diff --git a/test/org/apache/pig/builtin/avro/code/pig/group_test.pig b/test/org/apache/pig/builtin/avro/code/pig/group_test.pig
new file mode 100644
index 0000000..64a9577
--- /dev/null
+++ b/test/org/apache/pig/builtin/avro/code/pig/group_test.pig
@@ -0,0 +1,5 @@
+in = LOAD '$INFILE' USING AvroStorage();
+grouped = GROUP in BY (value1.thing);
+flattened = FOREACH grouped GENERATE flatten(in) as (key: chararray,value1: (thing: chararray,count: int),value2: (thing: chararray,count: int));
+RMF $OUTFILE;
+STORE flattened INTO '$OUTFILE' USING AvroStorage();