blob: 8f4a1ca227397d5f379f1d843a17bf32dbbb0510 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Reactive;
using Org.Apache.REEF.Wake.Avro;
using Org.Apache.REEF.Wake.Remote;
using Org.Apache.REEF.Wake.Remote.Impl;
using Org.Apache.REEF.Tang.Implementations.Tang;
using org.apache.reef.wake.tests.message;
using Xunit;
namespace Org.Apache.REEF.Wake.Tests
{
/// <summary>
/// Observer to receive and verify test message contents.
/// </summary>
internal sealed class TestMessageObserver : IObserver<MessageInstance<AvroTestMessage>>
{
int number;
string data;
public TestMessageObserver(int number, string data)
{
this.number = number;
this.data = data;
}
public void OnNext(MessageInstance<AvroTestMessage> instance)
{
Assert.Equal(instance.message.number, this.number);
Assert.Equal(instance.message.data, this.data);
}
public void OnError(Exception error)
{
throw new NotImplementedException();
}
public void OnCompleted()
{
throw new NotImplementedException();
}
}
[Collection("FunctionalTests")]
public class TestProtocolSerializer
{
/// <summary>
/// Setup two way communication between two remote managers through the loopback
/// network and verify that Avro messages are properly serialized and deserialzied
/// by the ProtocolSerializer class.
/// </summary>
[Fact]
[Trait("Priority", "1")]
public void TestTwoWayCommunication()
{
// Test data.
int[] numbers = { 12, 25 };
string[] strings = { "The first string", "The second string" };
IPAddress listeningAddress = IPAddress.Parse("127.0.0.1");
BlockingCollection<byte[]> queue1 = new BlockingCollection<byte[]>();
BlockingCollection<byte[]> queue2 = new BlockingCollection<byte[]>();
ProtocolSerializer serializer = new ProtocolSerializer(this.GetType().Assembly, "org.apache.reef.wake.tests.message");
IRemoteManagerFactory _remoteManagerFactory = TangFactory.GetTang().NewInjector().GetInstance<IRemoteManagerFactory>();
using (var remoteManager1 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
using (var remoteManager2 = _remoteManagerFactory.GetInstance(listeningAddress, new ByteCodec()))
{
// Register observers for remote manager 1 and remote manager 2
var remoteEndpoint = new IPEndPoint(listeningAddress, 0);
var observer1 = Observer.Create<byte[]>(queue1.Add);
var observer2 = Observer.Create<byte[]>(queue2.Add);
remoteManager1.RegisterObserver(remoteEndpoint, observer1);
remoteManager2.RegisterObserver(remoteEndpoint, observer2);
// Remote manager 1 sends avro message to remote manager 2
var remoteObserver1 = remoteManager1.GetRemoteObserver(remoteManager2.LocalEndpoint);
remoteObserver1.OnNext(serializer.Write(new AvroTestMessage(numbers[0], strings[0]), 1));
// Remote manager 2 sends avro message to remote manager 1
var remoteObserver2 = remoteManager2.GetRemoteObserver(remoteManager1.LocalEndpoint);
remoteObserver2.OnNext(serializer.Write(new AvroTestMessage(numbers[1], strings[1]), 2));
// Verify the messages are properly received.
serializer.Read(queue1.Take(), new TestMessageObserver(numbers[1], strings[1]));
serializer.Read(queue2.Take(), new TestMessageObserver(numbers[0], strings[0]));
}
}
}
}