[REEF-1772] Implement IPipelineDataConverter in REEF.NET for arrays of data

This adds a new implementation for `IPipelineDataConverter` that supports
arrays of arbitrary primitive types.

JIRA:
  [REEF-1772](https://issues.apache.org/jira/browse/REEF-1772)

Pull Request:
  This closes #1286
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
new file mode 100644
index 0000000..d777086
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using Xunit;
+
+namespace Org.Apache.REEF.Network.Tests.GroupCommunication
+{
+    /// <summary>
+    /// Defines pipelining component tests
+    /// </summary>
+    public class PipeliningTests
+    {
+        /// <summary>
+        /// Test the ArrayPipelineConverter with floats
+        /// </summary>
+        [Fact]
+        public void TestFloatArrayPipelineDataConverter()
+        {
+            float[] testArray = { 0.1f, 0.2f, 0.3f, 0.4f, 0.5f };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with doubles
+        /// </summary>
+        [Fact]
+        public void TestDoubleArrayPipelineDataConverter()
+        {
+            double[] testArray = { 0.1, 0.2, 0.3, 0.4, 0.5 };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with ints
+        /// </summary>
+        [Fact]
+        public void TestIntArrayPipelineDataConverter()
+        {
+            int[] testArray = { 1, 2, 3, 4, 5 };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with longs
+        /// </summary>
+        [Fact]
+        public void TestLongArrayPipelineDataConverter()
+        {
+            long[] testArray = { 1L, 2L, 3L, 4L, 5L };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with generic objects
+        /// </summary>
+        [Fact]
+        public void TestObjectArrayPipelineDataConverter()
+        {
+            object[] testArray = 
+            {
+                new { A = 1, B = 2, C = 3 },
+                new { A = 2, B = 3, C = 4 },
+                new { A = 3, B = 4, C = 5 },
+                new { A = 4, B = 5, C = 6 },
+                new { F = 5, G = 6, H = 7 }
+            };
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with an empty array
+        /// </summary>
+        [Fact]
+        public void TestArrayPipelineDataConverterWithEmptyArray()
+        {
+            object[] testArray = new object[0];
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Test the ArrayPipelineConverter with a null array
+        /// </summary>
+        [Fact]
+        public void TestArrayPipelineDataConverterWithNullArray()
+        {
+            object[] testArray = null;
+            TestArrayPipelineDataConverter(testArray);
+        }
+
+        /// <summary>
+        /// Master test function for testing types of arrays in the ArrayPipelineDataConverter
+        /// </summary>
+        /// <typeparam name="T">The type of array to test</typeparam>
+        /// <param name="originalArray">An array to use in the test</param>
+        private static void TestArrayPipelineDataConverter<T>(T[] originalArray) where T : new()
+        {
+            // Verify that the constructor has the proper restrictions
+            AssertPositivePipelinePackageElementsRequired<T[], ArrayPipelineDataConverter<T>>();
+
+            // Test the valid case where we break up the array into smaller pieces
+            // First determine how many messages to create from originalArray
+            int pipelineMessageSize;
+            int nMessages;
+            if (originalArray == null)
+            {
+                nMessages = 0;
+                pipelineMessageSize = 1;
+            }
+            else if (originalArray.Length == 0 || originalArray.Length == 1)
+            {
+                nMessages = 1;
+                pipelineMessageSize = 1; // Necessary to instantiate the ArrayPipelineDataConverterConfig
+            }
+            else
+            {
+                nMessages = 2;
+                pipelineMessageSize = (int)Math.Ceiling(originalArray.Length / (double)nMessages);
+            }
+
+            // Test that the valid configuration can be injected
+            IConfiguration config = GetPipelineDataConverterConfig(pipelineMessageSize);
+            IPipelineDataConverter<T[]> dataConverter = TangFactory.GetTang().NewInjector(config).GetInstance<ArrayPipelineDataConverter<T>>();
+
+            var pipelineData = dataConverter.PipelineMessage(originalArray);
+
+            // Validate that the pipeline constructed the correct number of messages
+            Assert.Equal<int>(pipelineData.Count, nMessages);
+
+            T[] deserializedArray = dataConverter.FullMessage(pipelineData);
+
+            // Validate that the array is unaffected by the serialization / deserialization
+            AssertArrayEquality(originalArray, deserializedArray);
+
+            // Verify that the "IsLast" property is set correctly
+            AssertIsLastFlag(pipelineData);
+        }
+
+        /// <summary>
+        /// Verify that the IPipelineDataConverter<T> class requires a positive value for pipelinePackageElements
+        /// </summary>
+        /// <typeparam name="T"></typeparam>
+        /// <typeparam name="DataConverter"></typeparam>
+        private static void AssertPositivePipelinePackageElementsRequired<T, DataConverter>()
+            where DataConverter : class, IPipelineDataConverter<T>
+        {
+            // Verify that the PipelinePackageElements cannot be zero
+            var configWithZeroElements = GetPipelineDataConverterConfig(0);
+            Assert.Throws<InjectionException>(() => TangFactory.GetTang().NewInjector(configWithZeroElements).GetInstance<DataConverter>());
+
+            // Verify that the PipelinePackageElements cannot be less than 0
+            var configWithNegativeElements = GetPipelineDataConverterConfig(-2);
+            Assert.Throws<InjectionException>(() => TangFactory.GetTang().NewInjector(configWithNegativeElements).GetInstance<DataConverter>());
+        }
+
+        /// <summary>
+        /// Validate that the IsLast flag is properly set on a list of pipeline messages
+        /// </summary>
+        /// <typeparam name="T"></typeparam>
+        /// <param name="pipelineData">A list of PipelineMessages</param>
+        private static void AssertIsLastFlag<T>(IList<PipelineMessage<T>> pipelineData)
+        {
+            // Verify that the "IsLast" property is set correctly
+            for (int i = 0; i < pipelineData.Count; i++)
+            {
+                Assert.Equal(pipelineData[i].IsLast, i == pipelineData.Count - 1);
+            }
+        }
+
+        /// <summary>
+        /// Generic array equality method; Equality for type T must make sense for this to make sense
+        /// </summary>
+        /// <typeparam name="T"></typeparam>
+        /// <param name="expected"></param>
+        /// <param name="actual"></param>
+        private static void AssertArrayEquality<T>(T[] expected, T[] actual)
+        {
+            // Two null arrays are considered to be equal
+            // Check to make sure that the arrays are both defined or undefined
+            Assert.True((expected == null) == (actual == null));
+
+            // If the arrays are both null, then don't check any further
+            if (expected == null && actual == null)
+            {
+                return;
+            }
+
+            Assert.Equal(expected.Length, actual.Length);
+
+            for (int i = 0; i < actual.Length; i++)
+            {
+                Assert.True(EqualityComparer<T>.Default.Equals(expected[i], actual[i]));
+            }
+        }
+
+        /// <summary>
+        /// Create a configuration with a PipelinePackageElements parameter
+        /// </summary>
+        /// <param name="pipelineMessageSize">The length of the individual messages in the pipeline</param>
+        /// <returns></returns>
+        private static IConfiguration GetPipelineDataConverterConfig(int pipelineMessageSize)
+        {
+            return TangFactory.GetTang()
+                    .NewConfigurationBuilder()
+                    .BindNamedParameter(typeof(GroupCommConfigurationOptions.PipelineMessageSize), pipelineMessageSize.ToString())
+                    .Build();
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index 5bbc8c8..f430146 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -52,6 +52,7 @@
     <Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
       <Link>Properties\SharedAssemblyInfo.cs</Link>
     </Compile>
+    <Compile Include="GroupCommunication\PipeliningTests.cs" />
     <Compile Include="TcpClientConfigurationModuleTests.cs" />
     <Compile Include="BlockingCollectionExtensionTests.cs" />
     <Compile Include="GroupCommunication\GroupCommuDriverTests.cs" />
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
index 51463d3..1100376 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
@@ -113,5 +113,13 @@
         public class Initialize : Name<bool>
         {
         }
+
+        /// <summary>
+        /// The number of elements to place into each message in the pipeline.
+        /// </summary>
+        [NamedParameter("The number of elements to place into each message in the pipeline.", "PipelineMessageSize", "1000000")]
+        internal sealed class PipelineMessageSize : Name<int>
+        {
+        }
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
new file mode 100644
index 0000000..12540ac
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Network.Group.Pipelining.Impl
+{
+    /// <summary>
+    /// An implementation of IPipelineDataConverter for pipelining Arrays of objects.
+    /// </summary>
+    /// <typeparam name="T">The type of object of the array to pipeline</typeparam>
+    public sealed class ArrayPipelineDataConverter<T> : IPipelineDataConverter<T[]> where T : new()
+    {
+        private readonly int _pipelineMessageSize;
+        private static readonly Logger Logger = Logger.GetLogger(typeof(ArrayPipelineDataConverter<T>));
+
+        [Inject]
+        private ArrayPipelineDataConverter([Parameter(typeof(GroupCommConfigurationOptions.PipelineMessageSize))] int pipelineMessageSize)
+        {
+            if (pipelineMessageSize <= 0)
+            {
+                throw new ArgumentException("PipelinePackageSize must be strictly positive");
+            }
+            _pipelineMessageSize = pipelineMessageSize;
+        }
+
+        /// <summary>
+        /// Converts the original message to be communicated in to multiple messages, breaking the array into pieces of size PipelineMessageSize
+        /// </summary>
+        /// <param name="message">The original message</param>
+        /// <returns>The list of pipelined messages</returns>
+        public List<PipelineMessage<T[]>> PipelineMessage(T[] message)
+        {
+            // Return null messages as an empty list
+            if (message == null)
+            {
+                return new List<PipelineMessage<T[]>>();
+            }
+            else if (message.Length == 0)
+            {
+                // Special case; 0-length arrays are passed with a message with a zero-length array
+                return new List<PipelineMessage<T[]>>
+                {
+                    new PipelineMessage<T[]>(new T[0], true)
+                };
+            }
+
+            int messageCount = ((message.Length - 1) / _pipelineMessageSize) + 1;
+            List<PipelineMessage<T[]>> messageList = new List<PipelineMessage<T[]>>(messageCount);
+            int offset = 0;
+            while (offset < message.Length)
+            {
+                int subLen = Math.Min(message.Length - offset, _pipelineMessageSize);
+                if (subLen <= 0)
+                {
+                    throw new ArithmeticException("Tried to create a pipeline package with fewer than 1 element.");
+                }
+                T[] data = new T[subLen];
+                Array.Copy(message, offset, data, 0, subLen);
+                bool isLast = subLen + offset == message.Length;
+                messageList.Add(new PipelineMessage<T[]>(data, isLast));
+                offset += subLen;
+            }
+            if (messageCount != messageList.Count)
+            {
+                throw new SerializationException(string.Format("The wrong number of pipeline packages were created: Expected {0} but created {1}.", messageCount, messageList.Count));
+            }
+
+            return messageList;
+        }
+
+        /// <summary>
+        /// Constructs the full final message from the communicated pipelined message
+        /// </summary>
+        /// <param name="pipelineMessage">A list of received pipelined messages</param>
+        /// <returns>The full constructed message</returns>
+        public T[] FullMessage(List<PipelineMessage<T[]>> pipelineMessage)
+        {
+            // Null data corresponds to an empty list of pipeline messages
+            if (pipelineMessage.Count == 0)
+            {
+                return null;
+            }
+            if (pipelineMessage.Count == 1)
+            {
+                return pipelineMessage[0].Data;
+            }
+
+            int nElements = pipelineMessage.Sum(v => v.Data.Length);
+            
+            // A zero-length array gets returned as a zero-length array
+            if (nElements == 0)
+            {
+                return new T[0];
+            }
+
+            int offset = 0;
+            T[] values = new T[nElements];
+            foreach (var message in pipelineMessage)
+            {
+                Array.Copy(message.Data, 0, values, offset, message.Data.Length);
+                offset += message.Data.Length;
+            }
+            if (offset != nElements)
+            {
+                throw new SerializationException(string.Format("The pipeline packages were deserialized incorrectly created: Expected {0} but created {1}.", nElements, offset));
+            }
+
+            return values;
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 0b57f89..0beb953 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -85,6 +85,7 @@
     <Compile Include="Group\Operators\Impl\ScatterSender.cs" />
     <Compile Include="Group\Operators\Impl\Sender.cs" />
     <Compile Include="Group\Operators\IOperatorSpec.cs" />
+    <Compile Include="Group\Pipelining\Impl\ArrayPipelineDataConverter.cs" />
     <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" />
     <Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" />
     <Compile Include="Group\Task\Impl\ChildNodeContainer.cs" />