[REEF-1775] ArrayPipelineDataConverter : FullMessage will throw on null data

This adds an explicit null-filter on incoming pipeline messages.

JIRA:
  [REEF-1775](https://issues.apache.org/jira/browse/REEF-1775)

Pull Request:
  This closes #1289
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
index d777086..9853725 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
@@ -119,6 +119,9 @@
             // Verify that the constructor has the proper restrictions
             AssertPositivePipelinePackageElementsRequired<T[], ArrayPipelineDataConverter<T>>();
 
+            // Verify that the FullMessage method properly handles null and zero-length arrays
+            AssertNullAndZeroLengthArraysHandledByFullMessage<T, ArrayPipelineDataConverter<T>>();
+
             // Test the valid case where we break up the array into smaller pieces
             // First determine how many messages to create from originalArray
             int pipelineMessageSize;
@@ -175,6 +178,59 @@
         }
 
         /// <summary>
+        /// Verify that the FullMessage method properly handles null and zero-length arrays
+        /// </summary>
+        /// <typeparam name="T">The type of array the IPipelineDataConverter converts</typeparam>
+        /// <typeparam name="DataConverter">The IPipelineDataConverter implementation</typeparam>
+        private static void AssertNullAndZeroLengthArraysHandledByFullMessage<T, DataConverter>()
+            where T : new()
+            where DataConverter : class, IPipelineDataConverter<T[]>
+        {
+            // Test that the valid configuration can be injected
+            IConfiguration config = GetPipelineDataConverterConfig(1);
+            IPipelineDataConverter<T[]> dataConverter = TangFactory.GetTang().NewInjector(config).GetInstance<ArrayPipelineDataConverter<T>>();
+
+            // Test that a null message returns a null object
+            List<PipelineMessage<T[]>> nullMessage = new List<PipelineMessage<T[]>>
+            {
+                new PipelineMessage<T[]>(null, true)
+            };
+            Assert.Null(dataConverter.FullMessage(nullMessage));
+
+            // Test that many null messages returns a null object
+            List<PipelineMessage<T[]>> manyNullMessages = new List<PipelineMessage<T[]>>
+            {
+                new PipelineMessage<T[]>(null, false),
+                new PipelineMessage<T[]>(null, true)
+            };
+            Assert.Null(dataConverter.FullMessage(manyNullMessages));
+
+            // Test that null messages mixed with non-null returns the non-null object
+            List<PipelineMessage<T[]>> someNullMessages = new List<PipelineMessage<T[]>>
+            {
+                new PipelineMessage<T[]>(null, false),
+                new PipelineMessage<T[]>(new T[2], false),
+                new PipelineMessage<T[]>(null, true)
+            };
+            Assert.Equal(2, dataConverter.FullMessage(someNullMessages).Length);
+
+            // Test that a zero-length message returns a zero-length object
+            List<PipelineMessage<T[]>> zeroLengthMessage = new List<PipelineMessage<T[]>>
+            {
+                new PipelineMessage<T[]>(new T[0], true)
+            };
+            Assert.Equal(0, dataConverter.FullMessage(zeroLengthMessage).Length);
+
+            // Test that many zero-length message return a zero-length object
+            List<PipelineMessage<T[]>> manyZeroLengthMessages = new List<PipelineMessage<T[]>>
+            {
+                new PipelineMessage<T[]>(new T[0], false),
+                new PipelineMessage<T[]>(new T[0], true)
+            };
+            Assert.Equal(0, dataConverter.FullMessage(manyZeroLengthMessages).Length);
+        }
+
+        /// <summary>
         /// Validate that the IsLast flag is properly set on a list of pipeline messages
         /// </summary>
         /// <typeparam name="T"></typeparam>
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
index 12540ac..6d43812 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
@@ -96,6 +96,9 @@
         /// <returns>The full constructed message</returns>
         public T[] FullMessage(List<PipelineMessage<T[]>> pipelineMessage)
         {
+            // Filter out messages with null data payloads
+            pipelineMessage = pipelineMessage.Where(message => message.Data != null).ToList();
+
             // Null data corresponds to an empty list of pipeline messages
             if (pipelineMessage.Count == 0)
             {