PIG-5108: AvroStorage on Tez with exception on nested records

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1779402 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index dcef62e..5fb6142 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -187,6 +187,8 @@
 
 BUG FIXES
 
+PIG-5108: AvroStorage on Tez with exception on nested records (daijy)
+
 PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
 
 PIG-4918: Pig on Tez cannot switch pig.temp.dir to another fs (daijy)
diff --git a/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java b/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
index 648e639..8adbe84 100644
--- a/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
+++ b/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
@@ -33,6 +33,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -49,6 +50,7 @@
 public final class AvroTupleWrapper <T extends IndexedRecord>
     implements Tuple {
     private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
   /**
    * The Avro object wrapped in the pig Tuple.
@@ -64,9 +66,9 @@
   }
 
   @Override
-  public void write(final DataOutput o) throws IOException {
-    throw new IOException(
-        this.getClass().toString() + ".write called, but not implemented yet");
+  public void write(DataOutput out) throws IOException {
+      Tuple t = mTupleFactory.newTupleNoCopy(getAll());
+      t.write(out);
   }
 
   @SuppressWarnings("rawtypes")
diff --git a/test/org/apache/pig/builtin/TestAvroStorage.java b/test/org/apache/pig/builtin/TestAvroStorage.java
index ee12ccb..531bc26 100644
--- a/test/org/apache/pig/builtin/TestAvroStorage.java
+++ b/test/org/apache/pig/builtin/TestAvroStorage.java
@@ -709,6 +709,19 @@
     }
 
     @Test
+    public void testGroupWithRepeatedSubRecords() throws Exception {
+      final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+      final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro";
+      testAvroStorage(true, basedir + "code/pig/group_test.pig",
+          ImmutableMap.of(
+              "INFILE",           input,
+              "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc",
+              "OUTFILE",          createOutputName())
+        );
+      verifyResults(createOutputName(),check);
+    }
+
+    @Test
     public void testLoadDirectory() throws Exception {
       final String input = basedir + "data/avro/uncompressed/testdirectory";
       final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro";
diff --git a/test/org/apache/pig/builtin/avro/code/pig/group_test.pig b/test/org/apache/pig/builtin/avro/code/pig/group_test.pig
new file mode 100644
index 0000000..64a9577
--- /dev/null
+++ b/test/org/apache/pig/builtin/avro/code/pig/group_test.pig
@@ -0,0 +1,5 @@
+in = LOAD '$INFILE' USING AvroStorage();
+grouped = GROUP in BY (value1.thing);
+flattened = FOREACH grouped GENERATE flatten(in) as (key: chararray,value1: (thing: chararray,count: int),value2: (thing: chararray,count: int));
+RMF $OUTFILE;
+STORE flattened INTO '$OUTFILE' USING AvroStorage();