[REEF-1775] ArrayPipelineDataConverter : FullMessage will throw on null data
This adds an explicit null-filter on incoming pipeline messages.
JIRA:
[REEF-1775](https://issues.apache.org/jira/browse/REEF-1775)
Pull Request:
This closes #1289
diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
index d777086..9853725 100644
--- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
+++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/PipeliningTests.cs
@@ -119,6 +119,9 @@
// Verify that the constructor has the proper restrictions
AssertPositivePipelinePackageElementsRequired<T[], ArrayPipelineDataConverter<T>>();
+ // Verify that the FullMessage method properly handles null and zero-length arrays
+ AssertNullAndZeroLengthArraysHandledByFullMessage<T, ArrayPipelineDataConverter<T>>();
+
// Test the valid case where we break up the array into smaller pieces
// First determine how many messages to create from originalArray
int pipelineMessageSize;
@@ -175,6 +178,59 @@
}
/// <summary>
+ /// Verify that the FullMessage method properly handles null and zero-length arrays
+ /// </summary>
+ /// <typeparam name="T">The type of array the IPipelineDataConverter converts</typeparam>
+ /// <typeparam name="DataConverter">The IPipelineDataConverter implementation</typeparam>
+ private static void AssertNullAndZeroLengthArraysHandledByFullMessage<T, DataConverter>()
+ where T : new()
+ where DataConverter : class, IPipelineDataConverter<T[]>
+ {
+ // Test that the valid configuration can be injected
+ IConfiguration config = GetPipelineDataConverterConfig(1);
+ IPipelineDataConverter<T[]> dataConverter = TangFactory.GetTang().NewInjector(config).GetInstance<ArrayPipelineDataConverter<T>>();
+
+ // Test that a null message returns a null object
+ List<PipelineMessage<T[]>> nullMessage = new List<PipelineMessage<T[]>>
+ {
+ new PipelineMessage<T[]>(null, true)
+ };
+ Assert.Null(dataConverter.FullMessage(nullMessage));
+
+ // Test that many null messages returns a null object
+ List<PipelineMessage<T[]>> manyNullMessages = new List<PipelineMessage<T[]>>
+ {
+ new PipelineMessage<T[]>(null, false),
+ new PipelineMessage<T[]>(null, true)
+ };
+ Assert.Null(dataConverter.FullMessage(manyNullMessages));
+
+ // Test that null messages mixed with non-null returns the non-null object
+ List<PipelineMessage<T[]>> someNullMessages = new List<PipelineMessage<T[]>>
+ {
+ new PipelineMessage<T[]>(null, false),
+ new PipelineMessage<T[]>(new T[2], false),
+ new PipelineMessage<T[]>(null, true)
+ };
+ Assert.Equal(2, dataConverter.FullMessage(someNullMessages).Length);
+
+ // Test that a zero-length message returns a zero-length object
+ List<PipelineMessage<T[]>> zeroLengthMessage = new List<PipelineMessage<T[]>>
+ {
+ new PipelineMessage<T[]>(new T[0], true)
+ };
+ Assert.Equal(0, dataConverter.FullMessage(zeroLengthMessage).Length);
+
+ // Test that many zero-length message return a zero-length object
+ List<PipelineMessage<T[]>> manyZeroLengthMessages = new List<PipelineMessage<T[]>>
+ {
+ new PipelineMessage<T[]>(new T[0], false),
+ new PipelineMessage<T[]>(new T[0], true)
+ };
+ Assert.Equal(0, dataConverter.FullMessage(manyZeroLengthMessages).Length);
+ }
+
+ /// <summary>
/// Validate that the IsLast flag is properly set on a list of pipeline messages
/// </summary>
/// <typeparam name="T"></typeparam>
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
index 12540ac..6d43812 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Pipelining/Impl/ArrayPipelineDataConverter.cs
@@ -96,6 +96,9 @@
/// <returns>The full constructed message</returns>
public T[] FullMessage(List<PipelineMessage<T[]>> pipelineMessage)
{
+ // Filter out messages with null data payloads
+ pipelineMessage = pipelineMessage.Where(message => message.Data != null).ToList();
+
// Null data corresponds to an empty list of pipeline messages
if (pipelineMessage.Count == 0)
{