blob: fc8e689ca4d87ba63069d88ea5cb99df8b35880a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.transport.network.mina;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.OpenException;
public class MINANetworkDriverTest extends TestCase
{
private static final String TEST_DATA = "YHALOTHAR";
private static int TEST_PORT = 2323;
private NetworkDriver _server;
private NetworkDriver _client;
private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read
private Exception _thrownEx;
@Override
public void setUp()
{
_server = new MINANetworkDriver();
_client = new MINANetworkDriver();
_thrownEx = null;
_countingEngine = new CountingProtocolEngine();
// increment the port to prevent tests clashing with each other when
// the port is in TIMED_WAIT state.
TEST_PORT++;
}
@Override
public void tearDown()
{
if (_server != null)
{
_server.close();
}
if (_client != null)
{
_client.close();
}
}
/**
* Tests that a socket can't be opened if a driver hasn't been bound
* to the port and can be opened if a driver has been bound.
* @throws BindException
* @throws UnknownHostException
* @throws OpenException
*/
public void testBindOpen() throws BindException, UnknownHostException, OpenException
{
try
{
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
}
catch (OpenException e)
{
_thrownEx = e;
}
assertNotNull("Open should have failed since no engine bound", _thrownEx);
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
}
/**
* Tests that a socket can't be opened after a bound NetworkDriver has been closed
* @throws BindException
* @throws UnknownHostException
* @throws OpenException
*/
public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException
{
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
_client.close();
_server.close();
try
{
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
}
catch (OpenException e)
{
_thrownEx = e;
}
assertNotNull("Open should have failed", _thrownEx);
}
/**
* Checks that the right exception is thrown when binding a NetworkDriver to an already
* existing socket.
*/
public void testBindPortInUse()
{
try
{
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
}
catch (BindException e)
{
fail("First bind should not fail");
}
try
{
_client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
}
catch (BindException e)
{
_thrownEx = e;
}
assertNotNull("Second bind should throw BindException", _thrownEx);
}
/**
* tests that bytes sent on a network driver are received at the other end
*
* @throws UnknownHostException
* @throws OpenException
* @throws InterruptedException
* @throws BindException
*/
public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException
{
// Open a connection from a counting engine to an echo engine
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
// Tell the counting engine how much data we're sending
_countingEngine.setNewLatch(TEST_DATA.getBytes().length);
// Send the data and wait for up to 2 seconds to get it back
_client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
_countingEngine.getLatch().await(2, TimeUnit.SECONDS);
// Check what we got
assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes());
}
/**
* Opens a connection with a low read idle and check that it gets triggered
* @throws BindException
* @throws OpenException
* @throws UnknownHostException
*
*/
public void testSetReadIdle() throws BindException, UnknownHostException, OpenException
{
// Open a connection from a counting engine to an echo engine
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle());
_client.setMaxReadIdle(1);
sleepForAtLeast(1500);
assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle());
}
/**
* Opens a connection with a low write idle and check that it gets triggered
* @throws BindException
* @throws OpenException
* @throws UnknownHostException
*
*/
public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException
{
// Open a connection from a counting engine to an echo engine
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle());
_client.setMaxWriteIdle(1);
sleepForAtLeast(1500);
assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle());
}
/**
* Creates and then closes a connection from client to server and checks that the server
* has its closed() method called. Then creates a new client and closes the server to check
* that the client has its closed() method called.
* @throws BindException
* @throws UnknownHostException
* @throws OpenException
*/
public void testClosed() throws BindException, UnknownHostException, OpenException
{
// Open a connection from a counting engine to an echo engine
EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory();
_server.bind(TEST_PORT, null, factory, null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
EchoProtocolEngine serverEngine = null;
while (serverEngine == null)
{
serverEngine = factory.getEngine();
if (serverEngine == null)
{
try
{
Thread.sleep(10);
}
catch (InterruptedException e)
{
}
}
}
assertFalse("Server should not have been closed", serverEngine.getClosed());
serverEngine.setNewLatch(1);
_client.close();
try
{
serverEngine.getLatch().await(2, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
}
assertTrue("Server should have been closed", serverEngine.getClosed());
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
_countingEngine.setClosed(false);
assertFalse("Client should not have been closed", _countingEngine.getClosed());
_countingEngine.setNewLatch(1);
_server.close();
try
{
_countingEngine.getLatch().await(2, TimeUnit.SECONDS);
}
catch (InterruptedException e)
{
}
assertTrue("Client should have been closed", _countingEngine.getClosed());
}
/**
* Create a connection and instruct the client to throw an exception when it gets some data
* and that the latch gets counted down.
* @throws BindException
* @throws UnknownHostException
* @throws OpenException
* @throws InterruptedException
*/
public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException
{
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
assertEquals("Exception should not have been thrown", 1,
_countingEngine.getExceptionLatch().getCount());
_countingEngine.setErrorOnNextRead(true);
_countingEngine.setNewLatch(TEST_DATA.getBytes().length);
_client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
_countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
assertEquals("Exception should have been thrown", 0,
_countingEngine.getExceptionLatch().getCount());
}
/**
* Opens a connection and checks that the remote address is the one that was asked for
* @throws BindException
* @throws UnknownHostException
* @throws OpenException
*/
public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException
{
_server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
_client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT),
_client.getRemoteAddress());
}
private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory
{
EchoProtocolEngine _engine = null;
public ProtocolEngine newProtocolEngine(NetworkDriver driver)
{
if (_engine == null)
{
_engine = new EchoProtocolEngine();
_engine.setNetworkDriver(driver);
}
return getEngine();
}
public EchoProtocolEngine getEngine()
{
return _engine;
}
}
public class CountingProtocolEngine implements ProtocolEngine
{
protected NetworkDriver _driver;
public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
private int _readBytes;
private CountDownLatch _latch = new CountDownLatch(0);
private boolean _readerHasBeenIdle;
private boolean _writerHasBeenIdle;
private boolean _closed = false;
private boolean _nextReadErrors = false;
private CountDownLatch _exceptionLatch = new CountDownLatch(1);
public void closed()
{
setClosed(true);
_latch.countDown();
}
public void setErrorOnNextRead(boolean b)
{
_nextReadErrors = b;
}
public void setNewLatch(int length)
{
_latch = new CountDownLatch(length);
}
public long getReadBytes()
{
return _readBytes;
}
public SocketAddress getRemoteAddress()
{
if (_driver != null)
{
return _driver.getRemoteAddress();
}
else
{
return null;
}
}
public SocketAddress getLocalAddress()
{
if (_driver != null)
{
return _driver.getLocalAddress();
}
else
{
return null;
}
}
public long getWrittenBytes()
{
return 0;
}
public void readerIdle()
{
_readerHasBeenIdle = true;
}
public void setNetworkDriver(NetworkDriver driver)
{
_driver = driver;
}
public void writeFrame(AMQDataBlock frame)
{
}
public void writerIdle()
{
_writerHasBeenIdle = true;
}
public void exception(Throwable t)
{
_exceptionLatch.countDown();
}
public CountDownLatch getExceptionLatch()
{
return _exceptionLatch;
}
public void received(ByteBuffer msg)
{
// increment read bytes and count down the latch for that many
int bytes = msg.remaining();
_readBytes += bytes;
for (int i = 0; i < bytes; i++)
{
_latch.countDown();
}
// Throw an error if we've been asked too, but we can still count
if (_nextReadErrors)
{
throw new RuntimeException("Was asked to error");
}
}
public CountDownLatch getLatch()
{
return _latch;
}
public boolean getWriterHasBeenIdle()
{
return _writerHasBeenIdle;
}
public boolean getReaderHasBeenIdle()
{
return _readerHasBeenIdle;
}
public void setClosed(boolean _closed)
{
this._closed = _closed;
}
public boolean getClosed()
{
return _closed;
}
}
private class EchoProtocolEngine extends CountingProtocolEngine
{
public void received(ByteBuffer msg)
{
super.received(msg);
msg.rewind();
_driver.send(msg);
}
}
public static void sleepForAtLeast(long period)
{
long start = System.currentTimeMillis();
long timeLeft = period;
while (timeLeft > 0)
{
try
{
Thread.sleep(timeLeft);
}
catch (InterruptedException e)
{
// Ignore it
}
timeLeft = period - (System.currentTimeMillis() - start);
}
}
}