// blob: 74a18093946cc8c166d01c47a254d007e00a707a [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.protocol;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.AMQProtocolHandler;
import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TestNetworkConnection;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.BasicRecoverSyncOkBody;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.test.utils.QpidTestCase;
/**
* This test addresses QPID-1431, where frame listeners would fail to be notified of an incoming exception.
*
* Currently we do checks at the Session level to ensure that the connection/session are open. However, it is possible
* for the connection to close AFTER this check has been performed.
*
* Performing a similar check at the frameListener level in AMQProtocolHandler makes most sense as this will prevent
* listening when there can be no returning frames.
*
* With the correction in place it also means that the new listener will either make it on to the list for notification
* or it will be notified of any existing exception due to the connection being closed.
*
* There may still be an issue in this space if the client utilises a second thread to close the session, as there will
* be no exception set to throw and so the wait will occur. That said, when the session is closed the frame listeners
* should be notified. It is not clear whether that case is tested.
*/
public class AMQProtocolHandlerTest extends QpidTestCase
{
    private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandlerTest.class);

    /** The handler under test. */
    private AMQProtocolHandler _handler;

    /** A frame to block upon whilst waiting for the exception. */
    private AMQFrame _blockFrame;

    /** Latch released once the listener's follow-up request has finished (normally or with an exception). */
    private CountDownLatch _handleCountDown;

    /** The listener that will receive the exception. */
    private BlockToAccessFrameListener _listener;

    /**
     * Creates a fresh handler on top of a mock connection and a test network connection, together with the frame,
     * latch and listener shared by the exception-propagation tests.
     *
     * @throws Exception if the superclass set-up or the construction of the fixtures fails
     */
    @Override
    public void setUp() throws Exception
    {
        super.setUp();

        // Create a new ProtocolHandler with a fake connection.
        _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'"));
        _handler.setNetworkConnection(new TestNetworkConnection());

        AMQBody body = new BasicRecoverSyncOkBody(ProtocolVersion.v0_8);
        _blockFrame = new AMQFrame(0, body);

        _handleCountDown = new CountDownLatch(1);

        _logger.info("Creating _Listener that should also receive the thrown exception.");
        _listener = new BlockToAccessFrameListener(1);
    }

    /**
     * There are two paths based on the type of exception thrown.
     *
     * This tests that when an AMQException is thrown we get the same type of AMQException back with the real exception
     * wrapped as the cause.
     *
     * @throws InterruptedException - if we are unable to wait for the test signals
     */
    public void testFrameListenerUpdateWithAMQException() throws InterruptedException
    {
        AMQAuthenticationException trigger = new AMQAuthenticationException(
                "AMQPHTest", new RuntimeException());

        performWithException(trigger);

        Exception receivedException = _listener.getReceivedException();
        assertEquals("Return exception was not the expected type",
                     AMQAuthenticationException.class, receivedException.getClass());
        assertEquals("The _Listener did not receive the correct error code",
                     trigger.getErrorCode(), ((AMQAuthenticationException) receivedException).getErrorCode());
    }

    /**
     * There are two paths based on the type of exception thrown.
     *
     * This tests that when a generic Exception is thrown we get the exception back wrapped in an AMQException
     * as the cause.
     *
     * @throws InterruptedException - if we are unable to wait for the test signals
     */
    public void testFrameListenerUpdateWithException() throws InterruptedException
    {
        Exception trigger = new Exception(new RuntimeException());

        performWithException(trigger);

        Exception receivedException = _listener.getReceivedException();
        // Check the type first so that a wrong type is reported as an assertion failure, not a ClassCastException.
        assertTrue("Return exception was not the expected type", receivedException instanceof AMQException);
        assertEquals("The _Listener did not receive the correct error code",
                     ErrorCodes.INTERNAL_ERROR, ((AMQException) receivedException).getErrorCode());
    }

    /** Checks the queue name generated when the local address is the wildcard address. */
    public void testTemporaryQueueWildcard() throws UnknownHostException
    {
        checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1");
    }

    /** Checks the queue name generated for the literal address 127.0.0.1. */
    public void testTemporaryQueueLocalhostAddr() throws UnknownHostException
    {
        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1");
    }

    /** Checks the queue name generated when the local address is resolved from the name "localhost". */
    public void testTemporaryQueueLocalhostName() throws UnknownHostException
    {
        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1");
    }

    /** Checks the queue name generated for an IPv4 literal address. */
    public void testTemporaryQueueInet4() throws UnknownHostException
    {
        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1");
    }

    /** Checks the queue name generated for an IPv6 literal address. */
    public void testTemporaryQueueInet6() throws UnknownHostException
    {
        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
    }

    /**
     * Installs a test network connection reporting the given local address on the handler and asserts the queue name
     * the handler then generates.
     *
     * @param address   the local address the network connection should report
     * @param queueName the queue name expected from the handler
     */
    private void checkTempQueueName(SocketAddress address, String queueName)
    {
        TestNetworkConnection networkConnection = new TestNetworkConnection();
        networkConnection.setLocalAddress(address);
        _handler.setNetworkConnection(networkConnection);

        assertEquals("Wrong queue name", queueName, _handler.generateQueueName());
    }

    /**
     * This is the main test method for both exception test cases.
     *
     * What occurs is that we create a new thread that will block (<30s[DEFAULT]) for a frame or exception to occur.
     *
     * We use a CountDownLatch to ensure that the new thread is running before we then yield and sleep to help ensure
     * the new thread has entered the synchronized block in the writeCommandFrameAndWaitForReply.
     *
     * We can then act like the incoming exception handler (ConnectionCloseMethodHandler).
     *
     * We fire the error to the stateManager, which in this case will record the error as there are no state listeners.
     *
     * We then set the connection to be closed, as we would normally close the socket at this point.
     *
     * Then fire the exception in to any frameListeners.
     *
     * The blocked listener (created above), when receiving the error, simulates the user by creating a new request to
     * block for a frame.
     *
     * This request should fail. Prior to the fix this will fail with a NPE as we are attempting to use a null listener
     * in the writeCommand.... call (L:268 of the handler at the time the test was written).
     *
     * This highlights that the listener would be added despite there being a pending error state that the listener
     * will miss, as it is not currently part of the _frameListeners set that is being notified by the iterator.
     *
     * The method waits to ensure that an exception is received before returning.
     *
     * The calling methods validate the exception that was received based on the one they sent in.
     *
     * @param trigger The exception to throw through the handler
     * @throws InterruptedException if interrupted whilst waiting on the latches or sleeping
     */
    private void performWithException(Exception trigger) throws InterruptedException
    {
        final CountDownLatch callingWriteCommand = new CountDownLatch(1);

        // Set an initial listener that will allow us to create a new blocking method
        new Thread(new Runnable()
        {
            public void run()
            {
                try
                {
                    _logger.info("At initial block, signalling to fire new exception");
                    callingWriteCommand.countDown();

                    _handler.writeCommandFrameAndWaitForReply(_blockFrame, _listener);
                }
                catch (Exception e)
                {
                    _logger.error(e.getMessage(), e);
                    // NOTE(review): fail() raised on this worker thread is not seen by the test runner thread; it
                    // only ends this thread. The test outcome is decided by the assertions on the calling thread.
                    fail(e.getMessage());
                }
            }
        }).start();

        _logger.info("Waiting for 'initial block' to start ");
        if (!callingWriteCommand.await(1000, TimeUnit.MILLISECONDS))
        {
            fail("Failed to start new thread to block for frame");
        }

        // Do what we can to ensure that this thread does not continue before the above thread has hit the synchronized
        // block in the writeCommandFrameAndWaitForReply
        Thread.yield();
        Thread.sleep(1000);

        _logger.info("Firing Erorr through state manager. There should be not state waiters here.");
        _handler.getStateManager().error(trigger);

        _logger.info("Setting state to be CONNECTION_CLOSED.");
        _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);

        _logger.info("Firing exception");
        _handler.propagateExceptionToFrameListeners(trigger);

        _logger.info("Awaiting notifcation from handler that exception arrived.");
        if (!_handleCountDown.await(2000, TimeUnit.MILLISECONDS))
        {
            fail("Failed to handle exception and timeout has not occured");
        }

        assertNotNull("The _Listener did not receive the exception", _listener.getReceivedException());
        assertTrue("Received exception not a QpidException",
                   _listener.getReceivedException() instanceof QpidException);

        QpidException receivedException = (QpidException) _listener.getReceivedException();

        // Guard the message before dereferencing it so a missing message is an assertion failure rather than an NPE.
        assertNotNull("The _Listener received an exception without a message", receivedException.getMessage());
        assertTrue("The _Listener did not receive the correct message",
                   receivedException.getMessage().startsWith(trigger.getMessage()));

        assertEquals("The _Listener did not receive the correct cause",
                     trigger, receivedException.getCause());
        assertEquals("The _Listener did not receive the correct sub cause",
                     trigger.getCause(), receivedException.getCause().getCause());
    }

    /**
     * Frame listener that reacts to an error by issuing, on a new thread, a second blocking request against the
     * handler and recording whatever exception that request throws.
     */
    class BlockToAccessFrameListener extends BlockingMethodFrameListener
    {
        // Written by the thread started in error() before it counts down _handleCountDown; the test thread reads it
        // only after a successful await on that latch, which is what makes the write visible to it.
        private Exception _receivedException = null;

        /**
         * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
         *
         * @param channelId The channel id to filter incoming methods with.
         */
        public BlockToAccessFrameListener(int channelId)
        {
            super(channelId);
            _logger.info("Creating a listener:" + this);
        }

        /**
         * Accepts every method frame offered to this listener.
         *
         * @param channelId the channel the frame arrived on (ignored)
         * @param frame     the method body (ignored)
         * @return always {@code true}
         */
        public boolean processMethod(int channelId, AMQMethodBody frame)
        {
            return true;
        }

        /**
         * Simulates a user reacting to the error by starting a new blocking request on a separate thread. Any
         * exception thrown by that request is stored for {@link #getReceivedException()}, and the outer latch is
         * counted down once the request has finished either way.
         *
         * NOTE(review): this override does not delegate to the superclass implementation.
         *
         * @param e the exception delivered to this listener
         */
        @Override
        public void error(Exception e)
        {
            _logger.info("Exception(" + e + ") Received by:" + this);

            // Create a new Thread to start the blocking registration.
            new Thread(new Runnable()
            {
                public void run()
                {
                    // Inside the Runnable a bare "this" is the Runnable itself, so name the listener explicitly.
                    final BlockToAccessFrameListener listener = BlockToAccessFrameListener.this;

                    // Issue a new blocking request; a null listener is passed on purpose (see performWithException)
                    try
                    {
                        _handler.writeCommandFrameAndWaitForReply(_blockFrame, null, 2000L);
                        _logger.info("listener(" + listener + ") Wait completed");
                    }
                    catch (Exception thrown)
                    {
                        _logger.info("listener(" + listener + ") threw exception:" + thrown.getMessage());
                        _receivedException = thrown;
                    }

                    _logger.info("listener(" + listener + ") completed");
                    _handleCountDown.countDown();
                }
            }).start();
        }

        /**
         * @return the exception thrown by the follow-up request started in {@link #error(Exception)}, or
         *         {@code null} if none has been recorded
         */
        public Exception getReceivedException()
        {
            return _receivedException;
        }
    }
}