blob: 45c9bbd530d25c828653c9ff4c10aecfeeabd7b5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System.Collections.Generic;
using System.Linq;
using Org.Apache.REEF.Network.Group.Config;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Operators.Impl;
using Org.Apache.REEF.Network.Group.Pipelining.Impl;
using Org.Apache.REEF.Network.Group.Topology;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
using Xunit;
namespace Org.Apache.REEF.Network.Tests.GroupCommunication
{
public class GroupCommunicationTreeTopologyTests
{
[Fact]
public void TestTreeTopology()
{
TreeTopology<int> topology = new TreeTopology<int>("Operator", "Operator", "task1", "driverid",
new BroadcastOperatorSpec("task1", GetDefaultDataConverterConfig()), 2);
for (int i = 1; i < 8; i++)
{
string taskid = "task" + i;
topology.AddTask(taskid);
}
for (int i = 1; i < 8; i++)
{
var conf = topology.GetTaskConfiguration("task" + i);
}
}
[Fact]
public void TestReduceOperator()
{
string groupName = "group1";
string operatorName = "reduce";
int numTasks = 10;
string driverId = "driverId";
string masterTaskId = "task0";
int fanOut = 3;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddReduce<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig(), GetDefaultReduceFuncConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName);
IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName);
IReduceSender<int> sender2 = commGroups[2].GetReduceSender<int>(operatorName);
IReduceSender<int> sender3 = commGroups[3].GetReduceSender<int>(operatorName);
IReduceSender<int> sender4 = commGroups[4].GetReduceSender<int>(operatorName);
IReduceSender<int> sender5 = commGroups[5].GetReduceSender<int>(operatorName);
IReduceSender<int> sender6 = commGroups[6].GetReduceSender<int>(operatorName);
IReduceSender<int> sender7 = commGroups[7].GetReduceSender<int>(operatorName);
IReduceSender<int> sender8 = commGroups[8].GetReduceSender<int>(operatorName);
IReduceSender<int> sender9 = commGroups[9].GetReduceSender<int>(operatorName);
Assert.NotNull(receiver);
Assert.NotNull(sender1);
Assert.NotNull(sender2);
Assert.NotNull(sender3);
Assert.NotNull(sender4);
Assert.NotNull(sender5);
Assert.NotNull(sender6);
Assert.NotNull(sender7);
Assert.NotNull(sender8);
Assert.NotNull(sender9);
sender9.Send(9);
sender8.Send(8);
sender7.Send(7);
sender6.Send(6);
sender5.Send(5);
sender4.Send(4);
sender3.Send(3);
sender2.Send(2);
sender1.Send(1);
Assert.Equal(45, receiver.Reduce());
}
[Fact]
public void TestBroadcastOperator()
{
string groupName = "group1";
string operatorName = "broadcast";
string driverId = "driverId";
string masterTaskId = "task0";
int numTasks = 10;
int value1 = 1337;
int value2 = 42;
int value3 = 99;
int fanOut = 3;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddBroadcast<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig(), GetDefaultReduceFuncConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName);
IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver2 = commGroups[2].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver3 = commGroups[3].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver4 = commGroups[4].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver5 = commGroups[5].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver6 = commGroups[6].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver7 = commGroups[7].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver8 = commGroups[8].GetBroadcastReceiver<int>(operatorName);
IBroadcastReceiver<int> receiver9 = commGroups[9].GetBroadcastReceiver<int>(operatorName);
Assert.NotNull(sender);
Assert.NotNull(receiver1);
Assert.NotNull(receiver2);
Assert.NotNull(receiver3);
Assert.NotNull(receiver4);
Assert.NotNull(receiver5);
Assert.NotNull(receiver6);
Assert.NotNull(receiver7);
Assert.NotNull(receiver8);
Assert.NotNull(receiver9);
sender.Send(value1);
Assert.Equal(value1, receiver1.Receive());
Assert.Equal(value1, receiver2.Receive());
Assert.Equal(value1, receiver3.Receive());
Assert.Equal(value1, receiver4.Receive());
Assert.Equal(value1, receiver5.Receive());
Assert.Equal(value1, receiver6.Receive());
Assert.Equal(value1, receiver7.Receive());
Assert.Equal(value1, receiver8.Receive());
Assert.Equal(value1, receiver9.Receive());
sender.Send(value2);
Assert.Equal(value2, receiver1.Receive());
Assert.Equal(value2, receiver2.Receive());
Assert.Equal(value2, receiver3.Receive());
Assert.Equal(value2, receiver4.Receive());
Assert.Equal(value2, receiver5.Receive());
Assert.Equal(value2, receiver6.Receive());
Assert.Equal(value2, receiver7.Receive());
Assert.Equal(value2, receiver8.Receive());
Assert.Equal(value2, receiver9.Receive());
sender.Send(value3);
Assert.Equal(value3, receiver1.Receive());
Assert.Equal(value3, receiver2.Receive());
Assert.Equal(value3, receiver3.Receive());
Assert.Equal(value3, receiver4.Receive());
Assert.Equal(value3, receiver5.Receive());
Assert.Equal(value3, receiver6.Receive());
Assert.Equal(value3, receiver7.Receive());
Assert.Equal(value3, receiver8.Receive());
Assert.Equal(value3, receiver9.Receive());
}
[Fact]
public void TestBroadcastReduceOperators()
{
string groupName = "group1";
string broadcastOperatorName = "broadcast";
string reduceOperatorName = "reduce";
string masterTaskId = "task0";
string driverId = "Driver Id";
int numTasks = 10;
int fanOut = 3;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddBroadcast<int>(
broadcastOperatorName,
masterTaskId,
TopologyTypes.Tree,
GetDefaultDataConverterConfig())
.AddReduce<int>(
reduceOperatorName,
masterTaskId,
TopologyTypes.Tree,
GetDefaultDataConverterConfig(),
GetDefaultReduceFuncConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
// for master task
IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver1 = commGroups[1].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver2 = commGroups[2].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver3 = commGroups[3].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver4 = commGroups[4].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver5 = commGroups[5].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender5 = commGroups[5].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver6 = commGroups[6].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender6 = commGroups[6].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver7 = commGroups[7].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender7 = commGroups[7].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver8 = commGroups[8].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender8 = commGroups[8].GetReduceSender<int>(reduceOperatorName);
IBroadcastReceiver<int> broadcastReceiver9 = commGroups[9].GetBroadcastReceiver<int>(broadcastOperatorName);
IReduceSender<int> triangleNumberSender9 = commGroups[9].GetReduceSender<int>(reduceOperatorName);
for (int i = 1; i <= 10; i++)
{
broadcastSender.Send(i);
int n1 = broadcastReceiver1.Receive();
int n2 = broadcastReceiver2.Receive();
int n3 = broadcastReceiver3.Receive();
int n4 = broadcastReceiver4.Receive();
int n5 = broadcastReceiver5.Receive();
int n6 = broadcastReceiver6.Receive();
int n7 = broadcastReceiver7.Receive();
int n8 = broadcastReceiver8.Receive();
int n9 = broadcastReceiver9.Receive();
Assert.Equal(i, n1);
Assert.Equal(i, n2);
Assert.Equal(i, n3);
Assert.Equal(i, n4);
Assert.Equal(i, n5);
Assert.Equal(i, n6);
Assert.Equal(i, n7);
Assert.Equal(i, n8);
Assert.Equal(i, n9);
int triangleNum9 = GroupCommunicationTests.TriangleNumber(n9);
triangleNumberSender9.Send(triangleNum9);
int triangleNum8 = GroupCommunicationTests.TriangleNumber(n8);
triangleNumberSender8.Send(triangleNum8);
int triangleNum7 = GroupCommunicationTests.TriangleNumber(n7);
triangleNumberSender7.Send(triangleNum7);
int triangleNum6 = GroupCommunicationTests.TriangleNumber(n6);
triangleNumberSender6.Send(triangleNum6);
int triangleNum5 = GroupCommunicationTests.TriangleNumber(n5);
triangleNumberSender5.Send(triangleNum5);
int triangleNum4 = GroupCommunicationTests.TriangleNumber(n4);
triangleNumberSender4.Send(triangleNum4);
int triangleNum3 = GroupCommunicationTests.TriangleNumber(n3);
triangleNumberSender3.Send(triangleNum3);
int triangleNum2 = GroupCommunicationTests.TriangleNumber(n2);
triangleNumberSender2.Send(triangleNum2);
int triangleNum1 = GroupCommunicationTests.TriangleNumber(n1);
triangleNumberSender1.Send(triangleNum1);
int sum = sumReducer.Reduce();
int expected = GroupCommunicationTests.TriangleNumber(i) * (numTasks - 1);
Assert.Equal(sum, expected);
}
}
[Fact]
public void TestScatterOperator()
{
string groupName = "group1";
string operatorName = "scatter";
string masterTaskId = "task0";
string driverId = "Driver Id";
int numTasks = 5;
int fanOut = 2;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
Assert.NotNull(sender);
Assert.NotNull(receiver1);
Assert.NotNull(receiver2);
Assert.NotNull(receiver3);
Assert.NotNull(receiver4);
List<int> data = new List<int> { 1, 2, 3, 4 };
sender.Send(data);
var receved1 = receiver1.Receive().ToArray();
Assert.Equal(1, receved1[0]);
Assert.Equal(2, receved1[1]);
var receved2 = receiver2.Receive().ToArray();
Assert.Equal(3, receved2[0]);
Assert.Equal(4, receved2[1]);
Assert.Equal(1, receiver3.Receive().Single());
Assert.Equal(2, receiver4.Receive().Single());
}
[Fact]
public void TestScatterOperator2()
{
string groupName = "group1";
string operatorName = "scatter";
string masterTaskId = "task0";
string driverId = "Driver Id";
int numTasks = 5;
int fanOut = 2;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
Assert.NotNull(sender);
Assert.NotNull(receiver1);
Assert.NotNull(receiver2);
Assert.NotNull(receiver3);
Assert.NotNull(receiver4);
List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
sender.Send(data);
var data1 = receiver1.Receive().ToArray();
Assert.Equal(1, data1[0]);
Assert.Equal(2, data1[1]);
Assert.Equal(3, data1[2]);
Assert.Equal(4, data1[3]);
var data2 = receiver2.Receive().ToArray();
Assert.Equal(5, data2[0]);
Assert.Equal(6, data2[1]);
Assert.Equal(7, data2[2]);
Assert.Equal(8, data2[3]);
var data3 = receiver3.Receive();
Assert.Equal(1, data3.First());
Assert.Equal(2, data3.Last());
var data4 = receiver4.Receive();
Assert.Equal(3, data4.First());
Assert.Equal(4, data4.Last());
}
[Fact]
public void TestScatterOperator3()
{
string groupName = "group1";
string operatorName = "scatter";
string masterTaskId = "task0";
string driverId = "Driver Id";
int numTasks = 4;
int fanOut = 2;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
Assert.NotNull(sender);
Assert.NotNull(receiver1);
Assert.NotNull(receiver2);
Assert.NotNull(receiver3);
List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
sender.Send(data);
var data1 = receiver1.Receive().ToArray();
Assert.Equal(1, data1[0]);
Assert.Equal(2, data1[1]);
Assert.Equal(3, data1[2]);
Assert.Equal(4, data1[3]);
var data2 = receiver2.Receive().ToArray();
Assert.Equal(5, data2[0]);
Assert.Equal(6, data2[1]);
Assert.Equal(7, data2[2]);
Assert.Equal(8, data2[3]);
var data3 = receiver3.Receive().ToArray();
Assert.Equal(1, data3[0]);
Assert.Equal(2, data3[1]);
Assert.Equal(3, data3[2]);
Assert.Equal(4, data3[3]);
}
[Fact]
public void TestScatterOperator4()
{
string groupName = "group1";
string operatorName = "scatter";
string masterTaskId = "task0";
string driverId = "Driver Id";
int numTasks = 4;
int fanOut = 2;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
Assert.NotNull(sender);
Assert.NotNull(receiver1);
Assert.NotNull(receiver2);
Assert.NotNull(receiver3);
List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
List<string> order = new List<string> { "task2", "task1" };
sender.Send(data, order);
var data2 = receiver2.Receive().ToArray();
Assert.Equal(1, data2[0]);
Assert.Equal(2, data2[1]);
Assert.Equal(3, data2[2]);
Assert.Equal(4, data2[3]);
var data1 = receiver1.Receive().ToArray();
Assert.Equal(5, data1[0]);
Assert.Equal(6, data1[1]);
Assert.Equal(7, data1[2]);
Assert.Equal(8, data1[3]);
var data3 = receiver3.Receive().ToArray();
Assert.Equal(5, data3[0]);
Assert.Equal(6, data3[1]);
Assert.Equal(7, data3[2]);
Assert.Equal(8, data3[3]);
}
[Fact]
public void TestScatterOperator5()
{
string groupName = "group1";
string operatorName = "scatter";
string masterTaskId = "task0";
string driverId = "Driver Id";
int numTasks = 6;
int fanOut = 2;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName);
IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(operatorName);
IScatterReceiver<int> receiver5 = commGroups[5].GetScatterReceiver<int>(operatorName);
Assert.NotNull(sender);
Assert.NotNull(receiver1);
Assert.NotNull(receiver2);
Assert.NotNull(receiver3);
Assert.NotNull(receiver4);
Assert.NotNull(receiver5);
List<int> data = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8 };
sender.Send(data);
var data1 = receiver1.Receive().ToArray();
Assert.Equal(1, data1[0]);
Assert.Equal(2, data1[1]);
Assert.Equal(3, data1[2]);
Assert.Equal(4, data1[3]);
var data2 = receiver2.Receive().ToArray();
Assert.Equal(5, data2[0]);
Assert.Equal(6, data2[1]);
Assert.Equal(7, data2[2]);
Assert.Equal(8, data2[3]);
var data3 = receiver3.Receive().ToArray();
Assert.Equal(1, data3[0]);
Assert.Equal(2, data3[1]);
var data4 = receiver4.Receive().ToArray();
Assert.Equal(3, data4[0]);
Assert.Equal(4, data4[1]);
var data5 = receiver5.Receive().ToArray();
Assert.Equal(5, data5[0]);
Assert.Equal(6, data5[1]);
Assert.Equal(7, data5[2]);
Assert.Equal(8, data5[3]);
}
[Fact]
public void TestScatterReduceOperators()
{
string groupName = "group1";
string scatterOperatorName = "scatter";
string reduceOperatorName = "reduce";
string masterTaskId = "task0";
string driverId = "Driver Id";
int numTasks = 5;
int fanOut = 2;
var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks);
ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup
.AddScatter<int>(
scatterOperatorName,
masterTaskId,
TopologyTypes.Tree,
GetDefaultDataConverterConfig())
.AddReduce<int>(
reduceOperatorName,
masterTaskId,
TopologyTypes.Tree,
GetDefaultDataConverterConfig(),
GetDefaultReduceFuncConfig())
.Build();
var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig());
IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName);
IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(scatterOperatorName);
IReduceSender<int> sumSender1 = commGroups[1].GetReduceSender<int>(reduceOperatorName);
IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(scatterOperatorName);
IReduceSender<int> sumSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName);
IScatterReceiver<int> receiver3 = commGroups[3].GetScatterReceiver<int>(scatterOperatorName);
IReduceSender<int> sumSender3 = commGroups[3].GetReduceSender<int>(reduceOperatorName);
IScatterReceiver<int> receiver4 = commGroups[4].GetScatterReceiver<int>(scatterOperatorName);
IReduceSender<int> sumSender4 = commGroups[4].GetReduceSender<int>(reduceOperatorName);
Assert.NotNull(sender);
Assert.NotNull(receiver1);
Assert.NotNull(receiver2);
Assert.NotNull(receiver3);
Assert.NotNull(receiver4);
List<int> data = Enumerable.Range(1, 100).ToList();
sender.Send(data);
List<int> data1 = receiver1.Receive();
List<int> data2 = receiver2.Receive();
List<int> data3 = receiver3.Receive();
List<int> data4 = receiver4.Receive();
int sum3 = data3.Sum();
sumSender3.Send(sum3);
int sum4 = data4.Sum();
sumSender4.Send(sum4);
int sum2 = data2.Sum();
sumSender2.Send(sum2);
int sum1 = data1.Sum();
sumSender1.Send(sum1);
int sum = sumReducer.Reduce();
Assert.Equal(sum, 6325);
}
private IConfiguration GetDefaultCodecConfig()
{
return StreamingCodecConfiguration<int>.Conf
.Set(StreamingCodecConfiguration<int>.Codec, GenericType<IntStreamingCodec>.Class)
.Build();
}
private IConfiguration GetDefaultReduceFuncConfig()
{
return ReduceFunctionConfiguration<int>.Conf
.Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class)
.Build();
}
private IConfiguration GetDefaultDataConverterConfig()
{
return PipelineDataConverterConfiguration<int>.Conf
.Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class)
.Build();
}
}
}