[REEF-1772] Implement IPipelineDataConverter in REEF.NET for arrays of data
This adds a new implementation for `IPipelineDataConverter` that supports
arrays of arbitrary primitive types.
JIRA:
[REEF-1772](https://issues.apache.org/jira/browse/REEF-1772)
Pull Request:
This closes #1286
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
new file mode 100644
index 0000000..d777086
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Pipelining.Impl;
+using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using Xunit;
+
+namespace Org.Apache.REEF.Network.Tests.GroupCommunication
+{
+ /// <summary>
+ /// Defines pipelining component tests
+ /// </summary>
+ public class PipeliningTests
+ {
+ /// <summary>
+ /// Test the ArrayPipelineConverter with floats
+ /// </summary>
+ [Fact]
+ public void TestFloatArrayPipelineDataConverter()
+ {
+ float[] testArray = { 0.1f, 0.2f, 0.3f, 0.4f, 0.5f };
+ TestArrayPipelineDataConverter(testArray);
+ }
+
+ /// <summary>
+ /// Test the ArrayPipelineConverter with doubles
+ /// </summary>
+ [Fact]
+ public void TestDoubleArrayPipelineDataConverter()
+ {
+ double[] testArray = { 0.1, 0.2, 0.3, 0.4, 0.5 };
+ TestArrayPipelineDataConverter(testArray);
+ }
+
+ /// <summary>
+ /// Test the ArrayPipelineConverter with ints
+ /// </summary>
+ [Fact]
+ public void TestIntArrayPipelineDataConverter()
+ {
+ int[] testArray = { 1, 2, 3, 4, 5 };
+ TestArrayPipelineDataConverter(testArray);
+ }
+
+ /// <summary>
+ /// Test the ArrayPipelineConverter with longs
+ /// </summary>
+ [Fact]
+ public void TestLongArrayPipelineDataConverter()
+ {
+ long[] testArray = { 1L, 2L, 3L, 4L, 5L };
+ TestArrayPipelineDataConverter(testArray);
+ }
+
+ /// <summary>
+ /// Test the ArrayPipelineConverter with generic objects
+ /// </summary>
+ [Fact]
+ public void TestObjectArrayPipelineDataConverter()
+ {
+ object[] testArray =
+ {
+ new { A = 1, B = 2, C = 3 },
+ new { A = 2, B = 3, C = 4 },
+ new { A = 3, B = 4, C = 5 },
+ new { A = 4, B = 5, C = 6 },
+ new { F = 5, G = 6, H = 7 }
+ };
+ TestArrayPipelineDataConverter(testArray);
+ }
+
+ /// <summary>
+ /// Test the ArrayPipelineConverter with an empty array
+ /// </summary>
+ [Fact]
+ public void TestArrayPipelineDataConverterWithEmptyArray()
+ {
+ object[] testArray = new object[0];
+ TestArrayPipelineDataConverter(testArray);
+ }
+
+ /// <summary>
+ /// Test the ArrayPipelineConverter with a null array
+ /// </summary>
+ [Fact]
+ public void TestArrayPipelineDataConverterWithNullArray()
+ {
+ object[] testArray = null;
+ TestArrayPipelineDataConverter(testArray);
+ }
+
+ /// <summary>
+ /// Master test function for testing types of arrays in the ArrayPipelineDataConverter
+ /// </summary>
+ /// <typeparam name="T">The type of array to test</typeparam>
+ /// <param name="originalArray">An array to use in the test</param>
+ private static void TestArrayPipelineDataConverter<T>(T[] originalArray) where T : new()
+ {
+ // Verify that the constructor has the proper restrictions
+ AssertPositivePipelinePackageElementsRequired<T[], ArrayPipelineDataConverter<T>>();
+
+ // Test the valid case where we break up the array into smaller pieces
+ // First determine how many messages to create from originalArray
+ int pipelineMessageSize;
+ int nMessages;
+ if (originalArray == null)
+ {
+ nMessages = 0;
+ pipelineMessageSize = 1;
+ }
+ else if (originalArray.Length == 0 || originalArray.Length == 1)
+ {
+ nMessages = 1;
+ pipelineMessageSize = 1; // Necessary to instantiate the ArrayPipelineDataConverterConfig
+ }
+ else
+ {
+ nMessages = 2;
+ pipelineMessageSize = (int)Math.Ceiling(originalArray.Length / (double)nMessages);
+ }
+
+ // Test that the valid configuration can be injected
+ IConfiguration config = GetPipelineDataConverterConfig(pipelineMessageSize);
+ IPipelineDataConverter<T[]> dataConverter = TangFactory.GetTang().NewInjector(config).GetInstance<ArrayPipelineDataConverter<T>>();
+
+ var pipelineData = dataConverter.PipelineMessage(originalArray);
+
+ // Validate that the pipeline constructed the correct number of messages
+ Assert.Equal<int>(pipelineData.Count, nMessages);
+
+ T[] deserializedArray = dataConverter.FullMessage(pipelineData);
+
+ // Validate that the array is unaffected by the serialization / deserialization
+ AssertArrayEquality(originalArray, deserializedArray);
+
+ // Verify that the "IsLast" property is set correctly
+ AssertIsLastFlag(pipelineData);
+ }
+
+ /// <summary>
+ /// Verify that the IPipelineDataConverter<T> class requires a positive value for pipelinePackageElements
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <typeparam name="DataConverter"></typeparam>
+ private static void AssertPositivePipelinePackageElementsRequired<T, DataConverter>()
+ where DataConverter : class, IPipelineDataConverter<T>
+ {
+ // Verify that the PipelinePackageElements cannot be zero
+ var configWithZeroElements = GetPipelineDataConverterConfig(0);
+ Assert.Throws<InjectionException>(() => TangFactory.GetTang().NewInjector(configWithZeroElements).GetInstance<DataConverter>());
+
+ // Verify that the PipelinePackageElements cannot be less than 0
+ var configWithNegativeElements = GetPipelineDataConverterConfig(-2);
+ Assert.Throws<InjectionException>(() => TangFactory.GetTang().NewInjector(configWithNegativeElements).GetInstance<DataConverter>());
+ }
+
+ /// <summary>
+ /// Validate that the IsLast flag is properly set on a list of pipeline messages
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="pipelineData">A list of PipelineMessages</param>
+ private static void AssertIsLastFlag<T>(IList<PipelineMessage<T>> pipelineData)
+ {
+ // Verify that the "IsLast" property is set correctly
+ for (int i = 0; i < pipelineData.Count; i++)
+ {
+ Assert.Equal(pipelineData[i].IsLast, i == pipelineData.Count - 1);
+ }
+ }
+
+ /// <summary>
+ /// Generic array equality method; Equality for type T must make sense for this to make sense
+ /// </summary>
+ /// <typeparam name="T"></typeparam>
+ /// <param name="expected"></param>
+ /// <param name="actual"></param>
+ private static void AssertArrayEquality<T>(T[] expected, T[] actual)
+ {
+ // Two null arrays are considered to be equal
+ // Check to make sure that the arrays are both defined or undefined
+ Assert.True((expected == null) == (actual == null));
+
+ // If the arrays are both null, then don't check any further
+ if (expected == null && actual == null)
+ {
+ return;
+ }
+
+ Assert.Equal(expected.Length, actual.Length);
+
+ for (int i = 0; i < actual.Length; i++)
+ {
+ Assert.True(EqualityComparer<T>.Default.Equals(expected[i], actual[i]));
+ }
+ }
+
+ /// <summary>
+ /// Create a configuration with a PipelinePackageElements parameter
+ /// </summary>
+ /// <param name="pipelineMessageSize">The length of the individual messages in the pipeline</param>
+ /// <returns></returns>
+ private static IConfiguration GetPipelineDataConverterConfig(int pipelineMessageSize)
+ {
+ return TangFactory.GetTang()
+ .NewConfigurationBuilder()
+ .BindNamedParameter(typeof(GroupCommConfigurationOptions.PipelineMessageSize), pipelineMessageSize.ToString())
+ .Build();
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
index 5bbc8c8..f430146 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/Org.Apache.REEF.Network.Tests.csproj
@@ -52,6 +52,7 @@
<Compile Include="$(SolutionDir)\SharedAssemblyInfo.cs">
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
+ <Compile Include="GroupCommunication\PipeliningTests.cs" />
<Compile Include="TcpClientConfigurationModuleTests.cs" />
<Compile Include="BlockingCollectionExtensionTests.cs" />
<Compile Include="GroupCommunication\GroupCommuDriverTests.cs" />
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
index 51463d3..1100376 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/GroupCommConfigurationOptions.cs
@@ -113,5 +113,13 @@
public class Initialize : Name<bool>
{
}
+
+ /// <summary>
+ /// The number of elements to place into each message in the pipeline.
+ /// </summary>
+ [NamedParameter("The number of elements to place into each message in the pipeline.", "PipelineMessageSize", "1000000")]
+ internal sealed class PipelineMessageSize : Name<int>
+ {
+ }
}
}
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
new file mode 100644
index 0000000..12540ac
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.Network.Group.Pipelining.Impl
+{
+ /// <summary>
+ /// An implementation of IPipelineDataConverter for pipelining Arrays of objects.
+ /// </summary>
+ /// <typeparam name="T">The type of object of the array to pipeline</typeparam>
+ public sealed class ArrayPipelineDataConverter<T> : IPipelineDataConverter<T[]> where T : new()
+ {
+ private readonly int _pipelineMessageSize;
+ private static readonly Logger Logger = Logger.GetLogger(typeof(ArrayPipelineDataConverter<T>));
+
+ [Inject]
+ private ArrayPipelineDataConverter([Parameter(typeof(GroupCommConfigurationOptions.PipelineMessageSize))] int pipelineMessageSize)
+ {
+ if (pipelineMessageSize <= 0)
+ {
+ throw new ArgumentException("PipelinePackageSize must be strictly positive");
+ }
+ _pipelineMessageSize = pipelineMessageSize;
+ }
+
+ /// <summary>
+ /// Converts the original message to be communicated in to multiple messages, breaking the array into pieces of size PipelineMessageSize
+ /// </summary>
+ /// <param name="message">The original message</param>
+ /// <returns>The list of pipelined messages</returns>
+ public List<PipelineMessage<T[]>> PipelineMessage(T[] message)
+ {
+ // Return null messages as an empty list
+ if (message == null)
+ {
+ return new List<PipelineMessage<T[]>>();
+ }
+ else if (message.Length == 0)
+ {
+ // Special case; 0-length arrays are passed with a message with a zero-length array
+ return new List<PipelineMessage<T[]>>
+ {
+ new PipelineMessage<T[]>(new T[0], true)
+ };
+ }
+
+ int messageCount = ((message.Length - 1) / _pipelineMessageSize) + 1;
+ List<PipelineMessage<T[]>> messageList = new List<PipelineMessage<T[]>>(messageCount);
+ int offset = 0;
+ while (offset < message.Length)
+ {
+ int subLen = Math.Min(message.Length - offset, _pipelineMessageSize);
+ if (subLen <= 0)
+ {
+ throw new ArithmeticException("Tried to create a pipeline package with fewer than 1 element.");
+ }
+ T[] data = new T[subLen];
+ Array.Copy(message, offset, data, 0, subLen);
+ bool isLast = subLen + offset == message.Length;
+ messageList.Add(new PipelineMessage<T[]>(data, isLast));
+ offset += subLen;
+ }
+ if (messageCount != messageList.Count)
+ {
+ throw new SerializationException(string.Format("The wrong number of pipeline packages were created: Expected {0} but created {1}.", messageCount, messageList.Count));
+ }
+
+ return messageList;
+ }
+
+ /// <summary>
+ /// Constructs the full final message from the communicated pipelined message
+ /// </summary>
+ /// <param name="pipelineMessage">A list of received pipelined messages</param>
+ /// <returns>The full constructed message</returns>
+ public T[] FullMessage(List<PipelineMessage<T[]>> pipelineMessage)
+ {
+ // Null data corresponds to an empty list of pipeline messages
+ if (pipelineMessage.Count == 0)
+ {
+ return null;
+ }
+ if (pipelineMessage.Count == 1)
+ {
+ return pipelineMessage[0].Data;
+ }
+
+ int nElements = pipelineMessage.Sum(v => v.Data.Length);
+
+ // A zero-length array gets returned as a zero-length array
+ if (nElements == 0)
+ {
+ return new T[0];
+ }
+
+ int offset = 0;
+ T[] values = new T[nElements];
+ foreach (var message in pipelineMessage)
+ {
+ Array.Copy(message.Data, 0, values, offset, message.Data.Length);
+ offset += message.Data.Length;
+ }
+ if (offset != nElements)
+ {
+ throw new SerializationException(string.Format("The pipeline packages were deserialized incorrectly created: Expected {0} but created {1}.", nElements, offset));
+ }
+
+ return values;
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 0b57f89..0beb953 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -85,6 +85,7 @@
<Compile Include="Group\Operators\Impl\ScatterSender.cs" />
<Compile Include="Group\Operators\Impl\Sender.cs" />
<Compile Include="Group\Operators\IOperatorSpec.cs" />
+ <Compile Include="Group\Pipelining\Impl\ArrayPipelineDataConverter.cs" />
<Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" />
<Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" />
<Compile Include="Group\Task\Impl\ChildNodeContainer.cs" />