blob: 29d2b6338f72f2e4fa7cda894bb4e63bb7c62443 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;
using Avro.ipc;
using Avro.ipc.Specific;
using NUnit.Framework;
using org.apache.avro.test;
namespace Avro.Test.Ipc
{
[TestFixture]
public class SocketServerWithCallbacksTest
{
private static volatile bool ackFlag;
private static volatile CountdownLatch ackLatch = new CountdownLatch(1);
private SocketServer server;
private SocketTransceiver transceiver;
private SimpleCallback simpleClient;
[TestFixtureSetUp]
public void Init()
{
var responder = new SpecificResponder<Simple>(new SimpleImpl());
server = new SocketServer("localhost", 0, responder);
server.Start();
transceiver = new SocketTransceiver("localhost", server.Port);
simpleClient = SpecificRequestor.CreateClient<SimpleCallback>(transceiver);
}
[TestFixtureTearDown]
public void TearDown()
{
try
{
if (transceiver != null)
{
transceiver.Disconnect();
}
}
catch
{
}
try
{
server.Stop();
}
catch
{
}
}
// AVRO-625 [Test]
public void CancelPendingRequestsOnTransceiverClose()
{
// Start up a second server so that closing the server doesn't
// interfere with the other unit tests:
var blockingSimpleImpl = new BlockingSimpleImpl();
var responder = new SpecificResponder<Simple>(blockingSimpleImpl);
var server2 = new SocketServer("localhost", 0, responder);
server2.Start();
try
{
int serverPort = server2.Port;
var transceiver2 = new SocketTransceiver("localhost", serverPort);
var addFuture = new CallFuture<int>();
try
{
var simpleClient2 = SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
// The first call has to block for the handshake:
Assert.AreEqual(3, simpleClient2.add(1, 2));
// Now acquire the semaphore so that the server will block:
blockingSimpleImpl.acquireRunPermit();
simpleClient2.add(1, 2, addFuture);
}
finally
{
// When the transceiver is closed, the CallFuture should get
// an IOException
transceiver2.Close();
}
bool ioeThrown = false;
try
{
addFuture.WaitForResult(2000);
}
catch (Exception)
{
}
//catch (ExecutionException e) {
// ioeThrown = e.getCause() instanceof IOException;
// Assert.assertTrue(e.getCause() instanceof IOException);
//} catch (Exception e) {
// e.printStackTrace();
// Assert.fail("Unexpected Exception: " + e.toString());
//}
Assert.IsTrue(ioeThrown, "Expected IOException to be thrown");
}
finally
{
blockingSimpleImpl.releaseRunPermit();
server2.Stop();
}
}
// AVRO-625 [Test]
public void CancelPendingRequestsAfterChannelCloseByServerShutdown()
{
// The purpose of this test is to verify that a client doesn't stay
// blocked when a server is unexpectedly killed (or when for some
// other reason the channel is suddenly closed) while the server
// was in the process of handling a request (thus after it received
// the request, and before it returned the response).
// Start up a second server so that closing the server doesn't
// interfere with the other unit tests:
var blockingSimpleImpl = new BlockingSimpleImpl();
var responder = new SpecificResponder<Simple>(blockingSimpleImpl);
var server2 = new SocketServer("localhost", 0, responder);
server2.Start();
SocketTransceiver transceiver2 = null;
try
{
transceiver2 = new SocketTransceiver("localhost", server2.Port);
var simpleClient2 = SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
// Acquire the method-enter permit, which will be released by the
// server method once we call it
blockingSimpleImpl.acquireEnterPermit();
// Acquire the run permit, to avoid that the server method returns immediately
blockingSimpleImpl.acquireRunPermit();
var t = new Thread(() =>
{
try
{
simpleClient2.add(3, 4);
Assert.Fail("Expected an exception");
}
catch (Exception)
{
// expected
}
});
// Start client call
t.Start();
// Wait until method is entered on the server side
blockingSimpleImpl.acquireEnterPermit();
// The server side method is now blocked waiting on the run permit
// (= is busy handling the request)
// Stop the server
server2.Stop();
// With the server gone, we expect the client to get some exception and exit
// Wait for client thread to exit
t.Join(10000);
Assert.IsFalse(t.IsAlive, "Client request should not be blocked on server shutdown");
}
finally
{
blockingSimpleImpl.releaseRunPermit();
server2.Stop();
if (transceiver2 != null)
transceiver2.Close();
}
}
private class CallbackCallFuture<T> : CallFuture<T>
{
private readonly Action<Exception> _handleException;
private readonly Action<T> _handleResult;
public CallbackCallFuture(Action<T> handleResult = null, Action<Exception> handleException = null)
{
_handleResult = handleResult;
_handleException = handleException;
}
public override void HandleResult(T result)
{
_handleResult(result);
}
public override void HandleException(Exception exception)
{
_handleException(exception);
}
}
private class NestedCallFuture<T> : CallFuture<T>
{
private readonly CallFuture<T> cf;
public NestedCallFuture(CallFuture<T> cf)
{
this.cf = cf;
}
public override void HandleResult(T result)
{
cf.HandleResult(result);
}
public override void HandleException(Exception exception)
{
cf.HandleException(exception);
}
}
private string Hello(string howAreYou)
{
var response = new CallFuture<string>();
simpleClient.hello(howAreYou, response);
return response.WaitForResult(2000);
}
private void Hello(string howAreYou, CallFuture<string> future1)
{
simpleClient.hello(howAreYou, future1);
}
private class BlockingSimpleImpl : SimpleImpl
{
/** Semaphore that is released when the method is entered. */
private readonly Semaphore enterSemaphore = new Semaphore(1, 1);
/** Semaphore that must be acquired for the method to run and exit. */
private readonly Semaphore runSemaphore = new Semaphore(1, 1);
public override string hello(string greeting)
{
releaseEnterPermit();
acquireRunPermit();
try
{
return base.hello(greeting);
}
finally
{
releaseRunPermit();
}
}
public override TestRecord echo(TestRecord record)
{
releaseEnterPermit();
acquireRunPermit();
try
{
return base.echo(record);
}
finally
{
releaseRunPermit();
}
}
public override int add(int arg1, int arg2)
{
releaseEnterPermit();
acquireRunPermit();
try
{
return base.add(arg1, arg2);
}
finally
{
releaseRunPermit();
}
}
public override byte[] echoBytes(byte[] data)
{
releaseEnterPermit();
acquireRunPermit();
try
{
return base.echoBytes(data);
}
finally
{
releaseRunPermit();
}
}
public override object error()
{
releaseEnterPermit();
acquireRunPermit();
try
{
return base.error();
}
finally
{
releaseRunPermit();
}
}
public override void ack()
{
releaseEnterPermit();
acquireRunPermit();
try
{
base.ack();
}
finally
{
releaseRunPermit();
}
}
/**
* Acquires a single permit from the semaphore.
*/
public void acquireRunPermit()
{
try
{
runSemaphore.WaitOne();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// throw new RuntimeException(e);
//}
}
finally
{
}
}
/**
* Releases a single permit to the semaphore.
*/
public void releaseRunPermit()
{
try
{
runSemaphore.Release();
}
catch //(SemaphoreFullException)
{
}
}
private void releaseEnterPermit()
{
try
{
enterSemaphore.Release();
}
catch //(SemaphoreFullException)
{
}
}
/**
* Acquires a single permit from the semaphore.
*/
public void acquireEnterPermit()
{
try
{
enterSemaphore.WaitOne();
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// throw new RuntimeException(e);
//}
}
finally
{
}
}
}
[Test]
public void Ack()
{
simpleClient.ack();
ackLatch.Wait(2000);
Assert.IsTrue(ackFlag, "Expected ack flag to be set");
ackLatch = new CountdownLatch(1);
simpleClient.ack();
ackLatch.Wait(2000);
Assert.IsFalse(ackFlag, "Expected ack flag to be cleared");
}
[Test]
public void Add()
{
// Test synchronous RPC:
Assert.AreEqual(8, simpleClient.add(2, 6));
// Test asynchronous RPC (future):
var future1 = new CallFuture<int>();
simpleClient.add(8, 8, future1);
Assert.AreEqual(16, future1.WaitForResult(2000));
Assert.IsNull(future1.Error);
// Test asynchronous RPC (callback):
var future2 = new CallFuture<int>();
simpleClient.add(512, 256, new NestedCallFuture<int>(future2));
Assert.AreEqual(768, future2.WaitForResult(2000));
Assert.IsNull(future2.Error);
}
[Test]
public void ClientReconnectAfterServerRestart()
{
// Start up a second server so that closing the server doesn't
// interfere with the other unit tests:
SimpleImpl simpleImpl = new BlockingSimpleImpl();
var responder = new SpecificResponder<Simple>(simpleImpl);
var server2 = new SocketServer("localhost", 0, responder);
server2.Start();
try
{
int serverPort = server2.Port;
// Initialize a client, and establish a connection to the server:
Transceiver transceiver2 = new SocketTransceiver("localhost", serverPort);
var simpleClient2 =
SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
Assert.AreEqual(3, simpleClient2.add(1, 2));
// Restart the server:
server2.Stop();
try
{
simpleClient2.add(2, -1);
Assert.Fail("Client should not be able to invoke RPCs because server is no longer running");
}
catch (Exception)
{
// Expected since server is no longer running
}
Thread.Sleep(2000);
server2 = new SocketServer("localhost", serverPort, new SpecificResponder<Simple>(new SimpleImpl()));
server2.Start();
// Invoke an RPC using the same client, which should reestablish the
// connection to the server:
Assert.AreEqual(3, simpleClient2.add(1, 2));
}
finally
{
server2.Stop();
}
}
[Test]
public void Echo()
{
var record = new TestRecord
{
hash =
new MD5
{
Value =
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8}
},
kind = Kind.FOO,
name = "My Record"
};
// Test synchronous RPC:
TestRecord testRecord = simpleClient.echo(record);
Assert.AreEqual(record, testRecord);
// Test asynchronous RPC (future):
var future1 = new CallFuture<TestRecord>();
simpleClient.echo(record, future1);
Assert.AreEqual(record, future1.WaitForResult(2000));
Assert.IsNull(future1.Error);
// Test asynchronous RPC (callback):
var future2 = new CallFuture<TestRecord>();
simpleClient.echo(record, new NestedCallFuture<TestRecord>(future2));
Assert.AreEqual(record, future2.WaitForResult(2000));
Assert.IsNull(future2.Error);
}
[Test]
public void EchoBytes()
{
var byteBuffer = new byte[] {1, 2, 3, 4, 5, 6, 7, 8};
// Test synchronous RPC:
Assert.AreEqual(byteBuffer, simpleClient.echoBytes(byteBuffer));
// Test asynchronous RPC (future):
var future1 = new CallFuture<byte[]>();
simpleClient.echoBytes(byteBuffer, future1);
Assert.AreEqual(byteBuffer, future1.WaitForResult(2000));
Assert.IsNull(future1.Error);
// Test asynchronous RPC (callback):
var future2 = new CallFuture<byte[]>();
simpleClient.echoBytes(byteBuffer, new NestedCallFuture<byte[]>(future2));
Assert.AreEqual(byteBuffer, future2.WaitForResult(2000));
Assert.IsNull(future2.Error);
}
[Test, TestCase(false, TestName = "Specific error"), TestCase(true, TestName = "System error")]
public void Error(bool systemError)
{
Type expected;
if(systemError)
{
expected = typeof(Exception);
SimpleImpl.throwSystemError = true;
}
else
{
expected = typeof(TestError);
SimpleImpl.throwSystemError = false;
}
// Test synchronous RPC:
try
{
simpleClient.error();
Assert.Fail("Expected " + expected.Name + " to be thrown");
}
catch (Exception e)
{
Assert.AreEqual(expected, e.GetType());
}
// Test asynchronous RPC (future):
var future = new CallFuture<object>();
simpleClient.error(future);
try
{
future.WaitForResult(2000);
Assert.Fail("Expected " + expected.Name + " to be thrown");
}
catch (Exception e)
{
Assert.AreEqual(expected, e.GetType());
}
Assert.IsNotNull(future.Error);
Assert.AreEqual(expected, future.Error.GetType());
Assert.IsNull(future.Result);
// Test asynchronous RPC (callback):
Exception errorRef = null;
var latch = new CountdownLatch(1);
simpleClient.error(new CallbackCallFuture<object>(
result => Assert.Fail("Expected " + expected.Name),
exception =>
{
errorRef = exception;
latch.Signal();
}));
Assert.IsTrue(latch.Wait(2000), "Timed out waiting for error");
Assert.IsNotNull(errorRef);
Assert.AreEqual(expected, errorRef.GetType());
}
[Test]
public void Greeting()
{
// Test synchronous RPC:
string response = Hello("how are you?");
Assert.AreEqual("Hello, how are you?", response);
// Test asynchronous RPC (future):
var future1 = new CallFuture<String>();
Hello("World!", future1);
string result = future1.WaitForResult();
Assert.AreEqual("Hello, World!", result);
Assert.IsNull(future1.Error);
// Test asynchronous RPC (callback):
var future2 = new CallFuture<String>();
Hello("what's up?", new NestedCallFuture<string>(future2));
Assert.AreEqual("Hello, what's up?", future2.WaitForResult());
Assert.IsNull(future2.Error);
}
//[Test]
public void PerformanceTest()
{
const int threadCount = 8;
const long runTimeMillis = 10*1000L;
long rpcCount = 0;
int[] runFlag = {1};
var startLatch = new CountdownLatch(threadCount);
for (int ii = 0; ii < threadCount; ii++)
{
new Thread(() =>
{
{
try
{
startLatch.Signal();
startLatch.Wait(2000);
while (Interlocked.Add(ref runFlag[0], 0) == 1)
{
Interlocked.Increment(ref rpcCount);
Assert.AreEqual("Hello, World!", simpleClient.hello("World!"));
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
}).Start();
}
startLatch.Wait(2000);
Thread.Sleep(2000);
Interlocked.Exchange(ref runFlag[0], 1);
string results = "Completed " + rpcCount + " RPCs in " + runTimeMillis +
"ms => " + ((rpcCount/(double) runTimeMillis)*1000) + " RPCs/sec, " +
(runTimeMillis/(double) rpcCount) + " ms/RPC.";
Debug.WriteLine(results);
}
[Test]
public void TestSendAfterChannelClose()
{
// Start up a second server so that closing the server doesn't
// interfere with the other unit tests:
var responder = new SpecificResponder<Simple>(new SimpleImpl());
var server2 = new SocketServer("localhost", 0, responder);
server2.Start();
try
{
var transceiver2 = new SocketTransceiver("localhost", server2.Port);
try
{
var simpleClient2 = SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
// Verify that connection works:
Assert.AreEqual(3, simpleClient2.add(1, 2));
// Try again with callbacks:
var addFuture = new CallFuture<int>();
simpleClient2.add(1, 2, addFuture);
Assert.AreEqual(3, addFuture.WaitForResult(2000));
// Shut down server:
server2.Stop();
// Send a new RPC, and verify that it throws an Exception that
// can be detected by the client:
bool ioeCaught = false;
try
{
simpleClient2.add(1, 2);
Assert.Fail("Send after server close should have thrown Exception");
}
catch (SocketException)
{
ioeCaught = true;
}
Assert.IsTrue(ioeCaught, "Expected IOException");
// Send a new RPC with callback, and verify that the correct Exception
// is thrown:
ioeCaught = false;
try
{
addFuture = new CallFuture<int>();
simpleClient2.add(1, 2, addFuture);
addFuture.WaitForResult(2000);
Assert.Fail("Send after server close should have thrown Exception");
}
catch (SocketException)
{
ioeCaught = true;
}
Assert.IsTrue(ioeCaught, "Expected IOException");
}
finally
{
transceiver2.Disconnect();
}
}
finally
{
server2.Stop();
Thread.Sleep(1000);
}
}
private class SimpleImpl : Simple
{
public static bool throwSystemError = false;
public override string hello(string greeting)
{
return "Hello, " + greeting;
}
public override TestRecord echo(TestRecord record)
{
return record;
}
public override int add(int arg1, int arg2)
{
return arg1 + arg2;
}
public override byte[] echoBytes(byte[] data)
{
return data;
}
public override object error()
{
if(throwSystemError)
throw new SystemException("System error");
else
throw new TestError { message = "Test Message" };
}
public override void ack()
{
ackFlag = !ackFlag;
ackLatch.Signal();
}
}
}
}